diff --git a/pkgs/racket-doc/scribblings/reference/futures-logging.scrbl b/pkgs/racket-doc/scribblings/reference/futures-logging.scrbl index 669f302c43..5423433718 100644 --- a/pkgs/racket-doc/scribblings/reference/futures-logging.scrbl +++ b/pkgs/racket-doc/scribblings/reference/futures-logging.scrbl @@ -101,10 +101,10 @@ with @racket['result], @racket['abort], or @racket['suspend]; and In process 0, some event pairs can be nested within other event pairs: @racket['sync], @racket['block], or @racket['touch] with -@racket['result] or @racket['abort]; and @racket['touch-pause] with -@racket['touch-resume]. +@racket['result] or @racket['abort]; @racket['touch-pause] with +@racket['touch-resume]; and @racket['start-work] with @racket['end-work]. -An @racket['block] in process 0 is generated when an unsafe operation +A @racket['block] in process 0 is generated when an unsafe operation is handled. This type of event will contain a symbol in the @racket[unsafe-op-name] field that is the name of the operation. In all other cases, this field contains @racket[#f]. @@ -123,10 +123,10 @@ values depending on both the @racket[action] and @racket[prim-name] fields: @item{@racket['touch] on process 0: contains the integer ID of the future being touched.} - @item{@racket['sync] and @racket[prim-name] = @racket[|allocate memory|]: + @item{@racket['sync] and @racket[prim-name] is @racket['|allocate memory|]: The size (in bytes) of the requested allocation.} - @item{@racket['sync] and @racket[prim-name] = @racket[|jit_on_demand|]: + @item{@racket['sync] and @racket[prim-name] is @racket['|jit_on_demand|]: The runtime thread is performing a JIT compilation on behalf of the future @racket[future-id]. The field contains the name of the function being JIT compiled (as a symbol).} diff --git a/pkgs/racket-test/tests/future/future.rkt b/pkgs/racket-test/tests/future/future.rkt index 9c018fd110..88d0d75f54 100644 --- a/pkgs/racket-test/tests/future/future.rkt +++ b/pkgs/racket-test/tests/future/future.rkt @@ -1,8 +1,10 @@ #lang scheme/base -(require scheme/future - scheme/list - rackunit) +(require scheme/future + scheme/list + (rename-in rackunit + [check-equal? real:check-equal?]) + (for-syntax racket/base)) #|Need to add expressions which raise exceptions inside a future thunk which can be caught at the touch site @@ -12,7 +14,14 @@ Both future and touch should be called from within a future thunk. We should also test deep continuations. -|# +|# + +(define-syntax (check-equal? stx) + (syntax-case stx () + [(_ e ...) + #`(begin + (writeln `(check-equal? e ...)) + #,(syntax/loc stx (real:check-equal? e ...)))])) ;Tests specific to would-be-future (define-struct future-event (future-id process-id what time prim-name target-fid) @@ -53,8 +62,17 @@ We should also test deep continuations. (printf "hello3")))]) (touch f) (let ([log (raw-log-output)]) - (check-equal? 3 (length (get-blocks log))) - (check-equal? 3 (length (get-blocks-on 'printf log))) + ;; Racket CS would-be futures behave slightly differently + ;; --- reflecting the way that futures always suspend + ;; when they hit a blocking operation + (case (system-type 'vm) + [(chez-scheme) + (check-equal? 1 (length (get-blocks log))) + ;; `printf` is ok up to the point that it tries to get the currrent output port: + (check-equal? 1 (length (get-blocks-on 'continuation-mark-set-first log)))] + [else + (check-equal? 3 (length (get-blocks log))) + (check-equal? 3 (length (get-blocks-on 'printf log)))]) (check-equal? 0 (length (get-touch-blocks log))))) (let ([f1 (would-be-future @@ -68,14 +86,21 @@ We should also test deep continuations. 42))))))]) (touch f1) (let ([log (raw-log-output)]) - (check-equal? 5 (length (get-blocks log))) - (check-equal? 1 (length (get-touch-blocks log))) - (check-equal? 4 (length (get-blocks-on 'printf log))) - (check-equal? 1 (length (get-blocks-on 'would-be-future log)))))) + (case (system-type 'vm) + [(chez-scheme) + (check-equal? 2 (length (get-blocks log))) + (check-equal? 0 (length (get-touch-blocks log))) + (check-equal? 2 (length (get-blocks-on 'continuation-mark-set-first log))) + (check-equal? 0 (length (get-blocks-on 'would-be-future log)))] + [else + (check-equal? 5 (length (get-blocks log))) + (check-equal? 1 (length (get-touch-blocks log))) + (check-equal? 4 (length (get-blocks-on 'printf log))) + (check-equal? 1 (length (get-blocks-on 'would-be-future log)))])))) ;; ---------------------------------------- -(define (run-tests func) +(define (run-tests func) (check-equal? 'yes (let/ec k @@ -854,7 +879,8 @@ We should also test deep continuations. (define f (func (lambda () (loop i)))) (sleep 0.1) (with-handlers ([exn:fail? (lambda (exn) - (unless (regexp-match #rx"expected number of values not received" (exn-message exn)) + (unless (or (regexp-match #rx"expected number of values not received" (exn-message exn)) + (regexp-match #rx"returned two values to single value return context" (exn-message exn))) (raise exn)))]) (touch f)))) diff --git a/racket/src/cs/demo/control.ss b/racket/src/cs/demo/control.ss index 49fd04a900..7ce4be408d 100644 --- a/racket/src/cs/demo/control.ss +++ b/racket/src/cs/demo/control.ss @@ -433,11 +433,11 @@ (define engine-tag (default-continuation-prompt-tag)) -(define e (make-engine (lambda () 'done) engine-tag #f #f)) +(define e (make-engine (lambda () 'done) engine-tag #f #f #f)) (check (cdr (e 100 void list vector)) '(done)) -(define e-forever (make-engine (lambda () (let loop () (loop))) engine-tag #f #f)) +(define e-forever (make-engine (lambda () (let loop () (loop))) engine-tag #f #f #f)) (check (vector? (e-forever 10 void list vector)) #t) @@ -450,7 +450,7 @@ [else (engine-block) (loop (sub1 n))]))) - engine-tag + engine-tag #f #f #f)) (check (let ([started 0]) (let loop ([e e-10] [n 0]) @@ -475,7 +475,7 @@ (lambda () (set! pre (add1 pre))) (lambda () (loop (sub1 n))) (lambda () (set! post (add1 post))))]))) - engine-tag + engine-tag #f #f #f)]) (check (let loop ([e e-10/dw] [n 0]) (e 200 @@ -497,10 +497,10 @@ (thread-cell-set! pt (add1 p-old)) (list u-old p-old - (make-engine gen engine-tag #f #f) + (make-engine gen engine-tag #f #f #f) (thread-cell-ref ut) (thread-cell-ref pt))) - (define l1 ((make-engine gen engine-tag #f #f) + (define l1 ((make-engine gen engine-tag #f #f #f) 100 void (lambda (remain l) l) @@ -526,7 +526,7 @@ (check (procedure? my-param) #t) (let ([e (with-continuation-mark parameterization-key (extend-parameterization (continuation-mark-set-first #f parameterization-key) my-param 'set) - (make-engine (lambda () (|#%app| my-param)) engine-tag #f #f))]) + (make-engine (lambda () (|#%app| my-param)) engine-tag #f #f #f))]) (check (|#%app| my-param) 'init) (check (e 1000 void (lambda (remain v) v) (lambda (e timeout?) (error 'engine "oops"))) 'set)) @@ -622,7 +622,7 @@ (loop (sub1 n))))) (lambda () (set! post (add1 post)))))))) - engine-tag + engine-tag #f #f #f)) (check (let ([prefixes 0]) diff --git a/racket/src/cs/demo/thread.ss b/racket/src/cs/demo/thread.ss index 58b88a7b16..a2925f00ef 100644 --- a/racket/src/cs/demo/thread.ss +++ b/racket/src/cs/demo/thread.ss @@ -242,6 +242,28 @@ (check #t (evt? (sync (place-dead-evt pl4)))) (check #t (evt? (sync/timeout 0.01 (place-dead-evt pl4)))))) + (let () + (check 'ok (touch (future (lambda () 'ok)))) + (check 'ok (touch (would-be-future (lambda () 'ok)))) + (check 'ok (touch (would-be-future (lambda () (touch (would-be-future (lambda () 'ok)))))))) + + (let () + (define fts (let loop ([i 0]) + (if (= i 50) + '() + (cons + (future (lambda () + (let loop ([i i]) + (if (zero? i) + i + (add1 (loop (sub1 i))))))) + (loop (add1 i)))))) + (check (let loop ([i 0]) + (if (= i 50) + '() + (cons i (loop (add1 i))))) + (map touch fts))) + ;; Measure thread quantum: #; (let ([t1 (thread (lambda () (let loop () (loop))))] diff --git a/racket/src/cs/io.sls b/racket/src/cs/io.sls index fd5d8433df..8e57b48588 100644 --- a/racket/src/cs/io.sls +++ b/racket/src/cs/io.sls @@ -478,4 +478,6 @@ (apply 1/fprintf (|#%app| 1/current-error-port) fmt args))) (set-ffi-get-lib-and-obj! ffi-get-lib ffi-get-obj ptr->address) (set-make-async-callback-poll-wakeup! unsafe-make-signal-received) - (set-get-machine-info! get-machine-info)) + (set-get-machine-info! get-machine-info) + (set-processor-count! (1/processor-count)) + (install-future-logging-procs! logging-future-events? log-future-event)) diff --git a/racket/src/cs/linklet.sls b/racket/src/cs/linklet.sls index 4b9d377c1e..0711d9e6d9 100644 --- a/racket/src/cs/linklet.sls +++ b/racket/src/cs/linklet.sls @@ -224,7 +224,9 @@ (for-each (lambda (table) (hash-for-each table (lambda (k v) (hash-set! primitives k v)))) - tables)) + tables) + ;; prropagate table to the rumble layer + (install-primitives-table! primitives)) (define (outer-eval s paths format) (if (eq? format 'interpret) diff --git a/racket/src/cs/rumble.sls b/racket/src/cs/rumble.sls index e01bea4abd..7d020613c0 100644 --- a/racket/src/cs/rumble.sls +++ b/racket/src/cs/rumble.sls @@ -697,7 +697,14 @@ mutex-acquire mutex-release threaded? - set-future-callbacks!) + set-future-callbacks! + install-primitives-table! + continuation-current-primitive + + ;; compile-time use in "thread.sls" + current-atomic-virtual-register + end-atomic-virtual-register + current-future-virtual-register) (import (rename (chezpart) [define define/no-lift]) (rename (only (chezscheme) sleep) diff --git a/racket/src/cs/rumble/control.ss b/racket/src/cs/rumble/control.ss index eaa6cdb0e1..df17a28e14 100644 --- a/racket/src/cs/rumble/control.ss +++ b/racket/src/cs/rumble/control.ss @@ -156,24 +156,26 @@ [else (loop (cdr mc))]))))) (define/who (maybe-future-barricade tag) - (when (future? (current-future)) ;; running in a future - (check who continuation-prompt-tag? tag) + (when (current-future) (let ([fp (strip-impersonator (current-future-prompt))] [tag (strip-impersonator tag)]) (cond + [(eq? fp tag) + ;; shortcut: boundary is the future prompt + (void)] [(eq? tag the-root-continuation-prompt-tag) - (block)] + (block-future)] [else (let loop ([mc (current-metacontinuation)]) (cond [(null? mc) ;; Won't happen normally, since every thread starts with a explicit prompt - (block)] + (block-future)] [(eq? tag (strip-impersonator (metacontinuation-frame-tag (car mc)))) (void)] [(eq? (metacontinuation-frame-tag (car mc)) fp) ;; tag must be above future prompt - (block)] + (block-future)] [else (loop (cdr mc))]))])))) @@ -1031,7 +1033,7 @@ [(marks key none-v orig-prompt-tag) (check who continuation-mark-set? :or-false marks) (check who continuation-prompt-tag? orig-prompt-tag) - (maybe-future-barricade orig-prompt-tag) + (unless marks (maybe-future-barricade orig-prompt-tag)) (let ([prompt-tag (strip-impersonator orig-prompt-tag)]) (let-values ([(key wrapper) (extract-continuation-mark-key-and-wrapper 'continuation-mark-set-first key)]) (let* ([v0 (if marks diff --git a/racket/src/cs/rumble/engine.ss b/racket/src/cs/rumble/engine.ss index 3e9446c96b..98cc9a5aa3 100644 --- a/racket/src/cs/rumble/engine.ss +++ b/racket/src/cs/rumble/engine.ss @@ -24,7 +24,7 @@ (define (set-engine-exit-handler! proc) (set! engine-exit proc)) -(define (make-engine thunk prompt-tag init-break-enabled-cell empty-config?) +(define (make-engine thunk prompt-tag abort-handler init-break-enabled-cell empty-config?) (let ([paramz (if empty-config? empty-parameterization (current-parameterization))]) @@ -44,7 +44,8 @@ (with-continuation-mark parameterization-key paramz (|#%app| thunk))) - prompt-tag)) + prompt-tag + abort-handler)) engine-return)))) (if empty-config? (make-empty-thread-cell-values) diff --git a/racket/src/cs/rumble/error.ss b/racket/src/cs/rumble/error.ss index 102b3b8964..49fdddb006 100644 --- a/racket/src/cs/rumble/error.ss +++ b/racket/src/cs/rumble/error.ss @@ -533,44 +533,33 @@ (lambda (slow-k l) l))) -(define (continuation->trace* k) - (call-with-values - (lambda () - (let loop ([k k] [slow-k k] [move? #f]) - (cond - [(or (not (#%$continuation? k)) - (eq? k #%$null-continuation)) - (values slow-k '())] +(define primitive-names #f) +(define (install-primitives-table! primitives) + (set! primitive-names primitives)) + +;; Simplified variant of `continuation->trace` that can be called to +;; get a likely primitive to blame for a blocking future. +(define (continuation-current-primitive k exclusions) + (let loop ([k (if (full-continuation? k) (full-continuation-k k) k)]) + (cond + [(or (not (#%$continuation? k)) + (eq? k #%$null-continuation)) + #f] + [else + (let* ([name (or (let ([n #f]) + (and n + (string->symbol (format "body of ~a" n)))) + (let* ([c (#%$continuation-return-code k)] + [n (#%$code-name c)]) + (and n (string->symbol n))))]) + (cond + [(and name + (hash-ref primitive-names name #f) + (not (#%memq name exclusions))) + name] [else - (let* ([name (or (let ([n #f]) - (and n - (string->symbol (format "body of ~a" n)))) - (let* ([c (#%$continuation-return-code k)] - [n (#%$code-name c)]) - n))] - [desc - (let* ([ci (#%$code-info (#%$continuation-return-code k))] - [src (and - (code-info? ci) - (or - ;; when per-expression inspector info is available: - (find-rpi (#%$continuation-return-offset k) ci) - ;; when only per-function source location is available: - (code-info-src ci)))]) - (and (or name src) - (cons name src)))]) - (#%$split-continuation k 0) - (call-with-values - (lambda () (loop (#%$continuation-link k) (if move? (#%$continuation-link slow-k) slow-k) (not move?))) - (lambda (slow-k l) - (let ([l (if desc - (cons desc l) - l)]) - (when (eq? k slow-k) - (hashtable-set! cached-traces k l)) - (values slow-k l)))))]))) - (lambda (slow-k l) - l))) + (#%$split-continuation k 0) + (loop (#%$continuation-link k))]))]))) (define (traces->context ls) (let loop ([l '()] [ls ls]) diff --git a/racket/src/cs/rumble/foreign.ss b/racket/src/cs/rumble/foreign.ss index 4c9883210f..0b3cbae7e1 100644 --- a/racket/src/cs/rumble/foreign.ss +++ b/racket/src/cs/rumble/foreign.ss @@ -1850,8 +1850,8 @@ (cond [(eqv? (place-thread-category) PLACE-MAIN-THREAD) ;; In the main thread of a place. We must have gotten here by a - ;; foreign call that called back, so interrupts are currently - ;; disabled. + ;; foreign call that called back, so interrupts are currently + ;; disabled. (cond [(not atomic?) ;; reenable interrupts @@ -1876,7 +1876,7 @@ [q async-callback-queue] [m (async-callback-queue-lock q)] [need-interrupts? - ;; If we created this therad by `fork-pthread`, we must + ;; If we created this thread by `fork-pthread`, we must ;; have gotten here by a foreign call, so interrupts are ;; currently disabled (eqv? (place-thread-category) PLACE-KNOWN-THREAD)]) diff --git a/racket/src/cs/rumble/future.ss b/racket/src/cs/rumble/future.ss index b3593cf4c4..c65b90a31c 100644 --- a/racket/src/cs/rumble/future.ss +++ b/racket/src/cs/rumble/future.ss @@ -1,14 +1,14 @@ -;; Futures API +;; We need a little support for futures, because they interact with +;; continuation operations that may need to block the future. -(define future? (lambda (f) #f)) -(define current-future (lambda () #f)) -(define block (lambda () (void))) +(define-syntax (current-future stx) + (syntax-case stx () + [(_) (with-syntax ([pos current-future-virtual-register]) + #'(virtual-register pos))])) + +(define block-future (lambda () (void))) (define current-future-prompt (lambda () (void))) -(define future-wait (lambda () (void))) -(define (set-future-callbacks! _future? _current-future _block wait cfp) - (set! future? _future?) - (set! current-future _current-future) - (set! block _block) - (set! future-wait wait) - (set! current-future-prompt cfp)) +(define (set-future-callbacks! block current-prompt) + (set! block-future block) + (set! current-future-prompt current-prompt)) diff --git a/racket/src/cs/rumble/virtual-register.ss b/racket/src/cs/rumble/virtual-register.ss index e50759d24e..2874061aa2 100644 --- a/racket/src/cs/rumble/virtual-register.ss +++ b/racket/src/cs/rumble/virtual-register.ss @@ -2,7 +2,10 @@ ;; pthread-specific bindings. ;; The last few virtual registers are reserved for use by the thread system -(meta define num-reserved-virtual-registers 2) +(meta define num-reserved-virtual-registers 3) +(meta define current-atomic-virtual-register (- (virtual-register-count) 1)) +(meta define end-atomic-virtual-register (- (virtual-register-count) 2)) +(meta define current-future-virtual-register (- (virtual-register-count) 3)) (meta define virtual-register-initial-values '()) @@ -30,5 +33,8 @@ [else (cons (with-syntax ([pos (datum->syntax #'here pos)] [init (car l)]) #'(set-virtual-register! pos init)) - (loop (cdr l) (add1 pos)))]))]) - #'(define (id) init ...))])) + (loop (cdr l) (add1 pos)))]))] + [future-pos current-future-virtual-register]) + #'(define (id) + init ... + (set-virtual-register! future-pos #f)))])) diff --git a/racket/src/cs/thread.sls b/racket/src/cs/thread.sls index 5142d706be..364a96c684 100644 --- a/racket/src/cs/thread.sls +++ b/racket/src/cs/thread.sls @@ -32,35 +32,50 @@ [set-break-enabled-transition-hook! rumble:set-break-enabled-transition-hook!] [set-reachable-size-increments-callback! rumble:set-reachable-size-increments-callback!] [set-custodian-memory-use-proc! rumble:set-custodian-memory-use-proc!] - [set-immediate-allocation-check-proc! rumble:set-immediate-allocation-check-proc!])) + [set-immediate-allocation-check-proc! rumble:set-immediate-allocation-check-proc!] + [continuation-current-primitive rumble:continuation-current-primitive])) (include "place-register.ss") (define-place-register-define place:define thread-register-start thread-register-count) - ;; Special handling of `current-atomic`: use the last virtual register; - ;; we rely on the fact that the register's default value is 0. + ;; Special handling of `current-atomic` to use the last virtual register, and + ;; similarr for other. We rely on the fact that the register's default value is 0 + ;; or the rumble layer installs a suitable default. Also, force inline a few + ;; functions and handle other special cases. (define-syntax (define stx) - (syntax-case stx (current-atomic end-atomic-callback make-pthread-parameter unsafe-make-place-local) - ;; Recognize definition of `current-atomic`: - [(_ current-atomic (make-pthread-parameter 0)) - (with-syntax ([(_ id _) stx] - [n (datum->syntax #'here (- (virtual-register-count) 1))]) - #'(define-syntax id - (syntax-rules () - [(_) (virtual-register n)] - [(_ v) (set-virtual-register! n v)])))] - ;; Recognize definition of `end-atomic-callback`: - [(_ end-atomic-callback (make-pthread-parameter 0)) - (with-syntax ([(_ id _) stx] - [n (datum->syntax #'here (- (virtual-register-count) 2))]) - #'(define-syntax id - (syntax-rules () - [(_) (virtual-register n)] - [(_ v) (set-virtual-register! n v)])))] - ;; Workaround for redirected access of `unsafe-make-place-local` from #%pthread: - [(_ alias-id unsafe-make-place-local) #'(begin)] - ;; Chain to place-register handling: - [(_ . rest) #'(place:define . rest)])) + (let ([define-as-virtual-register + (lambda (stx n) + (with-syntax ([(_ id _) stx] + [n (datum->syntax #'here n)]) + #'(define-syntax id + (syntax-rules () + [(_) (virtual-register n)] + [(_ v) (set-virtual-register! n v)]))))]) + (syntax-case stx (current-atomic end-atomic-callback current-future$1 + lambda make-pthread-parameter unsafe-make-place-local) + ;; Recognize definition of `current-atomic`: + [(_ current-atomic (make-pthread-parameter 0)) + (define-as-virtual-register stx current-atomic-virtual-register)] + ;; Recognize definition of `end-atomic-callback`: + [(_ end-atomic-callback (make-pthread-parameter 0)) + (define-as-virtual-register stx end-atomic-virtual-register)] + ;; Recognize definition of `current-future`: + [(_ current-future$1 (make-pthread-parameter #f)) + (define-as-virtual-register stx current-future-virtual-register)] + ;; Force-inline `start-atomic`, `end-atomic`, and `future-barrier`, + ;; at least within the core layers: + [(_ id (lambda () expr ...)) + (#%memq (syntax->datum #'id) '(start-atomic end-atomic future-barrier)) + #'(begin + (define proc (let ([id (lambda () expr ...)]) id)) + (define-syntax (id stx) + (syntax-case stx () + [(_) #'(let () expr ...)] + [_ #'proc])))] + ;; Workaround for redirected access of `unsafe-make-place-local` from #%pthread: + [(_ alias-id unsafe-make-place-local) #'(begin)] + ;; Chain to place-register handling: + [(_ . rest) #'(place:define . rest)]))) ;; This implementation of `sleep`, `get-wakeup-handle`, and `wakeup` is relevant ;; only for running the places part of the thread demo. The relevant callbacks get @@ -99,7 +114,7 @@ ;; directly in "compiled/thread.scm". To make that work, the ;; entries need to be registered as built-in names with the ;; expander, and they need to be listed in - ;; "primitives/internal.ss". + ;; "primitive/internal.ss". (hasheq 'make-pthread-parameter make-pthread-parameter 'unsafe-root-continuation-prompt-tag unsafe-root-continuation-prompt-tag @@ -155,7 +170,8 @@ 'make-mutex rumble:make-mutex 'mutex-acquire rumble:mutex-acquire 'mutex-release rumble:mutex-release - 'threaded? rumble:threaded?)] + 'threaded? rumble:threaded? + 'continuation-current-primitive rumble:continuation-current-primitive)] [else #f])) ;; Tie knots: @@ -178,5 +194,4 @@ (lambda () (current-atomic (fx- (current-atomic) 1)))) - (set-future-callbacks! 1/future? 1/current-future - future-block future-wait current-future-prompt)) + (set-future-callbacks! future-block current-future-prompt)) diff --git a/racket/src/expander/extract/defn-known.rkt b/racket/src/expander/extract/defn-known.rkt index 1be845dec3..31c8533957 100644 --- a/racket/src/expander/extract/defn-known.rkt +++ b/racket/src/expander/extract/defn-known.rkt @@ -1,6 +1,7 @@ #lang racket/base (require racket/list racket/match + "../common/set.rkt" "../run/status.rkt" "../compile/side-effect.rkt" "../compile/known.rkt") @@ -8,11 +9,15 @@ (struct struct-shape (num-fields num-parent-fields op-types)) -(define (add-defn-known! seen-defns syms rhs) +(define (add-defn-known! seen-defns all-mutated-vars syms rhs) (for ([s (in-list syms)]) (unless (hash-ref seen-defns s #f) (hash-set! seen-defns s (known-defined)))) - (cond + (cond + [(for/or ([s (in-list syms)]) + (set-member? all-mutated-vars s)) + ;; Don't record anything more specific for a mutated definition + (void)] ;; Recognize known-arity `lambda` and `case-lambda` [(and (= 1 (length syms)) (lambda-arity rhs)) => diff --git a/racket/src/expander/extract/gc-defn.rkt b/racket/src/expander/extract/gc-defn.rkt index e0c590e967..a28bd9ca28 100644 --- a/racket/src/expander/extract/gc-defn.rkt +++ b/racket/src/expander/extract/gc-defn.rkt @@ -104,6 +104,7 @@ (for ([sym (in-list (defn-syms defn))]) (hash-set! seen-defns sym (known-defined))) (add-defn-known! seen-defns + (seteq) (defn-syms defn) (defn-rhs defn))))) (for ([sym (in-list (defn-syms defn))]) diff --git a/racket/src/expander/extract/simplify-defn.rkt b/racket/src/expander/extract/simplify-defn.rkt index f9a7f7e379..22d5276bd2 100644 --- a/racket/src/expander/extract/simplify-defn.rkt +++ b/racket/src/expander/extract/simplify-defn.rkt @@ -151,11 +151,11 @@ (hash-ref seen-defns (car (defn-syms d)) #f)) #:break (not (safe-defn-or-expr? d)) #:when (defn? d)) - (add-defn-known! seen-defns (defn-syms d) (defn-rhs d))) + (add-defn-known! seen-defns all-mutated-vars (defn-syms d) (defn-rhs d))) (define e (car body)) (define new-defn (list 'define-values (defn-syms e) (simplify-expr (defn-rhs e) all-mutated-vars safe-ref? seen-defns))) - (add-defn-known! seen-defns (defn-syms e) (defn-rhs e)) + (add-defn-known! seen-defns all-mutated-vars (defn-syms e) (defn-rhs e)) (cons new-defn (loop (cdr body)))] [else (define e diff --git a/racket/src/io/logger/main.rkt b/racket/src/io/logger/main.rkt index bef09d6281..d3153d6160 100644 --- a/racket/src/io/logger/main.rkt +++ b/racket/src/io/logger/main.rkt @@ -23,7 +23,9 @@ add-stderr-log-receiver! add-stdout-log-receiver! add-syslog-log-receiver! - logger-init!) + logger-init! + logging-future-events? + log-future-event) (define (make-root-logger) (create-logger #:topic #f #:parent #f #:propagate-filters 'none)) @@ -62,6 +64,10 @@ (atomically/no-interrupts/no-wind (log-level?* logger level topic))) +(define (logging-future-events?) + (atomically/no-interrupts/no-wind + (log-level?* root-logger 'debug 'future))) + ;; In atomic mode with interrupts disabled (define/who (log-level?* logger level topic) (level>=? (logger-wanted-level logger topic) level)) @@ -129,6 +135,10 @@ (atomically/no-interrupts/no-wind (log-message* logger level topic message data prefix? #f))) +(define (log-future-event message data) + (atomically/no-interrupts/no-wind + (log-message* root-logger 'debug 'future message data #t #f))) + ;; In atomic mode with interrupts disabled ;; Can be called in any host Scheme thread and in interrupt handler, ;; like `log-level?*` diff --git a/racket/src/io/main.rkt b/racket/src/io/main.rkt index 62373c057e..807540f95b 100644 --- a/racket/src/io/main.rkt +++ b/racket/src/io/main.rkt @@ -59,7 +59,6 @@ get-original-error-port) (define (io-place-init! in-fd out-fd err-fd cust plumber) - (sandman-place-init!) (rktio-place-init!) (logger-init!) (shared-ltps-place-init!) diff --git a/racket/src/io/sandman/lock.rkt b/racket/src/io/sandman/lock.rkt deleted file mode 100644 index b7f5982612..0000000000 --- a/racket/src/io/sandman/lock.rkt +++ /dev/null @@ -1,20 +0,0 @@ -#lang racket/base -(require "../common/internal-error.rkt") - -;; Simple lock for sandman - -(provide make-lock - lock-acquire - lock-release) - -(define (make-lock) - (box 0)) - -(define (lock-acquire box) - (let loop () - (unless (and (= 0 (unbox box)) (box-cas! box 0 1)) - (loop)))) - -(define (lock-release box) - (unless (box-cas! box 1 0) - (internal-error "failed to release lock"))) diff --git a/racket/src/io/sandman/main.rkt b/racket/src/io/sandman/main.rkt index 256b75fc28..83f6d73f31 100644 --- a/racket/src/io/sandman/main.rkt +++ b/racket/src/io/sandman/main.rkt @@ -4,7 +4,6 @@ "../common/internal-error.rkt" "../host/thread.rkt" "../host/rktio.rkt" - "lock.rkt" "ltps.rkt") ;; Create an extended sandman that can sleep with a rktio poll set. An @@ -22,8 +21,7 @@ sandman-poll-ctx-add-poll-set-adder! sandman-poll-ctx-merge-timeout sandman-set-background-sleep! - sandman-poll-ctx-poll? - sandman-place-init!) + sandman-poll-ctx-poll?) (struct exts (timeout-at fd-adders)) @@ -57,13 +55,6 @@ (set! background-sleep sleep) (set! background-sleep-fd fd)) -(define-place-local lock (make-lock)) -(define-place-local waiting-threads '()) -(define-place-local awoken-threads '()) - -(define (sandman-place-init!) - (set! lock (make-lock))) - (void (current-sandman (let ([timeout-sandman (current-sandman)]) @@ -165,37 +156,4 @@ ;; extract-timeout (lambda (exts) - (exts-timeout-at exts)) - - ;; condition-wait - (lambda (t) - (lock-acquire lock) - (set! waiting-threads (cons t waiting-threads)) - (lock-release lock) - ;; awoken callback. for when thread is awoken - (lambda () - (lock-acquire lock) - (if (memq t waiting-threads) - (begin - (set! waiting-threads (remove t waiting-threads eq?)) - (set! awoken-threads (cons t awoken-threads)) - (rktio_signal_received_at (rktio_get_signal_handle rktio))) ;; wakeup main thread if sleeping - (internal-error "thread is not a member of waiting-threads\n")) - (lock-release lock))) - - ;; condition-poll - (lambda (mode wakeup) - (lock-acquire lock) - (define at awoken-threads) - (set! awoken-threads '()) - (lock-release lock) - (for-each (lambda (t) - (wakeup t)) at)) - - ;; any-waiters? - (lambda () - (or (not (null? waiting-threads)) (not (null? awoken-threads)))) - - - ;; lock - lock)))) + (exts-timeout-at exts)))))) diff --git a/racket/src/schemify/schemify.rkt b/racket/src/schemify/schemify.rkt index 309a74f09e..94272ada3b 100644 --- a/racket/src/schemify/schemify.rkt +++ b/racket/src/schemify/schemify.rkt @@ -744,6 +744,8 @@ prim-knowns knowns imports mutated simples)])))] [`,_ (let ([u-v (unwrap v)]) + (when (eq? u-v 'ensure-place-wakeup-handle) + (log-error "here")) (cond [(not (symbol? u-v)) v] diff --git a/racket/src/thread/Makefile b/racket/src/thread/Makefile index 561391e94c..0f613d84a1 100644 --- a/racket/src/thread/Makefile +++ b/racket/src/thread/Makefile @@ -31,7 +31,14 @@ GLOBALS = --no-global \ ++global-ok place-wakeup \ ++global-ok compute-memory-sizes \ ++global-ok check-place-activity \ - ++global-ok make-place-ports+fds + ++global-ok make-place-ports+fds \ + ++global-ok future-block-for-atomic \ + ++global-ok pthread-count \ + ++global-ok wakeup-this-place \ + ++global-ok ensure-place-wakeup-handle \ + ++global-ok futures-sync-for-custodian-shutdown \ + ++global-ok logging-future-events? \ + ++global-ok log-future-event GENERATE_ARGS = -t main.rkt --submod main \ --check-depends $(BUILDDIR)compiled/thread-dep.rktd \ diff --git a/racket/src/thread/api.rkt b/racket/src/thread/api.rkt index c634eb6f69..4037e1dd5f 100644 --- a/racket/src/thread/api.rkt +++ b/racket/src/thread/api.rkt @@ -9,9 +9,15 @@ [poll-guard-evt raw:poll-guard-evt] [choice-evt raw:choice-evt]) (only-in "sync.rkt" - sync/enable-break)) + sync/enable-break) + (only-in "parameter.rkt" + [current-future raw:current-future]) + (only-in "future.rkt" + future-block + currently-running-future)) -(provide wrap-evt +(provide current-future + wrap-evt handle-evt handle-evt? guard-evt @@ -23,6 +29,10 @@ call-with-semaphore call-with-semaphore/enable-break) +(define (current-future) + (or (raw:current-future) + (currently-running-future))) + (define/who (choice-evt . args) (for ([arg (in-list args)]) (check who evt? arg)) diff --git a/racket/src/thread/atomic.rkt b/racket/src/thread/atomic.rkt index a38f1196fa..1f052db6bf 100644 --- a/racket/src/thread/atomic.rkt +++ b/racket/src/thread/atomic.rkt @@ -3,6 +3,7 @@ "host.rkt" "place-local.rkt" "internal-error.rkt" + "parameter.rkt" "debug.rkt") (provide atomically @@ -17,16 +18,18 @@ in-atomic-mode? + future-barrier + add-end-atomic-callback! start-implicit-atomic-mode end-implicit-atomic-mode - assert-atomic-mode) + assert-atomic-mode -;; This definition is specially recognized for Racket on -;; Chez Scheme and converted to use a virtual register: -(define current-atomic (make-pthread-parameter 0)) + set-future-block!) +;; "atomically" is atomic within a place; when a future-running +;; pthread tries to enter atomic mode, it is suspended (define-syntax-rule (atomically expr ...) (begin (start-atomic) @@ -41,9 +44,12 @@ (let () expr ...) (end-atomic/no-interrupts)))) +;; inlined in Chez Scheme embedding: (define (start-atomic) + (future-barrier) (current-atomic (fx+ (current-atomic) 1))) +;; inlined in Chez Scheme embedding: (define (end-atomic) (define n (fx- (current-atomic) 1)) (cond @@ -82,6 +88,11 @@ (define (in-atomic-mode?) (positive? (current-atomic))) +;; inlined in Chez Scheme embedding: +(define (future-barrier) + (when (current-future) + (future-block-for-atomic))) + ;; ---------------------------------------- ;; A "list" of callbacks to run when exiting atomic mode, @@ -99,6 +110,13 @@ ;; ---------------------------------------- +(define future-block-for-atomic (lambda () (void))) + +(define (set-future-block! block) + (set! future-block-for-atomic block)) + +;; ---------------------------------------- + (debug-select #:on [(define current-implicit-atomic (make-pthread-parameter #t)) diff --git a/racket/src/thread/bootstrap.rkt b/racket/src/thread/bootstrap.rkt index 61aff7cf1a..3c50d9add2 100644 --- a/racket/src/thread/bootstrap.rkt +++ b/racket/src/thread/bootstrap.rkt @@ -13,7 +13,7 @@ (provide register-place-symbol! set-io-place-init!) -(define (make-engine thunk prompt-tag init-break-enabled-cell empty-config?) +(define (make-engine thunk prompt-tag abort-handler init-break-enabled-cell empty-config?) (define ready-s (make-semaphore)) (define s (make-semaphore)) (define prefix void) @@ -280,7 +280,8 @@ (error "current-engine state: not ready")) 'make-mutex (lambda () (make-semaphore 1)) 'mutex-acquire (lambda (s) (semaphore-wait s)) - 'mutex-release (lambda (s) (semaphore-post s)))) + 'mutex-release (lambda (s) (semaphore-post s)) + 'continuation-current-primitive (lambda (k) #f))) ;; add dummy definitions that implement pthreads and conditions etc. ;; dummy definitions that error diff --git a/racket/src/thread/channel.rkt b/racket/src/thread/channel.rkt index c58954d20b..f6b41e20a5 100644 --- a/racket/src/thread/channel.rkt +++ b/racket/src/thread/channel.rkt @@ -62,7 +62,7 @@ (let receive () ; loop if a retry is needed ((atomically (define pw+v (queue-remove! (channel-put-queue ch))) - (define gw (current-thread)) + (define gw (current-thread/in-atomic)) (cond [(not pw+v) (define gq (channel-get-queue ch)) @@ -95,7 +95,7 @@ (define b (box #f)) (define gq (channel-get-queue ch)) (define gw (channel-select-waiter (poll-ctx-select-proc poll-ctx) - (current-thread))) + (current-thread/in-atomic))) (define n (queue-add! gq (cons gw b))) (values #f (wrap-evt @@ -126,7 +126,7 @@ [else ((atomically (define gw+b (queue-remove! (channel-get-queue ch))) - (define pw (current-thread)) + (define pw (current-thread/in-atomic)) (cond [(not gw+b) (define pq (channel-put-queue ch)) @@ -158,7 +158,7 @@ [else (define pq (channel-put-queue ch)) (define pw (channel-select-waiter (poll-ctx-select-proc poll-ctx) - (current-thread))) + (current-thread/in-atomic))) (define n (queue-add! pq (cons pw v))) (values #f (wrap-evt @@ -201,10 +201,11 @@ ;; ---------------------------------------- +;; in atomic mode (define (not-matching-select-waiter w+b/v) (define w (car w+b/v)) (or (not (channel-select-waiter? w)) - (not (eq? (current-thread) + (not (eq? (current-thread/in-atomic) (channel-select-waiter-thread w))))) ;; ---------------------------------------- diff --git a/racket/src/thread/config.rkt b/racket/src/thread/config.rkt new file mode 100644 index 0000000000..e732cfbe19 --- /dev/null +++ b/racket/src/thread/config.rkt @@ -0,0 +1,5 @@ +#lang racket/base + +(provide (all-defined-out)) + +(define TICKS 100000) diff --git a/racket/src/thread/custodian-object.rkt b/racket/src/thread/custodian-object.rkt index 8e316c4da2..302ad87ac0 100644 --- a/racket/src/thread/custodian-object.rkt +++ b/racket/src/thread/custodian-object.rkt @@ -15,8 +15,9 @@ [place #:mutable] ; place containing the custodian [memory-use #:mutable] ; set after a major GC [gc-roots #:mutable] ; weak references to charge to custodian; access without interrupts - [memory-limits #:mutable] ; list of (cons limit cust) - [immediate-limit #:mutable]) ; limit on immediate allocation + [memory-limits #:mutable] ; list of (cons limit cust) + [immediate-limit #:mutable] ; limit on immediate allocation + [sync-futures? #:mutable]) ; whether a sync witht future threads is needed on shutdown #:authentic) (define (create-custodian parent) @@ -30,7 +31,8 @@ 0 ; memory use #f ; GC roots null ; memory limits - #f)) ; immediate limit + #f ; immediate limit + #f)) ; sync-futures? (define initial-place-root-custodian (create-custodian #f)) diff --git a/racket/src/thread/custodian.rkt b/racket/src/thread/custodian.rkt index 7465a3d236..9c5fa69519 100644 --- a/racket/src/thread/custodian.rkt +++ b/racket/src/thread/custodian.rkt @@ -43,6 +43,9 @@ create-custodian poll-custodian-will-executor)) +(module+ for-future + (provide set-custodian-futures-sync!)) + ;; For `(struct custodian ...)`, see "custodian-object.rkt" (struct custodian-box ([v #:mutable] sema) @@ -259,6 +262,8 @@ (define (do-custodian-shutdown-all c) (unless (custodian-shut-down? c) (set-custodian-shut-down?! c #t) + (when (custodian-sync-futures? c) + (futures-sync-for-custodian-shutdown)) (for ([(child callback) (in-hash (custodian-children c))]) (if (procedure-arity-includes? callback 2) (callback child c) @@ -364,6 +369,13 @@ ;; ---------------------------------------- +(define futures-sync-for-custodian-shutdown (lambda () (void))) + +(define (set-custodian-futures-sync! proc) + (set! futures-sync-for-custodian-shutdown proc)) + +;; ---------------------------------------- + ;; Disable interrupts before taking this lock, since it ;; guards values that are manipulated by a GC callback (define memory-limit-lock (host:make-mutex)) diff --git a/racket/src/thread/demo.rkt b/racket/src/thread/demo.rkt index 2cce89534d..362e7198e9 100644 --- a/racket/src/thread/demo.rkt +++ b/racket/src/thread/demo.rkt @@ -562,6 +562,35 @@ (check (vector 'again) got5) (check 0 (place-wait pl5)) + (check #f (current-future)) + + (define f (future (lambda () 10))) + (check 10 (touch f)) + + (check 11 (touch (would-be-future (lambda () 11)))) + (check 12 (touch (would-be-future (lambda () (sleep) 12)))) ; blocks on `(sleep)` + + (define fx (future (lambda () (current-future)))) + (check fx (touch fx)) + + (define f0s (for/list ([i (in-range 50)]) + (future (lambda () + (let loop ([i i]) + (if (zero? i) + i + (add1 (loop (sub1 i))))))))) + (check (for/list ([i (in-range 50)]) i) (map touch f0s)) + + (define fs (make-fsemaphore 0)) + (check (void) (fsemaphore-post fs)) + (check (void) (fsemaphore-wait fs)) + (define f1 (future (lambda () (fsemaphore-wait fs) 'f1))) + (define f2 (future (lambda () (touch f1) 'f2))) + (sync (system-idle-evt)) + (check (void) (fsemaphore-post fs)) + (check 'f2 (touch f2)) + (check 'f1 (touch f1)) + (set! done? #t))) (unless done? diff --git a/racket/src/thread/fsemaphore.rkt b/racket/src/thread/fsemaphore.rkt index 9b2f5094ab..32de30d23f 100644 --- a/racket/src/thread/fsemaphore.rkt +++ b/racket/src/thread/fsemaphore.rkt @@ -1,6 +1,10 @@ #lang racket/base (require "check.rkt" - "semaphore.rkt") + "parameter.rkt" + "future-lock.rkt" + (submod "future.rkt" for-fsemaphore) + "evt.rkt" + "sync.rkt") (provide fsemaphore? make-fsemaphore @@ -9,24 +13,93 @@ fsemaphore-try-wait? fsemaphore-count) -(struct fsemaphore (sema)) +(struct fsemaphore ([c #:mutable] ; counter + lock + [dependents #:mutable] ; dependent futures + [dep-box #:mutable]) ; for waiting by non-futures + #:authentic) + +(struct fsemaphore-box-evt (b) + #:property prop:evt (poller (lambda (fsb poll-ctx) + (define b (fsemaphore-box-evt-b fsb)) + (cond + [(unbox b) (values '(#t) #f)] + [else (values #f fsb)])))) + (define/who (make-fsemaphore init) (check who exact-nonnegative-integer? init) - (fsemaphore (make-semaphore init))) + (fsemaphore init + (make-lock) + #hasheq() + #f)) -(define/who (fsemaphore-post fsema) - (check who fsemaphore? fsema) - (semaphore-post (fsemaphore-sema fsema))) +(define/who (fsemaphore-post fs) + (check who fsemaphore? fs) + (with-lock (fsemaphore-lock fs) + (define c (fsemaphore-c fs)) + (cond + [(zero? c) + (define b (fsemaphore-dep-box fs)) + (define deps (fsemaphore-dependents fs)) + ;; If a future is waiting on the semaphore, it wins over any + ;; non-future threads that are blocked on the fsemaphore. + ;; That's not a great choice, but it we don't have to worry + ;; about keeping track of threads that are in still line versus + ;; threads that have been interrupted. + (cond + [(not (hash-empty? deps)) + (define f (hash-iterate-key deps (hash-iterate-first deps))) + (set-fsemaphore-dependents! fs (hash-remove deps f)) + (future-notify-dependent f)] + [else + (when b + ;; This is a kind of broadcast wakeup, and then the + ;; awakened threads will compete for the fsemaphore: + (set-fsemaphore-dep-box! fs #f) + (set-box! b #t)) + (set-fsemaphore-c! fs 1)])] + [else + (set-fsemaphore-c! fs (add1 c))]))) -(define/who (fsemaphore-wait fsema) - (check who fsemaphore? fsema) - (semaphore-wait (fsemaphore-sema fsema))) +(define/who (fsemaphore-wait fs) + (check who fsemaphore? fs) + (lock-acquire (fsemaphore-lock fs)) + (define c (fsemaphore-c fs)) + (cond + [(zero? c) + (define me-f (current-future)) + (cond + [me-f + (lock-acquire (future*-lock me-f)) + (set-fsemaphore-dependents! fs (hash-set (fsemaphore-dependents fs) me-f #t)) + (set-future*-state! me-f 'fsema) + (lock-release (fsemaphore-lock fs)) + (future-suspend) ; expects lock on f and releases it + (void)] + [else + (define dep-box (or (fsemaphore-dep-box fs) + (let ([b (box #f)]) + (set-fsemaphore-dep-box! fs b) + b))) + (lock-release (fsemaphore-lock fs)) + (sync (fsemaphore-box-evt dep-box)) + (fsemaphore-wait fs)])] + [else + (set-fsemaphore-c! fs (sub1 c)) + (lock-release (fsemaphore-lock fs))])) -(define/who (fsemaphore-try-wait? fsema) - (check who fsemaphore? fsema) - (semaphore-try-wait? (fsemaphore-sema fsema))) +(define/who (fsemaphore-try-wait? fs) + (check who fsemaphore? fs) + (with-lock (fsemaphore-lock fs) + (define c (fsemaphore-c fs)) + (cond + [(zero? c) #f] + [else + (set-fsemaphore-c! fs (sub1 c)) + #t]))) -(define/who (fsemaphore-count fsema) - (check who fsemaphore? fsema) - 0) +(define/who (fsemaphore-count fs) + (check who fsemaphore? fs) + (with-lock (fsemaphore-lock fs) + (fsemaphore-c fs))) diff --git a/racket/src/thread/future-id.rkt b/racket/src/thread/future-id.rkt new file mode 100644 index 0000000000..e6e773c129 --- /dev/null +++ b/racket/src/thread/future-id.rkt @@ -0,0 +1,12 @@ +#lang racket/base + +(provide get-next-id) + +(define ID (box 1)) + +(define get-next-id + (lambda () + (let ([id (unbox ID)]) + (if (box-cas! ID id (+ 1 id)) + id + (get-next-id))))) diff --git a/racket/src/thread/future-lock.rkt b/racket/src/thread/future-lock.rkt new file mode 100644 index 0000000000..0f1bac2733 --- /dev/null +++ b/racket/src/thread/future-lock.rkt @@ -0,0 +1,62 @@ +#lang racket/base +(require "internal-error.rkt" + "host.rkt" + "parameter.rkt" + "atomic.rkt") + +;; This module implements a lightweight spinlock for futures and +;; fsemaphores. + +;; The overall locking regime depends on this lock order (i.e., +;; when multiple locks are held at once, they must be acquired +;; in this order): +;; +;; - fsemaphore [one at a time] +;; - schedule queue +;; - futures, lower ID before higher ID +;; +;; A future's lock must be held to change the future's fields, except +;; that the fields to implement the schedule queue should be modified +;; only with the schedule-queue lock held. +;; +;; A future with state #f is available to run, but it must be either +;; `would-be?` (never in the queue, only run by a Racket thread) or +;; currently in a queue for a future pthread to take ownership of the +;; future. + +(provide with-lock + make-lock + lock-acquire + lock-release + start-future-uninterrupted + end-future-uninterrupted) + +(define-syntax-rule (with-lock lock-expr expr ...) + (let ([lock lock-expr]) + (lock-acquire lock) + (begin0 + (let () expr ...) + (lock-release lock)))) + +(define (make-lock) (box 0)) + +(define (start-future-uninterrupted) + (if (current-future) + (current-atomic (add1 (current-atomic))) ; see `run-future-in-worker` + (start-atomic))) + +(define (end-future-uninterrupted) + (if (current-future) + (current-atomic (sub1 (current-atomic))) ; see `run-future-in-worker` + (end-atomic))) + +(define (lock-acquire lock) + (start-future-uninterrupted) + (let loop () + (unless (box-cas! lock 0 1) + (loop)))) + +(define (lock-release lock) + (unless (box-cas! lock 1 0) + (internal-error "lock release failed!")) + (end-future-uninterrupted)) diff --git a/racket/src/thread/future-logging.rkt b/racket/src/thread/future-logging.rkt new file mode 100644 index 0000000000..569be41281 --- /dev/null +++ b/racket/src/thread/future-logging.rkt @@ -0,0 +1,94 @@ +#lang racket/base +(require "host.rkt" + "parameter.rkt" + "future-object.rkt" + "place-local.rkt") + +(provide log-future + logging-futures? + flush-future-log + + install-future-logging-procs!) + +(struct future-event (future-id proc-id action time prim-name user-data) + #:prefab) + +(define-place-local events null) + +;; called with no future locks held +(define (log-future action [future-id #f] + #:prim-name [prim-name #f] + #:data [data #f]) + (cond + [(current-future) + => (lambda (me-f) + (set! events (cons (future-event (or future-id (future*-id me-f)) + (get-pthread-id) + action + (current-inexact-milliseconds) + prim-name + data) + events)))] + [(logging-futures?) + (flush-future-log) + (define id (or future-id + (let ([f (currently-running-future)]) + (if f + (future*-id f) + -1)))) + (log-future-event* (future-event id 0 action (current-inexact-milliseconds) prim-name data))])) + +;; maybe in atomic mode and only in main pthread +(define (logging-futures?) + (logging-future-events?)) + +;; maybe in atomic mode and only in main pthread +(define (flush-future-log) + (define new-events events) + (unless (null? new-events) + (set! events null) + (when (logging-futures?) + (for ([e (in-list (reverse new-events))]) + (log-future-event* e))))) + +(define (log-future-event* e) + (define proc-id (future-event-proc-id e)) + (define action (future-event-action e)) + (define msg (string-append "id " + (number->string (future-event-future-id e)) + ", process " + (number->string proc-id) + ": " + (if (and (eqv? proc-id 0) + (eq? action 'block)) + (string-append "HANDLING: " + (symbol->string + (or (future-event-prim-name e) + '|[unknown]|))) + (action->string action)) + "; time: " + (number->string (future-event-time e)))) + (log-future-event msg e)) + +(define (action->string a) + (case a + [(create) "created"] + [(complete) "completed"] + [(start-work) "started work"] + [(end-work) "ended work"] + [(block) "BLOCKING on process 0"] + [(touch) "touching future: touch"] + [(result) "result determined"] + [(suspend) "suspended"] + [(touch-pause) "paused for touch"] + [(touch-resume) "resumed for touch"] + [else "[unknown action]"])) + +;; ---------------------------------------- + +(define logging-future-events? (lambda () #f)) +(define log-future-event (lambda (msg e) (void))) + +(define (install-future-logging-procs! logging? log) + (set! logging-future-events? logging?) + (set! log-future-event log)) diff --git a/racket/src/thread/future-object.rkt b/racket/src/thread/future-object.rkt new file mode 100644 index 0000000000..e309ee4e51 --- /dev/null +++ b/racket/src/thread/future-object.rkt @@ -0,0 +1,35 @@ +#lang racket/base +(require "host.rkt") + +(provide (struct-out future*) + + currently-running-future-key + currently-running-future) + +;; ---------------------------------------- + +;; See "future-lock.rkt" for information on locking rules +(struct future* (id + lock + custodian ; don't run in future pthread if custodian is shut down + [would-be? #:mutable] ; transitions from #t to 'blocked after blocked + [thunk #:mutable] ; thunk or continuation + [prev #:mutable] ; queue previous + [next #:mutable] ; queue next + [results #:mutable] + [state #:mutable] ; #f (could run), 'running, 'blocked, 'done, 'aborted, 'fsema, or future waiting on + [dependents #:mutable]) ; futures that are blocked on this one + #:authentic + #:reflection-name 'future) + +;; ---------------------------------------- + +(define currently-running-future-key (gensym 'future)) + +;; Only called in a Racket thread: +(define (currently-running-future) + (continuation-mark-set-first + #f + currently-running-future-key + #f + (unsafe-root-continuation-prompt-tag))) diff --git a/racket/src/thread/future.rkt b/racket/src/thread/future.rkt index d2d1327de6..c48da3e8cf 100644 --- a/racket/src/thread/future.rkt +++ b/racket/src/thread/future.rkt @@ -1,100 +1,163 @@ #lang racket/base - -(require "place-local.rkt" +(require "config.rkt" + "place-local.rkt" "check.rkt" "internal-error.rkt" "host.rkt" - "atomic.rkt" "parameter.rkt" - "../common/queue.rkt" + "atomic.rkt" + "custodian-object.rkt" "thread.rkt" - "lock.rkt") + (submod "thread.rkt" for-future) + (submod "custodian.rkt" for-future) + "sync.rkt" + "evt.rkt" + "future-object.rkt" + "future-id.rkt" + "future-lock.rkt" + "future-logging.rkt") (provide init-future-place! futures-enabled? - current-future future future? would-be-future touch future-block - future-wait current-future-prompt - future:condition-broadcast - future:condition-signal - future:condition-wait - future:make-condition - signal-future + currently-running-future reset-future-logs-for-tracing! - mark-future-trace-end!) + mark-future-trace-end! + set-processor-count!) -(define place-main-thread-id (make-pthread-parameter 0)) +(module+ for-place + (provide set-place-future-procs! + kill-future-scheduler)) + +(module+ for-fsemaphore + (provide future*-lock + set-future*-state! + future-suspend + future-notify-dependent)) (define (init-future-place!) - (place-main-thread-id (get-pthread-id))) + (void)) -;; not sure of order here... -(define (get-caller) - (cond - [(current-future) - (current-future)] - [(not (= (place-main-thread-id) (get-pthread-id))) - (get-pthread-id)] - [else - (current-thread)])) +(define (futures-enabled?) + (threaded?)) +;; ---------------------------------------- -;; ---------------------------- futures ---------------------------------- +(struct future-evt (future) + #:property prop:evt (poller (lambda (fe poll-ctx) + (define f (future-evt-future fe)) + (lock-acquire (future*-lock f)) + (define s (future*-state f)) + (lock-release (future*-lock f)) + (cond + [(or (eq? s 'running) + (eq? s 'fsema)) + (values #f fe)] + [else (values '(#t) #f)])))) +(define (create-future thunk cust would-be?) + (define id (get-next-id)) + (log-future 'create #:data id) + (future* id + (make-lock) ; lock + cust + would-be? + thunk + #f ; prev + #f ; next + #f ; results + #f ; state + #hasheq())) ; dependents -(define ID (box 1)) +(define (future? v) + (future*? v)) -(define get-next-id - (lambda () - (let ([id (unbox ID)]) - (if (box-cas! ID id (+ 1 id)) - id - (get-next-id))))) - -(define futures-enabled? threaded?) - -(struct future* (id cond lock prompt - would-be? [thunk #:mutable] [engine #:mutable] - [cont #:mutable] [result #:mutable] [done? #:mutable] - [blocked? #:mutable][resumed? #:mutable] - [cond-wait? #:mutable])) - -(define (create-future would-be-future?) - (future* (get-next-id) ;; id - (future:make-condition) ;; cond - (make-lock) ;; lock - (make-continuation-prompt-tag 'future) ;; prompt - would-be-future? ;; would-be? - #f ;; thunk - #f ;; engine - #f ;; cont - #f ;; result - #f ;; done? - #f ;; blocked? - #f ;; resumed? - #f)) ;; cond-wait? - -(define future? future*?) - -(define current-future (make-pthread-parameter #f)) +(define future-scheduler-prompt-tag (make-continuation-prompt-tag 'future-scheduler)) +(define future-start-prompt-tag (make-continuation-prompt-tag 'future-star)) (define (current-future-prompt) (if (current-future) - (future*-prompt (current-future)) - (internal-error "Not running in a future."))) + future-scheduler-prompt-tag + (internal-error "not running in a future"))) -(define (thunk-wrapper f thunk) - (lambda () - (let ([result (thunk)]) - (with-lock ((future*-lock f) (current-future)) - (set-future*-result! f result) - (set-future*-done?! f #t) - (future:condition-broadcast (future*-cond f)))))) +;; called with lock on f held; +;; in a non-main pthread, caller is responsible for logging 'end-work +(define (run-future f #:was-blocked? [was-blocked? #f]) + (set-future*-state! f 'running) + (define thunk (future*-thunk f)) + (set-future*-thunk! f #f) + (lock-release (future*-lock f)) + (when was-blocked? + (when (logging-futures?) + (log-future 'block (future*-id f) #:prim-name (continuation-current-primitive + thunk + '(unsafe-start-atomic))) + (log-future 'result (future*-id f)))) + (unless (eq? (future*-would-be? f) 'blocked) + (log-future 'start-work (future*-id f))) + (define (finish! results state) + (start-future-uninterrupted) + (lock-acquire (future*-lock f)) + (set-future*-results! f results) + (set-future*-state! f state) + (define deps (future*-dependents f)) + (set-future*-dependents! f #hasheq()) + (lock-release (future*-lock f)) + ;; stay in uninterrupted mode here, because we need to make sure + ;; that dependents get rescheduled + (future-notify-dependents deps) + (end-future-uninterrupted) + (log-future 'complete (future*-id f))) + (cond + [(current-future) + ;; An attempt to escape will cause the future to block, so + ;; we only need to handle success + (call-with-values (lambda () + (call-with-continuation-prompt + thunk + future-start-prompt-tag + (lambda args (void)))) + (lambda results + (finish! results 'done)))] + [(eq? (future*-would-be? f) #t) + ;; Similar to `(current-future)` case, but retries + ;; excplitily if the future blocks + (call-with-values (lambda () + (call-with-continuation-prompt + (lambda () + (current-future f) + (begin0 + (thunk) + (current-future #f))) + future-start-prompt-tag + (lambda args + ;; Blocked as a would-be future; `(current-future)` has been + ;; reset to #f, and we can retry immediately + (set-future*-would-be?! f 'blocked) + (touch f)))) + (lambda results + (when (eq? (future*-state f) 'running) + (finish! results 'done) + (log-future 'end-work (future*-id f)))))] + [else + ;; No need for the future prompt tag + (dynamic-wind + (lambda () (void)) + (lambda () + (with-continuation-mark + currently-running-future-key f + (call-with-values thunk + (lambda results + (finish! results 'done))))) + (lambda () + (unless (eq? (future*-state f) 'done) + (finish! #f 'aborted)) + (log-future 'end-work (future*-id f))))])) (define/who (future thunk) (check who (procedure-arity-includes/c 0) thunk) @@ -102,311 +165,419 @@ [(not (futures-enabled?)) (would-be-future thunk)] [else - (let ([f (create-future #f)]) - (set-future*-engine! f (make-engine (thunk-wrapper f thunk) (future*-prompt f) #f #t)) - (schedule-future f) - f)])) + (define me-f (current-future)) + (define cust (future-custodian me-f)) + (define f (create-future thunk cust #f)) + (when cust + (unless me-f + (maybe-start-scheduler) + (set-custodian-sync-futures?! cust #t)) + (schedule-future! f)) + f])) (define/who (would-be-future thunk) (check who (procedure-arity-includes/c 0) thunk) - (let ([f (create-future #t)]) - (set-future*-thunk! f (thunk-wrapper f thunk)) - f)) + (ensure-place-wakeup-handle) + (create-future thunk (future-custodian (current-future)) #t)) + +(define (future-custodian me-f) + (if me-f + (future*-custodian me-f) + (thread-representative-custodian (current-thread/in-atomic)))) + +;; When two futures interact, we may need to adjust both; +;; to keep locks ordered, take lock of future with the +;; lower ID, first; beware that the two futures make be +;; the same (in which case we're headed for a circular +;; dependency) +(define (lock-acquire-both f) + (define me-f (current-future)) + (cond + [(or (not me-f) + (eq? me-f f)) + (lock-acquire (future*-lock f))] + [((future*-id me-f) . < . (future*-id f)) + (lock-acquire (future*-lock me-f)) + (lock-acquire (future*-lock f))] + [else + (lock-acquire (future*-lock f)) + (lock-acquire (future*-lock me-f))])) + +(define (lock-release-both f) + (lock-release-current) + (lock-release (future*-lock f))) + +(define (lock-release-current) + (define me-f (current-future)) + (when me-f + (lock-release (future*-lock me-f)))) (define/who (touch f) (check who future*? f) + (lock-acquire-both f) + (define s (future*-state f)) (cond - [(future*-done? f) - (future*-result f)] - [(future*-would-be? f) - ((future*-thunk f)) - (future*-result f)] - [(lock-acquire (future*-lock f) (get-caller) #f) ;; got lock - (when (or (and (not (future*-blocked? f)) (not (future*-done? f))) - (and (future*-blocked? f) (not (future*-cont f)))) - (future:condition-wait (future*-cond f) (future*-lock f))) - (future-awoken f)] + [(eq? s 'done) + (lock-release-both f) + (apply values (future*-results f))] + [(eq? s 'aborted) + (lock-release-both f) + (raise (exn:fail "touch: future previously aborted" + (current-continuation-marks)))] + [(eq? s 'blocked) + (cond + [(current-future) + ;; Can't run a blocked future in a future pthread + (dependent-on-future f)] + [else + ;; Lock on f is held (and no current future to lock) + (run-future f #:was-blocked? #t) + (apply values (future*-results f))])] + [(eq? s #f) + (cond + [(current-future) + ;; Need to wait on `f`, so deschedule current one; + ;; we may pick `f` next the queue (or maybe later) + (dependent-on-future f)] + [(future*-would-be? f) ; => not scheduled + (lock-release-current) + ;; Lock on f is held + (run-future f) + (apply values (future*-results f))] + [else + ;; Give up locks in hope of geting `f` off the + ;; schedule queue + (lock-release (future*-lock f)) + (cond + [(try-deschedule-future? f) + ;; lock on `f` is held... + (run-future f) + (apply values (future*-results f))] + [else + ;; Contention, so try again + (touch f)])])] + [(eq? s 'running) + (cond + [(current-future) + ;; Stop working on this one until `f` is done + (dependent-on-future f)] + [else + ;; Have to wait until it's not running anywhere + (set-future*-dependents! f (hash-set (future*-dependents f) 'place #t)) + (lock-release (future*-lock f)) + (log-future 'touch-pause (future*-id f)) + (sync (future-evt f)) + (log-future 'touch-resume (future*-id f)) + (touch f)])] + [(future? s) + (cond + [(current-future) + ;; Waiting on `s` on, so give up on the current future for now + (dependent-on-future f)] + [else + ;; Maybe we can start running `s` to get `f` moving... + (lock-release (future*-lock f)) + (touch s) + (touch f)])] + [(box? s) ; => dependent on fsemaphore + (cond + [(current-future) + ;; Lots to wait on, so give up on the current future for now + (dependent-on-future f)] + [else + ;; Wait until fsemaphore post succeeds for the future, then try again. + (lock-release (future*-lock f)) + (log-future 'touch-pause (future*-id f)) + (sync (future-evt f)) + (log-future 'touch-resume (future*-id f)) + (touch f)])] [else - (touch f)])) + (lock-release (future*-lock f)) + (internal-error "unrecognized future state")])) -(define (future-awoken f) - (cond - [(future*-done? f) ;; someone else ran continuation - (lock-release (future*-lock f) (get-caller)) - (future*-result f)] - [(future*-blocked? f) ;; we need to run continuation - (set-future*-blocked?! f #f) - (set-future*-resumed?! f #t) - (lock-release (future*-lock f) (get-caller)) - ((future*-cont f) '()) - (future*-result f)] - [else - (internal-error "Awoken but future is neither blocked nor done.")])) +;; called in a futurre pthread; +;; called with lock held for both `f` and the current future +(define (dependent-on-future f) + ;; in a future pthread, so set up a dependency and on `f` and + ;; bail out, so the current future pthread can do other things; + ;; note that `me-f` might be the same as `f`, in which case we'll + ;; create a circular dependency + (define me-f (current-future)) + (set-future*-dependents! f (hash-set (future*-dependents f) me-f #t)) + (set-future*-state! me-f f) + (on-transition-to-unfinished) + (unless (eq? me-f f) + (lock-release (future*-lock f))) + ;; almost the same as being blocked, but when `f` completes, + ;; it will reschedule `me-f` + (future-suspend f) + ;; on return from `future-suspend`, no locks are held + (touch f)) -;; called from chez layer. +;; called in a future pthread; +;; can be called from Rumble layer (define (future-block) - (define f (current-future)) - (when (and f (not (future*-blocked? f)) (not (future*-resumed? f))) - (with-lock ((future*-lock f) f) - (set-future*-blocked?! f #t)) - (engine-block))) + (define me-f (current-future)) + (unless (future*-would-be? me-f) + (log-future 'block (future*-id me-f))) + (lock-acquire (future*-lock me-f)) + (set-future*-state! me-f 'blocked) + (on-transition-to-unfinished) + (future-suspend)) -;; called from chez layer. -;; this should never be called from outside a future. -(define (future-wait) - (define f (current-future)) - (with-lock ((future*-lock f) f) - (future:condition-wait (future*-cond f) (future*-lock f)))) +;; called with lock held on the current future +(define (future-suspend [touching-f #f]) + (define me-f (current-future)) + (call-with-composable-continuation + (lambda (k) + (set-future*-thunk! me-f k) + (lock-release (future*-lock me-f)) + (when touching-f + (log-future 'touch (future*-id me-f) #:data (future*-id touching-f))) + (unless (future*-would-be? me-f) + (log-future 'suspend (future*-id me-f))) + (cond + [(future*-would-be? me-f) + (current-future #f) + (abort-current-continuation future-start-prompt-tag (void))] + [else + (abort-current-continuation future-scheduler-prompt-tag (void))])) + future-start-prompt-tag)) -;; futures and conditions +;; ---------------------------------------- -(define (wait-future f m) - (with-lock ((future*-lock f) f) - (set-future*-cond-wait?! f #t)) - (lock-release m (get-caller)) - (engine-block)) +(define pthread-count 1) -(define (awaken-future f) - (with-lock ((future*-lock f) (get-caller)) - (set-future*-cond-wait?! f #f))) +;; Called by io layer +(define (set-processor-count! n) + (set! pthread-count n)) -;; --------------------------- conditions ------------------------------------ +(define-place-local the-scheduler #f) -(struct future-condition* (queue lock)) +(struct scheduler ([workers #:mutable] + [futures-head #:mutable] + [futures-tail #:mutable] + mutex ; guards futures chain; see "future-lock.rkt" for discipline + cond) ; signaled when chain goes from empty to non-empty + #:authentic) -(define (future:make-condition) - (future-condition* (make-queue) (make-lock))) +(struct worker (id + [die? #:mutable] + sync-state) ; box used to sync shutdowns: 'idle, 'running, or 'pending + #:authentic) -(define (future:condition-wait c m) - (define caller (get-caller)) - (if (own-lock? m caller) - (begin - (with-lock ((future-condition*-lock c) caller) - (queue-add! (future-condition*-queue c) caller)) - (if (future? caller) - (wait-future caller m) - (thread-condition-wait (lambda () (lock-release m caller)))) - (lock-acquire m (get-caller))) ;; reaquire lock - (internal-error "Caller does not hold lock\n"))) +(define (make-worker id) + (worker id + #f ; die? + (box 'idle))) -(define (signal-future f) - (future:condition-signal (future*-cond f))) - -(define (future:condition-signal c) - (with-lock ((future-condition*-lock c) (get-caller)) - (let ([waitees (future-condition*-queue c)]) - (unless (queue-empty? waitees) - (let ([waitee (queue-remove! waitees)]) - (if (future? waitee) - (awaken-future waitee) - (thread-condition-awaken waitee))))))) - -(define (future:condition-broadcast c) - (with-lock ((future-condition*-lock c) (get-caller)) - (define waitees '()) - (queue-remove-all! (future-condition*-queue c) - (lambda (e) - (set! waitees (cons e waitees)))) - (let loop ([q waitees]) - (unless (null? q) - (let ([waitee (car q)]) - (if (future? waitee) - (awaken-future waitee) - (thread-condition-awaken waitee)) - (loop (cdr q))))))) - -;; ------------------------------------- future scheduler ---------------------------------------- - -(define THREAD-COUNT 2) -(define TICKS 1000000000) - -(define-place-local global-scheduler #f) -(define (scheduler-running?) - (not (not global-scheduler))) - -(struct worker (id lock mutex cond - [queue #:mutable] [idle? #:mutable] - [pthread #:mutable #:auto] [die? #:mutable #:auto]) - #:auto-value #f) - -(struct scheduler ([workers #:mutable #:auto]) - #:auto-value #f) - -;; I think this atomically is sufficient to guarantee scheduler is only created once. +;; called in a Racket thread (define (maybe-start-scheduler) (atomically - (unless global-scheduler - (set! global-scheduler (scheduler)) - (let ([workers (create-workers)]) - (set-scheduler-workers! global-scheduler workers) - (start-workers workers))))) + (unless the-scheduler + (ensure-place-wakeup-handle) + (set! the-scheduler (scheduler '() + #f ; futures-head + #f ; futures-tail + (host:make-mutex) + (host:make-condition))) + (define workers + (for/list ([id (in-range 1 (add1 pthread-count))]) + (define w (make-worker id)) + (start-worker w) + w)) + (set-scheduler-workers! the-scheduler workers)))) -(define (kill-scheduler) - (when global-scheduler - (for-each (lambda (w) - (with-lock ((worker-lock w) (get-caller)) - (set-worker-die?! w #t))) - (scheduler-workers global-scheduler)))) +;; called in atomic mode +(define (kill-future-scheduler) + (when the-scheduler + (define s the-scheduler) + (host:mutex-acquire (scheduler-mutex s)) + (for ([w (in-list (scheduler-workers s))]) + (set-worker-die?! w #t)) + (host:condition-signal (scheduler-cond s)) + (host:mutex-release (scheduler-mutex s)) + (futures-sync-for-shutdown))) -(define (create-workers) - (let loop ([id 1]) +;; called in any pthread +;; called maybe holding an fsemaphore lock, but nothing else +(define (schedule-future! f #:front? [front? #f]) + (define s the-scheduler) + (host:mutex-acquire (scheduler-mutex s)) + (define old (if front? + (scheduler-futures-head s) + (scheduler-futures-tail s))) + (cond + [(not old) + (set-scheduler-futures-head! s f) + (set-scheduler-futures-tail! s f) + (host:condition-signal (scheduler-cond s))] + [front? + (set-future*-next! f old) + (set-future*-prev! old f) + (set-scheduler-futures-head! s f)] + [else + (set-future*-prev! f old) + (set-future*-next! old f) + (set-scheduler-futures-tail! s f)]) + (host:mutex-release (scheduler-mutex s))) + +;; called with queue lock held +(define (deschedule-future f) + (define s the-scheduler) + (cond + [(or (future*-prev f) + (future*-next f)) + (if (future*-prev f) + (set-future*-next! (future*-prev f) (future*-next f)) + (set-scheduler-futures-head! s (future*-next f))) + (if (future*-next f) + (set-future*-prev! (future*-next f) (future*-prev f)) + (set-scheduler-futures-tail! s (future*-prev f))) + (set-future*-prev! f #f) + (set-future*-next! f #f)] + [(eq? f (scheduler-futures-head s)) + (set-scheduler-futures-head! s #f) + (set-scheduler-futures-tail! s #f)] + [else + (internal-error "future is not in queue")])) + +;; called with no locks held; if successful, +;; returns with lock held on f +(define (try-deschedule-future? f) + (define s the-scheduler) + (host:mutex-acquire (scheduler-mutex s)) + (define ok? (cond - [(< id (+ 1 THREAD-COUNT)) - (cons (worker id (make-lock) (host:make-mutex) (host:make-condition) (make-queue) #t) - (loop (+ id 1)))] + [(and (not (future*-prev f)) + (not (future*-next f)) + (not (eq? f (scheduler-futures-head s)))) + ;; Was descheduled by someone else already, or maybe + ;; hasn't yet made it back into the schedule after a + ;; dependency triggered `future-notify-dependent` + #f] [else - '()]))) + (deschedule-future f) + (lock-acquire (future*-lock f)) + #t])) + (host:mutex-release (scheduler-mutex s)) + ok?) -;; When a new thread is forked it inherits the values of thread parameters from its creator -;; So, if current-atomic is set for the main thread and then new threads are forked, those new -;; threads current-atomic will be set and then never unset because they will not run code that -;; unsets it. -(define (start-workers workers) - (for-each (lambda (w) - (set-worker-pthread! w (fork-pthread (lambda () - (current-atomic 0) - (current-thread #f) - (current-engine-state #f) - (current-future #f) - ((worker-scheduler-func w)))))) - workers)) - -(define (schedule-future f) - (maybe-start-scheduler) - - (let ([w (pick-worker)]) - (with-lock ((worker-lock w) (get-caller)) - (host:mutex-acquire (worker-mutex w)) - (queue-add! (worker-queue w) f) - (host:condition-signal (worker-cond w)) - (host:mutex-release (worker-mutex w))))) - -(define (pick-worker) - (define workers (scheduler-workers global-scheduler)) - (let loop ([workers* (cdr workers)] - [best (car workers)]) +;; called in any pthread +;; called maybe holding an fsemaphore lock, but nothing else +(define (future-notify-dependents deps) + (for ([f (in-hash-keys deps)]) (cond - [(or (null? workers*) - (queue-empty? (worker-queue best))) - best] - [(< (queue-length (worker-queue (car workers*))) - (queue-length (worker-queue best))) - (loop (cdr workers*) - (car workers*))] - [else - (loop (cdr workers*) - best)]))) + [(eq? f 'place) (wakeup-this-place)] + [else (future-notify-dependent f)]))) -(define (wait-for-work w) - (define m (worker-mutex w)) - (let try () - (cond - [(not (queue-empty? (worker-queue w))) ;; got work in meantime - (void)] - [(host:mutex-acquire m #f) ;; cannot acquire lock while worker is being given work. - (host:condition-wait (worker-cond w) m) - (host:mutex-release m)] - [else ;; try to get lock again. - (try)]))) +;; called in any pthread +;; called maybe holding an fsemaphore lock, but nothing else +(define (future-notify-dependent f) + (with-lock (future*-lock f) + (set-future*-state! f #f)) + (on-transition-to-unfinished) + (if (future*-would-be? f) + (wakeup-this-place) + (schedule-future! f #:front? #t))) -(define (worker-scheduler-func worker) - (lambda () - - (define (loop) - (lock-acquire (worker-lock worker) (get-pthread-id)) ;; block - (cond - [(worker-die? worker) ;; worker was killed - (lock-release (worker-lock worker) (get-pthread-id))] - [(queue-empty? (worker-queue worker)) ;; have lock. no work - (lock-release (worker-lock worker) (get-pthread-id)) - (cond - [(steal-work worker) - (do-work)] - [else - (wait-for-work worker)]) - (loop)] - [else - (do-work) - (loop)])) - - (define (complete ticks args) - (void)) - - (define (expire future worker) - (lambda (new-eng timeout?) - (set-future*-engine! future new-eng) - (cond - [(positive? (current-atomic)) - ((future*-engine future) TICKS (prefix future) complete (expire future worker))] - [(future*-resumed? future) ;; run to completion - ((future*-engine future) TICKS void complete (expire future worker))] - [(not (future*-cont future)) ;; don't want to reschedule future with a saved continuation - (with-lock ((worker-lock worker) (get-caller)) - (host:mutex-acquire (worker-mutex worker)) - (queue-add! (worker-queue worker) future) - (host:mutex-release (worker-mutex worker)))] - [else - (with-lock ((future*-lock future) (get-caller)) - (future:condition-signal (future*-cond future)))]))) - - (define (prefix f) - (lambda () - (when (future*-blocked? f) - (call-with-composable-continuation - (lambda (k) - (with-lock ((future*-lock f) (current-future)) - (set-future*-cont! f k)) - (engine-block)) - (future*-prompt f))))) +;; ---------------------------------------- - - ;; need to have lock here. - (define (do-work) - (let ([work (queue-remove! (worker-queue worker))]) - (cond - [(future*-cond-wait? work) - (queue-add! (worker-queue worker) work) - (lock-release (worker-lock worker) (get-pthread-id))] ;; put back on queue - [else - (lock-release (worker-lock worker) (get-pthread-id)) - (current-future work) - ((future*-engine work) TICKS (prefix work) complete (expire work worker)) ;; call engine. - (current-future #f)]))) - - (loop))) +(define (start-worker w) + (define s the-scheduler) + (fork-pthread + (lambda () + (current-future 'worker) + (host:mutex-acquire (scheduler-mutex s)) + (let loop () + (or (box-cas! (worker-sync-state w) 'idle 'running) + (box-cas! (worker-sync-state w) 'pending 'running)) + (cond + [(worker-die? w) ; worker was killed + (host:mutex-release (scheduler-mutex s)) + (box-cas! (worker-sync-state w) 'running 'idle)] + [(scheduler-futures-head s) + => (lambda (f) + (deschedule-future f) + (host:mutex-release (scheduler-mutex s)) + (lock-acquire (future*-lock f)) + ;; lock is held on f; run the future + (maybe-run-future-in-worker f w) + ;; look for more work + (host:mutex-acquire (scheduler-mutex s)) + (loop))] + [else + ;; wait for work + (or (box-cas! (worker-sync-state w) 'pending 'idle) + (box-cas! (worker-sync-state w) 'running 'idle)) + (host:condition-wait (scheduler-cond s) (scheduler-mutex s)) + (loop)]))))) -(define (order-workers w1 w2) - (cond - [(< (worker-id w1) (worker-id w2)) - (values w1 w2)] - [else - (values w2 w1)])) +;; called with lock on f +(define (maybe-run-future-in-worker f w) + ;; Don't start the future if the custodian is shut down, + ;; because we may have transitioned from 'pending to + ;; 'running without an intervening check + (cond + [(custodian-shut-down? (future*-custodian f)) + (set-future*-state! f 'blocked) + (on-transition-to-unfinished) + (lock-release (future*-lock f))] + [else + (run-future-in-worker f w)])) - ;; Acquire lock of peer with smallest id # first. - ;; worker is attempting to steal work from peers - (define (steal-work worker) - (let loop ([q (scheduler-workers global-scheduler)]) - (cond - [(null? q) #f] ;; failed to steal work. - [(not (eq? (worker-id worker) (worker-id (car q)))) ;; not ourselves - (let*-values ([(peer) (car q)] - [(w1 w2) (order-workers worker peer)]) ;; order them. - (lock-acquire (worker-lock w1) (get-pthread-id)) - (lock-acquire (worker-lock w2) (get-pthread-id)) - (cond - [(> (queue-length (worker-queue peer)) 2) ;; going to steal. Should likely made this # higher. - (do ([i (floor (/ (queue-length (worker-queue peer)) 2)) (- i 1)]) - [(zero? i) (void)] - (let ([work (queue-remove-end! (worker-queue peer))]) - (queue-add! (worker-queue worker) work))) - - (lock-release (worker-lock peer) (get-pthread-id)) ;; don't want to release our own lock. - #t] ;; stole work - [else ;; try a different peer - (lock-release (worker-lock worker) (get-pthread-id)) - (lock-release (worker-lock peer) (get-pthread-id)) - (loop (cdr q))]))] - [else (loop (cdr q))]))) +(define (run-future-in-worker f w) + (current-future f) + ;; If we didn't need to check custodians, could be just + ;; (call-with-continuation-prompt + ;; (lambda () (run-future f)) + ;; future-scheduler-prompt-tag + ;; void) + ;; But use an engine so we can periodically check that the future is + ;; still supposed to run. + ;; We take advantage of `current-atomic`, which would otherwise + ;; be unused, to disable interruptions. + (define e (make-engine (lambda () (run-future f)) + future-scheduler-prompt-tag + void + break-enabled-default-cell + #t)) + (let loop ([e e]) + (e TICKS + (lambda () + ;; Check that the future should still run + (when (and (custodian-shut-down? (future*-custodian f)) + (zero? (current-atomic))) + (lock-acquire (future*-lock f)) + (set-future*-state! f #f) + (on-transition-to-unfinished) + (future-suspend))) + (lambda (leftover-ticks result) + ;; Done --- completed or suspend (e.g., blocked) + (void)) + (lambda (e timeout?) + (loop e)))) + (log-future 'end-work (future*-id f)) + (current-future 'worker)) + +;; in atomic mode +(define (futures-sync-for-shutdown) + ;; Make sure any futures that are running in a future pthread + ;; have had a chance to notice a custodian shutdown or a + ;; future-scheduler shutdown. + ;; + ;; Move each 'running worker into the 'pending state: + (for ([w (in-list (scheduler-workers the-scheduler))]) + (box-cas! (worker-sync-state w) 'running 'pending)) + ;; A worker that transitions from 'pending to 'running or 'idle + ;; is guaranteed to not run a future chose custodian is + ;; shutdown or run any future if the worker is terminated + (for ([w (in-list (scheduler-workers the-scheduler))]) + (define bx (worker-sync-state w)) + (let loop () + (when (box-cas! bx 'pending 'pending) + (host:sleep 0.001) ; not much alternative to spinning + (loop))))) ;; ---------------------------------------- @@ -415,3 +586,25 @@ (define (mark-future-trace-end!) (void)) + +;; ---------------------------------------- + +;; When a future changes from a state where the main thread may be +;; waiting for it, then make sure there's a wakeup signal +(define (on-transition-to-unfinished) + (define me-f (current-future)) + (when (and me-f + (not (future*-would-be? me-f))) + (wakeup-this-place))) + +(define wakeup-this-place (lambda () (void))) +(define ensure-place-wakeup-handle (lambda () (void))) + +(define (set-place-future-procs! wakeup ensure) + (set! wakeup-this-place wakeup) + (set! ensure-place-wakeup-handle ensure)) + +;; tell "atomic.rkt" layer how to block: +(void (set-future-block! future-block)) + +(void (set-custodian-futures-sync! futures-sync-for-shutdown)) diff --git a/racket/src/thread/host.rkt b/racket/src/thread/host.rkt index 374784cbd9..12d071dad8 100644 --- a/racket/src/thread/host.rkt +++ b/racket/src/thread/host.rkt @@ -93,4 +93,6 @@ [make-mutex host:make-mutex] [mutex-acquire host:mutex-acquire] [mutex-release host:mutex-release] - threaded?) + threaded? + + continuation-current-primitive) diff --git a/racket/src/thread/lock.rkt b/racket/src/thread/lock.rkt deleted file mode 100644 index 0d0bde0653..0000000000 --- a/racket/src/thread/lock.rkt +++ /dev/null @@ -1,52 +0,0 @@ -#lang racket/base - -(require "internal-error.rkt") - -(provide with-lock - make-lock - lock-acquire - lock-release - own-lock?) - -(define-syntax-rule (with-lock (lock caller) expr ...) - (begin - (lock-acquire lock caller) - (begin0 - (let () expr ...) - (lock-release lock caller)))) - -(struct future-lock* (box owner)) - -(define (lock-owner lock) - (unbox (future-lock*-owner lock))) - -(define (make-lock) - (future-lock* (box 0) (box #f))) - -(define (lock-acquire lock caller [block? #t]) - (define box (future-lock*-box lock)) - (let loop () - (cond - [(and (= 0 (unbox box)) (box-cas! box 0 1)) ;; got lock - (unless (box-cas! (future-lock*-owner lock) #f caller) - (internal-error "Lock already has owner.")) - #t] - [block? - (loop)] - [else - #f]))) - -(define (lock-release lock caller) - (when (eq? caller (unbox (future-lock*-owner lock))) - (unless (box-cas! (future-lock*-owner lock) caller #f) - (internal-error "Failed to reset owner\n")) - (unless (box-cas! (future-lock*-box lock) 1 0) - (internal-error "Lock release failed\n")))) - -(define (own-lock? lock caller) - (and (eq? caller (unbox (future-lock*-owner lock))) - (begin0 - #t - (unless (= 1 (unbox (future-lock*-box lock))) - (internal-error "Caller 'owns' lock but lock is free."))))) - diff --git a/racket/src/thread/main.rkt b/racket/src/thread/main.rkt index acd5255c14..3ff15a49fb 100644 --- a/racket/src/thread/main.rkt +++ b/racket/src/thread/main.rkt @@ -28,6 +28,7 @@ "place.rkt" "place-message.rkt" "future.rkt" + "future-logging.rkt" "fsemaphore.rkt" "os-thread.rkt") @@ -188,11 +189,12 @@ would-be-future current-future future-block - future-wait current-future-prompt reset-future-logs-for-tracing! mark-future-trace-end! - + set-processor-count! + install-future-logging-procs! + fsemaphore? make-fsemaphore fsemaphore-post diff --git a/racket/src/thread/nested-thread.rkt b/racket/src/thread/nested-thread.rkt index 07340c4c47..2c9bb84d4a 100644 --- a/racket/src/thread/nested-thread.rkt +++ b/racket/src/thread/nested-thread.rkt @@ -2,6 +2,7 @@ (require "check.rkt" "atomic.rkt" "host.rkt" + "parameter.rkt" "thread.rkt" (except-in (submod "thread.rkt" scheduling) thread @@ -58,7 +59,7 @@ (engine-block))))) #:custodian cust))) (atomically - (set-thread-forward-break-to! (current-thread) t)) + (set-thread-forward-break-to! (current-thread/in-atomic) t)) (semaphore-post ready-sema) ; let the nested thread run ;; Wait for the nested thread to complete -- and any thread nested @@ -76,7 +77,7 @@ ;; killed or aborted to the original continuation (atomically - (set-thread-forward-break-to! (current-thread) #f)) + (set-thread-forward-break-to! (current-thread/in-atomic) #f)) ;; Propagate any leftover break, but give a propagated ;; exception priority over a break exception: diff --git a/racket/src/thread/parameter.rkt b/racket/src/thread/parameter.rkt index 8e64f50484..92a8bc1655 100644 --- a/racket/src/thread/parameter.rkt +++ b/racket/src/thread/parameter.rkt @@ -1,6 +1,24 @@ #lang racket/base (require "host.rkt") -(provide current-thread) +(provide current-atomic + current-thread/in-atomic + current-future) ; not the one exported to Racket; see "api.rkt" -(define current-thread (make-pthread-parameter #f)) +;; These definitions are specially recognized for Racket on +;; Chez Scheme and converted to use a virtual register. + +(define current-atomic (make-pthread-parameter 0)) + +;; The `current-thread` wrapper disallows access to this +;; pthread-local value in a future pthread: +(define current-thread/in-atomic (make-pthread-parameter #f)) + +;; Normally #f for a place's main pthread (running a Racket thread) +;; and non-#f for a future pthread, but can be a would-be future +;; in the main pthread +(define current-future (make-pthread-parameter #f)) + +;; Calling `(current-thread/in-atomic)` is faster than +;; `(current-thread)`, but it's only valid in a place's main pthread +;; --- not in a future thread. diff --git a/racket/src/thread/place.rkt b/racket/src/thread/place.rkt index 94f2cd00a8..c1e57f5040 100644 --- a/racket/src/thread/place.rkt +++ b/racket/src/thread/place.rkt @@ -17,6 +17,7 @@ "semaphore.rkt" "evt.rkt" "sandman.rkt" + (submod "future.rkt" for-place) "place-message.rkt") (provide dynamic-place @@ -121,6 +122,7 @@ (do-custodian-shutdown-all orig-cust) (for ([proc (in-list (place-post-shutdown new-place))]) (proc)) + (kill-future-scheduler) (host:mutex-acquire lock) (set-place-result! new-place result) (host:mutex-release lock) @@ -440,3 +442,10 @@ ;; in atomic mode (lambda (pl) (wakeup-waiting pl)))) + +(void (set-place-future-procs! + (lambda () + (place-has-activity! current-place)) + ;; in atomic mode + (lambda () + (ensure-wakeup-handle!)))) diff --git a/racket/src/thread/sandman-struct.rkt b/racket/src/thread/sandman-struct.rkt index 7bb8b7cad0..004fe2e5ff 100644 --- a/racket/src/thread/sandman-struct.rkt +++ b/racket/src/thread/sandman-struct.rkt @@ -29,14 +29,6 @@ do-merge-timeout ; -> do-extract-timeout ; -> - do-condition-wait ; set a thread to wait on a condition - - do-condition-poll ; reschedule awoken threads - - do-any-waiters? ; -> boolean - - lock - #;...) ; sandman implementations can add more methods #:prefab) diff --git a/racket/src/thread/sandman.rkt b/racket/src/thread/sandman.rkt index 62d753b1a8..c2a439bc83 100644 --- a/racket/src/thread/sandman.rkt +++ b/racket/src/thread/sandman.rkt @@ -41,9 +41,6 @@ sandman-wakeup sandman-any-sleepers? sandman-sleepers-external-events - sandman-condition-wait - sandman-condition-poll - sandman-any-waiters? current-sandman) @@ -87,31 +84,6 @@ (define (sandman-sleepers-external-events) ((sandman-do-sleepers-external-events the-sandman))) -;; in atomic mode -(define (sandman-condition-wait thread) - ((sandman-do-condition-wait the-sandman) thread)) - -;; in atomic mode -(define (sandman-condition-poll mode thread-wakeup) - ((sandman-do-condition-poll the-sandman) mode thread-wakeup)) - -;; in atomic mode -(define (sandman-any-waiters?) - ((sandman-do-any-waiters? the-sandman))) - -;; created simple lock here to avoid cycle in loading from using lock defined in future.rkt -(define (make-lock) - (box 0)) - -(define (lock-acquire box) - (let loop () - (unless (and (= 0 (unbox box)) (box-cas! box 0 1)) - (loop)))) - -(define (lock-release box) - (unless (box-cas! box 1 0) - (internal-error "Failed to release lock\n"))) - (define-place-local waiting-threads '()) (define-place-local awoken-threads '()) @@ -194,37 +166,7 @@ (min sleep-until timeout-at) timeout-at)) ;; extract-timeout - (lambda (sleep-until) sleep-until) - - ;; condition-wait - (lambda (t) - (lock-acquire (sandman-lock the-sandman)) - (set! waiting-threads (cons t waiting-threads)) - (lock-release (sandman-lock the-sandman)) - ;; awoken callback. for when thread is awoken - (lambda (root-thread) - (lock-acquire (sandman-lock the-sandman)) - (if (memq t waiting-threads) - (begin - (set! waiting-threads (remove t waiting-threads eq?)) - (set! awoken-threads (cons t awoken-threads))) - (internal-error "thread is not a member of waiting-threads\n")) - (lock-release (sandman-lock the-sandman)))) - - ;; condition-poll - (lambda (mode wakeup) - (lock-acquire (sandman-lock the-sandman)) - (define at awoken-threads) - (set! awoken-threads '()) - (lock-release (sandman-lock the-sandman)) - (for-each (lambda (t) - (wakeup t)) at)) - - ;; any waiters? - (lambda () - (or (not (null? waiting-threads)) (not (null? awoken-threads)))) - - (make-lock))) + (lambda (sleep-until) sleep-until))) (void (current-sandman the-default-sandman)) diff --git a/racket/src/thread/schedule.rkt b/racket/src/thread/schedule.rkt index d75d4f7177..d001a8c8b1 100644 --- a/racket/src/thread/schedule.rkt +++ b/racket/src/thread/schedule.rkt @@ -1,5 +1,6 @@ #lang racket/base -(require "place-local.rkt" +(require "config.rkt" + "place-local.rkt" "place-object.rkt" "atomic.rkt" "host.rkt" @@ -14,7 +15,8 @@ "future.rkt" "custodian.rkt" (submod "custodian.rkt" scheduling) - "pre-poll.rkt") + "pre-poll.rkt" + "future-logging.rkt") ;; Many scheduler details are implemented in "thread.rkt", but this ;; module handles the thread selection, thread swapping, and @@ -26,8 +28,6 @@ set-check-place-activity! thread-swap-count) -(define TICKS 100000) - ;; Initializes the thread system: (define (call-in-main-thread thunk) (make-initial-thread (lambda () @@ -64,6 +64,7 @@ (call-pre-poll-external-callbacks) (check-place-activity) (check-queued-custodian-shutdown) + (flush-future-log) (when (and (null? callbacks) (all-threads-poll-done?) (waiting-on-external-or-idle?)) @@ -83,7 +84,8 @@ (define e (thread-engine t)) (set-thread-engine! t 'running) (set-thread-sched-info! t #f) - (current-thread t) + (current-future (thread-future t)) + (current-thread/in-atomic t) (set-place-current-thread! current-place t) (set! thread-swap-count (add1 thread-swap-count)) (run-callbacks-in-engine @@ -101,8 +103,10 @@ (lambda args (start-implicit-atomic-mode) (accum-cpu-time! t #t) - (current-thread #f) + (set-thread-future! t #f) + (current-thread/in-atomic #f) (set-place-current-thread! current-place #f) + (current-future #f) (unless (zero? (current-atomic)) (internal-error "terminated in atomic mode!")) (thread-dead! t) @@ -115,7 +119,9 @@ (cond [(zero? (current-atomic)) (accum-cpu-time! t timeout?) - (current-thread #f) + (set-thread-future! t (current-future)) + (current-thread/in-atomic #f) + (current-future #f) (set-place-current-thread! current-place #f) (unless (eq? (thread-engine t) 'done) (set-thread-engine! t e)) @@ -137,7 +143,6 @@ #:custodian #f) (select-thread! callbacks)] [(and (not (sandman-any-sleepers?)) - (not (sandman-any-waiters?)) (not (any-idle-waiters?))) ;; all threads done or blocked (cond @@ -161,10 +166,6 @@ (lambda (t) (thread-reschedule! t) (set! did? #t))) - (sandman-condition-poll mode - (lambda (t) - (thread-reschedule! t) - (set! did? #t))) (when did? (thread-did-work!)) did?) diff --git a/racket/src/thread/semaphore.rkt b/racket/src/thread/semaphore.rkt index 5644c75eca..fd540560f3 100644 --- a/racket/src/thread/semaphore.rkt +++ b/racket/src/thread/semaphore.rkt @@ -139,7 +139,7 @@ (set-semaphore-count! s (sub1 c)) void] [else - (define w (current-thread)) + (define w (current-thread/in-atomic)) (define n (queue-add! s w)) (set-semaphore-count! s -1) ; so CAS not tried for `semaphore-post` (waiter-suspend! diff --git a/racket/src/thread/sync.rkt b/racket/src/thread/sync.rkt index 7d29502fe1..f8dfdc84da 100644 --- a/racket/src/thread/sync.rkt +++ b/racket/src/thread/sync.rkt @@ -7,6 +7,7 @@ "channel.rkt" (submod "channel.rkt" for-sync) "thread.rkt" + "parameter.rkt" (only-in (submod "thread.rkt" scheduling) thread-descheduled?) "schedule-info.rkt" @@ -149,7 +150,7 @@ (thread-pop-suspend+resume-callbacks!) (thread-pop-kill-callback!) (when local-break-cell - (thread-remove-ignored-break-cell! (current-thread) local-break-cell)) + (thread-remove-ignored-break-cell! (current-thread/in-atomic) local-break-cell)) ;; On escape, post nacks, etc.: (syncing-abandon! s))))) @@ -587,7 +588,7 @@ ;; don't suspend after all void] [else - (define t (current-thread)) + (define t (current-thread/in-atomic)) (set-syncing-wakeup! s (lambda () diff --git a/racket/src/thread/thread.rkt b/racket/src/thread/thread.rkt index 24996afb1e..54a4f43f32 100644 --- a/racket/src/thread/thread.rkt +++ b/racket/src/thread/thread.rkt @@ -54,15 +54,14 @@ thread-ignore-break-cell! thread-remove-ignored-break-cell! + + thread-representative-custodian thread-send thread-receive thread-try-receive thread-rewind-receive - thread-receive-evt - - thread-condition-awaken - thread-condition-wait) + thread-receive-evt) ;; Exports needed by "schedule.rkt": (module* scheduling #f @@ -93,6 +92,9 @@ break>? thread-did-work!)) +(module* for-future #f + (provide break-enabled-default-cell)) + ;; ---------------------------------------- (struct thread node (name @@ -125,6 +127,8 @@ [mailbox-wakeup #:mutable] ; callback to trigger (in atomic mode) on `thread-send` [cpu-time #:mutable] ; accumulates CPU time in milliseconds + + [future #:mutable] ; current would-be future [condition-wakeup #:mutable]) #:property prop:waiter @@ -137,6 +141,10 @@ (define-place-local root-thread #f) +(define (current-thread) + (future-barrier) + (current-thread/in-atomic)) + ;; ---------------------------------------- ;; Thread creation @@ -152,6 +160,7 @@ (current-thread-group))) (define e (make-engine proc (default-continuation-prompt-tag) + #f (if (or initial? at-root?) break-enabled-default-cell (current-break-enabled-cell)) @@ -190,6 +199,8 @@ 0 ; cpu-time + #f ; future + void ; condition-wakeup )) ((atomically @@ -269,13 +280,13 @@ ;; Called in atomic mode: (define (thread-push-kill-callback! cb) (assert-atomic-mode) - (define t (current-thread)) + (define t (current-thread/in-atomic)) (set-thread-kill-callbacks! t (cons cb (thread-kill-callbacks t)))) ;; Called in atomic mode: (define (thread-pop-kill-callback!) (assert-atomic-mode) - (define t (current-thread)) + (define t (current-thread/in-atomic)) (set-thread-kill-callbacks! t (cdr (thread-kill-callbacks t)))) (define/who (kill-thread t) @@ -292,7 +303,7 @@ [else (atomically (do-kill-thread t)) - (when (eq? t (current-thread)) + (when (eq? t (current-thread/in-atomic)) (when (eq? t root-thread) (force-exit 0)) (engine-block)) @@ -318,6 +329,12 @@ [else (do-kill-thread t)]))) +(define (thread-representative-custodian t) + (atomically + (define cs (thread-custodian-references t)) + (and (pair? cs) + (custodian-reference->custodian (car cs))))) + ;; Called in atomic mode: (define (run-kill-callbacks! t) (assert-atomic-mode) @@ -407,7 +424,7 @@ (thread-group-remove! (thread-parent t) t) (when timeout-at (add-to-sleeping-threads! t (sandman-merge-timeout #f timeout-at))) - (when (eq? t (current-thread)) + (when (eq? t (current-thread/in-atomic)) (thread-did-work!)) ;; Beware that this thunk is not used when a thread is descheduled ;; by a custodian callback @@ -592,14 +609,14 @@ ;; Given callbacks are also called in atomic mode (define (thread-push-suspend+resume-callbacks! s-cb r-cb) (assert-atomic-mode) - (define t (current-thread)) + (define t (current-thread/in-atomic)) (set-thread-suspend+resume-callbacks! t (cons (cons s-cb r-cb) (thread-suspend+resume-callbacks t)))) ;; Called in atomic mode: (define (thread-pop-suspend+resume-callbacks!) (assert-atomic-mode) - (define t (current-thread)) + (define t (current-thread/in-atomic)) (set-thread-suspend+resume-callbacks! t (cdr (thread-suspend+resume-callbacks t)))) ;; Called in atomic mode: @@ -675,7 +692,7 @@ (schedule-info-did-work? sched-info)) (thread-did-work!)] [else (thread-did-no-work!)]) - (set-thread-sched-info! (current-thread) sched-info)) + (set-thread-sched-info! (current-thread/in-atomic) sched-info)) (engine-block)) ;; Sleep for a while @@ -752,32 +769,36 @@ ;; changed, or when a thread is just swapped in, then ;; `check-for-break` should be called. (define (check-for-break) - (define t (current-thread)) - (when (and - ;; allow `check-for-break` before threads are running: - t - ;; quick pre-test before going atomic: - (thread-pending-break t)) - ((atomically - (cond - [(and (thread-pending-break t) - (break-enabled) - (not (thread-ignore-break-cell? t (current-break-enabled-cell))) - (>= (add1 (current-breakable-atomic)) (current-atomic))) - (define exn:break* (case (thread-pending-break t) - [(hang-up) exn:break:hang-up/non-engine] - [(terminate) exn:break:terminate/non-engine] - [else exn:break/non-engine])) - (set-thread-pending-break! t #f) - (lambda () - ;; Out of atomic mode - (call-with-escape-continuation - (lambda (k) - (raise (exn:break* - "user break" - (current-continuation-marks) - k)))))] - [else void]))))) + (unless (current-future) + (define t (current-thread)) + (when (and + ;; allow `check-for-break` before threads are running: + t + ;; quick pre-test before going atomic: + (thread-pending-break t)) + ((atomically + (cond + [(and (thread-pending-break t) + ;; check atomicity early to avoid nested break checks, + ;; since `continuation-mark-set-first` inside `break-enabled` + ;; can take a while + (>= (add1 (current-breakable-atomic)) (current-atomic)) + (break-enabled) + (not (thread-ignore-break-cell? t (current-break-enabled-cell)))) + (define exn:break* (case (thread-pending-break t) + [(hang-up) exn:break:hang-up/non-engine] + [(terminate) exn:break:terminate/non-engine] + [else exn:break/non-engine])) + (set-thread-pending-break! t #f) + (lambda () + ;; Out of atomic mode + (call-with-escape-continuation + (lambda (k) + (raise (exn:break* + "user break" + (current-continuation-marks) + k)))))] + [else void])))))) ;; The break-enabled transition hook is called by the host ;; system when a control transfer (such as a continuation jump) @@ -912,36 +933,9 @@ [else (lambda () #f)])))) -(define/who (thread-condition-awaken thd) - (check who thread? thd) - ((atomically - (cond - [(not (thread-dead? thd)) - (define wakeup (thread-condition-wakeup thd)) - (set-thread-condition-wakeup! thd void) - wakeup] ;; should be called outside of atomic mode? - [else - (lambda () #f)])))) - -(define (thread-condition-wait lock-release) - ((atomically - (define t (current-thread)) - (set-thread-condition-wakeup! t (sandman-condition-wait t)) - (lock-release) - (define do-yield - (thread-deschedule! t - #f - void - (lambda () - ;; try again? - (do-yield)) - )) - (lambda () - (do-yield))))) - (define (thread-receive) ((atomically - (define t (current-thread)) + (define t (current-thread/in-atomic)) (cond [(is-mail? t) (define v (dequeue-mail! t)) @@ -966,7 +960,7 @@ (define (thread-try-receive) (atomically - (define t (current-thread)) + (define t (current-thread/in-atomic)) (if (is-mail? t) (dequeue-mail! t) #f))) @@ -974,7 +968,7 @@ (define/who (thread-rewind-receive lst) (check who list? lst) (atomically - (define t (current-thread)) + (define t (current-thread/in-atomic)) (for-each (lambda (msg) (push-mail! t msg)) lst))) @@ -986,7 +980,7 @@ ;; in atomic mode: (lambda (self poll-ctx) (assert-atomic-mode) - (define t (current-thread)) + (define t (current-thread/in-atomic)) (cond [(is-mail? t) (values (list self) #f)] [(poll-ctx-poll? poll-ctx) (values #f self)]