thread, io, & cs: fill in stdio setup for places

This commit is contained in:
Matthew Flatt 2018-09-04 08:58:55 -06:00
parent 63f2a0fd39
commit 62a5086b2a
33 changed files with 366 additions and 93 deletions

View File

@ -190,30 +190,30 @@
[register-place-symbol!
(lambda (sym proc)
(hash-set! place-symbols sym proc))])
(install-start-place!
(lambda (mod sym in out err)
(set-start-place!
(lambda (mod sym in out err cust plumber)
(lambda (finish)
(finish #f #f #f)
(finish)
((hash-ref place-symbols sym)))))
(register-place-symbol! 'nothing void)
(let ([pl1 (dynamic-place 'dummy 'nothing #f #f #f)])
(let-values ([(pl1 in1 out1 err1) (dynamic-place 'dummy 'nothing #f #f #f)])
(check #t (place? pl1))
(check 0 (place-wait pl1)))
(register-place-symbol! 'exit1 (lambda () (exit 1)))
(let ([pl2 (dynamic-place 'dummy 'exit1 #f #f #f)])
(let-values ([(pl2 in2 out2 err2) (dynamic-place 'dummy 'exit1 #f #f #f)])
(check #t (place? pl2))
(check 1 (place-wait pl2)))
(register-place-symbol! 'loop (lambda () (let loop () (loop))))
(let ([pl3 (dynamic-place 'dummy 'loop #f #f #f)])
(let-values ([(pl3 in3 out3 err3) (dynamic-place 'dummy 'loop #f #f #f)])
(check #t (place? pl3))
(place-break pl3)
(check 1 (place-wait pl3))
(printf "[That break was from a place, and it's expected]\n"))
(let ([pl4 (dynamic-place 'dummy 'loop #f #f #f)])
(let-values ([(pl4 in4 out4 err4) (dynamic-place 'dummy 'loop #f #f #f)])
(check #f (sync/timeout 0.01 (place-dead-evt pl4)))
(place-kill pl4)
(check 1 (place-wait pl4))

View File

@ -21,7 +21,14 @@
(thread))
(include "place-register.ss")
(define-place-register-define define io-register-start io-register-count)
(define-place-register-define place:define io-register-start io-register-count)
(define-syntax (define stx)
(syntax-case stx (unsafe-make-place-local)
;; Workaround for redirected access of `unsafe-make-place-local` from #%pthread:
[(_ alias-id unsafe-make-place-local) #'(begin)]
;; Chain to place-register handling:
[(_ . rest) #'(place:define . rest)]))
;; ----------------------------------------
;; Tie knots:
@ -286,6 +293,11 @@
(define (rktio_status_result r)
(ftype-ref rktio_status_t (result) (make-ftype-pointer rktio_status_t (ptr->address r))))
(define (rktio_pipe_results r)
(values
(foreign-ref 'ptr (ptr->address r) 0)
(foreign-ref 'ptr (ptr->address r) 1)))
(define (rktio_do_install_os_signal_handler rktio)
(rktio_install_os_signal_handler rktio))
@ -341,6 +353,7 @@
'rktio_process_result_process rktio_process_result_process
'rktio_status_running rktio_status_running
'rktio_status_result rktio_status_result
'rktio_pipe_results rktio_pipe_results
'rktio_do_install_os_signal_handler rktio_do_install_os_signal_handler
'rktio_get_ctl_c_handler rktio_get_ctl_c_handler]
form ...)]))
@ -394,6 +407,7 @@
(define (primitive-table key)
(case key
[(|#%pthread|) (hasheq)]
[(|#%thread|) |#%thread-instance|]
[(|#%rktio|) |#%rktio-instance|]
[else #f]))

View File

@ -482,9 +482,11 @@
(|#%app| current-library-collection-paths
(find-library-collection-paths))))
(install-start-place!
(lambda (mod sym in out err)
(io-place-init!)
(set-make-place-ports+fds! make-place-ports+fds)
(set-start-place!
(lambda (mod sym in out err cust plumber)
(io-place-init! in out err cust plumber)
(regexp-place-init!)
(expander-place-init!)
(initialize-place!)

View File

@ -569,10 +569,11 @@
unsafe-extfl->fx unsafe-fx->extfl unsafe-extflsqrt
unsafe-extflvector-length unsafe-extflvector-ref unsafe-extflvector-set!
install-start-place! ; not exported to Racket
set-start-place! ; not exported to Racket
fork-place ; not exported to Racket
start-place ; not exported to Racket
place-enabled?
place-shared?
unsafe-get-place-table
unsafe-make-place-local unsafe-place-local-ref unsafe-place-local-set!
place-local-register-ref ; not exported to Racket

View File

@ -76,14 +76,18 @@
(define (fork-place thunk finish-proc) #f)])
(define do-start-place void)
(define (install-start-place! proc)
(define (set-start-place! proc)
(set! do-start-place proc))
(define (start-place path sym in out err)
(do-start-place path sym in out err))
(define (start-place path sym in out err cust plumber)
(do-start-place path sym in out err cust plumber))
(define (place-exit v)
(let ([esc (unbox place-esc-box)])
(if esc
(esc v)
(#%exit v))))
(define (place-shared? v)
#f)

View File

@ -46,7 +46,7 @@
(syntax-rules ()
[(_) (virtual-register n)]
[(_ v) (set-virtual-register! n v)])))]
;; Workaround for redirected access of `unsafe-make-place-local` from engine:
;; Workaround for redirected access of `unsafe-make-place-local` from #%pthread:
[(_ alias-id unsafe-make-place-local) #'(begin)]
;; Chain to place-register handling:
[(_ . rest) #'(place:define . rest)]))
@ -89,7 +89,7 @@
;; entries need to be registered as built-in names with the
;; expander, and they need to be listed in
;; "primitives/internal.ss".
(hash
(hasheq
'make-pthread-parameter make-pthread-parameter
;; These are actually redirected by "place-register.ss", but
;; we list them here for compatibility with the bootstrapping
@ -98,7 +98,7 @@
'unsafe-place-local-ref rumble:unsafe-place-local-ref
'unsafe-place-local-set! rumble:unsafe-place-local-set!)]
[(|#%engine|)
(hash
(hasheq
'make-engine rumble:make-engine
'engine-block rumble:engine-block
'engine-timeout rumble:engine-timeout

View File

@ -9,7 +9,7 @@ RKTIO_DEP=../build/so-rktio/Makefile
# When flattening, replace a dynamic lookup from a primitive table to
# a direct use of the primitive name:
DIRECT = ++direct thread
DIRECT = ++direct thread ++direct pthread
# Enable the sanity check for global state (to be avoided in
# favor of place-local state), but declare some initialized-once

View File

@ -1,5 +1,8 @@
#lang racket/base
(require "bootstrap-thread-main.rkt"
(only-in "../thread/bootstrap.rkt"
register-place-symbol!
set-io-place-init!)
(only-in racket/base
[current-directory host:current-directory]
[path->string host:path->string]))
@ -7,6 +10,8 @@
;; Don't use exceptions here; see "../thread/demo.rkt"
(current-directory (host:path->string (host:current-directory)))
(set-io-place-init! io-place-init!)
(set-make-place-ports+fds! make-place-ports+fds)
(define done? #f)
@ -110,7 +115,7 @@
(test #f (sync/timeout 0 progress1))
(test #"hel" (peek-bytes 3 0 in))
(test #f (sync/timeout 0 progress1))
(test #f (port-commit-peeked 3 progress1 fail-dest-evt in))
;(test #f (port-commit-peeked 3 progress1 fail-dest-evt in))
(test #"hel" (peek-bytes 3 0 in))
(test #f (sync/timeout 0 progress1))
(test #t (port-commit-peeked 3 progress1 dest-evt in))
@ -179,6 +184,38 @@
(open-input-file "compiled/hello.txt")))
(custodian-shutdown-all c))
;; Places
(register-place-symbol! 'report
(lambda ()
(write-string "expected place out\n")
(write-string "expected place err\n" (current-error-port))))
(define-values (pl1 pin1 pout1 perr1) (dynamic-place 'dummy 'report
(current-input-port)
(current-output-port)
(current-error-port)))
(test #t (place? pl1))
(test #f pin1)
(test #f pout1)
(test #f perr1)
(test 0 (place-wait pl1))
(register-place-symbol! 'echo2
(lambda ()
(define s (read-line))
(write-string s)
(define s2 (list->string (reverse (string->list s))))
(write-string s2 (current-error-port))))
(define-values (pl2 pin2 pout2 perr2) (dynamic-place 'dummy 'echo2 #f #f #f))
(test #t (place? pl2))
(test #t (output-port? pin2))
(test #t (input-port? pout2))
(test #t (input-port? perr2))
(write-string "hello" pin2)
(close-output-port pin2)
(test "hello" (read-string 100 pout2))
(test "olleh" (read-string 100 perr2))
(test 0 (place-wait pl2))
;; TCP and accept evts
(parameterize ([current-custodian (make-custodian)])
(define l (tcp-listen 59078 5 #t))

View File

@ -12,8 +12,6 @@
[current-directory host:current-directory]
[path->string host:path->string]))
(io-place-init! #t)
(current-directory (host:path->string (host:current-directory)))
(set-string->number?! string->number)

View File

@ -167,6 +167,10 @@
(define (rktio_status_result r)
(Rrktio_status_t-result (cast r _pointer _Rrktio_status_t-pointer)))
(define (rktio_pipe_results r)
(values (ptr-ref r _pointer)
(ptr-ref r _pointer 1)))
(define (rktio_do_install_os_signal_handler rktio)
(racket:void))
(define (rktio_get_ctl_c_handler)
@ -223,6 +227,7 @@
'rktio_process_result_process rktio_process_result_process
'rktio_status_running rktio_status_running
'rktio_status_result rktio_status_result
'rktio_pipe_results rktio_pipe_results
'rktio_do_install_os_signal_handler rktio_do_install_os_signal_handler
'rktio_get_ctl_c_handler rktio_get_ctl_c_handler]
form ...))

View File

@ -3,7 +3,7 @@
(only-in '#%unsafe
unsafe-custodian-register
unsafe-custodian-unregister)
"../../thread/sandman.rkt"
"../../thread/current-sandman.rkt"
ffi/unsafe/atomic
"bootstrap-rktio.rkt")
@ -71,12 +71,25 @@
(eq? always-evt evt)
(eq? never-evt evt)))
(primitive-table '#%pthread
(hasheq 'unsafe-make-place-local box
'unsafe-place-local-ref unbox
'unsafe-place-local-set! set-box!))
(primitive-table '#%thread
(hasheq 'make-semaphore make-semaphore
(hasheq 'thread thread
'thread-suspend-evt thread-suspend-evt
'thread-dead-evt thread-dead-evt
'current-thread current-thread
'thread-resume thread-resume
'make-semaphore make-semaphore
'semaphore-post semaphore-post
'semaphore-wait semaphore-wait
'semaphore-peek-evt semaphore-peek-evt
'make-channel make-channel
'channel-put-evt channel-put-evt
'wrap-evt wrap-evt
'handle-evt handle-evt
'always-evt always-evt
'choice-evt (lambda (l) (apply choice-evt l))
'sync sync
@ -108,6 +121,9 @@
(cond
[ref (unsafe-custodian-unregister v ref) #f]
[else #t]))
'current-plumber current-plumber
'plumber-add-flush! plumber-add-flush!
'plumber-flush-handle-remove! plumber-flush-handle-remove!
'unsafe-custodian-register unsafe-custodian-register
'unsafe-custodian-unregister unsafe-custodian-unregister
'thread-push-kill-callback! thread-push-kill-callback!

View File

@ -0,0 +1,20 @@
#lang racket/base
(require (for-syntax racket/base)
"pthread.rkt")
(provide define-place-local)
;; Just like the one from `racket/private/place-local`, but using the
;; exports of "pthread.rkt" so we can test in bootstrapping mode.
(define-syntax-rule (define-place-local id v)
(begin
(define cell (unsafe-make-place-local v))
(define-syntax id
(make-set!-transformer
(lambda (stx)
(...
(syntax-case stx (set!)
[(set! _ r) #'(unsafe-place-local-set! cell r)]
[(_ e ...) #'((unsafe-place-local-ref cell) e ...)]
[_ #'(unsafe-place-local-ref cell)])))))))

View File

@ -0,0 +1,25 @@
#lang racket/base
(require racket/private/primitive-table
"../common/internal-error.rkt"
(only-in '#%linklet primitive-table)
(for-syntax racket/base))
(void (unless (primitive-table '#%pthread)
(internal-error "pthreads not provided by host")))
(define-syntax (bounce stx)
(syntax-case stx ()
[(_ table bind ...)
(with-syntax ([([orig-id here-id] ...)
(for/list ([bind (in-list (syntax->list #'(bind ...)))])
(if (identifier? bind)
(list bind bind)
bind))])
#'(begin
(provide here-id ...)
(import-from-primitive-table table bind ...)))]))
(bounce #%pthread
unsafe-make-place-local
unsafe-place-local-ref
unsafe-place-local-set!)

View File

@ -1,8 +1,8 @@
#lang racket/base
(require racket/private/place-local
racket/include
(require racket/include
(for-syntax racket/base)
(only-in '#%linklet primitive-table))
(only-in '#%linklet primitive-table)
"../host/place-local.rkt")
(provide rktio
rktio-error?
@ -65,6 +65,7 @@
(define-function () #f rktio_process_result_process)
(define-function () #f rktio_status_running)
(define-function () #f rktio_status_result)
(define-function () #f rktio_pipe_results)
;; Error results are represented as vectors:
(define rktio-error? vector?)

View File

@ -30,11 +30,19 @@
(define id (hash-ref table 'id))
...))
(bounce make-semaphore
(bounce thread
thread-suspend-evt
thread-dead-evt
current-thread
thread-resume
make-semaphore
semaphore-post
semaphore-wait
semaphore-peek-evt
make-channel
channel-put-evt
wrap-evt
handle-evt
always-evt
sync
sync/timeout
@ -42,7 +50,11 @@
prop:evt
unsafe-start-atomic
unsafe-end-atomic
current-custodian)
current-custodian
custodian-shut-down?
current-plumber
plumber-add-flush!
plumber-flush-handle-remove!)
(bounce* choice-evt ; raw variant that takes a list of evts
prop:secondary-evt

View File

@ -1,5 +1,5 @@
#lang racket/base
(require racket/private/place-local
(require "../host/place-local.rkt"
"../common/check.rkt"
"../host/thread.rkt"
"../host/rktio.rkt"

View File

@ -21,7 +21,9 @@
"unsafe/main.rkt"
"run/main.rkt"
"port/parameter.rkt"
"host/rktio.rkt")
(only-in "host/rktio.rkt"
rktio-place-init!)
"port/place.rkt")
(provide (all-from-out "port/main.rkt")
(all-from-out "path/main.rkt")
@ -43,11 +45,12 @@
(all-from-out "foreign/main.rkt")
(all-from-out "unsafe/main.rkt")
(all-from-out "run/main.rkt")
make-place-ports+fds
io-place-init!)
(define (io-place-init!)
(define (io-place-init! in-fd out-fd err-fd cust plumber)
(sandman-place-init!)
(rktio-place-init!)
(init-current-ports!))
(init-current-ports! in-fd out-fd err-fd cust plumber))
(module main racket/base)

View File

@ -1,5 +1,5 @@
#lang racket/base
(require racket/private/place-local
(require "../host/place-local.rkt"
"../common/check.rkt"
"../host/thread.rkt"
"../host/rktio.rkt"

View File

@ -51,7 +51,8 @@
(define (open-input-fd fd name
#:extra-data [extra-data #f]
#:on-close [on-close void]
#:fd-refcount [fd-refcount (box 1)])
#:fd-refcount [fd-refcount (box 1)]
#:custodian [cust (current-custodian)])
(define-values (port buffer-control)
(open-input-peek-via-read
#:name name
@ -81,7 +82,7 @@
[() (buffer-control)]
[(pos) (buffer-control pos)]))))
(define custodian-reference
(register-fd-close (current-custodian) fd fd-refcount port))
(register-fd-close cust fd fd-refcount port))
port)
;; ----------------------------------------
@ -92,15 +93,16 @@
#:extra-data [extra-data #f]
#:buffer-mode [buffer-mode 'infer]
#:fd-refcount [fd-refcount (box 1)]
#:on-close [on-close void])
#:on-close [on-close void]
#:plumber [plumber (current-plumber)]
#:custodian [cust (current-custodian)])
(define buffer (make-bytes 4096))
(define buffer-start 0)
(define buffer-end 0)
(define flush-handle
(plumber-add-flush! (current-plumber)
(plumber-add-flush! plumber
(lambda (h)
(flush-buffer-fully #f)
(plumber-flush-handle-remove! h))))
(flush-buffer-fully #f))))
(when (eq? buffer-mode 'infer)
(if (rktio_fd_is_terminal rktio fd)
@ -222,7 +224,7 @@
[(mode) (set! buffer-mode mode)])))
(define custodian-reference
(register-fd-close (current-custodian) fd fd-refcount port))
(register-fd-close cust fd fd-refcount port))
(set-fd-evt-closed! evt (core-port-closed port))

View File

@ -23,10 +23,6 @@
;; ----------------------------------------
(define orig-input-port (current-input-port))
(define orig-output-port (current-output-port))
(define orig-error-port (current-error-port))
(define (maybe-flush-stdout in)
(when (eq? in orig-input-port)
(flush-output orig-output-port)

View File

@ -1,5 +1,6 @@
#lang racket/base
(require "../host/rktio.rkt"
(require "../host/place-local.rkt"
"../host/rktio.rkt"
"../host/error.rkt"
"output-port.rkt"
"input-port.rkt"
@ -8,6 +9,11 @@
(provide current-input-port
current-output-port
current-error-port
orig-input-port
orig-output-port
orig-error-port
init-current-ports!)
(define (make-stdin)
@ -30,8 +36,12 @@
'stderr
#:buffer-mode 'none))
(define-place-local orig-input-port (make-stdin))
(define-place-local orig-output-port (make-stdout))
(define-place-local orig-error-port (make-stderr))
(define current-input-port
(make-parameter (make-stdin)
(make-parameter orig-input-port
(lambda (v)
(unless (input-port? v)
(raise-argument-error 'current-input-port
@ -40,7 +50,7 @@
v)))
(define current-output-port
(make-parameter (make-stdout)
(make-parameter orig-output-port
(lambda (v)
(unless (output-port? v)
(raise-argument-error 'current-output-port
@ -49,7 +59,7 @@
v)))
(define current-error-port
(make-parameter (make-stderr)
(make-parameter orig-error-port
(lambda (v)
(unless (output-port? v)
(raise-argument-error 'current-error-port
@ -57,7 +67,16 @@
v))
v)))
(define (init-current-ports!)
(current-input-port (make-stdin))
(current-output-port (make-stdout))
(current-error-port (make-stderr)))
(define (init-current-ports! in-fd out-fd err-fd cust plumber)
(set! orig-input-port (open-input-fd in-fd "stdin"
#:custodian cust))
(current-input-port orig-input-port)
(set! orig-output-port (open-output-fd out-fd "stdout"
#:custodian cust
#:plumber plumber))
(current-output-port orig-output-port)
(set! orig-error-port (open-output-fd err-fd "srderr"
#:custodian cust
#:plumber plumber))
(current-error-port orig-error-port))

View File

@ -0,0 +1,68 @@
#lang racket/base
(require "../host/rktio.rkt"
"../host/error.rkt"
"../host/thread.rkt"
"fd-port.rkt")
(provide make-place-ports+fds)
;; Called in atomic mode, may exit atomic mode to error
;; Given fd-port-or-falses from a parent to be used for the child
;; place, returns three fd-port-or-falses for the parent place and three
;; fds for the child place. Make sure the child fds are delivered to
;; the child place for deallocation.
(define (make-place-ports+fds in out err)
(define-values (parent-in-fd child-in-fd)
(if in
(values #f (dup-fd (fd-port-fd in) void "stdin dup"))
(reverse-pipe void "stdin pipe")))
(define (clean-in)
(rktio_close child-in-fd)
(unless in
(rktio_close rktio parent-in-fd)))
(define-values (parent-out-fd child-out-fd)
(if out
(values #f (dup-fd (fd-port-fd out) clean-in "stdout dup"))
(pipe clean-in "stdout pipe")))
(define (clean-out+in)
(rktio_close child-out-fd)
(unless out
(rktio_close parent-out-fd))
(clean-in))
(define-values (parent-err-fd child-err-fd)
(if err
(values #f (dup-fd (fd-port-fd err) clean-out+in "stderr dup"))
(pipe clean-out+in "stderr pipe")))
(values (and parent-in-fd
(open-output-fd parent-in-fd "place-in"))
(and parent-out-fd
(open-input-fd parent-out-fd "place-out"))
(and parent-err-fd
(open-input-fd parent-err-fd "place-err"))
;; Return fds for child, so they can be wrapped
;; in ports within the new place
child-in-fd child-out-fd child-err-fd))
;; ----------------------------------------
(define (dup-fd fd cleanup during)
(define new-fd (rktio_dup rktio fd))
(when (rktio-error? new-fd)
(cleanup)
(end-atomic)
(raise-rktio-error 'dynamic-place new-fd (string-append "error during " during)))
new-fd)
(define (pipe cleanup during)
(define p (rktio_make_pipe rktio 0))
(when (rktio-error? p)
(cleanup)
(end-atomic)
(raise-rktio-error 'dynamic-place p (string-append "error during " during)))
(define-values (in out) (rktio_pipe_results p))
(rktio_free p)
(values in out))
(define (reverse-pipe cleanup during)
(define-values (in out) (pipe cleanup during))
(values out in))

View File

@ -1,5 +1,5 @@
#lang racket/base
(require racket/private/place-local
(require "../host/place-local.rkt"
"../common/check.rkt"
"../print/main.rkt"
"../error/main.rkt"

View File

@ -1,5 +1,5 @@
#lang racket/base
(require racket/private/place-local
(require "../host/place-local.rkt"
"../../thread/sandman-struct.rkt"
"../common/internal-error.rkt"
"../host/thread.rkt"

View File

@ -10,7 +10,8 @@
;; with `break-enabled-key`, and it does not support using an
;; exception handler in an engine.
(provide register-place-symbol!)
(provide register-place-symbol!
set-io-place-init!)
(define (make-engine thunk init-break-enabled-cell empty-config?)
(define ready-s (make-semaphore))
@ -183,16 +184,22 @@
(finish v)))))
(define place-symbols (make-hasheq))
(define io-place-init! void)
(define (start-place mod sym in out err)
(define (start-place mod sym in-fd out-fd err-fd cust plumber)
(io-place-init! in-fd out-fd err-fd cust plumber)
(lambda (finish)
(finish #f #f #f)
(finish)
((hash-ref place-symbols sym))))
;; For use in "demo.rkt"
(define (register-place-symbol! sym proc)
(hash-set! place-symbols sym proc))
;; For use in "demo-thread.rkt" in "io"
(define (set-io-place-init! proc)
(set! io-place-init! proc))
(define (place-exit v)
(if (eq? initial-place-local-table (place-local-table))
(exit v)

View File

@ -0,0 +1,16 @@
#lang racket/base
(require "check.rkt"
"sandman-struct.rkt")
(provide the-sandman
current-sandman)
(define the-sandman #f)
;; in atomic mode
(define/who current-sandman
(case-lambda
[() the-sandman]
[(sm)
(check who sandman? sm)
(set! the-sandman sm)]))

View File

@ -503,23 +503,23 @@
;; Check places, where the various export symbols passed to
;; `dynamic-place` are registered via `register-place-symbol!`
(register-place-symbol! 'nothing void)
(define pl1 (dynamic-place 'dummy 'nothing #f #f #f))
(define-values (pl1 pin1 pout1 perr1) (dynamic-place 'dummy 'nothing #f #f #f))
(check #t (place? pl1))
(check 0 (place-wait pl1))
(register-place-symbol! 'exit1 (lambda () (exit 1)))
(define pl2 (dynamic-place 'dummy 'exit1 #f #f #f))
(define-values (pl2 pin2 pout2 perr2) (dynamic-place 'dummy 'exit1 #f #f #f))
(check #t (place? pl2))
(check 1 (place-wait pl2))
(register-place-symbol! 'loop (lambda () (let loop () (loop))))
(define pl3 (dynamic-place 'dummy 'loop #f #f #f))
(define-values (pl3 pin3 pout3 perr3) (dynamic-place 'dummy 'loop #f #f #f))
(check #t (place? pl3))
(place-break pl3)
(check 1 (place-wait pl3))
(printf "[That break was from a place, and it's expected]\n")
(define pl4 (dynamic-place 'dummy 'loop #f #f #f))
(define-values (pl4 pin4 pout4 perr4) (dynamic-place 'dummy 'loop #f #f #f))
(check #f (sync/timeout 0.01 (place-dead-evt pl4)))
(place-kill pl4)
(check 1 (place-wait pl4))

View File

@ -2,10 +2,12 @@
(require "evt.rkt"
"sync.rkt"
"semaphore.rkt"
"channel.rkt"
"schedule-info.rkt"
"sandman.rkt"
"atomic.rkt"
"custodian.rkt"
"plumber.rkt"
"thread.rkt"
"unsafe.rkt"
"time.rkt")
@ -16,11 +18,19 @@
(provide #%thread-instance)
(define #%thread-instance
(hasheq 'make-semaphore make-semaphore
(hasheq 'thread thread
'thread-suspend-evt thread-suspend-evt
'thread-dead-evt thread-dead-evt
'current-thread current-thread
'thread-resume thread-resume
'make-semaphore make-semaphore
'semaphore-post semaphore-post
'semaphore-wait semaphore-wait
'semaphore-peek-evt semaphore-peek-evt
'make-channel make-channel
'channel-put-evt channel-put-evt
'wrap-evt wrap-evt
'handle-evt handle-evt
'always-evt always-evt
'choice-evt choice-evt
'sync sync
@ -46,6 +56,10 @@
'end-atomic/no-interrupts end-atomic/no-interrupts
'in-atomic-mode? in-atomic-mode?
'current-custodian current-custodian
'custodian-shut-down? custodian-shut-down?
'current-plumber current-plumber
'plumber-add-flush! plumber-add-flush!
'plumber-flush-handle-remove! plumber-flush-handle-remove!
'unsafe-custodian-register unsafe-custodian-register
'unsafe-custodian-unregister unsafe-custodian-unregister
'thread-push-kill-callback! thread-push-kill-callback!

View File

@ -172,8 +172,8 @@
place-channel-put
place-message-allowed?
set-make-place-ports+fds!
place-pumper-threads
place-shared?
unsafe-add-post-custodian-shutdown
futures-enabled?

View File

@ -29,8 +29,8 @@
place-channel-put
place-message-allowed?
set-make-place-ports+fds!
place-pumper-threads
place-shared?
unsafe-add-post-custodian-shutdown)
;; ----------------------------------------
@ -40,24 +40,24 @@
[queued-result #:mutable] ; non-#f triggers a place exit
custodian
[post-shutdown #:mutable] ; list of callbacks
pumper-threads ; vector of up to three pumper threads
[pumpers #:mutable] ; vector of up to three pumper threads
[pending-break #:mutable] ; #f, 'break, 'hangup, or 'terminate
done-waiting ; hash table of places to ping when this one ends
[wakeup-handle #:mutable]))
(define-place-local current-place #f)
(define (dynamic-place path sym in out err)
(define c (create-custodian))
(define/who (dynamic-place path sym in out err)
(define orig-cust (create-custodian))
(define lock (host:make-mutex))
(define started (host:make-condition))
(define done-waiting (make-hasheq))
(define new-place (place lock
#f ; result
#f ; queued-result
c
orig-cust
'() ; post-shutdown
(make-vector 3 #f) ; pumper-threads
#f ; pumper-threads
#f ; pending-break
done-waiting
#f))
@ -69,12 +69,18 @@
(host:mutex-release lock)
;; Switch to scheduler, so it can exit:
(engine-block))
;; Atomic mode to create ports and deliver them to the new place
(start-atomic)
(define-values (parent-in parent-out parent-err child-in-fd child-out-fd child-err-fd)
(make-place-ports+fds in out err))
;; Start the new place
(host:fork-place
(lambda ()
(define finish (host:start-place path sym in out err))
(define finish (host:start-place path sym
child-in-fd child-out-fd child-err-fd
orig-cust orig-plumber))
(call-in-another-main-thread
c
orig-cust
(lambda ()
(set! current-place new-place)
(current-plumber orig-plumber)
@ -86,10 +92,7 @@
(call-with-continuation-prompt
(lambda ()
(finish
(lambda (in-th out-th err-th)
(vector-set! (place-pumper-threads new-place) 0 in-th)
(vector-set! (place-pumper-threads new-place) 1 out-th)
(vector-set! (place-pumper-threads new-place) 2 err-th)
(lambda ()
(host:mutex-acquire lock)
(set-place-wakeup-handle! new-place (sandman-get-wakeup-handle))
(host:condition-signal started) ; place is sufficiently started
@ -103,7 +106,7 @@
(lambda (result)
;; Place is done, so save the result and alert anyone waiting on
;; the place
(do-custodian-shutdown-all c)
(do-custodian-shutdown-all orig-cust)
(host:mutex-acquire lock)
(set-place-result! new-place result)
(host:mutex-release lock)
@ -116,11 +119,12 @@
(host:mutex-release (place-lock k))]
[else (sandman-wakeup k)]))
(hash-clear! done-waiting)))
(end-atomic)
;; Wait for the place to start, then return the place object
(host:mutex-acquire lock)
(host:condition-wait started lock)
(host:mutex-release lock)
new-place)
(values new-place parent-in parent-out parent-err))
(define/who (place-break p [kind #f])
(check who place? p)
@ -163,7 +167,13 @@
(define/who (place-wait p)
(check who place? p)
(sync (place-done-evt p #t)))
(define result (sync (place-done-evt p #t)))
(define vec (place-pumpers p))
(when vec
(for ([s (in-vector vec)])
(when s (thread-wait s)))
(set-place-pumpers! p #f))
result)
(struct place-done-evt (p get-result?)
#:property prop:evt (poller (lambda (self poll-ctx)
@ -214,8 +224,16 @@
;; ----------------------------------------
(define (place-shared? v)
#f)
(define make-place-ports+fds
;; To be replaced by the "io" layer:
(lambda (in out err)
(values #f #f #f in out err)))
(define (set-make-place-ports+fds! proc)
(set! make-place-ports+fds proc))
(define (place-pumper-threads p vec)
(set-place-pumpers! p vec))
(define (unsafe-add-post-custodian-shutdown proc)
(when current-place

View File

@ -7,7 +7,8 @@
plumber-flush-all
plumber-add-flush!
plumber-flush-handle?
plumber-flush-handle-remove!)
plumber-flush-handle-remove!
plumber-callbacks)
(struct plumber (callbacks ; hash table of handles -> callbacks
weak-callbacks) ; same, but weak references

View File

@ -4,6 +4,7 @@
"tree.rkt"
"internal-error.rkt"
"sandman-struct.rkt"
"current-sandman.rkt"
"host.rkt")
;; A "sandman" manages the set of all sleeping threads that may need
@ -98,14 +99,6 @@
(define (sandman-any-waiters?)
((sandman-do-any-waiters? the-sandman)))
;; in atomic mode
(define/who current-sandman
(case-lambda
[() the-sandman]
[(sm)
(check who sandman? sm)
(set! the-sandman sm)]))
;; created simple lock here to avoid cycle in loading from using lock defined in future.rkt
(define (make-lock)
(box 0))
@ -136,7 +129,7 @@
;; Sandman should not have place-local state itself, but
;; it can access place-local state that's declared as such.
(define the-sandman
(define the-default-sandman
(sandman
;; sleep
(lambda (timeout-at)
@ -233,6 +226,7 @@
(make-lock)))
(void (current-sandman the-default-sandman))
;; Compute an approximation to infinity:
(define (distant-future)

View File

@ -350,10 +350,10 @@
(define (thread-dead-evt? v)
(dead-evt? v))
(define/who get-thread-dead-evt
(define get-thread-dead-evt
(let ([thread-dead-evt
(lambda (t)
(check who thread? t)
(check 'thread-dead-evt thread? t)
(atomically
(unless (thread-dead-evt t)
(set-thread-dead-evt! t (dead-evt (get-thread-dead-sema t)))))