cs & threads: make thread swapping more direct
Adjust the internal engine protocol to avoid a jump from a starting engine (representing a thread) to a scheduler outside of an engine to a target engine (for a swapped-ni thread); instead, jump from the first engine to the target, effectively running the scheduler within the starting engine's context.
This commit is contained in:
parent
f93f959506
commit
e970a9f882
|
@ -434,11 +434,15 @@
|
|||
(define engine-tag (default-continuation-prompt-tag))
|
||||
|
||||
(define e (make-engine (lambda () 'done) engine-tag #f #f #f))
|
||||
(check (e 100 void (lambda (e results remain-ticks) results))
|
||||
(check (call-with-engine-completion
|
||||
(lambda (done)
|
||||
(e 100 void (lambda (e results remain-ticks) (done results)))))
|
||||
'(done))
|
||||
|
||||
(define e-forever (make-engine (lambda () (let loop () (loop))) engine-tag #f #f #f))
|
||||
(check (procedure? (e-forever 10 void (lambda (e results remain-ticks) e)))
|
||||
(check (procedure? (call-with-engine-completion
|
||||
(lambda (done)
|
||||
(e-forever 10 void (lambda (e results remain-ticks) (done e))))))
|
||||
#t)
|
||||
|
||||
(define e-10 (make-engine (lambda ()
|
||||
|
@ -453,13 +457,15 @@
|
|||
engine-tag #f
|
||||
#f #f))
|
||||
(check (let ([started 0])
|
||||
(let loop ([e e-10] [n 0])
|
||||
(e 100
|
||||
(lambda () (set! started (add1 started)))
|
||||
(lambda (e results remain)
|
||||
(if e
|
||||
(loop e (add1 n))
|
||||
(list results n started))))))
|
||||
(call-with-engine-completion
|
||||
(lambda (done)
|
||||
(let loop ([e e-10] [n 0])
|
||||
(e 100
|
||||
(lambda () (set! started (add1 started)))
|
||||
(lambda (e results remain)
|
||||
(if e
|
||||
(loop e (add1 n))
|
||||
(done (list results n started)))))))))
|
||||
'((done) 10 11))
|
||||
|
||||
;; Check that winders are not run on engine swaps:
|
||||
|
@ -478,13 +484,15 @@
|
|||
(lambda () (set! post (add1 post))))])))
|
||||
engine-tag #f
|
||||
#f #f)])
|
||||
(check (let loop ([e e-10/dw] [n 0])
|
||||
(e 200
|
||||
void
|
||||
(lambda (e results remain)
|
||||
(if e
|
||||
(loop e (add1 n))
|
||||
(vector results post n)))))
|
||||
(check (call-with-engine-completion
|
||||
(lambda (done)
|
||||
(let loop ([e e-10/dw] [n 0])
|
||||
(e 200
|
||||
void
|
||||
(lambda (e results remain)
|
||||
(if e
|
||||
(loop e (add1 n))
|
||||
(done (vector results post n))))))))
|
||||
'#((done 10 0) 10 10))))
|
||||
|
||||
;; ----------------------------------------
|
||||
|
@ -502,14 +510,18 @@
|
|||
(make-engine gen engine-tag #f #f #f)
|
||||
(thread-cell-ref ut)
|
||||
(thread-cell-ref pt)))
|
||||
(define l1 ((make-engine gen engine-tag #f #f #f)
|
||||
100
|
||||
void
|
||||
(lambda (e results remain) (car results))))
|
||||
(define l2 ((list-ref l1 2)
|
||||
100
|
||||
void
|
||||
(lambda (e results remain) (car results))))
|
||||
(define l1 (call-with-engine-completion
|
||||
(lambda (done)
|
||||
((make-engine gen engine-tag #f #f #f)
|
||||
100
|
||||
void
|
||||
(lambda (e results remain) (done (car results)))))))
|
||||
(define l2 (call-with-engine-completion
|
||||
(lambda (done)
|
||||
((list-ref l1 2)
|
||||
100
|
||||
void
|
||||
(lambda (e results remain) (done (car results)))))))
|
||||
(check (list-ref l1 0) 1)
|
||||
(check (list-ref l1 1) 100)
|
||||
(check (list-ref l1 3) 2)
|
||||
|
@ -528,7 +540,10 @@
|
|||
(extend-parameterization (continuation-mark-set-first #f parameterization-key) my-param 'set)
|
||||
(make-engine (lambda () (|#%app| my-param)) engine-tag #f #f #f))])
|
||||
(check (|#%app| my-param) 'init)
|
||||
(check (e 1000 void (lambda (e vs remain) vs)) '(set)))
|
||||
(check (call-with-engine-completion
|
||||
(lambda (done)
|
||||
(e 1000 void (lambda (e vs remain) (done vs)))))
|
||||
'(set)))
|
||||
|
||||
(let ([also-my-param (make-derived-parameter my-param
|
||||
(lambda (v) (list v))
|
||||
|
@ -626,16 +641,19 @@
|
|||
#f #f))
|
||||
|
||||
(check (let ([prefixes 0])
|
||||
(let loop ([e e-sw] [i 0])
|
||||
(e 100
|
||||
(lambda () (set! prefixes (add1 prefixes)))
|
||||
(lambda (e v remain)
|
||||
(if e
|
||||
(loop e (add1 i))
|
||||
(list (> i 2)
|
||||
(= prefixes (add1 i))
|
||||
(- (car v) i)
|
||||
(- (cadr v) i)))))))
|
||||
(call-with-engine-completion
|
||||
(lambda (done)
|
||||
(let loop ([e e-sw] [i 0])
|
||||
(e 100
|
||||
(lambda () (set! prefixes (add1 prefixes)))
|
||||
(lambda (e v remain)
|
||||
(if e
|
||||
(loop e (add1 i))
|
||||
(done
|
||||
(list (> i 2)
|
||||
(= prefixes (add1 i))
|
||||
(- (car v) i)
|
||||
(- (cadr v) i))))))))))
|
||||
'(#t #t 1 0))
|
||||
|
||||
;; ----------------------------------------
|
||||
|
|
|
@ -47,16 +47,18 @@
|
|||
chaperone-continuation-mark-key
|
||||
call-with-system-wind ; not exported to Racket
|
||||
|
||||
;; not exported to Racket:
|
||||
make-engine
|
||||
engine-block
|
||||
engine-timeout
|
||||
engine-return
|
||||
current-engine-state ; not exported to Racket
|
||||
set-ctl-c-handler! ; not exported to Racket
|
||||
get-ctl-c-handler ; not exported to Racket
|
||||
set-scheduler-lock-callbacks! ; not exported to Racket
|
||||
set-scheduler-atomicity-callbacks! ; not exported to Racket
|
||||
set-engine-exit-handler! ; not exported to Racket
|
||||
call-with-engine-completion
|
||||
current-engine-state
|
||||
set-ctl-c-handler!
|
||||
get-ctl-c-handler
|
||||
set-scheduler-lock-callbacks!
|
||||
set-scheduler-atomicity-callbacks!
|
||||
set-engine-exit-handler!
|
||||
|
||||
make-thread-cell
|
||||
thread-cell?
|
||||
|
|
|
@ -1908,22 +1908,23 @@
|
|||
;; applying an old continuation, but does not run winders;
|
||||
;; this operation makes sense for thread or engine context
|
||||
;; switches
|
||||
(define (swap-metacontinuation saved proc)
|
||||
(define (call-with-current-metacontinuation proc)
|
||||
(cond
|
||||
[(current-system-wind-start-k)
|
||||
=> (lambda (k) (swap-metacontinuation-with-system-wind saved proc k))]
|
||||
=> (lambda (k) (call-with-current-metacontinuation-with-system-wind proc k))]
|
||||
[else
|
||||
(call-in-empty-metacontinuation-frame-for-swap
|
||||
(lambda ()
|
||||
(let ([now-saved (make-saved-metacontinuation
|
||||
(current-metacontinuation)
|
||||
(#%$current-winders)
|
||||
(current-exception-state))])
|
||||
(current-metacontinuation (saved-metacontinuation-mc saved))
|
||||
(#%$current-winders (saved-metacontinuation-system-winders saved))
|
||||
(current-exception-state (saved-metacontinuation-exn-state saved))
|
||||
(set! saved #f) ; break link for space safety
|
||||
(proc now-saved))))]))
|
||||
(proc (make-saved-metacontinuation
|
||||
(current-metacontinuation)
|
||||
(#%$current-winders)
|
||||
(current-exception-state)))))]))
|
||||
|
||||
(define (apply-meta-continuation saved k)
|
||||
(current-metacontinuation (saved-metacontinuation-mc saved))
|
||||
(#%$current-winders (saved-metacontinuation-system-winders saved))
|
||||
(current-exception-state (saved-metacontinuation-exn-state saved))
|
||||
(k))
|
||||
|
||||
;; ----------------------------------------
|
||||
|
||||
|
@ -1951,14 +1952,14 @@
|
|||
(#%apply values args)))))
|
||||
(lambda () (current-system-wind-start-k #f)))))))
|
||||
|
||||
(define (swap-metacontinuation-with-system-wind saved proc start-k)
|
||||
(define (call-with-current-metacontinuation-with-system-wind proc start-k)
|
||||
(current-system-wind-start-k #f)
|
||||
(call/cc
|
||||
(lambda (system-wind-k) ; continuation with system `dynamic-wind` behavior
|
||||
;; escape to starting point, running winders, before
|
||||
;; capturing the rest of the metacontinuation:
|
||||
(start-k (lambda ()
|
||||
(let ([prefix (swap-metacontinuation saved proc)])
|
||||
(let ([prefix (call-with-current-metacontinuation proc)])
|
||||
(current-system-wind-start-k start-k)
|
||||
(system-wind-k prefix)))))))
|
||||
|
||||
|
|
|
@ -1,13 +1,15 @@
|
|||
;; Like Chez's engine API, but
|
||||
;; Inspried by Chez's engine API, but
|
||||
;; - works with delimited-continuations extensions in "control.ss"
|
||||
;; - doesn't run winders when suspending or resuming an engine
|
||||
;; - accepts an extra "prefix" argument to run code within an engine
|
||||
;; just before resuming the engine's continuation
|
||||
;; - supports direct engine-to-engine transition instead of a
|
||||
;; back-and-forth between an engine scheduler
|
||||
|
||||
;; Don't mix Chez engines with this implementation, because we take
|
||||
;; over the timer.
|
||||
|
||||
(define-record engine-state (mc complete-or-expire thread-cell-values init-break-enabled-cell reset-handler))
|
||||
(define-record engine-state (complete-or-expire thread-cell-values init-break-enabled-cell))
|
||||
|
||||
(define-virtual-register current-engine-state #f)
|
||||
|
||||
|
@ -24,13 +26,17 @@
|
|||
(define (set-engine-exit-handler! proc)
|
||||
(set! engine-exit proc))
|
||||
|
||||
;; An engine is repesented by a procedure that takes thee arguments:
|
||||
;; An engine is repesented by a procedure that takes three arguments, where the
|
||||
;; procedure must be tail-called either within `call-with-engine-completion` or
|
||||
;; in an engine call's `complete-or-expire` callback:
|
||||
;; * ticks: number of ticks to run before exire
|
||||
;; * prefix: thunk to invoke just before continuing (counts toward ticks)
|
||||
;; * complete-or-expire: callback that accepts 3 arguments:
|
||||
;; * complete-or-expire: callback that accepts 3 arguments,
|
||||
;; - engine or #f: an engine if the original `thunk` has not yet returned
|
||||
;; - list or #f: a list if the original `thunk` has returned values
|
||||
;; - remining-ticks: a number of ticks leftover due to complete or `(engine-block)`
|
||||
;; where the callback must end by tail-calling another engine procedure or
|
||||
;; the procedure provided by `call-with-engine-completion`
|
||||
(define (make-engine thunk ; can return any number of values
|
||||
prompt-tag ; prompt to wrap around call to `thunk`
|
||||
abort-handler ; handler for that prompt
|
||||
|
@ -63,6 +69,8 @@
|
|||
(new-engine-thread-cell-values))
|
||||
init-break-enabled-cell)))
|
||||
|
||||
;; Internal: creates an engine procedure to be called within `call-with-engine-completion`
|
||||
;; or from an enginer procedure's `complete-or-expire` callback
|
||||
(define (create-engine to-saves proc thread-cell-values init-break-enabled-cell)
|
||||
(case-lambda
|
||||
;; For `continuation-marks`:
|
||||
|
@ -70,16 +78,31 @@
|
|||
;; Normal engine case:
|
||||
[(ticks prefix complete-or-expire)
|
||||
(start-implicit-uninterrupted 'create)
|
||||
((swap-metacontinuation
|
||||
to-saves
|
||||
(lambda (saves)
|
||||
(current-engine-state (make-engine-state saves complete-or-expire thread-cell-values
|
||||
init-break-enabled-cell (reset-handler)))
|
||||
(reset-handler engine-reset-handler)
|
||||
(timer-interrupt-handler engine-block-via-timer)
|
||||
(end-implicit-uninterrupted 'create)
|
||||
(set-timer ticks)
|
||||
(proc prefix))))]))
|
||||
(apply-meta-continuation
|
||||
to-saves
|
||||
(lambda ()
|
||||
(current-engine-state
|
||||
(make-engine-state complete-or-expire thread-cell-values init-break-enabled-cell))
|
||||
(reset-handler engine-reset-handler)
|
||||
(timer-interrupt-handler engine-block-via-timer)
|
||||
(end-implicit-uninterrupted 'create)
|
||||
(set-timer ticks)
|
||||
(proc prefix)))]))
|
||||
|
||||
;; Captures the current metacontinuation as an engine runner, and calls `proc`
|
||||
;; with a procedure to be tail-called from an engine procedure's `complete-or-expire`
|
||||
;; callback to return to the metacontinuation
|
||||
(define (call-with-engine-completion proc)
|
||||
(call-with-current-metacontinuation
|
||||
(lambda (saves)
|
||||
(let ([rh (reset-handler)])
|
||||
(proc (lambda args
|
||||
(current-engine-state #f)
|
||||
(apply-meta-continuation
|
||||
saves
|
||||
(lambda ()
|
||||
(reset-handler rh)
|
||||
(#%apply values args)))))))))
|
||||
|
||||
(define (engine-reset-handler)
|
||||
(end-uninterrupted 'reset)
|
||||
|
@ -105,24 +128,19 @@
|
|||
(set-timer 0))])
|
||||
(unless es
|
||||
(error 'engine-block "not currently running an engine"))
|
||||
(reset-handler (engine-state-reset-handler es))
|
||||
(start-implicit-uninterrupted 'block)
|
||||
;; Extra pair of parens around swap is to apply a prefix
|
||||
;; function on swapping back in:
|
||||
((swap-metacontinuation
|
||||
(engine-state-mc es)
|
||||
(lambda (saves)
|
||||
(end-implicit-uninterrupted 'block)
|
||||
(current-engine-state #f)
|
||||
(lambda () ; returned to the `swap-continuation` in `create-engine`
|
||||
((engine-state-complete-or-expire es)
|
||||
(create-engine
|
||||
saves
|
||||
(lambda (prefix) prefix) ; returns `prefix` to the above "(("
|
||||
(engine-state-thread-cell-values es)
|
||||
(engine-state-init-break-enabled-cell es))
|
||||
#f
|
||||
remain-ticks))))))]
|
||||
(call-with-current-metacontinuation
|
||||
(lambda (saves)
|
||||
(end-implicit-uninterrupted 'block)
|
||||
(current-engine-state #f)
|
||||
((engine-state-complete-or-expire es)
|
||||
(create-engine
|
||||
saves
|
||||
(lambda (prefix) (prefix))
|
||||
(engine-state-thread-cell-values es)
|
||||
(engine-state-init-break-enabled-cell es))
|
||||
#f
|
||||
remain-ticks))))]
|
||||
[() (engine-block #f)]))
|
||||
|
||||
(define (engine-block/timeout)
|
||||
|
@ -145,16 +163,13 @@
|
|||
(let ([es (current-engine-state)])
|
||||
(unless es
|
||||
(error 'engine-return "not currently running an engine"))
|
||||
(reset-handler (engine-state-reset-handler es))
|
||||
(let ([remain-ticks (set-timer 0)])
|
||||
(start-implicit-uninterrupted 'return)
|
||||
(swap-metacontinuation
|
||||
(engine-state-mc es)
|
||||
(lambda (saves)
|
||||
(start-implicit-uninterrupted 'block)
|
||||
(call-with-current-metacontinuation
|
||||
(lambda (ignored-saves)
|
||||
(end-implicit-uninterrupted 'block)
|
||||
(current-engine-state #f)
|
||||
(end-implicit-uninterrupted 'return)
|
||||
(lambda () ; returned to the `swap-continuation` in `create-engine`
|
||||
((engine-state-complete-or-expire es) #f results remain-ticks)))))))
|
||||
((engine-state-complete-or-expire es) #f results remain-ticks))))))
|
||||
|
||||
(define (make-empty-thread-cell-values)
|
||||
(make-ephemeron-eq-hashtable))
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
[engine-timeout rumble:engine-timeout]
|
||||
[engine-return rumble:engine-return]
|
||||
[current-engine-state rumble:current-engine-state]
|
||||
[call-with-engine-completion rumble:call-with-engine-completion]
|
||||
[make-condition rumble:make-condition]
|
||||
[condition-wait rumble:condition-wait]
|
||||
[condition-signal rumble:condition-signal]
|
||||
|
@ -132,6 +133,7 @@
|
|||
'engine-timeout rumble:engine-timeout
|
||||
'engine-return rumble:engine-return
|
||||
'current-engine-state (lambda (v) (rumble:current-engine-state v))
|
||||
'call-with-engine-completion rumble:call-with-engine-completion
|
||||
'set-ctl-c-handler! rumble:set-ctl-c-handler!
|
||||
'poll-will-executors poll-will-executors
|
||||
'make-will-executor rumble:make-will-executor
|
||||
|
|
|
@ -96,6 +96,9 @@
|
|||
(define (engine-timeout)
|
||||
(void))
|
||||
|
||||
(define (call-with-engine-completion proc)
|
||||
(proc values))
|
||||
|
||||
(define ctl-c-handler #f)
|
||||
|
||||
(define (set-ctl-c-handler! proc)
|
||||
|
@ -233,6 +236,7 @@
|
|||
'engine-timeout engine-timeout
|
||||
'engine-return (lambda args
|
||||
(error "engine-return: not ready"))
|
||||
'call-with-engine-completion call-with-engine-completion
|
||||
'current-process-milliseconds current-process-milliseconds
|
||||
'set-ctl-c-handler! set-ctl-c-handler!
|
||||
'set-break-enabled-transition-hook! void
|
||||
|
|
|
@ -564,28 +564,30 @@
|
|||
break-enabled-default-cell
|
||||
#t))
|
||||
(current-atomic (add1 (current-atomic)))
|
||||
(let loop ([e e])
|
||||
(e TICKS
|
||||
(lambda ()
|
||||
;; Check whether the main pthread wants to know we're here
|
||||
(when (and (zero? (current-atomic))
|
||||
(worker-pinged? w))
|
||||
(host:mutex-acquire (scheduler-mutex (current-scheduler)))
|
||||
(check-in w)
|
||||
(host:mutex-release (scheduler-mutex (current-scheduler))))
|
||||
;; Check that the future should still run
|
||||
(when (and (custodian-shut-down?/other-pthread (future*-custodian f))
|
||||
(zero? (current-atomic)))
|
||||
(lock-acquire (future*-lock f))
|
||||
(set-future*-state! f #f)
|
||||
(on-transition-to-unfinished)
|
||||
(future-suspend)))
|
||||
(lambda (e results leftover-ticks)
|
||||
(cond
|
||||
[e (loop e)]
|
||||
[else
|
||||
;; Done --- completed or suspend (e.g., blocked)
|
||||
(void)]))))
|
||||
(call-with-engine-completion
|
||||
(lambda (done)
|
||||
(let loop ([e e])
|
||||
(e TICKS
|
||||
(lambda ()
|
||||
;; Check whether the main pthread wants to know we're here
|
||||
(when (and (zero? (current-atomic))
|
||||
(worker-pinged? w))
|
||||
(host:mutex-acquire (scheduler-mutex (current-scheduler)))
|
||||
(check-in w)
|
||||
(host:mutex-release (scheduler-mutex (current-scheduler))))
|
||||
;; Check that the future should still run
|
||||
(when (and (custodian-shut-down?/other-pthread (future*-custodian f))
|
||||
(zero? (current-atomic)))
|
||||
(lock-acquire (future*-lock f))
|
||||
(set-future*-state! f #f)
|
||||
(on-transition-to-unfinished)
|
||||
(future-suspend)))
|
||||
(lambda (e results leftover-ticks)
|
||||
(cond
|
||||
[e (loop e)]
|
||||
[else
|
||||
;; Done --- completed or suspend (e.g., blocked)
|
||||
(done (void))]))))))
|
||||
(log-future 'end-work (future*-id f))
|
||||
(current-future 'worker)
|
||||
(set-box! (worker-current-future-box w) #f))
|
||||
|
|
|
@ -37,6 +37,7 @@
|
|||
engine-timeout
|
||||
engine-return
|
||||
current-engine-state
|
||||
call-with-engine-completion
|
||||
current-process-milliseconds
|
||||
set-ctl-c-handler!
|
||||
set-break-enabled-transition-hook!
|
||||
|
|
Loading…
Reference in New Issue
Block a user