cs: reduce thread-swap overhead
Reduce overhead by taking a shortcut for capturing a thread's metacontinuation and by reducing closure allocations.
This commit is contained in:
parent
20e0252664
commit
f93f959506
|
@ -434,18 +434,18 @@
|
|||
(define engine-tag (default-continuation-prompt-tag))
|
||||
|
||||
(define e (make-engine (lambda () 'done) engine-tag #f #f #f))
|
||||
(check (cdr (e 100 void list vector))
|
||||
(check (e 100 void (lambda (e results remain-ticks) results))
|
||||
'(done))
|
||||
|
||||
(define e-forever (make-engine (lambda () (let loop () (loop))) engine-tag #f #f #f))
|
||||
(check (vector? (e-forever 10 void list vector))
|
||||
(check (procedure? (e-forever 10 void (lambda (e results remain-ticks) e)))
|
||||
#t)
|
||||
|
||||
(define e-10 (make-engine (lambda ()
|
||||
(let loop ([n 10])
|
||||
(cond
|
||||
[(zero? n)
|
||||
(engine-return 1 2 3)
|
||||
(engine-return 'done)
|
||||
(loop 0)]
|
||||
[else
|
||||
(engine-block)
|
||||
|
@ -456,10 +456,11 @@
|
|||
(let loop ([e e-10] [n 0])
|
||||
(e 100
|
||||
(lambda () (set! started (add1 started)))
|
||||
(lambda (remain a b c) (list a b c n started))
|
||||
(lambda (e remain)
|
||||
(loop e (add1 n))))))
|
||||
'(1 2 3 10 11))
|
||||
(lambda (e results remain)
|
||||
(if e
|
||||
(loop e (add1 n))
|
||||
(list results n started))))))
|
||||
'((done) 10 11))
|
||||
|
||||
;; Check that winders are not run on engine swaps:
|
||||
(let ([pre 0]
|
||||
|
@ -468,7 +469,7 @@
|
|||
(let loop ([n 10])
|
||||
(cond
|
||||
[(zero? n)
|
||||
(values 1 2 3 pre post)]
|
||||
(values 'done pre post)]
|
||||
[else
|
||||
(engine-block)
|
||||
(dynamic-wind
|
||||
|
@ -480,10 +481,11 @@
|
|||
(check (let loop ([e e-10/dw] [n 0])
|
||||
(e 200
|
||||
void
|
||||
(lambda (remain a b c pre t-post) (list a b c pre t-post post n))
|
||||
(lambda (e remain)
|
||||
(loop e (add1 n)))))
|
||||
'(1 2 3 10 0 10 10))))
|
||||
(lambda (e results remain)
|
||||
(if e
|
||||
(loop e (add1 n))
|
||||
(vector results post n)))))
|
||||
'#((done 10 0) 10 10))))
|
||||
|
||||
;; ----------------------------------------
|
||||
;; Thread cells (which are really engine cells):
|
||||
|
@ -503,13 +505,11 @@
|
|||
(define l1 ((make-engine gen engine-tag #f #f #f)
|
||||
100
|
||||
void
|
||||
(lambda (remain l) l)
|
||||
(lambda (e remain) (error 'engine "oops"))))
|
||||
(lambda (e results remain) (car results))))
|
||||
(define l2 ((list-ref l1 2)
|
||||
100
|
||||
void
|
||||
(lambda (remain l) l)
|
||||
(lambda (e remain) (error 'engine "oops"))))
|
||||
(lambda (e results remain) (car results))))
|
||||
(check (list-ref l1 0) 1)
|
||||
(check (list-ref l1 1) 100)
|
||||
(check (list-ref l1 3) 2)
|
||||
|
@ -528,7 +528,7 @@
|
|||
(extend-parameterization (continuation-mark-set-first #f parameterization-key) my-param 'set)
|
||||
(make-engine (lambda () (|#%app| my-param)) engine-tag #f #f #f))])
|
||||
(check (|#%app| my-param) 'init)
|
||||
(check (e 1000 void (lambda (remain v) v) (lambda (e remain) (error 'engine "oops"))) 'set))
|
||||
(check (e 1000 void (lambda (e vs remain) vs)) '(set)))
|
||||
|
||||
(let ([also-my-param (make-derived-parameter my-param
|
||||
(lambda (v) (list v))
|
||||
|
@ -618,7 +618,7 @@
|
|||
(lambda ()
|
||||
(let loop ([n 1000])
|
||||
(if (zero? n)
|
||||
(list pre post)
|
||||
(values pre post)
|
||||
(loop (sub1 n)))))
|
||||
(lambda ()
|
||||
(set! post (add1 post))))))))
|
||||
|
@ -629,11 +629,13 @@
|
|||
(let loop ([e e-sw] [i 0])
|
||||
(e 100
|
||||
(lambda () (set! prefixes (add1 prefixes)))
|
||||
(lambda (remain v) (list (> i 2)
|
||||
(= prefixes (add1 i))
|
||||
(- (car v) i)
|
||||
(- (cadr v) i)))
|
||||
(lambda (e remain) (loop e (add1 i))))))
|
||||
(lambda (e v remain)
|
||||
(if e
|
||||
(loop e (add1 i))
|
||||
(list (> i 2)
|
||||
(= prefixes (add1 i))
|
||||
(- (car v) i)
|
||||
(- (cadr v) i)))))))
|
||||
'(#t #t 1 0))
|
||||
|
||||
;; ----------------------------------------
|
||||
|
|
|
@ -1,5 +1,9 @@
|
|||
(define null '())
|
||||
(define eof #!eof)
|
||||
|
||||
(define (void . args) (#%void))
|
||||
(define void
|
||||
(case-lambda
|
||||
[() (#%void)]
|
||||
[(arg) (#%void)]
|
||||
[args (#%void)]))
|
||||
(define (void? v) (eq? v (#%void)))
|
||||
|
|
|
@ -310,14 +310,15 @@
|
|||
;; ready:
|
||||
(proc))))])
|
||||
;; Prepare to use cc-guard, if one was enabled:
|
||||
(let ([cc-guard (or (metacontinuation-frame-cc-guard (car (current-metacontinuation)))
|
||||
values)])
|
||||
(let ([cc-guard (metacontinuation-frame-cc-guard (car (current-metacontinuation)))])
|
||||
;; Continue normally; the metacontinuation could be different
|
||||
;; than when we captured this metafunction frame, though:
|
||||
(resume-metacontinuation
|
||||
delimit?
|
||||
;; Apply the cc-guard, if any, outside of the prompt:
|
||||
(lambda () (apply cc-guard results)))))))])
|
||||
(if cc-guard
|
||||
(lambda () (apply cc-guard results))
|
||||
results))))))])
|
||||
(cond
|
||||
[(aborting? r)
|
||||
;; Remove the prompt as we call the handler:
|
||||
|
@ -329,7 +330,35 @@
|
|||
;; We're returning normally; the metacontinuation frame has
|
||||
;; been popped already by `resume-metacontinuation`
|
||||
(end-uninterrupted 'resume)
|
||||
(r)])))))]))))
|
||||
(if (#%procedure? r)
|
||||
(r)
|
||||
(if (and (pair? r) (null? (cdr r)))
|
||||
(car r)
|
||||
(#%apply values r)))])))))]))))
|
||||
|
||||
;; Simplified `call-in-empty-metacontinuation-frame` suitable for swapping engines:
|
||||
(define (call-in-empty-metacontinuation-frame-for-swap proc)
|
||||
(assert-in-uninterrupted)
|
||||
(assert-not-in-system-wind)
|
||||
(call/cc
|
||||
(lambda (resume-k)
|
||||
(#%$current-stack-link #%$null-continuation)
|
||||
(current-mark-stack '())
|
||||
(let ([mf (make-metacontinuation-frame #f
|
||||
resume-k
|
||||
(current-winders)
|
||||
(current-mark-splice)
|
||||
#f
|
||||
#f
|
||||
#f
|
||||
#f)])
|
||||
(current-winders '())
|
||||
(current-mark-splice empty-mark-frame)
|
||||
(current-metacontinuation (cons mf (current-metacontinuation)))
|
||||
(let ([r (proc)])
|
||||
(let ([mf (car (current-metacontinuation))])
|
||||
(pop-metacontinuation-frame)
|
||||
((metacontinuation-frame-resume-k mf) r)))))))
|
||||
|
||||
(define (metacontinuation-frame-update-mark-splice current-mf mark-splice)
|
||||
(make-metacontinuation-frame (metacontinuation-frame-tag current-mf)
|
||||
|
@ -995,7 +1024,7 @@
|
|||
;; captured by `call/cc/end-uninterrupted`:
|
||||
(define (end-uninterrupted-with-values args)
|
||||
(end-uninterrupted/call-hook 'cc)
|
||||
(apply values args))
|
||||
(#%apply values args))
|
||||
|
||||
(define (current-mark-chain)
|
||||
(get-mark-chain (current-mark-stack) (current-mark-splice) (current-metacontinuation)))
|
||||
|
@ -1770,7 +1799,7 @@
|
|||
(current-winders winders)
|
||||
(call-winder-thunk 'dw-post post)
|
||||
(end-uninterrupted/call-hook 'dw)
|
||||
(lambda () (apply values args))))))))))
|
||||
(lambda () (#%apply values args))))))))))
|
||||
|
||||
(define (call-winder-thunk who thunk)
|
||||
(with-continuation-mark
|
||||
|
@ -1884,11 +1913,7 @@
|
|||
[(current-system-wind-start-k)
|
||||
=> (lambda (k) (swap-metacontinuation-with-system-wind saved proc k))]
|
||||
[else
|
||||
(call-in-empty-metacontinuation-frame
|
||||
#f
|
||||
fail-abort-to-delimit-continuation
|
||||
#f ; don't try to shift continuation marks
|
||||
#t ; delimit
|
||||
(call-in-empty-metacontinuation-frame-for-swap
|
||||
(lambda ()
|
||||
(let ([now-saved (make-saved-metacontinuation
|
||||
(current-metacontinuation)
|
||||
|
@ -1923,7 +1948,7 @@
|
|||
proc
|
||||
(lambda args
|
||||
(lambda ()
|
||||
(apply values args)))))
|
||||
(#%apply values args)))))
|
||||
(lambda () (current-system-wind-start-k #f)))))))
|
||||
|
||||
(define (swap-metacontinuation-with-system-wind saved proc start-k)
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
;; Don't mix Chez engines with this implementation, because we take
|
||||
;; over the timer.
|
||||
|
||||
(define-record engine-state (mc complete expire thread-cell-values init-break-enabled-cell reset-handler))
|
||||
(define-record engine-state (mc complete-or-expire thread-cell-values init-break-enabled-cell reset-handler))
|
||||
|
||||
(define-virtual-register current-engine-state #f)
|
||||
|
||||
|
@ -24,7 +24,18 @@
|
|||
(define (set-engine-exit-handler! proc)
|
||||
(set! engine-exit proc))
|
||||
|
||||
(define (make-engine thunk prompt-tag abort-handler init-break-enabled-cell empty-config?)
|
||||
;; An engine is repesented by a procedure that takes thee arguments:
|
||||
;; * ticks: number of ticks to run before exire
|
||||
;; * prefix: thunk to invoke just before continuing (counts toward ticks)
|
||||
;; * complete-or-expire: callback that accepts 3 arguments:
|
||||
;; - engine or #f: an engine if the original `thunk` has not yet returned
|
||||
;; - list or #f: a list if the original `thunk` has returned values
|
||||
;; - remining-ticks: a number of ticks leftover due to complete or `(engine-block)`
|
||||
(define (make-engine thunk ; can return any number of values
|
||||
prompt-tag ; prompt to wrap around call to `thunk`
|
||||
abort-handler ; handler for that prompt
|
||||
init-break-enabled-cell ; default break-enable cell
|
||||
empty-config?) ; whether to clone the current parameterization
|
||||
(let ([paramz (if empty-config?
|
||||
empty-parameterization
|
||||
(current-parameterization))])
|
||||
|
@ -57,23 +68,25 @@
|
|||
;; For `continuation-marks`:
|
||||
[() to-saves]
|
||||
;; Normal engine case:
|
||||
[(ticks prefix complete expire)
|
||||
[(ticks prefix complete-or-expire)
|
||||
(start-implicit-uninterrupted 'create)
|
||||
((swap-metacontinuation
|
||||
to-saves
|
||||
(lambda (saves)
|
||||
(current-engine-state (make-engine-state saves complete expire thread-cell-values
|
||||
(current-engine-state (make-engine-state saves complete-or-expire thread-cell-values
|
||||
init-break-enabled-cell (reset-handler)))
|
||||
(reset-handler (lambda ()
|
||||
(end-uninterrupted 'reset)
|
||||
(if (current-engine-state)
|
||||
(engine-return (void))
|
||||
(chez:exit))))
|
||||
(reset-handler engine-reset-handler)
|
||||
(timer-interrupt-handler engine-block-via-timer)
|
||||
(end-implicit-uninterrupted 'create)
|
||||
(set-timer ticks)
|
||||
(proc prefix))))]))
|
||||
|
||||
(define (engine-reset-handler)
|
||||
(end-uninterrupted 'reset)
|
||||
(if (current-engine-state)
|
||||
(engine-return (void))
|
||||
(chez:exit)))
|
||||
|
||||
(define (engine-block-via-timer)
|
||||
(cond
|
||||
[(current-in-uninterrupted)
|
||||
|
@ -102,12 +115,13 @@
|
|||
(end-implicit-uninterrupted 'block)
|
||||
(current-engine-state #f)
|
||||
(lambda () ; returned to the `swap-continuation` in `create-engine`
|
||||
((engine-state-expire es)
|
||||
((engine-state-complete-or-expire es)
|
||||
(create-engine
|
||||
saves
|
||||
(lambda (prefix) prefix) ; returns `prefix` to the above "(("
|
||||
(engine-state-thread-cell-values es)
|
||||
(engine-state-init-break-enabled-cell es))
|
||||
#f
|
||||
remain-ticks))))))]
|
||||
[() (engine-block #f)]))
|
||||
|
||||
|
@ -125,7 +139,7 @@
|
|||
;; as interrupts are enabled)
|
||||
(set-timer 1)])))
|
||||
|
||||
(define (engine-return . args)
|
||||
(define (engine-return . results)
|
||||
(assert-not-in-uninterrupted)
|
||||
(timer-interrupt-handler void)
|
||||
(let ([es (current-engine-state)])
|
||||
|
@ -140,7 +154,7 @@
|
|||
(current-engine-state #f)
|
||||
(end-implicit-uninterrupted 'return)
|
||||
(lambda () ; returned to the `swap-continuation` in `create-engine`
|
||||
(apply (engine-state-complete es) remain-ticks args)))))))
|
||||
((engine-state-complete-or-expire es) #f results remain-ticks)))))))
|
||||
|
||||
(define (make-empty-thread-cell-values)
|
||||
(make-ephemeron-eq-hashtable))
|
||||
|
|
|
@ -62,7 +62,7 @@
|
|||
(semaphore-wait ready-s)
|
||||
(thread-suspend t)
|
||||
(semaphore-post s)
|
||||
(define (go ticks next-prefix complete expire)
|
||||
(define (go ticks next-prefix complete-or-expire)
|
||||
(set! prefix next-prefix)
|
||||
(break-thread t)
|
||||
(thread-resume t)
|
||||
|
@ -85,9 +85,9 @@
|
|||
(sync t t2 (thread-suspend-evt t))))
|
||||
(cond
|
||||
[(thread-dead? t)
|
||||
(apply complete 0 results)]
|
||||
(complete-or-expire #f results 0)]
|
||||
[else
|
||||
(expire go (if timeout? 0 10))]))
|
||||
(complete-or-expire go #f (if timeout? 0 10))]))
|
||||
go)
|
||||
|
||||
(define (engine-block)
|
||||
|
|
|
@ -50,7 +50,7 @@
|
|||
|
||||
;; in atomic mode and only in main pthread
|
||||
(define (flush-future-log)
|
||||
(define new-events (unbox events))
|
||||
(define new-events (unbox* events))
|
||||
(unless (null? new-events)
|
||||
(cond
|
||||
[(box-cas! events new-events null)
|
||||
|
|
|
@ -580,11 +580,12 @@
|
|||
(set-future*-state! f #f)
|
||||
(on-transition-to-unfinished)
|
||||
(future-suspend)))
|
||||
(lambda (leftover-ticks result)
|
||||
;; Done --- completed or suspend (e.g., blocked)
|
||||
(void))
|
||||
(lambda (e timeout?)
|
||||
(loop e))))
|
||||
(lambda (e results leftover-ticks)
|
||||
(cond
|
||||
[e (loop e)]
|
||||
[else
|
||||
;; Done --- completed or suspend (e.g., blocked)
|
||||
(void)]))))
|
||||
(log-future 'end-work (future*-id f))
|
||||
(current-future 'worker)
|
||||
(set-box! (worker-current-future-box w) #f))
|
||||
|
|
|
@ -109,20 +109,20 @@
|
|||
(current-thread/in-atomic t)
|
||||
(set-place-current-thread! current-place t)
|
||||
(set! thread-swap-count (add1 thread-swap-count))
|
||||
(run-callbacks-in-engine
|
||||
e callbacks
|
||||
(lambda (e)
|
||||
(let loop ([e e])
|
||||
(end-implicit-atomic-mode)
|
||||
(e
|
||||
TICKS
|
||||
(lambda ()
|
||||
(check-for-break)
|
||||
(when atomic-timeout-callback
|
||||
(when (positive? (current-atomic))
|
||||
(atomic-timeout-callback #f))))
|
||||
(lambda (remaining-ticks . args)
|
||||
(start-implicit-atomic-mode)
|
||||
(run-callbacks-in-engine e callbacks t leftover-ticks
|
||||
swap-in-engine))
|
||||
|
||||
(define (swap-in-engine e t leftover-ticks)
|
||||
(let loop ([e e])
|
||||
(end-implicit-atomic-mode)
|
||||
(e
|
||||
TICKS
|
||||
check-break-prefix
|
||||
(lambda (e results remaining-ticks)
|
||||
(start-implicit-atomic-mode)
|
||||
(cond
|
||||
[(not e)
|
||||
;; Thread completed
|
||||
(accum-cpu-time! t #t)
|
||||
(set-thread-future! t #f)
|
||||
(current-thread/in-atomic #f)
|
||||
|
@ -134,9 +134,9 @@
|
|||
(when (eq? root-thread t)
|
||||
(force-exit 0))
|
||||
(thread-did-work!)
|
||||
(poll-and-select-thread! (- leftover-ticks (- TICKS remaining-ticks))))
|
||||
(lambda (e remaining-ticks)
|
||||
(start-implicit-atomic-mode)
|
||||
(poll-and-select-thread! (- leftover-ticks (- TICKS remaining-ticks)))]
|
||||
[else
|
||||
;; Thread continues
|
||||
(cond
|
||||
[(zero? (current-atomic))
|
||||
(define new-leftover-ticks (- leftover-ticks (- TICKS remaining-ticks)))
|
||||
|
@ -153,7 +153,13 @@
|
|||
;; where host-system interrupts are not disabled (i.e.,
|
||||
;; don't use `engine-block` instead of `engine-timeout`):
|
||||
(add-end-atomic-callback! engine-timeout)
|
||||
(loop e)])))))))
|
||||
(loop e)])])))))
|
||||
|
||||
(define (check-break-prefix)
|
||||
(check-for-break)
|
||||
(when atomic-timeout-callback
|
||||
(when (positive? (current-atomic))
|
||||
(atomic-timeout-callback #f))))
|
||||
|
||||
(define (maybe-done callbacks)
|
||||
(cond
|
||||
|
@ -194,9 +200,9 @@
|
|||
|
||||
;; Run callbacks within the thread for `e`, and don't give up until
|
||||
;; the callbacks are done
|
||||
(define (run-callbacks-in-engine e callbacks k)
|
||||
(define (run-callbacks-in-engine e callbacks t leftover-ticks k)
|
||||
(cond
|
||||
[(null? callbacks) (k e)]
|
||||
[(null? callbacks) (k e t leftover-ticks)]
|
||||
[else
|
||||
(define done? #f)
|
||||
(let loop ([e e])
|
||||
|
@ -207,12 +213,12 @@
|
|||
(run-callbacks callbacks)
|
||||
(set! done? #t)
|
||||
(engine-block))
|
||||
(lambda args
|
||||
(internal-error "thread ended while it should run callbacks atomically"))
|
||||
(lambda (e remaining)
|
||||
(lambda (e result remaining)
|
||||
(start-implicit-atomic-mode)
|
||||
(unless e
|
||||
(internal-error "thread ended while it should run callbacks atomically"))
|
||||
(if done?
|
||||
(k e)
|
||||
(k e t leftover-ticks)
|
||||
(loop e)))))]))
|
||||
|
||||
;; Run foreign "async-apply" callbacks, now that we're in some thread
|
||||
|
|
|
@ -13,7 +13,8 @@
|
|||
(define-values (prop:waiter waiter? waiter-ref)
|
||||
(make-struct-type-property 'waiter))
|
||||
|
||||
(struct waiter-methods (suspend resume))
|
||||
(struct waiter-methods (suspend resume)
|
||||
#:authentic)
|
||||
|
||||
(define (make-waiter-methods #:suspend! suspend
|
||||
#:resume! resume)
|
||||
|
|
Loading…
Reference in New Issue
Block a user