From 5e20081b697951370ae69df9ad6a25ce64a240cc Mon Sep 17 00:00:00 2001 From: Kevin Tew Date: Fri, 9 Mar 2012 12:15:45 -0700 Subject: [PATCH] [Distributed Places] added ability to launch remote threads --- collects/racket/place.rkt | 120 ++------------ .../racket/place/define-remote-server.rkt | 13 +- collects/racket/place/distributed.rkt | 63 +++++--- .../distributed/examples/logging/master.rkt | 4 +- .../distributed/examples/multiple/master.rkt | 2 +- .../distributed/examples/named/master.rkt | 4 +- .../distributed/examples/thread/master.rkt | 58 +++++++ collects/racket/place/private/th-place.rkt | 147 ++++++++++++++++++ 8 files changed, 274 insertions(+), 137 deletions(-) create mode 100644 collects/racket/place/distributed/examples/thread/master.rkt create mode 100644 collects/racket/place/private/th-place.rkt diff --git a/collects/racket/place.rkt b/collects/racket/place.rkt index a9f6e1121f..4291da3af8 100644 --- a/collects/racket/place.rkt +++ b/collects/racket/place.rkt @@ -7,6 +7,7 @@ racket/fixnum racket/flonum racket/vector + racket/place/private/th-place mzlib/private/streams (for-syntax racket/base @@ -15,7 +16,7 @@ (provide dynamic-place dynamic-place* place-sleep - place-wait + place-wait place-kill place-break place-channel @@ -29,103 +30,14 @@ place place* (rename-out [pl-place-enabled? place-enabled?]) - place-dead-evt) + place-dead-evt + ) -(define-struct TH-place (th ch cust) - #:property prop:evt (lambda (x) (TH-place-channel-in (TH-place-ch x)))) (define (place-channel-put/get ch msg) (place-channel-put ch msg) (place-channel-get ch)) -(define (make-th-async-channel) - (define ch (make-channel)) - (values - (thread - (lambda () - (let loop () - (let ([v (thread-receive)]) - (channel-put ch v) - (loop))))) - ch)) - -(define (th-dynamic-place mod funcname) - (unless (or (path-string? mod) (resolved-module-path? mod)) - (raise-type-error 'place "resolved-module-path? or path-string?" 0 mod funcname)) - (unless (symbol? funcname) - (raise-type-error 'place "symbol?" 1 mod funcname)) - (define-values (pch cch) (th-place-channel)) - (define cust (make-custodian-from-main)) - (define th (thread - (lambda () - (with-continuation-mark - parameterization-key - orig-paramz - (parameterize ([current-namespace (make-base-namespace)] - [current-custodian cust]) - ((dynamic-require mod funcname) cch)))))) - (TH-place th pch cust)) - -(define (th-place-sleep n) (sleep n)) -(define (th-place-wait pl) (thread-wait (TH-place-th pl)) 0) -(define (th-place-kill pl) (custodian-shutdown-all (TH-place-cust pl))) -(define (th-place-break pl) (break-thread (TH-place-th pl))) -(define (th-place-dead-evt pl) (thread-dead-evt (TH-place-th pl))) -(define (th-place-channel) - (define-values (as ar) (make-th-async-channel)) - (define-values (bs br) (make-th-async-channel)) - (define pch (TH-place-channel ar bs)) - (define cch (TH-place-channel br as)) - (values pch cch)) - -(define (deep-copy x) - (define (dcw o) - (cond - [(ormap (lambda (x) (x o)) (list number? char? boolean? null? void? string? symbol? TH-place-channel?)) o] - [(cond - [(path? o) (path->bytes o)] - [(bytes? o) (if (pl-place-shared? o) o (bytes-copy o))] - [(fxvector? o) (if (pl-place-shared? o) o (fxvector-copy o))] - [(flvector? o) (if (pl-place-shared? o) o (flvector-copy o))] - [else #f]) - => values] - [(TH-place? o) (dcw (TH-place-ch o))] - [(pair? o) (cons (dcw (car o)) (dcw (cdr o)))] - [(vector? o) (vector-map! dcw (vector-copy o))] - [(struct? o) - (define key (prefab-struct-key o)) - (when (not key) - (error "Must be a prefab struct")) - (apply make-prefab-struct - key - (map dcw (cdr (vector->list (struct->vector o)))))] - [else (raise-mismatch-error 'place-channel-put "cannot transmit a message containing value: " o)])) - - (dcw x)) - - -(define (th-place-channel-put pl msg) - (define th - (cond - [(TH-place? pl) (TH-place-channel-out (TH-place-ch pl))] - [(TH-place-channel? pl) (TH-place-channel-out pl)] - [else (raise-type-error 'place-channel-put "expect a place? or place-channel?" pl)])) - (void (thread-send th (deep-copy msg) #f))) - -(define (th-place-channel-get pl) - (channel-get - (cond - [(TH-place? pl) (TH-place-channel-in (TH-place-ch pl))] - [(TH-place-channel? pl) (TH-place-channel-in pl)] - [else (raise-type-error 'place-channel-get "expect a place? or place-channel?" pl)]))) - -(define (th-place-channel? pl) - (or (TH-place? pl) - (TH-place-channel? pl))) - -(define (th-place-message-allowed? pl) - #t) - (define-syntax-rule (define-pl x p t) (define x (if (pl-place-enabled?) p t))) (define-pl place-sleep pl-place-sleep th-place-sleep) @@ -136,7 +48,7 @@ (define-pl place-channel-put pl-place-channel-put th-place-channel-put) (define-pl place-channel-get pl-place-channel-get th-place-channel-get) (define-pl place-channel? pl-place-channel? th-place-channel?) -(define-pl place? pl-place? TH-place?) +(define-pl place? pl-place? th-place?) (define-pl place-message-allowed? pl-place-message-allowed? th-place-message-allowed?) (define-pl place-dead-evt pl-place-dead-evt th-place-dead-evt) @@ -148,7 +60,7 @@ [else (void)])) (define (dynamic-place module-path function) - (start-place 'dynamic-place module-path function + (start-place 'dynamic-place module-path function #f (current-output-port) (current-error-port))) (define (dynamic-place* module-path @@ -160,8 +72,8 @@ (define (start-place who module-path function in out err) (define-values (p i o e) (start-place* who - module-path - function + module-path + function in out err)) @@ -169,7 +81,7 @@ p) (define (start-place* who module-path function in out err) - ;; Duplicate checks in that are in the primitive `pl-dynamic-place', + ;; Duplicate checks in that are in the primitive `pl-dynamic-place', ;; unfortunately, but we want these checks before we start making ;; stream-pumping threads, etc. (unless (or (module-path? module-path) (path? module-path)) @@ -201,7 +113,7 @@ (if-stream-out who err))) (pump-place p pin pout perr in out err) - (values p + (values p (and (not in) pin) (and (not out) pout) (and (not err) perr))] @@ -221,12 +133,12 @@ (define-for-syntax (modpath->string modpath) - (cond + (cond [(equal? modpath #f) (number->string (current-inexact-milliseconds))] [else (define name (resolved-module-path-name modpath)) - (cond + (cond [(symbol? name) (symbol->string name)] [(path? name) (path->string name)])])) @@ -242,10 +154,10 @@ (raise-syntax-error #f "can only be used in a module" stx)) (unless (identifier? #'ch) (raise-syntax-error #f "expected an identifier" stx #'ch)) - (define func-name-stx - (datum->syntax stx - (string->symbol - (string-append "place/anon" + (define func-name-stx + (datum->syntax stx + (string->symbol + (string-append "place/anon" (modpath->string (current-module-declare-name)))))) (with-syntax ([internal-def-name (syntax-local-lift-expression #'(lambda (ch) body1 body ...))] diff --git a/collects/racket/place/define-remote-server.rkt b/collects/racket/place/define-remote-server.rkt index 9e2ca51ee5..26fc573a06 100644 --- a/collects/racket/place/define-remote-server.rkt +++ b/collects/racket/place/define-remote-server.rkt @@ -2,6 +2,7 @@ (require (for-syntax racket/base) (for-syntax syntax/stx) racket/place + racket/place/private/th-place racket/match racket/class racket/stxparam @@ -20,11 +21,13 @@ (define (dplace/place-channel-get dest) (cond [(place-channel? dest) (place-channel-get dest)] + [(th-place-channel? dest) (th-place-channel-get dest)] [else (send dest get-msg)])) (define (dplace/place-channel-put dest msg) (cond [(place-channel? dest) (place-channel-put dest msg)] + [(th-place-channel? dest) (th-place-channel-put dest msg)] [else (send dest put-msg msg)])) @@ -105,7 +108,7 @@ (with-syntax ([fname-symbol #'(quote fname)] [(send-line (... ...)) (cond - [(is-id? 'define-rpc #'define-type) #'((place-channel-put send-dest result))] + [(is-id? 'define-rpc #'define-type) #'((dplace/place-channel-put send-dest result))] [(is-id? 'define-cast #'define-type) #'()] [else (raise "Bad define in define-remote-server")])]) #'[receive-line @@ -114,13 +117,13 @@ body (... ...))) send-line (... ...) (loop)]))]))]) - #`(place ch + #`(lambda (ch) (let () states2 (... ...) (let loop () - (define msg (place-channel-get ch)) + (define msg (dplace/place-channel-get ch)) (define (log-to-parent-real msg #:severity [severity 'info]) - (place-channel-put ch (log-message severity msg))) + (dplace/place-channel-put ch (log-message severity msg))) (syntax-parameterize ([log-to-parent (make-rename-transformer #'log-to-parent-real)]) (match msg cases (... ...) @@ -133,7 +136,7 @@ (require racket/place racket/match) #,@trans-rpcs - (define/provide (mkname) #,trans-place) + (define/provide mkname #,trans-place) (void))) ;(pretty-print (syntax->datum x)) x))])) diff --git a/collects/racket/place/distributed.rkt b/collects/racket/place/distributed.rkt index 06c9074c75..7ce22655b8 100644 --- a/collects/racket/place/distributed.rkt +++ b/collects/racket/place/distributed.rkt @@ -3,6 +3,7 @@ racket/match racket/tcp racket/place + racket/place/private/th-place racket/class racket/trait racket/udp @@ -21,7 +22,12 @@ spawn-vm-supervise-place-thunk-at spawn-vm-supervise-dynamic-place-at/2 spawn-vm-supervise-place-thunk-at/2 + supervise-named-dynamic-place-at supervise-named-place-thunk-at + supervise-place-thunk-at + supervise-dynamic-place-at + supervise-thread-at + supervise-process-at every-seconds after-seconds @@ -32,8 +38,6 @@ spawn-remote-racket-vm node-send-exit node-get-first-place - supervise-place-thunk-at - supervise-dynamic-place-at dplace-put dplace-get @@ -43,8 +47,6 @@ ll-channel-put write-flush log-message - - ;; start-spawned-node-router ;;Coercion Routines @@ -155,7 +157,6 @@ (define (write-flush msg [p (current-output-port)]) - ;(printf "WRITING ~v\n" msg) (write msg p) (flush-output p)) @@ -359,8 +360,6 @@ (wrap-evt (if (dchannel? pch) (dchannel-ch pch) pch) (lambda (e) - ; (printf "MSG ~v\n" e) - ; (flush-output) (match e [(dcgm 8 #;(== DCGM-TYPE-LOG-TO-PARENT) _ _ (list severity msg)) (send node log-from-child #:severity severity msg)] @@ -438,7 +437,6 @@ [else (sconn-write-flush src-channel (dcgm DCGM-TYPE-INTER-DCHANNEL ch-id ch-id (format "ERROR: name not found ~a" name)))])] - [else (define np (new place% [place-exec place-exec] @@ -459,10 +457,11 @@ (define pch (sconn-lookup-subchannel src-channel ch-id)) (cond [(place-channel? pch) - ;(printf "SOCKET to PLACE CHANNEL ~a\n" msg) (place-channel-put pch msg)] [(is-a? pch connection%) - (send pch forward msg)])] + (send pch forward msg)] + [(th-place-channel? pch) + (th-place-channel-put pch msg)])] [(dcgm 6 #;(== DCGM-TYPE-SPAWN-REMOTE-PROCESS) src (list node-name node-port mod-path funcname) ch1) (define vm (new remote-node% @@ -528,7 +527,6 @@ (sconn-get-forward-event x forward-mesg)] [(or (place-channel? x) (place? x)) (wrap-evt x (lambda (e) - ;(printf "VECTOR PLACE MESSAGE ~a\n" e) (forward-mesg e x)))]) n)) nes)] @@ -626,7 +624,6 @@ (tcp-connect rname (->number rport))))) (define (ensure-connected) - ;(printf "Waiting on connecting to ~a ~a\n" host port) (when connecting (match (channel-get ch) [(list _in _out) @@ -648,7 +645,6 @@ (define/public (get-forward-event forwarder) (when (equal? out #f) (ensure-connected)) (wrap-evt in (lambda (e) - ;(printf "VECTOR SOCKET MESSAGE ~a\n" e) (forwarder (read in) this)))) (define/public (read-message) @@ -661,7 +657,6 @@ (when (and host port background-connect) (set! connecting #t) (set! ch (make-channel)) - ;(printf "Delay connecting to ~a ~a\n" host port) (thread (lambda () (channel-put @@ -728,7 +723,6 @@ (define pch (sconn-lookup-subchannel sc ch-id)) (cond [(place-channel? pch) - ;(printf "SOCKET to PLACE CHANNEL ~a\n" msg) (place-channel-put pch msg)] [(is-a? pch connection%) (send pch forward msg)])] @@ -736,7 +730,6 @@ (define parent (send this get-router)) (cond [parent - ;(printf "Sent to Parent ~a ~a \n" severity msg) (send parent log-from-child #:severity severity msg)] [else (print-log-message severity msg)])] @@ -998,20 +991,33 @@ [(list 'dynamic-place place-path place-func) (dynamic-place (->path place-path) place-func)] [(list 'place place-path place-func) - ((dynamic-require (->path place-path) place-func))])) + ((dynamic-require (->path place-path) place-func))] + [(list 'thread place-path place-func) + (define-values (ch1 ch2) (th-place-channel)) + (define th + (thread + (lambda () + ((dynamic-require (->path place-path) place-func) ch1)))) + (th-place th ch2 null)])) (sconn-add-subchannel sc ch-id pd) (set! psb (new place-socket-bridge% [pch pd] [sch sc] [id ch-id] [node node])) (define/public (get-channel) pd) (define/public (stop) (cond - [pd + [(place? pd) (place-kill pd) (set! pd #f)] + [(th-place? pd) + (th-place-kill pd)] [else (void)])) ;send place not running message (define/public (register es) - (let* ([es (if pd (cons (wrap-evt (place-dead-evt pd) on-place-dead) es) es)] + (let* ([es (if pd (cons (wrap-evt + (cond + [(place? pd) (place-dead-evt pd)] + [(th-place? pd) (th-place-dead-evt pd)]) on-place-dead) + es) es)] [es (if psb (send psb register es) es)]) es)) (super-new) @@ -1155,7 +1161,6 @@ (define (remote-spawn) (define sp (new socket-connection% [host rname] [port rport])) (define msg (list my-id node-name node-cnt curr-conf-idx next-node-id rname rcnt conf)) - ;(printf "Sending ~v\n" msg) (sconn-write-flush sp msg) (for ([i (in-range rcnt)]) (vector-set! cv (+ next-node-id i) sp)) @@ -1212,7 +1217,6 @@ ) (define (supervise-named-place-thunk-at vm name place-path place-func - #:listen-port [listen-port DEFAULT-ROUTER-PORT] #:initial-message [initial-message #f] #:restart-on-exit [restart-on-exit #f]) (send vm launch-place @@ -1220,6 +1224,16 @@ ;#:initial-message initial-message #:restart-on-exit restart-on-exit )) + +(define (supervise-named-dynamic-place-at vm name place-path place-func + #:initial-message [initial-message #f] + #:restart-on-exit [restart-on-exit #f]) + (send vm launch-place + (list 'dynamic-place (->string place-path) place-func (->string name)) + ;#:initial-message initial-message + #:restart-on-exit restart-on-exit + )) + (define (spawn-vm-supervise-dynamic-place-at host place-path place-func #:listen-port [listen-port DEFAULT-ROUTER-PORT] #:initial-message [initial-message #f] #:racket-path [racketpath (racket-path)] @@ -1296,9 +1310,9 @@ (values vm dp)) -(define (master-event-loop #:listen-port [listen-port DEFAULT-ROUTER-PORT] . event-containers) +(define (master-event-loop #:node [_nc #f] #:listen-port [listen-port DEFAULT-ROUTER-PORT] . event-containers) (define listener (tcp-listen listen-port 4 #t)) - (define nc (new node% [listen-port listener])) + (define nc (or _nc (new node% [listen-port listener]))) (for ([ec event-containers]) (send nc add-sub-ec ec) (send ec backlink nc)) @@ -1320,6 +1334,9 @@ (define (supervise-place-thunk-at remote-vm place-path place-func) (send remote-vm launch-place (list 'place (->string place-path) place-func))) +(define (supervise-thread-at remote-vm place-path place-func) + (send remote-vm launch-place (list 'thread (->string place-path) place-func))) + (define-syntax-rule (every-seconds _seconds _body ...) (new respawn-and-fire% [seconds _seconds] [thunk (lambda () _body ...)])) diff --git a/collects/racket/place/distributed/examples/logging/master.rkt b/collects/racket/place/distributed/examples/logging/master.rkt index 388afe07d9..75093fbe55 100644 --- a/collects/racket/place/distributed/examples/logging/master.rkt +++ b/collects/racket/place/distributed/examples/logging/master.rkt @@ -12,8 +12,8 @@ (define (main) (define remote-vm (spawn-remote-racket-vm "localhost" #:listen-port 6344)) - (define tuple-place (supervise-named-place-thunk-at remote-vm 'tuple-server tuple-path 'make-tuple-server)) - (define bank-place (supervise-place-thunk-at remote-vm bank-path 'make-bank)) + (define tuple-place (supervise-named-dynamic-place-at remote-vm 'tuple-server tuple-path 'make-tuple-server)) + (define bank-place (supervise-dynamic-place-at remote-vm bank-path 'make-bank)) (master-event-loop remote-vm diff --git a/collects/racket/place/distributed/examples/multiple/master.rkt b/collects/racket/place/distributed/examples/multiple/master.rkt index 2deb3bf674..92bad8ac44 100644 --- a/collects/racket/place/distributed/examples/multiple/master.rkt +++ b/collects/racket/place/distributed/examples/multiple/master.rkt @@ -23,7 +23,7 @@ (define (main) - (define bank-vm (spawn-vm-supervise-place-thunk-at "localhost" #:listen-port 6344 bank-path 'make-bank)) + (define bank-vm (spawn-vm-supervise-dynamic-place-at "localhost" #:listen-port 6344 bank-path 'make-bank)) (define bank-place (send bank-vm get-first-place)) (master-event-loop (spawn-place-worker-at 6341 "ONE") diff --git a/collects/racket/place/distributed/examples/named/master.rkt b/collects/racket/place/distributed/examples/named/master.rkt index bf36adf060..73d8afe07e 100644 --- a/collects/racket/place/distributed/examples/named/master.rkt +++ b/collects/racket/place/distributed/examples/named/master.rkt @@ -12,8 +12,8 @@ (define (main) (define remote-vm (spawn-remote-racket-vm "localhost" #:listen-port 6344)) - (define tuple-place (supervise-named-place-thunk-at remote-vm 'tuple-server tuple-path 'make-tuple-server)) - (define bank-place (supervise-place-thunk-at remote-vm bank-path 'make-bank)) + (define tuple-place (supervise-named-dynamic-place-at remote-vm 'tuple-server tuple-path 'make-tuple-server)) + (define bank-place (supervise-dynamic-place-at remote-vm bank-path 'make-bank)) (master-event-loop remote-vm diff --git a/collects/racket/place/distributed/examples/thread/master.rkt b/collects/racket/place/distributed/examples/thread/master.rkt new file mode 100644 index 0000000000..44d3e640c6 --- /dev/null +++ b/collects/racket/place/distributed/examples/thread/master.rkt @@ -0,0 +1,58 @@ +#lang racket/base +(require racket/place/distributed + racket/class + racket/match + racket/place + racket/place/define-remote-server + racket/runtime-path) + +(define-remote-server + bank + + (define-state accounts (make-hash)) + (define-rpc (new-account who) + (match (hash-has-key? accounts who) + [#t '(already-exists)] + [else + (hash-set! accounts who 0) + (list 'created who)])) + (define-rpc (removeM who amount) + (cond + [(hash-ref accounts who (lambda () #f)) => + (lambda (balance) + (cond [(<= amount balance) + (define new-balance (- balance amount)) + (hash-set! accounts who new-balance) + (list 'ok new-balance)] + [else + (list 'insufficient-funds balance)]))] + [else + (list 'invalid-account who)])) + (define-rpc (add who amount) + (cond + [(hash-ref accounts who (lambda () #f)) => + (lambda (balance) + (define new-balance (+ balance amount)) + (hash-set! accounts who new-balance) + (list 'ok new-balance))] + [else + (list 'invalid-account who)]))) + + +(provide main) + +(define (main) + (define remote-vm (spawn-remote-racket-vm "localhost" #:listen-port 6344)) + (define bank-place (supervise-thread-at remote-vm (get-current-module-path) 'make-bank)) + + (master-event-loop + remote-vm + (after-seconds 2 + (displayln (bank-new-account bank-place 'user0)) + (displayln (bank-add bank-place 'user0 10)) + (displayln (bank-removeM bank-place 'user0 5))) + + (after-seconds 6 + (node-send-exit remote-vm)) + (after-seconds 8 + (exit 0)))) diff --git a/collects/racket/place/private/th-place.rkt b/collects/racket/place/private/th-place.rkt new file mode 100644 index 0000000000..ffb1714e24 --- /dev/null +++ b/collects/racket/place/private/th-place.rkt @@ -0,0 +1,147 @@ +#lang racket/base +(require (prefix-in pl- '#%place) + '#%boot + (only-in '#%paramz parameterization-key make-custodian-from-main) + '#%place-struct + racket/fixnum + racket/flonum + racket/vector) + +(provide th-dynamic-place + ;th-dynamic-place* + th-place-sleep + th-place-wait + th-place-kill + th-place-break + th-place-channel + th-place-channel-put + th-place-channel-get + th-place-channel? + th-place + th-place? + th-place-message-allowed? + th-place-dead-evt + ) + + +(define-struct TH-place (th ch cust) + #:property prop:evt (lambda (x) (TH-place-channel-in (TH-place-ch x)))) +(define th-place? TH-place?) +(define th-place TH-place) + +(define (make-th-async-channel) + (define ch (make-channel)) + (values + (thread + (lambda () + (let loop () + (let ([v (thread-receive)]) + (channel-put ch v) + (loop))))) + ch)) + +(define (th-dynamic-place mod funcname) + (unless (or (path-string? mod) (resolved-module-path? mod)) + (raise-type-error 'place "resolved-module-path? or path-string?" 0 mod funcname)) + (unless (symbol? funcname) + (raise-type-error 'place "symbol?" 1 mod funcname)) + (define-values (pch cch) (th-place-channel)) + (define cust (make-custodian-from-main)) + (define th (thread + (lambda () + (with-continuation-mark + parameterization-key + orig-paramz + (parameterize ([current-namespace (make-base-namespace)] + [current-custodian cust]) + ((dynamic-require mod funcname) cch)))))) + (TH-place th pch cust)) + +(define (th-place-sleep n) (sleep n)) +(define (th-place-wait pl) (thread-wait (TH-place-th pl)) 0) +(define (th-place-kill pl) (custodian-shutdown-all (TH-place-cust pl))) +(define (th-place-break pl) (break-thread (TH-place-th pl))) +(define (th-place-dead-evt pl) (thread-dead-evt (TH-place-th pl))) +(define (th-place-channel) + (define-values (as ar) (make-th-async-channel)) + (define-values (bs br) (make-th-async-channel)) + (define pch (TH-place-channel ar bs)) + (define cch (TH-place-channel br as)) + (values pch cch)) + +(define (deep-copy x) + (define (dcw o) + (cond + [(ormap (lambda (x) (x o)) (list number? char? boolean? null? void? string? symbol? TH-place-channel?)) o] + [(cond + [(path? o) (path->bytes o)] + [(bytes? o) (if (pl-place-shared? o) o (bytes-copy o))] + [(fxvector? o) (if (pl-place-shared? o) o (fxvector-copy o))] + [(flvector? o) (if (pl-place-shared? o) o (flvector-copy o))] + [else #f]) + => values] + [(TH-place? o) (dcw (TH-place-ch o))] + [(pair? o) (cons (dcw (car o)) (dcw (cdr o)))] + [(vector? o) (vector-map! dcw (vector-copy o))] + [(hash-equal? o) + (for/fold ([nh (hash)]) ([p (in-hash-pairs o)]) + (hash-set nh (dcw (car p)) (dcw (cdr p))))] + [(hash-eq? o) + (for/fold ([nh (hasheq)]) ([p (in-hash-pairs o)]) + (hash-set nh (dcw (car p)) (dcw (cdr p))))] + [(hash-eqv? o) + (for/fold ([nh (hasheqv)]) ([p (in-hash-pairs o)]) + (hash-set nh (dcw (car p)) (dcw (cdr p))))] + [(struct? o) + (define key (prefab-struct-key o)) + (when (not key) + (error "Must be a prefab struct")) + (apply make-prefab-struct + key + (map dcw (cdr (vector->list (struct->vector o)))))] + [else (raise-mismatch-error 'place-channel-put "cannot transmit a message containing value: " o)])) + + (dcw x)) + + +(define (th-place-channel-put pl msg) + (define th + (cond + [(TH-place? pl) (TH-place-channel-out (TH-place-ch pl))] + [(TH-place-channel? pl) (TH-place-channel-out pl)] + [else (raise-type-error 'place-channel-put "expect a place? or place-channel?" pl)])) + (void (thread-send th (deep-copy msg) #f))) + +(define (th-place-channel-get pl) + (channel-get + (cond + [(TH-place? pl) (TH-place-channel-in (TH-place-ch pl))] + [(TH-place-channel? pl) (TH-place-channel-in pl)] + [else (raise-type-error 'place-channel-get "expect a place? or place-channel?" pl)]))) + +(define (th-place-channel? pl) + (or (TH-place? pl) + (TH-place-channel? pl))) + +(define (th-place-message-allowed? x) + (define (dcw o) + (cond + [(ormap (lambda (x) (x o)) (list number? char? boolean? null? void? string? symbol? TH-place-channel? + path? bytes? fxvector? flvector? TH-place?)) #t] + [(pair? o) (and (dcw (car o)) (dcw (cdr o)))] + [(vector? o) + (for/fold ([nh #t]) ([i (in-vector o)]) + (and nh (dcw i)))] + [(hash? o) + (for/fold ([nh #t]) ([p (in-hash-pairs o)]) + (and nh (dcw (car p)) (dcw (cdr p))))] + [(struct? o) + (define key (prefab-struct-key o)) + (when (not key) + (error "Must be a prefab struct")) + (for/fold ([nh #t]) ([p (cdr (vector->list (struct->vector o)))]) + (and nh (dcw p)))] + [else (raise-mismatch-error 'place-channel-put "cannot transmit a message containing value: " o)])) + + (dcw x) + #t)