From f508bb25abd79bf8a7d9692643b75cbb45aca9d7 Mon Sep 17 00:00:00 2001 From: Matthew Flatt Date: Wed, 3 Jun 2020 09:38:45 -0600 Subject: [PATCH] cs & threads: fix `replace-evt` leading to `choice-evt` Closes #3229 --- pkgs/racket-test-core/tests/racket/sync.rktl | 23 +++++++ racket/src/thread/evt.rkt | 7 ++ racket/src/thread/sync.rkt | 69 ++++++++++++-------- 3 files changed, 73 insertions(+), 26 deletions(-) diff --git a/pkgs/racket-test-core/tests/racket/sync.rktl b/pkgs/racket-test-core/tests/racket/sync.rktl index fa170563af..97cdb405ec 100644 --- a/pkgs/racket-test-core/tests/racket/sync.rktl +++ b/pkgs/racket-test-core/tests/racket/sync.rktl @@ -689,6 +689,29 @@ (lambda (_) (+ a b)))))) +(let () + (define (chain-evts e1 e2) + (sync (make-semaphore) (replace-evt e1 + (lambda (v) + (choice-evt + e2 + (make-semaphore)))))) + (test always-evt chain-evts always-evt always-evt) + (test always-evt chain-evts (make-semaphore 1) always-evt) + (let ([s (make-semaphore 1)]) + (test always-evt chain-evts s always-evt)) + (let ([s (make-semaphore 1)]) + (test s chain-evts (make-semaphore 1) s)) + (let ([s (make-semaphore 2)]) + (test s chain-evts s s) + (test #f sync/timeout 0 s)) + (let ([s (make-semaphore)]) + (thread (lambda () (semaphore-post s))) + (test always-evt chain-evts s always-evt)) + (let ([s (make-semaphore)]) + (thread (lambda () (semaphore-post s) (sleep) (semaphore-post s))) + (test s chain-evts s s))) + ;; ---------------------------------------- ;; Structures as waitables diff --git a/racket/src/thread/evt.rkt b/racket/src/thread/evt.rkt index 75178f3a9c..3def9285ee 100644 --- a/racket/src/thread/evt.rkt +++ b/racket/src/thread/evt.rkt @@ -137,6 +137,12 @@ ;; semaphore was meanwhile posted). As another example, a ;; `nack-guard-evt`'s result uses `abandon-proc` to post to the NACK ;; event. +;; +;; A sync slot can have at most one non-`void` `interupt-proc` and +;; `retry-proc` (but any number of `abandon-proc`s). The +;; `interrupt-proc` and `abandon-proc` fields in `control-state-evt` +;; can be 'reset to discard existing procedures for the slot. +;; ;; Beware that it doesn't make sense to use `wrap-evt` around the ;; `control-state-evt` or the `evt` inside for an asynchronously ;; satisfied event (like the way that semaphores are implemented). The @@ -144,6 +150,7 @@ ;; event is found, so that the result turns out to be an unwrapped ;; event. Or the `interrupt-proc`, etc., callbacks may not be found ;; early enough if the `control-state-evt` is wrapped. +;; (struct control-state-evt (evt wrap-proc interrupt-proc ; thunk for break/kill initiated or otherwise before `abandon-proc` diff --git a/racket/src/thread/sync.rkt b/racket/src/thread/sync.rkt index b4fcf688ec..1bf074736d 100644 --- a/racket/src/thread/sync.rkt +++ b/racket/src/thread/sync.rkt @@ -35,16 +35,16 @@ wraps ; list of wraps to apply if selected commits ; list of thunks to run atomically when selected interrupted? ; kill/break in progress? - interrupts ; list of thunks to run on kill/break initiation + interrupt ; #f or a thunk to run on kill/break initiation abandons ; list of thunks to run on kill/break completion - retries ; list of thunks to run on retry: returns `(values _val _ready?)` + retry ; #f or a thunk to run on retry: returns `(values _val _ready?)` prev ; previous in linked list next) ; next in linked list #:transparent #:mutable) (define (make-syncer evt wraps prev) - (syncer evt wraps null #f null null null prev #f)) + (syncer evt wraps null #f #f null #f prev #f)) (define none-syncer (make-syncer #f null #f)) @@ -437,10 +437,10 @@ (void)) (loop sr (add1 retries) polled-all-so-far? #f))])] [(choice-evt? new-evt) - (when (or (pair? (syncer-interrupts sr)) - (pair? (syncer-retries sr))) + (when (or (syncer-interrupt sr) + (syncer-retry sr)) (end-atomic) - (internal-error "choice event discovered after interrupt/retry callbacks")) + (internal-error "choice event discovered after interrupt/retry callback")) (define new-syncers (random-rotate (evts->syncers #f (choice-evt-evts new-evt) @@ -487,11 +487,19 @@ (unless (eq? wrap-proc values) (set-syncer-wraps! sr (cons wrap-proc (syncer-wraps sr)))) (unless (eq? interrupt-proc void) - (set-syncer-interrupts! sr (cons interrupt-proc (syncer-interrupts sr)))) + (cond + [(eq? interrupt-proc 'reset) (set-syncer-interrupt! sr #f)] + [else + (when (syncer-interrupt sr) (internal-error "syncer already has an interrupt callback")) + (set-syncer-interrupt! sr interrupt-proc)])) (unless (eq? abandon-proc void) (set-syncer-abandons! sr (cons abandon-proc (syncer-abandons sr)))) (unless (eq? retry-proc void) - (set-syncer-retries! sr (cons retry-proc (syncer-retries sr)))) + (cond + [(eq? retry-proc 'reset) (set-syncer-retry! sr #f)] + [else + (when (syncer-retry sr) (internal-error "syncer already has an retry callback")) + (set-syncer-retry! sr retry-proc)])) (set-syncer-evt! sr (control-state-evt-evt new-evt)) (end-atomic) (cond @@ -517,7 +525,7 @@ (loop sr (add1 retries) polled-all-so-far? #f)])] [(and (never-evt? new-evt) (not (evt-impersonator? new-evt)) - (null? (syncer-interrupts sr)) + (not (syncer-interrupt sr)) (null? (syncer-commits sr)) (null? (syncer-abandons sr))) ;; Drop this event, since it will never get selected @@ -573,8 +581,9 @@ (when sr (unless (eq? sr selected-sr) (unless (syncer-interrupted? sr) - (for ([interrupt (in-list (syncer-interrupts sr))]) - (interrupt))) + (let ([interrupt (syncer-interrupt sr)]) + (when interrupt + (interrupt)))) (for ([abandon (in-list (syncer-abandons sr))]) (abandon))) (loop (syncer-next sr)))) @@ -597,8 +606,9 @@ (when sr (unless (syncer-interrupted? sr) (set-syncer-interrupted?! sr #t) - (for ([interrupt (in-list (syncer-interrupts sr))]) - (interrupt))) + (let ([interrupt (syncer-interrupt sr)]) + (when interrupt + (interrupt)))) (loop (syncer-next sr))))) ;; Called in atomic mode @@ -614,13 +624,12 @@ (not (syncing-selected s))) (when (syncer-interrupted? sr) (set-syncer-interrupted?! sr #f) - ;; Although we keep a list of retries, we expect only - ;; one to be relevant - (for ([retry (in-list (syncer-retries sr))]) - (define-values (result ready?) (retry)) - (when ready? - (set-syncer-wraps! sr (cons (lambda args result) (syncer-wraps sr))) - (syncing-done! s sr)))) + (let ([retry (syncer-retry sr)]) + (when retry + (define-values (result ready?) (retry)) + (when ready? + (set-syncer-wraps! sr (cons (lambda args result) (syncer-wraps sr))) + (syncing-done! s sr))))) (loop (syncer-next sr))))) ;; Queue a retry when a check for breaks should happen before a retry @@ -737,6 +746,9 @@ (control-state-evt (nested-sync-evt s next orig-evt) values + ;; The interrupt and retry callbacks get discarded + ;; when a new event is returned (but the abandon + ;; callback is preserved) (lambda () (syncing-interrupt! s)) (lambda () (syncing-abandon! s)) (lambda () (syncing-retry! s))))))) @@ -755,12 +767,17 @@ (define orig-evt (nested-sync-evt-orig-evt ns)) (values #f ;; and this is the "replace" step: - (poll-guard-evt - (lambda (poll?) - (define r (call-with-values thunk next)) - (cond - [(evt? r) r] - [else (wrap-evt always-evt (lambda (v) orig-evt))]))))) + (control-state-evt + (poll-guard-evt + (lambda (poll?) + (define r (call-with-values thunk next)) + (cond + [(evt? r) r] + [else (wrap-evt always-evt (lambda (v) orig-evt))]))) + values + 'reset + void + 'reset))) #:just-poll? (poll-ctx-poll? poll-ctx) #:done-after-poll? #f #:schedule-info (poll-ctx-sched-info poll-ctx)))