cs: add evt-polling shortcut

For simple cases, avoid `dynamic-wind` and related work when polling
events.
This commit is contained in:
Matthew Flatt 2019-10-07 15:08:24 -06:00
parent ada70bbd68
commit 0c4fbda8ba
8 changed files with 254 additions and 130 deletions

View File

@ -0,0 +1,19 @@
#lang racket/base
(require racket/include)
(include "config.rktl")
;; ----------------------------------------
'barrier
(times
(let ([one (lambda () 1)])
(let loop ([i M])
(unless (zero? i)
(loop (- i (call-with-continuation-barrier one)))))))
'thunk+barrier
(times
(let loop ([i M])
(unless (zero? i)
(loop (call-with-continuation-barrier (lambda () (sub1 i)))))))

View File

@ -25,14 +25,49 @@
(for ([i (in-range M)]) (for ([i (in-range M)])
(sync e)))) (sync e))))
'sync-two 'sync-guard
(times
(let ([g (guard-evt (lambda () always-evt))])
(for ([i (in-range M)])
(sync g))))
'sync-semaphore+never
(times (times
(let ([s (make-semaphore M)]) (let ([s (make-semaphore M)])
(for ([i (in-range M)]) (for ([i (in-range M)])
(sync s never-evt)))) (sync s never-evt))))
'sync-three 'sync-semaphore+semaphore
(times (times
(let ([s (make-semaphore M)]) (let ([s (make-semaphore M)]
[s2 (make-semaphore M)])
(for ([i (in-range M)]) (for ([i (in-range M)])
(sync s always-evt never-evt)))) (sync s s2))))
'sync-semaphore+semaphore/timeout
(times
(let ([s (make-semaphore)]
[s2 (make-semaphore)])
(for ([i (in-range M)])
(sync/timeout 0 s s2))))
'sync-semaphore+semaphore/timeout-callback
(times
(let ([s (make-semaphore)]
[s2 (make-semaphore)])
(for ([i (in-range M)])
(sync/timeout list s s2))))
'sync-guard+guard
(times
(let ([g (guard-evt (lambda () always-evt))])
(for ([i (in-range M)])
(sync g g))))
'sync-three-semaphores
(times
(let ([s (make-semaphore M)]
[s2 (make-semaphore M)]
[s3 (make-semaphore M)])
(for ([i (in-range M)])
(sync s s2 s3))))

View File

@ -566,6 +566,15 @@
s)) s))
(make-semaphore)))) (make-semaphore))))
(let ()
(define k #f)
(test always-evt sync (poll-guard-evt
(lambda (poll?)
(let/cc now-k
(set! k now-k))
always-evt)))
(err/rt-test (k 10)))
;; ---------------------------------------- ;; ----------------------------------------
;; Replace waitables ;; Replace waitables

View File

@ -676,7 +676,8 @@
[`(call-with-values ,generator ,receiver) [`(call-with-values ,generator ,receiver)
(cond (cond
[(and (lambda? generator) [(and (lambda? generator)
(lambda? receiver)) (or (lambda? receiver)
(eq? (unwrap receiver) 'list)))
`(call-with-values ,(schemify generator 'fresh) ,(schemify receiver 'fresh))] `(call-with-values ,(schemify generator 'fresh) ,(schemify receiver 'fresh))]
[else [else
(left-to-right/app (if for-cify? 'call-with-values '#%call-with-values) (left-to-right/app (if for-cify? 'call-with-values '#%call-with-values)

View File

@ -90,7 +90,7 @@
(waiter-resume! (car pw+v) (void)) (waiter-resume! (car pw+v) (void))
(values (list (cdr pw+v)) #f)] (values (list (cdr pw+v)) #f)]
[(poll-ctx-poll? poll-ctx) [(poll-ctx-poll? poll-ctx)
(values #f never-evt)] (values #f ch)]
[else [else
(define b (box #f)) (define b (box #f))
(define gq (channel-get-queue ch)) (define gq (channel-get-queue ch))
@ -142,7 +142,7 @@
void])))])) void])))]))
;; In atomic mode ;; In atomic mode
(define (channel-put/poll ch v result poll-ctx) (define (channel-put/poll ch v self poll-ctx)
;; Similar to `channel-put`, but works in terms of a ;; Similar to `channel-put`, but works in terms of a
;; `select-waiter` instead of a thread ;; `select-waiter` instead of a thread
(assert-atomic-mode) (assert-atomic-mode)
@ -152,9 +152,9 @@
[gw+b [gw+b
(set-box! (cdr gw+b) v) (set-box! (cdr gw+b) v)
(waiter-resume! (car gw+b) v) (waiter-resume! (car gw+b) v)
(values (list result) #f)] (values (list self) #f)]
[(poll-ctx-poll? poll-ctx) [(poll-ctx-poll? poll-ctx)
(values #f async-evt)] (values #f self)]
[else [else
(define pq (channel-put-queue ch)) (define pq (channel-put-queue ch))
(define pw (channel-select-waiter (poll-ctx-select-proc poll-ctx) (define pw (channel-select-waiter (poll-ctx-select-proc poll-ctx)
@ -172,11 +172,11 @@
[gw+b [gw+b
(set-box! (cdr gw+b) v) (set-box! (cdr gw+b) v)
(waiter-resume! (car gw+b) v) (waiter-resume! (car gw+b) v)
(values result #t)] (values self #t)]
[else [else
(set! n (queue-add! pq (cons pw v))) (set! n (queue-add! pq (cons pw v)))
(values #f #f)]))) (values #f #f)])))
(lambda (v) result)))])) (lambda (v) self)))]))
(define/who (channel-put-evt ch v) (define/who (channel-put-evt ch v)
(check who channel? ch) (check who channel? ch)

View File

@ -10,6 +10,7 @@
"thread-group.rkt" "thread-group.rkt"
"schedule-info.rkt" "schedule-info.rkt"
(submod "thread.rkt" scheduling) (submod "thread.rkt" scheduling)
(submod "sync.rkt" scheduling)
"system-idle-evt.rkt" "system-idle-evt.rkt"
"exit.rkt" "exit.rkt"
"future.rkt" "future.rkt"
@ -44,6 +45,7 @@
(init-system-idle-evt!) (init-system-idle-evt!)
(init-future-place!) (init-future-place!)
(init-schedule-counters!) (init-schedule-counters!)
(init-sync-place!)
(call-in-main-thread thunk)) (call-in-main-thread thunk))
;; ---------------------------------------- ;; ----------------------------------------

View File

@ -32,7 +32,7 @@
#:property #:property
prop:evt prop:evt
(poller (lambda (s poll-ctx) (poller (lambda (s poll-ctx)
(semaphore-wait/poll s poll-ctx)))) (semaphore-wait/poll s s poll-ctx))))
(define count-field-pos 2) ; used with `unsafe-struct*-cas!` (define count-field-pos 2) ; used with `unsafe-struct*-cas!`
(struct semaphore-peek-evt (sema) (struct semaphore-peek-evt (sema)
@ -40,6 +40,7 @@
prop:evt prop:evt
(poller (lambda (sp poll-ctx) (poller (lambda (sp poll-ctx)
(semaphore-wait/poll (semaphore-peek-evt-sema sp) (semaphore-wait/poll (semaphore-peek-evt-sema sp)
sp
poll-ctx poll-ctx
#:peek? #t #:peek? #t
#:result sp)))) #:result sp))))
@ -157,7 +158,7 @@
(lambda () (semaphore-wait s)))])))])) (lambda () (semaphore-wait s)))])))]))
;; In atomic mode ;; In atomic mode
(define (semaphore-wait/poll s poll-ctx (define (semaphore-wait/poll s self poll-ctx
#:peek? [peek? #f] #:peek? [peek? #f]
#:result [result s]) #:result [result s])
;; Similar to `semaphore-wait, but as called by `sync`, ;; Similar to `semaphore-wait, but as called by `sync`,
@ -170,7 +171,7 @@
(set-semaphore-count! s (sub1 c))) (set-semaphore-count! s (sub1 c)))
(values (list result) #f)] (values (list result) #f)]
[(poll-ctx-poll? poll-ctx) [(poll-ctx-poll? poll-ctx)
(values #f never-evt)] (values #f self)]
[else [else
(define w (if peek? (define w (if peek?
(semaphore-peek-select-waiter (poll-ctx-select-proc poll-ctx)) (semaphore-peek-select-waiter (poll-ctx-select-proc poll-ctx))

View File

@ -21,6 +21,9 @@
current-evt-pseudo-random-generator current-evt-pseudo-random-generator
replace-evt) replace-evt)
(module+ scheduling
(provide init-sync-place!))
(struct syncing (selected ; #f or a syncer that has been selected (struct syncing (selected ; #f or a syncer that has been selected
syncers ; linked list of `syncer`s syncers ; linked list of `syncer`s
wakeup ; a callback for when something is selected wakeup ; a callback for when something is selected
@ -76,15 +79,19 @@
(define local-break-cell (and enable-break? (define local-break-cell (and enable-break?
(make-thread-cell #t))) (make-thread-cell #t)))
(define syncers (evts->syncers who args)) (define s (make-syncing (random-rotate (evts->syncers who args))
(define s (make-syncing syncers
#:disable-break #:disable-break
(and local-break-cell (and local-break-cell
(let ([t (current-thread)]) (let ([t (current-thread)])
(lambda () (lambda ()
(thread-ignore-break-cell! t local-break-cell)))))) (thread-ignore-break-cell! t local-break-cell))))))
(define (go) (when (or (and (real? timeout) (zero? timeout))
(procedure? timeout))
(atomically (call-pre-poll-external-callbacks)))
;; General polling loop
(define (go #:thunk-result? [thunk-result? #f])
(dynamic-wind (dynamic-wind
(lambda () (lambda ()
(atomically (atomically
@ -98,16 +105,21 @@
(cond (cond
[(or (and (real? timeout) (zero? timeout)) [(or (and (real? timeout) (zero? timeout))
(procedure? timeout)) (procedure? timeout))
(atomically (call-pre-poll-external-callbacks))
(let poll-loop () (let poll-loop ()
(sync-poll s #:fail-k (lambda (sched-info polled-all?) (sync-poll s
(cond #:success-k (and thunk-result? (lambda (thunk) thunk))
[(not polled-all?) #:fail-k (lambda (sched-info polled-all?)
(poll-loop)] (cond
[(procedure? timeout) [(not polled-all?)
timeout] (poll-loop)]
[else [(procedure? timeout)
(lambda () #f)])) (if thunk-result?
timeout
(timeout))]
[else
(if thunk-result?
(lambda () #f)
#f)]))
#:just-poll? #t))] #:just-poll? #t))]
[else [else
;; Loop to poll; if all events end up with asynchronous-select ;; Loop to poll; if all events end up with asynchronous-select
@ -129,8 +141,10 @@
[else [else
(syncing-done! s none-syncer) (syncing-done! s none-syncer)
(end-atomic) (end-atomic)
;; Return result in a thunk: ;; Return result:
(lambda () #f)])] (if thunk-result?
(lambda () #f)
#f)])]
[(and (all-asynchronous? s) [(and (all-asynchronous? s)
(not (syncing-selected s)) (not (syncing-selected s))
(not (syncing-need-retry? s))) (not (syncing-need-retry? s)))
@ -139,6 +153,7 @@
(loop #f #t)] (loop #f #t)]
[else [else
(sync-poll s (sync-poll s
#:success-k (and thunk-result? (lambda (thunk) thunk))
#:did-work? did-work? #:did-work? did-work?
#:fail-k (lambda (sched-info now-polled-all?) #:fail-k (lambda (sched-info now-polled-all?)
(when timeout-at (when timeout-at
@ -154,20 +169,32 @@
;; On escape, post nacks, etc.: ;; On escape, post nacks, etc.:
(syncing-abandon! s))))) (syncing-abandon! s)))))
;; Result thunk is called in tail position: ;; Result thunk (if needed) is called in tail position:
((cond (cond
[enable-break? [enable-break?
;; Install a new break cell, and check for breaks at the end: ;; Install a new break cell, and check for breaks at the end:
(begin0 (define thunk (with-continuation-mark
(with-continuation-mark break-enabled-key
break-enabled-key local-break-cell
local-break-cell (go #:thunk-result? #t)))
(go)) ;; In case old break cell was meanwhile enabled:
;; In case old break cell was meanwhile enabled: (check-for-break)
(check-for-break))] ;; In tail position:
[else (thunk)]
;; Just `go`: [else
(go)]))) ;; Try a fast poll (avoiding `dynamic-wind`, etc.)
;; before chaining to `go`:
(sync-poll s
#:fail-k (lambda (sched-info polled-all?)
(cond
[polled-all?
(cond
[(and (real? timeout) (zero? timeout)) #f]
[(procedure? timeout) (timeout)]
[else (go)])]
[else (go)]))
#:just-poll? #t
#:fast-only? #t)]))
(define sync (define sync
(case-lambda (case-lambda
@ -338,65 +365,74 @@
;; call in tail position --- possibly one that calls `none-k`. ;; call in tail position --- possibly one that calls `none-k`.
(define (sync-poll s (define (sync-poll s
#:fail-k none-k #:fail-k none-k
#:success-k [success-k (lambda (thunk) thunk)] #:success-k [success-k #f] ; non-#f => result thunk passed to `success-k`
#:just-poll? [just-poll? #f] #:just-poll? [just-poll? #f]
#:fast-only? [fast-only? #f]
#:done-after-poll? [done-after-poll? #t] #:done-after-poll? [done-after-poll? #t]
#:did-work? [did-work? #f] #:did-work? [did-work? #f]
#:schedule-info [sched-info (make-schedule-info #:did-work? did-work?)]) #:schedule-info [sched-info (make-schedule-info #:did-work? did-work?)])
(random-rotate-syncing! s)
(let loop ([sr (syncing-syncers s)] (let loop ([sr (syncing-syncers s)]
[retries 0] ; count retries on `sr`, and advance if it's too many [retries 0] ; count retries on `sr`, and advance if it's too many
[polled-all-so-far? #t]) [polled-all-so-far? #t])
(when (syncing-need-retry? s) (when (syncing-need-retry? s)
(syncing-retry! s)) (syncing-retry! s))
((atomically (start-atomic)
(cond (cond
[(syncing-selected s) [(syncing-selected s)
=> (lambda (sr) => (lambda (sr)
;; Some concurrent synchronization happened ;; Some concurrent synchronization happened;
(make-result-thunk sr (list (syncer-evt sr)) success-k))] ;; note that `make-result` is responsible for
[(not sr) ;; exiting atomic mode
(when (and just-poll? done-after-poll? polled-all-so-far?) (make-result sr (list (syncer-evt sr)) success-k))]
(syncing-done! s none-syncer)) [(not sr)
(lambda () (none-k sched-info polled-all-so-far?))] (when (and just-poll? done-after-poll? polled-all-so-far? (not fast-only?))
[(= retries MAX-SYNC-TRIES-ON-ONE-EVT) (syncing-done! s none-syncer))
(schedule-info-did-work! sched-info) (end-atomic)
(lambda () (loop (syncer-next sr) 0 #f))] (none-k sched-info polled-all-so-far?)]
[else [(= retries MAX-SYNC-TRIES-ON-ONE-EVT)
(define ctx (poll-ctx just-poll? (schedule-info-did-work! sched-info)
;; Call back for asynchronous selection, (end-atomic)
;; such as by a semaphore when it's posted (loop (syncer-next sr) 0 #f)]
;; in a different thread; this callback [else
;; must be invoked in atomic mode (define ctx (poll-ctx just-poll?
(lambda () ;; Call back for asynchronous selection,
(assert-atomic-mode) ;; such as by a semaphore when it's posted
(syncing-done! s sr)) ;; in a different thread; this callback
;; Information to propagate to the thread ;; must be invoked in atomic mode
;; scheduler (lambda ()
sched-info (assert-atomic-mode)
;; Set to #t if getting the same result (syncing-done! s sr))
;; back should not be treated as a ;; Information to propagate to the thread
;; completed poll: ;; scheduler
#f)) sched-info
(define-values (results new-evt) ;; Set to #t if getting the same result
(evt-poll (syncer-evt sr) ctx)) ;; back should not be treated as a
(cond ;; completed poll:
#f))
(define-values (results new-evt)
(evt-poll (syncer-evt sr) ctx))
(cond
[results [results
(syncing-done! s sr) (syncing-done! s sr)
(make-result-thunk sr results success-k)] ;; `make-result` is responsible for leaving atomic mode
(make-result sr results success-k)]
[(delayed-poll? new-evt) [(delayed-poll? new-evt)
;; Have to go out of atomic mode to continue: ;; Have to go out of atomic mode to continue:
(lambda () (end-atomic)
(let ([new-evt ((delayed-poll-resume new-evt))]) (cond
;; Since we left atomic mode, double-check that we're [fast-only? (none-k sched-info #f)]
;; still syncing before installing the replacement event: [else
(atomically (let ([new-evt ((delayed-poll-resume new-evt))])
(unless (syncing-selected s) ;; Since we left atomic mode, double-check that we're
(set-syncer-evt! sr new-evt))) ;; still syncing before installing the replacement event:
(loop sr (add1 retries) polled-all-so-far?)))] (atomically
(unless (syncing-selected s)
(set-syncer-evt! sr new-evt)))
(loop sr (add1 retries) polled-all-so-far?))])]
[(choice-evt? new-evt) [(choice-evt? new-evt)
(when (or (pair? (syncer-interrupts sr)) (when (or (pair? (syncer-interrupts sr))
(pair? (syncer-retries sr))) (pair? (syncer-retries sr)))
(end-atomic)
(internal-error "choice event discovered after interrupt/retry callbacks")) (internal-error "choice event discovered after interrupt/retry callbacks"))
(define new-syncers (random-rotate (define new-syncers (random-rotate
(evts->syncers #f (evts->syncers #f
@ -408,11 +444,13 @@
[(not new-syncers) [(not new-syncers)
;; Empty choice, so drop it: ;; Empty choice, so drop it:
(syncer-remove! sr s) (syncer-remove! sr s)
(lambda () (loop (syncer-next sr) 0 polled-all-so-far?))] (end-atomic)
(loop (syncer-next sr) 0 polled-all-so-far?)]
[else [else
;; Splice in new syncers, and start there ;; Splice in new syncers, and start there
(syncer-replace! sr new-syncers s) (syncer-replace! sr new-syncers s)
(lambda () (loop new-syncers (add1 retries) polled-all-so-far?))])] (end-atomic)
(loop new-syncers (add1 retries) polled-all-so-far?)])]
[(wrap-evt? new-evt) [(wrap-evt? new-evt)
(set-syncer-wraps! sr (cons (wrap-evt-wrap new-evt) (set-syncer-wraps! sr (cons (wrap-evt-wrap new-evt)
(let ([l (syncer-wraps sr)]) (let ([l (syncer-wraps sr)])
@ -429,23 +467,44 @@
(cond (cond
[(eq? inner-new-evt always-evt) [(eq? inner-new-evt always-evt)
(syncing-done! s sr) (syncing-done! s sr)
(make-result-thunk sr (list always-evt) success-k)] ;; `make-result` is responsible for leaving atomic mode
(make-result sr (list always-evt) success-k)]
[else [else
(lambda () (loop sr (add1 retries) polled-all-so-far?))])] (end-atomic)
(loop sr (add1 retries) polled-all-so-far?)])]
[(control-state-evt? new-evt) [(control-state-evt? new-evt)
(set-syncer-interrupts! sr (cons-non-void (control-state-evt-interrupt-proc new-evt) (syncer-interrupts sr))) (define interrupt-proc (control-state-evt-interrupt-proc new-evt))
(set-syncer-abandons! sr (cons-non-void (control-state-evt-abandon-proc new-evt) (syncer-abandons sr))) (define abandon-proc (control-state-evt-abandon-proc new-evt))
(set-syncer-retries! sr (cons-non-void (control-state-evt-retry-proc new-evt) (syncer-retries sr))) (define retry-proc (control-state-evt-retry-proc new-evt))
(unless (eq? interrupt-proc void)
(set-syncer-interrupts! sr (cons interrupt-proc (syncer-interrupts sr))))
(unless (eq? abandon-proc void)
(set-syncer-abandons! sr (cons abandon-proc (syncer-abandons sr))))
(unless (eq? retry-proc void)
(set-syncer-retries! sr (cons retry-proc (syncer-retries sr))))
(set-syncer-evt! sr (control-state-evt-evt new-evt)) (set-syncer-evt! sr (control-state-evt-evt new-evt))
(lambda () (loop sr (add1 retries) polled-all-so-far?))] (end-atomic)
(cond
[(and fast-only?
(not (and (eq? interrupt-proc void)
(eq? abandon-proc void)
(eq? retry-proc void))))
(none-k sched-info #f)]
[else
(loop sr (add1 retries) polled-all-so-far?)])]
[(poll-guard-evt? new-evt) [(poll-guard-evt? new-evt)
(lambda () ;; Must leave atomic mode:
;; Out of atomic region: (end-atomic)
(define generated ((poll-guard-evt-proc new-evt) just-poll?)) (cond
(set-syncer-evt! sr (if (evt? generated) [fast-only? (none-k sched-info #f)]
generated [else
(wrap-evt always-evt (lambda (a) generated)))) (define generated (call-with-continuation-barrier
(loop sr (add1 retries) polled-all-so-far?))] (lambda ()
((poll-guard-evt-proc new-evt) just-poll?))))
(set-syncer-evt! sr (if (evt? generated)
generated
(wrap-evt always-evt (lambda (a) generated))))
(loop sr (add1 retries) polled-all-so-far?)])]
[(and (never-evt? new-evt) [(and (never-evt? new-evt)
(not (evt-impersonator? new-evt)) (not (evt-impersonator? new-evt))
(null? (syncer-interrupts sr)) (null? (syncer-interrupts sr))
@ -453,41 +512,40 @@
(null? (syncer-abandons sr))) (null? (syncer-abandons sr)))
;; Drop this event, since it will never get selected ;; Drop this event, since it will never get selected
(syncer-remove! sr s) (syncer-remove! sr s)
(lambda () (loop (syncer-next sr) 0 polled-all-so-far?))] (end-atomic)
(loop (syncer-next sr) 0 polled-all-so-far?)]
[(and (eq? new-evt (syncer-evt sr)) [(and (eq? new-evt (syncer-evt sr))
(not (poll-ctx-incomplete? ctx))) (not (poll-ctx-incomplete? ctx)))
;; No progress on this evt ;; No progress on this evt
(lambda () (loop (syncer-next sr) 0 polled-all-so-far?))] (end-atomic)
(loop (syncer-next sr) 0 polled-all-so-far?)]
[else [else
(set-syncer-evt! sr new-evt) (set-syncer-evt! sr new-evt)
(lambda () (loop sr (add1 retries) polled-all-so-far?))])]))))) (end-atomic)
(loop sr (add1 retries) polled-all-so-far?)])])))
;; Create a thunk that applies wraps immediately, while breaks are ;; Called in atomic mode, but leaves atomic mode
;; Applies wraps immediately, while breaks are
;; potentially still disabled (but not in atomic mode), and then ;; potentially still disabled (but not in atomic mode), and then
;; returns another thunk to call a handler (if any) in tail position ;; returns another thunk to call a handler (if any) in tail position
(define (make-result-thunk sr results success-k) (define (make-result sr results success-k)
(define wraps (syncer-wraps sr)) (define wraps (syncer-wraps sr))
(lambda () (end-atomic)
(let loop ([wraps wraps] [results results]) (let loop ([wraps wraps] [results results])
(cond (cond
[(null? wraps) [(null? wraps)
(success-k (if success-k
(lambda () (success-k (lambda () (apply values results)))
(apply values results)))] (apply values results))]
[(null? (cdr wraps)) [(null? (cdr wraps))
;; Call last one in tail position: ;; Call last one in tail position:
(let ([proc (car wraps)]) (let ([proc (car wraps)])
(success-k (if success-k
(lambda () (success-k (lambda () (apply proc results)))
(apply proc results))))] (apply proc results)))]
[else [else
(loop (cdr wraps) (loop (cdr wraps)
(call-with-values (lambda () (apply (car wraps) results)) list))])))) (call-with-values (lambda () (apply (car wraps) results)) list))])))
(define (cons-non-void a d)
(if (eq? a void)
d
(cons a d)))
;; ---------------------------------------- ;; ----------------------------------------
@ -674,9 +732,8 @@
v) v)
'current-evt-pseudo-random-generator)) 'current-evt-pseudo-random-generator))
;; rotates the order of syncers in `s` to implement fair selection: (define (init-sync-place!)
(define (random-rotate-syncing! s) (current-evt-pseudo-random-generator (make-pseudo-random-generator)))
(set-syncing-syncers! s (random-rotate (syncing-syncers s))))
(define (random-rotate first-sr) (define (random-rotate first-sr)
(define n (let loop ([sr first-sr] [n 0]) (define n (let loop ([sr first-sr] [n 0])