diff --git a/racket/src/thread/atomic.rkt b/racket/src/thread/atomic.rkt index fbf9911720..db3b1625a6 100644 --- a/racket/src/thread/atomic.rkt +++ b/racket/src/thread/atomic.rkt @@ -30,12 +30,19 @@ ;; "atomically" is atomic within a place; when a future-running ;; pthread tries to enter atomic mode, it is suspended -(define-syntax-rule (atomically expr ...) - (begin - (start-atomic) - (begin0 - (let () expr ...) - (end-atomic)))) +(define-syntax atomically + (syntax-rules (void) + [(_ expr ... (void)) ; `(void)` => no need for `begin0` + (begin + (start-atomic) + expr ... + (end-atomic))] + [(_ expr ...) + (begin + (start-atomic) + (begin0 + (let () expr ...) + (end-atomic)))])) (define-syntax-rule (atomically/no-interrupts expr ...) (begin diff --git a/racket/src/thread/channel.rkt b/racket/src/thread/channel.rkt index b89ad5471b..ab5ee8f4d1 100644 --- a/racket/src/thread/channel.rkt +++ b/racket/src/thread/channel.rkt @@ -69,9 +69,10 @@ (define n (queue-add! gq (cons gw b))) (waiter-suspend! gw ;; On break/kill/suspend: - (lambda () (queue-remove-node! gq n)) - ;; On retry after break or resume: - (lambda () (receive)))] + (lambda () + (queue-remove-node! gq n) + ;; On retry after break or resume: + (lambda () (receive))))] [else (set-box! b (cdr pw+v)) (waiter-resume! (car pw+v) (void)) @@ -132,9 +133,10 @@ (define n (queue-add! pq (cons pw v))) (waiter-suspend! pw ;; On break/kill/suspend: - (lambda () (queue-remove-node! pq n)) - ;; On retry after break or resume: - (lambda () (channel-put ch v)))] + (lambda () + (queue-remove-node! pq n) + ;; On retry after break or resume: + (lambda () (channel-put ch v))))] [else (set-box! (cdr gw+b) v) (waiter-resume! (car gw+b) v) diff --git a/racket/src/thread/semaphore.rkt b/racket/src/thread/semaphore.rkt index 50515a6e4f..b838b9a075 100644 --- a/racket/src/thread/semaphore.rkt +++ b/racket/src/thread/semaphore.rkt @@ -74,7 +74,9 @@ (unsafe-struct*-cas! s count-field-pos c (add1 c))) (void)] [else - (atomically (semaphore-post/atomic s))])) + (atomically + (semaphore-post/atomic s) + (void))])) ;; In atomic mode: (define (semaphore-post/atomic s) @@ -101,7 +103,8 @@ (define (semaphore-post-all s) (atomically - (semaphore-post-all/atomic s))) + (semaphore-post-all/atomic s) + (void))) ;; In atomic mode: (define (semaphore-any-waiters? s) @@ -148,13 +151,12 @@ (lambda () (queue-remove-node! s n) (when (queue-empty? s) - (set-semaphore-count! s 0))) ; allow CAS again - ;; This callback is used, in addition to the previous one, if - ;; the thread receives a break signal but doesn't escape - ;; (either because breaks are disabled or the handler - ;; continues), or if the interrupt was to suspend and the thread - ;; is resumed: - (lambda () (semaphore-wait s)))])))])) + (set-semaphore-count! s 0)) ; allow CAS again + ;; This callback is used if the thread receives a break + ;; signal but doesn't escape (either because breaks are + ;; disabled or the handler continues), or if the + ;; interrupt was to suspend and the thread is resumed: + (lambda () (unsafe-semaphore-wait s))))])))])) ;; In atomic mode (define (semaphore-wait/poll s self poll-ctx diff --git a/racket/src/thread/sync.rkt b/racket/src/thread/sync.rkt index c52fcec16e..b4fcf688ec 100644 --- a/racket/src/thread/sync.rkt +++ b/racket/src/thread/sync.rkt @@ -88,7 +88,9 @@ (when (or (and (real? timeout) (zero? timeout)) (procedure? timeout)) - (atomically (call-pre-poll-external-callbacks))) + (atomically + (call-pre-poll-external-callbacks) + (void))) ;; General polling loop (define (go #:thunk-result? [thunk-result? #t]) @@ -99,7 +101,8 @@ (lambda () (syncing-abandon! s))) (thread-push-suspend+resume-callbacks! (lambda () (syncing-interrupt! s)) - (lambda () (syncing-queue-retry! s))))) + (lambda () (syncing-queue-retry! s))) + (void))) (lambda () (when enable-break? (check-for-break)) (cond @@ -167,7 +170,8 @@ (when local-break-cell (thread-remove-ignored-break-cell! (current-thread/in-atomic) local-break-cell)) ;; On escape, post nacks, etc.: - (syncing-abandon! s))))) + (syncing-abandon! s) + (void))))) ;; Result thunk (if needed) is called in tail position: (cond @@ -429,7 +433,8 @@ ;; still syncing before installing the replacement event: (atomically (unless (syncing-selected s) - (set-syncer-evt! sr new-evt))) + (set-syncer-evt! sr new-evt)) + (void)) (loop sr (add1 retries) polled-all-so-far? #f))])] [(choice-evt? new-evt) (when (or (pair? (syncer-interrupts sr)) @@ -699,14 +704,14 @@ ;; Interrupt due to break/kill/suspend (set-syncing-wakeup! s void) (unless (syncing-selected s) - (syncing-interrupt! s))) - ;; In non-atomic mode and tail position: - (lambda () - ;; Continue from suspend or ignored break... - ((atomically - (unless (syncing-selected s) - (syncing-retry! s)) - (retry)))))]))))) + (syncing-interrupt! s)) + ;; In non-atomic mode and tail position: + (lambda () + ;; Continue from suspend or ignored break... + ((atomically + (unless (syncing-selected s) + (syncing-retry! s)) + (retry))))))]))))) ;; ---------------------------------------- diff --git a/racket/src/thread/thread.rkt b/racket/src/thread/thread.rkt index bb5c78abe5..e72957d2df 100644 --- a/racket/src/thread/thread.rkt +++ b/racket/src/thread/thread.rkt @@ -136,7 +136,7 @@ #:property host:prop:unsafe-authentic-override #t ; allow evt chaperone #:property prop:waiter (make-waiter-methods - #:suspend! (lambda (t i-cb r-cb) (thread-deschedule! t #f i-cb r-cb)) + #:suspend! (lambda (t i-cb) (thread-deschedule! t #f i-cb)) #:resume! (lambda (t v) (thread-reschedule! t) v)) #:property prop:evt (lambda (t) (wrap-evt (get-thread-dead-evt t) (lambda (v) t))) @@ -306,7 +306,8 @@ (do-thread-suspend t)))] [else (atomically - (do-kill-thread t)) + (do-kill-thread t) + (void)) (when (eq? t (current-thread/in-atomic)) (when (eq? t root-thread) (force-exit 0)) @@ -451,14 +452,13 @@ ;; was previously called, and neither is called if the thread is ;; "internal"-resumed normally instead of by a break signal of a ;; `thread-resume`. -(define (thread-deschedule! t timeout-at interrupt-callback retry-callback) - (define needs-retry? #f) +(define (thread-deschedule! t timeout-at interrupt-callback) + (define retry-callback #f) (atomically (set-thread-interrupt-callback! t (lambda () ;; If the interrupt callback gets invoked, ;; then remember that we need a retry - (set! needs-retry? #t) - (interrupt-callback))) + (set! retry-callback (interrupt-callback)))) (define finish (do-thread-deschedule! t timeout-at)) ;; It's ok if the thread gets interrupted ;; outside the atomic region, because we'd @@ -466,7 +466,7 @@ (lambda () ;; In non-atomic mode: (finish) - (when needs-retry? + (when retry-callback (retry-callback))))) ;; in atomic mode @@ -716,11 +716,11 @@ (let loop () ((thread-deschedule! (current-thread) until-msecs - void (lambda () - ;; Woke up due to an ignored break? - ;; Try again: - (loop)))))])) + (lambda () + ;; Woke up due to an ignored break? + ;; Try again: + (loop))))))])) ;; ---------------------------------------- ;; Tracking thread progress @@ -958,9 +958,9 @@ #f ;; Interrupted for break => not waiting for mail (lambda () - (set-thread-mailbox-wakeup! t void)) - ;; No retry action, because we always retry: - void)) + (set-thread-mailbox-wakeup! t void) + ;; No retry action, because we always retry: + void))) ;; called out of atomic mode: (lambda () (do-yield) diff --git a/racket/src/thread/waiter.rkt b/racket/src/thread/waiter.rkt index eb6e8e990d..4432376141 100644 --- a/racket/src/thread/waiter.rkt +++ b/racket/src/thread/waiter.rkt @@ -24,15 +24,12 @@ (define (waiter-resume! w s) ((waiter-methods-resume (waiter-ref w)) w s)) -;; `interrupt-cb` is run if the suspend is interrupted by -;; either a break or kill; `abandon-cb` is called in -;; addition if it's a kill or a bresk escape; -;; `retry-cb` is run, instead, if the suspend -;; should be retired, and it's a thunk that runs in -;; atomic mode and returns a thunk to run in tail position -;; out of atomic mode -(define (waiter-suspend! w interrupt-cb retry-cb) - ((waiter-methods-suspend (waiter-ref w)) w interrupt-cb retry-cb)) +;; `interrupt-cb` is run in atomic mode if the suspend is interrupted +;; by either a break or kill; the result is a `retry-cb`, which is run +;; out of atomic mode is the suspend-triggering action should be +;; retried +(define (waiter-suspend! w interrupt-cb) + ((waiter-methods-suspend (waiter-ref w)) w interrupt-cb)) ;; Used for semaphores and channels to run a "just selected" callback ;; when synchronized: