diff --git a/racket/src/cs/demo/control.ss b/racket/src/cs/demo/control.ss index c9bbf8a013..26acc619a8 100644 --- a/racket/src/cs/demo/control.ss +++ b/racket/src/cs/demo/control.ss @@ -434,11 +434,15 @@ (define engine-tag (default-continuation-prompt-tag)) (define e (make-engine (lambda () 'done) engine-tag #f #f #f)) -(check (e 100 void (lambda (e results remain-ticks) results)) +(check (call-with-engine-completion + (lambda (done) + (e 100 void (lambda (e results remain-ticks) (done results))))) '(done)) (define e-forever (make-engine (lambda () (let loop () (loop))) engine-tag #f #f #f)) -(check (procedure? (e-forever 10 void (lambda (e results remain-ticks) e))) +(check (procedure? (call-with-engine-completion + (lambda (done) + (e-forever 10 void (lambda (e results remain-ticks) (done e)))))) #t) (define e-10 (make-engine (lambda () @@ -453,13 +457,15 @@ engine-tag #f #f #f)) (check (let ([started 0]) - (let loop ([e e-10] [n 0]) - (e 100 - (lambda () (set! started (add1 started))) - (lambda (e results remain) - (if e - (loop e (add1 n)) - (list results n started)))))) + (call-with-engine-completion + (lambda (done) + (let loop ([e e-10] [n 0]) + (e 100 + (lambda () (set! started (add1 started))) + (lambda (e results remain) + (if e + (loop e (add1 n)) + (done (list results n started))))))))) '((done) 10 11)) ;; Check that winders are not run on engine swaps: @@ -478,13 +484,15 @@ (lambda () (set! post (add1 post))))]))) engine-tag #f #f #f)]) - (check (let loop ([e e-10/dw] [n 0]) - (e 200 - void - (lambda (e results remain) - (if e - (loop e (add1 n)) - (vector results post n))))) + (check (call-with-engine-completion + (lambda (done) + (let loop ([e e-10/dw] [n 0]) + (e 200 + void + (lambda (e results remain) + (if e + (loop e (add1 n)) + (done (vector results post n)))))))) '#((done 10 0) 10 10)))) ;; ---------------------------------------- @@ -502,14 +510,18 @@ (make-engine gen engine-tag #f #f #f) (thread-cell-ref ut) (thread-cell-ref pt))) - (define l1 ((make-engine gen engine-tag #f #f #f) - 100 - void - (lambda (e results remain) (car results)))) - (define l2 ((list-ref l1 2) - 100 - void - (lambda (e results remain) (car results)))) + (define l1 (call-with-engine-completion + (lambda (done) + ((make-engine gen engine-tag #f #f #f) + 100 + void + (lambda (e results remain) (done (car results))))))) + (define l2 (call-with-engine-completion + (lambda (done) + ((list-ref l1 2) + 100 + void + (lambda (e results remain) (done (car results))))))) (check (list-ref l1 0) 1) (check (list-ref l1 1) 100) (check (list-ref l1 3) 2) @@ -528,7 +540,10 @@ (extend-parameterization (continuation-mark-set-first #f parameterization-key) my-param 'set) (make-engine (lambda () (|#%app| my-param)) engine-tag #f #f #f))]) (check (|#%app| my-param) 'init) - (check (e 1000 void (lambda (e vs remain) vs)) '(set))) + (check (call-with-engine-completion + (lambda (done) + (e 1000 void (lambda (e vs remain) (done vs))))) + '(set))) (let ([also-my-param (make-derived-parameter my-param (lambda (v) (list v)) @@ -626,16 +641,19 @@ #f #f)) (check (let ([prefixes 0]) - (let loop ([e e-sw] [i 0]) - (e 100 - (lambda () (set! prefixes (add1 prefixes))) - (lambda (e v remain) - (if e - (loop e (add1 i)) - (list (> i 2) - (= prefixes (add1 i)) - (- (car v) i) - (- (cadr v) i))))))) + (call-with-engine-completion + (lambda (done) + (let loop ([e e-sw] [i 0]) + (e 100 + (lambda () (set! prefixes (add1 prefixes))) + (lambda (e v remain) + (if e + (loop e (add1 i)) + (done + (list (> i 2) + (= prefixes (add1 i)) + (- (car v) i) + (- (cadr v) i)))))))))) '(#t #t 1 0)) ;; ---------------------------------------- diff --git a/racket/src/cs/rumble.sls b/racket/src/cs/rumble.sls index f3ecb28840..cca5e0664c 100644 --- a/racket/src/cs/rumble.sls +++ b/racket/src/cs/rumble.sls @@ -47,16 +47,18 @@ chaperone-continuation-mark-key call-with-system-wind ; not exported to Racket + ;; not exported to Racket: make-engine engine-block engine-timeout engine-return - current-engine-state ; not exported to Racket - set-ctl-c-handler! ; not exported to Racket - get-ctl-c-handler ; not exported to Racket - set-scheduler-lock-callbacks! ; not exported to Racket - set-scheduler-atomicity-callbacks! ; not exported to Racket - set-engine-exit-handler! ; not exported to Racket + call-with-engine-completion + current-engine-state + set-ctl-c-handler! + get-ctl-c-handler + set-scheduler-lock-callbacks! + set-scheduler-atomicity-callbacks! + set-engine-exit-handler! make-thread-cell thread-cell? diff --git a/racket/src/cs/rumble/control.ss b/racket/src/cs/rumble/control.ss index 0e0b2c3194..8ec12ce4f5 100644 --- a/racket/src/cs/rumble/control.ss +++ b/racket/src/cs/rumble/control.ss @@ -1908,22 +1908,23 @@ ;; applying an old continuation, but does not run winders; ;; this operation makes sense for thread or engine context ;; switches -(define (swap-metacontinuation saved proc) +(define (call-with-current-metacontinuation proc) (cond [(current-system-wind-start-k) - => (lambda (k) (swap-metacontinuation-with-system-wind saved proc k))] + => (lambda (k) (call-with-current-metacontinuation-with-system-wind proc k))] [else (call-in-empty-metacontinuation-frame-for-swap (lambda () - (let ([now-saved (make-saved-metacontinuation - (current-metacontinuation) - (#%$current-winders) - (current-exception-state))]) - (current-metacontinuation (saved-metacontinuation-mc saved)) - (#%$current-winders (saved-metacontinuation-system-winders saved)) - (current-exception-state (saved-metacontinuation-exn-state saved)) - (set! saved #f) ; break link for space safety - (proc now-saved))))])) + (proc (make-saved-metacontinuation + (current-metacontinuation) + (#%$current-winders) + (current-exception-state)))))])) + +(define (apply-meta-continuation saved k) + (current-metacontinuation (saved-metacontinuation-mc saved)) + (#%$current-winders (saved-metacontinuation-system-winders saved)) + (current-exception-state (saved-metacontinuation-exn-state saved)) + (k)) ;; ---------------------------------------- @@ -1951,14 +1952,14 @@ (#%apply values args))))) (lambda () (current-system-wind-start-k #f))))))) -(define (swap-metacontinuation-with-system-wind saved proc start-k) +(define (call-with-current-metacontinuation-with-system-wind proc start-k) (current-system-wind-start-k #f) (call/cc (lambda (system-wind-k) ; continuation with system `dynamic-wind` behavior ;; escape to starting point, running winders, before ;; capturing the rest of the metacontinuation: (start-k (lambda () - (let ([prefix (swap-metacontinuation saved proc)]) + (let ([prefix (call-with-current-metacontinuation proc)]) (current-system-wind-start-k start-k) (system-wind-k prefix))))))) diff --git a/racket/src/cs/rumble/engine.ss b/racket/src/cs/rumble/engine.ss index 7110fceb15..2cb8c5d2c3 100644 --- a/racket/src/cs/rumble/engine.ss +++ b/racket/src/cs/rumble/engine.ss @@ -1,13 +1,15 @@ -;; Like Chez's engine API, but +;; Inspried by Chez's engine API, but ;; - works with delimited-continuations extensions in "control.ss" ;; - doesn't run winders when suspending or resuming an engine ;; - accepts an extra "prefix" argument to run code within an engine ;; just before resuming the engine's continuation +;; - supports direct engine-to-engine transition instead of a +;; back-and-forth between an engine scheduler ;; Don't mix Chez engines with this implementation, because we take ;; over the timer. -(define-record engine-state (mc complete-or-expire thread-cell-values init-break-enabled-cell reset-handler)) +(define-record engine-state (complete-or-expire thread-cell-values init-break-enabled-cell)) (define-virtual-register current-engine-state #f) @@ -24,13 +26,17 @@ (define (set-engine-exit-handler! proc) (set! engine-exit proc)) -;; An engine is repesented by a procedure that takes thee arguments: +;; An engine is repesented by a procedure that takes three arguments, where the +;; procedure must be tail-called either within `call-with-engine-completion` or +;; in an engine call's `complete-or-expire` callback: ;; * ticks: number of ticks to run before exire ;; * prefix: thunk to invoke just before continuing (counts toward ticks) -;; * complete-or-expire: callback that accepts 3 arguments: +;; * complete-or-expire: callback that accepts 3 arguments, ;; - engine or #f: an engine if the original `thunk` has not yet returned ;; - list or #f: a list if the original `thunk` has returned values ;; - remining-ticks: a number of ticks leftover due to complete or `(engine-block)` +;; where the callback must end by tail-calling another engine procedure or +;; the procedure provided by `call-with-engine-completion` (define (make-engine thunk ; can return any number of values prompt-tag ; prompt to wrap around call to `thunk` abort-handler ; handler for that prompt @@ -63,6 +69,8 @@ (new-engine-thread-cell-values)) init-break-enabled-cell))) +;; Internal: creates an engine procedure to be called within `call-with-engine-completion` +;; or from an enginer procedure's `complete-or-expire` callback (define (create-engine to-saves proc thread-cell-values init-break-enabled-cell) (case-lambda ;; For `continuation-marks`: @@ -70,16 +78,31 @@ ;; Normal engine case: [(ticks prefix complete-or-expire) (start-implicit-uninterrupted 'create) - ((swap-metacontinuation - to-saves - (lambda (saves) - (current-engine-state (make-engine-state saves complete-or-expire thread-cell-values - init-break-enabled-cell (reset-handler))) - (reset-handler engine-reset-handler) - (timer-interrupt-handler engine-block-via-timer) - (end-implicit-uninterrupted 'create) - (set-timer ticks) - (proc prefix))))])) + (apply-meta-continuation + to-saves + (lambda () + (current-engine-state + (make-engine-state complete-or-expire thread-cell-values init-break-enabled-cell)) + (reset-handler engine-reset-handler) + (timer-interrupt-handler engine-block-via-timer) + (end-implicit-uninterrupted 'create) + (set-timer ticks) + (proc prefix)))])) + +;; Captures the current metacontinuation as an engine runner, and calls `proc` +;; with a procedure to be tail-called from an engine procedure's `complete-or-expire` +;; callback to return to the metacontinuation +(define (call-with-engine-completion proc) + (call-with-current-metacontinuation + (lambda (saves) + (let ([rh (reset-handler)]) + (proc (lambda args + (current-engine-state #f) + (apply-meta-continuation + saves + (lambda () + (reset-handler rh) + (#%apply values args))))))))) (define (engine-reset-handler) (end-uninterrupted 'reset) @@ -105,24 +128,19 @@ (set-timer 0))]) (unless es (error 'engine-block "not currently running an engine")) - (reset-handler (engine-state-reset-handler es)) (start-implicit-uninterrupted 'block) - ;; Extra pair of parens around swap is to apply a prefix - ;; function on swapping back in: - ((swap-metacontinuation - (engine-state-mc es) - (lambda (saves) - (end-implicit-uninterrupted 'block) - (current-engine-state #f) - (lambda () ; returned to the `swap-continuation` in `create-engine` - ((engine-state-complete-or-expire es) - (create-engine - saves - (lambda (prefix) prefix) ; returns `prefix` to the above "((" - (engine-state-thread-cell-values es) - (engine-state-init-break-enabled-cell es)) - #f - remain-ticks))))))] + (call-with-current-metacontinuation + (lambda (saves) + (end-implicit-uninterrupted 'block) + (current-engine-state #f) + ((engine-state-complete-or-expire es) + (create-engine + saves + (lambda (prefix) (prefix)) + (engine-state-thread-cell-values es) + (engine-state-init-break-enabled-cell es)) + #f + remain-ticks))))] [() (engine-block #f)])) (define (engine-block/timeout) @@ -145,16 +163,13 @@ (let ([es (current-engine-state)]) (unless es (error 'engine-return "not currently running an engine")) - (reset-handler (engine-state-reset-handler es)) (let ([remain-ticks (set-timer 0)]) - (start-implicit-uninterrupted 'return) - (swap-metacontinuation - (engine-state-mc es) - (lambda (saves) + (start-implicit-uninterrupted 'block) + (call-with-current-metacontinuation + (lambda (ignored-saves) + (end-implicit-uninterrupted 'block) (current-engine-state #f) - (end-implicit-uninterrupted 'return) - (lambda () ; returned to the `swap-continuation` in `create-engine` - ((engine-state-complete-or-expire es) #f results remain-ticks))))))) + ((engine-state-complete-or-expire es) #f results remain-ticks)))))) (define (make-empty-thread-cell-values) (make-ephemeron-eq-hashtable)) diff --git a/racket/src/cs/thread.sls b/racket/src/cs/thread.sls index b804598ad6..0ca6faff69 100644 --- a/racket/src/cs/thread.sls +++ b/racket/src/cs/thread.sls @@ -13,6 +13,7 @@ [engine-timeout rumble:engine-timeout] [engine-return rumble:engine-return] [current-engine-state rumble:current-engine-state] + [call-with-engine-completion rumble:call-with-engine-completion] [make-condition rumble:make-condition] [condition-wait rumble:condition-wait] [condition-signal rumble:condition-signal] @@ -132,6 +133,7 @@ 'engine-timeout rumble:engine-timeout 'engine-return rumble:engine-return 'current-engine-state (lambda (v) (rumble:current-engine-state v)) + 'call-with-engine-completion rumble:call-with-engine-completion 'set-ctl-c-handler! rumble:set-ctl-c-handler! 'poll-will-executors poll-will-executors 'make-will-executor rumble:make-will-executor diff --git a/racket/src/thread/bootstrap.rkt b/racket/src/thread/bootstrap.rkt index f1800e5179..9c275651b0 100644 --- a/racket/src/thread/bootstrap.rkt +++ b/racket/src/thread/bootstrap.rkt @@ -96,6 +96,9 @@ (define (engine-timeout) (void)) +(define (call-with-engine-completion proc) + (proc values)) + (define ctl-c-handler #f) (define (set-ctl-c-handler! proc) @@ -233,6 +236,7 @@ 'engine-timeout engine-timeout 'engine-return (lambda args (error "engine-return: not ready")) + 'call-with-engine-completion call-with-engine-completion 'current-process-milliseconds current-process-milliseconds 'set-ctl-c-handler! set-ctl-c-handler! 'set-break-enabled-transition-hook! void diff --git a/racket/src/thread/future.rkt b/racket/src/thread/future.rkt index e85a970bf4..70e72866d4 100644 --- a/racket/src/thread/future.rkt +++ b/racket/src/thread/future.rkt @@ -564,28 +564,30 @@ break-enabled-default-cell #t)) (current-atomic (add1 (current-atomic))) - (let loop ([e e]) - (e TICKS - (lambda () - ;; Check whether the main pthread wants to know we're here - (when (and (zero? (current-atomic)) - (worker-pinged? w)) - (host:mutex-acquire (scheduler-mutex (current-scheduler))) - (check-in w) - (host:mutex-release (scheduler-mutex (current-scheduler)))) - ;; Check that the future should still run - (when (and (custodian-shut-down?/other-pthread (future*-custodian f)) - (zero? (current-atomic))) - (lock-acquire (future*-lock f)) - (set-future*-state! f #f) - (on-transition-to-unfinished) - (future-suspend))) - (lambda (e results leftover-ticks) - (cond - [e (loop e)] - [else - ;; Done --- completed or suspend (e.g., blocked) - (void)])))) + (call-with-engine-completion + (lambda (done) + (let loop ([e e]) + (e TICKS + (lambda () + ;; Check whether the main pthread wants to know we're here + (when (and (zero? (current-atomic)) + (worker-pinged? w)) + (host:mutex-acquire (scheduler-mutex (current-scheduler))) + (check-in w) + (host:mutex-release (scheduler-mutex (current-scheduler)))) + ;; Check that the future should still run + (when (and (custodian-shut-down?/other-pthread (future*-custodian f)) + (zero? (current-atomic))) + (lock-acquire (future*-lock f)) + (set-future*-state! f #f) + (on-transition-to-unfinished) + (future-suspend))) + (lambda (e results leftover-ticks) + (cond + [e (loop e)] + [else + ;; Done --- completed or suspend (e.g., blocked) + (done (void))])))))) (log-future 'end-work (future*-id f)) (current-future 'worker) (set-box! (worker-current-future-box w) #f)) diff --git a/racket/src/thread/host.rkt b/racket/src/thread/host.rkt index 12d071dad8..f511fd90cd 100644 --- a/racket/src/thread/host.rkt +++ b/racket/src/thread/host.rkt @@ -37,6 +37,7 @@ engine-timeout engine-return current-engine-state + call-with-engine-completion current-process-milliseconds set-ctl-c-handler! set-break-enabled-transition-hook!