cs: implement custodian memory limits

This commit is contained in:
Matthew Flatt 2019-01-03 20:24:27 -07:00
parent bcc9b2264e
commit e99de2bf79
18 changed files with 513 additions and 138 deletions

View File

@ -403,15 +403,22 @@ As a result of these layers, there are multiple ways to implement
atomic regions:
* For critical sections with respect to Chez Scheme / OS threads, use
a mutex.
a mutex or a spinlock.
For example, the implementation of `eq?` and `eqv?`-based hash
tables uses mutex to guard hash tables, so they can be accessed
concurrently from futures. In contrast, `equal?`-based hash table
operations are not atomic from the Racket perspective, so they
can't be locked by a mutex; they use Racket-thread locks, instead.
The "rumble/lock.ss" layer skips the `eq?`/`eqv?`-table mutex when
threads are not enabled at the Chez Scheme level.
tables uses a spinlock to guard hash tables, so they can be
accessed concurrently from futures. In contrast, `equal?`-based
hash table operations are not atomic from the Racket perspective,
so they can't be locked by a mutex or spinlock; they use
Racket-thread locks, instead. The "rumble/lock.ss" layer skips the
`eq?`/`eqv?`-table spinlock when threads are not enabled at the
Chez Scheme level.
Chez Scheme deactivates a thread that is blocked on a mutex, so you
don't have to worry about waiting on a mutex blocking GCs. However,
if a lock guards a value that is also used by a GC callback, then
interrupts should be disabled before taking the lock to avoid
deadlock.
* For critical sections at the Racket level, there are multiple
possibilities:

View File

@ -8,6 +8,7 @@
(rename (except (chezscheme)
remq remove
sort
void
force delay identifier?
output-port-buffer-mode
peek-char char-ready?
@ -39,7 +40,6 @@
map for-each andmap ormap
char-general-category)
[make-parameter chez:make-parameter]
[void chez:void]
[date-second chez:date-second]
[date-minute chez:date-minute]
[date-hour chez:date-hour]

View File

@ -33,6 +33,7 @@
(fxvector-set! x 0 (op 0))))
(eval '(f (fxvector 0))))))
(check-defined 'vfasl-convert-file)
(check-defined 'compute-size-increments)
;; ----------------------------------------

View File

@ -460,7 +460,9 @@
phantom-bytes?
make-phantom-bytes
set-phantom-bytes!
set-garbage-collect-notify! ; not exported to Racket
set-garbage-collect-notify! ; not exported to Racket
set-reachable-size-increments-callback! ; not exported to Racket
set-custodian-memory-use-proc! ; not exported to Racket
unsafe-add-collect-callbacks
unsafe-remove-collect-callbacks
@ -587,6 +589,7 @@
place-local-register-set! ; not exported to Racket
place-local-register-init! ; not exported to Racket
place-exit ; not exported to Racket
current-place-roots ; not exported to Racket
_bool _bytes _short_bytes _double _double* _fixint _fixnum _float _fpointer _gcpointer
_int16 _int32 _int64 _int8 _longdouble _pointer _scheme _stdbool _void
@ -663,6 +666,7 @@
fork-pthread
pthread?
get-thread-id
get-initial-pthread
make-condition
condition-wait
condition-signal

View File

@ -1,5 +1,5 @@
(define null '())
(define eof #!eof)
(define (void . args) (chez:void))
(define (void? v) (eq? v (chez:void)))
(define (void . args) (#%void))
(define (void? v) (eq? v (#%void)))

View File

@ -19,6 +19,15 @@
post-allocated post-allocated+overhead post-time post-cpu-time)
(void)))
;; #f or a procedure that accepts `compute-size-increments` to be
;; called in any Chez Scheme thread (with all other threads paused)
;; after each major GC; this procedure must not do anything that might
;; use "control.ss":
(define reachable-size-increments-callback #f)
(define (set-reachable-size-increments-callback! proc)
(set! reachable-size-increments-callback proc))
;; Replicate the counting that `(collect)` would do
;; so that we can report a generation to the notification
;; callback
@ -61,7 +70,10 @@
pre-allocated pre-allocated+overhead pre-time pre-cpu-time
post-allocated (current-memory-bytes) (real-time) (cpu-time)))
(poll-foreign-guardian)
(run-collect-callbacks cdr))))
(run-collect-callbacks cdr)
(when (and reachable-size-increments-callback
(fx= gen (collect-maximum-generation)))
(reachable-size-increments-callback compute-size-increments)))))
(define collect-garbage
(case-lambda
@ -91,9 +103,11 @@
(cond
[(not mode) (bytes-allocated)]
[(eq? mode 'cumulative) (sstats-bytes (statistics))]
[else
;; must be a custodian...
(bytes-allocated)])]))
;; must be a custodian; hook is reposnsible for complaining if not
[else (custodian-memory-use mode (bytes-allocated))])]))
(define custodian-memory-use (lambda (mode all) all))
(define (set-custodian-memory-use-proc! proc) (set! custodian-memory-use proc))
(define prev-stats-objects #f)

View File

@ -35,6 +35,9 @@
(define (place-local-register-set! i v)
(#%vector-set! (place-registers) i v))
(define (place-local-register-cas! i old-v new-v)
(#%vector-cas! (place-registers) i old-v new-v))
(define (place-local-register-init! i v)
(place-local-register-set! i v)
(#%vector-set! place-register-inits i v))
@ -47,10 +50,13 @@
;; ----------------------------------------
(define place-specific-table (make-hasheq))
(define place-specific-table (unsafe-make-place-local #f))
(define (unsafe-get-place-table)
place-specific-table)
(or (unsafe-place-local-ref place-specific-table)
(begin
(place-local-register-cas! place-specific-table #f (make-hasheq))
(unsafe-get-place-table))))
;; ----------------------------------------
@ -71,10 +77,15 @@
(set-box! place-esc-box esc)
(thunk)
0))])
(finish-proc result)))))]
(finish-proc result)))))
;; Must be called within an engine, used for memory accounting:
(define (current-place-roots)
(list (place-registers)
(current-engine-thread-cell-values)))]
[else
(define (place-enabled?) #f)
(define (fork-place thunk finish-proc) #f)])
(define (fork-place thunk finish-proc) #f)
(define (current-place-roots) '())])
(define do-start-place void)
(define (set-start-place! proc)

View File

@ -12,6 +12,8 @@
(let ([initial-thread-id (get-thread-id)])
(lambda ()
(eqv? (get-thread-id) initial-thread-id))))
(define (get-initial-pthread)
(get-initial-thread))
;; make-condition
;; condition-wait
;; condition-signal
@ -23,6 +25,7 @@
[else
(define make-pthread-parameter #%make-parameter)
(define (fork-pthread) (void))
(define (get-initial-pthread) #f)
(define (pthread?) #f)
(define (in-original-host-thread?) #t)
(define (make-condition) (void))

View File

@ -27,9 +27,13 @@
[fork-pthread rumble:fork-thread]
[threaded? rumble:threaded?]
[get-thread-id rumble:get-thread-id]
[get-initial-pthread rumble:get-initial-pthread]
[current-place-roots rumble:current-place-roots]
[set-ctl-c-handler! rumble:set-ctl-c-handler!]
[unsafe-root-continuation-prompt-tag rumble:unsafe-root-continuation-prompt-tag]
[set-break-enabled-transition-hook! rumble:set-break-enabled-transition-hook!]))
[set-break-enabled-transition-hook! rumble:set-break-enabled-transition-hook!]
[set-reachable-size-increments-callback! rumble:set-reachable-size-increments-callback!]
[set-custodian-memory-use-proc! rumble:set-custodian-memory-use-proc!]))
(include "place-register.ss")
(define-place-register-define place:define thread-register-start thread-register-count)
@ -115,6 +119,8 @@
'break-enabled-key break-enabled-key
'set-break-enabled-transition-hook! rumble:set-break-enabled-transition-hook!
'continuation-marks rumble:continuation-marks
'set-reachable-size-increments-callback! rumble:set-reachable-size-increments-callback!
'set-custodian-memory-use-proc! rumble:set-custodian-memory-use-proc!
'exn:break/non-engine exn:break
'exn:break:hang-up/non-engine exn:break:hang-up
'exn:break:terminate/non-engine exn:break:terminate
@ -128,6 +134,9 @@
'fork-place rumble:fork-place
'start-place rumble:start-place
'fork-pthread rumble:fork-thread
'get-initial-place rumble:get-initial-pthread
'current-place-roots rumble:current-place-roots
'call-with-current-pthread-continuation call/cc
'exit place-exit
'pthread? rumble:thread?
'get-thread-id rumble:get-thread-id

View File

@ -10,6 +10,7 @@
start-atomic
end-atomic
atomically/no-interrupts
start-atomic/no-interrupts
end-atomic/no-interrupts
@ -32,6 +33,13 @@
(let () expr ...)
(end-atomic))))
(define-syntax-rule (atomically/no-interrupts expr ...)
(begin
(start-atomic/no-interrupts)
(begin0
(let () expr ...)
(end-atomic/no-interrupts))))
(define (start-atomic)
(current-atomic (add1 (current-atomic))))

View File

@ -236,6 +236,8 @@
'will-executor? will-executor/notify?
'will-register will-register/notify
'will-try-execute will-try-execute/notify
'set-reachable-size-increments-callback! (lambda (proc) (void))
'set-custodian-memory-use-proc! (lambda (proc) (void))
'exn:break/non-engine exn:break/non-engine
'exn:break:hang-up/non-engine exn:break:hang-up/non-engine
'exn:break:terminate/non-engine exn:break:terminate/non-engine
@ -252,8 +254,10 @@
(error "fork-pthread: not ready"))
'pthread? (lambda args
(error "thread?: not ready"))
'get-thread-id (lambda args
(error "get-pthread-id: not ready"))
'get-thread-id (lambda () 0)
'current-place-roots (lambda () '())
'get-initial-place (lambda () #f)
'call-with-current-place-continuation call/cc
'make-condition (lambda () (make-semaphore))
'condition-wait (lambda (c s)
(semaphore-post s)

View File

@ -0,0 +1,33 @@
#lang racket/base
(require "place-local.rkt")
(provide (struct-out custodian)
create-custodian
initial-place-root-custodian
root-custodian)
(struct custodian (children ; weakly maps maps object to callback
[shut-down? #:mutable]
[shutdown-sema #:mutable]
[need-shutdown #:mutable] ; queued asynchronous shutdown: #f, 'needed, or 'needed/sent-wakeup
[parent-reference #:mutable]
[place #:mutable] ; place containing the custodian
[memory-use #:mutable] ; set after a major GC
[gc-roots #:mutable] ; weak references to charge to custodian; access without interrupts
[memory-limits #:mutable]) ; list of (cons limit cust)
#:authentic)
(define (create-custodian)
(custodian (make-weak-hasheq)
#f ; shut-down?
#f ; shutdown semaphore
#f ; need shutdown?
#f ; parent reference
#f ; place
0 ; memory use
#f ; GC roots
null)) ; memory limits
(define initial-place-root-custodian (create-custodian))
(define-place-local root-custodian initial-place-root-custodian)

View File

@ -1,10 +1,12 @@
#lang racket/base
(require "place-local.rkt"
(require "custodian-object.rkt"
"place-object.rkt"
"check.rkt"
"atomic.rkt"
"host.rkt"
"evt.rkt"
"semaphore.rkt")
"semaphore.rkt"
"parameter.rkt")
(provide current-custodian
make-custodian
@ -25,19 +27,19 @@
unsafe-make-custodian-at-root
unsafe-custodian-register
unsafe-custodian-unregister
custodian-register-thread
custodian-register-place
raise-custodian-is-shut-down
set-post-shutdown-action!)
set-post-shutdown-action!
check-queued-custodian-shutdown
set-place-custodian-procs!)
(module+ scheduling
(provide do-custodian-shutdown-all
set-root-custodian!
create-custodian))
(struct custodian (children ; weakly maps maps object to callback
[shut-down? #:mutable]
[shutdown-sema #:mutable]
[parent-reference #:mutable])
#:authentic)
;; For `(struct custodian ...)`, see "custodian-object.rkt"
(struct custodian-box ([v #:mutable] sema)
#:authentic
@ -58,14 +60,6 @@
(struct custodian-reference (c)
#:authentic)
(define (create-custodian)
(custodian (make-weak-hasheq)
#f ; shut-down?
#f ; shutdown semaphore
#f))
(define-place-local root-custodian (create-custodian))
(define/who current-custodian
(make-parameter root-custodian
(lambda (v)
@ -80,7 +74,8 @@
(define/who (make-custodian [parent (current-custodian)])
(check who custodian? parent)
(define c (create-custodian))
(define cref (unsafe-custodian-register parent c do-custodian-shutdown-all #f #t))
(set-custodian-place! c (custodian-place parent))
(define cref (do-custodian-register parent c do-custodian-shutdown-all #f #t #t))
(set-custodian-parent-reference! c cref)
(unless cref (raise-custodian-is-shut-down who parent))
c)
@ -93,7 +88,7 @@
;; finalizer, so don't supply an `obj` that is exposed to safe code
;; that might see `obj` after finalization through a weak reference
;; (and detect that `obj` is thereafter retained strongly).
(define (unsafe-custodian-register cust obj callback at-exit? weak?)
(define (do-custodian-register cust obj callback at-exit? weak? gc-root?)
(atomically
(cond
[(custodian-shut-down? cust) #f]
@ -111,14 +106,34 @@
;; effect of turning a weak reference into a strong one when
;; there are no other references:
(host:will-register we obj void))
(when gc-root?
(host:disable-interrupts)
(unless (custodian-gc-roots cust)
(set-custodian-gc-roots! cust (make-weak-hasheq)))
(hash-set! (custodian-gc-roots cust) obj #t)
(host:enable-interrupts))
(custodian-reference cust)])))
(define (unsafe-custodian-register cust obj callback at-exit? weak?)
(do-custodian-register cust obj callback at-exit? weak? #f))
(define (custodian-register-thread cust obj callback)
(do-custodian-register cust obj callback #f #t #t))
(define (custodian-register-place cust obj callback)
(do-custodian-register cust obj callback #f #t #t))
(define (unsafe-custodian-unregister obj cref)
(when cref
(atomically
(define c (custodian-reference-c cref))
(unless (custodian-shut-down? c)
(hash-remove! (custodian-children c) obj)))))
(hash-remove! (custodian-children c) obj))
(host:disable-interrupts)
(define gc-roots (custodian-gc-roots c))
(when gc-roots
(hash-remove! gc-roots obj))
(host:enable-interrupts))))
;; Hook for thread scheduling:
(define post-shutdown-action void)
@ -133,6 +148,57 @@
;; should be swapped out
(post-shutdown-action))
;; Custodians across all places that have a queued shutdown. Hold the
;; memory-limit lock and also disable interrupts (or OK as a GC
;; callback) while modifying this list:
(define queued-shutdowns null)
;; In atomic mode, in an arbitrary host thread but with other threads
;; suspended:
(define (queue-custodian-shutdown! c)
(unless (custodian-need-shutdown c)
(set-custodian-need-shutdown! c 'needed)
(set! queued-shutdowns (cons c queued-shutdowns))
;; We can't send a signal to wake up an arbitrary place, because
;; the lock on the place is not always taken with interrupts
;; disabled. But we don't need a lock to send a wakeup to the
;; initial place, because it's wakeup handle never goeas away.
;; When the initial place scans for queued shutdowns, it sends
;; wakes up to other places as needed.
(place-wakeup-initial)))
;; Called in atomic mode by the scheduler
(define (check-queued-custodian-shutdown)
(unless (null? queued-shutdowns)
(host:disable-interrupts)
(host:mutex-acquire memory-limit-lock)
(define queued queued-shutdowns)
(set! queued-shutdowns
;; Keep only custodians owned by other places
(for/list ([c (in-list queued)]
#:unless (custodian-this-place? c))
(when (eq? (custodian-need-shutdown c) 'needed)
;; Make sure custodian's place is polling for shutdowns:
(set-custodian-need-shutdown! c 'neeed/sent-wakeup)
(place-wakeup (custodian-place c)))
c))
(host:mutex-release memory-limit-lock)
(host:enable-interrupts)
(for ([c (in-list queued)]
#:when (custodian-this-place? c))
(do-custodian-shutdown-all c))))
(define place-ensure-wakeup! (lambda () #f)) ; call before enabling shutdowns
(define place-wakeup-initial void)
(define place-wakeup void)
(define (set-place-custodian-procs! ensure-wakeup! wakeup-initial wakeup)
(set! place-ensure-wakeup! ensure-wakeup!)
(set! place-wakeup-initial wakeup-initial)
(set! place-wakeup wakeup))
(define (custodian-this-place? c)
(eq? (custodian-place c) current-place))
;; In atomic mode
(define (do-custodian-shutdown-all c)
(unless (custodian-shut-down? c)
@ -181,7 +247,7 @@
(hash-keys (custodian-children c)))
(define (custodian-memory-accounting-available?)
#f)
#t)
(define/who (custodian-require-memory limit-cust need-amt stop-cust)
(check who custodian? limit-cust)
@ -195,9 +261,14 @@
(check who custodian? limit-cust)
(check who exact-nonnegative-integer? need-amt)
(check who custodian? stop-cust)
(raise (exn:fail:unsupported
"custodian-limit-memory: unsupported"
(current-continuation-marks))))
(place-ensure-wakeup!)
(atomically/no-interrupts
(set-custodian-memory-limits! limit-cust
(cons (cons need-amt stop-cust)
(custodian-memory-limits limit-cust)))
(host:mutex-acquire memory-limit-lock)
(set! compute-memory-sizes (max compute-memory-sizes 1))
(host:mutex-release memory-limit-lock)))
;; ----------------------------------------
@ -217,3 +288,126 @@
(define (raise-custodian-is-shut-down who c)
(raise-arguments-error who "the custodian has been shut down"
"custodian" c))
;; ----------------------------------------
;; Disable interrupts before taking this lock, since it
;; guards values that are manipulated by a GC callback
(define memory-limit-lock (host:make-mutex))
;; If non-zero, the custodian memory sizes are gathered after a GC.
;; The value decays
(define compute-memory-sizes 0)
(void (set-reachable-size-increments-callback!
;; Called in an arbitary host thread, with interrupts off and all other threads suspended:
(lambda (compute-size-increments)
(unless (zero? compute-memory-sizes)
(host:call-with-current-place-continuation
(lambda (starting-k)
;; Get roots, which are threads and custodians, for all distinct accounting domains
(define-values (roots custs) ; parallel lists: root and custodian to charge for the root
(let c-loop ([c initial-place-root-custodian] [pl initial-place] [accum-roots null] [accum-custs null])
(set-custodian-memory-use! c 0)
(define gc-roots (custodian-gc-roots c))
(define roots (if gc-roots
(hash-keys gc-roots)
null))
(define host-regs (let ([pl (custodian-place c)])
(if (eq? (place-custodian pl) c)
;; Charge anything directly reachable from place registers
;; to the root custodian
(list (place-host-roots pl))
;; Not the root
null)))
(let loop ([roots roots]
[local-accum-roots (cons c host-regs)]
[accum-roots accum-roots]
[accum-custs accum-custs])
(cond
[(null? roots)
(define local-custs (for/list ([root (in-list local-accum-roots)]) c))
;; values owned directly by this custodian need to go earlier in the list,
;; since we're traversing from parent custodian to children
(values (append local-accum-roots accum-roots)
(append local-custs accum-custs))]
[(custodian? (car roots))
(define-values (new-roots new-custs) (c-loop (car roots) pl accum-roots accum-custs))
(loop (cdr roots) local-accum-roots new-roots new-custs)]
[(place? (car roots))
(define pl (car roots))
(define c (place-custodian pl))
(define-values (new-roots new-custs) (c-loop c pl accum-roots accum-custs))
(loop (cdr roots) local-accum-roots new-roots new-custs)]
[else
(define root (car roots))
(define new-local-roots (cons root local-accum-roots))
(define more-local-roots
(cond
[(eq? root (place-current-thread pl))
(define k-root
(if (eq? pl current-place) ; assuming host thread is place main thread
starting-k
(place-host-thread pl)))
(cons k-root new-local-roots)]
[else new-local-roots]))
(loop (cdr roots) more-local-roots accum-roots accum-custs)]))))
(define sizes (compute-size-increments roots))
(for ([size (in-list sizes)]
[c (in-list custs)])
(set-custodian-memory-use! c (+ size (custodian-memory-use c))))
;; Merge child counts to parents:
(define any-limits?
(let c-loop ([c root-custodian])
(define gc-roots (custodian-gc-roots c))
(define roots (if gc-roots
(hash-keys gc-roots)
null))
(define any-limits?
(for/fold ([any-limits? #f]) ([root (in-list roots)]
#:when (custodian? root))
(define root-any-limits? (c-loop root))
(set-custodian-memory-use! c (+ (custodian-memory-use root)
(custodian-memory-use c)))
(or root-any-limits? any-limits?)))
(define use (custodian-memory-use c))
(define new-limits
(for/list ([limit (in-list (custodian-memory-limits c))]
#:when (cond
[((car limit) . <= . use)
(queue-custodian-shutdown! (cdr limit))
#f]
[else #t]))
limit))
(set-custodian-memory-limits! c new-limits)
(or any-limits? (pair? new-limits))))
;; If no limits are installed, decay demand for memory counts:
(unless any-limits?
(set! compute-memory-sizes (sub1 compute-memory-sizes)))))))))
(void (set-custodian-memory-use-proc!
;; Get memory use for a custodian; the second argument is
;; total memory use, which is a suitable result for the
;; root custodian in the original place.
(lambda (c all)
(unless (custodian? c)
(raise-argument-error 'current-memory-use "(or/c #f 'cumulative custodian?)" c))
(cond
[(eq? c root-custodian) all]
[else
(when (atomically/no-interrupts
(host:mutex-acquire memory-limit-lock)
(cond
[(zero? compute-memory-sizes)
;; Based on the idea that memory accounting
;; should be about 1/2 the cost of a full GC, so a
;; value of 2 hedges future demands versus
;; no future demands:
(set! compute-memory-sizes 2)
(host:mutex-release memory-limit-lock)
#t]
[else
(host:mutex-release memory-limit-lock)
#f]))
(collect-garbage))
(custodian-memory-use c)]))))

View File

@ -49,6 +49,9 @@
[will-register host:will-register]
[will-try-execute host:will-try-execute]
set-reachable-size-increments-callback!
set-custodian-memory-use-proc!
;; Just `exn:break`, etc., but the host may need
;; to distinguish breaks raised by the thread
;; implementation:
@ -75,6 +78,9 @@
[fork-place host:fork-place]
[start-place host:start-place]
[exit host:exit]
[current-place-roots host:current-place-roots]
[get-initial-place host:get-initial-place]
[call-with-current-pthread-continuation host:call-with-current-place-continuation]
fork-pthread
pthread?

View File

@ -0,0 +1,60 @@
#lang racket/base
(require "host.rkt"
"place-local.rkt"
"custodian-object.rkt"
"evt.rkt"
"place-message.rkt")
(provide (struct-out place)
make-place
initial-place
current-place)
(struct place (parent
lock
activity-canary ; box for quick check before taking lock
pch ; channel to new place
[result #:mutable] ; byte or #f, where #f means "not done"
[queued-result #:mutable] ; non-#f triggers a place exit
custodian ; root custodian
[custodian-ref #:mutable] ; owning custodian
[host-thread #:mutable] ; host thread, needed for memory accounting
[host-roots #:mutable] ; continuation-independent state, needed for memory accounting
[current-thread #:mutable] ; running Racket thread, needed for accounting
[post-shutdown #:mutable] ; list of callbacks
[pumpers #:mutable] ; vector of up to three pumper threads
[pending-break #:mutable] ; #f, 'break, 'hangup, or 'terminate
done-waiting ; hash table of places to ping when this one ends
[wakeup-handle #:mutable]
[dequeue-semas #:mutable]) ; semaphores reflecting place-channel waits to recheck
#:property prop:evt (struct-field-index pch)
#:property prop:place-message (lambda (self) (lambda () (lambda () (place-pch self)))))
(define (make-place lock cust
#:parent [parent #f]
#:place-channel [pch #f])
(place parent
lock
(box #f) ; activity canary
pch
#f ; result
#f ; queued-result
cust
#f
#f ; host thread
#f ; host roots
#f ; running thread
'() ; post-shutdown
#f ; pumper-threads
#f ; pending-break
(make-hasheq) ; done-waiting
#f ; wakeup-handle
'())) ; dequeue-semas
(define initial-place (make-place (host:make-mutex)
root-custodian))
(define-place-local current-place initial-place)
(void (set-custodian-place! initial-place-root-custodian initial-place))
(void (set-place-host-thread! initial-place (host:get-initial-place)))

View File

@ -1,6 +1,7 @@
#lang racket/base
(require (only-in '#%unsafe unsafe-abort-current-continuation/no-wind)
"place-local.rkt"
"place-object.rkt"
"custodian-object.rkt"
"check.rkt"
"host.rkt"
"schedule.rkt"
@ -34,108 +35,87 @@
place-pumper-threads
unsafe-add-post-custodian-shutdown)
;; For `(struct place ...)`, see "place-object.rkt"
;; ----------------------------------------
(struct place (parent
lock
activity-canary ; box for quick check before taking lock
pch ; channel to new place
[result #:mutable] ; byte or #f, where #f means "not done"
[queued-result #:mutable] ; non-#f triggers a place exit
custodian
[post-shutdown #:mutable] ; list of callbacks
[pumpers #:mutable] ; vector of up to three pumper threads
[pending-break #:mutable] ; #f, 'break, 'hangup, or 'terminate
done-waiting ; hash table of places to ping when this one ends
[wakeup-handle #:mutable]
[dequeue-semas #:mutable]) ; semaphores reflecting place-channel waits to recheck
#:property prop:evt (struct-field-index pch)
#:property prop:place-message (lambda (self) (lambda () (lambda () (place-pch self)))))
(define (make-place lock cust
#:parent [parent #f]
#:place-channel [pch #f])
(place parent
lock
(box #f) ; activity canary
pch
#f ; result
#f ; queued-result
cust
'() ; post-shutdown
#f ; pumper-threads
#f ; pending-break
(make-hasheq) ; done-waiting
#f ; wakeup-handle
'())) ; dequeue-semas
(define-place-local current-place (make-place (host:make-mutex)
(current-custodian)))
(define/who (dynamic-place path sym in out err)
(when (eq? initial-place current-place)
;; needed by custodian GC callback for memory limits:
(atomically (ensure-wakeup-handle!)))
(define orig-cust (create-custodian))
(define lock (host:make-mutex))
(define started (host:make-condition))
(define-values (place-pch child-pch) (place-channel))
(define orig-plumber (make-plumber))
(define new-place (make-place lock orig-cust
#:parent current-place
#:place-channel place-pch))
(set-custodian-place! orig-cust new-place)
(define done-waiting (place-done-waiting new-place))
(define orig-plumber (make-plumber))
(define (default-exit v)
(plumber-flush-all orig-plumber)
(atomically
(host:mutex-acquire lock)
(set-place-queued-result! new-place (if (byte? v) v 0))
(place-has-activity! new-place)
(unsafe-custodian-unregister new-place (place-custodian-ref new-place))
(host:mutex-release lock))
;; Switch to scheduler, so it can exit:
(engine-block))
;; Atomic mode to create ports and deliver them to the new place
(start-atomic)
(define cref (custodian-register-place (current-custodian) new-place shutdown-place))
(unless cref
(end-atomic)
(raise-custodian-is-shut-down who (current-custodian)))
(set-place-custodian-ref! new-place cref)
(define-values (parent-in parent-out parent-err child-in-fd child-out-fd child-err-fd)
(make-place-ports+fds in out err))
(host:mutex-acquire lock)
;; Start the new place
(host:fork-place
(lambda ()
(call-in-another-main-thread
orig-cust
(lambda ()
(set! current-place new-place)
(current-thread-group root-thread-group)
(current-custodian orig-cust)
(current-plumber orig-plumber)
(exit-handler default-exit)
(current-pseudo-random-generator (make-pseudo-random-generator))
(current-evt-pseudo-random-generator (make-pseudo-random-generator))
(define finish
(host:start-place child-pch path sym
child-in-fd child-out-fd child-err-fd
orig-cust orig-plumber))
(call-with-continuation-prompt
(lambda ()
(host:mutex-acquire lock)
(set-place-wakeup-handle! new-place (sandman-get-wakeup-handle))
(host:condition-signal started) ; place is sufficiently started
(host:mutex-release lock)
(finish))
(default-continuation-prompt-tag)
(lambda (thunk)
;; Thread ended with escape => exit with status 1
(call-with-continuation-prompt thunk)
(default-exit 1)))
(default-exit 0))))
(lambda (result)
;; Place is done, so save the result and alert anyone waiting on
;; the place
(do-custodian-shutdown-all orig-cust)
(host:mutex-acquire lock)
(set-place-result! new-place result)
(host:mutex-release lock)
(for ([pl (in-hash-keys done-waiting)])
(wakeup-waiting pl))
(hash-clear! done-waiting)))
(define host-thread
(host:fork-place
(lambda ()
(call-in-another-main-thread
orig-cust
(lambda ()
(set! current-place new-place)
(set-place-host-roots! new-place (host:current-place-roots))
(current-thread-group root-thread-group)
(current-custodian orig-cust)
(current-plumber orig-plumber)
(exit-handler default-exit)
(current-pseudo-random-generator (make-pseudo-random-generator))
(current-evt-pseudo-random-generator (make-pseudo-random-generator))
(define finish
(host:start-place child-pch path sym
child-in-fd child-out-fd child-err-fd
orig-cust orig-plumber))
(call-with-continuation-prompt
(lambda ()
(host:mutex-acquire lock)
(set-place-wakeup-handle! new-place (sandman-get-wakeup-handle))
(host:condition-signal started) ; place is sufficiently started
(host:mutex-release lock)
(finish))
(default-continuation-prompt-tag)
(lambda (thunk)
;; Thread ended with escape => exit with status 1
(call-with-continuation-prompt thunk)
(default-exit 1)))
(default-exit 0))))
(lambda (result)
;; Place is done, so save the result and alert anyone waiting on
;; the place
(do-custodian-shutdown-all orig-cust)
(host:mutex-acquire lock)
(set-place-result! new-place result)
(host:mutex-release lock)
(for ([pl (in-hash-keys done-waiting)])
(wakeup-waiting pl))
(hash-clear! done-waiting))))
(set-place-host-thread! new-place host-thread)
;; Wait for the place to start, then return the place object
(host:condition-wait started lock)
(host:mutex-release lock)
@ -152,13 +132,12 @@
(when (or (not pending-break)
(break>? (or kind 'break) pending-break))
(set-place-pending-break! p (or kind 'break))
(place-has-activity! p)
(sandman-wakeup (place-wakeup-handle p)))
(place-has-activity! p))
(host:mutex-release (place-lock p))))
(define (place-has-activity! p)
(box-cas! (place-activity-canary p) #f #t)
(void))
(sandman-wakeup (place-wakeup-handle p)))
(void
(set-check-place-activity!
@ -185,15 +164,19 @@
(thread-did-work!)
(do-break-thread root-thread break #f))))))
;; in atomic mode
(define (do-place-kill p)
(host:mutex-acquire (place-lock p))
(unless (or (place-result p)
(place-queued-result p))
(set-place-queued-result! p 1)
(place-has-activity! p))
(host:mutex-release (place-lock p)))
(define/who (place-kill p)
(check who place? p)
(atomically
(host:mutex-acquire (place-lock p))
(unless (or (place-result p)
(place-queued-result p))
(set-place-queued-result! p 1)
(place-has-activity! p))
(host:mutex-release (place-lock p)))
(do-place-kill p))
(place-wait p)
(void))
@ -207,6 +190,21 @@
(set-place-pumpers! p #f))
result)
;; In atomic mode, callback from custodian:
(define (shutdown-place p c)
(do-place-kill p)
;; Wait for the place to finish; that should happen quickly,
;; so loop here in the atomic region:
(let loop ()
(host:mutex-acquire (place-lock p))
(define result (place-result p))
(unless result
(hash-set! (place-done-waiting p) current-place #t))
(host:mutex-release (place-lock p))
(unless result
(sandman-sleep #f)
(loop))))
(struct place-done-evt (p get-result?)
#:property prop:evt (poller (lambda (self poll-ctx)
(assert-atomic-mode)
@ -389,11 +387,16 @@
(set-place-wakeup-handle! current-place (sandman-get-wakeup-handle))))
;; in atomic mode
(define (wakeup-waiting k)
(host:mutex-acquire (place-lock k))
(unless (place-result k)
(sandman-wakeup (place-wakeup-handle k)))
(host:mutex-release (place-lock k)))
(define (wakeup-waiting pl)
(host:mutex-acquire (place-lock pl))
(unless (place-result pl)
(sandman-wakeup (place-wakeup-handle pl)))
(host:mutex-release (place-lock pl)))
(define (wakeup-initial-place)
;; This is ok without a lock, because if the initial place
;; terminates, the process exist:
(sandman-wakeup (place-wakeup-handle initial-place)))
;; ----------------------------------------
@ -414,3 +417,14 @@
(set-place-post-shutdown! current-place
(cons proc
(place-post-shutdown current-place))))))
(void (set-place-custodian-procs!
(lambda ()
(atomically (ensure-wakeup-handle!))
current-place)
;; in atomic mode
(lambda ()
(wakeup-initial-place))
;; in atomic mode
(lambda (pl)
(wakeup-waiting pl))))

View File

@ -1,5 +1,6 @@
#lang racket/base
(require "place-local.rkt"
"place-object.rkt"
"atomic.rkt"
"host.rkt"
"internal-error.rkt"
@ -30,7 +31,9 @@
;; Initializes the thread system:
(define (call-in-main-thread thunk)
(make-initial-thread thunk)
(make-initial-thread (lambda ()
(set-place-host-roots! initial-place (host:current-place-roots))
(thunk)))
(select-thread!))
;; Initializes the thread system in a new place:
@ -52,6 +55,7 @@
(check-external-events 'fast)
(call-pre-poll-external-callbacks)
(check-place-activity)
(check-queued-custodian-shutdown)
(when (and (null? callbacks)
(all-threads-poll-done?)
(waiting-on-external-or-idle?))
@ -71,6 +75,7 @@
(set-thread-engine! t 'running)
(set-thread-sched-info! t #f)
(current-thread t)
(set-place-current-thread! current-place t)
(run-callbacks-in-engine
e callbacks
(lambda (e)
@ -87,6 +92,7 @@
(start-implicit-atomic-mode)
(accum-cpu-time! t)
(current-thread #f)
(set-place-current-thread! current-place #f)
(unless (zero? (current-atomic))
(internal-error "terminated in atomic mode!"))
(thread-dead! t)
@ -100,6 +106,7 @@
[(zero? (current-atomic))
(accum-cpu-time! t)
(current-thread #f)
(set-place-current-thread! current-place #f)
(unless (eq? (thread-engine t) 'done)
(set-thread-engine! t e))
(select-thread!)]

View File

@ -193,7 +193,7 @@
void ; condition-wakeup
))
((atomically
(define cref (and c (unsafe-custodian-register c t remove-thread-custodian #f #t)))
(define cref (and c (custodian-register-thread c t remove-thread-custodian)))
(cond
[(or (not c) cref)
(set-thread-custodian-references! t (list cref))