cs, thread & io: finish implementing places

Implement place channels and messages, and change `place-enabled?` to
claim that places are enabled, but `processor-count` still reports 1.

The implementation of place channels has an interesting use of
ephemerons --- that is, a use that isn't just solving a key-in-value
problem. Using epehemerons solves the problem of forgetting a place
channel and any thread blocked on the read end when there are no
producers on the write end. Along similar lines, when only the write
end is retained (i.e., no readers), the channel data is forgotten and
writes become a no-op. The read end holds a "read key" and references
the channel data through an ephemeron keyed by a "write key"; the
write end similarly holds a "write key" and uses an ephemeron keyed by
the "read key". This use of an epehemeron implements a reachability
"and": retain the place-channel data only if the read end *and* the
write end are both reachable. (Minor point: a read end also holds onto
the "write key" anytime the channel already has data.)
This commit is contained in:
Matthew Flatt 2018-09-05 10:57:59 -06:00
parent aad98e563d
commit c50d2753c0
27 changed files with 773 additions and 134 deletions

View File

@ -0,0 +1,36 @@
#lang racket/base
(provide immutable-prefab-struct-key
prefab-key-all-fields-immutable?)
(define (immutable-prefab-struct-key v)
(define k (prefab-struct-key v))
(and k
(all-fields-immutable? k)
k))
(define (prefab-key-all-fields-immutable? k)
(unless (prefab-key? k)
(raise-argument-error 'prefab-key-all-fields-immutable? "prefab-key?" k))
(all-fields-immutable? k))
(define (all-fields-immutable? k)
(or (symbol? k)
(null? k)
(let* ([rk (cdr k)] ; skip name
[rk (if (and (pair? rk)
(exact-integer? (car rk)))
(cdr rk) ; skip init count
rk)]
[rk (if (and (pair? rk)
(pair? (car rk)))
(if (zero? (caar rk))
(cdr rk) ; skip zero auto count
(cons '#(1) (cdr rk))) ; reflect mutable auto field
rk)])
(if (and (pair? rk)
(vector? (car rk)))
(if (zero? (vector-length (car rk)))
(all-fields-immutable? (cdr rk))
#f)
(all-fields-immutable? rk)))))

View File

@ -66,7 +66,7 @@ setup:
$(MAKE) run ARGS="-l- setup $(ARGS)"
setup-v:
$(MAKE) run ARGS="-W 'info@compiler/cm info@linklet debug@GC:major error' -l- setup $(ARGS)"
$(MAKE) run ARGS="-W 'info@compiler/cm info@linklet debug@GC:major error' -l- setup -j 1 $(ARGS)"
run-wpo: $(BUILDDIR)racket.so ../../bin/racket
$(SCHEME) --script $(BUILDDIR)racket.so $(RACKET_SETUP_ARGS) $(ARGS)

View File

@ -130,4 +130,63 @@
(test '#(info "cats: hello" 7 cats) msg1)
(log-message demo1-logger 'info 'cats "goodbye" 9)
(test '#(info "cats: goodbye" 9 cats) (sync lr1)))))))
(test '#(info "cats: goodbye" 9 cats) (sync lr1)))))
;; ----------------------------------------
(let* ([place-symbols (make-hasheq)]
[register-place-symbol!
(lambda (sym proc)
(hash-set! place-symbols sym proc))])
(set-make-place-ports+fds! make-place-ports+fds)
(set-start-place!
(lambda (pch mod sym in out err cust plumber)
(io-place-init! in out err cust plumber)
(lambda (finish)
(finish)
((hash-ref place-symbols sym) pch))))
;; Check file port passed across places
(let ([f (open-input-file "compiled/io.scm")])
(file-stream-buffer-mode f 'none)
(let ([content (read-bytes 5 f)])
(file-position f 0)
(register-place-symbol! 'read-byte
(lambda (pch)
(let ([f (place-channel-get pch)])
(file-stream-buffer-mode f 'none)
(let ([b (read-byte f)])
(close-input-port f)
(place-channel-put pch b)))))
(let-values ([(pl in out err) (dynamic-place 'dummy 'read-byte #f #f #f)])
(test (bytes-ref content 0) (read-byte f))
(place-channel-put pl f)
(test (bytes-ref content 1) (place-channel-get pl))
(test (bytes-ref content 2) (read-byte f))
(close-input-port f)))))
;; Thread can be GCed if it's block on a place channel with no writer
(let ()
(define-values (left1 right1) (place-channel))
(define saved #f)
(define not-saved (gensym))
(define weak-saved (make-weak-box not-saved))
(define weak-right1 (make-weak-box right1))
(place-channel-put right1 not-saved)
;; DON'T USE `right1` from here on...
(let ()
(define weak-thread
(make-weak-box
(thread (lambda ()
(define local-saved (place-channel-get left1))
(place-channel-get left1) ; no writer for this channel
(set! saved local-saved)))))
(sync (system-idle-evt))
(collect-garbage)
(test #f (weak-box-value weak-right1))
(test #f (weak-box-value weak-thread))
(test #f (weak-box-value weak-saved))))
(void)))

View File

@ -191,22 +191,22 @@
(lambda (sym proc)
(hash-set! place-symbols sym proc))])
(set-start-place!
(lambda (mod sym in out err cust plumber)
(lambda (pch mod sym in out err cust plumber)
(lambda (finish)
(finish)
((hash-ref place-symbols sym)))))
((hash-ref place-symbols sym) pch))))
(register-place-symbol! 'nothing void)
(let-values ([(pl1 in1 out1 err1) (dynamic-place 'dummy 'nothing #f #f #f)])
(check #t (place? pl1))
(check 0 (place-wait pl1)))
(register-place-symbol! 'exit1 (lambda () (exit 1)))
(register-place-symbol! 'exit1 (lambda (pch) (exit 1)))
(let-values ([(pl2 in2 out2 err2) (dynamic-place 'dummy 'exit1 #f #f #f)])
(check #t (place? pl2))
(check 1 (place-wait pl2)))
(register-place-symbol! 'loop (lambda () (let loop () (loop))))
(register-place-symbol! 'loop (lambda (pch) (let loop () (loop))))
(let-values ([(pl3 in3 out3 err3) (dynamic-place 'dummy 'loop #f #f #f)])
(check #t (place? pl3))
(place-break pl3)

View File

@ -295,8 +295,8 @@
(define (rktio_pipe_results r)
(values
(foreign-ref 'ptr (ptr->address r) 0)
(foreign-ref 'ptr (ptr->address r) 1)))
(address->ptr (foreign-ref 'uptr (ptr->address r) 0))
(address->ptr (foreign-ref 'uptr (ptr->address r) (foreign-sizeof 'uptr)))))
(define (rktio_do_install_os_signal_handler rktio)
(rktio_install_os_signal_handler rktio))

View File

@ -485,13 +485,15 @@
(set-make-place-ports+fds! make-place-ports+fds)
(set-start-place!
(lambda (mod sym in out err cust plumber)
(lambda (pch mod sym in out err cust plumber)
(io-place-init! in out err cust plumber)
(regexp-place-init!)
(expander-place-init!)
(initialize-place!)
(lambda ()
(dynamic-require mod sym))))
(lambda (finish)
(finish)
(let ([f (dynamic-require mod sym)])
(f pch)))))
(when (getenv "PLT_STATS_ON_BREAK")
(keyboard-interrupt-handler

View File

@ -1226,7 +1226,7 @@
(let* ([bstr (make-bytevector size)]
[p (make-cpointer bstr #f)])
(lock-object bstr)
(the-foreign-guardian p (lambda () (unlock-object bstr)))
(with-global-lock (the-foreign-guardian p (lambda () (unlock-object bstr))))
p)]
[else
(raise-unsupported-error 'malloc

View File

@ -58,7 +58,7 @@
(meta-cond
[(threaded?)
(define (place-enabled?) #f) ;; FIXME
(define (place-enabled?) #t)
(define (fork-place thunk finish-proc)
(fork-thread (lambda ()
(init-virtual-registers)
@ -79,8 +79,8 @@
(define (set-start-place! proc)
(set! do-start-place proc))
(define (start-place path sym in out err cust plumber)
(do-start-place path sym in out err cust plumber))
(define (start-place pch path sym in out err cust plumber)
(do-start-place pch path sym in out err cust plumber))
(define (place-exit v)
(let ([esc (unbox place-esc-box)])

View File

@ -1,31 +1,8 @@
#lang racket/base
(require racket/prefab)
(provide immutable-prefab-struct-key
all-fields-immutable?)
(define (immutable-prefab-struct-key v)
(define k (prefab-struct-key v))
(and k
(all-fields-immutable? k)
k))
(define (all-fields-immutable? k)
(or (symbol? k)
(null? k)
(let* ([rk (cdr k)] ; skip name
[rk (if (and (pair? rk)
(exact-integer? (car rk)))
(cdr rk) ; skip init count
rk)]
[rk (if (and (pair? rk)
(pair? (car rk)))
(if (zero? (caar rk))
(cdr rk) ; skip zero auto count
(cons '#(1) (cdr rk))) ; reflect mutable auto field
rk)])
(if (and (pair? rk)
(vector? (car rk)))
(if (zero? (vector-length (car rk)))
(all-fields-immutable? (cdr rk))
#f)
(all-fields-immutable? rk)))))
(prefab-key-all-fields-immutable? k))

View File

@ -23,7 +23,7 @@
(call-in-main-thread
(lambda ()
;; Make `N` threads trying to write `P` copies
;; of each possible byte into a limited pipe, and
;; make `N` other threads try to read those bytes.
@ -185,8 +185,11 @@
(custodian-shutdown-all c))
;; Places
;; BEWARE: we can run some basic places tests in bootstrap mode,
;; but since "places" are just Racket threads, avoid any rktio-based
;; blocking operations
(register-place-symbol! 'report
(lambda ()
(lambda (pch)
(write-string "expected place out\n")
(write-string "expected place err\n" (current-error-port))))
(define-values (pl1 pin1 pout1 perr1) (dynamic-place 'dummy 'report
@ -198,9 +201,10 @@
(test #f pout1)
(test #f perr1)
(test 0 (place-wait pl1))
;; See warnign about about places in bootstrap-demo mode
(register-place-symbol! 'echo2
(lambda ()
(lambda (pch)
(define s (read-line))
(write-string s)
(define s2 (list->string (reverse (string->list s))))
@ -216,6 +220,31 @@
(test "olleh" (read-string 100 perr2))
(test 0 (place-wait pl2))
;; Can pass a file-stream port through a place channel, but it
;; makes a fresh port on the other end
(define-values (left1 right1) (place-channel))
(let ([f (open-input-file "compiled/hello.txt")])
(file-stream-buffer-mode f 'none)
(test #\h (read-char f))
(test 1 (file-position f))
(place-channel-put left1 f)
(define f2 (place-channel-get right1))
(file-stream-buffer-mode f2 'none)
(test #\e (read-char f2))
(test 2 (file-position f2))
(close-input-port f2)
(test 2 (file-position f))
(test #\l (read-char f))
(test 3 (file-position f))
(close-input-port f))
;; Paths are ok as place messages:
(let ([p (bytes->path #"ok" 'windows)])
(place-channel-put left1 p)
(test p (place-channel-get right1)))
(let ([p (bytes->path #"ok" 'unix)])
(place-channel-put left1 p)
(test p (place-channel-get right1)))
;; TCP and accept evts
(parameterize ([current-custodian (make-custodian)])
(define l (tcp-listen 59078 5 #t))

View File

@ -71,6 +71,9 @@
(eq? always-evt evt)
(eq? never-evt evt)))
(define-values (prop:place-message place-message? place-message-ref)
(make-struct-type-property 'place-message))
(primitive-table '#%pthread
(hasheq 'unsafe-make-place-local box
'unsafe-place-local-ref unbox
@ -128,4 +131,5 @@
'unsafe-custodian-unregister unsafe-custodian-unregister
'thread-push-kill-callback! thread-push-kill-callback!
'thread-pop-kill-callback! thread-pop-kill-callback!
'set-get-subprocesses-time! void))
'set-get-subprocesses-time! void
'prop:place-message prop:place-message))

View File

@ -54,7 +54,8 @@
custodian-shut-down?
current-plumber
plumber-add-flush!
plumber-flush-handle-remove!)
plumber-flush-handle-remove!
prop:place-message)
(bounce* choice-evt ; raw variant that takes a list of evts
prop:secondary-evt

View File

@ -1,7 +1,8 @@
#lang racket/base
(require "../print/custom-write.rkt"
"../port/string-output.rkt"
"../locale/string.rkt")
"../locale/string.rkt"
(only-in "../host/thread.rkt" prop:place-message))
(provide (struct-out path)
is-path?
@ -32,7 +33,10 @@
(lambda (p hc)
(hc (path-bytes p)))
(lambda (p hc)
(hc (path-bytes p)))))
(hc (path-bytes p))))
#:property prop:place-message (lambda (self)
(lambda ()
(lambda () self))))
(define is-path?
(let ([path? (lambda (p)

View File

@ -13,7 +13,8 @@
"buffer-mode.rkt"
"close.rkt"
"count.rkt"
"check.rkt")
"check.rkt"
"place-message.rkt")
(provide open-input-fd
open-output-fd
@ -21,7 +22,7 @@
fd-port-fd
maybe-fd-data-extra)
(struct fd-data (fd extra)
(struct fd-data (fd extra input?)
#:property prop:file-stream (lambda (fdd) (fd-data-fd fdd))
#:property prop:file-truncate (case-lambda
[(fdd pos)
@ -29,7 +30,10 @@
(rktio_set_file_size rktio
(fd-data-fd fdd)
pos)
"error setting file size")]))
"error setting file size")])
#:property prop:data-place-message (lambda (port)
(lambda ()
(fd-port->place-message port))))
(define (maybe-fd-data-extra data)
(and (fd-data? data)
@ -56,7 +60,7 @@
(define-values (port buffer-control)
(open-input-peek-via-read
#:name name
#:data (fd-data fd extra-data)
#:data (fd-data fd extra-data #t)
#:read-in
;; in atomic mode
(lambda (dest-bstr start end copy?)
@ -159,7 +163,7 @@
(define port
(make-core-output-port
#:name name
#:data (fd-data fd extra-data)
#:data (fd-data fd extra-data #f)
#:evt evt
@ -335,3 +339,36 @@
(set-closed-state! closed))
#f
#f))
;; ----------------------------------------
(define (fd-port->place-message port)
(start-atomic)
(cond
[(port-closed? port) #f]
[else
(define input? (input-port? port))
(define fd-dup (dup-port-fd port))
(define name (core-port-name port))
(end-atomic)
(lambda ()
(atomically
(define fd (claim-dup fd-dup))
(if input?
(open-input-fd fd name)
(open-output-fd fd name))))]))
;; in atomic mode
(define (dup-port-fd port)
(define fd (fd-data-fd (core-port-data port)))
(define new-fd (rktio_dup rktio fd))
(when (rktio-error? new-fd)
(end-atomic)
(raise-rktio-error 'place-channel-put new-fd "error during duping file descriptor"))
(define fd-dup (box new-fd))
;; FIXME: possible leak if place message is never received
fd-dup)
;; in atomic mode
(define (claim-dup fd-dup)
(unbox fd-dup))

View File

@ -26,7 +26,7 @@
((core-input-port-prepare-change peek-pipe-i)))
;; in atomic mode
(define (pull-some-bytes [amt (bytes-length buf)] #:keep-eof? [keep-eof? #t])
(define (pull-some-bytes [amt (if (eq? 'block buffer-mode) (bytes-length buf) 1)] #:keep-eof? [keep-eof? #t])
(define v (read-in buf 0 amt #f))
(cond
[(eof-object? v)

View File

@ -0,0 +1,12 @@
#lang racket/base
(provide prop:data-place-message
data->place-message)
(define-values (prop:data-place-message data-place-message? data-place-message-ref)
(make-struct-type-property 'data-place-message))
(define (data->place-message data port)
(if (data-place-message? data)
((data-place-message-ref data) port)
#f))

View File

@ -1,6 +1,7 @@
#lang racket/base
(require "../host/thread.rkt"
"evt.rkt")
"evt.rkt"
"place-message.rkt")
(provide (struct-out core-port)
(struct-out closed-state))
@ -27,7 +28,10 @@
[position #:mutable]) ; count UTF-8 characters
#:authentic
#:property prop:object-name (struct-field-index name)
#:property prop:secondary-evt port->evt)
#:property prop:secondary-evt port->evt
#:property prop:place-message (lambda (p)
(define data (core-port-data p))
(data->place-message data p)))
(struct closed-state ([closed? #:mutable]
[closed-sema #:mutable]) ; #f or a semaphore posed on close

View File

@ -63,6 +63,10 @@ Thread and signal conventions:
before a second call, different `rktio_t` values can be used freely
from different threads.
- Except as otherwise specificed, anything created with a particular
`rktio_t` must be used with that same `rktio_t` thereafter (and in
only one thread at a time).
- If a function doesn't take a `rktio_t` argument, then it can be
called concurrently with anything else. Notably,
`rktio_signal_received_at` does not take a `rktio_t`.
@ -189,7 +193,9 @@ typedef struct rktio_fd_t rktio_fd_t;
RKTIO_EXTERN rktio_fd_t *rktio_system_fd(rktio_t *rktio, intptr_t system_fd, int modes);
/* A socket (as opposed to other file descriptors) registered this way
should include include `RKTIO_OPEN_SOCKET` and be non-blocking or
use `RKTIO_OPEN_INIT`. */
use `RKTIO_OPEN_INIT`. The resulting `rktio_fd_t` is not attached
to `rktio`; it can be used with other `rktio_t`s, as long as it is
used from only one thread at a time. */
RKTIO_EXTERN_NOERR intptr_t rktio_fd_system_fd(rktio_t *rktio, rktio_fd_t *rfd);
/* Extracts a native file descriptor or socket. */
@ -230,7 +236,8 @@ RKTIO_EXTERN void rktio_close_noerr(rktio_t *rktio, rktio_fd_t *fd);
RKTIO_EXTERN rktio_fd_t *rktio_dup(rktio_t *rktio, rktio_fd_t *rfd);
/* Copies a file descriptor, where each must be closed or forgotten
independenty. */
independenty. Like the result of `rktio_system_fd`, the resulting
`rktio_fd_t` is not attached to `rktio`. */
RKTIO_EXTERN void rktio_forget(rktio_t *rktio, rktio_fd_t *fd);
/* Deallocates a `rktio_fd_t` without closing the file descriptor,

View File

@ -174,6 +174,10 @@
(define place-done-prompt (make-continuation-prompt-tag 'place-done))
;; Beware that this implementation of `fork-place` doesn't support
;; rktio-based blocking in different places. So, be careful of the
;; preliminary tests that you might try with the "io" layer and
;; places.
(define (fork-place thunk finish)
(parameterize ([place-local-table (make-hasheq)])
(thread (lambda ()
@ -186,11 +190,11 @@
(define place-symbols (make-hasheq))
(define io-place-init! void)
(define (start-place mod sym in-fd out-fd err-fd cust plumber)
(define (start-place pch mod sym in-fd out-fd err-fd cust plumber)
(io-place-init! in-fd out-fd err-fd cust plumber)
(lambda (finish)
(finish)
((hash-ref place-symbols sym))))
((hash-ref place-symbols sym) pch)))
;; For use in "demo.rkt"
(define (register-place-symbol! sym proc)

View File

@ -507,12 +507,12 @@
(check #t (place? pl1))
(check 0 (place-wait pl1))
(register-place-symbol! 'exit1 (lambda () (exit 1)))
(register-place-symbol! 'exit1 (lambda (pch) (exit 1)))
(define-values (pl2 pin2 pout2 perr2) (dynamic-place 'dummy 'exit1 #f #f #f))
(check #t (place? pl2))
(check 1 (place-wait pl2))
(register-place-symbol! 'loop (lambda () (let loop () (loop))))
(register-place-symbol! 'loop (lambda (pch) (let loop () (loop))))
(define-values (pl3 pin3 pout3 perr3) (dynamic-place 'dummy 'loop #f #f #f))
(check #t (place? pl3))
(place-break pl3)
@ -526,6 +526,42 @@
(check #t (evt? (sync (place-dead-evt pl4))))
(check #t (evt? (sync/timeout 0.01 (place-dead-evt pl4))))
;; Place channel
(define-values (left1 right1) (place-channel))
(check #t (place-channel? left1))
(check #t (place-channel? right1))
(place-channel-put left1 'a)
(check 'a (place-channel-get right1))
(place-channel-put right1 'b)
(check 'b (place-channel-get left1))
(check #t (evt? left1))
(check #t (evt? right1))
(place-channel-put left1 'c)
(place-channel-put left1 'd)
(check 'c (sync right1))
(check 'd (sync/timeout 0 right1))
;; Inter-place place channel
(register-place-symbol! 'vector-echo (lambda (pch)
(for ([i 3])
(place-channel-put pch (vector (place-channel-get pch))))))
(define-values (pl5 pin5 pout5 perr5) (dynamic-place 'dummy 'vector-echo #f #f #f))
(check #t (place? pl5))
(check #t (place-channel? pl5))
(check #t (evt? pl5))
(check (void) (place-channel-put pl5 'howdy))
(check (vector 'howdy) (place-channel-get pl5))
(check (void) (place-channel-put pl5 'hiya))
(check (vector 'hiya) (sync pl5))
(define got5 #f)
(define t5 (thread (lambda () (set! got5 (place-channel-get pl5)))))
(sync (system-idle-evt))
(check #f (sync/timeout 0 t5))
(check (void) (place-channel-put pl5 'again))
(check t5 (sync t5))
(check (vector 'again) got5)
(check 0 (place-wait pl5))
(set! done? #t)))
(unless done?

View File

@ -10,7 +10,8 @@
"plumber.rkt"
"thread.rkt"
"unsafe.rkt"
"time.rkt")
"time.rkt"
"place-message.rkt")
;; Unsafe scheduler-cooperation functions are made available to
;; clients through a `#%thread` primitive linklet instance:
@ -64,4 +65,5 @@
'unsafe-custodian-unregister unsafe-custodian-unregister
'thread-push-kill-callback! thread-push-kill-callback!
'thread-pop-kill-callback! thread-pop-kill-callback!
'set-get-subprocesses-time! set-get-subprocesses-time!))
'set-get-subprocesses-time! set-get-subprocesses-time!
'prop:place-message prop:place-message))

View File

@ -26,6 +26,7 @@
"stats.rkt"
"stack-size.rkt"
"place.rkt"
"place-message.rkt"
"future.rkt"
"fsemaphore.rkt"
"os-thread.rkt")
@ -171,6 +172,8 @@
place-channel-put
place-message-allowed?
prop:place-message
set-make-place-ports+fds!
place-pumper-threads
unsafe-add-post-custodian-shutdown

View File

@ -0,0 +1,251 @@
#lang racket/base
(require '#%flfxnum
(only-in '#%foreign cpointer? ptr-add)
(only-in '#%place place-shared?)
racket/prefab)
(provide place-message-allowed?
place-message-allowed-direct?
message-ize
un-message-ize
prop:place-message)
;; A prop:place-message value is a procedure that takes self
;; and returns either #f [=> not a place mesage after all] or
;; (-> (-> v))
(define-values (prop:place-message place-message? place-message-ref)
(make-struct-type-property 'place-message))
(struct message-ized (unmessage)
#:authentic)
(define (allowed? v #:direct? direct?)
(let loop ([v v] [graph #hasheq()])
(or (number? v)
(char? v)
(boolean? v)
(keyword? v)
(void? v)
(symbol? v)
(and (or (string? v)
(bytes? v))
(or (not direct?) (immutable? v)))
(null? v)
(and (pair? v)
(or (hash-ref graph v #f)
(let ([graph (hash-set graph v #t)])
(and (loop (car v) graph)
(loop (cdr v) graph)))))
(and (vector? v)
(or (not direct?)
(and (immutable? v)
(not (impersonator? v))))
(let ([graph (hash-ref graph v #t)])
(for/and ([e (in-vector v)])
(loop e graph))))
(and (immutable-prefab-struct-key v)
(let ([graph (hash-set graph v #t)])
(for/and ([e (in-vector (struct->vector v))])
(loop e graph))))
(and (hash? v)
(or (not direct?)
(and (immutable? v)
(not (impersonator? v))))
(let ([graph (hash-set graph v #t)])
(for/and ([(k v) (in-hash v)])
(and (loop k graph)
(loop v graph)))))
(and (not direct?)
(or (cpointer? v)
(and (or (fxvector? v)
(flvector? v))
(place-shared? v))
(and (place-message? v)
((place-message-ref v) v)
#t))))))
(define (place-message-allowed-direct? v)
(allowed? v #:direct? #t))
(define (place-message-allowed? v)
(allowed? v #:direct? #f))
;; Convert a message to a form suitable to keep in a channel, but
;; simultaneously check whether the message is ok, since a message
;; might start out with mutable elements that are changed while
;; the conversion is in progress (but we convert enough to avoid
;; problems afterward)
(define (message-ize v fail)
(define graph #f)
(define used #f)
(define (maybe-ph ph v)
(if (and used (hash-ref used ph #f))
(begin
(placeholder-set! ph v)
ph)
v))
(define new-v
(let loop ([v v])
(cond
[(or (number? v)
(char? v)
(boolean? v)
(keyword? v)
(void? v)
(symbol? v)
(null? v))
v]
[(string? v)
(string->immutable-string v)]
[(bytes? v)
(bytes->immutable-bytes v)]
[else
(unless graph (set! graph (make-hasheq)))
(cond
[(hash-ref graph v #f)
=> (lambda (v)
(unless used (set! used (make-hasheq)))
(hash-set! used v #t)
v)]
[(pair? v)
(define ph (make-placeholder #f))
(hash-set! graph v ph)
(maybe-ph ph (cons (loop (car v))
(loop (cdr v))))]
[(vector? v)
(define ph (make-placeholder #f))
(hash-set! graph v ph)
(maybe-ph ph (for/vector #:length (vector-length v) ([e (in-vector v)])
(loop e)))]
[(immutable-prefab-struct-key v)
=> (lambda (k)
(define ph (make-placeholder #f))
(hash-set! graph v ph)
(maybe-ph
(apply make-prefab-struct
k
(for/list ([e (in-vector (struct->vector v) 1)])
(loop v)))))]
[(hash? v)
(define ph (make-placeholder #f))
(hash-set! graph v ph)
(cond
[(hash-eq? v)
(for/hasheq ([(k v) (in-hash v)])
(values (loop k) (loop v)))]
[(hash-eqv? v)
(for/hasheqv ([(k v) (in-hash v)])
(values (loop k) (loop v)))]
[else
(for/hash ([(k v) (in-hash v)])
(values (loop k) (loop v)))])]
[(cpointer? v)
(ptr-add v 0)]
[(and (or (fxvector? v)
(flvector? v))
(place-shared? v))
v]
[(place-message? v)
(define make-unmessager ((place-message-ref v) v))
(if make-unmessager
(message-ized (make-unmessager))
(fail))]
[else (fail)])])))
(message-ized new-v))
(define (un-message-ize v)
(if (message-ized? v)
(make-reader-graph (do-un-message-ize (message-ized-unmessage v)))
v))
(define (do-un-message-ize v)
(define graph #f)
(let loop ([v v])
(cond
[(placeholder? v)
(define ph (make-placeholder #f))
(unless graph (set! graph (make-hasheq)))
(cond
[(hash-ref graph v #f) => (lambda (ph) ph)]
[else
(hash-set! graph v ph)
(placeholder-set! ph (loop (placeholder-get v)))
ph])]
[(pair? v)
(cons (loop (car v)) (loop (cdr v)))]
[(vector? v)
(for/vector #:length (vector-length v) ([e (in-vector v)])
(loop e))]
[(immutable-prefab-struct-key v)
=> (lambda (k)
(apply make-prefab-struct
k
(for/list ([e (in-vector (struct->vector v) 1)])
(loop v))))]
[(hash? v)
(cond
[(hash-eq? v)
(for/hasheq ([(k v) (in-hash v)])
(values (loop k) (loop v)))]
[(hash-eqv? v)
(for/hasheqv ([(k v) (in-hash v)])
(values (loop k) (loop v)))]
[else
(for/hash ([(k v) (in-hash v)])
(values (loop k) (loop v)))])]
[(cpointer? v)
(ptr-add v 0)]
[(message-ized? v)
((message-ized-unmessage v))]
[else v])))
(module+ test
(define-syntax-rule (test expect actual)
(let ([v actual])
(unless (equal? expect v)
(error 'test "failed: ~s = ~s" 'actual v))))
(struct external (a)
#:property prop:place-message (lambda (self)
(lambda ()
(define a (external-a self))
(lambda () (external a)))))
(test #t (place-message-allowed? "apple"))
(test #t (place-message-allowed-direct? "apple"))
(test #f (place-message-allowed-direct? (string-copy "apple")))
(test #f (place-message-allowed-direct? (cons 1 (string-copy "apple"))))
(test #t (place-message-allowed-direct? '(a . b)))
(test #t (place-message-allowed-direct? '#(a b)))
(test #t (place-message-allowed-direct? '#hasheq((a . b))))
(test #t (place-message-allowed-direct? '#s(pre 1 2 3)))
(define direct-cyclic (read (open-input-string "#0=(1 #0# 2)")))
(test #t (place-message-allowed-direct? direct-cyclic))
(define stateful-cyclic (make-reader-graph
(let ([ph (make-placeholder #f)]
[ph2 (make-placeholder #f)]
[ph3 (make-placeholder #f)])
(define (as ph v) (placeholder-set! ph v) v)
(as ph2 (vector (as ph (cons ph (string-copy "apple")))
ph2
(as ph3 (hasheq 'a 1 'b ph3))
'#s(pre 4 5)))
ph)))
(test #f (place-message-allowed-direct? stateful-cyclic))
(test #t (place-message-allowed? stateful-cyclic))
(test stateful-cyclic (un-message-ize (message-ize stateful-cyclic)))
(define ext (external 'x))
(test #t (place-message-allowed? ext))
(test #f (place-message-allowed-direct? ext))
(define ext2 (un-message-ize (message-ize ext)))
(test #t (external? ext2))
(test #f (eq? ext ext2))
(test 'x (external-a ext2))
(void))

View File

@ -12,8 +12,10 @@
"plumber.rkt"
"exit.rkt"
"sync.rkt"
"semaphore.rkt"
"evt.rkt"
"sandman.rkt")
"sandman.rkt"
"place-message.rkt")
(provide dynamic-place
place?
@ -26,7 +28,6 @@
place-channel?
place-channel-get
place-channel-put
place-message-allowed?
set-make-place-ports+fds!
place-pumper-threads
@ -34,7 +35,10 @@
;; ----------------------------------------
(struct place (lock
(struct place (parent
lock
activity-canary ; box for quick check before taking lock
pch ; channel to new place
[result #:mutable] ; byte or #f, where #f means "not done"
[queued-result #:mutable] ; non-#f triggers a place exit
custodian
@ -42,30 +46,48 @@
[pumpers #:mutable] ; vector of up to three pumper threads
[pending-break #:mutable] ; #f, 'break, 'hangup, or 'terminate
done-waiting ; hash table of places to ping when this one ends
[wakeup-handle #:mutable]))
[wakeup-handle #:mutable]
[dequeue-semas #:mutable]) ; semaphores reflecting place-channel waits to recheck
#:property prop:evt (struct-field-index pch)
#:property prop:place-message (lambda (self) (lambda () (lambda () (place-pch self)))))
(define-place-local current-place #f)
(define (make-place lock cust
#:parent [parent #f]
#:place-channel [pch #f])
(place parent
lock
(box #f) ; activity canary
pch
#f ; result
#f ; queued-result
cust
'() ; post-shutdown
#f ; pumper-threads
#f ; pending-break
(make-hasheq) ; done-waiting
#f ; wakeup-handle
'())) ; dequeue-semas
(define-place-local current-place (make-place (host:make-mutex)
(current-custodian)))
(define/who (dynamic-place path sym in out err)
(define orig-cust (create-custodian))
(define lock (host:make-mutex))
(define started (host:make-condition))
(define done-waiting (make-hasheq))
(define new-place (place lock
#f ; result
#f ; queued-result
orig-cust
'() ; post-shutdown
#f ; pumper-threads
#f ; pending-break
done-waiting
#f))
(define-values (place-pch child-pch) (place-channel))
(define new-place (make-place lock orig-cust
#:parent current-place
#:place-channel place-pch))
(define done-waiting (place-done-waiting new-place))
(define orig-plumber (make-plumber))
(define (default-exit v)
(plumber-flush-all orig-plumber)
(host:mutex-acquire lock)
(set-place-queued-result! new-place (if (byte? v) v 0))
(host:mutex-release lock)
(atomically
(host:mutex-acquire lock)
(set-place-queued-result! new-place (if (byte? v) v 0))
(place-has-activity! new-place)
(host:mutex-release lock))
;; Switch to scheduler, so it can exit:
(engine-block))
;; Atomic mode to create ports and deliver them to the new place
@ -75,7 +97,7 @@
;; Start the new place
(host:fork-place
(lambda ()
(define finish (host:start-place path sym
(define finish (host:start-place child-pch path sym
child-in-fd child-out-fd child-err-fd
orig-cust orig-plumber))
(call-in-another-main-thread
@ -109,58 +131,68 @@
(host:mutex-acquire lock)
(set-place-result! new-place result)
(host:mutex-release lock)
(for ([k (in-hash-keys done-waiting)])
(cond
[(place? k)
(host:mutex-acquire (place-lock k))
(unless (place-result k)
(sandman-wakeup (place-wakeup-handle k)))
(host:mutex-release (place-lock k))]
[else (sandman-wakeup k)]))
(for ([pl (in-hash-keys done-waiting)])
(wakeup-waiting pl))
(hash-clear! done-waiting)))
(end-atomic)
;; Wait for the place to start, then return the place object
(host:mutex-acquire lock)
(host:condition-wait started lock)
(host:mutex-release lock)
(end-atomic)
(values new-place parent-in parent-out parent-err))
(define/who (place-break p [kind #f])
(check who place? p)
(unless (or (not kind) (eq? kind 'hangup) (eq? kind 'terminate))
(raise-argument-error who "(or/c #f 'hangup 'terminate)" kind))
(host:mutex-acquire (place-lock p))
(define pending-break (place-pending-break p))
(when (or (not pending-break)
(break>? (or kind 'break) pending-break))
(set-place-pending-break! p (or kind 'break))
(sandman-wakeup (place-wakeup-handle p)))
(host:mutex-release (place-lock p)))
(atomically
(host:mutex-acquire (place-lock p))
(define pending-break (place-pending-break p))
(when (or (not pending-break)
(break>? (or kind 'break) pending-break))
(set-place-pending-break! p (or kind 'break))
(place-has-activity! p)
(sandman-wakeup (place-wakeup-handle p)))
(host:mutex-release (place-lock p))))
(define (place-has-activity! p)
(box-cas! (place-activity-canary p) #f #t)
(void))
(void
(set-check-place-break!
(set-check-place-activity!
;; Called in atomic mode by scheduler
(lambda ()
(define p current-place)
(when p
(unless (box-cas! (place-activity-canary p) #f #f)
(box-cas! (place-activity-canary p) #t #f)
(host:mutex-acquire (place-lock p))
(define queued-result (place-queued-result p))
(define break (place-pending-break p))
(define dequeue-semas (place-dequeue-semas p))
(when break
(set-place-pending-break! p #f))
(when (pair? dequeue-semas)
(set-place-dequeue-semas! p null))
(host:mutex-release (place-lock p))
(when queued-result
(force-exit queued-result))
(for ([s (in-list dequeue-semas)])
(thread-did-work!)
(semaphore-post-all/atomic s))
(when break
(thread-did-work!)
(do-break-thread root-thread break #f))))))
(define/who (place-kill p)
(check who place? p)
(host:mutex-acquire (place-lock p))
(unless (or (place-result p)
(place-queued-result p))
(set-place-queued-result! p 1))
(host:mutex-release (place-lock p))
(atomically
(host:mutex-acquire (place-lock p))
(unless (or (place-result p)
(place-queued-result p))
(set-place-queued-result! p 1)
(place-has-activity! p))
(host:mutex-release (place-lock p)))
(place-wait p)
(void))
@ -170,21 +202,19 @@
(define vec (place-pumpers p))
(when vec
(for ([s (in-vector vec)])
(when s (thread-wait s)))
(when (thread? s) (thread-wait s)))
(set-place-pumpers! p #f))
result)
(struct place-done-evt (p get-result?)
#:property prop:evt (poller (lambda (self poll-ctx)
(assert-atomic-mode)
(ensure-wakeup-handle!)
(define p (place-done-evt-p self))
(host:mutex-acquire (place-lock p))
(define result (place-result p))
(unless result
(hash-set! (place-done-waiting p)
(or current-place
(sandman-get-wakeup-handle))
#t))
(hash-set! (place-done-waiting p) current-place #t))
(host:mutex-release (place-lock p))
(if result
(if (place-done-evt-get-result? self)
@ -199,24 +229,159 @@
;; ----------------------------------------
(struct pchannel ()
#:reflection-name 'place-channel)
(struct message-queue (lock
[q #:mutable]
[rev-q #:mutable]
key-box ; holds write key when non-empty
[waiters #:mutable]) ; hash of waiting place -> semaphore
#:authentic)
(define (make-message-queue)
(message-queue (host:make-mutex)
'()
'()
(box #f)
#hash()))
(define (enqueue! mq msg wk)
(define lock (message-queue-lock mq))
(atomically
(host:mutex-acquire lock)
(set-message-queue-rev-q! mq (cons msg (message-queue-rev-q mq)))
(define waiters (message-queue-waiters mq))
(set-message-queue-waiters! mq '#hash())
(set-box! (message-queue-key-box mq) wk)
(host:mutex-release lock)
(for ([(pl s) (in-hash waiters)])
(host:mutex-acquire (place-lock pl))
(set-place-dequeue-semas! pl (cons s (place-dequeue-semas pl)))
(place-has-activity! pl)
(host:mutex-release (place-lock pl))
(wakeup-waiting pl))))
;; in atomic mode
;; Either calls `success-k` or calls `fail-k` with
;; a semaphore to be posted when the queue receives
;; a message. Note that if the message queue becomes
;; inaccessible (so no writers), then the semaphores
;; become inaccessible.
(define (dequeue! mq success-k fail-k)
(ensure-wakeup-handle!)
(define lock (message-queue-lock mq))
(host:mutex-acquire lock)
(when (and (null? (message-queue-q mq))
(not (null? (message-queue-rev-q mq))))
(set-message-queue-q! mq (reverse (message-queue-rev-q mq)))
(set-message-queue-rev-q! mq null))
(define q (message-queue-q mq))
(cond
[(null? q)
(define waiters (message-queue-waiters mq))
(cond
[(hash-ref waiters current-place #f)
=> (lambda (s)
(host:mutex-release lock)
(fail-k s))]
[else
(define s (make-semaphore))
(set-message-queue-waiters! mq (hash-set waiters current-place s))
(host:mutex-release lock)
(fail-k s)])]
[else
(define new-q (cdr q))
(set-message-queue-q! mq new-q)
(when (null? new-q)
(set-box! (message-queue-key-box mq) #f))
(host:mutex-release lock)
(success-k (car q))]))
;; ----------------------------------------
(struct pchannel (in-mq-e ; ephemeron of writer key and message-queue
out-mq-e ; ephemeron of reader key and message-queue
reader-key
writer-key
in-key-box) ; causes in-mq-e to be retained when non-empty
#:reflection-name 'place-channel
#:property prop:evt (poller (lambda (self poll-ctx)
(define in-mq (ephemeron-value (pchannel-in-mq-e self)))
(if in-mq
(dequeue! in-mq
(lambda (v)
(values #f
(wrap-evt
always-evt
(lambda (a)
;; Convert when out of atomic region
(un-message-ize v)))))
(lambda (sema)
(values #f (replace-evt sema (lambda (s) self)))))
(values #f never-evt))))
#:property prop:place-message (lambda (self) (lambda () (lambda () self))))
(define (place-channel? v)
(pchannel? v))
(or (pchannel? v)
(place? v)))
(define (unwrap-place-channel in-pch)
(if (place? in-pch)
(place-pch in-pch)
in-pch))
(define (place-channel)
(values (pchannel)
(pchannel)))
(define mq1 (make-message-queue))
(define mq2 (make-message-queue))
(define rk1 (gensym 'read))
(define wk1 (gensym 'write))
(define rk2 (gensym 'read))
(define wk2 (gensym 'write))
(values (pchannel (make-ephemeron wk1 mq1) (make-ephemeron rk2 mq2) rk1 wk2 (message-queue-key-box mq1))
(pchannel (make-ephemeron wk2 mq2) (make-ephemeron rk1 mq1) rk2 wk1 (message-queue-key-box mq2))))
(define (place-channel-get pch)
(sync never-evt))
(define/who (place-channel-get in-pch)
(check who place-channel? in-pch)
(define pch (unwrap-place-channel in-pch))
(define in-mq (ephemeron-value (pchannel-in-mq-e pch)))
(if in-mq
(begin
(start-atomic)
(dequeue! in-mq
(lambda (v)
(end-atomic)
(un-message-ize v))
(lambda (sema)
(end-atomic)
(semaphore-wait sema)
(place-channel-get pch))))
(sync never-evt)))
(define (place-channel-put pch v)
(void))
(define/who (place-channel-put in-pch in-v)
(check who place-channel? in-pch)
(define v
(if (place-message-allowed-direct? in-v)
in-v
(message-ize
in-v
(lambda ()
(raise-argument-error who "place-message-allowed?" in-v)))))
(define pch (unwrap-place-channel in-pch))
(define out-mq (ephemeron-value (pchannel-out-mq-e pch)))
(when out-mq
(enqueue! out-mq v (pchannel-writer-key pch))))
(define (place-message-allowed? v)
#t)
;; ----------------------------------------
;; in atomic mode
(define (ensure-wakeup-handle!)
(unless (place-wakeup-handle current-place)
(set-place-wakeup-handle! current-place (sandman-get-wakeup-handle))))
;; in atomic mode
(define (wakeup-waiting k)
(host:mutex-acquire (place-lock k))
(unless (place-result k)
(sandman-wakeup (place-wakeup-handle k)))
(host:mutex-release (place-lock k)))
;; ----------------------------------------
@ -232,7 +397,7 @@
(set-place-pumpers! p vec))
(define (unsafe-add-post-custodian-shutdown proc)
(when current-place
(when (place-parent current-place)
(atomically
(set-place-post-shutdown! current-place
(cons proc

View File

@ -21,7 +21,7 @@
(provide call-in-main-thread
call-in-another-main-thread
set-atomic-timeout-callback!
set-check-place-break!)
set-check-place-activity!)
(define TICKS 100000)
@ -47,7 +47,7 @@
pending-callbacks))
(host:poll-will-executors)
(check-external-events 'fast)
(check-place-break)
(check-place-activity)
(when (and (null? callbacks)
(all-threads-poll-done?)
(waiting-on-external-or-idle?))
@ -233,6 +233,6 @@
;; ----------------------------------------
(define check-place-break void)
(define (set-check-place-break! proc)
(set! check-place-break proc))
(define check-place-activity void)
(define (set-check-place-activity! proc)
(set! check-place-activity proc))

View File

@ -20,7 +20,8 @@
semaphore-any-waiters?
semaphore-post/atomic
semaphore-wait/atomic)
semaphore-wait/atomic
semaphore-post-all/atomic)
(struct semaphore ([count #:mutable]
queue)
@ -71,12 +72,16 @@
;; Don't consume a post for a peek waiter
(loop))])))
;; In atomic mode
(define (semaphore-post-all/atomic s)
(set-semaphore-count! s +inf.0)
(queue-remove-all!
(semaphore-queue s)
(lambda (w) (waiter-resume! w s))))
(define (semaphore-post-all s)
(atomically
(set-semaphore-count! s +inf.0)
(queue-remove-all!
(semaphore-queue s)
(lambda (w) (waiter-resume! w s)))))
(semaphore-post-all/atomic s)))
;; In atomic mode:
(define (semaphore-any-waiters? s)

View File

@ -88,7 +88,8 @@
(module* for-place #f
(provide root-thread
do-break-thread
break>?))
break>?
thread-did-work!))
;; ----------------------------------------