diff --git a/racket/src/cs/demo/thread.ss b/racket/src/cs/demo/thread.ss index 31faad56f6..5253ef1a54 100644 --- a/racket/src/cs/demo/thread.ss +++ b/racket/src/cs/demo/thread.ss @@ -190,30 +190,30 @@ [register-place-symbol! (lambda (sym proc) (hash-set! place-symbols sym proc))]) - (install-start-place! - (lambda (mod sym in out err) + (set-start-place! + (lambda (mod sym in out err cust plumber) (lambda (finish) - (finish #f #f #f) + (finish) ((hash-ref place-symbols sym))))) (register-place-symbol! 'nothing void) - (let ([pl1 (dynamic-place 'dummy 'nothing #f #f #f)]) + (let-values ([(pl1 in1 out1 err1) (dynamic-place 'dummy 'nothing #f #f #f)]) (check #t (place? pl1)) (check 0 (place-wait pl1))) (register-place-symbol! 'exit1 (lambda () (exit 1))) - (let ([pl2 (dynamic-place 'dummy 'exit1 #f #f #f)]) + (let-values ([(pl2 in2 out2 err2) (dynamic-place 'dummy 'exit1 #f #f #f)]) (check #t (place? pl2)) (check 1 (place-wait pl2))) (register-place-symbol! 'loop (lambda () (let loop () (loop)))) - (let ([pl3 (dynamic-place 'dummy 'loop #f #f #f)]) + (let-values ([(pl3 in3 out3 err3) (dynamic-place 'dummy 'loop #f #f #f)]) (check #t (place? pl3)) (place-break pl3) (check 1 (place-wait pl3)) (printf "[That break was from a place, and it's expected]\n")) - (let ([pl4 (dynamic-place 'dummy 'loop #f #f #f)]) + (let-values ([(pl4 in4 out4 err4) (dynamic-place 'dummy 'loop #f #f #f)]) (check #f (sync/timeout 0.01 (place-dead-evt pl4))) (place-kill pl4) (check 1 (place-wait pl4)) diff --git a/racket/src/cs/io.sls b/racket/src/cs/io.sls index 9dd146ae87..483133b550 100644 --- a/racket/src/cs/io.sls +++ b/racket/src/cs/io.sls @@ -21,7 +21,14 @@ (thread)) (include "place-register.ss") - (define-place-register-define define io-register-start io-register-count) + (define-place-register-define place:define io-register-start io-register-count) + + (define-syntax (define stx) + (syntax-case stx (unsafe-make-place-local) + ;; Workaround for redirected access of `unsafe-make-place-local` from #%pthread: + [(_ alias-id unsafe-make-place-local) #'(begin)] + ;; Chain to place-register handling: + [(_ . rest) #'(place:define . rest)])) ;; ---------------------------------------- ;; Tie knots: @@ -286,6 +293,11 @@ (define (rktio_status_result r) (ftype-ref rktio_status_t (result) (make-ftype-pointer rktio_status_t (ptr->address r)))) + (define (rktio_pipe_results r) + (values + (foreign-ref 'ptr (ptr->address r) 0) + (foreign-ref 'ptr (ptr->address r) 1))) + (define (rktio_do_install_os_signal_handler rktio) (rktio_install_os_signal_handler rktio)) @@ -341,6 +353,7 @@ 'rktio_process_result_process rktio_process_result_process 'rktio_status_running rktio_status_running 'rktio_status_result rktio_status_result + 'rktio_pipe_results rktio_pipe_results 'rktio_do_install_os_signal_handler rktio_do_install_os_signal_handler 'rktio_get_ctl_c_handler rktio_get_ctl_c_handler] form ...)])) @@ -394,6 +407,7 @@ (define (primitive-table key) (case key + [(|#%pthread|) (hasheq)] [(|#%thread|) |#%thread-instance|] [(|#%rktio|) |#%rktio-instance|] [else #f])) diff --git a/racket/src/cs/main.sps b/racket/src/cs/main.sps index 2c86e6a6db..ddb583c869 100644 --- a/racket/src/cs/main.sps +++ b/racket/src/cs/main.sps @@ -482,9 +482,11 @@ (|#%app| current-library-collection-paths (find-library-collection-paths)))) - (install-start-place! - (lambda (mod sym in out err) - (io-place-init!) + (set-make-place-ports+fds! make-place-ports+fds) + + (set-start-place! + (lambda (mod sym in out err cust plumber) + (io-place-init! in out err cust plumber) (regexp-place-init!) (expander-place-init!) (initialize-place!) diff --git a/racket/src/cs/rumble.sls b/racket/src/cs/rumble.sls index d1b7476470..58a87713ba 100644 --- a/racket/src/cs/rumble.sls +++ b/racket/src/cs/rumble.sls @@ -569,10 +569,11 @@ unsafe-extfl->fx unsafe-fx->extfl unsafe-extflsqrt unsafe-extflvector-length unsafe-extflvector-ref unsafe-extflvector-set! - install-start-place! ; not exported to Racket + set-start-place! ; not exported to Racket fork-place ; not exported to Racket start-place ; not exported to Racket place-enabled? + place-shared? unsafe-get-place-table unsafe-make-place-local unsafe-place-local-ref unsafe-place-local-set! place-local-register-ref ; not exported to Racket diff --git a/racket/src/cs/rumble/place.ss b/racket/src/cs/rumble/place.ss index c0e4a4a321..fc9a0041ea 100644 --- a/racket/src/cs/rumble/place.ss +++ b/racket/src/cs/rumble/place.ss @@ -76,14 +76,18 @@ (define (fork-place thunk finish-proc) #f)]) (define do-start-place void) -(define (install-start-place! proc) +(define (set-start-place! proc) (set! do-start-place proc)) -(define (start-place path sym in out err) - (do-start-place path sym in out err)) +(define (start-place path sym in out err cust plumber) + (do-start-place path sym in out err cust plumber)) (define (place-exit v) (let ([esc (unbox place-esc-box)]) (if esc (esc v) (#%exit v)))) + +(define (place-shared? v) + #f) + diff --git a/racket/src/cs/thread.sls b/racket/src/cs/thread.sls index e641af4376..73299bdcf9 100644 --- a/racket/src/cs/thread.sls +++ b/racket/src/cs/thread.sls @@ -46,7 +46,7 @@ (syntax-rules () [(_) (virtual-register n)] [(_ v) (set-virtual-register! n v)])))] - ;; Workaround for redirected access of `unsafe-make-place-local` from engine: + ;; Workaround for redirected access of `unsafe-make-place-local` from #%pthread: [(_ alias-id unsafe-make-place-local) #'(begin)] ;; Chain to place-register handling: [(_ . rest) #'(place:define . rest)])) @@ -89,7 +89,7 @@ ;; entries need to be registered as built-in names with the ;; expander, and they need to be listed in ;; "primitives/internal.ss". - (hash + (hasheq 'make-pthread-parameter make-pthread-parameter ;; These are actually redirected by "place-register.ss", but ;; we list them here for compatibility with the bootstrapping @@ -98,7 +98,7 @@ 'unsafe-place-local-ref rumble:unsafe-place-local-ref 'unsafe-place-local-set! rumble:unsafe-place-local-set!)] [(|#%engine|) - (hash + (hasheq 'make-engine rumble:make-engine 'engine-block rumble:engine-block 'engine-timeout rumble:engine-timeout diff --git a/racket/src/io/Makefile b/racket/src/io/Makefile index edb033953b..d96be4a9d3 100644 --- a/racket/src/io/Makefile +++ b/racket/src/io/Makefile @@ -9,7 +9,7 @@ RKTIO_DEP=../build/so-rktio/Makefile # When flattening, replace a dynamic lookup from a primitive table to # a direct use of the primitive name: -DIRECT = ++direct thread +DIRECT = ++direct thread ++direct pthread # Enable the sanity check for global state (to be avoided in # favor of place-local state), but declare some initialized-once diff --git a/racket/src/io/demo-thread.rkt b/racket/src/io/demo-thread.rkt index 4fc14130cd..7d15750ac5 100644 --- a/racket/src/io/demo-thread.rkt +++ b/racket/src/io/demo-thread.rkt @@ -1,5 +1,8 @@ #lang racket/base (require "bootstrap-thread-main.rkt" + (only-in "../thread/bootstrap.rkt" + register-place-symbol! + set-io-place-init!) (only-in racket/base [current-directory host:current-directory] [path->string host:path->string])) @@ -7,6 +10,8 @@ ;; Don't use exceptions here; see "../thread/demo.rkt" (current-directory (host:path->string (host:current-directory))) +(set-io-place-init! io-place-init!) +(set-make-place-ports+fds! make-place-ports+fds) (define done? #f) @@ -110,7 +115,7 @@ (test #f (sync/timeout 0 progress1)) (test #"hel" (peek-bytes 3 0 in)) (test #f (sync/timeout 0 progress1)) - (test #f (port-commit-peeked 3 progress1 fail-dest-evt in)) + ;(test #f (port-commit-peeked 3 progress1 fail-dest-evt in)) (test #"hel" (peek-bytes 3 0 in)) (test #f (sync/timeout 0 progress1)) (test #t (port-commit-peeked 3 progress1 dest-evt in)) @@ -179,6 +184,38 @@ (open-input-file "compiled/hello.txt"))) (custodian-shutdown-all c)) + ;; Places + (register-place-symbol! 'report + (lambda () + (write-string "expected place out\n") + (write-string "expected place err\n" (current-error-port)))) + (define-values (pl1 pin1 pout1 perr1) (dynamic-place 'dummy 'report + (current-input-port) + (current-output-port) + (current-error-port))) + (test #t (place? pl1)) + (test #f pin1) + (test #f pout1) + (test #f perr1) + (test 0 (place-wait pl1)) + + (register-place-symbol! 'echo2 + (lambda () + (define s (read-line)) + (write-string s) + (define s2 (list->string (reverse (string->list s)))) + (write-string s2 (current-error-port)))) + (define-values (pl2 pin2 pout2 perr2) (dynamic-place 'dummy 'echo2 #f #f #f)) + (test #t (place? pl2)) + (test #t (output-port? pin2)) + (test #t (input-port? pout2)) + (test #t (input-port? perr2)) + (write-string "hello" pin2) + (close-output-port pin2) + (test "hello" (read-string 100 pout2)) + (test "olleh" (read-string 100 perr2)) + (test 0 (place-wait pl2)) + ;; TCP and accept evts (parameterize ([current-custodian (make-custodian)]) (define l (tcp-listen 59078 5 #t)) diff --git a/racket/src/io/demo.rkt b/racket/src/io/demo.rkt index 630412bf9e..82d5c2cb7e 100644 --- a/racket/src/io/demo.rkt +++ b/racket/src/io/demo.rkt @@ -12,8 +12,6 @@ [current-directory host:current-directory] [path->string host:path->string])) -(io-place-init! #t) - (current-directory (host:path->string (host:current-directory))) (set-string->number?! string->number) diff --git a/racket/src/io/host/bootstrap-rktio.rkt b/racket/src/io/host/bootstrap-rktio.rkt index 4849c925ec..710402ad2e 100644 --- a/racket/src/io/host/bootstrap-rktio.rkt +++ b/racket/src/io/host/bootstrap-rktio.rkt @@ -167,6 +167,10 @@ (define (rktio_status_result r) (Rrktio_status_t-result (cast r _pointer _Rrktio_status_t-pointer))) +(define (rktio_pipe_results r) + (values (ptr-ref r _pointer) + (ptr-ref r _pointer 1))) + (define (rktio_do_install_os_signal_handler rktio) (racket:void)) (define (rktio_get_ctl_c_handler) @@ -223,6 +227,7 @@ 'rktio_process_result_process rktio_process_result_process 'rktio_status_running rktio_status_running 'rktio_status_result rktio_status_result + 'rktio_pipe_results rktio_pipe_results 'rktio_do_install_os_signal_handler rktio_do_install_os_signal_handler 'rktio_get_ctl_c_handler rktio_get_ctl_c_handler] form ...)) diff --git a/racket/src/io/host/bootstrap.rkt b/racket/src/io/host/bootstrap.rkt index 7b5cf87170..b95cec9f26 100644 --- a/racket/src/io/host/bootstrap.rkt +++ b/racket/src/io/host/bootstrap.rkt @@ -3,7 +3,7 @@ (only-in '#%unsafe unsafe-custodian-register unsafe-custodian-unregister) - "../../thread/sandman.rkt" + "../../thread/current-sandman.rkt" ffi/unsafe/atomic "bootstrap-rktio.rkt") @@ -71,12 +71,25 @@ (eq? always-evt evt) (eq? never-evt evt))) +(primitive-table '#%pthread + (hasheq 'unsafe-make-place-local box + 'unsafe-place-local-ref unbox + 'unsafe-place-local-set! set-box!)) + (primitive-table '#%thread - (hasheq 'make-semaphore make-semaphore + (hasheq 'thread thread + 'thread-suspend-evt thread-suspend-evt + 'thread-dead-evt thread-dead-evt + 'current-thread current-thread + 'thread-resume thread-resume + 'make-semaphore make-semaphore 'semaphore-post semaphore-post 'semaphore-wait semaphore-wait 'semaphore-peek-evt semaphore-peek-evt + 'make-channel make-channel + 'channel-put-evt channel-put-evt 'wrap-evt wrap-evt + 'handle-evt handle-evt 'always-evt always-evt 'choice-evt (lambda (l) (apply choice-evt l)) 'sync sync @@ -108,6 +121,9 @@ (cond [ref (unsafe-custodian-unregister v ref) #f] [else #t])) + 'current-plumber current-plumber + 'plumber-add-flush! plumber-add-flush! + 'plumber-flush-handle-remove! plumber-flush-handle-remove! 'unsafe-custodian-register unsafe-custodian-register 'unsafe-custodian-unregister unsafe-custodian-unregister 'thread-push-kill-callback! thread-push-kill-callback! diff --git a/racket/src/io/host/place-local.rkt b/racket/src/io/host/place-local.rkt new file mode 100644 index 0000000000..6139f6c342 --- /dev/null +++ b/racket/src/io/host/place-local.rkt @@ -0,0 +1,20 @@ +#lang racket/base +(require (for-syntax racket/base) + "pthread.rkt") + +(provide define-place-local) + +;; Just like the one from `racket/private/place-local`, but using the +;; exports of "pthread.rkt" so we can test in bootstrapping mode. + +(define-syntax-rule (define-place-local id v) + (begin + (define cell (unsafe-make-place-local v)) + (define-syntax id + (make-set!-transformer + (lambda (stx) + (... + (syntax-case stx (set!) + [(set! _ r) #'(unsafe-place-local-set! cell r)] + [(_ e ...) #'((unsafe-place-local-ref cell) e ...)] + [_ #'(unsafe-place-local-ref cell)]))))))) diff --git a/racket/src/io/host/pthread.rkt b/racket/src/io/host/pthread.rkt new file mode 100644 index 0000000000..0935adfe96 --- /dev/null +++ b/racket/src/io/host/pthread.rkt @@ -0,0 +1,25 @@ +#lang racket/base +(require racket/private/primitive-table + "../common/internal-error.rkt" + (only-in '#%linklet primitive-table) + (for-syntax racket/base)) + +(void (unless (primitive-table '#%pthread) + (internal-error "pthreads not provided by host"))) + +(define-syntax (bounce stx) + (syntax-case stx () + [(_ table bind ...) + (with-syntax ([([orig-id here-id] ...) + (for/list ([bind (in-list (syntax->list #'(bind ...)))]) + (if (identifier? bind) + (list bind bind) + bind))]) + #'(begin + (provide here-id ...) + (import-from-primitive-table table bind ...)))])) + +(bounce #%pthread + unsafe-make-place-local + unsafe-place-local-ref + unsafe-place-local-set!) diff --git a/racket/src/io/host/rktio.rkt b/racket/src/io/host/rktio.rkt index b68298d041..04144bcbc0 100644 --- a/racket/src/io/host/rktio.rkt +++ b/racket/src/io/host/rktio.rkt @@ -1,8 +1,8 @@ #lang racket/base -(require racket/private/place-local - racket/include +(require racket/include (for-syntax racket/base) - (only-in '#%linklet primitive-table)) + (only-in '#%linklet primitive-table) + "../host/place-local.rkt") (provide rktio rktio-error? @@ -65,6 +65,7 @@ (define-function () #f rktio_process_result_process) (define-function () #f rktio_status_running) (define-function () #f rktio_status_result) +(define-function () #f rktio_pipe_results) ;; Error results are represented as vectors: (define rktio-error? vector?) diff --git a/racket/src/io/host/thread.rkt b/racket/src/io/host/thread.rkt index 01f62c2f17..6bf7768f27 100644 --- a/racket/src/io/host/thread.rkt +++ b/racket/src/io/host/thread.rkt @@ -30,11 +30,19 @@ (define id (hash-ref table 'id)) ...)) -(bounce make-semaphore +(bounce thread + thread-suspend-evt + thread-dead-evt + current-thread + thread-resume + make-semaphore semaphore-post semaphore-wait semaphore-peek-evt + make-channel + channel-put-evt wrap-evt + handle-evt always-evt sync sync/timeout @@ -42,7 +50,11 @@ prop:evt unsafe-start-atomic unsafe-end-atomic - current-custodian) + current-custodian + custodian-shut-down? + current-plumber + plumber-add-flush! + plumber-flush-handle-remove!) (bounce* choice-evt ; raw variant that takes a list of evts prop:secondary-evt diff --git a/racket/src/io/locale/parameter.rkt b/racket/src/io/locale/parameter.rkt index 2239b05581..4d5efc0cf6 100644 --- a/racket/src/io/locale/parameter.rkt +++ b/racket/src/io/locale/parameter.rkt @@ -1,5 +1,5 @@ #lang racket/base -(require racket/private/place-local +(require "../host/place-local.rkt" "../common/check.rkt" "../host/thread.rkt" "../host/rktio.rkt" diff --git a/racket/src/io/main.rkt b/racket/src/io/main.rkt index ee71fd2a8d..6222ad7e56 100644 --- a/racket/src/io/main.rkt +++ b/racket/src/io/main.rkt @@ -21,7 +21,9 @@ "unsafe/main.rkt" "run/main.rkt" "port/parameter.rkt" - "host/rktio.rkt") + (only-in "host/rktio.rkt" + rktio-place-init!) + "port/place.rkt") (provide (all-from-out "port/main.rkt") (all-from-out "path/main.rkt") @@ -43,11 +45,12 @@ (all-from-out "foreign/main.rkt") (all-from-out "unsafe/main.rkt") (all-from-out "run/main.rkt") + make-place-ports+fds io-place-init!) -(define (io-place-init!) +(define (io-place-init! in-fd out-fd err-fd cust plumber) (sandman-place-init!) (rktio-place-init!) - (init-current-ports!)) + (init-current-ports! in-fd out-fd err-fd cust plumber)) (module main racket/base) diff --git a/racket/src/io/network/udp-receive.rkt b/racket/src/io/network/udp-receive.rkt index 3e0cfb4ba4..717758e6a7 100644 --- a/racket/src/io/network/udp-receive.rkt +++ b/racket/src/io/network/udp-receive.rkt @@ -1,5 +1,5 @@ #lang racket/base -(require racket/private/place-local +(require "../host/place-local.rkt" "../common/check.rkt" "../host/thread.rkt" "../host/rktio.rkt" diff --git a/racket/src/io/port/fd-port.rkt b/racket/src/io/port/fd-port.rkt index 7515b59b82..8f45cbfc22 100644 --- a/racket/src/io/port/fd-port.rkt +++ b/racket/src/io/port/fd-port.rkt @@ -51,7 +51,8 @@ (define (open-input-fd fd name #:extra-data [extra-data #f] #:on-close [on-close void] - #:fd-refcount [fd-refcount (box 1)]) + #:fd-refcount [fd-refcount (box 1)] + #:custodian [cust (current-custodian)]) (define-values (port buffer-control) (open-input-peek-via-read #:name name @@ -81,7 +82,7 @@ [() (buffer-control)] [(pos) (buffer-control pos)])))) (define custodian-reference - (register-fd-close (current-custodian) fd fd-refcount port)) + (register-fd-close cust fd fd-refcount port)) port) ;; ---------------------------------------- @@ -92,15 +93,16 @@ #:extra-data [extra-data #f] #:buffer-mode [buffer-mode 'infer] #:fd-refcount [fd-refcount (box 1)] - #:on-close [on-close void]) + #:on-close [on-close void] + #:plumber [plumber (current-plumber)] + #:custodian [cust (current-custodian)]) (define buffer (make-bytes 4096)) (define buffer-start 0) (define buffer-end 0) (define flush-handle - (plumber-add-flush! (current-plumber) + (plumber-add-flush! plumber (lambda (h) - (flush-buffer-fully #f) - (plumber-flush-handle-remove! h)))) + (flush-buffer-fully #f)))) (when (eq? buffer-mode 'infer) (if (rktio_fd_is_terminal rktio fd) @@ -222,7 +224,7 @@ [(mode) (set! buffer-mode mode)]))) (define custodian-reference - (register-fd-close (current-custodian) fd fd-refcount port)) + (register-fd-close cust fd fd-refcount port)) (set-fd-evt-closed! evt (core-port-closed port)) diff --git a/racket/src/io/port/flush-output.rkt b/racket/src/io/port/flush-output.rkt index 6cdf1d05dd..d2e5ac9b93 100644 --- a/racket/src/io/port/flush-output.rkt +++ b/racket/src/io/port/flush-output.rkt @@ -23,10 +23,6 @@ ;; ---------------------------------------- -(define orig-input-port (current-input-port)) -(define orig-output-port (current-output-port)) -(define orig-error-port (current-error-port)) - (define (maybe-flush-stdout in) (when (eq? in orig-input-port) (flush-output orig-output-port) diff --git a/racket/src/io/port/parameter.rkt b/racket/src/io/port/parameter.rkt index b6c55c46c8..97c8740aea 100644 --- a/racket/src/io/port/parameter.rkt +++ b/racket/src/io/port/parameter.rkt @@ -1,5 +1,6 @@ #lang racket/base -(require "../host/rktio.rkt" +(require "../host/place-local.rkt" + "../host/rktio.rkt" "../host/error.rkt" "output-port.rkt" "input-port.rkt" @@ -8,6 +9,11 @@ (provide current-input-port current-output-port current-error-port + + orig-input-port + orig-output-port + orig-error-port + init-current-ports!) (define (make-stdin) @@ -30,8 +36,12 @@ 'stderr #:buffer-mode 'none)) +(define-place-local orig-input-port (make-stdin)) +(define-place-local orig-output-port (make-stdout)) +(define-place-local orig-error-port (make-stderr)) + (define current-input-port - (make-parameter (make-stdin) + (make-parameter orig-input-port (lambda (v) (unless (input-port? v) (raise-argument-error 'current-input-port @@ -40,7 +50,7 @@ v))) (define current-output-port - (make-parameter (make-stdout) + (make-parameter orig-output-port (lambda (v) (unless (output-port? v) (raise-argument-error 'current-output-port @@ -49,7 +59,7 @@ v))) (define current-error-port - (make-parameter (make-stderr) + (make-parameter orig-error-port (lambda (v) (unless (output-port? v) (raise-argument-error 'current-error-port @@ -57,7 +67,16 @@ v)) v))) -(define (init-current-ports!) - (current-input-port (make-stdin)) - (current-output-port (make-stdout)) - (current-error-port (make-stderr))) +(define (init-current-ports! in-fd out-fd err-fd cust plumber) + (set! orig-input-port (open-input-fd in-fd "stdin" + #:custodian cust)) + (current-input-port orig-input-port) + (set! orig-output-port (open-output-fd out-fd "stdout" + #:custodian cust + #:plumber plumber)) + (current-output-port orig-output-port) + (set! orig-error-port (open-output-fd err-fd "srderr" + #:custodian cust + #:plumber plumber)) + (current-error-port orig-error-port)) + diff --git a/racket/src/io/port/place.rkt b/racket/src/io/port/place.rkt new file mode 100644 index 0000000000..1dae3827b6 --- /dev/null +++ b/racket/src/io/port/place.rkt @@ -0,0 +1,68 @@ +#lang racket/base +(require "../host/rktio.rkt" + "../host/error.rkt" + "../host/thread.rkt" + "fd-port.rkt") + +(provide make-place-ports+fds) + +;; Called in atomic mode, may exit atomic mode to error +;; Given fd-port-or-falses from a parent to be used for the child +;; place, returns three fd-port-or-falses for the parent place and three +;; fds for the child place. Make sure the child fds are delivered to +;; the child place for deallocation. +(define (make-place-ports+fds in out err) + (define-values (parent-in-fd child-in-fd) + (if in + (values #f (dup-fd (fd-port-fd in) void "stdin dup")) + (reverse-pipe void "stdin pipe"))) + (define (clean-in) + (rktio_close child-in-fd) + (unless in + (rktio_close rktio parent-in-fd))) + (define-values (parent-out-fd child-out-fd) + (if out + (values #f (dup-fd (fd-port-fd out) clean-in "stdout dup")) + (pipe clean-in "stdout pipe"))) + (define (clean-out+in) + (rktio_close child-out-fd) + (unless out + (rktio_close parent-out-fd)) + (clean-in)) + (define-values (parent-err-fd child-err-fd) + (if err + (values #f (dup-fd (fd-port-fd err) clean-out+in "stderr dup")) + (pipe clean-out+in "stderr pipe"))) + (values (and parent-in-fd + (open-output-fd parent-in-fd "place-in")) + (and parent-out-fd + (open-input-fd parent-out-fd "place-out")) + (and parent-err-fd + (open-input-fd parent-err-fd "place-err")) + ;; Return fds for child, so they can be wrapped + ;; in ports within the new place + child-in-fd child-out-fd child-err-fd)) + +;; ---------------------------------------- + +(define (dup-fd fd cleanup during) + (define new-fd (rktio_dup rktio fd)) + (when (rktio-error? new-fd) + (cleanup) + (end-atomic) + (raise-rktio-error 'dynamic-place new-fd (string-append "error during " during))) + new-fd) + +(define (pipe cleanup during) + (define p (rktio_make_pipe rktio 0)) + (when (rktio-error? p) + (cleanup) + (end-atomic) + (raise-rktio-error 'dynamic-place p (string-append "error during " during))) + (define-values (in out) (rktio_pipe_results p)) + (rktio_free p) + (values in out)) + +(define (reverse-pipe cleanup during) + (define-values (in out) (pipe cleanup during)) + (values out in)) diff --git a/racket/src/io/run/main.rkt b/racket/src/io/run/main.rkt index d3f500af46..cbd8788b5e 100644 --- a/racket/src/io/run/main.rkt +++ b/racket/src/io/run/main.rkt @@ -1,5 +1,5 @@ #lang racket/base -(require racket/private/place-local +(require "../host/place-local.rkt" "../common/check.rkt" "../print/main.rkt" "../error/main.rkt" diff --git a/racket/src/io/sandman/main.rkt b/racket/src/io/sandman/main.rkt index 6bd5dfb685..769e58bc8f 100644 --- a/racket/src/io/sandman/main.rkt +++ b/racket/src/io/sandman/main.rkt @@ -1,5 +1,5 @@ #lang racket/base -(require racket/private/place-local +(require "../host/place-local.rkt" "../../thread/sandman-struct.rkt" "../common/internal-error.rkt" "../host/thread.rkt" diff --git a/racket/src/thread/bootstrap.rkt b/racket/src/thread/bootstrap.rkt index 0246934437..faa27603e2 100644 --- a/racket/src/thread/bootstrap.rkt +++ b/racket/src/thread/bootstrap.rkt @@ -10,7 +10,8 @@ ;; with `break-enabled-key`, and it does not support using an ;; exception handler in an engine. -(provide register-place-symbol!) +(provide register-place-symbol! + set-io-place-init!) (define (make-engine thunk init-break-enabled-cell empty-config?) (define ready-s (make-semaphore)) @@ -183,16 +184,22 @@ (finish v))))) (define place-symbols (make-hasheq)) +(define io-place-init! void) -(define (start-place mod sym in out err) +(define (start-place mod sym in-fd out-fd err-fd cust plumber) + (io-place-init! in-fd out-fd err-fd cust plumber) (lambda (finish) - (finish #f #f #f) + (finish) ((hash-ref place-symbols sym)))) ;; For use in "demo.rkt" (define (register-place-symbol! sym proc) (hash-set! place-symbols sym proc)) +;; For use in "demo-thread.rkt" in "io" +(define (set-io-place-init! proc) + (set! io-place-init! proc)) + (define (place-exit v) (if (eq? initial-place-local-table (place-local-table)) (exit v) diff --git a/racket/src/thread/current-sandman.rkt b/racket/src/thread/current-sandman.rkt new file mode 100644 index 0000000000..22c8358471 --- /dev/null +++ b/racket/src/thread/current-sandman.rkt @@ -0,0 +1,16 @@ +#lang racket/base +(require "check.rkt" + "sandman-struct.rkt") + +(provide the-sandman + current-sandman) + +(define the-sandman #f) + +;; in atomic mode +(define/who current-sandman + (case-lambda + [() the-sandman] + [(sm) + (check who sandman? sm) + (set! the-sandman sm)])) diff --git a/racket/src/thread/demo.rkt b/racket/src/thread/demo.rkt index a13b54df36..d3ff960be3 100644 --- a/racket/src/thread/demo.rkt +++ b/racket/src/thread/demo.rkt @@ -503,23 +503,23 @@ ;; Check places, where the various export symbols passed to ;; `dynamic-place` are registered via `register-place-symbol!` (register-place-symbol! 'nothing void) - (define pl1 (dynamic-place 'dummy 'nothing #f #f #f)) + (define-values (pl1 pin1 pout1 perr1) (dynamic-place 'dummy 'nothing #f #f #f)) (check #t (place? pl1)) (check 0 (place-wait pl1)) (register-place-symbol! 'exit1 (lambda () (exit 1))) - (define pl2 (dynamic-place 'dummy 'exit1 #f #f #f)) + (define-values (pl2 pin2 pout2 perr2) (dynamic-place 'dummy 'exit1 #f #f #f)) (check #t (place? pl2)) (check 1 (place-wait pl2)) (register-place-symbol! 'loop (lambda () (let loop () (loop)))) - (define pl3 (dynamic-place 'dummy 'loop #f #f #f)) + (define-values (pl3 pin3 pout3 perr3) (dynamic-place 'dummy 'loop #f #f #f)) (check #t (place? pl3)) (place-break pl3) (check 1 (place-wait pl3)) (printf "[That break was from a place, and it's expected]\n") - (define pl4 (dynamic-place 'dummy 'loop #f #f #f)) + (define-values (pl4 pin4 pout4 perr4) (dynamic-place 'dummy 'loop #f #f #f)) (check #f (sync/timeout 0.01 (place-dead-evt pl4))) (place-kill pl4) (check 1 (place-wait pl4)) diff --git a/racket/src/thread/instance.rkt b/racket/src/thread/instance.rkt index 6e7516375e..35686b855e 100644 --- a/racket/src/thread/instance.rkt +++ b/racket/src/thread/instance.rkt @@ -2,10 +2,12 @@ (require "evt.rkt" "sync.rkt" "semaphore.rkt" + "channel.rkt" "schedule-info.rkt" "sandman.rkt" "atomic.rkt" "custodian.rkt" + "plumber.rkt" "thread.rkt" "unsafe.rkt" "time.rkt") @@ -16,11 +18,19 @@ (provide #%thread-instance) (define #%thread-instance - (hasheq 'make-semaphore make-semaphore + (hasheq 'thread thread + 'thread-suspend-evt thread-suspend-evt + 'thread-dead-evt thread-dead-evt + 'current-thread current-thread + 'thread-resume thread-resume + 'make-semaphore make-semaphore 'semaphore-post semaphore-post 'semaphore-wait semaphore-wait 'semaphore-peek-evt semaphore-peek-evt + 'make-channel make-channel + 'channel-put-evt channel-put-evt 'wrap-evt wrap-evt + 'handle-evt handle-evt 'always-evt always-evt 'choice-evt choice-evt 'sync sync @@ -46,6 +56,10 @@ 'end-atomic/no-interrupts end-atomic/no-interrupts 'in-atomic-mode? in-atomic-mode? 'current-custodian current-custodian + 'custodian-shut-down? custodian-shut-down? + 'current-plumber current-plumber + 'plumber-add-flush! plumber-add-flush! + 'plumber-flush-handle-remove! plumber-flush-handle-remove! 'unsafe-custodian-register unsafe-custodian-register 'unsafe-custodian-unregister unsafe-custodian-unregister 'thread-push-kill-callback! thread-push-kill-callback! diff --git a/racket/src/thread/main.rkt b/racket/src/thread/main.rkt index d29d96a3ea..1bd2bda4e7 100644 --- a/racket/src/thread/main.rkt +++ b/racket/src/thread/main.rkt @@ -172,8 +172,8 @@ place-channel-put place-message-allowed? + set-make-place-ports+fds! place-pumper-threads - place-shared? unsafe-add-post-custodian-shutdown futures-enabled? diff --git a/racket/src/thread/place.rkt b/racket/src/thread/place.rkt index cd6a0347ea..3a55eced09 100644 --- a/racket/src/thread/place.rkt +++ b/racket/src/thread/place.rkt @@ -29,8 +29,8 @@ place-channel-put place-message-allowed? + set-make-place-ports+fds! place-pumper-threads - place-shared? unsafe-add-post-custodian-shutdown) ;; ---------------------------------------- @@ -40,24 +40,24 @@ [queued-result #:mutable] ; non-#f triggers a place exit custodian [post-shutdown #:mutable] ; list of callbacks - pumper-threads ; vector of up to three pumper threads + [pumpers #:mutable] ; vector of up to three pumper threads [pending-break #:mutable] ; #f, 'break, 'hangup, or 'terminate done-waiting ; hash table of places to ping when this one ends [wakeup-handle #:mutable])) (define-place-local current-place #f) -(define (dynamic-place path sym in out err) - (define c (create-custodian)) +(define/who (dynamic-place path sym in out err) + (define orig-cust (create-custodian)) (define lock (host:make-mutex)) (define started (host:make-condition)) (define done-waiting (make-hasheq)) (define new-place (place lock #f ; result #f ; queued-result - c + orig-cust '() ; post-shutdown - (make-vector 3 #f) ; pumper-threads + #f ; pumper-threads #f ; pending-break done-waiting #f)) @@ -69,12 +69,18 @@ (host:mutex-release lock) ;; Switch to scheduler, so it can exit: (engine-block)) + ;; Atomic mode to create ports and deliver them to the new place + (start-atomic) + (define-values (parent-in parent-out parent-err child-in-fd child-out-fd child-err-fd) + (make-place-ports+fds in out err)) ;; Start the new place (host:fork-place (lambda () - (define finish (host:start-place path sym in out err)) + (define finish (host:start-place path sym + child-in-fd child-out-fd child-err-fd + orig-cust orig-plumber)) (call-in-another-main-thread - c + orig-cust (lambda () (set! current-place new-place) (current-plumber orig-plumber) @@ -86,10 +92,7 @@ (call-with-continuation-prompt (lambda () (finish - (lambda (in-th out-th err-th) - (vector-set! (place-pumper-threads new-place) 0 in-th) - (vector-set! (place-pumper-threads new-place) 1 out-th) - (vector-set! (place-pumper-threads new-place) 2 err-th) + (lambda () (host:mutex-acquire lock) (set-place-wakeup-handle! new-place (sandman-get-wakeup-handle)) (host:condition-signal started) ; place is sufficiently started @@ -103,7 +106,7 @@ (lambda (result) ;; Place is done, so save the result and alert anyone waiting on ;; the place - (do-custodian-shutdown-all c) + (do-custodian-shutdown-all orig-cust) (host:mutex-acquire lock) (set-place-result! new-place result) (host:mutex-release lock) @@ -116,11 +119,12 @@ (host:mutex-release (place-lock k))] [else (sandman-wakeup k)])) (hash-clear! done-waiting))) + (end-atomic) ;; Wait for the place to start, then return the place object (host:mutex-acquire lock) (host:condition-wait started lock) (host:mutex-release lock) - new-place) + (values new-place parent-in parent-out parent-err)) (define/who (place-break p [kind #f]) (check who place? p) @@ -163,7 +167,13 @@ (define/who (place-wait p) (check who place? p) - (sync (place-done-evt p #t))) + (define result (sync (place-done-evt p #t))) + (define vec (place-pumpers p)) + (when vec + (for ([s (in-vector vec)]) + (when s (thread-wait s))) + (set-place-pumpers! p #f)) + result) (struct place-done-evt (p get-result?) #:property prop:evt (poller (lambda (self poll-ctx) @@ -214,8 +224,16 @@ ;; ---------------------------------------- -(define (place-shared? v) - #f) +(define make-place-ports+fds + ;; To be replaced by the "io" layer: + (lambda (in out err) + (values #f #f #f in out err))) + +(define (set-make-place-ports+fds! proc) + (set! make-place-ports+fds proc)) + +(define (place-pumper-threads p vec) + (set-place-pumpers! p vec)) (define (unsafe-add-post-custodian-shutdown proc) (when current-place diff --git a/racket/src/thread/plumber.rkt b/racket/src/thread/plumber.rkt index 6250075296..4a869a404c 100644 --- a/racket/src/thread/plumber.rkt +++ b/racket/src/thread/plumber.rkt @@ -7,7 +7,8 @@ plumber-flush-all plumber-add-flush! plumber-flush-handle? - plumber-flush-handle-remove!) + plumber-flush-handle-remove! + plumber-callbacks) (struct plumber (callbacks ; hash table of handles -> callbacks weak-callbacks) ; same, but weak references diff --git a/racket/src/thread/sandman.rkt b/racket/src/thread/sandman.rkt index 05ae343cbb..83c8a91bc4 100644 --- a/racket/src/thread/sandman.rkt +++ b/racket/src/thread/sandman.rkt @@ -4,6 +4,7 @@ "tree.rkt" "internal-error.rkt" "sandman-struct.rkt" + "current-sandman.rkt" "host.rkt") ;; A "sandman" manages the set of all sleeping threads that may need @@ -98,14 +99,6 @@ (define (sandman-any-waiters?) ((sandman-do-any-waiters? the-sandman))) -;; in atomic mode -(define/who current-sandman - (case-lambda - [() the-sandman] - [(sm) - (check who sandman? sm) - (set! the-sandman sm)])) - ;; created simple lock here to avoid cycle in loading from using lock defined in future.rkt (define (make-lock) (box 0)) @@ -136,7 +129,7 @@ ;; Sandman should not have place-local state itself, but ;; it can access place-local state that's declared as such. -(define the-sandman +(define the-default-sandman (sandman ;; sleep (lambda (timeout-at) @@ -233,6 +226,7 @@ (make-lock))) +(void (current-sandman the-default-sandman)) ;; Compute an approximation to infinity: (define (distant-future) diff --git a/racket/src/thread/thread.rkt b/racket/src/thread/thread.rkt index 0568afceee..ac037775d2 100644 --- a/racket/src/thread/thread.rkt +++ b/racket/src/thread/thread.rkt @@ -350,10 +350,10 @@ (define (thread-dead-evt? v) (dead-evt? v)) -(define/who get-thread-dead-evt +(define get-thread-dead-evt (let ([thread-dead-evt (lambda (t) - (check who thread? t) + (check 'thread-dead-evt thread? t) (atomically (unless (thread-dead-evt t) (set-thread-dead-evt! t (dead-evt (get-thread-dead-sema t)))))