thread: refactor scheduler poll-and-select loop
Further reduce redundant checks, and also avoid an excessive delay after making the process sleep.
This commit is contained in:
parent
6d3eff55a4
commit
3f35504355
|
@ -88,7 +88,7 @@
|
||||||
(rktio_poll_set_forget rktio ps))
|
(rktio_poll_set_forget rktio ps))
|
||||||
|
|
||||||
;; poll
|
;; poll
|
||||||
(lambda (mode wakeup)
|
(lambda (wakeup)
|
||||||
(let check-signals ()
|
(let check-signals ()
|
||||||
(define v (rktio_poll_os_signal rktio))
|
(define v (rktio_poll_os_signal rktio))
|
||||||
(unless (eqv? v RKTIO_OS_SIGNAL_NONE)
|
(unless (eqv? v RKTIO_OS_SIGNAL_NONE)
|
||||||
|
@ -100,7 +100,7 @@
|
||||||
(check-signals)))
|
(check-signals)))
|
||||||
(when (fd-semaphore-poll-ready?)
|
(when (fd-semaphore-poll-ready?)
|
||||||
(wakeup #f))
|
(wakeup #f))
|
||||||
((sandman-do-poll timeout-sandman) mode wakeup))
|
((sandman-do-poll timeout-sandman) wakeup))
|
||||||
|
|
||||||
;; get-wakeup
|
;; get-wakeup
|
||||||
(lambda ()
|
(lambda ()
|
||||||
|
|
|
@ -64,8 +64,8 @@
|
||||||
;; The `thread-wakeup` callback can be called with #f
|
;; The `thread-wakeup` callback can be called with #f
|
||||||
;; to indicate that a thread was potentially woken up
|
;; to indicate that a thread was potentially woken up
|
||||||
;; some other way, such as by a semaphore post
|
;; some other way, such as by a semaphore post
|
||||||
(define (sandman-poll mode thread-wakeup)
|
(define (sandman-poll thread-wakeup)
|
||||||
((sandman-do-poll the-sandman) mode thread-wakeup))
|
((sandman-do-poll the-sandman) thread-wakeup))
|
||||||
|
|
||||||
;; in atomic mode
|
;; in atomic mode
|
||||||
(define (sandman-sleep exts)
|
(define (sandman-sleep exts)
|
||||||
|
@ -111,8 +111,7 @@
|
||||||
(host:sleep (max 0.0 (/ (- (or timeout-at (distant-future)) (current-inexact-milliseconds)) 1000.0))))
|
(host:sleep (max 0.0 (/ (- (or timeout-at (distant-future)) (current-inexact-milliseconds)) 1000.0))))
|
||||||
|
|
||||||
;; poll
|
;; poll
|
||||||
(lambda (mode wakeup)
|
(lambda (wakeup)
|
||||||
;; This check is fast, so do it in all modes
|
|
||||||
(unless (tree-empty? sleeping-threads)
|
(unless (tree-empty? sleeping-threads)
|
||||||
(define-values (timeout-at threads) (tree-min sleeping-threads))
|
(define-values (timeout-at threads) (tree-min sleeping-threads))
|
||||||
(when (timeout-at . <= . (current-inexact-milliseconds))
|
(when (timeout-at . <= . (current-inexact-milliseconds))
|
||||||
|
|
|
@ -33,7 +33,7 @@
|
||||||
(make-initial-thread (lambda ()
|
(make-initial-thread (lambda ()
|
||||||
(set-place-host-roots! initial-place (host:current-place-roots))
|
(set-place-host-roots! initial-place (host:current-place-roots))
|
||||||
(thunk)))
|
(thunk)))
|
||||||
(select-thread! 0))
|
(poll-and-select-thread! 0))
|
||||||
|
|
||||||
;; Initializes the thread system in a new place:
|
;; Initializes the thread system in a new place:
|
||||||
(define (call-in-another-main-thread c thunk)
|
(define (call-in-another-main-thread c thunk)
|
||||||
|
@ -54,31 +54,50 @@
|
||||||
(set! skipped-time-accums 0)
|
(set! skipped-time-accums 0)
|
||||||
(set! thread-swap-count 0))
|
(set! thread-swap-count 0))
|
||||||
|
|
||||||
(define (select-thread! in-leftover-ticks [pending-callbacks null])
|
(define (poll-and-select-thread! leftover-ticks [pending-callbacks null])
|
||||||
(define callbacks (if (null? pending-callbacks)
|
(define callbacks (if (null? pending-callbacks)
|
||||||
(host:poll-async-callbacks)
|
(host:poll-async-callbacks)
|
||||||
pending-callbacks))
|
pending-callbacks))
|
||||||
|
;; Perform any expensive polls (such as ones that consult the OS)
|
||||||
|
;; only after ticks have been used up:
|
||||||
|
(define poll-now? (leftover-ticks . <= . 0))
|
||||||
(host:poll-will-executors)
|
(host:poll-will-executors)
|
||||||
(poll-custodian-will-executor)
|
(poll-custodian-will-executor)
|
||||||
(define leftover-ticks (maybe-check-external-events in-leftover-ticks))
|
(when poll-now?
|
||||||
|
(check-external-events))
|
||||||
(call-pre-poll-external-callbacks)
|
(call-pre-poll-external-callbacks)
|
||||||
(check-place-activity)
|
(check-place-activity)
|
||||||
(check-queued-custodian-shutdown)
|
(check-queued-custodian-shutdown)
|
||||||
(flush-future-log)
|
(flush-future-log)
|
||||||
(when (and (null? callbacks)
|
(cond
|
||||||
(all-threads-poll-done?)
|
[(and (null? callbacks)
|
||||||
(waiting-on-external-or-idle?))
|
(all-threads-poll-done?))
|
||||||
(or (check-external-events 'slow)
|
;; May need to sleep
|
||||||
(try-post-idle)
|
(cond
|
||||||
(process-sleep)))
|
[(and (not poll-now?)
|
||||||
(let loop ([g root-thread-group] [pending-callbacks pending-callbacks] [none-k maybe-done])
|
(check-external-events))
|
||||||
|
;; Retry and reset counter for checking external events
|
||||||
|
(poll-and-select-thread! TICKS callbacks)]
|
||||||
|
[(try-post-idle)
|
||||||
|
;; Enabled a thread that was waiting for idle
|
||||||
|
(select-thread! leftover-ticks callbacks)]
|
||||||
|
[else
|
||||||
|
(process-sleep)
|
||||||
|
;; Retry, checking right away for external events
|
||||||
|
(poll-and-select-thread! 0 callbacks)])]
|
||||||
|
[else
|
||||||
|
;; Looks like some thread can work now
|
||||||
|
(select-thread! (if poll-now? TICKS leftover-ticks) callbacks)]))
|
||||||
|
|
||||||
|
(define (select-thread! leftover-ticks callbacks)
|
||||||
|
(let loop ([g root-thread-group] [callbacks callbacks] [none-k maybe-done])
|
||||||
(define child (thread-group-next! g))
|
(define child (thread-group-next! g))
|
||||||
(cond
|
(cond
|
||||||
[(not child) (none-k callbacks)]
|
[(not child) (none-k callbacks)]
|
||||||
[(thread? child)
|
[(thread? child)
|
||||||
(swap-in-thread child leftover-ticks callbacks)]
|
(swap-in-thread child leftover-ticks callbacks)]
|
||||||
[else
|
[else
|
||||||
(loop child callbacks (lambda (pending-callbacks) (loop g none-k pending-callbacks)))])))
|
(loop child callbacks (lambda (callbacks) (loop g none-k callbacks)))])))
|
||||||
|
|
||||||
(define (swap-in-thread t leftover-ticks callbacks)
|
(define (swap-in-thread t leftover-ticks callbacks)
|
||||||
(define e (thread-engine t))
|
(define e (thread-engine t))
|
||||||
|
@ -113,7 +132,7 @@
|
||||||
(when (eq? root-thread t)
|
(when (eq? root-thread t)
|
||||||
(force-exit 0))
|
(force-exit 0))
|
||||||
(thread-did-work!)
|
(thread-did-work!)
|
||||||
(select-thread! (- leftover-ticks (- TICKS remaining-ticks))))
|
(poll-and-select-thread! (- leftover-ticks (- TICKS remaining-ticks))))
|
||||||
(lambda (e remaining-ticks)
|
(lambda (e remaining-ticks)
|
||||||
(start-implicit-atomic-mode)
|
(start-implicit-atomic-mode)
|
||||||
(cond
|
(cond
|
||||||
|
@ -126,7 +145,7 @@
|
||||||
(set-place-current-thread! current-place #f)
|
(set-place-current-thread! current-place #f)
|
||||||
(unless (eq? (thread-engine t) 'done)
|
(unless (eq? (thread-engine t) 'done)
|
||||||
(set-thread-engine! t e))
|
(set-thread-engine! t e))
|
||||||
(select-thread! new-leftover-ticks)]
|
(poll-and-select-thread! new-leftover-ticks)]
|
||||||
[else
|
[else
|
||||||
;; Swap out when the atomic region ends and at a point
|
;; Swap out when the atomic region ends and at a point
|
||||||
;; where host-system interrupts are not disabled (i.e.,
|
;; where host-system interrupts are not disabled (i.e.,
|
||||||
|
@ -142,7 +161,7 @@
|
||||||
(do-make-thread 'scheduler-make-thread
|
(do-make-thread 'scheduler-make-thread
|
||||||
void
|
void
|
||||||
#:custodian #f)
|
#:custodian #f)
|
||||||
(select-thread! 0 callbacks)]
|
(poll-and-select-thread! 0 callbacks)]
|
||||||
[(and (not (sandman-any-sleepers?))
|
[(and (not (sandman-any-sleepers?))
|
||||||
(not (any-idle-waiters?)))
|
(not (any-idle-waiters?)))
|
||||||
;; all threads done or blocked
|
;; all threads done or blocked
|
||||||
|
@ -152,28 +171,18 @@
|
||||||
;; blocked, but it's not going to become unblocked;
|
;; blocked, but it's not going to become unblocked;
|
||||||
;; sleep forever or until a signal changes things
|
;; sleep forever or until a signal changes things
|
||||||
(process-sleep)
|
(process-sleep)
|
||||||
(select-thread! 0)]
|
(poll-and-select-thread! 0)]
|
||||||
[else
|
[else
|
||||||
(void)])]
|
(void)])]
|
||||||
[else
|
[else
|
||||||
;; try again, which should lead to `process-sleep`
|
;; try again, which should lead to `process-sleep`
|
||||||
(select-thread! 0)]))
|
(poll-and-select-thread! 0)]))
|
||||||
|
|
||||||
;; Limit frequency of polling for external events, even
|
|
||||||
;; in 'fast mode (because it's not that fast)
|
|
||||||
(define (maybe-check-external-events leftover-ticks)
|
|
||||||
(cond
|
|
||||||
[(leftover-ticks . > . 0) leftover-ticks]
|
|
||||||
[else
|
|
||||||
(check-external-events 'fast)
|
|
||||||
TICKS]))
|
|
||||||
|
|
||||||
;; Check for threads that have been suspended until a particular time,
|
;; Check for threads that have been suspended until a particular time,
|
||||||
;; etc., as registered with the sandman
|
;; etc., as registered with the sandman
|
||||||
(define (check-external-events mode)
|
(define (check-external-events)
|
||||||
(define did? #f)
|
(define did? #f)
|
||||||
(sandman-poll mode
|
(sandman-poll (lambda (t)
|
||||||
(lambda (t)
|
|
||||||
(when t
|
(when t
|
||||||
(thread-reschedule! t))
|
(thread-reschedule! t))
|
||||||
(set! did? #t)))
|
(set! did? #t)))
|
||||||
|
@ -219,11 +228,6 @@
|
||||||
(= (hash-count poll-done-threads)
|
(= (hash-count poll-done-threads)
|
||||||
num-threads-in-groups))
|
num-threads-in-groups))
|
||||||
|
|
||||||
(define (waiting-on-external-or-idle?)
|
|
||||||
(or (positive? num-threads-in-groups)
|
|
||||||
(sandman-any-sleepers?)
|
|
||||||
(any-idle-waiters?)))
|
|
||||||
|
|
||||||
;; Stop using the CPU for a while
|
;; Stop using the CPU for a while
|
||||||
(define (process-sleep)
|
(define (process-sleep)
|
||||||
(define ts (thread-group-all-threads root-thread-group null))
|
(define ts (thread-group-all-threads root-thread-group null))
|
||||||
|
|
Loading…
Reference in New Issue
Block a user