cs: fix memory accounting and futures
Also avoid problems with interrupting a rapidly allocating computation and shutting down the main custodian.
This commit is contained in:
parent
a5a84ea8c6
commit
564dcf339a
|
@ -74,7 +74,12 @@
|
|||
(run-collect-callbacks cdr)
|
||||
(when (and reachable-size-increments-callback
|
||||
(fx= gen (collect-maximum-generation)))
|
||||
(reachable-size-increments-callback compute-size-increments)))))
|
||||
(reachable-size-increments-callback compute-size-increments))
|
||||
(when (and (= gen (collect-maximum-generation))
|
||||
(current-engine-state))
|
||||
;; This `set-timer` doesn't necessarily penalize the right thread,
|
||||
;; but it's likely to penalize a thread that is allocating quickly:
|
||||
(set-timer 1)))))
|
||||
|
||||
(define collect-garbage
|
||||
(case-lambda
|
||||
|
|
|
@ -37,9 +37,11 @@ GLOBALS = --no-global \
|
|||
++global-ok wakeup-this-place \
|
||||
++global-ok ensure-place-wakeup-handle \
|
||||
++global-ok futures-sync-for-custodian-shutdown \
|
||||
++global-ok logging-future-events? \
|
||||
++global-ok 'future-scheduler-add-thread-custodian-mapping!' \
|
||||
++global-ok 'logging-future-events?' \
|
||||
++global-ok log-future-event
|
||||
|
||||
|
||||
GENERATE_ARGS = -t main.rkt --submod main \
|
||||
--check-depends $(BUILDDIR)compiled/thread-dep.rktd \
|
||||
++depend-module ../expander/bootstrap-run.rkt \
|
||||
|
|
|
@ -87,7 +87,7 @@
|
|||
[(thread-dead? t)
|
||||
(apply complete 0 results)]
|
||||
[else
|
||||
(expire go timeout?)]))
|
||||
(expire go (if timeout? 0 10))]))
|
||||
go)
|
||||
|
||||
(define (engine-block)
|
||||
|
|
|
@ -8,7 +8,8 @@
|
|||
"evt.rkt"
|
||||
"semaphore.rkt"
|
||||
"parameter.rkt"
|
||||
"sink.rkt")
|
||||
"sink.rkt"
|
||||
"exit.rkt")
|
||||
|
||||
(provide current-custodian
|
||||
make-custodian
|
||||
|
@ -44,7 +45,7 @@
|
|||
poll-custodian-will-executor))
|
||||
|
||||
(module+ for-future
|
||||
(provide set-custodian-futures-sync!))
|
||||
(provide set-custodian-future-callbacks!))
|
||||
|
||||
;; For `(struct custodian ...)`, see "custodian-object.rkt"
|
||||
|
||||
|
@ -228,24 +229,27 @@
|
|||
|
||||
;; Called in atomic mode by the scheduler
|
||||
(define (check-queued-custodian-shutdown)
|
||||
(unless (null? queued-shutdowns)
|
||||
(host:disable-interrupts)
|
||||
(host:mutex-acquire memory-limit-lock)
|
||||
(define queued queued-shutdowns)
|
||||
(set! queued-shutdowns
|
||||
;; Keep only custodians owned by other places
|
||||
(for/list ([c (in-list queued)]
|
||||
#:unless (custodian-this-place? c))
|
||||
(when (eq? (custodian-need-shutdown c) 'needed)
|
||||
;; Make sure custodian's place is polling for shutdowns:
|
||||
(set-custodian-need-shutdown! c 'neeed/sent-wakeup)
|
||||
(place-wakeup (custodian-place c)))
|
||||
c))
|
||||
(host:mutex-release memory-limit-lock)
|
||||
(host:enable-interrupts)
|
||||
(for ([c (in-list queued)]
|
||||
#:when (custodian-this-place? c))
|
||||
(do-custodian-shutdown-all c))))
|
||||
(cond
|
||||
[(null? queued-shutdowns) #f]
|
||||
[else
|
||||
(host:disable-interrupts)
|
||||
(host:mutex-acquire memory-limit-lock)
|
||||
(define queued queued-shutdowns)
|
||||
(set! queued-shutdowns
|
||||
;; Keep only custodians owned by other places
|
||||
(for/list ([c (in-list queued)]
|
||||
#:unless (custodian-this-place? c))
|
||||
(when (eq? (custodian-need-shutdown c) 'needed)
|
||||
;; Make sure custodian's place is polling for shutdowns:
|
||||
(set-custodian-need-shutdown! c 'neeed/sent-wakeup)
|
||||
(place-wakeup (custodian-place c)))
|
||||
c))
|
||||
(host:mutex-release memory-limit-lock)
|
||||
(host:enable-interrupts)
|
||||
(for ([c (in-list queued)]
|
||||
#:when (custodian-this-place? c))
|
||||
(do-custodian-shutdown-all c))
|
||||
#t]))
|
||||
|
||||
(define place-ensure-wakeup! (lambda () #f)) ; call before enabling shutdowns
|
||||
(define place-wakeup-initial void)
|
||||
|
@ -370,9 +374,11 @@
|
|||
;; ----------------------------------------
|
||||
|
||||
(define futures-sync-for-custodian-shutdown (lambda () (void)))
|
||||
(define future-scheduler-add-thread-custodian-mapping! (lambda (s ht) (void)))
|
||||
|
||||
(define (set-custodian-futures-sync! proc)
|
||||
(set! futures-sync-for-custodian-shutdown proc))
|
||||
(define (set-custodian-future-callbacks! sync-shutdown add-custodian-mapping)
|
||||
(set! futures-sync-for-custodian-shutdown sync-shutdown)
|
||||
(set! future-scheduler-add-thread-custodian-mapping! add-custodian-mapping))
|
||||
|
||||
;; ----------------------------------------
|
||||
|
||||
|
@ -390,6 +396,12 @@
|
|||
(unless (zero? compute-memory-sizes)
|
||||
(host:call-with-current-place-continuation
|
||||
(lambda (starting-k)
|
||||
;; A place may have future pthreads, and each ptherad may
|
||||
;; be running a future that becomes to a particular custodian;
|
||||
;; build up a custodian-to-pthtread mapping in this table:
|
||||
(define custodian-future-threads (make-hasheq))
|
||||
(future-scheduler-add-thread-custodian-mapping! (place-future-scheduler initial-place)
|
||||
custodian-future-threads)
|
||||
;; Get roots, which are threads and custodians, for all distinct accounting domains
|
||||
(define-values (roots custs) ; parallel lists: root and custodian to charge for the root
|
||||
(let c-loop ([c initial-place-root-custodian] [pl initial-place] [accum-roots null] [accum-custs null])
|
||||
|
@ -422,6 +434,8 @@
|
|||
[(place? (car roots))
|
||||
(define pl (car roots))
|
||||
(define c (place-custodian pl))
|
||||
(future-scheduler-add-thread-custodian-mapping! (place-future-scheduler pl)
|
||||
custodian-future-threads)
|
||||
(define-values (new-roots new-custs) (c-loop c pl accum-roots accum-custs))
|
||||
(loop (cdr roots) local-accum-roots new-roots new-custs)]
|
||||
[else
|
||||
|
@ -445,9 +459,11 @@
|
|||
(define any-limits?
|
||||
(let c-loop ([c root-custodian])
|
||||
(define gc-roots (custodian-gc-roots c))
|
||||
(define roots (if gc-roots
|
||||
(hash-keys gc-roots)
|
||||
null))
|
||||
(define roots (append
|
||||
(hash-ref custodian-future-threads c null)
|
||||
(if gc-roots
|
||||
(hash-keys gc-roots)
|
||||
null)))
|
||||
(define any-limits?
|
||||
(for/fold ([any-limits? #f]) ([root (in-list roots)]
|
||||
#:when (custodian? root))
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
#lang racket/base
|
||||
(require "config.rkt"
|
||||
"place-local.rkt"
|
||||
"place-object.rkt"
|
||||
"check.rkt"
|
||||
"internal-error.rkt"
|
||||
"host.rkt"
|
||||
|
@ -352,8 +353,6 @@
|
|||
(define (set-processor-count! n)
|
||||
(set! pthread-count n))
|
||||
|
||||
(define-place-local the-scheduler #f)
|
||||
|
||||
(struct scheduler ([workers #:mutable]
|
||||
[futures-head #:mutable]
|
||||
[futures-tail #:mutable]
|
||||
|
@ -362,47 +361,58 @@
|
|||
#:authentic)
|
||||
|
||||
(struct worker (id
|
||||
[pthread #:mutable]
|
||||
current-future-box ; reports current future (for access external to pthread)
|
||||
[die? #:mutable]
|
||||
sync-state) ; box used to sync shutdowns: 'idle, 'running, or 'pending
|
||||
#:authentic)
|
||||
|
||||
(define current-scheduler
|
||||
(case-lambda
|
||||
[() (place-future-scheduler current-place)]
|
||||
[(s) (set-place-future-scheduler! current-place s)]))
|
||||
|
||||
(define (make-worker id)
|
||||
(worker id
|
||||
#f ; die?
|
||||
#f ; pthread
|
||||
(box #f) ; current-future-box
|
||||
#f ; die?
|
||||
(box 'idle)))
|
||||
|
||||
;; called in a Racket thread
|
||||
(define (maybe-start-scheduler)
|
||||
(atomically
|
||||
(unless the-scheduler
|
||||
(unless (current-scheduler)
|
||||
(ensure-place-wakeup-handle)
|
||||
(set! the-scheduler (scheduler '()
|
||||
#f ; futures-head
|
||||
#f ; futures-tail
|
||||
(host:make-mutex)
|
||||
(host:make-condition)))
|
||||
(define s (scheduler '()
|
||||
#f ; futures-head
|
||||
#f ; futures-tail
|
||||
(host:make-mutex)
|
||||
(host:make-condition)))
|
||||
(current-scheduler s)
|
||||
(define workers
|
||||
(for/list ([id (in-range 1 (add1 pthread-count))])
|
||||
(define w (make-worker id))
|
||||
(start-worker w)
|
||||
w))
|
||||
(set-scheduler-workers! the-scheduler workers))))
|
||||
(set-scheduler-workers! s workers))))
|
||||
|
||||
;; called in atomic mode
|
||||
(define (kill-future-scheduler)
|
||||
(when the-scheduler
|
||||
(define s the-scheduler)
|
||||
(define s (current-scheduler))
|
||||
(when s
|
||||
(host:mutex-acquire (scheduler-mutex s))
|
||||
(for ([w (in-list (scheduler-workers s))])
|
||||
(set-worker-die?! w #t))
|
||||
(host:condition-signal (scheduler-cond s))
|
||||
(host:mutex-release (scheduler-mutex s))
|
||||
(futures-sync-for-shutdown)))
|
||||
(futures-sync-for-shutdown)
|
||||
(current-scheduler #f)))
|
||||
|
||||
;; called in any pthread
|
||||
;; called maybe holding an fsemaphore lock, but nothing else
|
||||
(define (schedule-future! f #:front? [front? #f])
|
||||
(define s the-scheduler)
|
||||
(define s (current-scheduler))
|
||||
(host:mutex-acquire (scheduler-mutex s))
|
||||
(define old (if front?
|
||||
(scheduler-futures-head s)
|
||||
|
@ -424,7 +434,7 @@
|
|||
|
||||
;; called with queue lock held
|
||||
(define (deschedule-future f)
|
||||
(define s the-scheduler)
|
||||
(define s (current-scheduler))
|
||||
(cond
|
||||
[(or (future*-prev f)
|
||||
(future*-next f))
|
||||
|
@ -445,7 +455,7 @@
|
|||
;; called with no locks held; if successful,
|
||||
;; returns with lock held on f
|
||||
(define (try-deschedule-future? f)
|
||||
(define s the-scheduler)
|
||||
(define s (current-scheduler))
|
||||
(host:mutex-acquire (scheduler-mutex s))
|
||||
(define ok?
|
||||
(cond
|
||||
|
@ -484,34 +494,36 @@
|
|||
;; ----------------------------------------
|
||||
|
||||
(define (start-worker w)
|
||||
(define s the-scheduler)
|
||||
(fork-pthread
|
||||
(lambda ()
|
||||
(current-future 'worker)
|
||||
(host:mutex-acquire (scheduler-mutex s))
|
||||
(let loop ()
|
||||
(or (box-cas! (worker-sync-state w) 'idle 'running)
|
||||
(box-cas! (worker-sync-state w) 'pending 'running))
|
||||
(cond
|
||||
[(worker-die? w) ; worker was killed
|
||||
(host:mutex-release (scheduler-mutex s))
|
||||
(box-cas! (worker-sync-state w) 'running 'idle)]
|
||||
[(scheduler-futures-head s)
|
||||
=> (lambda (f)
|
||||
(deschedule-future f)
|
||||
(host:mutex-release (scheduler-mutex s))
|
||||
(lock-acquire (future*-lock f))
|
||||
;; lock is held on f; run the future
|
||||
(maybe-run-future-in-worker f w)
|
||||
;; look for more work
|
||||
(host:mutex-acquire (scheduler-mutex s))
|
||||
(loop))]
|
||||
[else
|
||||
;; wait for work
|
||||
(or (box-cas! (worker-sync-state w) 'pending 'idle)
|
||||
(box-cas! (worker-sync-state w) 'running 'idle))
|
||||
(host:condition-wait (scheduler-cond s) (scheduler-mutex s))
|
||||
(loop)])))))
|
||||
(define s (current-scheduler))
|
||||
(define th
|
||||
(fork-pthread
|
||||
(lambda ()
|
||||
(current-future 'worker)
|
||||
(host:mutex-acquire (scheduler-mutex s))
|
||||
(let loop ()
|
||||
(or (box-cas! (worker-sync-state w) 'idle 'running)
|
||||
(box-cas! (worker-sync-state w) 'pending 'running))
|
||||
(cond
|
||||
[(worker-die? w) ; worker was killed
|
||||
(host:mutex-release (scheduler-mutex s))
|
||||
(box-cas! (worker-sync-state w) 'running 'idle)]
|
||||
[(scheduler-futures-head s)
|
||||
=> (lambda (f)
|
||||
(deschedule-future f)
|
||||
(host:mutex-release (scheduler-mutex s))
|
||||
(lock-acquire (future*-lock f))
|
||||
;; lock is held on f; run the future
|
||||
(maybe-run-future-in-worker f w)
|
||||
;; look for more work
|
||||
(host:mutex-acquire (scheduler-mutex s))
|
||||
(loop))]
|
||||
[else
|
||||
;; wait for work
|
||||
(or (box-cas! (worker-sync-state w) 'pending 'idle)
|
||||
(box-cas! (worker-sync-state w) 'running 'idle))
|
||||
(host:condition-wait (scheduler-cond s) (scheduler-mutex s))
|
||||
(loop)])))))
|
||||
(set-worker-pthread! w th))
|
||||
|
||||
;; called with lock on f
|
||||
(define (maybe-run-future-in-worker f w)
|
||||
|
@ -528,6 +540,7 @@
|
|||
|
||||
(define (run-future-in-worker f w)
|
||||
(current-future f)
|
||||
(set-box! (worker-current-future-box w) f)
|
||||
;; If we didn't need to check custodians, could be just
|
||||
;; (call-with-continuation-prompt
|
||||
;; (lambda () (run-future f))
|
||||
|
@ -558,7 +571,8 @@
|
|||
(lambda (e timeout?)
|
||||
(loop e))))
|
||||
(log-future 'end-work (future*-id f))
|
||||
(current-future 'worker))
|
||||
(current-future 'worker)
|
||||
(set-box! (worker-current-future-box w) #f))
|
||||
|
||||
;; in atomic mode
|
||||
(define (futures-sync-for-shutdown)
|
||||
|
@ -567,12 +581,12 @@
|
|||
;; future-scheduler shutdown.
|
||||
;;
|
||||
;; Move each 'running worker into the 'pending state:
|
||||
(for ([w (in-list (scheduler-workers the-scheduler))])
|
||||
(for ([w (in-list (scheduler-workers (current-scheduler)))])
|
||||
(box-cas! (worker-sync-state w) 'running 'pending))
|
||||
;; A worker that transitions from 'pending to 'running or 'idle
|
||||
;; is guaranteed to not run a future chose custodian is
|
||||
;; shutdown or run any future if the worker is terminated
|
||||
(for ([w (in-list (scheduler-workers the-scheduler))])
|
||||
(for ([w (in-list (scheduler-workers (current-scheduler)))])
|
||||
(define bx (worker-sync-state w))
|
||||
(let loop ()
|
||||
(when (box-cas! bx 'pending 'pending)
|
||||
|
@ -581,6 +595,19 @@
|
|||
|
||||
;; ----------------------------------------
|
||||
|
||||
;; called in a GCing pthread with all other pthreads stopped
|
||||
(define (scheduler-add-thread-custodian-mapping! s ht)
|
||||
(when s
|
||||
(for ([w (in-list (scheduler-workers s))])
|
||||
(define f (unbox (worker-current-future-box w)))
|
||||
(when f
|
||||
(define c (future*-custodian f))
|
||||
(when c
|
||||
(hash-set! ht c (cons (worker-pthread w)
|
||||
(hash-ref ht c null))))))))
|
||||
|
||||
;; ----------------------------------------
|
||||
|
||||
(define (reset-future-logs-for-tracing!)
|
||||
(void))
|
||||
|
||||
|
@ -607,4 +634,6 @@
|
|||
;; tell "atomic.rkt" layer how to block:
|
||||
(void (set-future-block! future-block))
|
||||
|
||||
(void (set-custodian-futures-sync! futures-sync-for-shutdown))
|
||||
;; tell "custodian.rkt" how to sync and map pthreads to custodians:
|
||||
(void (set-custodian-future-callbacks! futures-sync-for-shutdown
|
||||
scheduler-add-thread-custodian-mapping!))
|
||||
|
|
|
@ -26,7 +26,8 @@
|
|||
[pending-break #:mutable] ; #f, 'break, 'hang-up, or 'terminate
|
||||
done-waiting ; hash table of places to ping when this one ends
|
||||
[wakeup-handle #:mutable]
|
||||
[dequeue-semas #:mutable]); semaphores reflecting place-channel waits to recheck
|
||||
[dequeue-semas #:mutable] ; semaphores reflecting place-channel waits to recheck
|
||||
[future-scheduler #:mutable]) ; #f or a scheduler of futures
|
||||
#:property prop:evt (struct-field-index pch)
|
||||
#:property prop:place-message (lambda (self) (lambda () (lambda () (place-pch self)))))
|
||||
|
||||
|
@ -49,7 +50,8 @@
|
|||
#f ; pending-break
|
||||
(make-hasheq) ; done-waiting
|
||||
#f ; wakeup-handle
|
||||
'())) ; dequeue-semas
|
||||
'() ; dequeue-semas
|
||||
#f)) ; future scheduler
|
||||
|
||||
(define initial-place (make-place (host:make-mutex)
|
||||
root-custodian))
|
||||
|
|
|
@ -67,7 +67,9 @@
|
|||
(check-external-events))
|
||||
(call-pre-poll-external-callbacks)
|
||||
(check-place-activity)
|
||||
(check-queued-custodian-shutdown)
|
||||
(when (check-queued-custodian-shutdown)
|
||||
(when (thread-dead? root-thread)
|
||||
(force-exit 0)))
|
||||
(flush-future-log)
|
||||
(cond
|
||||
[(and (null? callbacks)
|
||||
|
|
|
@ -71,6 +71,7 @@
|
|||
do-make-thread
|
||||
root-thread
|
||||
thread-running?
|
||||
thread-dead?
|
||||
thread-dead!
|
||||
thread-did-work!
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user