diff --git a/racket/src/cs/demo/thread.ss b/racket/src/cs/demo/thread.ss index 7014f80075..31faad56f6 100644 --- a/racket/src/cs/demo/thread.ss +++ b/racket/src/cs/demo/thread.ss @@ -186,6 +186,40 @@ (sync (system-idle-evt)) (check 2 trying)) ; second thread should have completed + (let* ([place-symbols (make-hasheq)] + [register-place-symbol! + (lambda (sym proc) + (hash-set! place-symbols sym proc))]) + (install-start-place! + (lambda (mod sym in out err) + (lambda (finish) + (finish #f #f #f) + ((hash-ref place-symbols sym))))) + + (register-place-symbol! 'nothing void) + (let ([pl1 (dynamic-place 'dummy 'nothing #f #f #f)]) + (check #t (place? pl1)) + (check 0 (place-wait pl1))) + + (register-place-symbol! 'exit1 (lambda () (exit 1))) + (let ([pl2 (dynamic-place 'dummy 'exit1 #f #f #f)]) + (check #t (place? pl2)) + (check 1 (place-wait pl2))) + + (register-place-symbol! 'loop (lambda () (let loop () (loop)))) + (let ([pl3 (dynamic-place 'dummy 'loop #f #f #f)]) + (check #t (place? pl3)) + (place-break pl3) + (check 1 (place-wait pl3)) + (printf "[That break was from a place, and it's expected]\n")) + + (let ([pl4 (dynamic-place 'dummy 'loop #f #f #f)]) + (check #f (sync/timeout 0.01 (place-dead-evt pl4))) + (place-kill pl4) + (check 1 (place-wait pl4)) + (check #t (evt? (sync (place-dead-evt pl4)))) + (check #t (evt? (sync/timeout 0.01 (place-dead-evt pl4)))))) + ;; Measure thread quantum: #; (let ([t1 (thread (lambda () (let loop () (loop))))] diff --git a/racket/src/cs/place-register.ss b/racket/src/cs/place-register.ss index c49ce06e26..ff5a9696a0 100644 --- a/racket/src/cs/place-register.ss +++ b/racket/src/cs/place-register.ss @@ -5,7 +5,7 @@ ;; first index is reserved for Rumble: (meta chez:define thread-register-start 1) -(meta chez:define thread-register-count 12) +(meta chez:define thread-register-count 14) (meta chez:define io-register-start (+ thread-register-start thread-register-count)) (meta chez:define io-register-count 16) @@ -14,7 +14,7 @@ (meta chez:define regexp-register-count 3) (meta chez:define expander-register-start (+ regexp-register-start regexp-register-count)) -(meta chez:define expander-register-count 32) +(meta chez:define expander-register-count 30) ;; ---------------------------------------- diff --git a/racket/src/cs/rumble.sls b/racket/src/cs/rumble.sls index b7e2751d5d..d1b7476470 100644 --- a/racket/src/cs/rumble.sls +++ b/racket/src/cs/rumble.sls @@ -578,6 +578,7 @@ place-local-register-ref ; not exported to Racket place-local-register-set! ; not exported to Racket place-local-register-init! ; not exported to Racket + place-exit ; not exported to Racket _bool _bytes _short_bytes _double _double* _fixint _fixnum _float _fpointer _gcpointer _int16 _int32 _int64 _int8 _longdouble _pointer _scheme _stdbool _void diff --git a/racket/src/cs/rumble/engine.ss b/racket/src/cs/rumble/engine.ss index 559e63c4e5..d3a9b333a6 100644 --- a/racket/src/cs/rumble/engine.ss +++ b/racket/src/cs/rumble/engine.ss @@ -19,7 +19,7 @@ (keyboard-interrupt-handler)) (define (engine-exit v) - (chez:exit v)) + (place-exit v)) (define (set-engine-exit-handler! proc) (set! engine-exit proc)) diff --git a/racket/src/cs/rumble/error.ss b/racket/src/cs/rumble/error.ss index 9f32e2d10a..bd5fce5608 100644 --- a/racket/src/cs/rumble/error.ss +++ b/racket/src/cs/rumble/error.ss @@ -687,6 +687,10 @@ (current-exception-state (create-exception-state)) (base-exception-handler (lambda (v) + #; + (#%printf "~s ~s\n" + (exn->string v) + '(continuation-mark-set-traces (current-continuation-marks))) (cond [(and (warning? v) (not (non-continuable-violation? v))) diff --git a/racket/src/cs/rumble/foreign.ss b/racket/src/cs/rumble/foreign.ss index fcdc07847b..cc5ad8a89b 100644 --- a/racket/src/cs/rumble/foreign.ss +++ b/racket/src/cs/rumble/foreign.ss @@ -1528,6 +1528,9 @@ (define-virtual-register place-thread-category PLACE-KNOWN-THREAD) (define (register-as-place-main!) (place-thread-category PLACE-MAIN-THREAD) + (foreign-place-init!)) + +(define (foreign-place-init!) (current-async-callback-queue (make-async-callback-queue (make-mutex) (make-condition) '()))) diff --git a/racket/src/cs/rumble/interrupt.ss b/racket/src/cs/rumble/interrupt.ss index 3fd0d7b0d2..6ac28740cf 100644 --- a/racket/src/cs/rumble/interrupt.ss +++ b/racket/src/cs/rumble/interrupt.ss @@ -58,4 +58,4 @@ (define (internal-error who s) (CHECK-uninterrupted (chez:fprintf (current-error-port) "~a: ~a\n" who s) - (chez:exit 1))) + (#%exit 1))) diff --git a/racket/src/cs/rumble/place.ss b/racket/src/cs/rumble/place.ss index 4fffecf9b9..c0e4a4a321 100644 --- a/racket/src/cs/rumble/place.ss +++ b/racket/src/cs/rumble/place.ss @@ -54,17 +54,26 @@ ;; ---------------------------------------- +(define-thread-local place-esc-box (box #f)) + (meta-cond [(threaded?) (define (place-enabled?) #f) ;; FIXME - (define (fork-place thunk) + (define (fork-place thunk finish-proc) (fork-thread (lambda () (init-virtual-registers) (place-registers (vector-copy place-register-inits)) - (thunk))))] + (init-place-locals!) + (foreign-place-init!) + (let ([result (call/cc + (lambda (esc) + (set-box! place-esc-box esc) + (thunk) + 0))]) + (finish-proc result)))))] [else (define (place-enabled?) #f) - (define (fork-place thunk) #f)]) + (define (fork-place thunk finish-proc) #f)]) (define do-start-place void) (define (install-start-place! proc) @@ -72,3 +81,9 @@ (define (start-place path sym in out err) (do-start-place path sym in out err)) + +(define (place-exit v) + (let ([esc (unbox place-esc-box)]) + (if esc + (esc v) + (#%exit v)))) diff --git a/racket/src/cs/thread.sls b/racket/src/cs/thread.sls index 89d610bdec..e641af4376 100644 --- a/racket/src/cs/thread.sls +++ b/racket/src/cs/thread.sls @@ -2,10 +2,6 @@ (export) (import (rename (chezpart) [define chez:define]) - (rename (only (chezscheme) - sleep - printf) - [sleep chez:sleep]) (rename (rumble) [rumble:break-enabled-key break-enabled-key] ;; Remapped to place-local register operations: @@ -55,14 +51,35 @@ ;; Chain to place-register handling: [(_ . rest) #'(place:define . rest)])) - (define (exit n) - (chez:exit n)) - + ;; This implementation of `sleep`, `get-wakeup-handle`, and `wakeup` is relevant + ;; only for running the places part of the thread demo. The relevant callbacks get + ;; replaced by the "io" layer to use rktio-based functions. + (define sleep-interrupted (rumble:unsafe-make-place-local #f)) (define (sleep secs) - (define isecs (inexact->exact (floor secs))) - (chez:sleep (make-time 'time-duration - (inexact->exact (floor (* (- secs isecs) 1e9))) - isecs))) + (let ([isecs (inexact->exact (floor secs))] + [zero-secs (make-time 'time-duration 0 0)] + [pause-secs (make-time 'time-duration 100000 0)]) + (let loop ([all-secs (make-time 'time-duration + (inexact->exact (floor (* (- secs isecs) 1e9))) + isecs)]) + (unless (or (time<=? all-secs zero-secs) + (let ([b (rumble:unsafe-place-local-ref sleep-interrupted)]) + (and b (unbox b)))) + (#%sleep pause-secs) + (loop (subtract-duration all-secs pause-secs)))) + (let ([b (rumble:unsafe-place-local-ref sleep-interrupted)]) + (when b + (set-box! b #f))))) + (define (get-wakeup-handle) + (let ([b (rumble:unsafe-place-local-ref sleep-interrupted)]) + (or b + (begin + ;; There's a race condition here.. Avoid triggering it + ;; in the thread demo. + (rumble:unsafe-place-local-set! sleep-interrupted (box #f)) + (get-wakeup-handle))))) + (define (wakeup b) + (set-box! b #t)) (define (primitive-table key) (case key @@ -105,9 +122,13 @@ 'poll-async-callbacks poll-async-callbacks 'disable-interrupts disable-interrupts 'enable-interrupts enable-interrupts + 'sleep sleep + 'get-wakeup-handle get-wakeup-handle + 'wakeup wakeup 'fork-place rumble:fork-place 'start-place rumble:start-place 'fork-pthread rumble:fork-thread + 'exit place-exit 'pthread? rumble:thread? 'get-thread-id rumble:get-thread-id 'make-condition rumble:make-condition diff --git a/racket/src/io/sandman/main.rkt b/racket/src/io/sandman/main.rkt index b64042ced5..6bd5dfb685 100644 --- a/racket/src/io/sandman/main.rkt +++ b/racket/src/io/sandman/main.rkt @@ -104,6 +104,14 @@ (check-signals))) ((sandman-do-poll timeout-sandman) mode wakeup)) + ;; get-wakeup + (lambda () + (rktio_get_signal_handle rktio)) + + ;; wakeup + (lambda (h) + (rktio_signal_received_at h)) + ;; any-sleepers? (lambda () ((sandman-do-any-sleepers? timeout-sandman))) diff --git a/racket/src/thread/bootstrap-main.rkt b/racket/src/thread/bootstrap-main.rkt index 055a4fcd1e..1763fe09f1 100644 --- a/racket/src/thread/bootstrap-main.rkt +++ b/racket/src/thread/bootstrap-main.rkt @@ -2,5 +2,6 @@ (require "bootstrap.rkt" ; must be before "main.rkt" "main.rkt") -(provide (all-from-out "main.rkt")) - +(provide (all-from-out "main.rkt") + ;; From "bootstrap.rkt": + register-place-symbol!) diff --git a/racket/src/thread/bootstrap.rkt b/racket/src/thread/bootstrap.rkt index 34742d4620..0246934437 100644 --- a/racket/src/thread/bootstrap.rkt +++ b/racket/src/thread/bootstrap.rkt @@ -10,6 +10,8 @@ ;; with `break-enabled-key`, and it does not support using an ;; exception handler in an engine. +(provide register-place-symbol!) + (define (make-engine thunk init-break-enabled-cell empty-config?) (define ready-s (make-semaphore)) (define s (make-semaphore)) @@ -135,32 +137,68 @@ (struct exn:break:terminate/non-engine exn:break/non-engine ()) (define (make-pthread-parameter v) - (define x v) + (define l (unsafe-make-place-local v)) (case-lambda - [() x] - [(v) (set! x v)])) + [() (unsafe-place-local-ref l)] + [(v) (unsafe-place-local-set! l v)])) -(define place-local-table (make-thread-cell (make-hasheq))) +(define initial-place-local-table (make-hasheq)) +(define place-local-table (make-parameter initial-place-local-table)) (define (unsafe-make-place-local v) (define key (vector v 'place-locale)) - (hash-set! (thread-cell-ref place-local-table) key v) + (hash-set! (place-local-table) key v) key) (define (unsafe-place-local-ref key) - (hash-ref (thread-cell-ref place-local-table) key (vector-ref key 0))) + (hash-ref (place-local-table) key (vector-ref key 0))) (define (unsafe-place-local-set! key val) - (hash-set! (thread-cell-ref place-local-table) key val)) + (hash-set! (place-local-table) key val)) -(define (fork-place thunk) - (thread (lambda () - (thread-cell-set! place-local-table (make-hasheq)) - (thunk)))) +(define wakeables (make-weak-hasheq)) + +(define (wakeable-sleep msecs) + (define s (make-semaphore)) + (hash-set! wakeables (place-local-table) s) + (sync/timeout msecs s) + (void)) + +(define (get-wakeup-handle) + (place-local-table)) + +(define (wakeup t) + (define s (hash-ref wakeables t #f)) + (when s (semaphore-post s))) + +(define place-done-prompt (make-continuation-prompt-tag 'place-done)) + +(define (fork-place thunk finish) + (parameterize ([place-local-table (make-hasheq)]) + (thread (lambda () + (define v + (call-with-continuation-prompt + thunk + place-done-prompt)) + (finish v))))) + +(define place-symbols (make-hasheq)) (define (start-place mod sym in out err) (lambda (finish) - (void))) + (finish #f #f #f) + ((hash-ref place-symbols sym)))) + +;; For use in "demo.rkt" +(define (register-place-symbol! sym proc) + (hash-set! place-symbols sym proc)) + +(define (place-exit v) + (if (eq? initial-place-local-table (place-local-table)) + (exit v) + (abort-current-continuation + place-done-prompt + (lambda () v)))) (primitive-table '#%pthread (hash @@ -193,29 +231,33 @@ 'poll-async-callbacks (lambda () null) 'disable-interrupts void 'enable-interrupts void + 'sleep wakeable-sleep + 'get-wakeup-handle get-wakeup-handle + 'wakeup wakeup 'fork-place fork-place 'start-place start-place + 'exit place-exit 'fork-pthread (lambda args (error "fork-pthread: not ready")) 'pthread? (lambda args (error "thread?: not ready")) 'get-thread-id (lambda args (error "get-pthread-id: not ready")) - 'make-condition (lambda () 'condition) - 'condition-wait (lambda args - (error "condition-wait: not ready")) - 'condition-signal (lambda args - (error "condition-signal: not ready")) + 'make-condition (lambda () (make-semaphore)) + 'condition-wait (lambda (c s) + (semaphore-post s) + (semaphore-wait c) + (semaphore-wait s)) + 'condition-signal (lambda (c) + (semaphore-post c)) 'condition-broadcast (lambda args (error "condition-broadcast: not ready")) 'threaded? (lambda () #f) 'current-engine-state (lambda args (error "current-engine state: not ready")) - 'make-mutex (lambda () 'mutex) - 'mutex-acquire (lambda args - (error "mutex-acquire: not ready")) - 'mutex-release (lambda args - (error "mutex-release: not ready")))) + 'make-mutex (lambda () (make-semaphore 1)) + 'mutex-acquire (lambda (s) (semaphore-wait s)) + 'mutex-release (lambda (s) (semaphore-post s)))) ;; add dummy definitions that implement pthreads and conditions etc. ;; dummy definitions that error diff --git a/racket/src/thread/custodian.rkt b/racket/src/thread/custodian.rkt index 51bb1cbf71..641bf80aba 100644 --- a/racket/src/thread/custodian.rkt +++ b/racket/src/thread/custodian.rkt @@ -28,6 +28,11 @@ raise-custodian-is-shut-down set-post-shutdown-action!) +(module+ scheduling + (provide do-custodian-shutdown-all + set-root-custodian! + create-custodian)) + (struct custodian (children ; weakly maps maps object to callback [shut-down? #:mutable] [shutdown-sema #:mutable] @@ -59,7 +64,7 @@ #f ; shutdown semaphore #f)) -(define root-custodian (create-custodian)) +(define-place-local root-custodian (create-custodian)) (define/who current-custodian (make-parameter root-custodian @@ -67,6 +72,11 @@ (check who custodian? v) v))) +;; To initialize a new place: +(define (set-root-custodian! c) + (set! root-custodian c) + (current-custodian c)) + (define/who (make-custodian [parent (current-custodian)]) (check who custodian? parent) (define c (create-custodian)) diff --git a/racket/src/thread/demo.rkt b/racket/src/thread/demo.rkt index e517cfeae4..a13b54df36 100644 --- a/racket/src/thread/demo.rkt +++ b/racket/src/thread/demo.rkt @@ -501,10 +501,30 @@ (check #t (symbol? (will-execute we))) ;; Check places, where the various export symbols passed to - ;; `dynamic-place` are built into "bootstrap.rkt" - (define pl (dynamic-place 'dummy 'no-op #f #f #f)) - (check #t (place? pl)) - (sleep 1) + ;; `dynamic-place` are registered via `register-place-symbol!` + (register-place-symbol! 'nothing void) + (define pl1 (dynamic-place 'dummy 'nothing #f #f #f)) + (check #t (place? pl1)) + (check 0 (place-wait pl1)) + + (register-place-symbol! 'exit1 (lambda () (exit 1))) + (define pl2 (dynamic-place 'dummy 'exit1 #f #f #f)) + (check #t (place? pl2)) + (check 1 (place-wait pl2)) + + (register-place-symbol! 'loop (lambda () (let loop () (loop)))) + (define pl3 (dynamic-place 'dummy 'loop #f #f #f)) + (check #t (place? pl3)) + (place-break pl3) + (check 1 (place-wait pl3)) + (printf "[That break was from a place, and it's expected]\n") + + (define pl4 (dynamic-place 'dummy 'loop #f #f #f)) + (check #f (sync/timeout 0.01 (place-dead-evt pl4))) + (place-kill pl4) + (check 1 (place-wait pl4)) + (check #t (evt? (sync (place-dead-evt pl4)))) + (check #t (evt? (sync/timeout 0.01 (place-dead-evt pl4)))) (set! done? #t))) diff --git a/racket/src/thread/exit.rkt b/racket/src/thread/exit.rkt index 2af7ec30ae..e4ba8ab4db 100644 --- a/racket/src/thread/exit.rkt +++ b/racket/src/thread/exit.rkt @@ -1,7 +1,6 @@ #lang racket/base -(require (only-in racket/base - [exit host:exit]) - "../common/check.rkt" +(require "../common/check.rkt" + "host.rkt" "plumber.rkt") (provide exit @@ -17,6 +16,7 @@ (check who (procedure-arity-includes/c 1) p) p))) +;; In a non-main place, must be called only in the scheduler: (define (force-exit v) (cond [(byte? v) diff --git a/racket/src/thread/future.rkt b/racket/src/thread/future.rkt index 76cc10df17..65a45096a1 100644 --- a/racket/src/thread/future.rkt +++ b/racket/src/thread/future.rkt @@ -253,7 +253,7 @@ (let loop ([id 1]) (cond [(< id (+ 1 THREAD-COUNT)) - (cons (worker id (make-lock) (chez:make-mutex) (chez:make-condition) (make-queue) #t) + (cons (worker id (make-lock) (host:make-mutex) (host:make-condition) (make-queue) #t) (loop (+ id 1)))] [else '()]))) @@ -277,10 +277,10 @@ (let ([w (pick-worker)]) (with-lock ((worker-lock w) (get-caller)) - (chez:mutex-acquire (worker-mutex w)) + (host:mutex-acquire (worker-mutex w)) (queue-add! (worker-queue w) f) - (chez:condition-signal (worker-cond w)) - (chez:mutex-release (worker-mutex w))))) + (host:condition-signal (worker-cond w)) + (host:mutex-release (worker-mutex w))))) (define (pick-worker) (define workers (scheduler-workers global-scheduler)) @@ -304,9 +304,9 @@ (cond [(not (queue-empty? (worker-queue w))) ;; got work in meantime (void)] - [(chez:mutex-acquire m #f) ;; cannot acquire lock while worker is being given work. - (chez:condition-wait (worker-cond w) m) - (chez:mutex-release m)] + [(host:mutex-acquire m #f) ;; cannot acquire lock while worker is being given work. + (host:condition-wait (worker-cond w) m) + (host:mutex-release m)] [else ;; try to get lock again. (try)]))) @@ -343,9 +343,9 @@ ((future*-engine future) TICKS void complete (expire future worker))] [(not (future*-cont future)) ;; don't want to reschedule future with a saved continuation (with-lock ((worker-lock worker) (get-caller)) - (chez:mutex-acquire (worker-mutex worker)) + (host:mutex-acquire (worker-mutex worker)) (queue-add! (worker-queue worker) future) - (chez:mutex-release (worker-mutex worker)))] + (host:mutex-release (worker-mutex worker)))] [else (with-lock ((future*-lock future) (get-caller)) (future:condition-signal (future*-cond future)))]))) diff --git a/racket/src/thread/host.rkt b/racket/src/thread/host.rkt index 1f9bedb999..398daa8481 100644 --- a/racket/src/thread/host.rkt +++ b/racket/src/thread/host.rkt @@ -66,17 +66,24 @@ [disable-interrupts host:disable-interrupts] [enable-interrupts host:enable-interrupts] + ;; Support for the thre scheduler and interrupts + ;; across places + [sleep host:sleep] + [get-wakeup-handle host:get-wakeup-handle] + [wakeup host:wakeup] + [fork-place host:fork-place] [start-place host:start-place] + [exit host:exit] fork-pthread pthread? [get-thread-id get-pthread-id] - [make-condition chez:make-condition] - [condition-wait chez:condition-wait] - [condition-signal chez:condition-signal] - [condition-broadcast chez:condition-broadcast] - [make-mutex chez:make-mutex] - [mutex-acquire chez:mutex-acquire] - [mutex-release chez:mutex-release] + [make-condition host:make-condition] + [condition-wait host:condition-wait] + [condition-signal host:condition-signal] + [condition-broadcast host:condition-broadcast] + [make-mutex host:make-mutex] + [mutex-acquire host:mutex-acquire] + [mutex-release host:mutex-release] threaded?) diff --git a/racket/src/thread/os-thread.rkt b/racket/src/thread/os-thread.rkt index 18289801ce..85f7830c69 100644 --- a/racket/src/thread/os-thread.rkt +++ b/racket/src/thread/os-thread.rkt @@ -24,30 +24,30 @@ (define/who (unsafe-make-os-semaphore) (unless threaded? (raise-unsupported who)) - (os-semaphore 0 (chez:make-mutex) (chez:make-condition))) + (os-semaphore 0 (host:make-mutex) (host:make-condition))) (define/who (unsafe-os-semaphore-post s) (check who os-semaphore? s) - (chez:mutex-acquire (os-semaphore-mutex s)) + (host:mutex-acquire (os-semaphore-mutex s)) (when (zero? (os-semaphore-count s)) - (chez:condition-signal (os-semaphore-condition s))) + (host:condition-signal (os-semaphore-condition s))) (set-os-semaphore-count! s (add1 (os-semaphore-count s))) - (chez:mutex-release (os-semaphore-mutex s))) + (host:mutex-release (os-semaphore-mutex s))) ;; interrupts must be enabled when waiting on a semaphore; otherwise, ;; the wait will block GCs, likely deadlocking this thread and another ;; thread that is working toward posting the semaphore (define/who (unsafe-os-semaphore-wait s) (check who os-semaphore? s) - (chez:mutex-acquire (os-semaphore-mutex s)) + (host:mutex-acquire (os-semaphore-mutex s)) (let loop () (cond [(zero? (os-semaphore-count s)) - (chez:condition-wait (os-semaphore-condition s) (os-semaphore-mutex s)) + (host:condition-wait (os-semaphore-condition s) (os-semaphore-mutex s)) (loop)] [else (set-os-semaphore-count! s (sub1 (os-semaphore-count s)))])) - (chez:mutex-release (os-semaphore-mutex s))) + (host:mutex-release (os-semaphore-mutex s))) (define (raise-unsupported who) (raise diff --git a/racket/src/thread/place.rkt b/racket/src/thread/place.rkt index a1f83aa03a..cd6a0347ea 100644 --- a/racket/src/thread/place.rkt +++ b/racket/src/thread/place.rkt @@ -6,11 +6,14 @@ "schedule.rkt" "atomic.rkt" "thread.rkt" + (submod "thread.rkt" for-place) "custodian.rkt" + (submod "custodian.rkt" scheduling) "plumber.rkt" "exit.rkt" "sync.rkt" - "evt.rkt") + "evt.rkt" + "sandman.rkt") (provide dynamic-place place? @@ -32,67 +35,158 @@ ;; ---------------------------------------- -(struct place ([result #:mutable] +(struct place (lock + [result #:mutable] ; byte or #f, where #f means "not done" + [queued-result #:mutable] ; non-#f triggers a place exit custodian - [post-shutdown #:mutable] - pumper-threads)) + [post-shutdown #:mutable] ; list of callbacks + pumper-threads ; vector of up to three pumper threads + [pending-break #:mutable] ; #f, 'break, 'hangup, or 'terminate + done-waiting ; hash table of places to ping when this one ends + [wakeup-handle #:mutable])) (define-place-local current-place #f) -(define place-prompt-tag (make-continuation-prompt-tag 'place)) - (define (dynamic-place path sym in out err) - (define c (make-custodian)) - (define new-place (place #f ; result + (define c (create-custodian)) + (define lock (host:make-mutex)) + (define started (host:make-condition)) + (define done-waiting (make-hasheq)) + (define new-place (place lock + #f ; result + #f ; queued-result c '() ; post-shutdown - (make-vector 3 #f))) ; pumper-threads + (make-vector 3 #f) ; pumper-threads + #f ; pending-break + done-waiting + #f)) (define orig-plumber (make-plumber)) (define (default-exit v) (plumber-flush-all orig-plumber) - (unsafe-abort-current-continuation/no-wind - place-prompt-tag - (if (byte? v) v 0))) + (host:mutex-acquire lock) + (set-place-queued-result! new-place (if (byte? v) v 0)) + (host:mutex-release lock) + ;; Switch to scheduler, so it can exit: + (engine-block)) + ;; Start the new place (host:fork-place (lambda () (define finish (host:start-place path sym in out err)) - (call-in-main-thread + (call-in-another-main-thread + c (lambda () - (define result - (call-with-continuation-prompt - (lambda () - (set! current-place new-place) - (current-custodian c) - (current-plumber orig-plumber) - (exit-handler default-exit) - (finish - (lambda (in-th out-th err-th) - (vector-set! (place-pumper-threads place) 0 in-th) - (vector-set! (place-pumper-threads place) 1 out-th) - (vector-set! (place-pumper-threads place) 2 err-th))) - (default-exit 0)) - place-prompt-tag - (lambda (v) v))) - (set-place-result! new-place result))))) + (set! current-place new-place) + (current-plumber orig-plumber) + (exit-handler default-exit) + ;; The finish function reports some I/O related + ;; information to store in the place, and when that + ;; callback returns, it starts loading the specified + ;; module + (call-with-continuation-prompt + (lambda () + (finish + (lambda (in-th out-th err-th) + (vector-set! (place-pumper-threads new-place) 0 in-th) + (vector-set! (place-pumper-threads new-place) 1 out-th) + (vector-set! (place-pumper-threads new-place) 2 err-th) + (host:mutex-acquire lock) + (set-place-wakeup-handle! new-place (sandman-get-wakeup-handle)) + (host:condition-signal started) ; place is sufficiently started + (host:mutex-release lock)))) + (default-continuation-prompt-tag) + (lambda (thunk) + ;; Thread ended with escape => exit with status 1 + (call-with-continuation-prompt thunk) + (default-exit 1))) + (default-exit 0)))) + (lambda (result) + ;; Place is done, so save the result and alert anyone waiting on + ;; the place + (do-custodian-shutdown-all c) + (host:mutex-acquire lock) + (set-place-result! new-place result) + (host:mutex-release lock) + (for ([k (in-hash-keys done-waiting)]) + (cond + [(place? k) + (host:mutex-acquire (place-lock k)) + (unless (place-result k) + (sandman-wakeup (place-wakeup-handle k))) + (host:mutex-release (place-lock k))] + [else (sandman-wakeup k)])) + (hash-clear! done-waiting))) + ;; Wait for the place to start, then return the place object + (host:mutex-acquire lock) + (host:condition-wait started lock) + (host:mutex-release lock) new-place) (define/who (place-break p [kind #f]) (check who place? p) (unless (or (not kind) (eq? kind 'hangup) (eq? kind 'terminate)) (raise-argument-error who "(or/c #f 'hangup 'terminate)" kind)) - (void)) + (host:mutex-acquire (place-lock p)) + (define pending-break (place-pending-break p)) + (when (or (not pending-break) + (break>? (or kind 'break) pending-break)) + (set-place-pending-break! p (or kind 'break)) + (sandman-wakeup (place-wakeup-handle p))) + (host:mutex-release (place-lock p))) + +(void + (set-check-place-break! + ;; Called in atomic mode by scheduler + (lambda () + (define p current-place) + (when p + (host:mutex-acquire (place-lock p)) + (define queued-result (place-queued-result p)) + (define break (place-pending-break p)) + (when break + (set-place-pending-break! p #f)) + (host:mutex-release (place-lock p)) + (when queued-result + (force-exit queued-result)) + (when break + (do-break-thread root-thread break #f)))))) (define/who (place-kill p) (check who place? p) + (host:mutex-acquire (place-lock p)) + (unless (or (place-result p) + (place-queued-result p)) + (set-place-queued-result! p 1)) + (host:mutex-release (place-lock p)) + (place-wait p) (void)) (define/who (place-wait p) (check who place? p) - (sync never-evt)) + (sync (place-done-evt p #t))) + +(struct place-done-evt (p get-result?) + #:property prop:evt (poller (lambda (self poll-ctx) + (assert-atomic-mode) + (define p (place-done-evt-p self)) + (host:mutex-acquire (place-lock p)) + (define result (place-result p)) + (unless result + (hash-set! (place-done-waiting p) + (or current-place + (sandman-get-wakeup-handle)) + #t)) + (host:mutex-release (place-lock p)) + (if result + (if (place-done-evt-get-result? self) + (values (list result) #f) + (values (list self) #f)) + (values #f self)))) + #:reflection-name 'place-dead-evt) (define/who (place-dead-evt p) (check who place? p) - never-evt) + (place-done-evt p #f)) (define/who (place-sleep msecs) (void)) @@ -124,7 +218,8 @@ #f) (define (unsafe-add-post-custodian-shutdown proc) - (atomically - (set-place-post-shutdown! current-place - (cons proc - (place-post-shutdown current-place))))) + (when current-place + (atomically + (set-place-post-shutdown! current-place + (cons proc + (place-post-shutdown current-place)))))) diff --git a/racket/src/thread/sandman-struct.rkt b/racket/src/thread/sandman-struct.rkt index 4c7e67f8fc..7bb8b7cad0 100644 --- a/racket/src/thread/sandman-struct.rkt +++ b/racket/src/thread/sandman-struct.rkt @@ -15,6 +15,9 @@ do-poll ; (thread -> any) -> (void), calls function on any thread to wake up ; where is 'fast or 'slow + do-get-wakeup ; -> , identifies current place + do-wakeup ; -> (void), wakes up `do-sleep` call + do-any-sleepers? ; -> boolean do-sleepers-external-events ; -> for sleepers diff --git a/racket/src/thread/sandman.rkt b/racket/src/thread/sandman.rkt index 790137c717..05ae343cbb 100644 --- a/racket/src/thread/sandman.rkt +++ b/racket/src/thread/sandman.rkt @@ -3,7 +3,8 @@ "check.rkt" "tree.rkt" "internal-error.rkt" - "sandman-struct.rkt") + "sandman-struct.rkt" + "host.rkt") ;; A "sandman" manages the set of all sleeping threads that may need ;; to be awoken in response to an external event, and it implements @@ -35,6 +36,8 @@ sandman-remove-sleeping-thread! sandman-poll sandman-sleep + sandman-get-wakeup-handle + sandman-wakeup sandman-any-sleepers? sandman-sleepers-external-events sandman-condition-wait @@ -67,6 +70,14 @@ (define (sandman-sleep exts) ((sandman-do-sleep the-sandman) exts)) +;; potentially in atomic mode +(define (sandman-get-wakeup-handle) + ((sandman-do-get-wakeup the-sandman))) + +;; potentially in atomic mode +(define (sandman-wakeup h) + ((sandman-do-wakeup the-sandman) h)) + ;; in atomic mode (define (sandman-any-sleepers?) ((sandman-do-any-sleepers? the-sandman))) @@ -129,7 +140,7 @@ (sandman ;; sleep (lambda (timeout-at) - (sleep (/ (- (or timeout-at (distant-future)) (current-inexact-milliseconds)) 1000.0))) + (host:sleep (/ (- (or timeout-at (distant-future)) (current-inexact-milliseconds)) 1000.0))) ;; poll (lambda (mode wakeup) @@ -141,6 +152,14 @@ (for ([t (in-hash-keys threads)]) (wakeup t)))))) + ;; get-wakeup-handle + (lambda () + (host:get-wakeup-handle)) + + ;; wakeup + (lambda (h) + (host:wakeup h)) + ;; any-sleepers? (lambda () (not (tree-empty? sleeping-threads))) diff --git a/racket/src/thread/schedule.rkt b/racket/src/thread/schedule.rkt index 2657248219..1d930334e5 100644 --- a/racket/src/thread/schedule.rkt +++ b/racket/src/thread/schedule.rkt @@ -10,14 +10,18 @@ (submod "thread.rkt" scheduling) "system-idle-evt.rkt" "exit.rkt" - "future.rkt") + "future.rkt" + "custodian.rkt" + (submod "custodian.rkt" scheduling)) ;; Many scheduler details are implemented in "thread.rkt", but this ;; module handles the thread selection, thread swapping, and ;; process sleeping. (provide call-in-main-thread - set-atomic-timeout-callback!) + call-in-another-main-thread + set-atomic-timeout-callback! + set-check-place-break!) (define TICKS 100000) @@ -28,6 +32,12 @@ (make-initial-thread thunk) (select-thread!)) +;; Initializes the thread system in a new place: +(define (call-in-another-main-thread c thunk) + (make-another-initial-thread-group) + (set-root-custodian! c) + (call-in-main-thread thunk)) + ;; ---------------------------------------- (define (select-thread! [pending-callbacks null]) @@ -37,6 +47,7 @@ pending-callbacks)) (host:poll-will-executors) (check-external-events 'fast) + (check-place-break) (when (and (null? callbacks) (all-threads-poll-done?) (waiting-on-external-or-idle?)) @@ -219,3 +230,9 @@ (begin0 atomic-timeout-callback (set! atomic-timeout-callback cb))) + +;; ---------------------------------------- + +(define check-place-break void) +(define (set-check-place-break! proc) + (set! check-place-break proc)) diff --git a/racket/src/thread/thread-group.rkt b/racket/src/thread/thread-group.rkt index d3f192dfb2..0d2b4603f4 100644 --- a/racket/src/thread/thread-group.rkt +++ b/racket/src/thread/thread-group.rkt @@ -9,6 +9,7 @@ thread-group? make-thread-group current-thread-group + make-another-initial-thread-group ;; Used by scheduler root-thread-group @@ -32,7 +33,10 @@ [chain #:mutable] ; children remaining to be scheduled round-robin [chain-end #:mutable])) -(define root-thread-group (thread-group 'none 'none #f #f #f #f)) +(define (make-root-thread-group) + (thread-group 'none 'none #f #f #f #f)) + +(define-place-local root-thread-group (make-root-thread-group)) (define-place-local num-threads-in-groups 0) @@ -42,6 +46,10 @@ (check who thread-group? v) v))) +(define (make-another-initial-thread-group) + (set! root-thread-group (make-root-thread-group)) + (current-thread-group root-thread-group)) + (define/who (make-thread-group [parent (current-thread-group)]) (check who thread-group? parent) (define tg (thread-group 'none 'none parent #f #f #f)) diff --git a/racket/src/thread/thread.rkt b/racket/src/thread/thread.rkt index ac785f5a91..0568afceee 100644 --- a/racket/src/thread/thread.rkt +++ b/racket/src/thread/thread.rkt @@ -84,6 +84,12 @@ break-max)) +;; Exports needed by "place.rkt": +(module* for-place #f + (provide root-thread + do-break-thread + break>?)) + ;; ---------------------------------------- (struct thread node (name