cs & thread: move retry callback to cancel result

Adjust part of the internal scheduling protocol to make a retry
callback generated by another callback that sets up the retry. This
helps clarify the protocol and avoids allocating a closure that is
rarely used.
This commit is contained in:
Matthew Flatt 2019-12-29 18:56:42 -07:00
parent 755e914c7c
commit ce4ad668b6
6 changed files with 69 additions and 56 deletions

View File

@ -30,12 +30,19 @@
;; "atomically" is atomic within a place; when a future-running
;; pthread tries to enter atomic mode, it is suspended
(define-syntax-rule (atomically expr ...)
(begin
(start-atomic)
(begin0
(let () expr ...)
(end-atomic))))
(define-syntax atomically
(syntax-rules (void)
[(_ expr ... (void)) ; `(void)` => no need for `begin0`
(begin
(start-atomic)
expr ...
(end-atomic))]
[(_ expr ...)
(begin
(start-atomic)
(begin0
(let () expr ...)
(end-atomic)))]))
(define-syntax-rule (atomically/no-interrupts expr ...)
(begin

View File

@ -69,9 +69,10 @@
(define n (queue-add! gq (cons gw b)))
(waiter-suspend! gw
;; On break/kill/suspend:
(lambda () (queue-remove-node! gq n))
;; On retry after break or resume:
(lambda () (receive)))]
(lambda ()
(queue-remove-node! gq n)
;; On retry after break or resume:
(lambda () (receive))))]
[else
(set-box! b (cdr pw+v))
(waiter-resume! (car pw+v) (void))
@ -132,9 +133,10 @@
(define n (queue-add! pq (cons pw v)))
(waiter-suspend! pw
;; On break/kill/suspend:
(lambda () (queue-remove-node! pq n))
;; On retry after break or resume:
(lambda () (channel-put ch v)))]
(lambda ()
(queue-remove-node! pq n)
;; On retry after break or resume:
(lambda () (channel-put ch v))))]
[else
(set-box! (cdr gw+b) v)
(waiter-resume! (car gw+b) v)

View File

@ -74,7 +74,9 @@
(unsafe-struct*-cas! s count-field-pos c (add1 c)))
(void)]
[else
(atomically (semaphore-post/atomic s))]))
(atomically
(semaphore-post/atomic s)
(void))]))
;; In atomic mode:
(define (semaphore-post/atomic s)
@ -101,7 +103,8 @@
(define (semaphore-post-all s)
(atomically
(semaphore-post-all/atomic s)))
(semaphore-post-all/atomic s)
(void)))
;; In atomic mode:
(define (semaphore-any-waiters? s)
@ -148,13 +151,12 @@
(lambda ()
(queue-remove-node! s n)
(when (queue-empty? s)
(set-semaphore-count! s 0))) ; allow CAS again
;; This callback is used, in addition to the previous one, if
;; the thread receives a break signal but doesn't escape
;; (either because breaks are disabled or the handler
;; continues), or if the interrupt was to suspend and the thread
;; is resumed:
(lambda () (semaphore-wait s)))])))]))
(set-semaphore-count! s 0)) ; allow CAS again
;; This callback is used if the thread receives a break
;; signal but doesn't escape (either because breaks are
;; disabled or the handler continues), or if the
;; interrupt was to suspend and the thread is resumed:
(lambda () (unsafe-semaphore-wait s))))])))]))
;; In atomic mode
(define (semaphore-wait/poll s self poll-ctx

View File

@ -88,7 +88,9 @@
(when (or (and (real? timeout) (zero? timeout))
(procedure? timeout))
(atomically (call-pre-poll-external-callbacks)))
(atomically
(call-pre-poll-external-callbacks)
(void)))
;; General polling loop
(define (go #:thunk-result? [thunk-result? #t])
@ -99,7 +101,8 @@
(lambda () (syncing-abandon! s)))
(thread-push-suspend+resume-callbacks!
(lambda () (syncing-interrupt! s))
(lambda () (syncing-queue-retry! s)))))
(lambda () (syncing-queue-retry! s)))
(void)))
(lambda ()
(when enable-break? (check-for-break))
(cond
@ -167,7 +170,8 @@
(when local-break-cell
(thread-remove-ignored-break-cell! (current-thread/in-atomic) local-break-cell))
;; On escape, post nacks, etc.:
(syncing-abandon! s)))))
(syncing-abandon! s)
(void)))))
;; Result thunk (if needed) is called in tail position:
(cond
@ -429,7 +433,8 @@
;; still syncing before installing the replacement event:
(atomically
(unless (syncing-selected s)
(set-syncer-evt! sr new-evt)))
(set-syncer-evt! sr new-evt))
(void))
(loop sr (add1 retries) polled-all-so-far? #f))])]
[(choice-evt? new-evt)
(when (or (pair? (syncer-interrupts sr))
@ -699,14 +704,14 @@
;; Interrupt due to break/kill/suspend
(set-syncing-wakeup! s void)
(unless (syncing-selected s)
(syncing-interrupt! s)))
;; In non-atomic mode and tail position:
(lambda ()
;; Continue from suspend or ignored break...
((atomically
(unless (syncing-selected s)
(syncing-retry! s))
(retry)))))])))))
(syncing-interrupt! s))
;; In non-atomic mode and tail position:
(lambda ()
;; Continue from suspend or ignored break...
((atomically
(unless (syncing-selected s)
(syncing-retry! s))
(retry))))))])))))
;; ----------------------------------------

View File

@ -136,7 +136,7 @@
#:property host:prop:unsafe-authentic-override #t ; allow evt chaperone
#:property prop:waiter
(make-waiter-methods
#:suspend! (lambda (t i-cb r-cb) (thread-deschedule! t #f i-cb r-cb))
#:suspend! (lambda (t i-cb) (thread-deschedule! t #f i-cb))
#:resume! (lambda (t v) (thread-reschedule! t) v))
#:property prop:evt (lambda (t) (wrap-evt (get-thread-dead-evt t)
(lambda (v) t)))
@ -306,7 +306,8 @@
(do-thread-suspend t)))]
[else
(atomically
(do-kill-thread t))
(do-kill-thread t)
(void))
(when (eq? t (current-thread/in-atomic))
(when (eq? t root-thread)
(force-exit 0))
@ -451,14 +452,13 @@
;; was previously called, and neither is called if the thread is
;; "internal"-resumed normally instead of by a break signal of a
;; `thread-resume`.
(define (thread-deschedule! t timeout-at interrupt-callback retry-callback)
(define needs-retry? #f)
(define (thread-deschedule! t timeout-at interrupt-callback)
(define retry-callback #f)
(atomically
(set-thread-interrupt-callback! t (lambda ()
;; If the interrupt callback gets invoked,
;; then remember that we need a retry
(set! needs-retry? #t)
(interrupt-callback)))
(set! retry-callback (interrupt-callback))))
(define finish (do-thread-deschedule! t timeout-at))
;; It's ok if the thread gets interrupted
;; outside the atomic region, because we'd
@ -466,7 +466,7 @@
(lambda ()
;; In non-atomic mode:
(finish)
(when needs-retry?
(when retry-callback
(retry-callback)))))
;; in atomic mode
@ -716,11 +716,11 @@
(let loop ()
((thread-deschedule! (current-thread)
until-msecs
void
(lambda ()
;; Woke up due to an ignored break?
;; Try again:
(loop)))))]))
(lambda ()
;; Woke up due to an ignored break?
;; Try again:
(loop))))))]))
;; ----------------------------------------
;; Tracking thread progress
@ -958,9 +958,9 @@
#f
;; Interrupted for break => not waiting for mail
(lambda ()
(set-thread-mailbox-wakeup! t void))
;; No retry action, because we always retry:
void))
(set-thread-mailbox-wakeup! t void)
;; No retry action, because we always retry:
void)))
;; called out of atomic mode:
(lambda ()
(do-yield)

View File

@ -24,15 +24,12 @@
(define (waiter-resume! w s)
((waiter-methods-resume (waiter-ref w)) w s))
;; `interrupt-cb` is run if the suspend is interrupted by
;; either a break or kill; `abandon-cb` is called in
;; addition if it's a kill or a bresk escape;
;; `retry-cb` is run, instead, if the suspend
;; should be retired, and it's a thunk that runs in
;; atomic mode and returns a thunk to run in tail position
;; out of atomic mode
(define (waiter-suspend! w interrupt-cb retry-cb)
((waiter-methods-suspend (waiter-ref w)) w interrupt-cb retry-cb))
;; `interrupt-cb` is run in atomic mode if the suspend is interrupted
;; by either a break or kill; the result is a `retry-cb`, which is run
;; out of atomic mode is the suspend-triggering action should be
;; retried
(define (waiter-suspend! w interrupt-cb)
((waiter-methods-suspend (waiter-ref w)) w interrupt-cb))
;; Used for semaphores and channels to run a "just selected" callback
;; when synchronized: