From 0ed1fc3850ba488567cb308df91a56134c780c28 Mon Sep 17 00:00:00 2001 From: Matthew Flatt Date: Tue, 20 Oct 2020 21:14:15 -0600 Subject: [PATCH] cs & thread: fix atomic mode in callbacks within `replace-evt` A nested evt poll did not move back to non-atomic mode when calling event generators in the first argument of `replace-evt`. --- pkgs/racket-test-core/tests/racket/sync.rktl | 13 + racket/src/cs/schemified/thread.scm | 645 ++++++++++--------- racket/src/thread/sync.rkt | 34 +- 3 files changed, 382 insertions(+), 310 deletions(-) diff --git a/pkgs/racket-test-core/tests/racket/sync.rktl b/pkgs/racket-test-core/tests/racket/sync.rktl index 97cdb405ec..1561a289ec 100644 --- a/pkgs/racket-test-core/tests/racket/sync.rktl +++ b/pkgs/racket-test-core/tests/racket/sync.rktl @@ -712,6 +712,19 @@ (thread (lambda () (semaphore-post s) (sleep) (semaphore-post s))) (test s chain-evts s s))) +;; indirectly check that a `guard-evt` callabck in a `replace-evt` +;; is not called in atomic mode; if it is, then the thread won't +;; escape and terminate right +(test #t thread? (sync + (thread + (lambda () + (let/cc esc + (sync (replace-evt + (guard-evt + (lambda () + (esc 'done))) + void))))))) + ;; ---------------------------------------- ;; Structures as waitables diff --git a/racket/src/cs/schemified/thread.scm b/racket/src/cs/schemified/thread.scm index 85224c55af..8ca8ca7160 100644 --- a/racket/src/cs/schemified/thread.scm +++ b/racket/src/cs/schemified/thread.scm @@ -10426,278 +10426,121 @@ (set-schedule-info-did-work?! sched-info_0 #t) (end-atomic) (loop_0 (syncer-next sr_0) 0 #f #f)) - (let ((ctx_0 - (poll-ctx3.1 - just-poll?20_0 - (lambda () (syncing-done! s32_0 sr_0)) - sched-info_0 - #f))) - (call-with-values - (lambda () (evt-poll (syncer-evt sr_0) ctx_0)) - (case-lambda - ((results_0 new-evt_0) - (if results_0 - (begin - (syncing-done! s32_0 sr_0) - (make-result sr_0 results_0 success-k19_0)) - (if (delayed-poll? new-evt_0) + (if (let ((app_0 nested-sync-evt?)) + (|#%app| app_0 (syncer-evt sr_0))) + (begin + (end-atomic) + (call-with-values + (lambda () + (let ((app_0 poll-nested-sync)) + (|#%app| + app_0 + (syncer-evt sr_0) + just-poll?20_0 + fast-only?21_0 + sched-info_0))) + (case-lambda + ((same?_0 new-evt_0) + (if same?_0 + (loop_0 + (syncer-next sr_0) + 0 + polled-all-so-far?_0 + no-wrappers?_0) (begin + (start-atomic) + (if (syncing-selected s32_0) + (void) + (set-syncer-evt! sr_0 new-evt_0)) (end-atomic) (if fast-only?21_0 (|#%app| fail-k18_0 sched-info_0 #f #f) - (let ((new-evt_1 - (|#%app| - (delayed-poll-resume - new-evt_0)))) - (begin - (start-atomic) - (if (syncing-selected s32_0) - (void) - (set-syncer-evt! sr_0 new-evt_1)) - (end-atomic) - (loop_0 - sr_0 - (add1 retries_0) - polled-all-so-far?_0 - #f))))) - (if (choice-evt? new-evt_0) + (loop_0 + sr_0 + (add1 retries_0) + polled-all-so-far?_0 + no-wrappers?_0))))) + (args + (raise-binding-result-arity-error 2 args))))) + (let ((ctx_0 + (poll-ctx3.1 + just-poll?20_0 + (lambda () (syncing-done! s32_0 sr_0)) + sched-info_0 + #f))) + (call-with-values + (lambda () (evt-poll (syncer-evt sr_0) ctx_0)) + (case-lambda + ((results_0 new-evt_0) + (if results_0 + (begin + (syncing-done! s32_0 sr_0) + (make-result + sr_0 + results_0 + success-k19_0)) + (if (delayed-poll? new-evt_0) (begin - (if (let ((or-part_0 - (syncer-interrupt sr_0))) - (if or-part_0 - or-part_0 - (syncer-retry sr_0))) - (begin - (end-atomic) - (internal-error - "choice event discovered after interrupt/retry callback")) - (void)) - (let ((new-syncers_0 - (let ((app_0 random-rotate)) + (end-atomic) + (if fast-only?21_0 + (|#%app| + fail-k18_0 + sched-info_0 + #f + #f) + (let ((new-evt_1 (|#%app| - app_0 - (let ((app_1 evts->syncers)) - (let ((app_2 - (choice-evt-evts - new-evt_0))) - (let ((app_3 - (syncer-wraps - sr_0))) - (let ((app_4 - (syncer-commits - sr_0))) - (|#%app| - app_1 - #f - app_2 - app_3 - app_4 - (syncer-abandons - sr_0)))))))))) - (if (not new-syncers_0) + (delayed-poll-resume + new-evt_0)))) (begin - (syncer-remove! sr_0 s32_0) + (start-atomic) + (if (syncing-selected s32_0) + (void) + (set-syncer-evt! + sr_0 + new-evt_1)) (end-atomic) (loop_0 - (syncer-next sr_0) - 0 - polled-all-so-far?_0 - no-wrappers?_0)) - (begin - (syncer-replace! sr_0 - new-syncers_0 - s32_0) - (end-atomic) - (loop_0 - new-syncers_0 (add1 retries_0) polled-all-so-far?_0 - no-wrappers?_0))))) - (if (wrap-evt? new-evt_0) + #f))))) + (if (choice-evt? new-evt_0) (begin - (set-syncer-wraps! - sr_0 - (let ((app_0 - (wrap-evt-wrap new-evt_0))) - (cons - app_0 - (let ((l_0 (syncer-wraps sr_0))) - (if (if (null? l_0) - (not - (handle-evt?$1 - new-evt_0)) - #f) - (list values) - l_0))))) - (let ((inner-new-evt_0 - (wrap-evt-evt new-evt_0))) - (begin - (set-syncer-evt! - sr_0 - inner-new-evt_0) - (if (eq? - inner-new-evt_0 - the-always-evt) - (begin - (syncing-done! s32_0 sr_0) - (make-result - sr_0 - (list the-always-evt) - success-k19_0)) - (begin - (end-atomic) - (loop_0 - sr_0 - (add1 retries_0) - polled-all-so-far?_0 - #f)))))) - (if (control-state-evt? new-evt_0) - (let ((wrap-proc_0 - (control-state-evt-wrap-proc - new-evt_0))) - (let ((interrupt-proc_0 - (control-state-evt-interrupt-proc - new-evt_0))) - (let ((abandon-proc_0 - (control-state-evt-abandon-proc - new-evt_0))) - (let ((retry-proc_0 - (control-state-evt-retry-proc - new-evt_0))) - (begin - (if (eq? - wrap-proc_0 - values) - (void) - (set-syncer-wraps! - sr_0 - (cons - wrap-proc_0 - (syncer-wraps sr_0)))) - (if (eq? - interrupt-proc_0 - void) - (void) - (if (eq? - interrupt-proc_0 - 'reset) - (set-syncer-interrupt! - sr_0 - #f) - (begin - (if (syncer-interrupt - sr_0) - (internal-error - "syncer already has an interrupt callback") - (void)) - (set-syncer-interrupt! - sr_0 - interrupt-proc_0)))) - (if (eq? - abandon-proc_0 - void) - (void) - (set-syncer-abandons! - sr_0 - (cons - abandon-proc_0 - (syncer-abandons - sr_0)))) - (if (eq? retry-proc_0 void) - (void) - (if (eq? - retry-proc_0 - 'reset) - (set-syncer-retry! - sr_0 - #f) - (begin - (if (syncer-retry - sr_0) - (internal-error - "syncer already has an retry callback") - (void)) - (set-syncer-retry! - sr_0 - retry-proc_0)))) - (set-syncer-evt! - sr_0 - (control-state-evt-evt - new-evt_0)) - (end-atomic) - (if (if fast-only?21_0 - (not - (if (eq? - interrupt-proc_0 - void) - (if (eq? - abandon-proc_0 - void) - (eq? - retry-proc_0 - void) - #f) - #f)) - #f) - (|#%app| - fail-k18_0 - sched-info_0 - #f - #f) - (loop_0 - sr_0 - (add1 retries_0) - polled-all-so-far?_0 - no-wrappers?_0))))))) - (if (poll-guard-evt? new-evt_0) + (if (let ((or-part_0 + (syncer-interrupt sr_0))) + (if or-part_0 + or-part_0 + (syncer-retry sr_0))) (begin (end-atomic) - (if fast-only?21_0 - (|#%app| - fail-k18_0 - sched-info_0 - #f - #f) - (let ((generated_0 - (call-with-continuation-barrier - (lambda () - (|#%app| - (poll-guard-evt-proc - new-evt_0) - just-poll?20_0))))) - (begin - (set-syncer-evt! - sr_0 - (if (1/evt? generated_0) - generated_0 - (wrap-evt7.1 - the-always-evt - (lambda (a_0) - generated_0)))) - (loop_0 - sr_0 - (add1 retries_0) - polled-all-so-far?_0 - #f))))) - (if (if (never-evt? new-evt_0) - (if (not - (|#%app| - evt-impersonator? - new-evt_0)) - (if (not - (syncer-interrupt - sr_0)) - (if (null? - (syncer-commits - sr_0)) - (null? - (syncer-abandons - sr_0)) - #f) - #f) - #f) - #f) + (internal-error + "choice event discovered after interrupt/retry callback")) + (void)) + (let ((new-syncers_0 + (let ((app_0 random-rotate)) + (|#%app| + app_0 + (let ((app_1 + evts->syncers)) + (let ((app_2 + (choice-evt-evts + new-evt_0))) + (let ((app_3 + (syncer-wraps + sr_0))) + (let ((app_4 + (syncer-commits + sr_0))) + (|#%app| + app_1 + #f + app_2 + app_3 + app_4 + (syncer-abandons + sr_0)))))))))) + (if (not new-syncers_0) (begin (syncer-remove! sr_0 s32_0) (end-atomic) @@ -10706,34 +10549,241 @@ 0 polled-all-so-far?_0 no-wrappers?_0)) - (if (if (eq? - new-evt_0 - (syncer-evt sr_0)) - (not - (poll-ctx-incomplete? - ctx_0)) + (begin + (syncer-replace! + sr_0 + new-syncers_0 + s32_0) + (end-atomic) + (loop_0 + new-syncers_0 + (add1 retries_0) + polled-all-so-far?_0 + no-wrappers?_0))))) + (if (wrap-evt? new-evt_0) + (begin + (set-syncer-wraps! + sr_0 + (let ((app_0 + (wrap-evt-wrap new-evt_0))) + (cons + app_0 + (let ((l_0 + (syncer-wraps sr_0))) + (if (if (null? l_0) + (not + (handle-evt?$1 + new-evt_0)) + #f) + (list values) + l_0))))) + (let ((inner-new-evt_0 + (wrap-evt-evt new-evt_0))) + (begin + (set-syncer-evt! + sr_0 + inner-new-evt_0) + (if (eq? + inner-new-evt_0 + the-always-evt) + (begin + (syncing-done! s32_0 sr_0) + (make-result + sr_0 + (list the-always-evt) + success-k19_0)) + (begin + (end-atomic) + (loop_0 + sr_0 + (add1 retries_0) + polled-all-so-far?_0 + #f)))))) + (if (control-state-evt? new-evt_0) + (let ((wrap-proc_0 + (control-state-evt-wrap-proc + new-evt_0))) + (let ((interrupt-proc_0 + (control-state-evt-interrupt-proc + new-evt_0))) + (let ((abandon-proc_0 + (control-state-evt-abandon-proc + new-evt_0))) + (let ((retry-proc_0 + (control-state-evt-retry-proc + new-evt_0))) + (begin + (if (eq? + wrap-proc_0 + values) + (void) + (set-syncer-wraps! + sr_0 + (cons + wrap-proc_0 + (syncer-wraps + sr_0)))) + (if (eq? + interrupt-proc_0 + void) + (void) + (if (eq? + interrupt-proc_0 + 'reset) + (set-syncer-interrupt! + sr_0 + #f) + (begin + (if (syncer-interrupt + sr_0) + (internal-error + "syncer already has an interrupt callback") + (void)) + (set-syncer-interrupt! + sr_0 + interrupt-proc_0)))) + (if (eq? + abandon-proc_0 + void) + (void) + (set-syncer-abandons! + sr_0 + (cons + abandon-proc_0 + (syncer-abandons + sr_0)))) + (if (eq? + retry-proc_0 + void) + (void) + (if (eq? + retry-proc_0 + 'reset) + (set-syncer-retry! + sr_0 + #f) + (begin + (if (syncer-retry + sr_0) + (internal-error + "syncer already has an retry callback") + (void)) + (set-syncer-retry! + sr_0 + retry-proc_0)))) + (set-syncer-evt! + sr_0 + (control-state-evt-evt + new-evt_0)) + (end-atomic) + (if (if fast-only?21_0 + (not + (if (eq? + interrupt-proc_0 + void) + (if (eq? + abandon-proc_0 + void) + (eq? + retry-proc_0 + void) + #f) + #f)) + #f) + (|#%app| + fail-k18_0 + sched-info_0 + #f + #f) + (loop_0 + sr_0 + (add1 retries_0) + polled-all-so-far?_0 + no-wrappers?_0))))))) + (if (poll-guard-evt? new-evt_0) + (begin + (end-atomic) + (if fast-only?21_0 + (|#%app| + fail-k18_0 + sched-info_0 + #f + #f) + (let ((generated_0 + (call-with-continuation-barrier + (lambda () + (|#%app| + (poll-guard-evt-proc + new-evt_0) + just-poll?20_0))))) + (begin + (set-syncer-evt! + sr_0 + (if (1/evt? generated_0) + generated_0 + (wrap-evt7.1 + the-always-evt + (lambda (a_0) + generated_0)))) + (loop_0 + sr_0 + (add1 retries_0) + polled-all-so-far?_0 + #f))))) + (if (if (never-evt? new-evt_0) + (if (not + (|#%app| + evt-impersonator? + new-evt_0)) + (if (not + (syncer-interrupt + sr_0)) + (if (null? + (syncer-commits + sr_0)) + (null? + (syncer-abandons + sr_0)) + #f) + #f) + #f) #f) (begin + (syncer-remove! sr_0 s32_0) (end-atomic) (loop_0 (syncer-next sr_0) 0 polled-all-so-far?_0 no-wrappers?_0)) - (begin - (set-syncer-evt! - sr_0 - new-evt_0) - (end-atomic) - (loop_0 - sr_0 - (add1 retries_0) - polled-all-so-far?_0 - no-wrappers?_0))))))))))) - (args - (raise-binding-result-arity-error - 2 - args))))))))))))))) + (if (if (eq? + new-evt_0 + (syncer-evt sr_0)) + (not + (poll-ctx-incomplete? + ctx_0)) + #f) + (begin + (end-atomic) + (loop_0 + (syncer-next sr_0) + 0 + polled-all-so-far?_0 + no-wrappers?_0)) + (begin + (set-syncer-evt! + sr_0 + new-evt_0) + (end-atomic) + (loop_0 + sr_0 + (add1 retries_0) + polled-all-so-far?_0 + no-wrappers?_0))))))))))) + (args + (raise-binding-result-arity-error + 2 + args)))))))))))))))) (loop_0 (syncing-syncers s32_0) 0 #t #t))))))) (define make-result (lambda (sr_0 results_0 success-k_0) @@ -11070,7 +11120,7 @@ (void))) (define struct:nested-sync-evt (make-record-type-descriptor* 'evt #f #f #f #f 3 7)) -(define effect_2073 +(define effect_2232 (struct-type-install-properties! struct:nested-sync-evt 'evt @@ -11080,9 +11130,7 @@ (list (cons 1/prop:evt - (poller2.1 - (lambda (self_0 poll-ctx_0) - (|#%app| poll-nested-sync self_0 poll-ctx_0))))) + (poller2.1 (lambda (self_0 poll-ctx_0) (values #f self_0))))) (current-inspector) #f '(0 1 2) @@ -11204,15 +11252,11 @@ (lambda () (syncing-retry! s_0)))))))) orig-evt_0))))))) (define poll-nested-sync - (lambda (ns_0 poll-ctx_0) + (lambda (ns_0 just-poll?_0 fast-only?_0 sched-info_0) (let ((temp90_0 (|#%app| nested-sync-evt-s ns_0))) (let ((temp91_0 - (lambda (sched-info_0 polled-all?_0 no-wrappers?_0) - (begin - (if polled-all?_0 - (void) - (set-poll-ctx-incomplete?! poll-ctx_0 #f)) - (values #f ns_0))))) + (lambda (sched-info_1 polled-all?_0 no-wrappers?_0) + (values polled-all?_0 ns_0)))) (let ((temp92_0 (lambda (thunk_0) (let ((next_0 (nested-sync-evt-next ns_0))) @@ -11232,21 +11276,16 @@ 'reset void 'reset))))))) - (let ((temp93_0 (poll-ctx-poll? poll-ctx_0))) - (let ((temp95_0 (poll-ctx-sched-info poll-ctx_0))) - (let ((temp93_1 temp93_0) - (temp92_1 temp92_0) - (temp91_1 temp91_0) - (temp90_1 temp90_0)) - (sync-poll.1 - #f - #f - temp91_1 - #f - temp93_1 - temp95_0 - temp92_1 - temp90_1))))))))) + (let ((temp91_1 temp91_0) (temp90_1 temp90_0)) + (sync-poll.1 + #f + #f + temp91_1 + fast-only?_0 + just-poll?_0 + sched-info_0 + temp92_0 + temp90_1))))))) (define 1/current-evt-pseudo-random-generator (make-parameter (make-pseudo-random-generator) diff --git a/racket/src/thread/sync.rkt b/racket/src/thread/sync.rkt index 1bf074736d..71cb51cfd9 100644 --- a/racket/src/thread/sync.rkt +++ b/racket/src/thread/sync.rkt @@ -399,6 +399,27 @@ (schedule-info-did-work! sched-info) (end-atomic) (loop (syncer-next sr) 0 #f #f)] + [(nested-sync-evt? (syncer-evt sr)) + ;; Have to go out of atomic mode to continue: + (end-atomic) + (define-values (same? new-evt) (poll-nested-sync (syncer-evt sr) just-poll? fast-only? sched-info)) + (cond + [same? + (loop (syncer-next sr) 0 polled-all-so-far? no-wrappers?)] + [else + ;; Since we left atomic mode, double-check that we're + ;; still syncing before installing the replacement event: + (atomically + (unless (syncing-selected s) + (set-syncer-evt! sr new-evt)) + (void)) + (cond + [fast-only? + ;; Conservative, because we don't know whether `same?` was #f + ;; because the nested sync had non-fast elements + (none-k sched-info #f #f)] + [else + (loop sr (add1 retries) polled-all-so-far? no-wrappers?)])])] [else (define ctx (poll-ctx just-poll? ;; Call back for asynchronous selection, @@ -729,7 +750,7 @@ #:reflection-name 'evt) (struct nested-sync-evt (s next orig-evt) - #:property prop:evt (poller (lambda (self poll-ctx) (poll-nested-sync self poll-ctx))) + #:property prop:evt (poller (lambda (self poll-ctx) (values #f self))) #:reflection-name 'evt) (define/who (replace-evt evt next) @@ -754,12 +775,10 @@ (lambda () (syncing-retry! s))))))) orig-evt) -(define (poll-nested-sync ns poll-ctx) +(define (poll-nested-sync ns just-poll? fast-only? sched-info) (sync-poll (nested-sync-evt-s ns) #:fail-k (lambda (sched-info polled-all? no-wrappers?) - (unless polled-all? - (set-poll-ctx-incomplete?! poll-ctx #f)) - (values #f ns)) + (values polled-all? ns)) #:success-k (lambda (thunk) ;; `thunk` produces the values of the evt ;; that was provided to `replace-evt`: @@ -778,9 +797,10 @@ 'reset void 'reset))) - #:just-poll? (poll-ctx-poll? poll-ctx) + #:just-poll? just-poll? #:done-after-poll? #f - #:schedule-info (poll-ctx-sched-info poll-ctx))) + #:fast-only? fast-only? + #:schedule-info sched-info)) ;; ----------------------------------------