cs & thread: avoid excessive polling for external events

Reduce polling in the case that threads that block and swap rapidly,
in which case polling on every thread swap can be useless and
expensive.
This commit is contained in:
Matthew Flatt 2019-06-19 17:27:24 -06:00
parent ba8d442e75
commit 1ba4d76fe0
3 changed files with 35 additions and 22 deletions

View File

@ -457,7 +457,7 @@
(e 100 (e 100
(lambda () (set! started (add1 started))) (lambda () (set! started (add1 started)))
(lambda (remain a b c) (list a b c n started)) (lambda (remain a b c) (list a b c n started))
(lambda (e timeout?) (lambda (e remain)
(loop e (add1 n)))))) (loop e (add1 n))))))
'(1 2 3 10 11)) '(1 2 3 10 11))
@ -481,7 +481,7 @@
(e 200 (e 200
void void
(lambda (remain a b c pre t-post) (list a b c pre t-post post n)) (lambda (remain a b c pre t-post) (list a b c pre t-post post n))
(lambda (e timeout?) (lambda (e remain)
(loop e (add1 n))))) (loop e (add1 n)))))
'(1 2 3 10 0 10 10)))) '(1 2 3 10 0 10 10))))
@ -504,12 +504,12 @@
100 100
void void
(lambda (remain l) l) (lambda (remain l) l)
(lambda (e timeout?) (error 'engine "oops")))) (lambda (e remain) (error 'engine "oops"))))
(define l2 ((list-ref l1 2) (define l2 ((list-ref l1 2)
100 100
void void
(lambda (remain l) l) (lambda (remain l) l)
(lambda (e timeout?) (error 'engine "oops")))) (lambda (e remain) (error 'engine "oops"))))
(check (list-ref l1 0) 1) (check (list-ref l1 0) 1)
(check (list-ref l1 1) 100) (check (list-ref l1 1) 100)
(check (list-ref l1 3) 2) (check (list-ref l1 3) 2)
@ -528,7 +528,7 @@
(extend-parameterization (continuation-mark-set-first #f parameterization-key) my-param 'set) (extend-parameterization (continuation-mark-set-first #f parameterization-key) my-param 'set)
(make-engine (lambda () (|#%app| my-param)) engine-tag #f #f #f))]) (make-engine (lambda () (|#%app| my-param)) engine-tag #f #f #f))])
(check (|#%app| my-param) 'init) (check (|#%app| my-param) 'init)
(check (e 1000 void (lambda (remain v) v) (lambda (e timeout?) (error 'engine "oops"))) 'set)) (check (e 1000 void (lambda (remain v) v) (lambda (e remain) (error 'engine "oops"))) 'set))
(let ([also-my-param (make-derived-parameter my-param (let ([also-my-param (make-derived-parameter my-param
(lambda (v) (list v)) (lambda (v) (list v))
@ -633,7 +633,7 @@
(= prefixes (add1 i)) (= prefixes (add1 i))
(- (car v) i) (- (car v) i)
(- (cadr v) i))) (- (cadr v) i)))
(lambda (e timeout?) (loop e (add1 i)))))) (lambda (e remain) (loop e (add1 i))))))
'(#t #t 1 0)) '(#t #t 1 0))
;; ---------------------------------------- ;; ----------------------------------------

View File

@ -86,7 +86,10 @@
[(timeout?) [(timeout?)
(assert-not-in-uninterrupted) (assert-not-in-uninterrupted)
(timer-interrupt-handler void) (timer-interrupt-handler void)
(let ([es (current-engine-state)]) (let ([es (current-engine-state)]
[remain-ticks (if timeout?
0
(set-timer 0))])
(unless es (unless es
(error 'engine-block "not currently running an engine")) (error 'engine-block "not currently running an engine"))
(reset-handler (engine-state-reset-handler es)) (reset-handler (engine-state-reset-handler es))
@ -105,7 +108,7 @@
(lambda (prefix) prefix) ; returns `prefix` to the above "((" (lambda (prefix) prefix) ; returns `prefix` to the above "(("
(engine-state-thread-cell-values es) (engine-state-thread-cell-values es)
(engine-state-init-break-enabled-cell es)) (engine-state-init-break-enabled-cell es))
timeout?))))))] remain-ticks))))))]
[() (engine-block #f)])) [() (engine-block #f)]))
(define (engine-block/timeout) (define (engine-block/timeout)

View File

@ -33,7 +33,7 @@
(make-initial-thread (lambda () (make-initial-thread (lambda ()
(set-place-host-roots! initial-place (host:current-place-roots)) (set-place-host-roots! initial-place (host:current-place-roots))
(thunk))) (thunk)))
(select-thread!)) (select-thread! 0))
;; Initializes the thread system in a new place: ;; Initializes the thread system in a new place:
(define (call-in-another-main-thread c thunk) (define (call-in-another-main-thread c thunk)
@ -54,13 +54,13 @@
(set! skipped-time-accums 0) (set! skipped-time-accums 0)
(set! thread-swap-count 0)) (set! thread-swap-count 0))
(define (select-thread! [pending-callbacks null]) (define (select-thread! in-leftover-ticks [pending-callbacks null])
(define callbacks (if (null? pending-callbacks) (define callbacks (if (null? pending-callbacks)
(host:poll-async-callbacks) (host:poll-async-callbacks)
pending-callbacks)) pending-callbacks))
(host:poll-will-executors) (host:poll-will-executors)
(poll-custodian-will-executor) (poll-custodian-will-executor)
(check-external-events 'fast) (define leftover-ticks (maybe-check-external-events in-leftover-ticks))
(call-pre-poll-external-callbacks) (call-pre-poll-external-callbacks)
(check-place-activity) (check-place-activity)
(check-queued-custodian-shutdown) (check-queued-custodian-shutdown)
@ -76,11 +76,11 @@
(cond (cond
[(not child) (none-k callbacks)] [(not child) (none-k callbacks)]
[(thread? child) [(thread? child)
(swap-in-thread child callbacks)] (swap-in-thread child leftover-ticks callbacks)]
[else [else
(loop child callbacks (lambda (pending-callbacks) (loop g none-k pending-callbacks)))]))) (loop child callbacks (lambda (pending-callbacks) (loop g none-k pending-callbacks)))])))
(define (swap-in-thread t callbacks) (define (swap-in-thread t leftover-ticks callbacks)
(define e (thread-engine t)) (define e (thread-engine t))
(set-thread-engine! t 'running) (set-thread-engine! t 'running)
(set-thread-sched-info! t #f) (set-thread-sched-info! t #f)
@ -100,7 +100,7 @@
(when atomic-timeout-callback (when atomic-timeout-callback
(when (positive? (current-atomic)) (when (positive? (current-atomic))
(atomic-timeout-callback #f)))) (atomic-timeout-callback #f))))
(lambda args (lambda (remaining-ticks . args)
(start-implicit-atomic-mode) (start-implicit-atomic-mode)
(accum-cpu-time! t #t) (accum-cpu-time! t #t)
(set-thread-future! t #f) (set-thread-future! t #f)
@ -113,19 +113,20 @@
(when (eq? root-thread t) (when (eq? root-thread t)
(force-exit 0)) (force-exit 0))
(thread-did-work!) (thread-did-work!)
(select-thread!)) (select-thread! (- leftover-ticks (- TICKS remaining-ticks))))
(lambda (e timeout?) (lambda (e remaining-ticks)
(start-implicit-atomic-mode) (start-implicit-atomic-mode)
(cond (cond
[(zero? (current-atomic)) [(zero? (current-atomic))
(accum-cpu-time! t timeout?) (define new-leftover-ticks (- leftover-ticks (- TICKS remaining-ticks)))
(accum-cpu-time! t (new-leftover-ticks . <= . 0))
(set-thread-future! t (current-future)) (set-thread-future! t (current-future))
(current-thread/in-atomic #f) (current-thread/in-atomic #f)
(current-future #f) (current-future #f)
(set-place-current-thread! current-place #f) (set-place-current-thread! current-place #f)
(unless (eq? (thread-engine t) 'done) (unless (eq? (thread-engine t) 'done)
(set-thread-engine! t e)) (set-thread-engine! t e))
(select-thread!)] (select-thread! new-leftover-ticks)]
[else [else
;; Swap out when the atomic region ends and at a point ;; Swap out when the atomic region ends and at a point
;; where host-system interrupts are not disabled (i.e., ;; where host-system interrupts are not disabled (i.e.,
@ -141,7 +142,7 @@
(do-make-thread 'scheduler-make-thread (do-make-thread 'scheduler-make-thread
void void
#:custodian #f) #:custodian #f)
(select-thread! callbacks)] (select-thread! 0 callbacks)]
[(and (not (sandman-any-sleepers?)) [(and (not (sandman-any-sleepers?))
(not (any-idle-waiters?))) (not (any-idle-waiters?)))
;; all threads done or blocked ;; all threads done or blocked
@ -151,12 +152,21 @@
;; blocked, but it's not going to become unblocked; ;; blocked, but it's not going to become unblocked;
;; sleep forever or until a signal changes things ;; sleep forever or until a signal changes things
(process-sleep) (process-sleep)
(select-thread!)] (select-thread! 0)]
[else [else
(void)])] (void)])]
[else [else
;; try again, which should lead to `process-sleep` ;; try again, which should lead to `process-sleep`
(select-thread!)])) (select-thread! 0)]))
;; Limit frequency of polling for external events, even
;; in 'fast mode (because it's not that fast)
(define (maybe-check-external-events leftover-ticks)
(cond
[(leftover-ticks . > . 0) leftover-ticks]
[else
(check-external-events 'fast)
TICKS]))
;; Check for threads that have been suspended until a particular time, ;; Check for threads that have been suspended until a particular time,
;; etc., as registered with the sandman ;; etc., as registered with the sandman
@ -187,7 +197,7 @@
(engine-block)) (engine-block))
(lambda args (lambda args
(internal-error "thread ended while it should run callbacks atomically")) (internal-error "thread ended while it should run callbacks atomically"))
(lambda (e timeout?) (lambda (e remaining)
(start-implicit-atomic-mode) (start-implicit-atomic-mode)
(if done? (if done?
(k e) (k e)