From 35af40d8508b37ae00a35d1d8bc370f2a91dd530 Mon Sep 17 00:00:00 2001 From: Matthew Flatt Date: Mon, 3 Sep 2018 09:03:18 -0600 Subject: [PATCH] cs & thread: move places API to the thread layer The Rumble layer provides a primitive `fork-place` operation and `start-place` hook to tie in place initialization from other layers (especially "io"), but the rest can live in the "thread" layer, especially to handle place-channel synchronization. --- racket/src/cs/main.sps | 3 +- racket/src/cs/primitive/internal.ss | 2 + racket/src/cs/rumble.sls | 8 +- racket/src/cs/rumble/place.ss | 41 +----- racket/src/cs/thread.sls | 18 ++- .../src/expander/compile/built-in-symbol.rkt | 30 ++-- racket/src/thread/atomic.rkt | 4 +- racket/src/thread/bootstrap.rkt | 29 +++- racket/src/thread/continuation-mark.rkt | 2 +- racket/src/thread/custodian.rkt | 4 +- racket/src/thread/demo.rkt | 6 + racket/src/thread/future.rkt | 4 +- racket/src/thread/{engine.rkt => host.rkt} | 8 +- racket/src/thread/main.rkt | 19 +++ racket/src/thread/nested-thread.rkt | 2 +- racket/src/thread/os-thread.rkt | 2 +- racket/src/thread/parameter.rkt | 2 +- racket/src/thread/place-local.rkt | 20 +++ racket/src/thread/place.rkt | 130 ++++++++++++++++++ racket/src/thread/sandman.rkt | 2 +- racket/src/thread/schedule.rkt | 4 +- racket/src/thread/thread-group.rkt | 2 +- racket/src/thread/thread.rkt | 6 +- racket/src/thread/time.rkt | 4 +- racket/src/thread/will-executor.rkt | 2 +- 25 files changed, 272 insertions(+), 82 deletions(-) rename racket/src/thread/{engine.rkt => host.rkt} (93%) create mode 100644 racket/src/thread/place-local.rkt create mode 100644 racket/src/thread/place.rkt diff --git a/racket/src/cs/main.sps b/racket/src/cs/main.sps index 2b0815a36d..2c86e6a6db 100644 --- a/racket/src/cs/main.sps +++ b/racket/src/cs/main.sps @@ -488,7 +488,8 @@ (regexp-place-init!) (expander-place-init!) (initialize-place!) - (dynamic-require mod sym))) + (lambda () + (dynamic-require mod sym)))) (when (getenv "PLT_STATS_ON_BREAK") (keyboard-interrupt-handler diff --git a/racket/src/cs/primitive/internal.ss b/racket/src/cs/primitive/internal.ss index c245c01a33..0243e6efbf 100644 --- a/racket/src/cs/primitive/internal.ss +++ b/racket/src/cs/primitive/internal.ss @@ -29,4 +29,6 @@ [record-mutator (known-constant)] [unsafe-struct? (known-constant)] + [fork-place (known-procedure 1)] + [start-place (known-procedure 32)] [make-pthread-parameter (known-procedure 2)]) diff --git a/racket/src/cs/rumble.sls b/racket/src/cs/rumble.sls index 0b2116b7b4..b7e2751d5d 100644 --- a/racket/src/cs/rumble.sls +++ b/racket/src/cs/rumble.sls @@ -570,12 +570,10 @@ unsafe-extflvector-length unsafe-extflvector-ref unsafe-extflvector-set! install-start-place! ; not exported to Racket - place-enabled? place? place-channel? place-break - place-channel-get place-channel-put place-sleep - place-channel place-dead-evt place-kill place-message-allowed? - dynamic-place place-wait place-pumper-threads place-shared? + fork-place ; not exported to Racket + start-place ; not exported to Racket + place-enabled? unsafe-get-place-table - unsafe-add-post-custodian-shutdown unsafe-make-place-local unsafe-place-local-ref unsafe-place-local-set! place-local-register-ref ; not exported to Racket place-local-register-set! ; not exported to Racket diff --git a/racket/src/cs/rumble/place.ss b/racket/src/cs/rumble/place.ss index 455b08a9c6..4fffecf9b9 100644 --- a/racket/src/cs/rumble/place.ss +++ b/racket/src/cs/rumble/place.ss @@ -54,8 +54,6 @@ ;; ---------------------------------------- -(define-record place (thread)) - (meta-cond [(threaded?) (define (place-enabled?) #f) ;; FIXME @@ -68,40 +66,9 @@ (define (place-enabled?) #f) (define (fork-place thunk) #f)]) -(define start-place void) +(define do-start-place void) (define (install-start-place! proc) - (set! start-place proc)) + (set! do-start-place proc)) -(define (dynamic-place path sym in out err) - (make-place - (fork-place (lambda () - (start-place path sym in out err))))) - -(define (place-channel? v) - #f) - -(define-syntax define-place-not-yet-available - (syntax-rules () - [(_ id) - (define (id . args) - (error 'id "place API not yet supported"))] - [(_ id ...) - (begin (define-place-not-yet-available id) ...)])) - -;; This operation adds shutdown thunks to a non-main place, so it's a -;; no-op for now: -(define (unsafe-add-post-custodian-shutdown proc) - (void)) - -(define-place-not-yet-available - place-break - place-channel-get - place-channel-put - place-sleep - place-channel - place-dead-evt - place-kill - place-message-allowed? - place-wait - place-pumper-threads - place-shared?) +(define (start-place path sym in out err) + (do-start-place path sym in out err)) diff --git a/racket/src/cs/thread.sls b/racket/src/cs/thread.sls index 8bfdf340f9..89d610bdec 100644 --- a/racket/src/cs/thread.sls +++ b/racket/src/cs/thread.sls @@ -26,6 +26,8 @@ [mutex-acquire rumble:mutex-acquire] [mutex-release rumble:mutex-release] [pthread? rumble:thread?] + [fork-place rumble:fork-place] + [start-place rumble:start-place] [fork-pthread rumble:fork-thread] [threaded? rumble:threaded?] [get-thread-id rumble:get-thread-id] @@ -39,7 +41,8 @@ ;; Special handling of `current-atomic`: use the last virtual register; ;; we rely on the fact that the register's default value is 0. (define-syntax (define stx) - (syntax-case stx (current-atomic make-pthread-parameter) + (syntax-case stx (current-atomic make-pthread-parameter unsafe-make-place-local) + ;; Recognize definition of `current-atomic`: [(_ current-atomic (make-pthread-parameter 0)) (with-syntax ([(_ id _) stx] [n (datum->syntax #'here (sub1 (virtual-register-count)))]) @@ -47,6 +50,9 @@ (syntax-rules () [(_) (virtual-register n)] [(_ v) (set-virtual-register! n v)])))] + ;; Workaround for redirected access of `unsafe-make-place-local` from engine: + [(_ alias-id unsafe-make-place-local) #'(begin)] + ;; Chain to place-register handling: [(_ . rest) #'(place:define . rest)])) (define (exit n) @@ -67,7 +73,13 @@ ;; expander, and they need to be listed in ;; "primitives/internal.ss". (hash - 'make-pthread-parameter make-pthread-parameter)] + 'make-pthread-parameter make-pthread-parameter + ;; These are actually redirected by "place-register.ss", but + ;; we list them here for compatibility with the bootstrapping + ;; variant of `#%pthread` + 'unsafe-make-place-local rumble:unsafe-make-place-local + 'unsafe-place-local-ref rumble:unsafe-place-local-ref + 'unsafe-place-local-set! rumble:unsafe-place-local-set!)] [(|#%engine|) (hash 'make-engine rumble:make-engine @@ -93,6 +105,8 @@ 'poll-async-callbacks poll-async-callbacks 'disable-interrupts disable-interrupts 'enable-interrupts enable-interrupts + 'fork-place rumble:fork-place + 'start-place rumble:start-place 'fork-pthread rumble:fork-thread 'pthread? rumble:thread? 'get-thread-id rumble:get-thread-id diff --git a/racket/src/expander/compile/built-in-symbol.rkt b/racket/src/expander/compile/built-in-symbol.rkt index 6821d2d35e..a26a7a1fa0 100644 --- a/racket/src/expander/compile/built-in-symbol.rkt +++ b/racket/src/expander/compile/built-in-symbol.rkt @@ -48,18 +48,18 @@ ;; Linklet compilation on Chez Scheme (for-each register-built-in-symbol! - '(let - letrec* - define - or - and - pariah - variable-set! - variable-ref - variable-ref/no-check - make-instance-variable-reference - annotation? - annotation-expression - #%app - #%call-with-values - make-pthread-parameter)))) + '(or + and + let + letrec* + define + pariah + variable-set! + variable-ref + variable-ref/no-check + make-instance-variable-reference + annotation? + annotation-expression + #%app + #%call-with-values + make-pthread-parameter)))) diff --git a/racket/src/thread/atomic.rkt b/racket/src/thread/atomic.rkt index 9dc2ee15a5..c24b395ddd 100644 --- a/racket/src/thread/atomic.rkt +++ b/racket/src/thread/atomic.rkt @@ -1,6 +1,6 @@ #lang racket/base -(require racket/private/place-local - "engine.rkt" +(require "host.rkt" + "place-local.rkt" "internal-error.rkt" "debug.rkt") diff --git a/racket/src/thread/bootstrap.rkt b/racket/src/thread/bootstrap.rkt index dfd95ad157..34742d4620 100644 --- a/racket/src/thread/bootstrap.rkt +++ b/racket/src/thread/bootstrap.rkt @@ -140,9 +140,34 @@ [() x] [(v) (set! x v)])) +(define place-local-table (make-thread-cell (make-hasheq))) + +(define (unsafe-make-place-local v) + (define key (vector v 'place-locale)) + (hash-set! (thread-cell-ref place-local-table) key v) + key) + +(define (unsafe-place-local-ref key) + (hash-ref (thread-cell-ref place-local-table) key (vector-ref key 0))) + +(define (unsafe-place-local-set! key val) + (hash-set! (thread-cell-ref place-local-table) key val)) + +(define (fork-place thunk) + (thread (lambda () + (thread-cell-set! place-local-table (make-hasheq)) + (thunk)))) + +(define (start-place mod sym in out err) + (lambda (finish) + (void))) + (primitive-table '#%pthread (hash - 'make-pthread-parameter make-pthread-parameter)) + 'make-pthread-parameter make-pthread-parameter + 'unsafe-make-place-local unsafe-make-place-local + 'unsafe-place-local-ref unsafe-place-local-ref + 'unsafe-place-local-set! unsafe-place-local-set!)) (primitive-table '#%engine (hash 'make-engine make-engine @@ -168,6 +193,8 @@ 'poll-async-callbacks (lambda () null) 'disable-interrupts void 'enable-interrupts void + 'fork-place fork-place + 'start-place start-place 'fork-pthread (lambda args (error "fork-pthread: not ready")) 'pthread? (lambda args diff --git a/racket/src/thread/continuation-mark.rkt b/racket/src/thread/continuation-mark.rkt index efc8b3f1fe..e277a8921a 100644 --- a/racket/src/thread/continuation-mark.rkt +++ b/racket/src/thread/continuation-mark.rkt @@ -1,7 +1,7 @@ #lang racket/base (require "check.rkt" (submod "thread.rkt" scheduling) - "engine.rkt") + "host.rkt") (provide continuation-marks) diff --git a/racket/src/thread/custodian.rkt b/racket/src/thread/custodian.rkt index b8f4252cc2..51bb1cbf71 100644 --- a/racket/src/thread/custodian.rkt +++ b/racket/src/thread/custodian.rkt @@ -1,8 +1,8 @@ #lang racket/base -(require racket/private/place-local +(require "place-local.rkt" "check.rkt" "atomic.rkt" - "engine.rkt" + "host.rkt" "evt.rkt" "semaphore.rkt") diff --git a/racket/src/thread/demo.rkt b/racket/src/thread/demo.rkt index c1f215bd9f..e517cfeae4 100644 --- a/racket/src/thread/demo.rkt +++ b/racket/src/thread/demo.rkt @@ -500,6 +500,12 @@ (thread (lambda () (sync (system-idle-evt)) (collect-garbage))) (check #t (symbol? (will-execute we))) + ;; Check places, where the various export symbols passed to + ;; `dynamic-place` are built into "bootstrap.rkt" + (define pl (dynamic-place 'dummy 'no-op #f #f #f)) + (check #t (place? pl)) + (sleep 1) + (set! done? #t))) (unless done? diff --git a/racket/src/thread/future.rkt b/racket/src/thread/future.rkt index 728da5da89..76cc10df17 100644 --- a/racket/src/thread/future.rkt +++ b/racket/src/thread/future.rkt @@ -1,9 +1,9 @@ #lang racket/base -(require racket/private/place-local +(require "place-local.rkt" "check.rkt" "internal-error.rkt" - "engine.rkt" + "host.rkt" "atomic.rkt" "parameter.rkt" "../common/queue.rkt" diff --git a/racket/src/thread/engine.rkt b/racket/src/thread/host.rkt similarity index 93% rename from racket/src/thread/engine.rkt rename to racket/src/thread/host.rkt index 5601f4ec77..1f9bedb999 100644 --- a/racket/src/thread/engine.rkt +++ b/racket/src/thread/host.rkt @@ -24,7 +24,10 @@ ;; This `#%pthread` table's entries are linked more directly ;; than `#%engine` entries: (bounce #%pthread - make-pthread-parameter) + make-pthread-parameter + unsafe-make-place-local + unsafe-place-local-ref + unsafe-place-local-set!) (bounce #%engine make-engine @@ -63,6 +66,9 @@ [disable-interrupts host:disable-interrupts] [enable-interrupts host:enable-interrupts] + [fork-place host:fork-place] + [start-place host:start-place] + fork-pthread pthread? [get-thread-id get-pthread-id] diff --git a/racket/src/thread/main.rkt b/racket/src/thread/main.rkt index db8cd4f6fc..d29d96a3ea 100644 --- a/racket/src/thread/main.rkt +++ b/racket/src/thread/main.rkt @@ -25,6 +25,7 @@ "time.rkt" "stats.rkt" "stack-size.rkt" + "place.rkt" "future.rkt" "fsemaphore.rkt" "os-thread.rkt") @@ -157,6 +158,24 @@ unsafe-custodian-register unsafe-custodian-unregister + dynamic-place ; not the one from `racket/place` + place? + place-break + place-kill + place-wait + place-dead-evt + place-sleep + + place-channel + place-channel? + place-channel-get + place-channel-put + place-message-allowed? + + place-pumper-threads + place-shared? + unsafe-add-post-custodian-shutdown + futures-enabled? processor-count future diff --git a/racket/src/thread/nested-thread.rkt b/racket/src/thread/nested-thread.rkt index 1f2bc244e6..07340c4c47 100644 --- a/racket/src/thread/nested-thread.rkt +++ b/racket/src/thread/nested-thread.rkt @@ -1,7 +1,7 @@ #lang racket/base (require "check.rkt" "atomic.rkt" - "engine.rkt" + "host.rkt" "thread.rkt" (except-in (submod "thread.rkt" scheduling) thread diff --git a/racket/src/thread/os-thread.rkt b/racket/src/thread/os-thread.rkt index d43f76b337..18289801ce 100644 --- a/racket/src/thread/os-thread.rkt +++ b/racket/src/thread/os-thread.rkt @@ -1,6 +1,6 @@ #lang racket/base (require "check.rkt" - "engine.rkt" + "host.rkt" "atomic.rkt") (provide unsafe-os-thread-enabled? diff --git a/racket/src/thread/parameter.rkt b/racket/src/thread/parameter.rkt index 5a8b235fec..8e64f50484 100644 --- a/racket/src/thread/parameter.rkt +++ b/racket/src/thread/parameter.rkt @@ -1,5 +1,5 @@ #lang racket/base -(require "engine.rkt") +(require "host.rkt") (provide current-thread) diff --git a/racket/src/thread/place-local.rkt b/racket/src/thread/place-local.rkt new file mode 100644 index 0000000000..fc9c91745b --- /dev/null +++ b/racket/src/thread/place-local.rkt @@ -0,0 +1,20 @@ +#lang racket/base +(require (for-syntax racket/base) + "host.rkt") + +(provide define-place-local) + +;; Just like the one from `racket/private/place-local`, but using the +;; exports of "host.rkt" so we can test in bootstrapping mode. + +(define-syntax-rule (define-place-local id v) + (begin + (define cell (unsafe-make-place-local v)) + (define-syntax id + (make-set!-transformer + (lambda (stx) + (... + (syntax-case stx (set!) + [(set! _ r) #'(unsafe-place-local-set! cell r)] + [(_ e ...) #'((unsafe-place-local-ref cell) e ...)] + [_ #'(unsafe-place-local-ref cell)]))))))) diff --git a/racket/src/thread/place.rkt b/racket/src/thread/place.rkt new file mode 100644 index 0000000000..a1f83aa03a --- /dev/null +++ b/racket/src/thread/place.rkt @@ -0,0 +1,130 @@ +#lang racket/base +(require (only-in '#%unsafe unsafe-abort-current-continuation/no-wind) + "place-local.rkt" + "check.rkt" + "host.rkt" + "schedule.rkt" + "atomic.rkt" + "thread.rkt" + "custodian.rkt" + "plumber.rkt" + "exit.rkt" + "sync.rkt" + "evt.rkt") + +(provide dynamic-place + place? + place-break + place-kill + place-wait + place-dead-evt + place-sleep + + place-channel + place-channel? + place-channel-get + place-channel-put + place-message-allowed? + + place-pumper-threads + place-shared? + unsafe-add-post-custodian-shutdown) + +;; ---------------------------------------- + +(struct place ([result #:mutable] + custodian + [post-shutdown #:mutable] + pumper-threads)) + +(define-place-local current-place #f) + +(define place-prompt-tag (make-continuation-prompt-tag 'place)) + +(define (dynamic-place path sym in out err) + (define c (make-custodian)) + (define new-place (place #f ; result + c + '() ; post-shutdown + (make-vector 3 #f))) ; pumper-threads + (define orig-plumber (make-plumber)) + (define (default-exit v) + (plumber-flush-all orig-plumber) + (unsafe-abort-current-continuation/no-wind + place-prompt-tag + (if (byte? v) v 0))) + (host:fork-place + (lambda () + (define finish (host:start-place path sym in out err)) + (call-in-main-thread + (lambda () + (define result + (call-with-continuation-prompt + (lambda () + (set! current-place new-place) + (current-custodian c) + (current-plumber orig-plumber) + (exit-handler default-exit) + (finish + (lambda (in-th out-th err-th) + (vector-set! (place-pumper-threads place) 0 in-th) + (vector-set! (place-pumper-threads place) 1 out-th) + (vector-set! (place-pumper-threads place) 2 err-th))) + (default-exit 0)) + place-prompt-tag + (lambda (v) v))) + (set-place-result! new-place result))))) + new-place) + +(define/who (place-break p [kind #f]) + (check who place? p) + (unless (or (not kind) (eq? kind 'hangup) (eq? kind 'terminate)) + (raise-argument-error who "(or/c #f 'hangup 'terminate)" kind)) + (void)) + +(define/who (place-kill p) + (check who place? p) + (void)) + +(define/who (place-wait p) + (check who place? p) + (sync never-evt)) + +(define/who (place-dead-evt p) + (check who place? p) + never-evt) + +(define/who (place-sleep msecs) + (void)) + +;; ---------------------------------------- + +(struct pchannel () + #:reflection-name 'place-channel) + +(define (place-channel? v) + (pchannel? v)) + +(define (place-channel) + (values (pchannel) + (pchannel))) + +(define (place-channel-get pch) + (sync never-evt)) + +(define (place-channel-put pch v) + (void)) + +(define (place-message-allowed? v) + #t) + +;; ---------------------------------------- + +(define (place-shared? v) + #f) + +(define (unsafe-add-post-custodian-shutdown proc) + (atomically + (set-place-post-shutdown! current-place + (cons proc + (place-post-shutdown current-place))))) diff --git a/racket/src/thread/sandman.rkt b/racket/src/thread/sandman.rkt index c56b77aec2..790137c717 100644 --- a/racket/src/thread/sandman.rkt +++ b/racket/src/thread/sandman.rkt @@ -1,5 +1,5 @@ #lang racket/base -(require racket/private/place-local +(require "place-local.rkt" "check.rkt" "tree.rkt" "internal-error.rkt" diff --git a/racket/src/thread/schedule.rkt b/racket/src/thread/schedule.rkt index 487ce99a23..2657248219 100644 --- a/racket/src/thread/schedule.rkt +++ b/racket/src/thread/schedule.rkt @@ -1,7 +1,7 @@ #lang racket/base -(require racket/private/place-local +(require "place-local.rkt" "atomic.rkt" - "engine.rkt" + "host.rkt" "internal-error.rkt" "sandman.rkt" "parameter.rkt" diff --git a/racket/src/thread/thread-group.rkt b/racket/src/thread/thread-group.rkt index de361e9c98..d3f192dfb2 100644 --- a/racket/src/thread/thread-group.rkt +++ b/racket/src/thread/thread-group.rkt @@ -1,5 +1,5 @@ #lang racket/base -(require racket/private/place-local +(require "place-local.rkt" "check.rkt" "internal-error.rkt" "atomic.rkt") diff --git a/racket/src/thread/thread.rkt b/racket/src/thread/thread.rkt index 47b5016d55..ac785f5a91 100644 --- a/racket/src/thread/thread.rkt +++ b/racket/src/thread/thread.rkt @@ -1,9 +1,9 @@ #lang racket/base -(require racket/private/place-local - "../common/queue.rkt" +(require "../common/queue.rkt" + "place-local.rkt" "check.rkt" "internal-error.rkt" - "engine.rkt" + "host.rkt" "sandman.rkt" "parameter.rkt" "evt.rkt" diff --git a/racket/src/thread/time.rkt b/racket/src/thread/time.rkt index 9cd6aa2ef7..e065be936b 100644 --- a/racket/src/thread/time.rkt +++ b/racket/src/thread/time.rkt @@ -1,14 +1,14 @@ #lang racket/base (require "check.rkt" (submod "thread.rkt" scheduling) - (prefix-in engine: "engine.rkt")) + (prefix-in host: "host.rkt")) (provide current-process-milliseconds set-get-subprocesses-time!) (define/who (current-process-milliseconds [scope #f]) (cond - [(not scope) (engine:current-process-milliseconds)] + [(not scope) (host:current-process-milliseconds)] [(thread? scope) (thread-cpu-time scope)] [(eq? scope 'subprocesses) (get-subprocesses-time)] [else diff --git a/racket/src/thread/will-executor.rkt b/racket/src/thread/will-executor.rkt index 68163ac6a0..94ed2a8629 100644 --- a/racket/src/thread/will-executor.rkt +++ b/racket/src/thread/will-executor.rkt @@ -1,7 +1,7 @@ #lang racket/base (require "check.rkt" "atomic.rkt" - "engine.rkt" + "host.rkt" "evt.rkt" "sync.rkt" "semaphore.rkt")