cs: fix memory accounting and futures

Also avoid problems with interrupting a rapidly allocating
computation and shutting down the main custodian.
This commit is contained in:
Matthew Flatt 2019-06-24 13:59:03 -06:00
parent a5a84ea8c6
commit 564dcf339a
8 changed files with 136 additions and 79 deletions

View File

@ -74,7 +74,12 @@
(run-collect-callbacks cdr)
(when (and reachable-size-increments-callback
(fx= gen (collect-maximum-generation)))
(reachable-size-increments-callback compute-size-increments)))))
(reachable-size-increments-callback compute-size-increments))
(when (and (= gen (collect-maximum-generation))
(current-engine-state))
;; This `set-timer` doesn't necessarily penalize the right thread,
;; but it's likely to penalize a thread that is allocating quickly:
(set-timer 1)))))
(define collect-garbage
(case-lambda

View File

@ -37,9 +37,11 @@ GLOBALS = --no-global \
++global-ok wakeup-this-place \
++global-ok ensure-place-wakeup-handle \
++global-ok futures-sync-for-custodian-shutdown \
++global-ok logging-future-events? \
++global-ok 'future-scheduler-add-thread-custodian-mapping!' \
++global-ok 'logging-future-events?' \
++global-ok log-future-event
GENERATE_ARGS = -t main.rkt --submod main \
--check-depends $(BUILDDIR)compiled/thread-dep.rktd \
++depend-module ../expander/bootstrap-run.rkt \

View File

@ -87,7 +87,7 @@
[(thread-dead? t)
(apply complete 0 results)]
[else
(expire go timeout?)]))
(expire go (if timeout? 0 10))]))
go)
(define (engine-block)

View File

@ -8,7 +8,8 @@
"evt.rkt"
"semaphore.rkt"
"parameter.rkt"
"sink.rkt")
"sink.rkt"
"exit.rkt")
(provide current-custodian
make-custodian
@ -44,7 +45,7 @@
poll-custodian-will-executor))
(module+ for-future
(provide set-custodian-futures-sync!))
(provide set-custodian-future-callbacks!))
;; For `(struct custodian ...)`, see "custodian-object.rkt"
@ -228,24 +229,27 @@
;; Called in atomic mode by the scheduler
(define (check-queued-custodian-shutdown)
(unless (null? queued-shutdowns)
(host:disable-interrupts)
(host:mutex-acquire memory-limit-lock)
(define queued queued-shutdowns)
(set! queued-shutdowns
;; Keep only custodians owned by other places
(for/list ([c (in-list queued)]
#:unless (custodian-this-place? c))
(when (eq? (custodian-need-shutdown c) 'needed)
;; Make sure custodian's place is polling for shutdowns:
(set-custodian-need-shutdown! c 'neeed/sent-wakeup)
(place-wakeup (custodian-place c)))
c))
(host:mutex-release memory-limit-lock)
(host:enable-interrupts)
(for ([c (in-list queued)]
#:when (custodian-this-place? c))
(do-custodian-shutdown-all c))))
(cond
[(null? queued-shutdowns) #f]
[else
(host:disable-interrupts)
(host:mutex-acquire memory-limit-lock)
(define queued queued-shutdowns)
(set! queued-shutdowns
;; Keep only custodians owned by other places
(for/list ([c (in-list queued)]
#:unless (custodian-this-place? c))
(when (eq? (custodian-need-shutdown c) 'needed)
;; Make sure custodian's place is polling for shutdowns:
(set-custodian-need-shutdown! c 'neeed/sent-wakeup)
(place-wakeup (custodian-place c)))
c))
(host:mutex-release memory-limit-lock)
(host:enable-interrupts)
(for ([c (in-list queued)]
#:when (custodian-this-place? c))
(do-custodian-shutdown-all c))
#t]))
(define place-ensure-wakeup! (lambda () #f)) ; call before enabling shutdowns
(define place-wakeup-initial void)
@ -370,9 +374,11 @@
;; ----------------------------------------
(define futures-sync-for-custodian-shutdown (lambda () (void)))
(define future-scheduler-add-thread-custodian-mapping! (lambda (s ht) (void)))
(define (set-custodian-futures-sync! proc)
(set! futures-sync-for-custodian-shutdown proc))
(define (set-custodian-future-callbacks! sync-shutdown add-custodian-mapping)
(set! futures-sync-for-custodian-shutdown sync-shutdown)
(set! future-scheduler-add-thread-custodian-mapping! add-custodian-mapping))
;; ----------------------------------------
@ -390,6 +396,12 @@
(unless (zero? compute-memory-sizes)
(host:call-with-current-place-continuation
(lambda (starting-k)
;; A place may have future pthreads, and each ptherad may
;; be running a future that becomes to a particular custodian;
;; build up a custodian-to-pthtread mapping in this table:
(define custodian-future-threads (make-hasheq))
(future-scheduler-add-thread-custodian-mapping! (place-future-scheduler initial-place)
custodian-future-threads)
;; Get roots, which are threads and custodians, for all distinct accounting domains
(define-values (roots custs) ; parallel lists: root and custodian to charge for the root
(let c-loop ([c initial-place-root-custodian] [pl initial-place] [accum-roots null] [accum-custs null])
@ -422,6 +434,8 @@
[(place? (car roots))
(define pl (car roots))
(define c (place-custodian pl))
(future-scheduler-add-thread-custodian-mapping! (place-future-scheduler pl)
custodian-future-threads)
(define-values (new-roots new-custs) (c-loop c pl accum-roots accum-custs))
(loop (cdr roots) local-accum-roots new-roots new-custs)]
[else
@ -445,9 +459,11 @@
(define any-limits?
(let c-loop ([c root-custodian])
(define gc-roots (custodian-gc-roots c))
(define roots (if gc-roots
(hash-keys gc-roots)
null))
(define roots (append
(hash-ref custodian-future-threads c null)
(if gc-roots
(hash-keys gc-roots)
null)))
(define any-limits?
(for/fold ([any-limits? #f]) ([root (in-list roots)]
#:when (custodian? root))

View File

@ -1,6 +1,7 @@
#lang racket/base
(require "config.rkt"
"place-local.rkt"
"place-object.rkt"
"check.rkt"
"internal-error.rkt"
"host.rkt"
@ -352,8 +353,6 @@
(define (set-processor-count! n)
(set! pthread-count n))
(define-place-local the-scheduler #f)
(struct scheduler ([workers #:mutable]
[futures-head #:mutable]
[futures-tail #:mutable]
@ -362,47 +361,58 @@
#:authentic)
(struct worker (id
[pthread #:mutable]
current-future-box ; reports current future (for access external to pthread)
[die? #:mutable]
sync-state) ; box used to sync shutdowns: 'idle, 'running, or 'pending
#:authentic)
(define current-scheduler
(case-lambda
[() (place-future-scheduler current-place)]
[(s) (set-place-future-scheduler! current-place s)]))
(define (make-worker id)
(worker id
#f ; die?
#f ; pthread
(box #f) ; current-future-box
#f ; die?
(box 'idle)))
;; called in a Racket thread
(define (maybe-start-scheduler)
(atomically
(unless the-scheduler
(unless (current-scheduler)
(ensure-place-wakeup-handle)
(set! the-scheduler (scheduler '()
#f ; futures-head
#f ; futures-tail
(host:make-mutex)
(host:make-condition)))
(define s (scheduler '()
#f ; futures-head
#f ; futures-tail
(host:make-mutex)
(host:make-condition)))
(current-scheduler s)
(define workers
(for/list ([id (in-range 1 (add1 pthread-count))])
(define w (make-worker id))
(start-worker w)
w))
(set-scheduler-workers! the-scheduler workers))))
(set-scheduler-workers! s workers))))
;; called in atomic mode
(define (kill-future-scheduler)
(when the-scheduler
(define s the-scheduler)
(define s (current-scheduler))
(when s
(host:mutex-acquire (scheduler-mutex s))
(for ([w (in-list (scheduler-workers s))])
(set-worker-die?! w #t))
(host:condition-signal (scheduler-cond s))
(host:mutex-release (scheduler-mutex s))
(futures-sync-for-shutdown)))
(futures-sync-for-shutdown)
(current-scheduler #f)))
;; called in any pthread
;; called maybe holding an fsemaphore lock, but nothing else
(define (schedule-future! f #:front? [front? #f])
(define s the-scheduler)
(define s (current-scheduler))
(host:mutex-acquire (scheduler-mutex s))
(define old (if front?
(scheduler-futures-head s)
@ -424,7 +434,7 @@
;; called with queue lock held
(define (deschedule-future f)
(define s the-scheduler)
(define s (current-scheduler))
(cond
[(or (future*-prev f)
(future*-next f))
@ -445,7 +455,7 @@
;; called with no locks held; if successful,
;; returns with lock held on f
(define (try-deschedule-future? f)
(define s the-scheduler)
(define s (current-scheduler))
(host:mutex-acquire (scheduler-mutex s))
(define ok?
(cond
@ -484,34 +494,36 @@
;; ----------------------------------------
(define (start-worker w)
(define s the-scheduler)
(fork-pthread
(lambda ()
(current-future 'worker)
(host:mutex-acquire (scheduler-mutex s))
(let loop ()
(or (box-cas! (worker-sync-state w) 'idle 'running)
(box-cas! (worker-sync-state w) 'pending 'running))
(cond
[(worker-die? w) ; worker was killed
(host:mutex-release (scheduler-mutex s))
(box-cas! (worker-sync-state w) 'running 'idle)]
[(scheduler-futures-head s)
=> (lambda (f)
(deschedule-future f)
(host:mutex-release (scheduler-mutex s))
(lock-acquire (future*-lock f))
;; lock is held on f; run the future
(maybe-run-future-in-worker f w)
;; look for more work
(host:mutex-acquire (scheduler-mutex s))
(loop))]
[else
;; wait for work
(or (box-cas! (worker-sync-state w) 'pending 'idle)
(box-cas! (worker-sync-state w) 'running 'idle))
(host:condition-wait (scheduler-cond s) (scheduler-mutex s))
(loop)])))))
(define s (current-scheduler))
(define th
(fork-pthread
(lambda ()
(current-future 'worker)
(host:mutex-acquire (scheduler-mutex s))
(let loop ()
(or (box-cas! (worker-sync-state w) 'idle 'running)
(box-cas! (worker-sync-state w) 'pending 'running))
(cond
[(worker-die? w) ; worker was killed
(host:mutex-release (scheduler-mutex s))
(box-cas! (worker-sync-state w) 'running 'idle)]
[(scheduler-futures-head s)
=> (lambda (f)
(deschedule-future f)
(host:mutex-release (scheduler-mutex s))
(lock-acquire (future*-lock f))
;; lock is held on f; run the future
(maybe-run-future-in-worker f w)
;; look for more work
(host:mutex-acquire (scheduler-mutex s))
(loop))]
[else
;; wait for work
(or (box-cas! (worker-sync-state w) 'pending 'idle)
(box-cas! (worker-sync-state w) 'running 'idle))
(host:condition-wait (scheduler-cond s) (scheduler-mutex s))
(loop)])))))
(set-worker-pthread! w th))
;; called with lock on f
(define (maybe-run-future-in-worker f w)
@ -528,6 +540,7 @@
(define (run-future-in-worker f w)
(current-future f)
(set-box! (worker-current-future-box w) f)
;; If we didn't need to check custodians, could be just
;; (call-with-continuation-prompt
;; (lambda () (run-future f))
@ -558,7 +571,8 @@
(lambda (e timeout?)
(loop e))))
(log-future 'end-work (future*-id f))
(current-future 'worker))
(current-future 'worker)
(set-box! (worker-current-future-box w) #f))
;; in atomic mode
(define (futures-sync-for-shutdown)
@ -567,12 +581,12 @@
;; future-scheduler shutdown.
;;
;; Move each 'running worker into the 'pending state:
(for ([w (in-list (scheduler-workers the-scheduler))])
(for ([w (in-list (scheduler-workers (current-scheduler)))])
(box-cas! (worker-sync-state w) 'running 'pending))
;; A worker that transitions from 'pending to 'running or 'idle
;; is guaranteed to not run a future chose custodian is
;; shutdown or run any future if the worker is terminated
(for ([w (in-list (scheduler-workers the-scheduler))])
(for ([w (in-list (scheduler-workers (current-scheduler)))])
(define bx (worker-sync-state w))
(let loop ()
(when (box-cas! bx 'pending 'pending)
@ -581,6 +595,19 @@
;; ----------------------------------------
;; called in a GCing pthread with all other pthreads stopped
(define (scheduler-add-thread-custodian-mapping! s ht)
(when s
(for ([w (in-list (scheduler-workers s))])
(define f (unbox (worker-current-future-box w)))
(when f
(define c (future*-custodian f))
(when c
(hash-set! ht c (cons (worker-pthread w)
(hash-ref ht c null))))))))
;; ----------------------------------------
(define (reset-future-logs-for-tracing!)
(void))
@ -607,4 +634,6 @@
;; tell "atomic.rkt" layer how to block:
(void (set-future-block! future-block))
(void (set-custodian-futures-sync! futures-sync-for-shutdown))
;; tell "custodian.rkt" how to sync and map pthreads to custodians:
(void (set-custodian-future-callbacks! futures-sync-for-shutdown
scheduler-add-thread-custodian-mapping!))

View File

@ -26,7 +26,8 @@
[pending-break #:mutable] ; #f, 'break, 'hang-up, or 'terminate
done-waiting ; hash table of places to ping when this one ends
[wakeup-handle #:mutable]
[dequeue-semas #:mutable]); semaphores reflecting place-channel waits to recheck
[dequeue-semas #:mutable] ; semaphores reflecting place-channel waits to recheck
[future-scheduler #:mutable]) ; #f or a scheduler of futures
#:property prop:evt (struct-field-index pch)
#:property prop:place-message (lambda (self) (lambda () (lambda () (place-pch self)))))
@ -49,7 +50,8 @@
#f ; pending-break
(make-hasheq) ; done-waiting
#f ; wakeup-handle
'())) ; dequeue-semas
'() ; dequeue-semas
#f)) ; future scheduler
(define initial-place (make-place (host:make-mutex)
root-custodian))

View File

@ -67,7 +67,9 @@
(check-external-events))
(call-pre-poll-external-callbacks)
(check-place-activity)
(check-queued-custodian-shutdown)
(when (check-queued-custodian-shutdown)
(when (thread-dead? root-thread)
(force-exit 0)))
(flush-future-log)
(cond
[(and (null? callbacks)

View File

@ -71,6 +71,7 @@
do-make-thread
root-thread
thread-running?
thread-dead?
thread-dead!
thread-did-work!