[Distributed Places] added ability to launch remote threads

This commit is contained in:
Kevin Tew 2012-03-09 12:15:45 -07:00
parent e73be4a9ae
commit 5e20081b69
8 changed files with 274 additions and 137 deletions

View File

@ -7,6 +7,7 @@
racket/fixnum
racket/flonum
racket/vector
racket/place/private/th-place
mzlib/private/streams
(for-syntax racket/base
@ -15,7 +16,7 @@
(provide dynamic-place
dynamic-place*
place-sleep
place-wait
place-wait
place-kill
place-break
place-channel
@ -29,103 +30,14 @@
place
place*
(rename-out [pl-place-enabled? place-enabled?])
place-dead-evt)
place-dead-evt
)
(define-struct TH-place (th ch cust)
#:property prop:evt (lambda (x) (TH-place-channel-in (TH-place-ch x))))
(define (place-channel-put/get ch msg)
(place-channel-put ch msg)
(place-channel-get ch))
(define (make-th-async-channel)
(define ch (make-channel))
(values
(thread
(lambda ()
(let loop ()
(let ([v (thread-receive)])
(channel-put ch v)
(loop)))))
ch))
(define (th-dynamic-place mod funcname)
(unless (or (path-string? mod) (resolved-module-path? mod))
(raise-type-error 'place "resolved-module-path? or path-string?" 0 mod funcname))
(unless (symbol? funcname)
(raise-type-error 'place "symbol?" 1 mod funcname))
(define-values (pch cch) (th-place-channel))
(define cust (make-custodian-from-main))
(define th (thread
(lambda ()
(with-continuation-mark
parameterization-key
orig-paramz
(parameterize ([current-namespace (make-base-namespace)]
[current-custodian cust])
((dynamic-require mod funcname) cch))))))
(TH-place th pch cust))
(define (th-place-sleep n) (sleep n))
(define (th-place-wait pl) (thread-wait (TH-place-th pl)) 0)
(define (th-place-kill pl) (custodian-shutdown-all (TH-place-cust pl)))
(define (th-place-break pl) (break-thread (TH-place-th pl)))
(define (th-place-dead-evt pl) (thread-dead-evt (TH-place-th pl)))
(define (th-place-channel)
(define-values (as ar) (make-th-async-channel))
(define-values (bs br) (make-th-async-channel))
(define pch (TH-place-channel ar bs))
(define cch (TH-place-channel br as))
(values pch cch))
(define (deep-copy x)
(define (dcw o)
(cond
[(ormap (lambda (x) (x o)) (list number? char? boolean? null? void? string? symbol? TH-place-channel?)) o]
[(cond
[(path? o) (path->bytes o)]
[(bytes? o) (if (pl-place-shared? o) o (bytes-copy o))]
[(fxvector? o) (if (pl-place-shared? o) o (fxvector-copy o))]
[(flvector? o) (if (pl-place-shared? o) o (flvector-copy o))]
[else #f])
=> values]
[(TH-place? o) (dcw (TH-place-ch o))]
[(pair? o) (cons (dcw (car o)) (dcw (cdr o)))]
[(vector? o) (vector-map! dcw (vector-copy o))]
[(struct? o)
(define key (prefab-struct-key o))
(when (not key)
(error "Must be a prefab struct"))
(apply make-prefab-struct
key
(map dcw (cdr (vector->list (struct->vector o)))))]
[else (raise-mismatch-error 'place-channel-put "cannot transmit a message containing value: " o)]))
(dcw x))
(define (th-place-channel-put pl msg)
(define th
(cond
[(TH-place? pl) (TH-place-channel-out (TH-place-ch pl))]
[(TH-place-channel? pl) (TH-place-channel-out pl)]
[else (raise-type-error 'place-channel-put "expect a place? or place-channel?" pl)]))
(void (thread-send th (deep-copy msg) #f)))
(define (th-place-channel-get pl)
(channel-get
(cond
[(TH-place? pl) (TH-place-channel-in (TH-place-ch pl))]
[(TH-place-channel? pl) (TH-place-channel-in pl)]
[else (raise-type-error 'place-channel-get "expect a place? or place-channel?" pl)])))
(define (th-place-channel? pl)
(or (TH-place? pl)
(TH-place-channel? pl)))
(define (th-place-message-allowed? pl)
#t)
(define-syntax-rule (define-pl x p t) (define x (if (pl-place-enabled?) p t)))
(define-pl place-sleep pl-place-sleep th-place-sleep)
@ -136,7 +48,7 @@
(define-pl place-channel-put pl-place-channel-put th-place-channel-put)
(define-pl place-channel-get pl-place-channel-get th-place-channel-get)
(define-pl place-channel? pl-place-channel? th-place-channel?)
(define-pl place? pl-place? TH-place?)
(define-pl place? pl-place? th-place?)
(define-pl place-message-allowed? pl-place-message-allowed? th-place-message-allowed?)
(define-pl place-dead-evt pl-place-dead-evt th-place-dead-evt)
@ -148,7 +60,7 @@
[else (void)]))
(define (dynamic-place module-path function)
(start-place 'dynamic-place module-path function
(start-place 'dynamic-place module-path function
#f (current-output-port) (current-error-port)))
(define (dynamic-place* module-path
@ -160,8 +72,8 @@
(define (start-place who module-path function in out err)
(define-values (p i o e) (start-place* who
module-path
function
module-path
function
in
out
err))
@ -169,7 +81,7 @@
p)
(define (start-place* who module-path function in out err)
;; Duplicate checks in that are in the primitive `pl-dynamic-place',
;; Duplicate checks in that are in the primitive `pl-dynamic-place',
;; unfortunately, but we want these checks before we start making
;; stream-pumping threads, etc.
(unless (or (module-path? module-path) (path? module-path))
@ -201,7 +113,7 @@
(if-stream-out who err)))
(pump-place p pin pout perr in out err)
(values p
(values p
(and (not in) pin)
(and (not out) pout)
(and (not err) perr))]
@ -221,12 +133,12 @@
(define-for-syntax (modpath->string modpath)
(cond
(cond
[(equal? modpath #f)
(number->string (current-inexact-milliseconds))]
[else
(define name (resolved-module-path-name modpath))
(cond
(cond
[(symbol? name) (symbol->string name)]
[(path? name) (path->string name)])]))
@ -242,10 +154,10 @@
(raise-syntax-error #f "can only be used in a module" stx))
(unless (identifier? #'ch)
(raise-syntax-error #f "expected an identifier" stx #'ch))
(define func-name-stx
(datum->syntax stx
(string->symbol
(string-append "place/anon"
(define func-name-stx
(datum->syntax stx
(string->symbol
(string-append "place/anon"
(modpath->string (current-module-declare-name))))))
(with-syntax ([internal-def-name
(syntax-local-lift-expression #'(lambda (ch) body1 body ...))]

View File

@ -2,6 +2,7 @@
(require (for-syntax racket/base)
(for-syntax syntax/stx)
racket/place
racket/place/private/th-place
racket/match
racket/class
racket/stxparam
@ -20,11 +21,13 @@
(define (dplace/place-channel-get dest)
(cond
[(place-channel? dest) (place-channel-get dest)]
[(th-place-channel? dest) (th-place-channel-get dest)]
[else (send dest get-msg)]))
(define (dplace/place-channel-put dest msg)
(cond
[(place-channel? dest) (place-channel-put dest msg)]
[(th-place-channel? dest) (th-place-channel-put dest msg)]
[else (send dest put-msg msg)]))
@ -105,7 +108,7 @@
(with-syntax ([fname-symbol #'(quote fname)]
[(send-line (... ...))
(cond
[(is-id? 'define-rpc #'define-type) #'((place-channel-put send-dest result))]
[(is-id? 'define-rpc #'define-type) #'((dplace/place-channel-put send-dest result))]
[(is-id? 'define-cast #'define-type) #'()]
[else (raise "Bad define in define-remote-server")])])
#'[receive-line
@ -114,13 +117,13 @@
body (... ...)))
send-line (... ...)
(loop)]))]))])
#`(place ch
#`(lambda (ch)
(let ()
states2 (... ...)
(let loop ()
(define msg (place-channel-get ch))
(define msg (dplace/place-channel-get ch))
(define (log-to-parent-real msg #:severity [severity 'info])
(place-channel-put ch (log-message severity msg)))
(dplace/place-channel-put ch (log-message severity msg)))
(syntax-parameterize ([log-to-parent (make-rename-transformer #'log-to-parent-real)])
(match msg
cases (... ...)
@ -133,7 +136,7 @@
(require racket/place
racket/match)
#,@trans-rpcs
(define/provide (mkname) #,trans-place)
(define/provide mkname #,trans-place)
(void)))
;(pretty-print (syntax->datum x))
x))]))

View File

@ -3,6 +3,7 @@
racket/match
racket/tcp
racket/place
racket/place/private/th-place
racket/class
racket/trait
racket/udp
@ -21,7 +22,12 @@
spawn-vm-supervise-place-thunk-at
spawn-vm-supervise-dynamic-place-at/2
spawn-vm-supervise-place-thunk-at/2
supervise-named-dynamic-place-at
supervise-named-place-thunk-at
supervise-place-thunk-at
supervise-dynamic-place-at
supervise-thread-at
supervise-process-at
every-seconds
after-seconds
@ -32,8 +38,6 @@
spawn-remote-racket-vm
node-send-exit
node-get-first-place
supervise-place-thunk-at
supervise-dynamic-place-at
dplace-put
dplace-get
@ -43,8 +47,6 @@
ll-channel-put
write-flush
log-message
;;
start-spawned-node-router
;;Coercion Routines
@ -155,7 +157,6 @@
(define (write-flush msg [p (current-output-port)])
;(printf "WRITING ~v\n" msg)
(write msg p)
(flush-output p))
@ -359,8 +360,6 @@
(wrap-evt
(if (dchannel? pch) (dchannel-ch pch) pch)
(lambda (e)
; (printf "MSG ~v\n" e)
; (flush-output)
(match e
[(dcgm 8 #;(== DCGM-TYPE-LOG-TO-PARENT) _ _ (list severity msg))
(send node log-from-child #:severity severity msg)]
@ -438,7 +437,6 @@
[else
(sconn-write-flush src-channel (dcgm DCGM-TYPE-INTER-DCHANNEL ch-id ch-id
(format "ERROR: name not found ~a" name)))])]
[else
(define np (new place%
[place-exec place-exec]
@ -459,10 +457,11 @@
(define pch (sconn-lookup-subchannel src-channel ch-id))
(cond
[(place-channel? pch)
;(printf "SOCKET to PLACE CHANNEL ~a\n" msg)
(place-channel-put pch msg)]
[(is-a? pch connection%)
(send pch forward msg)])]
(send pch forward msg)]
[(th-place-channel? pch)
(th-place-channel-put pch msg)])]
[(dcgm 6 #;(== DCGM-TYPE-SPAWN-REMOTE-PROCESS) src (list node-name node-port mod-path funcname) ch1)
(define vm
(new remote-node%
@ -528,7 +527,6 @@
(sconn-get-forward-event x forward-mesg)]
[(or (place-channel? x) (place? x))
(wrap-evt x (lambda (e)
;(printf "VECTOR PLACE MESSAGE ~a\n" e)
(forward-mesg e x)))])
n))
nes)]
@ -626,7 +624,6 @@
(tcp-connect rname (->number rport)))))
(define (ensure-connected)
;(printf "Waiting on connecting to ~a ~a\n" host port)
(when connecting
(match (channel-get ch)
[(list _in _out)
@ -648,7 +645,6 @@
(define/public (get-forward-event forwarder)
(when (equal? out #f) (ensure-connected))
(wrap-evt in (lambda (e)
;(printf "VECTOR SOCKET MESSAGE ~a\n" e)
(forwarder (read in) this))))
(define/public (read-message)
@ -661,7 +657,6 @@
(when (and host port background-connect)
(set! connecting #t)
(set! ch (make-channel))
;(printf "Delay connecting to ~a ~a\n" host port)
(thread
(lambda ()
(channel-put
@ -728,7 +723,6 @@
(define pch (sconn-lookup-subchannel sc ch-id))
(cond
[(place-channel? pch)
;(printf "SOCKET to PLACE CHANNEL ~a\n" msg)
(place-channel-put pch msg)]
[(is-a? pch connection%)
(send pch forward msg)])]
@ -736,7 +730,6 @@
(define parent (send this get-router))
(cond
[parent
;(printf "Sent to Parent ~a ~a \n" severity msg)
(send parent log-from-child #:severity severity msg)]
[else (print-log-message severity msg)])]
@ -998,20 +991,33 @@
[(list 'dynamic-place place-path place-func)
(dynamic-place (->path place-path) place-func)]
[(list 'place place-path place-func)
((dynamic-require (->path place-path) place-func))]))
((dynamic-require (->path place-path) place-func))]
[(list 'thread place-path place-func)
(define-values (ch1 ch2) (th-place-channel))
(define th
(thread
(lambda ()
((dynamic-require (->path place-path) place-func) ch1))))
(th-place th ch2 null)]))
(sconn-add-subchannel sc ch-id pd)
(set! psb (new place-socket-bridge% [pch pd] [sch sc] [id ch-id] [node node]))
(define/public (get-channel) pd)
(define/public (stop)
(cond
[pd
[(place? pd)
(place-kill pd)
(set! pd #f)]
[(th-place? pd)
(th-place-kill pd)]
[else (void)])) ;send place not running message
(define/public (register es)
(let* ([es (if pd (cons (wrap-evt (place-dead-evt pd) on-place-dead) es) es)]
(let* ([es (if pd (cons (wrap-evt
(cond
[(place? pd) (place-dead-evt pd)]
[(th-place? pd) (th-place-dead-evt pd)]) on-place-dead)
es) es)]
[es (if psb (send psb register es) es)])
es))
(super-new)
@ -1155,7 +1161,6 @@
(define (remote-spawn)
(define sp (new socket-connection% [host rname] [port rport]))
(define msg (list my-id node-name node-cnt curr-conf-idx next-node-id rname rcnt conf))
;(printf "Sending ~v\n" msg)
(sconn-write-flush sp msg)
(for ([i (in-range rcnt)])
(vector-set! cv (+ next-node-id i) sp))
@ -1212,7 +1217,6 @@
)
(define (supervise-named-place-thunk-at vm name place-path place-func
#:listen-port [listen-port DEFAULT-ROUTER-PORT]
#:initial-message [initial-message #f]
#:restart-on-exit [restart-on-exit #f])
(send vm launch-place
@ -1220,6 +1224,16 @@
;#:initial-message initial-message
#:restart-on-exit restart-on-exit
))
(define (supervise-named-dynamic-place-at vm name place-path place-func
#:initial-message [initial-message #f]
#:restart-on-exit [restart-on-exit #f])
(send vm launch-place
(list 'dynamic-place (->string place-path) place-func (->string name))
;#:initial-message initial-message
#:restart-on-exit restart-on-exit
))
(define (spawn-vm-supervise-dynamic-place-at host place-path place-func #:listen-port [listen-port DEFAULT-ROUTER-PORT]
#:initial-message [initial-message #f]
#:racket-path [racketpath (racket-path)]
@ -1296,9 +1310,9 @@
(values vm dp))
(define (master-event-loop #:listen-port [listen-port DEFAULT-ROUTER-PORT] . event-containers)
(define (master-event-loop #:node [_nc #f] #:listen-port [listen-port DEFAULT-ROUTER-PORT] . event-containers)
(define listener (tcp-listen listen-port 4 #t))
(define nc (new node% [listen-port listener]))
(define nc (or _nc (new node% [listen-port listener])))
(for ([ec event-containers])
(send nc add-sub-ec ec)
(send ec backlink nc))
@ -1320,6 +1334,9 @@
(define (supervise-place-thunk-at remote-vm place-path place-func)
(send remote-vm launch-place (list 'place (->string place-path) place-func)))
(define (supervise-thread-at remote-vm place-path place-func)
(send remote-vm launch-place (list 'thread (->string place-path) place-func)))
(define-syntax-rule (every-seconds _seconds _body ...)
(new respawn-and-fire% [seconds _seconds] [thunk (lambda () _body ...)]))

View File

@ -12,8 +12,8 @@
(define (main)
(define remote-vm (spawn-remote-racket-vm "localhost" #:listen-port 6344))
(define tuple-place (supervise-named-place-thunk-at remote-vm 'tuple-server tuple-path 'make-tuple-server))
(define bank-place (supervise-place-thunk-at remote-vm bank-path 'make-bank))
(define tuple-place (supervise-named-dynamic-place-at remote-vm 'tuple-server tuple-path 'make-tuple-server))
(define bank-place (supervise-dynamic-place-at remote-vm bank-path 'make-bank))
(master-event-loop
remote-vm

View File

@ -23,7 +23,7 @@
(define (main)
(define bank-vm (spawn-vm-supervise-place-thunk-at "localhost" #:listen-port 6344 bank-path 'make-bank))
(define bank-vm (spawn-vm-supervise-dynamic-place-at "localhost" #:listen-port 6344 bank-path 'make-bank))
(define bank-place (send bank-vm get-first-place))
(master-event-loop
(spawn-place-worker-at 6341 "ONE")

View File

@ -12,8 +12,8 @@
(define (main)
(define remote-vm (spawn-remote-racket-vm "localhost" #:listen-port 6344))
(define tuple-place (supervise-named-place-thunk-at remote-vm 'tuple-server tuple-path 'make-tuple-server))
(define bank-place (supervise-place-thunk-at remote-vm bank-path 'make-bank))
(define tuple-place (supervise-named-dynamic-place-at remote-vm 'tuple-server tuple-path 'make-tuple-server))
(define bank-place (supervise-dynamic-place-at remote-vm bank-path 'make-bank))
(master-event-loop
remote-vm

View File

@ -0,0 +1,58 @@
#lang racket/base
(require racket/place/distributed
racket/class
racket/match
racket/place
racket/place/define-remote-server
racket/runtime-path)
(define-remote-server
bank
(define-state accounts (make-hash))
(define-rpc (new-account who)
(match (hash-has-key? accounts who)
[#t '(already-exists)]
[else
(hash-set! accounts who 0)
(list 'created who)]))
(define-rpc (removeM who amount)
(cond
[(hash-ref accounts who (lambda () #f)) =>
(lambda (balance)
(cond [(<= amount balance)
(define new-balance (- balance amount))
(hash-set! accounts who new-balance)
(list 'ok new-balance)]
[else
(list 'insufficient-funds balance)]))]
[else
(list 'invalid-account who)]))
(define-rpc (add who amount)
(cond
[(hash-ref accounts who (lambda () #f)) =>
(lambda (balance)
(define new-balance (+ balance amount))
(hash-set! accounts who new-balance)
(list 'ok new-balance))]
[else
(list 'invalid-account who)])))
(provide main)
(define (main)
(define remote-vm (spawn-remote-racket-vm "localhost" #:listen-port 6344))
(define bank-place (supervise-thread-at remote-vm (get-current-module-path) 'make-bank))
(master-event-loop
remote-vm
(after-seconds 2
(displayln (bank-new-account bank-place 'user0))
(displayln (bank-add bank-place 'user0 10))
(displayln (bank-removeM bank-place 'user0 5)))
(after-seconds 6
(node-send-exit remote-vm))
(after-seconds 8
(exit 0))))

View File

@ -0,0 +1,147 @@
#lang racket/base
(require (prefix-in pl- '#%place)
'#%boot
(only-in '#%paramz parameterization-key make-custodian-from-main)
'#%place-struct
racket/fixnum
racket/flonum
racket/vector)
(provide th-dynamic-place
;th-dynamic-place*
th-place-sleep
th-place-wait
th-place-kill
th-place-break
th-place-channel
th-place-channel-put
th-place-channel-get
th-place-channel?
th-place
th-place?
th-place-message-allowed?
th-place-dead-evt
)
(define-struct TH-place (th ch cust)
#:property prop:evt (lambda (x) (TH-place-channel-in (TH-place-ch x))))
(define th-place? TH-place?)
(define th-place TH-place)
(define (make-th-async-channel)
(define ch (make-channel))
(values
(thread
(lambda ()
(let loop ()
(let ([v (thread-receive)])
(channel-put ch v)
(loop)))))
ch))
(define (th-dynamic-place mod funcname)
(unless (or (path-string? mod) (resolved-module-path? mod))
(raise-type-error 'place "resolved-module-path? or path-string?" 0 mod funcname))
(unless (symbol? funcname)
(raise-type-error 'place "symbol?" 1 mod funcname))
(define-values (pch cch) (th-place-channel))
(define cust (make-custodian-from-main))
(define th (thread
(lambda ()
(with-continuation-mark
parameterization-key
orig-paramz
(parameterize ([current-namespace (make-base-namespace)]
[current-custodian cust])
((dynamic-require mod funcname) cch))))))
(TH-place th pch cust))
(define (th-place-sleep n) (sleep n))
(define (th-place-wait pl) (thread-wait (TH-place-th pl)) 0)
(define (th-place-kill pl) (custodian-shutdown-all (TH-place-cust pl)))
(define (th-place-break pl) (break-thread (TH-place-th pl)))
(define (th-place-dead-evt pl) (thread-dead-evt (TH-place-th pl)))
(define (th-place-channel)
(define-values (as ar) (make-th-async-channel))
(define-values (bs br) (make-th-async-channel))
(define pch (TH-place-channel ar bs))
(define cch (TH-place-channel br as))
(values pch cch))
(define (deep-copy x)
(define (dcw o)
(cond
[(ormap (lambda (x) (x o)) (list number? char? boolean? null? void? string? symbol? TH-place-channel?)) o]
[(cond
[(path? o) (path->bytes o)]
[(bytes? o) (if (pl-place-shared? o) o (bytes-copy o))]
[(fxvector? o) (if (pl-place-shared? o) o (fxvector-copy o))]
[(flvector? o) (if (pl-place-shared? o) o (flvector-copy o))]
[else #f])
=> values]
[(TH-place? o) (dcw (TH-place-ch o))]
[(pair? o) (cons (dcw (car o)) (dcw (cdr o)))]
[(vector? o) (vector-map! dcw (vector-copy o))]
[(hash-equal? o)
(for/fold ([nh (hash)]) ([p (in-hash-pairs o)])
(hash-set nh (dcw (car p)) (dcw (cdr p))))]
[(hash-eq? o)
(for/fold ([nh (hasheq)]) ([p (in-hash-pairs o)])
(hash-set nh (dcw (car p)) (dcw (cdr p))))]
[(hash-eqv? o)
(for/fold ([nh (hasheqv)]) ([p (in-hash-pairs o)])
(hash-set nh (dcw (car p)) (dcw (cdr p))))]
[(struct? o)
(define key (prefab-struct-key o))
(when (not key)
(error "Must be a prefab struct"))
(apply make-prefab-struct
key
(map dcw (cdr (vector->list (struct->vector o)))))]
[else (raise-mismatch-error 'place-channel-put "cannot transmit a message containing value: " o)]))
(dcw x))
(define (th-place-channel-put pl msg)
(define th
(cond
[(TH-place? pl) (TH-place-channel-out (TH-place-ch pl))]
[(TH-place-channel? pl) (TH-place-channel-out pl)]
[else (raise-type-error 'place-channel-put "expect a place? or place-channel?" pl)]))
(void (thread-send th (deep-copy msg) #f)))
(define (th-place-channel-get pl)
(channel-get
(cond
[(TH-place? pl) (TH-place-channel-in (TH-place-ch pl))]
[(TH-place-channel? pl) (TH-place-channel-in pl)]
[else (raise-type-error 'place-channel-get "expect a place? or place-channel?" pl)])))
(define (th-place-channel? pl)
(or (TH-place? pl)
(TH-place-channel? pl)))
(define (th-place-message-allowed? x)
(define (dcw o)
(cond
[(ormap (lambda (x) (x o)) (list number? char? boolean? null? void? string? symbol? TH-place-channel?
path? bytes? fxvector? flvector? TH-place?)) #t]
[(pair? o) (and (dcw (car o)) (dcw (cdr o)))]
[(vector? o)
(for/fold ([nh #t]) ([i (in-vector o)])
(and nh (dcw i)))]
[(hash? o)
(for/fold ([nh #t]) ([p (in-hash-pairs o)])
(and nh (dcw (car p)) (dcw (cdr p))))]
[(struct? o)
(define key (prefab-struct-key o))
(when (not key)
(error "Must be a prefab struct"))
(for/fold ([nh #t]) ([p (cdr (vector->list (struct->vector o)))])
(and nh (dcw p)))]
[else (raise-mismatch-error 'place-channel-put "cannot transmit a message containing value: " o)]))
(dcw x)
#t)