diff --git a/racket/src/cs/README.txt b/racket/src/cs/README.txt index b838a27d7f..c90907c64b 100644 --- a/racket/src/cs/README.txt +++ b/racket/src/cs/README.txt @@ -403,15 +403,22 @@ As a result of these layers, there are multiple ways to implement atomic regions: * For critical sections with respect to Chez Scheme / OS threads, use - a mutex. + a mutex or a spinlock. For example, the implementation of `eq?` and `eqv?`-based hash - tables uses mutex to guard hash tables, so they can be accessed - concurrently from futures. In contrast, `equal?`-based hash table - operations are not atomic from the Racket perspective, so they - can't be locked by a mutex; they use Racket-thread locks, instead. - The "rumble/lock.ss" layer skips the `eq?`/`eqv?`-table mutex when - threads are not enabled at the Chez Scheme level. + tables uses a spinlock to guard hash tables, so they can be + accessed concurrently from futures. In contrast, `equal?`-based + hash table operations are not atomic from the Racket perspective, + so they can't be locked by a mutex or spinlock; they use + Racket-thread locks, instead. The "rumble/lock.ss" layer skips the + `eq?`/`eqv?`-table spinlock when threads are not enabled at the + Chez Scheme level. + + Chez Scheme deactivates a thread that is blocked on a mutex, so you + don't have to worry about waiting on a mutex blocking GCs. However, + if a lock guards a value that is also used by a GC callback, then + interrupts should be disabled before taking the lock to avoid + deadlock. * For critical sections at the Racket level, there are multiple possibilities: diff --git a/racket/src/cs/chezpart.sls b/racket/src/cs/chezpart.sls index c0fe203913..f96512c220 100644 --- a/racket/src/cs/chezpart.sls +++ b/racket/src/cs/chezpart.sls @@ -8,6 +8,7 @@ (rename (except (chezscheme) remq remove sort + void force delay identifier? output-port-buffer-mode peek-char char-ready? @@ -39,7 +40,6 @@ map for-each andmap ormap char-general-category) [make-parameter chez:make-parameter] - [void chez:void] [date-second chez:date-second] [date-minute chez:date-minute] [date-hour chez:date-hour] diff --git a/racket/src/cs/compile-file.ss b/racket/src/cs/compile-file.ss index 2ccb81c1ee..a7c45c2a3e 100644 --- a/racket/src/cs/compile-file.ss +++ b/racket/src/cs/compile-file.ss @@ -33,6 +33,7 @@ (fxvector-set! x 0 (op 0)))) (eval '(f (fxvector 0)))))) (check-defined 'vfasl-convert-file) +(check-defined 'compute-size-increments) ;; ---------------------------------------- diff --git a/racket/src/cs/rumble.sls b/racket/src/cs/rumble.sls index fdbc021627..d77392ad36 100644 --- a/racket/src/cs/rumble.sls +++ b/racket/src/cs/rumble.sls @@ -460,7 +460,9 @@ phantom-bytes? make-phantom-bytes set-phantom-bytes! - set-garbage-collect-notify! ; not exported to Racket + set-garbage-collect-notify! ; not exported to Racket + set-reachable-size-increments-callback! ; not exported to Racket + set-custodian-memory-use-proc! ; not exported to Racket unsafe-add-collect-callbacks unsafe-remove-collect-callbacks @@ -587,6 +589,7 @@ place-local-register-set! ; not exported to Racket place-local-register-init! ; not exported to Racket place-exit ; not exported to Racket + current-place-roots ; not exported to Racket _bool _bytes _short_bytes _double _double* _fixint _fixnum _float _fpointer _gcpointer _int16 _int32 _int64 _int8 _longdouble _pointer _scheme _stdbool _void @@ -663,6 +666,7 @@ fork-pthread pthread? get-thread-id + get-initial-pthread make-condition condition-wait condition-signal diff --git a/racket/src/cs/rumble/constant.ss b/racket/src/cs/rumble/constant.ss index 01673cbe0f..b4fd50856e 100644 --- a/racket/src/cs/rumble/constant.ss +++ b/racket/src/cs/rumble/constant.ss @@ -1,5 +1,5 @@ (define null '()) (define eof #!eof) -(define (void . args) (chez:void)) -(define (void? v) (eq? v (chez:void))) +(define (void . args) (#%void)) +(define (void? v) (eq? v (#%void))) diff --git a/racket/src/cs/rumble/memory.ss b/racket/src/cs/rumble/memory.ss index d6ccf12c85..34c56c97db 100644 --- a/racket/src/cs/rumble/memory.ss +++ b/racket/src/cs/rumble/memory.ss @@ -19,6 +19,15 @@ post-allocated post-allocated+overhead post-time post-cpu-time) (void))) +;; #f or a procedure that accepts `compute-size-increments` to be +;; called in any Chez Scheme thread (with all other threads paused) +;; after each major GC; this procedure must not do anything that might +;; use "control.ss": +(define reachable-size-increments-callback #f) + +(define (set-reachable-size-increments-callback! proc) + (set! reachable-size-increments-callback proc)) + ;; Replicate the counting that `(collect)` would do ;; so that we can report a generation to the notification ;; callback @@ -61,7 +70,10 @@ pre-allocated pre-allocated+overhead pre-time pre-cpu-time post-allocated (current-memory-bytes) (real-time) (cpu-time))) (poll-foreign-guardian) - (run-collect-callbacks cdr)))) + (run-collect-callbacks cdr) + (when (and reachable-size-increments-callback + (fx= gen (collect-maximum-generation))) + (reachable-size-increments-callback compute-size-increments))))) (define collect-garbage (case-lambda @@ -91,9 +103,11 @@ (cond [(not mode) (bytes-allocated)] [(eq? mode 'cumulative) (sstats-bytes (statistics))] - [else - ;; must be a custodian... - (bytes-allocated)])])) + ;; must be a custodian; hook is reposnsible for complaining if not + [else (custodian-memory-use mode (bytes-allocated))])])) + +(define custodian-memory-use (lambda (mode all) all)) +(define (set-custodian-memory-use-proc! proc) (set! custodian-memory-use proc)) (define prev-stats-objects #f) diff --git a/racket/src/cs/rumble/place.ss b/racket/src/cs/rumble/place.ss index 40c5abcc1e..66ffed4c9e 100644 --- a/racket/src/cs/rumble/place.ss +++ b/racket/src/cs/rumble/place.ss @@ -35,6 +35,9 @@ (define (place-local-register-set! i v) (#%vector-set! (place-registers) i v)) +(define (place-local-register-cas! i old-v new-v) + (#%vector-cas! (place-registers) i old-v new-v)) + (define (place-local-register-init! i v) (place-local-register-set! i v) (#%vector-set! place-register-inits i v)) @@ -47,10 +50,13 @@ ;; ---------------------------------------- -(define place-specific-table (make-hasheq)) +(define place-specific-table (unsafe-make-place-local #f)) (define (unsafe-get-place-table) - place-specific-table) + (or (unsafe-place-local-ref place-specific-table) + (begin + (place-local-register-cas! place-specific-table #f (make-hasheq)) + (unsafe-get-place-table)))) ;; ---------------------------------------- @@ -71,10 +77,15 @@ (set-box! place-esc-box esc) (thunk) 0))]) - (finish-proc result)))))] + (finish-proc result))))) + ;; Must be called within an engine, used for memory accounting: + (define (current-place-roots) + (list (place-registers) + (current-engine-thread-cell-values)))] [else (define (place-enabled?) #f) - (define (fork-place thunk finish-proc) #f)]) + (define (fork-place thunk finish-proc) #f) + (define (current-place-roots) '())]) (define do-start-place void) (define (set-start-place! proc) diff --git a/racket/src/cs/rumble/pthread.ss b/racket/src/cs/rumble/pthread.ss index f7b85f71b7..751b837871 100644 --- a/racket/src/cs/rumble/pthread.ss +++ b/racket/src/cs/rumble/pthread.ss @@ -12,6 +12,8 @@ (let ([initial-thread-id (get-thread-id)]) (lambda () (eqv? (get-thread-id) initial-thread-id)))) + (define (get-initial-pthread) + (get-initial-thread)) ;; make-condition ;; condition-wait ;; condition-signal @@ -23,6 +25,7 @@ [else (define make-pthread-parameter #%make-parameter) (define (fork-pthread) (void)) + (define (get-initial-pthread) #f) (define (pthread?) #f) (define (in-original-host-thread?) #t) (define (make-condition) (void)) diff --git a/racket/src/cs/thread.sls b/racket/src/cs/thread.sls index 73299bdcf9..b381bd782d 100644 --- a/racket/src/cs/thread.sls +++ b/racket/src/cs/thread.sls @@ -27,9 +27,13 @@ [fork-pthread rumble:fork-thread] [threaded? rumble:threaded?] [get-thread-id rumble:get-thread-id] + [get-initial-pthread rumble:get-initial-pthread] + [current-place-roots rumble:current-place-roots] [set-ctl-c-handler! rumble:set-ctl-c-handler!] [unsafe-root-continuation-prompt-tag rumble:unsafe-root-continuation-prompt-tag] - [set-break-enabled-transition-hook! rumble:set-break-enabled-transition-hook!])) + [set-break-enabled-transition-hook! rumble:set-break-enabled-transition-hook!] + [set-reachable-size-increments-callback! rumble:set-reachable-size-increments-callback!] + [set-custodian-memory-use-proc! rumble:set-custodian-memory-use-proc!])) (include "place-register.ss") (define-place-register-define place:define thread-register-start thread-register-count) @@ -115,6 +119,8 @@ 'break-enabled-key break-enabled-key 'set-break-enabled-transition-hook! rumble:set-break-enabled-transition-hook! 'continuation-marks rumble:continuation-marks + 'set-reachable-size-increments-callback! rumble:set-reachable-size-increments-callback! + 'set-custodian-memory-use-proc! rumble:set-custodian-memory-use-proc! 'exn:break/non-engine exn:break 'exn:break:hang-up/non-engine exn:break:hang-up 'exn:break:terminate/non-engine exn:break:terminate @@ -128,6 +134,9 @@ 'fork-place rumble:fork-place 'start-place rumble:start-place 'fork-pthread rumble:fork-thread + 'get-initial-place rumble:get-initial-pthread + 'current-place-roots rumble:current-place-roots + 'call-with-current-pthread-continuation call/cc 'exit place-exit 'pthread? rumble:thread? 'get-thread-id rumble:get-thread-id diff --git a/racket/src/thread/atomic.rkt b/racket/src/thread/atomic.rkt index c24b395ddd..9043fb8ddb 100644 --- a/racket/src/thread/atomic.rkt +++ b/racket/src/thread/atomic.rkt @@ -10,6 +10,7 @@ start-atomic end-atomic + atomically/no-interrupts start-atomic/no-interrupts end-atomic/no-interrupts @@ -32,6 +33,13 @@ (let () expr ...) (end-atomic)))) +(define-syntax-rule (atomically/no-interrupts expr ...) + (begin + (start-atomic/no-interrupts) + (begin0 + (let () expr ...) + (end-atomic/no-interrupts)))) + (define (start-atomic) (current-atomic (add1 (current-atomic)))) diff --git a/racket/src/thread/bootstrap.rkt b/racket/src/thread/bootstrap.rkt index 10306d0f6f..d4b23c28e9 100644 --- a/racket/src/thread/bootstrap.rkt +++ b/racket/src/thread/bootstrap.rkt @@ -236,6 +236,8 @@ 'will-executor? will-executor/notify? 'will-register will-register/notify 'will-try-execute will-try-execute/notify + 'set-reachable-size-increments-callback! (lambda (proc) (void)) + 'set-custodian-memory-use-proc! (lambda (proc) (void)) 'exn:break/non-engine exn:break/non-engine 'exn:break:hang-up/non-engine exn:break:hang-up/non-engine 'exn:break:terminate/non-engine exn:break:terminate/non-engine @@ -252,8 +254,10 @@ (error "fork-pthread: not ready")) 'pthread? (lambda args (error "thread?: not ready")) - 'get-thread-id (lambda args - (error "get-pthread-id: not ready")) + 'get-thread-id (lambda () 0) + 'current-place-roots (lambda () '()) + 'get-initial-place (lambda () #f) + 'call-with-current-place-continuation call/cc 'make-condition (lambda () (make-semaphore)) 'condition-wait (lambda (c s) (semaphore-post s) diff --git a/racket/src/thread/custodian-object.rkt b/racket/src/thread/custodian-object.rkt new file mode 100644 index 0000000000..ea7c84c860 --- /dev/null +++ b/racket/src/thread/custodian-object.rkt @@ -0,0 +1,33 @@ +#lang racket/base +(require "place-local.rkt") + +(provide (struct-out custodian) + create-custodian + initial-place-root-custodian + root-custodian) + +(struct custodian (children ; weakly maps maps object to callback + [shut-down? #:mutable] + [shutdown-sema #:mutable] + [need-shutdown #:mutable] ; queued asynchronous shutdown: #f, 'needed, or 'needed/sent-wakeup + [parent-reference #:mutable] + [place #:mutable] ; place containing the custodian + [memory-use #:mutable] ; set after a major GC + [gc-roots #:mutable] ; weak references to charge to custodian; access without interrupts + [memory-limits #:mutable]) ; list of (cons limit cust) + #:authentic) + +(define (create-custodian) + (custodian (make-weak-hasheq) + #f ; shut-down? + #f ; shutdown semaphore + #f ; need shutdown? + #f ; parent reference + #f ; place + 0 ; memory use + #f ; GC roots + null)) ; memory limits + +(define initial-place-root-custodian (create-custodian)) + +(define-place-local root-custodian initial-place-root-custodian) diff --git a/racket/src/thread/custodian.rkt b/racket/src/thread/custodian.rkt index 4cf9b8666d..5af6373e2f 100644 --- a/racket/src/thread/custodian.rkt +++ b/racket/src/thread/custodian.rkt @@ -1,10 +1,12 @@ #lang racket/base -(require "place-local.rkt" +(require "custodian-object.rkt" + "place-object.rkt" "check.rkt" "atomic.rkt" "host.rkt" "evt.rkt" - "semaphore.rkt") + "semaphore.rkt" + "parameter.rkt") (provide current-custodian make-custodian @@ -25,19 +27,19 @@ unsafe-make-custodian-at-root unsafe-custodian-register unsafe-custodian-unregister + custodian-register-thread + custodian-register-place raise-custodian-is-shut-down - set-post-shutdown-action!) + set-post-shutdown-action! + check-queued-custodian-shutdown + set-place-custodian-procs!) (module+ scheduling (provide do-custodian-shutdown-all set-root-custodian! create-custodian)) -(struct custodian (children ; weakly maps maps object to callback - [shut-down? #:mutable] - [shutdown-sema #:mutable] - [parent-reference #:mutable]) - #:authentic) +;; For `(struct custodian ...)`, see "custodian-object.rkt" (struct custodian-box ([v #:mutable] sema) #:authentic @@ -58,14 +60,6 @@ (struct custodian-reference (c) #:authentic) -(define (create-custodian) - (custodian (make-weak-hasheq) - #f ; shut-down? - #f ; shutdown semaphore - #f)) - -(define-place-local root-custodian (create-custodian)) - (define/who current-custodian (make-parameter root-custodian (lambda (v) @@ -80,7 +74,8 @@ (define/who (make-custodian [parent (current-custodian)]) (check who custodian? parent) (define c (create-custodian)) - (define cref (unsafe-custodian-register parent c do-custodian-shutdown-all #f #t)) + (set-custodian-place! c (custodian-place parent)) + (define cref (do-custodian-register parent c do-custodian-shutdown-all #f #t #t)) (set-custodian-parent-reference! c cref) (unless cref (raise-custodian-is-shut-down who parent)) c) @@ -93,7 +88,7 @@ ;; finalizer, so don't supply an `obj` that is exposed to safe code ;; that might see `obj` after finalization through a weak reference ;; (and detect that `obj` is thereafter retained strongly). -(define (unsafe-custodian-register cust obj callback at-exit? weak?) +(define (do-custodian-register cust obj callback at-exit? weak? gc-root?) (atomically (cond [(custodian-shut-down? cust) #f] @@ -111,14 +106,34 @@ ;; effect of turning a weak reference into a strong one when ;; there are no other references: (host:will-register we obj void)) + (when gc-root? + (host:disable-interrupts) + (unless (custodian-gc-roots cust) + (set-custodian-gc-roots! cust (make-weak-hasheq))) + (hash-set! (custodian-gc-roots cust) obj #t) + (host:enable-interrupts)) (custodian-reference cust)]))) +(define (unsafe-custodian-register cust obj callback at-exit? weak?) + (do-custodian-register cust obj callback at-exit? weak? #f)) + +(define (custodian-register-thread cust obj callback) + (do-custodian-register cust obj callback #f #t #t)) + +(define (custodian-register-place cust obj callback) + (do-custodian-register cust obj callback #f #t #t)) + (define (unsafe-custodian-unregister obj cref) (when cref (atomically (define c (custodian-reference-c cref)) (unless (custodian-shut-down? c) - (hash-remove! (custodian-children c) obj))))) + (hash-remove! (custodian-children c) obj)) + (host:disable-interrupts) + (define gc-roots (custodian-gc-roots c)) + (when gc-roots + (hash-remove! gc-roots obj)) + (host:enable-interrupts)))) ;; Hook for thread scheduling: (define post-shutdown-action void) @@ -133,6 +148,57 @@ ;; should be swapped out (post-shutdown-action)) +;; Custodians across all places that have a queued shutdown. Hold the +;; memory-limit lock and also disable interrupts (or OK as a GC +;; callback) while modifying this list: +(define queued-shutdowns null) + +;; In atomic mode, in an arbitrary host thread but with other threads +;; suspended: +(define (queue-custodian-shutdown! c) + (unless (custodian-need-shutdown c) + (set-custodian-need-shutdown! c 'needed) + (set! queued-shutdowns (cons c queued-shutdowns)) + ;; We can't send a signal to wake up an arbitrary place, because + ;; the lock on the place is not always taken with interrupts + ;; disabled. But we don't need a lock to send a wakeup to the + ;; initial place, because it's wakeup handle never goeas away. + ;; When the initial place scans for queued shutdowns, it sends + ;; wakes up to other places as needed. + (place-wakeup-initial))) + +;; Called in atomic mode by the scheduler +(define (check-queued-custodian-shutdown) + (unless (null? queued-shutdowns) + (host:disable-interrupts) + (host:mutex-acquire memory-limit-lock) + (define queued queued-shutdowns) + (set! queued-shutdowns + ;; Keep only custodians owned by other places + (for/list ([c (in-list queued)] + #:unless (custodian-this-place? c)) + (when (eq? (custodian-need-shutdown c) 'needed) + ;; Make sure custodian's place is polling for shutdowns: + (set-custodian-need-shutdown! c 'neeed/sent-wakeup) + (place-wakeup (custodian-place c))) + c)) + (host:mutex-release memory-limit-lock) + (host:enable-interrupts) + (for ([c (in-list queued)] + #:when (custodian-this-place? c)) + (do-custodian-shutdown-all c)))) + +(define place-ensure-wakeup! (lambda () #f)) ; call before enabling shutdowns +(define place-wakeup-initial void) +(define place-wakeup void) +(define (set-place-custodian-procs! ensure-wakeup! wakeup-initial wakeup) + (set! place-ensure-wakeup! ensure-wakeup!) + (set! place-wakeup-initial wakeup-initial) + (set! place-wakeup wakeup)) + +(define (custodian-this-place? c) + (eq? (custodian-place c) current-place)) + ;; In atomic mode (define (do-custodian-shutdown-all c) (unless (custodian-shut-down? c) @@ -181,7 +247,7 @@ (hash-keys (custodian-children c))) (define (custodian-memory-accounting-available?) - #f) + #t) (define/who (custodian-require-memory limit-cust need-amt stop-cust) (check who custodian? limit-cust) @@ -195,9 +261,14 @@ (check who custodian? limit-cust) (check who exact-nonnegative-integer? need-amt) (check who custodian? stop-cust) - (raise (exn:fail:unsupported - "custodian-limit-memory: unsupported" - (current-continuation-marks)))) + (place-ensure-wakeup!) + (atomically/no-interrupts + (set-custodian-memory-limits! limit-cust + (cons (cons need-amt stop-cust) + (custodian-memory-limits limit-cust))) + (host:mutex-acquire memory-limit-lock) + (set! compute-memory-sizes (max compute-memory-sizes 1)) + (host:mutex-release memory-limit-lock))) ;; ---------------------------------------- @@ -217,3 +288,126 @@ (define (raise-custodian-is-shut-down who c) (raise-arguments-error who "the custodian has been shut down" "custodian" c)) + +;; ---------------------------------------- + +;; Disable interrupts before taking this lock, since it +;; guards values that are manipulated by a GC callback +(define memory-limit-lock (host:make-mutex)) + +;; If non-zero, the custodian memory sizes are gathered after a GC. +;; The value decays +(define compute-memory-sizes 0) + +(void (set-reachable-size-increments-callback! + ;; Called in an arbitary host thread, with interrupts off and all other threads suspended: + (lambda (compute-size-increments) + (unless (zero? compute-memory-sizes) + (host:call-with-current-place-continuation + (lambda (starting-k) + ;; Get roots, which are threads and custodians, for all distinct accounting domains + (define-values (roots custs) ; parallel lists: root and custodian to charge for the root + (let c-loop ([c initial-place-root-custodian] [pl initial-place] [accum-roots null] [accum-custs null]) + (set-custodian-memory-use! c 0) + (define gc-roots (custodian-gc-roots c)) + (define roots (if gc-roots + (hash-keys gc-roots) + null)) + (define host-regs (let ([pl (custodian-place c)]) + (if (eq? (place-custodian pl) c) + ;; Charge anything directly reachable from place registers + ;; to the root custodian + (list (place-host-roots pl)) + ;; Not the root + null))) + (let loop ([roots roots] + [local-accum-roots (cons c host-regs)] + [accum-roots accum-roots] + [accum-custs accum-custs]) + (cond + [(null? roots) + (define local-custs (for/list ([root (in-list local-accum-roots)]) c)) + ;; values owned directly by this custodian need to go earlier in the list, + ;; since we're traversing from parent custodian to children + (values (append local-accum-roots accum-roots) + (append local-custs accum-custs))] + [(custodian? (car roots)) + (define-values (new-roots new-custs) (c-loop (car roots) pl accum-roots accum-custs)) + (loop (cdr roots) local-accum-roots new-roots new-custs)] + [(place? (car roots)) + (define pl (car roots)) + (define c (place-custodian pl)) + (define-values (new-roots new-custs) (c-loop c pl accum-roots accum-custs)) + (loop (cdr roots) local-accum-roots new-roots new-custs)] + [else + (define root (car roots)) + (define new-local-roots (cons root local-accum-roots)) + (define more-local-roots + (cond + [(eq? root (place-current-thread pl)) + (define k-root + (if (eq? pl current-place) ; assuming host thread is place main thread + starting-k + (place-host-thread pl))) + (cons k-root new-local-roots)] + [else new-local-roots])) + (loop (cdr roots) more-local-roots accum-roots accum-custs)])))) + (define sizes (compute-size-increments roots)) + (for ([size (in-list sizes)] + [c (in-list custs)]) + (set-custodian-memory-use! c (+ size (custodian-memory-use c)))) + ;; Merge child counts to parents: + (define any-limits? + (let c-loop ([c root-custodian]) + (define gc-roots (custodian-gc-roots c)) + (define roots (if gc-roots + (hash-keys gc-roots) + null)) + (define any-limits? + (for/fold ([any-limits? #f]) ([root (in-list roots)] + #:when (custodian? root)) + (define root-any-limits? (c-loop root)) + (set-custodian-memory-use! c (+ (custodian-memory-use root) + (custodian-memory-use c))) + (or root-any-limits? any-limits?))) + (define use (custodian-memory-use c)) + (define new-limits + (for/list ([limit (in-list (custodian-memory-limits c))] + #:when (cond + [((car limit) . <= . use) + (queue-custodian-shutdown! (cdr limit)) + #f] + [else #t])) + limit)) + (set-custodian-memory-limits! c new-limits) + (or any-limits? (pair? new-limits)))) + ;; If no limits are installed, decay demand for memory counts: + (unless any-limits? + (set! compute-memory-sizes (sub1 compute-memory-sizes))))))))) + +(void (set-custodian-memory-use-proc! + ;; Get memory use for a custodian; the second argument is + ;; total memory use, which is a suitable result for the + ;; root custodian in the original place. + (lambda (c all) + (unless (custodian? c) + (raise-argument-error 'current-memory-use "(or/c #f 'cumulative custodian?)" c)) + (cond + [(eq? c root-custodian) all] + [else + (when (atomically/no-interrupts + (host:mutex-acquire memory-limit-lock) + (cond + [(zero? compute-memory-sizes) + ;; Based on the idea that memory accounting + ;; should be about 1/2 the cost of a full GC, so a + ;; value of 2 hedges future demands versus + ;; no future demands: + (set! compute-memory-sizes 2) + (host:mutex-release memory-limit-lock) + #t] + [else + (host:mutex-release memory-limit-lock) + #f])) + (collect-garbage)) + (custodian-memory-use c)])))) diff --git a/racket/src/thread/host.rkt b/racket/src/thread/host.rkt index 398daa8481..f89c19fa7b 100644 --- a/racket/src/thread/host.rkt +++ b/racket/src/thread/host.rkt @@ -49,6 +49,9 @@ [will-register host:will-register] [will-try-execute host:will-try-execute] + set-reachable-size-increments-callback! + set-custodian-memory-use-proc! + ;; Just `exn:break`, etc., but the host may need ;; to distinguish breaks raised by the thread ;; implementation: @@ -75,6 +78,9 @@ [fork-place host:fork-place] [start-place host:start-place] [exit host:exit] + [current-place-roots host:current-place-roots] + [get-initial-place host:get-initial-place] + [call-with-current-pthread-continuation host:call-with-current-place-continuation] fork-pthread pthread? diff --git a/racket/src/thread/place-object.rkt b/racket/src/thread/place-object.rkt new file mode 100644 index 0000000000..fdc4cd9822 --- /dev/null +++ b/racket/src/thread/place-object.rkt @@ -0,0 +1,60 @@ +#lang racket/base +(require "host.rkt" + "place-local.rkt" + "custodian-object.rkt" + "evt.rkt" + "place-message.rkt") + +(provide (struct-out place) + make-place + initial-place + current-place) + +(struct place (parent + lock + activity-canary ; box for quick check before taking lock + pch ; channel to new place + [result #:mutable] ; byte or #f, where #f means "not done" + [queued-result #:mutable] ; non-#f triggers a place exit + custodian ; root custodian + [custodian-ref #:mutable] ; owning custodian + [host-thread #:mutable] ; host thread, needed for memory accounting + [host-roots #:mutable] ; continuation-independent state, needed for memory accounting + [current-thread #:mutable] ; running Racket thread, needed for accounting + [post-shutdown #:mutable] ; list of callbacks + [pumpers #:mutable] ; vector of up to three pumper threads + [pending-break #:mutable] ; #f, 'break, 'hangup, or 'terminate + done-waiting ; hash table of places to ping when this one ends + [wakeup-handle #:mutable] + [dequeue-semas #:mutable]) ; semaphores reflecting place-channel waits to recheck + #:property prop:evt (struct-field-index pch) + #:property prop:place-message (lambda (self) (lambda () (lambda () (place-pch self))))) + +(define (make-place lock cust + #:parent [parent #f] + #:place-channel [pch #f]) + (place parent + lock + (box #f) ; activity canary + pch + #f ; result + #f ; queued-result + cust + #f + #f ; host thread + #f ; host roots + #f ; running thread + '() ; post-shutdown + #f ; pumper-threads + #f ; pending-break + (make-hasheq) ; done-waiting + #f ; wakeup-handle + '())) ; dequeue-semas + +(define initial-place (make-place (host:make-mutex) + root-custodian)) + +(define-place-local current-place initial-place) + +(void (set-custodian-place! initial-place-root-custodian initial-place)) +(void (set-place-host-thread! initial-place (host:get-initial-place))) diff --git a/racket/src/thread/place.rkt b/racket/src/thread/place.rkt index 8ee8dac89c..5e70f064b2 100644 --- a/racket/src/thread/place.rkt +++ b/racket/src/thread/place.rkt @@ -1,6 +1,7 @@ #lang racket/base (require (only-in '#%unsafe unsafe-abort-current-continuation/no-wind) - "place-local.rkt" + "place-object.rkt" + "custodian-object.rkt" "check.rkt" "host.rkt" "schedule.rkt" @@ -34,108 +35,87 @@ place-pumper-threads unsafe-add-post-custodian-shutdown) +;; For `(struct place ...)`, see "place-object.rkt" + ;; ---------------------------------------- -(struct place (parent - lock - activity-canary ; box for quick check before taking lock - pch ; channel to new place - [result #:mutable] ; byte or #f, where #f means "not done" - [queued-result #:mutable] ; non-#f triggers a place exit - custodian - [post-shutdown #:mutable] ; list of callbacks - [pumpers #:mutable] ; vector of up to three pumper threads - [pending-break #:mutable] ; #f, 'break, 'hangup, or 'terminate - done-waiting ; hash table of places to ping when this one ends - [wakeup-handle #:mutable] - [dequeue-semas #:mutable]) ; semaphores reflecting place-channel waits to recheck - #:property prop:evt (struct-field-index pch) - #:property prop:place-message (lambda (self) (lambda () (lambda () (place-pch self))))) - -(define (make-place lock cust - #:parent [parent #f] - #:place-channel [pch #f]) - (place parent - lock - (box #f) ; activity canary - pch - #f ; result - #f ; queued-result - cust - '() ; post-shutdown - #f ; pumper-threads - #f ; pending-break - (make-hasheq) ; done-waiting - #f ; wakeup-handle - '())) ; dequeue-semas - -(define-place-local current-place (make-place (host:make-mutex) - (current-custodian))) - (define/who (dynamic-place path sym in out err) + (when (eq? initial-place current-place) + ;; needed by custodian GC callback for memory limits: + (atomically (ensure-wakeup-handle!))) (define orig-cust (create-custodian)) (define lock (host:make-mutex)) (define started (host:make-condition)) (define-values (place-pch child-pch) (place-channel)) + (define orig-plumber (make-plumber)) (define new-place (make-place lock orig-cust #:parent current-place #:place-channel place-pch)) + (set-custodian-place! orig-cust new-place) (define done-waiting (place-done-waiting new-place)) - (define orig-plumber (make-plumber)) (define (default-exit v) (plumber-flush-all orig-plumber) (atomically (host:mutex-acquire lock) (set-place-queued-result! new-place (if (byte? v) v 0)) (place-has-activity! new-place) + (unsafe-custodian-unregister new-place (place-custodian-ref new-place)) (host:mutex-release lock)) ;; Switch to scheduler, so it can exit: (engine-block)) ;; Atomic mode to create ports and deliver them to the new place (start-atomic) + (define cref (custodian-register-place (current-custodian) new-place shutdown-place)) + (unless cref + (end-atomic) + (raise-custodian-is-shut-down who (current-custodian))) + (set-place-custodian-ref! new-place cref) (define-values (parent-in parent-out parent-err child-in-fd child-out-fd child-err-fd) (make-place-ports+fds in out err)) (host:mutex-acquire lock) ;; Start the new place - (host:fork-place - (lambda () - (call-in-another-main-thread - orig-cust - (lambda () - (set! current-place new-place) - (current-thread-group root-thread-group) - (current-custodian orig-cust) - (current-plumber orig-plumber) - (exit-handler default-exit) - (current-pseudo-random-generator (make-pseudo-random-generator)) - (current-evt-pseudo-random-generator (make-pseudo-random-generator)) - (define finish - (host:start-place child-pch path sym - child-in-fd child-out-fd child-err-fd - orig-cust orig-plumber)) - (call-with-continuation-prompt - (lambda () - (host:mutex-acquire lock) - (set-place-wakeup-handle! new-place (sandman-get-wakeup-handle)) - (host:condition-signal started) ; place is sufficiently started - (host:mutex-release lock) - (finish)) - (default-continuation-prompt-tag) - (lambda (thunk) - ;; Thread ended with escape => exit with status 1 - (call-with-continuation-prompt thunk) - (default-exit 1))) - (default-exit 0)))) - (lambda (result) - ;; Place is done, so save the result and alert anyone waiting on - ;; the place - (do-custodian-shutdown-all orig-cust) - (host:mutex-acquire lock) - (set-place-result! new-place result) - (host:mutex-release lock) - (for ([pl (in-hash-keys done-waiting)]) - (wakeup-waiting pl)) - (hash-clear! done-waiting))) + (define host-thread + (host:fork-place + (lambda () + (call-in-another-main-thread + orig-cust + (lambda () + (set! current-place new-place) + (set-place-host-roots! new-place (host:current-place-roots)) + (current-thread-group root-thread-group) + (current-custodian orig-cust) + (current-plumber orig-plumber) + (exit-handler default-exit) + (current-pseudo-random-generator (make-pseudo-random-generator)) + (current-evt-pseudo-random-generator (make-pseudo-random-generator)) + (define finish + (host:start-place child-pch path sym + child-in-fd child-out-fd child-err-fd + orig-cust orig-plumber)) + (call-with-continuation-prompt + (lambda () + (host:mutex-acquire lock) + (set-place-wakeup-handle! new-place (sandman-get-wakeup-handle)) + (host:condition-signal started) ; place is sufficiently started + (host:mutex-release lock) + (finish)) + (default-continuation-prompt-tag) + (lambda (thunk) + ;; Thread ended with escape => exit with status 1 + (call-with-continuation-prompt thunk) + (default-exit 1))) + (default-exit 0)))) + (lambda (result) + ;; Place is done, so save the result and alert anyone waiting on + ;; the place + (do-custodian-shutdown-all orig-cust) + (host:mutex-acquire lock) + (set-place-result! new-place result) + (host:mutex-release lock) + (for ([pl (in-hash-keys done-waiting)]) + (wakeup-waiting pl)) + (hash-clear! done-waiting)))) + (set-place-host-thread! new-place host-thread) ;; Wait for the place to start, then return the place object (host:condition-wait started lock) (host:mutex-release lock) @@ -152,13 +132,12 @@ (when (or (not pending-break) (break>? (or kind 'break) pending-break)) (set-place-pending-break! p (or kind 'break)) - (place-has-activity! p) - (sandman-wakeup (place-wakeup-handle p))) + (place-has-activity! p)) (host:mutex-release (place-lock p)))) (define (place-has-activity! p) (box-cas! (place-activity-canary p) #f #t) - (void)) + (sandman-wakeup (place-wakeup-handle p))) (void (set-check-place-activity! @@ -185,15 +164,19 @@ (thread-did-work!) (do-break-thread root-thread break #f)))))) +;; in atomic mode +(define (do-place-kill p) + (host:mutex-acquire (place-lock p)) + (unless (or (place-result p) + (place-queued-result p)) + (set-place-queued-result! p 1) + (place-has-activity! p)) + (host:mutex-release (place-lock p))) + (define/who (place-kill p) (check who place? p) (atomically - (host:mutex-acquire (place-lock p)) - (unless (or (place-result p) - (place-queued-result p)) - (set-place-queued-result! p 1) - (place-has-activity! p)) - (host:mutex-release (place-lock p))) + (do-place-kill p)) (place-wait p) (void)) @@ -207,6 +190,21 @@ (set-place-pumpers! p #f)) result) +;; In atomic mode, callback from custodian: +(define (shutdown-place p c) + (do-place-kill p) + ;; Wait for the place to finish; that should happen quickly, + ;; so loop here in the atomic region: + (let loop () + (host:mutex-acquire (place-lock p)) + (define result (place-result p)) + (unless result + (hash-set! (place-done-waiting p) current-place #t)) + (host:mutex-release (place-lock p)) + (unless result + (sandman-sleep #f) + (loop)))) + (struct place-done-evt (p get-result?) #:property prop:evt (poller (lambda (self poll-ctx) (assert-atomic-mode) @@ -389,11 +387,16 @@ (set-place-wakeup-handle! current-place (sandman-get-wakeup-handle)))) ;; in atomic mode -(define (wakeup-waiting k) - (host:mutex-acquire (place-lock k)) - (unless (place-result k) - (sandman-wakeup (place-wakeup-handle k))) - (host:mutex-release (place-lock k))) +(define (wakeup-waiting pl) + (host:mutex-acquire (place-lock pl)) + (unless (place-result pl) + (sandman-wakeup (place-wakeup-handle pl))) + (host:mutex-release (place-lock pl))) + +(define (wakeup-initial-place) + ;; This is ok without a lock, because if the initial place + ;; terminates, the process exist: + (sandman-wakeup (place-wakeup-handle initial-place))) ;; ---------------------------------------- @@ -414,3 +417,14 @@ (set-place-post-shutdown! current-place (cons proc (place-post-shutdown current-place)))))) + +(void (set-place-custodian-procs! + (lambda () + (atomically (ensure-wakeup-handle!)) + current-place) + ;; in atomic mode + (lambda () + (wakeup-initial-place)) + ;; in atomic mode + (lambda (pl) + (wakeup-waiting pl)))) diff --git a/racket/src/thread/schedule.rkt b/racket/src/thread/schedule.rkt index e2d8b84f43..3c751f9f37 100644 --- a/racket/src/thread/schedule.rkt +++ b/racket/src/thread/schedule.rkt @@ -1,5 +1,6 @@ #lang racket/base (require "place-local.rkt" + "place-object.rkt" "atomic.rkt" "host.rkt" "internal-error.rkt" @@ -30,7 +31,9 @@ ;; Initializes the thread system: (define (call-in-main-thread thunk) - (make-initial-thread thunk) + (make-initial-thread (lambda () + (set-place-host-roots! initial-place (host:current-place-roots)) + (thunk))) (select-thread!)) ;; Initializes the thread system in a new place: @@ -52,6 +55,7 @@ (check-external-events 'fast) (call-pre-poll-external-callbacks) (check-place-activity) + (check-queued-custodian-shutdown) (when (and (null? callbacks) (all-threads-poll-done?) (waiting-on-external-or-idle?)) @@ -71,6 +75,7 @@ (set-thread-engine! t 'running) (set-thread-sched-info! t #f) (current-thread t) + (set-place-current-thread! current-place t) (run-callbacks-in-engine e callbacks (lambda (e) @@ -87,6 +92,7 @@ (start-implicit-atomic-mode) (accum-cpu-time! t) (current-thread #f) + (set-place-current-thread! current-place #f) (unless (zero? (current-atomic)) (internal-error "terminated in atomic mode!")) (thread-dead! t) @@ -100,6 +106,7 @@ [(zero? (current-atomic)) (accum-cpu-time! t) (current-thread #f) + (set-place-current-thread! current-place #f) (unless (eq? (thread-engine t) 'done) (set-thread-engine! t e)) (select-thread!)] diff --git a/racket/src/thread/thread.rkt b/racket/src/thread/thread.rkt index ad2e98f667..64fbfcb384 100644 --- a/racket/src/thread/thread.rkt +++ b/racket/src/thread/thread.rkt @@ -193,7 +193,7 @@ void ; condition-wakeup )) ((atomically - (define cref (and c (unsafe-custodian-register c t remove-thread-custodian #f #t))) + (define cref (and c (custodian-register-thread c t remove-thread-custodian))) (cond [(or (not c) cref) (set-thread-custodian-references! t (list cref))