diff --git a/pkgs/racket-benchmarks/tests/racket/benchmarks/control/barrier.rkt b/pkgs/racket-benchmarks/tests/racket/benchmarks/control/barrier.rkt new file mode 100644 index 0000000000..b94c7254fa --- /dev/null +++ b/pkgs/racket-benchmarks/tests/racket/benchmarks/control/barrier.rkt @@ -0,0 +1,19 @@ +#lang racket/base +(require racket/include) + +(include "config.rktl") + +;; ---------------------------------------- + +'barrier +(times + (let ([one (lambda () 1)]) + (let loop ([i M]) + (unless (zero? i) + (loop (- i (call-with-continuation-barrier one))))))) + +'thunk+barrier +(times + (let loop ([i M]) + (unless (zero? i) + (loop (call-with-continuation-barrier (lambda () (sub1 i))))))) diff --git a/pkgs/racket-benchmarks/tests/racket/benchmarks/control/sync.rkt b/pkgs/racket-benchmarks/tests/racket/benchmarks/control/sync.rkt index 0f4e26ba6c..f2fa82d938 100644 --- a/pkgs/racket-benchmarks/tests/racket/benchmarks/control/sync.rkt +++ b/pkgs/racket-benchmarks/tests/racket/benchmarks/control/sync.rkt @@ -25,14 +25,49 @@ (for ([i (in-range M)]) (sync e)))) -'sync-two +'sync-guard +(times + (let ([g (guard-evt (lambda () always-evt))]) + (for ([i (in-range M)]) + (sync g)))) + +'sync-semaphore+never (times (let ([s (make-semaphore M)]) (for ([i (in-range M)]) (sync s never-evt)))) -'sync-three +'sync-semaphore+semaphore (times - (let ([s (make-semaphore M)]) + (let ([s (make-semaphore M)] + [s2 (make-semaphore M)]) (for ([i (in-range M)]) - (sync s always-evt never-evt)))) + (sync s s2)))) + +'sync-semaphore+semaphore/timeout +(times + (let ([s (make-semaphore)] + [s2 (make-semaphore)]) + (for ([i (in-range M)]) + (sync/timeout 0 s s2)))) + +'sync-semaphore+semaphore/timeout-callback +(times + (let ([s (make-semaphore)] + [s2 (make-semaphore)]) + (for ([i (in-range M)]) + (sync/timeout list s s2)))) + +'sync-guard+guard +(times + (let ([g (guard-evt (lambda () always-evt))]) + (for ([i (in-range M)]) + (sync g g)))) + +'sync-three-semaphores +(times + (let ([s (make-semaphore M)] + [s2 (make-semaphore M)] + [s3 (make-semaphore M)]) + (for ([i (in-range M)]) + (sync s s2 s3)))) diff --git a/pkgs/racket-test-core/tests/racket/sync.rktl b/pkgs/racket-test-core/tests/racket/sync.rktl index 36709e9513..0b0ba48b20 100644 --- a/pkgs/racket-test-core/tests/racket/sync.rktl +++ b/pkgs/racket-test-core/tests/racket/sync.rktl @@ -566,6 +566,15 @@ s)) (make-semaphore)))) +(let () + (define k #f) + (test always-evt sync (poll-guard-evt + (lambda (poll?) + (let/cc now-k + (set! k now-k)) + always-evt))) + (err/rt-test (k 10))) + ;; ---------------------------------------- ;; Replace waitables diff --git a/racket/src/schemify/schemify.rkt b/racket/src/schemify/schemify.rkt index 995ad4215d..93e61b8dae 100644 --- a/racket/src/schemify/schemify.rkt +++ b/racket/src/schemify/schemify.rkt @@ -676,7 +676,8 @@ [`(call-with-values ,generator ,receiver) (cond [(and (lambda? generator) - (lambda? receiver)) + (or (lambda? receiver) + (eq? (unwrap receiver) 'list))) `(call-with-values ,(schemify generator 'fresh) ,(schemify receiver 'fresh))] [else (left-to-right/app (if for-cify? 'call-with-values '#%call-with-values) diff --git a/racket/src/thread/channel.rkt b/racket/src/thread/channel.rkt index f6b41e20a5..77f866414e 100644 --- a/racket/src/thread/channel.rkt +++ b/racket/src/thread/channel.rkt @@ -90,7 +90,7 @@ (waiter-resume! (car pw+v) (void)) (values (list (cdr pw+v)) #f)] [(poll-ctx-poll? poll-ctx) - (values #f never-evt)] + (values #f ch)] [else (define b (box #f)) (define gq (channel-get-queue ch)) @@ -142,7 +142,7 @@ void])))])) ;; In atomic mode -(define (channel-put/poll ch v result poll-ctx) +(define (channel-put/poll ch v self poll-ctx) ;; Similar to `channel-put`, but works in terms of a ;; `select-waiter` instead of a thread (assert-atomic-mode) @@ -152,9 +152,9 @@ [gw+b (set-box! (cdr gw+b) v) (waiter-resume! (car gw+b) v) - (values (list result) #f)] + (values (list self) #f)] [(poll-ctx-poll? poll-ctx) - (values #f async-evt)] + (values #f self)] [else (define pq (channel-put-queue ch)) (define pw (channel-select-waiter (poll-ctx-select-proc poll-ctx) @@ -172,11 +172,11 @@ [gw+b (set-box! (cdr gw+b) v) (waiter-resume! (car gw+b) v) - (values result #t)] + (values self #t)] [else (set! n (queue-add! pq (cons pw v))) (values #f #f)]))) - (lambda (v) result)))])) + (lambda (v) self)))])) (define/who (channel-put-evt ch v) (check who channel? ch) diff --git a/racket/src/thread/schedule.rkt b/racket/src/thread/schedule.rkt index f09a2c9019..d8c8b0d5ea 100644 --- a/racket/src/thread/schedule.rkt +++ b/racket/src/thread/schedule.rkt @@ -10,6 +10,7 @@ "thread-group.rkt" "schedule-info.rkt" (submod "thread.rkt" scheduling) + (submod "sync.rkt" scheduling) "system-idle-evt.rkt" "exit.rkt" "future.rkt" @@ -44,6 +45,7 @@ (init-system-idle-evt!) (init-future-place!) (init-schedule-counters!) + (init-sync-place!) (call-in-main-thread thunk)) ;; ---------------------------------------- diff --git a/racket/src/thread/semaphore.rkt b/racket/src/thread/semaphore.rkt index fd540560f3..dc0a23869d 100644 --- a/racket/src/thread/semaphore.rkt +++ b/racket/src/thread/semaphore.rkt @@ -32,7 +32,7 @@ #:property prop:evt (poller (lambda (s poll-ctx) - (semaphore-wait/poll s poll-ctx)))) + (semaphore-wait/poll s s poll-ctx)))) (define count-field-pos 2) ; used with `unsafe-struct*-cas!` (struct semaphore-peek-evt (sema) @@ -40,6 +40,7 @@ prop:evt (poller (lambda (sp poll-ctx) (semaphore-wait/poll (semaphore-peek-evt-sema sp) + sp poll-ctx #:peek? #t #:result sp)))) @@ -157,7 +158,7 @@ (lambda () (semaphore-wait s)))])))])) ;; In atomic mode -(define (semaphore-wait/poll s poll-ctx +(define (semaphore-wait/poll s self poll-ctx #:peek? [peek? #f] #:result [result s]) ;; Similar to `semaphore-wait, but as called by `sync`, @@ -170,7 +171,7 @@ (set-semaphore-count! s (sub1 c))) (values (list result) #f)] [(poll-ctx-poll? poll-ctx) - (values #f never-evt)] + (values #f self)] [else (define w (if peek? (semaphore-peek-select-waiter (poll-ctx-select-proc poll-ctx)) diff --git a/racket/src/thread/sync.rkt b/racket/src/thread/sync.rkt index f3eeb2bf66..663368eff7 100644 --- a/racket/src/thread/sync.rkt +++ b/racket/src/thread/sync.rkt @@ -21,6 +21,9 @@ current-evt-pseudo-random-generator replace-evt) +(module+ scheduling + (provide init-sync-place!)) + (struct syncing (selected ; #f or a syncer that has been selected syncers ; linked list of `syncer`s wakeup ; a callback for when something is selected @@ -76,15 +79,19 @@ (define local-break-cell (and enable-break? (make-thread-cell #t))) - (define syncers (evts->syncers who args)) - (define s (make-syncing syncers + (define s (make-syncing (random-rotate (evts->syncers who args)) #:disable-break (and local-break-cell (let ([t (current-thread)]) (lambda () (thread-ignore-break-cell! t local-break-cell)))))) - (define (go) + (when (or (and (real? timeout) (zero? timeout)) + (procedure? timeout)) + (atomically (call-pre-poll-external-callbacks))) + + ;; General polling loop + (define (go #:thunk-result? [thunk-result? #f]) (dynamic-wind (lambda () (atomically @@ -98,16 +105,21 @@ (cond [(or (and (real? timeout) (zero? timeout)) (procedure? timeout)) - (atomically (call-pre-poll-external-callbacks)) (let poll-loop () - (sync-poll s #:fail-k (lambda (sched-info polled-all?) - (cond - [(not polled-all?) - (poll-loop)] - [(procedure? timeout) - timeout] - [else - (lambda () #f)])) + (sync-poll s + #:success-k (and thunk-result? (lambda (thunk) thunk)) + #:fail-k (lambda (sched-info polled-all?) + (cond + [(not polled-all?) + (poll-loop)] + [(procedure? timeout) + (if thunk-result? + timeout + (timeout))] + [else + (if thunk-result? + (lambda () #f) + #f)])) #:just-poll? #t))] [else ;; Loop to poll; if all events end up with asynchronous-select @@ -129,8 +141,10 @@ [else (syncing-done! s none-syncer) (end-atomic) - ;; Return result in a thunk: - (lambda () #f)])] + ;; Return result: + (if thunk-result? + (lambda () #f) + #f)])] [(and (all-asynchronous? s) (not (syncing-selected s)) (not (syncing-need-retry? s))) @@ -139,6 +153,7 @@ (loop #f #t)] [else (sync-poll s + #:success-k (and thunk-result? (lambda (thunk) thunk)) #:did-work? did-work? #:fail-k (lambda (sched-info now-polled-all?) (when timeout-at @@ -153,21 +168,33 @@ (thread-remove-ignored-break-cell! (current-thread/in-atomic) local-break-cell)) ;; On escape, post nacks, etc.: (syncing-abandon! s))))) - - ;; Result thunk is called in tail position: - ((cond - [enable-break? - ;; Install a new break cell, and check for breaks at the end: - (begin0 - (with-continuation-mark - break-enabled-key - local-break-cell - (go)) - ;; In case old break cell was meanwhile enabled: - (check-for-break))] - [else - ;; Just `go`: - (go)]))) + + ;; Result thunk (if needed) is called in tail position: + (cond + [enable-break? + ;; Install a new break cell, and check for breaks at the end: + (define thunk (with-continuation-mark + break-enabled-key + local-break-cell + (go #:thunk-result? #t))) + ;; In case old break cell was meanwhile enabled: + (check-for-break) + ;; In tail position: + (thunk)] + [else + ;; Try a fast poll (avoiding `dynamic-wind`, etc.) + ;; before chaining to `go`: + (sync-poll s + #:fail-k (lambda (sched-info polled-all?) + (cond + [polled-all? + (cond + [(and (real? timeout) (zero? timeout)) #f] + [(procedure? timeout) (timeout)] + [else (go)])] + [else (go)])) + #:just-poll? #t + #:fast-only? #t)])) (define sync (case-lambda @@ -338,65 +365,74 @@ ;; call in tail position --- possibly one that calls `none-k`. (define (sync-poll s #:fail-k none-k - #:success-k [success-k (lambda (thunk) thunk)] + #:success-k [success-k #f] ; non-#f => result thunk passed to `success-k` #:just-poll? [just-poll? #f] + #:fast-only? [fast-only? #f] #:done-after-poll? [done-after-poll? #t] #:did-work? [did-work? #f] #:schedule-info [sched-info (make-schedule-info #:did-work? did-work?)]) - (random-rotate-syncing! s) (let loop ([sr (syncing-syncers s)] [retries 0] ; count retries on `sr`, and advance if it's too many [polled-all-so-far? #t]) (when (syncing-need-retry? s) (syncing-retry! s)) - ((atomically - (cond - [(syncing-selected s) - => (lambda (sr) - ;; Some concurrent synchronization happened - (make-result-thunk sr (list (syncer-evt sr)) success-k))] - [(not sr) - (when (and just-poll? done-after-poll? polled-all-so-far?) - (syncing-done! s none-syncer)) - (lambda () (none-k sched-info polled-all-so-far?))] - [(= retries MAX-SYNC-TRIES-ON-ONE-EVT) - (schedule-info-did-work! sched-info) - (lambda () (loop (syncer-next sr) 0 #f))] - [else - (define ctx (poll-ctx just-poll? - ;; Call back for asynchronous selection, - ;; such as by a semaphore when it's posted - ;; in a different thread; this callback - ;; must be invoked in atomic mode - (lambda () - (assert-atomic-mode) - (syncing-done! s sr)) - ;; Information to propagate to the thread - ;; scheduler - sched-info - ;; Set to #t if getting the same result - ;; back should not be treated as a - ;; completed poll: - #f)) - (define-values (results new-evt) - (evt-poll (syncer-evt sr) ctx)) - (cond + (start-atomic) + (cond + [(syncing-selected s) + => (lambda (sr) + ;; Some concurrent synchronization happened; + ;; note that `make-result` is responsible for + ;; exiting atomic mode + (make-result sr (list (syncer-evt sr)) success-k))] + [(not sr) + (when (and just-poll? done-after-poll? polled-all-so-far? (not fast-only?)) + (syncing-done! s none-syncer)) + (end-atomic) + (none-k sched-info polled-all-so-far?)] + [(= retries MAX-SYNC-TRIES-ON-ONE-EVT) + (schedule-info-did-work! sched-info) + (end-atomic) + (loop (syncer-next sr) 0 #f)] + [else + (define ctx (poll-ctx just-poll? + ;; Call back for asynchronous selection, + ;; such as by a semaphore when it's posted + ;; in a different thread; this callback + ;; must be invoked in atomic mode + (lambda () + (assert-atomic-mode) + (syncing-done! s sr)) + ;; Information to propagate to the thread + ;; scheduler + sched-info + ;; Set to #t if getting the same result + ;; back should not be treated as a + ;; completed poll: + #f)) + (define-values (results new-evt) + (evt-poll (syncer-evt sr) ctx)) + (cond [results (syncing-done! s sr) - (make-result-thunk sr results success-k)] + ;; `make-result` is responsible for leaving atomic mode + (make-result sr results success-k)] [(delayed-poll? new-evt) ;; Have to go out of atomic mode to continue: - (lambda () - (let ([new-evt ((delayed-poll-resume new-evt))]) - ;; Since we left atomic mode, double-check that we're - ;; still syncing before installing the replacement event: - (atomically - (unless (syncing-selected s) - (set-syncer-evt! sr new-evt))) - (loop sr (add1 retries) polled-all-so-far?)))] + (end-atomic) + (cond + [fast-only? (none-k sched-info #f)] + [else + (let ([new-evt ((delayed-poll-resume new-evt))]) + ;; Since we left atomic mode, double-check that we're + ;; still syncing before installing the replacement event: + (atomically + (unless (syncing-selected s) + (set-syncer-evt! sr new-evt))) + (loop sr (add1 retries) polled-all-so-far?))])] [(choice-evt? new-evt) (when (or (pair? (syncer-interrupts sr)) (pair? (syncer-retries sr))) + (end-atomic) (internal-error "choice event discovered after interrupt/retry callbacks")) (define new-syncers (random-rotate (evts->syncers #f @@ -408,11 +444,13 @@ [(not new-syncers) ;; Empty choice, so drop it: (syncer-remove! sr s) - (lambda () (loop (syncer-next sr) 0 polled-all-so-far?))] + (end-atomic) + (loop (syncer-next sr) 0 polled-all-so-far?)] [else ;; Splice in new syncers, and start there (syncer-replace! sr new-syncers s) - (lambda () (loop new-syncers (add1 retries) polled-all-so-far?))])] + (end-atomic) + (loop new-syncers (add1 retries) polled-all-so-far?)])] [(wrap-evt? new-evt) (set-syncer-wraps! sr (cons (wrap-evt-wrap new-evt) (let ([l (syncer-wraps sr)]) @@ -429,23 +467,44 @@ (cond [(eq? inner-new-evt always-evt) (syncing-done! s sr) - (make-result-thunk sr (list always-evt) success-k)] + ;; `make-result` is responsible for leaving atomic mode + (make-result sr (list always-evt) success-k)] [else - (lambda () (loop sr (add1 retries) polled-all-so-far?))])] + (end-atomic) + (loop sr (add1 retries) polled-all-so-far?)])] [(control-state-evt? new-evt) - (set-syncer-interrupts! sr (cons-non-void (control-state-evt-interrupt-proc new-evt) (syncer-interrupts sr))) - (set-syncer-abandons! sr (cons-non-void (control-state-evt-abandon-proc new-evt) (syncer-abandons sr))) - (set-syncer-retries! sr (cons-non-void (control-state-evt-retry-proc new-evt) (syncer-retries sr))) + (define interrupt-proc (control-state-evt-interrupt-proc new-evt)) + (define abandon-proc (control-state-evt-abandon-proc new-evt)) + (define retry-proc (control-state-evt-retry-proc new-evt)) + (unless (eq? interrupt-proc void) + (set-syncer-interrupts! sr (cons interrupt-proc (syncer-interrupts sr)))) + (unless (eq? abandon-proc void) + (set-syncer-abandons! sr (cons abandon-proc (syncer-abandons sr)))) + (unless (eq? retry-proc void) + (set-syncer-retries! sr (cons retry-proc (syncer-retries sr)))) (set-syncer-evt! sr (control-state-evt-evt new-evt)) - (lambda () (loop sr (add1 retries) polled-all-so-far?))] + (end-atomic) + (cond + [(and fast-only? + (not (and (eq? interrupt-proc void) + (eq? abandon-proc void) + (eq? retry-proc void)))) + (none-k sched-info #f)] + [else + (loop sr (add1 retries) polled-all-so-far?)])] [(poll-guard-evt? new-evt) - (lambda () - ;; Out of atomic region: - (define generated ((poll-guard-evt-proc new-evt) just-poll?)) - (set-syncer-evt! sr (if (evt? generated) - generated - (wrap-evt always-evt (lambda (a) generated)))) - (loop sr (add1 retries) polled-all-so-far?))] + ;; Must leave atomic mode: + (end-atomic) + (cond + [fast-only? (none-k sched-info #f)] + [else + (define generated (call-with-continuation-barrier + (lambda () + ((poll-guard-evt-proc new-evt) just-poll?)))) + (set-syncer-evt! sr (if (evt? generated) + generated + (wrap-evt always-evt (lambda (a) generated)))) + (loop sr (add1 retries) polled-all-so-far?)])] [(and (never-evt? new-evt) (not (evt-impersonator? new-evt)) (null? (syncer-interrupts sr)) @@ -453,41 +512,40 @@ (null? (syncer-abandons sr))) ;; Drop this event, since it will never get selected (syncer-remove! sr s) - (lambda () (loop (syncer-next sr) 0 polled-all-so-far?))] + (end-atomic) + (loop (syncer-next sr) 0 polled-all-so-far?)] [(and (eq? new-evt (syncer-evt sr)) (not (poll-ctx-incomplete? ctx))) ;; No progress on this evt - (lambda () (loop (syncer-next sr) 0 polled-all-so-far?))] + (end-atomic) + (loop (syncer-next sr) 0 polled-all-so-far?)] [else (set-syncer-evt! sr new-evt) - (lambda () (loop sr (add1 retries) polled-all-so-far?))])]))))) + (end-atomic) + (loop sr (add1 retries) polled-all-so-far?)])]))) -;; Create a thunk that applies wraps immediately, while breaks are +;; Called in atomic mode, but leaves atomic mode +;; Applies wraps immediately, while breaks are ;; potentially still disabled (but not in atomic mode), and then ;; returns another thunk to call a handler (if any) in tail position -(define (make-result-thunk sr results success-k) +(define (make-result sr results success-k) (define wraps (syncer-wraps sr)) - (lambda () - (let loop ([wraps wraps] [results results]) - (cond - [(null? wraps) - (success-k - (lambda () - (apply values results)))] - [(null? (cdr wraps)) - ;; Call last one in tail position: - (let ([proc (car wraps)]) - (success-k - (lambda () - (apply proc results))))] - [else - (loop (cdr wraps) - (call-with-values (lambda () (apply (car wraps) results)) list))])))) - -(define (cons-non-void a d) - (if (eq? a void) - d - (cons a d))) + (end-atomic) + (let loop ([wraps wraps] [results results]) + (cond + [(null? wraps) + (if success-k + (success-k (lambda () (apply values results))) + (apply values results))] + [(null? (cdr wraps)) + ;; Call last one in tail position: + (let ([proc (car wraps)]) + (if success-k + (success-k (lambda () (apply proc results))) + (apply proc results)))] + [else + (loop (cdr wraps) + (call-with-values (lambda () (apply (car wraps) results)) list))]))) ;; ---------------------------------------- @@ -674,9 +732,8 @@ v) 'current-evt-pseudo-random-generator)) -;; rotates the order of syncers in `s` to implement fair selection: -(define (random-rotate-syncing! s) - (set-syncing-syncers! s (random-rotate (syncing-syncers s)))) +(define (init-sync-place!) + (current-evt-pseudo-random-generator (make-pseudo-random-generator))) (define (random-rotate first-sr) (define n (let loop ([sr first-sr] [n 0])