diff --git a/collects/racket/place/define-remote-server.rkt b/collects/racket/place/define-remote-server.rkt index 26fc573a06..3dcbdb1c14 100644 --- a/collects/racket/place/define-remote-server.rkt +++ b/collects/racket/place/define-remote-server.rkt @@ -3,6 +3,7 @@ (for-syntax syntax/stx) racket/place racket/place/private/th-place + racket/place/private/coercion racket/match racket/class racket/stxparam diff --git a/collects/racket/place/distributed.rkt b/collects/racket/place/distributed.rkt index 9168093239..251bea8804 100644 --- a/collects/racket/place/distributed.rkt +++ b/collects/racket/place/distributed.rkt @@ -4,23 +4,24 @@ racket/tcp racket/place racket/place/private/th-place + racket/place/private/coercion racket/class racket/trait racket/udp racket/runtime-path - racket/date) + racket/date + syntax/location) (provide ssh-bin-path racket-path distributed-launch-path - get-current-module-path ;; New Design Pattern 2 API - master-event-loop + message-router spawn-vm-supervise-dynamic-place-at spawn-vm-supervise-place-thunk-at - spawn-vm-supervise-dynamic-place-at/2 - spawn-vm-supervise-place-thunk-at/2 + spawn-vm-with-dynamic-place-at + spawn-vm-with-place-thunk-at supervise-named-dynamic-place-at supervise-named-place-thunk-at supervise-place-thunk-at @@ -87,39 +88,26 @@ respawn-and-fire% after-seconds% restarter% + + ;re-provides + quote-module-path ) (define-runtime-path distributed-launch-path "distributed/launch.rkt") (define DEFAULT-ROUTER-PORT 6340) -(define (->path x) - (cond [(path? x) x] - [(string? x) (string->path x)])) +(define-syntax quote-module-path-bytes + (syntax-rules () + [(_) + (->module-path (quote-module-name))] + [(_ path ... ) + (->module-path + (let ([qmn (quote-module-name)]) + (cond + [(list? qmn) (append (list 'submod) qmn (list path ...))] + [else (list 'submod qmn path ...)])))])) -(define (->number x) - (cond [(number? x) x] - [(string? x) (string->number x)])) - -(define (->string x) - (cond [(string? x) x] - [(number? x) (number->string x)] - [(symbol? x) (symbol->string x)] - [(path? x) (path->string x)] - [(bytes? x) (bytes->string/locale x)] - )) - -(define (->length x) - (cond [(string? x) (string-length x)] - [(bytes? x) (bytes-length x)] - [(list? x) (length x)])) - -(define-syntax-rule (get-current-module-path) - (let () - (define rmpn (resolved-module-path-name (variable-reference->resolved-module-path (#%variable-reference)))) - (cond - [(symbol? rmpn) rmpn] - [(path? rmpn) (path->string rmpn)]))) ; returns the path to the racket executable on the current machine. (define (racket-path) @@ -161,7 +149,7 @@ [wait-time start-seconds]) (with-handlers ([exn? (lambda (e) (cond [(t . < . times) - (printf "backing off ~a sec to ~a:~a\n" (expt 2 t) rname rport) + (log-debug (format "backing off ~a sec to ~a:~a" (expt 2 t) rname rport)) (sleep wait-time) (loop (add1 t) (* 2 wait-time))] [else (raise e)]))]) @@ -171,14 +159,14 @@ (let loop ([t 0]) (with-handlers ([exn? (lambda (e) (cond [(t . < . times) - (printf "waiting ~a sec to retry connection to ~a:~a\n" delay rname rport) + (log-debug (format "waiting ~a sec to retry connection to ~a:~a" delay rname rport)) (sleep delay) (loop (add1 t))] [else (raise e)]))]) (tcp-connect rname (->number rport))))) -(define (print-log-message severity msg) - (printf "~a ~a ~a\n" (date->string (current-date) #t) severity msg) +(define (format-log-message severity msg) + (log-info (format "~a ~a ~a\n" (date->string (current-date) #t) severity msg)) (flush-output)) @@ -309,13 +297,13 @@ (set! o (box _o)) (set! i (box _i)) (set! e (box _e))) - (printf "SPAWNED-PROCESS:~a ~a\n" pid cmdline-list) + (log-debug (format"SPAWNED-PROCESS:~a ~a" pid cmdline-list)) (define (mk-handler _port desc) (define port (unbox _port)) (if port (wrap-evt port (lambda (e) - (define (print-out x) (printf "SPAWNED-PROCESS ~a:~a:~a ~a\n" pid desc (->length x) x) + (define (print-out x) (log-debug (format "SPAWNED-PROCESS ~a:~a:~a ~a" pid desc (->length x) x)) (flush-output)) (cond [(not port) (print-out "IS #F")] @@ -336,7 +324,7 @@ (for/filter/fold/cons nes ([x (list s (list o "OUT") (list e "ERR"))]) (cond [(subprocess? x) (wrap-evt s (lambda (e) - (printf "SPAWNED-PROCESS ~a DIED\n" pid) + (log-debug (format "SPAWNED-PROCESS ~a DIED" pid)) (and parent (send parent process-died this))))] [(list? x) (apply mk-handler x)] [else #f]))) @@ -468,15 +456,15 @@ (send vm launch-place (list 'dynamic-place mod-path funcname) ;#:initial-message initial-message - #:one-sided-place ch1 + #:one-sided-place? ch1 ;#:restart-on-exit restart-on-exit )] [(dcgm 7 #;(== DCGM-DPLACE-DIED) -1 -1 ch-id) - (printf "PLACE ~a died\n" ch-id)] + (log-debug (format"PLACE ~a died" ch-id))] [(dcgm 8 #;(== DCGM-TYPE-LOG-TO-PARENT) _ _ (list severity msg)) (log-from-child #:severity severity msg)] [(dcgm 10 #;(== DCGM-TYPE-SET-OWNER) -1 -1 msg) - (printf "RECV DCGM-TYPE-SET-OWNER ~a\n" src-channel) + (log-debug (format "RECV DCGM-TYPE-SET-OWNER ~a" src-channel)) (set! owner src-channel)] [(dcgm mtype srcs dest msg) (define d (vector-ref chan-vec dest)) @@ -486,7 +474,7 @@ [(or (place-channel? d) (place? d)) (place-channel-put d m)])] [(? eof-object?) - (printf "connection died\n") + (log-debug (format "connection died")) (flush-output) (exit 1) ])) @@ -510,7 +498,7 @@ [owner ;(printf "Sent to owner\n") (sconn-write-flush owner (log-message severity msg))] - [else (print-log-message severity msg)])) + [else (format-log-message severity msg)])) (define/public (register nes) (let* @@ -532,7 +520,7 @@ (wrap-evt listen-port (lambda (e) (define-values (in out) (tcp-accept listen-port)) (define-values (lh lp rh rp) (tcp-addresses in #t)) - (printf "INCOMING CONNECTION ~a:~a <- ~a:~a\n" lh lp rh rp) + (log-debug (format "INCOMING CONNECTION ~a:~a <- ~a:~a" lh lp rh rp)) (define sp (new socket-connection% [in in] [out out])) (add-socket-port sp))) nes) @@ -613,7 +601,7 @@ (let loop ([t 0]) (with-handlers ([exn? (lambda (e) (cond [(t . < . times) - (printf "try ~a waiting ~a sec to retry connection to ~a:~a\n" t delay rname rport) + (log-debug (format "try ~a waiting ~a sec to retry connection to ~a:~a" t delay rname rport)) (sleep delay) (loop (add1 t))] [else (raise e)]))]) @@ -647,7 +635,7 @@ (when (equal? out #f) (ensure-connected)) (read in)) (define/public (register nes) - (error) + (raise "Not-implemented/needed") (cons (wrap-evt in void) nes)) (when (and host port background-connect) @@ -659,8 +647,7 @@ ch (call-with-values (lambda () (with-handlers ([exn:fail? (lambda (e) - (printf "OPPS ~a\n" e) - (values 'bozo #f))]) + (raise (format "socket error connecting to ~a:~a" host port)))]) (tcp-connect/retry host port #:times retry-times #:delay delay))) list))))) (when (and host port (not background-connect)) @@ -707,14 +694,19 @@ rp r))) + (define (get-sp-pid) + (cond + [sp (send sp get-pid)] + [else 'failed-to-launch ])) + (define (on-socket-event it in-port) (match it [(dcgm 7 #;(== DCGM-DPLACE-DIED) -1 -1 ch-id) - (printf "SPAWNED-PROCESS:~a PLACE DIED ~a:~a:~a\n" (send sp get-pid) host-name listen-port ch-id) + (log-debug (format "SPAWNED-PROCESS:~a PLACE DIED ~a:~a:~a" (get-sp-pid) host-name listen-port ch-id)) (cond [(find-place-by-sc-id ch-id) => (lambda (rp) (send rp place-died))] - [else (printf "remote-place for sc-id ~a not found\n" ch-id)])] + [else (raise (format "remote-place for sc-id ~a not found\n" ch-id))])] [(dcgm 4 #;(== DCGM-TYPE-INTER-DCHANNEL) _ ch-id msg) (define pch (sconn-lookup-subchannel sc ch-id)) (cond @@ -727,18 +719,18 @@ (cond [parent (send parent log-from-child #:severity severity msg)] - [else (print-log-message severity msg)])] + [else (format-log-message severity msg)])] [(? eof-object?) (define-values (lh lp rh rp) (send sc addresses)) - (printf "EOF on vm socket connection pid to ~a ~a:~a CONNECTION ~a:~a -> ~a:~a\n" (send sp get-pid) host-name listen-port lh lp rh rp) + (log-debug (format "EOF on vm socket connection pid to ~a ~a:~a CONNECTION ~a:~a -> ~a:~a" (get-sp-pid) host-name listen-port lh lp rh rp)) (set! sc #f)] - [else (printf "received message ~a\n" it)])) + [else (log-debug (format"received message ~a" it))])) (define/public (get-log-prefix) (format "PLACE ~a:~a" host-name listen-port)) (define/public (process-died child) - (printf "Remote VM pid ~a ~a:~a died \n" (send sp get-pid) host-name listen-port) + (log-debug (format "Remote VM pid ~a ~a:~a died" (get-sp-pid) host-name listen-port)) (set! sp #f) (cond [restart-on-exit @@ -748,11 +740,9 @@ (restart-node) (send restart-on-exit restart restart-node))] [else - (printf "No restart cmdline arguments for ~a\n" - (get-log-prefix))])] + (log-debug (format "No restart cmdline arguments for ~a" (get-log-prefix)))])] [else - (printf "No restart condition for ~a\n" - (get-log-prefix))])) + (log-debug (format "No restart condition for ~a" (get-log-prefix)))])) (define/public (get-first-place) (car remote-places)) @@ -762,9 +752,9 @@ (define/public (drop-sc-id scid) (sconn-remove-subchannel sc scid)) - (define/public (launch-place place-exec #:restart-on-exit [restart-on-exit #f] #:one-sided-place [one-sided-place #f]) + (define/public (launch-place place-exec #:restart-on-exit [restart-on-exit #f] #:one-sided-place? [one-sided-place? #f]) (define rp (new remote-place% [vm this] [place-exec place-exec] [restart-on-exit restart-on-exit] - [one-sided-place one-sided-place])) + [one-sided-place? one-sided-place?])) (add-remote-place rp) rp) @@ -815,8 +805,8 @@ (init-field vm) (init-field [place-exec #f]) (init-field [restart-on-exit #f]) - (init-field [one-sided-place #f]) - (init-field [on-channel/2 #f]) + (init-field [one-sided-place? #f]) + (init-field [on-channel #f]) (field [psb #f]) (field [pc #f]) (field [rpc #f]) @@ -825,8 +815,8 @@ (field [handle-channel #t]) (cond - [one-sided-place - (set! rpc one-sided-place)] + [one-sided-place? + (set! rpc one-sided-place?)] [else (define-values (pch1 pch2) (place-channel)) (set! rpc pch1) @@ -840,7 +830,7 @@ (define/public (stop) (void)) (define/public (get-channel) pc) - (define/public (set-on-channel/2! proc) (set! on-channel/2 proc)) + (define/public (set-on-channel! proc) (set! on-channel proc)) (define/public (get-sc-id) (send psb get-sc-id)) (define/public (set-handle-channel! x) (set! handle-channel x)) (define/public (place-died) @@ -850,11 +840,11 @@ (restart-place) (send restart-on-exit restart restart-place))] [else - (printf "No restart condition for ~a:~a\n" + (log-debug (format "No restart condition for ~a:~a" (send vm get-log-prefix) - (send psb get-sc-id))])) + (send psb get-sc-id)))])) (define (on-channel-event e) - (printf "~a ~a\n" (send vm get-log-prefix) e)) + (log-debug (format "~a ~a" (send vm get-log-prefix) e))) (define/public (register es) (let* ([es (if (and handle-channel pc) (cons (wrap-evt pc @@ -865,9 +855,9 @@ (begin0 (k e) (set! k #f)))))] - [on-channel/2 + [on-channel (lambda (e) - (on-channel/2 pc e))] + (on-channel pc e))] [else on-channel-event])) es) es)] @@ -908,7 +898,7 @@ (init-field vm) (init-field name) (init-field [restart-on-exit #f]) - (init-field [on-channel/2 #f]) + (init-field [on-channel #f]) (field [psb #f]) (field [pc #f]) (field [running #f]) @@ -920,14 +910,14 @@ (define/public (stop) (void)) (define/public (get-channel) pc) - (define/public (set-on-channel/2! proc) (set! on-channel/2 proc)) + (define/public (set-on-channel! proc) (set! on-channel proc)) (define/public (get-sc-id) (send psb get-sc-id)) (define/public (place-died) - (printf "No restart condition for ~a:~a\n" + (log-debug (format "No restart condition for ~a:~a" (send vm get-log-prefix) - (send psb get-sc-id))) + (send psb get-sc-id)))) (define (on-channel-event e) - (printf "~a ~a\n" (send vm get-log-prefix) e)) + (log-debug (format "~a ~a" (send vm get-log-prefix) e))) (define/public (register es) (let* ([es (if pc (cons (wrap-evt pc (cond @@ -937,9 +927,9 @@ (begin0 (k e) (set! k #f)))))] - [on-channel/2 + [on-channel (lambda (e) - (on-channel/2 pc e))] + (on-channel pc e))] [else on-channel-event])) es) es)] [es (send psb register es)]) @@ -980,20 +970,20 @@ (match place-exec ;place% is a named place [(list 'dynamic-place place-path place-func name) - (dynamic-place (->path place-path) place-func)] + (dynamic-place (->module-path place-path) place-func)] [(list 'place place-path place-func name) - ((dynamic-require (->path place-path) place-func))] + ((dynamic-require (->module-path place-path) place-func))] ;place% is a single connected place [(list 'dynamic-place place-path place-func) - (dynamic-place (->path place-path) place-func)] + (dynamic-place (->module-path place-path) place-func)] [(list 'place place-path place-func) - ((dynamic-require (->path place-path) place-func))] + ((dynamic-require (->module-path place-path) place-func))] [(list 'thread place-path place-func) (define-values (ch1 ch2) (th-place-channel)) (define th (thread (lambda () - ((dynamic-require (->path place-path) place-func) ch1)))) + ((dynamic-require (->module-path place-path) place-func) ch1)))) (th-place th ch2 null)])) (sconn-add-subchannel sc ch-id pd) @@ -1107,7 +1097,7 @@ (define/public (restart restart-func) (cond [(and retry (>= retries retry)) - (printf "Already retried to restart ~a times\n" retry) + (log-debug (format "Already retried to restart ~a times" retry)) (and on-final-fail (on-final-fail))] [(> (- (current-inexact-milliseconds) last-attempt) (* seconds 1000)) (when (> (- (current-inexact-milliseconds) last-attempt) (* retry-reset 1000)) @@ -1216,7 +1206,7 @@ #:initial-message [initial-message #f] #:restart-on-exit [restart-on-exit #f]) (send vm launch-place - (list 'place (->string place-path) place-func (->string name)) + (list 'place (->module-path-bytes place-path) place-func (->string name)) ;#:initial-message initial-message #:restart-on-exit restart-on-exit )) @@ -1225,61 +1215,61 @@ #:initial-message [initial-message #f] #:restart-on-exit [restart-on-exit #f]) (send vm launch-place - (list 'dynamic-place (->string place-path) place-func (->string name)) + (list 'dynamic-place (->module-path-bytes place-path) place-func (->string name)) ;#:initial-message initial-message #:restart-on-exit restart-on-exit )) +(define (spawn-vm-with-dynamic-place-at host place-path place-func #:listen-port [listen-port DEFAULT-ROUTER-PORT] + #:initial-message [initial-message #f] + #:racket-path [racketpath (racket-path)] + #:ssh-bin-path [sshpath (ssh-bin-path)] + #:distributed-launch-path [distributedlaunchpath (->module-path-bytes distributed-launch-path)] + #:restart-on-exit [restart-on-exit #f]) + (define-values (vm pl) + (spawn-vm-supervise-place-at/exec host (list 'dynamic-place (->module-path-bytes place-path) place-func) #:listen-port listen-port + #:initial-message initial-message + #:racket-path racketpath + #:ssh-bin-path sshpath + #:distributed-launch-path distributedlaunchpath + #:restart-on-exit restart-on-exit)) + vm) + +(define (spawn-vm-with-place-thunk-at host place-path place-func #:listen-port [listen-port DEFAULT-ROUTER-PORT] + #:initial-message [initial-message #f] + #:racket-path [racketpath (racket-path)] + #:ssh-bin-path [sshpath (ssh-bin-path)] + #:distributed-launch-path [distributedlaunchpath (->module-path-bytes distributed-launch-path)] + #:restart-on-exit [restart-on-exit #f]) + (define-values (vm pl) + (spawn-vm-supervise-place-at/exec host (list 'place (->module-path-bytes place-path) place-func) #:listen-port listen-port + #:initial-message initial-message + #:racket-path racketpath + #:ssh-bin-path sshpath + #:distributed-launch-path distributedlaunchpath + #:restart-on-exit restart-on-exit)) + vm) + (define (spawn-vm-supervise-dynamic-place-at host place-path place-func #:listen-port [listen-port DEFAULT-ROUTER-PORT] #:initial-message [initial-message #f] #:racket-path [racketpath (racket-path)] #:ssh-bin-path [sshpath (ssh-bin-path)] - #:distributed-launch-path [distributedlaunchpath (->string distributed-launch-path)] + #:distributed-launch-path [distributedlaunchpath (->module-path-bytes distributed-launch-path)] #:restart-on-exit [restart-on-exit #f]) - (define-values (vm pl) - (spawn-vm-supervise-place-at/exec host (list 'dynamic-place (->string place-path) place-func) #:listen-port listen-port + (spawn-vm-supervise-place-at/exec host (list 'dynamic-place (->module-path-bytes place-path) place-func) #:listen-port listen-port #:initial-message initial-message #:racket-path racketpath #:ssh-bin-path sshpath #:distributed-launch-path distributedlaunchpath #:restart-on-exit restart-on-exit)) - vm) (define (spawn-vm-supervise-place-thunk-at host place-path place-func #:listen-port [listen-port DEFAULT-ROUTER-PORT] #:initial-message [initial-message #f] #:racket-path [racketpath (racket-path)] #:ssh-bin-path [sshpath (ssh-bin-path)] - #:distributed-launch-path [distributedlaunchpath (->string distributed-launch-path)] + #:distributed-launch-path [distributedlaunchpath (->module-path-bytes distributed-launch-path)] #:restart-on-exit [restart-on-exit #f]) - (define-values (vm pl) - (spawn-vm-supervise-place-at/exec host (list 'place (->string place-path) place-func) #:listen-port listen-port - #:initial-message initial-message - #:racket-path racketpath - #:ssh-bin-path sshpath - #:distributed-launch-path distributedlaunchpath - #:restart-on-exit restart-on-exit)) - vm) - -(define (spawn-vm-supervise-dynamic-place-at/2 host place-path place-func #:listen-port [listen-port DEFAULT-ROUTER-PORT] - #:initial-message [initial-message #f] - #:racket-path [racketpath (racket-path)] - #:ssh-bin-path [sshpath (ssh-bin-path)] - #:distributed-launch-path [distributedlaunchpath (->string distributed-launch-path)] - #:restart-on-exit [restart-on-exit #f]) - (spawn-vm-supervise-place-at/exec host (list 'dynamic-place (->string place-path) place-func) #:listen-port listen-port - #:initial-message initial-message - #:racket-path racketpath - #:ssh-bin-path sshpath - #:distributed-launch-path distributedlaunchpath - #:restart-on-exit restart-on-exit)) - -(define (spawn-vm-supervise-place-thunk-at/2 host place-path place-func #:listen-port [listen-port DEFAULT-ROUTER-PORT] - #:initial-message [initial-message #f] - #:racket-path [racketpath (racket-path)] - #:ssh-bin-path [sshpath (ssh-bin-path)] - #:distributed-launch-path [distributedlaunchpath (->string distributed-launch-path)] - #:restart-on-exit [restart-on-exit #f]) - (spawn-vm-supervise-place-at/exec host (list 'place (->string place-path) place-func) #:listen-port listen-port + (spawn-vm-supervise-place-at/exec host (list 'place (->module-path-bytes place-path) place-func) #:listen-port listen-port #:initial-message initial-message #:racket-path racketpath #:ssh-bin-path sshpath @@ -1290,7 +1280,7 @@ #:initial-message [initial-message #f] #:racket-path [racketpath (racket-path)] #:ssh-bin-path [sshpath (ssh-bin-path)] - #:distributed-launch-path [distributedlaunchpath (->string distributed-launch-path)] + #:distributed-launch-path [distributedlaunchpath (->module-path-bytes distributed-launch-path)] #:restart-on-exit [restart-on-exit #f]) (define vm (spawn-remote-racket-vm host #:listen-port listen-port @@ -1306,7 +1296,7 @@ (values vm dp)) -(define (master-event-loop #:node [_nc #f] #:listen-port [listen-port DEFAULT-ROUTER-PORT] . event-containers) +(define (message-router #:node [_nc #f] #:listen-port [listen-port DEFAULT-ROUTER-PORT] . event-containers) (define listener (tcp-listen listen-port 4 #t)) (define nc (or _nc (new node% [listen-port listener]))) (for ([ec event-containers]) @@ -1318,20 +1308,20 @@ (define (spawn-remote-racket-vm host #:listen-port [listen-port DEFAULT-ROUTER-PORT] #:racket-path [racketpath (racket-path)] #:ssh-bin-path [sshpath (ssh-bin-path)] - #:distributed-launch-path [distributedlaunchpath (->string distributed-launch-path)]) + #:distributed-launch-path [distributedlaunchpath (->module-path-bytes distributed-launch-path)]) (new remote-node% [host-name host] [listen-port listen-port] [cmdline-list (list sshpath host racketpath "-tm" distributedlaunchpath "spawn" (->string listen-port))])) (define (supervise-dynamic-place-at remote-vm place-path place-func) - (send remote-vm launch-place (list 'dynamic-place (->string place-path) place-func))) + (send remote-vm launch-place (list 'dynamic-place (->module-path-bytes place-path) place-func))) (define (supervise-place-thunk-at remote-vm place-path place-func) - (send remote-vm launch-place (list 'place (->string place-path) place-func))) + (send remote-vm launch-place (list 'place (->module-path-bytes place-path) place-func))) (define (supervise-thread-at remote-vm place-path place-func) - (send remote-vm launch-place (list 'thread (->string place-path) place-func))) + (send remote-vm launch-place (list 'thread (->module-path-bytes place-path) place-func))) (define-syntax-rule (every-seconds _seconds _body ...) (new respawn-and-fire% [seconds _seconds] [thunk (lambda () _body ...)])) @@ -1386,7 +1376,7 @@ (lambda (x) (define bbl (read-bytes-avail!* bb x)) (define (print-out x) - (printf "~a:~a:~a ~a\n" (node-config-node-name config) (node-config-node-port config) bbl x) + (log-debug (format "~a:~a:~a ~a" (node-config-node-name config) (node-config-node-port config) bbl x)) (flush-output)) (cond [(eof-object? bbl) (print-out "EOF") @@ -1411,9 +1401,9 @@ (lambda () (unless normal-finish (for ([n nodes]) - (printf "Killing ~a\n" n) + (log-debug (format "Killing ~a" n)) (define out (third (first n))) - (with-handlers ([exn:fail? (lambda (e) (printf "Error sending Ctrl-C: ~a\n" e))]) + (with-handlers ([exn:fail? (lambda (e) (log-debug (format "Error sending Ctrl-C: ~a" e)))]) (write-byte 3 out) (flush-output out) (sleep)) diff --git a/collects/racket/place/distributed/examples/hello-world.rkt b/collects/racket/place/distributed/examples/hello-world.rkt index 8847c0e6bc..aaeca85e58 100644 --- a/collects/racket/place/distributed/examples/hello-world.rkt +++ b/collects/racket/place/distributed/examples/hello-world.rkt @@ -2,8 +2,7 @@ (require racket/place/distributed racket/place) -(provide main - hello-world) +(provide hello-world) (define (hello-world) (place ch @@ -13,18 +12,17 @@ (printf "hello-world sent: ~a\n" HW))) -(define (main) +(module+ main (define-values (vm pl) - (spawn-vm-supervise-place-thunk-at/2 "localhost" - #:listen-port 6344 - (get-current-module-path) - 'hello-world)) - (master-event-loop + (spawn-vm-supervise-place-thunk-at "localhost" + #:listen-port 6344 + (quote-module-path "..") + 'hello-world)) + (message-router vm (after-seconds 2 (dplace-put pl "Hello") - (printf "master-event-loop received: ~a\n" (dplace-get pl))) + (printf "message-router received: ~a\n" (dplace-get pl))) - (after-seconds 6 - (exit 0)) - )) + (after-seconds 6 + (exit 0)))) diff --git a/collects/racket/place/distributed/examples/logging/master.rkt b/collects/racket/place/distributed/examples/logging/master.rkt index 75093fbe55..de701ae869 100644 --- a/collects/racket/place/distributed/examples/logging/master.rkt +++ b/collects/racket/place/distributed/examples/logging/master.rkt @@ -15,7 +15,7 @@ (define tuple-place (supervise-named-dynamic-place-at remote-vm 'tuple-server tuple-path 'make-tuple-server)) (define bank-place (supervise-dynamic-place-at remote-vm bank-path 'make-bank)) - (master-event-loop + (message-router remote-vm (after-seconds 4 (displayln (bank-new-account bank-place 'user1)) diff --git a/collects/racket/place/distributed/examples/multiple/master.rkt b/collects/racket/place/distributed/examples/multiple/master.rkt index 92bad8ac44..3df3dd0488 100644 --- a/collects/racket/place/distributed/examples/multiple/master.rkt +++ b/collects/racket/place/distributed/examples/multiple/master.rkt @@ -3,7 +3,9 @@ racket/class racket/place racket/runtime-path - "bank.rkt") + "bank.rkt" + syntax/location) + (define-runtime-path bank-path "bank.rkt") (define-runtime-path place-worker-path "place-worker.rkt") @@ -13,7 +15,7 @@ wait-place-thunk) (define (spawn-place-worker-at port message) - (spawn-vm-supervise-dynamic-place-at "localhost" #:listen-port port place-worker-path 'place-worker #:initial-message message #:restart-on-exit #f)) + (spawn-vm-with-dynamic-place-at "localhost" #:listen-port port place-worker-path 'place-worker #:initial-message message #:restart-on-exit #f)) (define (wait-place-thunk) (place ch @@ -23,14 +25,14 @@ (define (main) - (define bank-vm (spawn-vm-supervise-dynamic-place-at "localhost" #:listen-port 6344 bank-path 'make-bank)) + (define bank-vm (spawn-vm-with-dynamic-place-at "localhost" #:listen-port 6344 bank-path 'make-bank)) (define bank-place (send bank-vm get-first-place)) - (master-event-loop + (message-router (spawn-place-worker-at 6341 "ONE") (spawn-place-worker-at 6342 "TWO") (spawn-place-worker-at 6343 "THREE") bank-vm - (spawn-vm-supervise-place-thunk-at "localhost" #:listen-port 6345 (get-current-module-path) 'wait-place-thunk #:restart-on-exit #t) + (spawn-vm-with-place-thunk-at "localhost" #:listen-port 6345 (quote-module-name) 'wait-place-thunk #:restart-on-exit #t) (every-seconds 3.3 (printf "Hello from every-seconds\n") (flush-output)) (after-seconds 2 (displayln (bank-new-account bank-place 'user0)) diff --git a/collects/racket/place/distributed/examples/named/master.rkt b/collects/racket/place/distributed/examples/named/master.rkt index 73d8afe07e..f4be876ce8 100644 --- a/collects/racket/place/distributed/examples/named/master.rkt +++ b/collects/racket/place/distributed/examples/named/master.rkt @@ -15,7 +15,7 @@ (define tuple-place (supervise-named-dynamic-place-at remote-vm 'tuple-server tuple-path 'make-tuple-server)) (define bank-place (supervise-dynamic-place-at remote-vm bank-path 'make-bank)) - (master-event-loop + (message-router remote-vm (after-seconds 4 (displayln (bank-new-account bank-place 'user0)) diff --git a/collects/racket/place/distributed/examples/restart/master.rkt b/collects/racket/place/distributed/examples/restart/master.rkt index 0fef5d0709..1af2434535 100644 --- a/collects/racket/place/distributed/examples/restart/master.rkt +++ b/collects/racket/place/distributed/examples/restart/master.rkt @@ -1,7 +1,8 @@ #lang racket/base (require racket/place/distributed racket/class - racket/place) + racket/place + syntax/location) (provide wait-place-thunk) (provide main) @@ -13,7 +14,7 @@ (printf "SLEEP DONE\n"))) (define (main) - (master-event-loop - (spawn-vm-supervise-place-thunk-at "localhost" #:listen-port 6345 (get-current-module-path) 'wait-place-thunk #:restart-on-exit #t) + (message-router + (spawn-vm-with-place-thunk-at "localhost" #:listen-port 6345 (quote-module-name) 'wait-place-thunk #:restart-on-exit #t) (after-seconds 50 (exit 0)))) diff --git a/collects/racket/place/distributed/examples/restart/restarter.rkt b/collects/racket/place/distributed/examples/restart/restarter.rkt index 35a239bbb5..41adcecaad 100644 --- a/collects/racket/place/distributed/examples/restart/restarter.rkt +++ b/collects/racket/place/distributed/examples/restart/restarter.rkt @@ -1,7 +1,8 @@ #lang racket/base (require racket/place/distributed racket/class - racket/place) + racket/place + syntax/location) (provide wait-place-thunk) (provide main) @@ -13,6 +14,6 @@ (printf "SLEEP DONE\n"))) (define (main) - (master-event-loop - (spawn-vm-supervise-place-thunk-at "localhost" #:listen-port 6345 (get-current-module-path) 'wait-place-thunk + (message-router + (spawn-vm-with-place-thunk-at "localhost" #:listen-port 6345 (quote-module-name) 'wait-place-thunk #:restart-on-exit (restart-every 5 #:retry 3)))) diff --git a/collects/racket/place/distributed/examples/thread/master.rkt b/collects/racket/place/distributed/examples/thread/master.rkt index 44d3e640c6..015efa90eb 100644 --- a/collects/racket/place/distributed/examples/thread/master.rkt +++ b/collects/racket/place/distributed/examples/thread/master.rkt @@ -4,7 +4,8 @@ racket/match racket/place racket/place/define-remote-server - racket/runtime-path) + racket/runtime-path + syntax/location) (define-remote-server bank @@ -43,9 +44,9 @@ (define (main) (define remote-vm (spawn-remote-racket-vm "localhost" #:listen-port 6344)) - (define bank-place (supervise-thread-at remote-vm (get-current-module-path) 'make-bank)) + (define bank-place (supervise-thread-at remote-vm (quote-module-name) 'make-bank)) - (master-event-loop + (message-router remote-vm (after-seconds 2 (displayln (bank-new-account bank-place 'user0)) diff --git a/collects/racket/place/distributed/launch.rkt b/collects/racket/place/distributed/launch.rkt index 29f7f83fe7..b29672ccda 100644 --- a/collects/racket/place/distributed/launch.rkt +++ b/collects/racket/place/distributed/launch.rkt @@ -1,7 +1,8 @@ #lang racket/base (require racket/match racket/tcp - racket/place/distributed) + racket/place/distributed + racket/place/private/coercion) (provide main) diff --git a/collects/racket/place/private/coercion.rkt b/collects/racket/place/private/coercion.rkt new file mode 100644 index 0000000000..93b66b1367 --- /dev/null +++ b/collects/racket/place/private/coercion.rkt @@ -0,0 +1,60 @@ +#lang racket/base + +(provide ->path + ->module-path-bytes + ->module-path + ->number + ->string + ->length + ->symbol) + +(define (->path-bytes x) + (cond + [(path? x) (path->bytes x)] + [else x])) + +(define (path-bytes->path x) + (cond + [(bytes? x) (bytes->path x)] + [else x])) + +(define (->path x) + (cond [(path? x) x] + [(string? x) (string->path x)] + [(bytes? x) (bytes->path x)])) + +(define (->module-path-bytes x) + (cond [(path? x) (path->bytes x)] + [(list? x) (map ->path-bytes x)] + [(string? x) (string->bytes/locale x)] + [(bytes? x) x])) + +(define (->module-path x) + (cond [(path? x) x] + [(list? x) (map path-bytes->path x)] + [(bytes? x) (bytes->path x)] + [(string? x) (string->path x)])) + +(define (->number x) + (cond [(number? x) x] + [(string? x) (string->number x)])) + +(define (->string x) + (cond [(string? x) x] + [(number? x) (number->string x)] + [(symbol? x) (symbol->string x)] + [(bytes? x) (bytes->string/locale x)] + [else (raise-type-error '->string "a string, number, symbol, or bytes" x)] + )) + +(define (->length x) + (cond [(string? x) (string-length x)] + [(bytes? x) (bytes-length x)] + [(list? x) (length x)])) + +(define (->symbol x) + (cond + [(symbol? x) x] + [(string? x) (string->symbol x)] + [(bytes? x) (string->symbol (bytes->string/locale x))])) + diff --git a/collects/scribblings/reference/distributed.scrbl b/collects/scribblings/reference/distributed.scrbl index 31b1a24402..2b141a391a 100644 --- a/collects/scribblings/reference/distributed.scrbl +++ b/collects/scribblings/reference/distributed.scrbl @@ -38,9 +38,9 @@ machine nodes that do computation. The user/programmer configures a new distributed system using a declarative syntax and callbacks. A node begins life with one initial place, the message router. @;{See @figure-ref["node-places"]}. Once the node has been configured the -message router is activated by calling the @racket[master-event-loop] +message router is activated by calling the @racket[message-router] function. The message router listens on a TCP port for incoming -connections from other nodes in the distributed system. Compute places +connections from other nodes in the distributed system. Places can be spawned within the node by sending place-spawn request messages to the node's message router. @@ -73,15 +73,15 @@ The use of Distributed Places is predicated on a couple assumptions: (define (main) (define-values (vm pl) - (spawn-vm-supervise-place-thunk-at/2 "localhost" + (spawn-vm-supervise-place-thunk-at "localhost" #:listen-port 6344 (get-current-module-path) 'hello-world)) - (master-event-loop + (message-router vm (after-seconds 2 (dplace-put pl "Hello") - (printf "master-event-loop received: ~a\n" (dplace-get pl))) + (printf "message-router received: ~a\n" (dplace-get pl))) (after-seconds 6 (exit 0)) @@ -90,11 +90,11 @@ The use of Distributed Places is predicated on a couple assumptions: (require 'hello-world-example) ] -@defproc[(master-event-loop [ec events-container<%>?] ...+) void?]{ +@defproc[(message-router [ec events-container<%>?] ...+) void?]{ Waits in an endless loop for one of many events to become ready. The - @racket[master-event-loop] procedure constructs a @racket[node%] + @racket[message-router] procedure constructs a @racket[node%] instance to serve as the message router for then node. The - @racket[master-event-loop] procedure then adds all the declared + @racket[message-router] procedure then adds all the declared @racket[events-container<%>]s to the @racket[node%] and finally calls the never ending loop @racket[sync-events] method, which handles the events for the node. @@ -107,37 +107,41 @@ The use of Distributed Places is predicated on a couple assumptions: @p{This function returns a @racket[remote-node%] instance not a @racket[remote-place%] Call @racket[(send vm get-first-place)] to obtain the @racket[remote-place%] instance.})) ) -@defproc[(spawn-vm-supervise-dynamic-place-at +@(define spawn-vm-dynamic-note + (make-splice + (list + @p{ +Spawns a new remote node at @racket[hostname] with one instance place specified by +the @racket[instance-module-path] and @racket[instance-place-function-name] +parameters. This procedure constructs the new remote-place by calling +@racket[(dynamic-place instance-module-path instance-place-function-name)]. + }))) + +@defproc[(spawn-vm-with-dynamic-place-at [hostname string?] - [compute-instance-module-path module-path?] - [compute-instance-place-function-name symbol?] + [instance-module-path module-path?] + [instance-place-function-name symbol?] [#:listen-port port non-negative-integer? DEFAULT-ROUTER-PORT] [#:initial-message initial-message any? #f] [#:racket-path racketpath string-path? (racket-path)] [#:ssh-bin-path sshpath string-path? (ssh-bin-path)] [#:launcher-path launcherpath string-path? (->string distributed-launch-path)] - [#:restart-on-exit restart-on-exit boolean? #f]) remote-place?]{ -Spawns a new remote vm node at @racket[hostname] with one compute instance place specified by -the @racket[compute-instance-module-path] and @racket[compute-instance-place-function-name] -parameters. This procedure constructs the new remote-place by calling -@racket[(dynamic-place compute-instance-module-path compute-instance-place-function-name)]. + [#:restart-on-exit restart-on-exit any/c #f]) remote-place?]{ +@|spawn-vm-dynamic-note| @|spawn-vm-note| } -@defproc[(spawn-vm-supervise-dynamic-place-at/2 +@defproc[(spawn-vm-supervise-dynamic-place-at [hostname string?] - [compute-instance-module-path module-path?] - [compute-instance-place-function-name symbol?] + [instance-module-path module-path?] + [instance-place-function-name symbol?] [#:listen-port port non-negative-integer? DEFAULT-ROUTER-PORT] [#:initial-message initial-message any? #f] [#:racket-path racketpath string-path? (racket-path)] [#:ssh-bin-path sshpath string-path? (ssh-bin-path)] [#:launcher-path launcherpath string-path? (->string distributed-launch-path)] - [#:restart-on-exit restart-on-exit boolean? #f]) (values remote-node%? remote-place%?)]{ -Spawns a new remote vm node at @racket[hostname] with one compute instance place specified by -the @racket[compute-instance-module-path] and @racket[compute-instance-place-function-name] -parameters. This procedure constructs the new remote-place by calling -@racket[(dynamic-place compute-instance-module-path compute-instance-place-function-name)]. + [#:restart-on-exit restart-on-exit any/c #f]) (values remote-node%? remote-place%?)]{ +@|spawn-vm-dynamic-note| The new @racket[remote-vm%] and @racket[remote-place%] instances make up the two return values. } @@ -145,54 +149,53 @@ The new @racket[remote-vm%] and @racket[remote-place%] instances make up the two (make-splice (list @p{ -The @racket[compute-instance-thunk-function-name] procedure is +The @racket[instance-thunk-function-name] procedure is responsible for creating the place and returning the newly constructed the place descriptor. The -@racket[compute-instance-thunk-function-name] procedure should +@racket[instance-thunk-function-name] procedure should accomplish this by calling either @racket[dynamic-place] or @racket[place] inside the thunk. })) ) -@defproc[(spawn-vm-supervise-place-thunk-at + +@(define spawn-vm-thunk-note + (make-splice + (list + @p{ +Spawns a new remote node at @racket[hostname] with one instance place. +the @racket[instance-module-path] and @racket[instance-thunk-function-name] +parameters. This procedure constructs the new remote-place by calling +dynamically requiring the +@racket[instance-thunk-function-name] and invoking the +@racket[instance-thunk-function-name]. + } + @p{ +@racket[((dynamic-require instance-module-path instance-thunk-function-name))] + }))) +@defproc[(spawn-vm-with-place-thunk-at [hostname string?] - [compute-instance-module-path module-path?] - [compute-instance-thunk-function-name symbol?] + [instance-module-path module-path?] + [instance-thunk-function-name symbol?] [#:listen-port port non-negative-integer? DEFAULT-ROUTER-PORT] [#:initial-message initial-message any? #f] [#:racket-path racketpath string-path? (racket-path)] [#:ssh-bin-path sshpath string-path? (ssh-bin-path)] [#:launcher-path launcherpath string-path? (->string distributed-launch-path)] - [#:restart-on-exit restart-on-exit boolean? #f]) remote-place%?]{ -Spawns a new remote vm node at @racket[hostname] with one compute instance place. -the @racket[compute-instance-module-path] and @racket[compute-instance-thunk-function-name] -parameters. This procedure constructs the new remote-place by calling -dynamically requiring the -@racket[compute-instance-thunk-function-name] and invoking the -@racket[compute-instance-thunk-function-name]. - -@racket[((dynamic-require compute-instance-module-path compute-instance-thunk-function-name))] - + [#:restart-on-exit restart-on-exit any/c #f]) remote-place%?]{ +@|spawn-vm-thunk-note| @|place-thunk-function| @|spawn-vm-note| } -@defproc[(spawn-vm-supervise-place-thunk-at/2 +@defproc[(spawn-vm-supervise-place-thunk-at [hostname string?] - [compute-instance-module-path module-path?] - [compute-instance-thunk-function-name symbol?] + [instance-module-path module-path?] + [instance-thunk-function-name symbol?] [#:listen-port port non-negative-integer? DEFAULT-ROUTER-PORT] [#:initial-message initial-message any? #f] [#:racket-path racketpath string-path? (racket-path)] [#:ssh-bin-path sshpath string-path? (ssh-bin-path)] [#:launcher-path launcherpath string-path? (->string distributed-launch-path)] - [#:restart-on-exit restart-on-exit boolean? #f]) (values remote-vm%? remote-place%?)]{ -Spawns a new remote vm node at @racket[hostname] with one compute instance place. -the @racket[compute-instance-module-path] and @racket[compute-instance-thunk-function-name] -parameters. This procedure constructs the new remote-place by calling -dynamically requiring the -@racket[compute-instance-thunk-function-name] and invoking the -@racket[compute-instance-thunk-function-name]. - -@racket[((dynamic-require compute-instance-module-path compute-instance-thunk-function-name))] - + [#:restart-on-exit restart-on-exit any/c #f]) (values remote-vm%? remote-place%?)]{ +@|spawn-vm-thunk-note| @|place-thunk-function| The new @racket[remote-vm%] and @racket[remote-place%] instances make up the two return values. } @@ -203,27 +206,27 @@ The new @racket[remote-vm%] and @racket[remote-place%] instances make up the two [#:racket-path racketpath string-path? (racket-path)] [#:ssh-bin-path sshpath string-path? (ssh-bin-path)] [#:launcher-path launcherpath string-path? (->string distributed-launch-path)]) remote-node%?]{ -Spawns a new remote vm node at @racket[hostname] and returns a @racket[remote-node%] handle. +Spawns a new remote node at @racket[hostname] and returns a @racket[remote-node%] handle. } @defproc[(supervise-dynamic-place-at [remote-vm remote-vm?] - [compute-instance-module-path module-path?] - [compute-instance-place-function-name symbol?] - [#:restart-on-exit restart-on-exit boolean? #f]) remote-place%?]{ + [instance-module-path module-path?] + [instance-place-function-name symbol?] + [#:restart-on-exit restart-on-exit any/c #f]) remote-place%?]{ Creates a new place on the @racket[remote-vm] by using @racket[dynamic-place] to invoke -@racket[compute-instance-place-function-name] from the module -@racket[compute-instance-module-path]. +@racket[instance-place-function-name] from the module +@racket[instance-module-path]. } @defproc[(supervise-place-thunk-at [remote-vm remote-vm?] - [compute-instance-module-path module-path?] - [compute-instance-thunk-function-name symbol?] - [#:restart-on-exit restart-on-exit boolean? #f]) remote-place%?]{ + [instance-module-path module-path?] + [instance-thunk-function-name symbol?] + [#:restart-on-exit restart-on-exit any/c #f]) remote-place%?]{ Creates a new place on the @racket[remote-vm] by executing the thunk -@racket[compute-instance-thunk-function-name] from the module -@racket[compute-instance-module-path]. +@racket[instance-thunk-function-name] from the module +@racket[instance-module-path]. @|place-thunk-function| } @@ -238,25 +241,25 @@ Spawns an attached external process at host @racket[hostname]. @defproc[(supervise-named-dynamic-place-at [remote-vm remote-vm?] [place-name symbol?] - [compute-instance-module-path module-path?] - [compute-instance-place-function-name symbol?] - [#:restart-on-exit restart-on-exit boolean? #f]) remote-place%?]{ + [instance-module-path module-path?] + [instance-place-function-name symbol?] + [#:restart-on-exit restart-on-exit any/c #f]) remote-place%?]{ Creates a new place on the @racket[remote-vm] by using @racket[dynamic-place] to invoke -@racket[compute-instance-place-function-name] from the module -@racket[compute-instance-module-path]. The @racket[place-name] symbol +@racket[instance-place-function-name] from the module +@racket[instance-module-path]. The @racket[place-name] symbol is used to establish later connections to the named place. } @defproc[(supervise-named-place-thunk-at [remote-vm remote-vm?] [place-name symbol?] - [compute-instance-module-path module-path?] - [compute-instance-thunk-function-name symbol?] - [#:restart-on-exit restart-on-exit boolean? #f]) remote-place%?]{ + [instance-module-path module-path?] + [instance-thunk-function-name symbol?] + [#:restart-on-exit restart-on-exit any/c #f]) remote-place%?]{ Creates a new place on the @racket[remote-vm] by executing the thunk -@racket[compute-instance-thunk-function-name] from the module -@racket[compute-instance-module-path]. The @racket[place-name] symbol +@racket[instance-thunk-function-name] from the module +@racket[instance-module-path]. The @racket[place-name] symbol is used to establish later connections to the named place. @@ -271,12 +274,12 @@ Returns a @racket[restarter%] instance that should be supplied to a @racket[#:re } @defform[(every-seconds seconds body ....)]{ -Returns a @racket[respawn-and-fire%] instance that should be supplied to a @racket[master-event-loop]. +Returns a @racket[respawn-and-fire%] instance that should be supplied to a @racket[message-router]. The @racket[respawn-and-fire%] instance executes the body expressions every @racket[seconds]. } @defform[(after-seconds seconds body ....)]{ -Returns a @racket[after-seconds%] instance that should be supplied to a @racket[master-event-loop]. +Returns a @racket[after-seconds%] instance that should be supplied to a @racket[message-router]. Executes the body expressions after a delay of @racket[seconds] from the start of the event loop. } @@ -289,19 +292,19 @@ Connects to a named place on the @racket[vm] named @racket[name] and returns a @ } @definterface[event-container<%> ()]{ - All objects that are supplied to the @racket[master-event-loop] must + All objects that are supplied to the @racket[message-router] must implement the @racket[event-container<%>] interface. The - @racket[master-event-loop] calls the @racket[register] method on each + @racket[message-router] calls the @racket[register] method on each supplied @racket[event-container<%>] to obtain a list of events the event loop should wait for. @defmethod[(register [events (listof events?)]) (listof events?)]{ Returns the list of events inside the @racket[event-container<%>] that - should be waited on by the @racket[master-event-loop]. + should be waited on by the @racket[message-router]. } The following classes all implement @racket[event-container<%>] and -can be supplied to a @racket[master-event-loop]: +can be supplied to a @racket[message-router]: @racket[spawned-process%], @racket[place-socket-bridge%], @racket[node%], @racket[remote-node%], @racket[remote-place%], @racket[place%] @racket[connection%], @racket[respawn-and-fire%], and @@ -347,8 +350,8 @@ the socket-connection subchannel for this inter-node place connection. @defclass[node% object% (event-container<%>)]{ The @racket[node%] instance controls a distributed places node. It -launches compute places and routes inter-node place messages in the -distributed system. The @racket[master-event-loop] form constructs a +launches places and routes inter-node place messages in the +distributed system. The @racket[message-router] form constructs a @racket[node%] instance under the hood. Newly spawned nodes also have a @racket[node%] instance in their initial place that serves as the node's message router. @@ -383,7 +386,7 @@ node's message router. @(define one-sided-note (make-splice (list - @p{The @racket[#:one-sided-place] argument is an internal use + @p{The @racket[#:one-sided-place?] argument is an internal use argument for launching remote places from within a place using the old design pattern 1.}))) @@ -407,7 +410,7 @@ node's message router. @racket[spawn-vm-supervise-place-thunk-at]. @defconstructor[([listen-port tcp-listen-port? #f] - [restart-on-exit boolean? #f])]{ + [restart-on-exit any/c #f])]{ Constructs a @racket[node%] that will listen on @racket[listen-port] for inter-node connections. @@ -428,8 +431,8 @@ node's message router. @defmethod[(launch-place [place-exec list?] - [#:restart-on-exit restart-on-exit boolean? #f] - [#:one-sided-place one-sided-place boolean? #f]) remote-place%?]{ + [#:restart-on-exit restart-on-exit any/c #f] + [#:one-sided-place? one-sided-place? any/c #f]) remote-place%?]{ Launches a place on the remote node represented by this @racket[remote-node%] instance. @|place-exec-note| @|one-sided-note| @@ -455,29 +458,29 @@ places and routes inter-node place messages to the remote place. @defconstructor[([vm remote-node%?] [place-exec list?] [restart-on-exit #f] - [one-sided-place #f] - [on-channel/2 #f])]{ + [one-sided-place? #f] + [on-channel #f])]{ Constructs a @racket[remote-place%] instance. @|place-exec-note| @|one-sided-note| @|restart-on-exit-note| - See @racket[set-on-channel/2!] for description of @racket[on-channel/2] argument. + See @racket[set-on-channel!] for description of @racket[on-channel] argument. } -@defmethod[(set-on-channel/2! [callback (-> channel msg void?)]) void?]{ +@defmethod[(set-on-channel! [callback (-> channel msg void?)]) void?]{ Installs a handler function that handles messages from the remote place. The @racket[setup/distributed-docs] module uses this callback to handle job completion messages. } } @defproc[(dplace-put [pl remote-place%?] [msg any/c]) void?]{ - This function is used inside @racket[master-event-loop] callbacks. + This function is used inside @racket[message-router] callbacks. It sends messages to remote places. } @defproc[(dplace-get [pl remote-place%?]) any/c]{ - This function is used inside @racket[master-event-loop] callbacks. + This function is used inside @racket[message-router] callbacks. It takes the current delimited continuation and resumes it when a message arrives from @racket[pl]. } @@ -490,14 +493,14 @@ places and routes inter-node place messages to the remote place. @defconstructor[([vm remote-node%?] [name string?] [restart-on-exit #f] - [on-channel/2 #f])]{ + [on-channel #f])]{ Constructs a @racket[remote-place%] instance. @|restart-on-exit-note| - See @racket[set-on-channel/2!] for description of @racket[on-channel/2] argument. + See @racket[set-on-channel!] for description of @racket[on-channel] argument. } -@defmethod[(set-on-channel/2! [callback (-> channel msg void?)]) void?]{ +@defmethod[(set-on-channel! [callback (-> channel msg void?)]) void?]{ Installs a handler function that handles messages from the remote place. The @racket[setup/distributed-docs] module uses this callback to handle job completion messages. } @@ -559,7 +562,7 @@ place messages to the named place. @defconstructor[([seconds (and/c real? (not/c negative?))] [thunk (-> void?)])]{ Constructs a @racket[respawn-and-fire%] instance that when placed - inside a @racket[master-event-loop] construct causes the supplied + inside a @racket[message-router] construct causes the supplied thunk to execute every @racket[n] seconds. } } @@ -572,7 +575,7 @@ place messages to the named place. @defconstructor[([seconds (and/c real? (not/c negative?))] [thunk (-> void?)])]{ Constructs an @racket[after-seconds%] instance that when placed - inside a @racket[master-event-loop] construct causes the supplied + inside a @racket[message-router] construct causes the supplied thunk to execute after @racket[n] seconds. } } @@ -695,6 +698,8 @@ Returns the path to the current module. (require 'my-module)) ] +@;{ + @defproc[(->string) string?]{ Coerces strings, numbers, symbols, and paths to a string. } @@ -730,6 +735,7 @@ Returns the length of strings, bytes, and lists. (->length #"Woo") (->length (list 1 2 3 4)) ] +} @defproc[(write-flush [datum any?] [port port?]) (void)]{ Writes @racket[datum] to @racket[port] and then flushes @racket[port]. diff --git a/collects/tests/racket/place/distributed/distributed.rkt b/collects/tests/racket/place/distributed/distributed.rkt index 1a8bc4b575..095af617eb 100644 --- a/collects/tests/racket/place/distributed/distributed.rkt +++ b/collects/tests/racket/place/distributed/distributed.rkt @@ -25,10 +25,10 @@ ok?))) (define remote-vm (spawn-remote-racket-vm "localhost" #:listen-port 6344)) - (define tuple-place (supervise-named-place-thunk-at remote-vm 'tuple-server tuple-path 'make-tuple-server)) - (define bank-place (supervise-place-thunk-at remote-vm bank-path 'make-bank)) + (define tuple-place (supervise-named-dynamic-place-at remote-vm 'tuple-server tuple-path 'make-tuple-server)) + (define bank-place (supervise-dynamic-place-at remote-vm bank-path 'make-bank)) - (master-event-loop + (message-router remote-vm (after-seconds 2 (define c (connect-to-named-place remote-vm 'tuple-server)) diff --git a/collects/tests/racket/place/distributed/restarter.rkt b/collects/tests/racket/place/distributed/restarter.rkt index e61ac3e71b..8fe3000808 100644 --- a/collects/tests/racket/place/distributed/restarter.rkt +++ b/collects/tests/racket/place/distributed/restarter.rkt @@ -1,7 +1,8 @@ #lang racket/base (require racket/place/distributed racket/class - racket/place) + racket/place + syntax/location) (provide wait-place-thunk) (provide main) @@ -13,8 +14,8 @@ (printf "SLEEP DONE\n"))) (define (main) - (master-event-loop - (spawn-vm-supervise-place-thunk-at "localhost" #:listen-port 6345 (get-current-module-path) 'wait-place-thunk + (message-router + (spawn-vm-with-place-thunk-at "localhost" #:listen-port 6345 (quote-module-name) 'wait-place-thunk #:restart-on-exit (restart-every 5 #:retry 3 #:on-final-fail (lambda () (printf "Failed 3 times exititing\n")