From 09bed0d61e926f99785dc3fab583c996f7e69160 Mon Sep 17 00:00:00 2001 From: Kevin Tew Date: Tue, 6 Mar 2012 13:19:06 -0700 Subject: [PATCH] Initial Distributed Places commit Distributed Places allows the spawning of remote racket processes and places. Distributed Places communicate transparently over TCP sockets. --- collects/meta/props | 5 + .../racket/place/define-remote-server.rkt | 150 ++ collects/racket/place/distributed.rkt | 1407 +++++++++++++++++ .../distributed/examples/hello-world.rkt | 30 + .../distributed/examples/logging/bank.rkt | 36 + .../distributed/examples/logging/master.rkt | 37 + .../distributed/examples/logging/tuple.rkt | 14 + .../distributed/examples/multiple/bank.rkt | 35 + .../distributed/examples/multiple/master.rkt | 39 + .../examples/multiple/place-worker.rkt | 30 + .../place/distributed/examples/named/bank.rkt | 35 + .../distributed/examples/named/master.rkt | 40 + .../distributed/examples/named/tuple.rkt | 15 + .../distributed/examples/restart/master.rkt | 19 + .../examples/restart/restarter.rkt | 18 + collects/racket/place/distributed/info.rkt | 3 + collects/racket/place/distributed/launch.rkt | 18 + collects/scribblings/guide/distributed.scrbl | 140 ++ collects/scribblings/guide/performance.scrbl | 1 + .../scribblings/reference/concurrency.scrbl | 1 + .../scribblings/reference/distributed.scrbl | 746 +++++++++ collects/tests/racket/place-channel.rkt | 3 +- .../tests/racket/place/distributed/bank.rkt | 35 + .../racket/place/distributed/distributed.rkt | 54 + .../racket/place/distributed/restarter.rkt | 21 + .../tests/racket/place/distributed/tuple.rkt | 15 + 26 files changed, 2946 insertions(+), 1 deletion(-) create mode 100644 collects/racket/place/define-remote-server.rkt create mode 100644 collects/racket/place/distributed.rkt create mode 100644 collects/racket/place/distributed/examples/hello-world.rkt create mode 100644 collects/racket/place/distributed/examples/logging/bank.rkt create mode 100644 collects/racket/place/distributed/examples/logging/master.rkt create mode 100644 collects/racket/place/distributed/examples/logging/tuple.rkt create mode 100644 collects/racket/place/distributed/examples/multiple/bank.rkt create mode 100644 collects/racket/place/distributed/examples/multiple/master.rkt create mode 100644 collects/racket/place/distributed/examples/multiple/place-worker.rkt create mode 100644 collects/racket/place/distributed/examples/named/bank.rkt create mode 100644 collects/racket/place/distributed/examples/named/master.rkt create mode 100644 collects/racket/place/distributed/examples/named/tuple.rkt create mode 100644 collects/racket/place/distributed/examples/restart/master.rkt create mode 100644 collects/racket/place/distributed/examples/restart/restarter.rkt create mode 100644 collects/racket/place/distributed/info.rkt create mode 100644 collects/racket/place/distributed/launch.rkt create mode 100644 collects/scribblings/guide/distributed.scrbl create mode 100644 collects/scribblings/reference/distributed.scrbl create mode 100644 collects/tests/racket/place/distributed/bank.rkt create mode 100644 collects/tests/racket/place/distributed/distributed.rkt create mode 100644 collects/tests/racket/place/distributed/restarter.rkt create mode 100644 collects/tests/racket/place/distributed/tuple.rkt diff --git a/collects/meta/props b/collects/meta/props index 19c738b47a..7b51ac3f8d 100755 --- a/collects/meta/props +++ b/collects/meta/props @@ -1177,6 +1177,7 @@ path/s is either such a string or a list of them. "collects/racket/gui.rkt" drdr:command-line (gracket "-t" *) "collects/racket/match" responsible (samth) "collects/racket/match.rkt" responsible (samth) +"collects/racket/place" responsible (tewk) "collects/racklog" responsible (jay) "collects/rackunit" responsible (jay noel ryanc) "collects/rackunit/gui.rkt" responsible (ryanc) drdr:command-line (gracket-text "-t" *) @@ -1279,6 +1280,7 @@ path/s is either such a string or a list of them. "collects/scribblings/guide/contracts-examples/6.rkt" drdr:command-line (racket "-f" *) "collects/scribblings/guide/read.scrbl" drdr:command-line #f "collects/scribblings/raco" responsible (jay mflatt) +"collects/scribblings/reference/distributed.scrbl" responsible (tewk) "collects/scriblib" responsible (mflatt) "collects/setup" responsible (mflatt) "collects/setup/main.rkt" drdr:command-line #f @@ -1858,6 +1860,9 @@ path/s is either such a string or a list of them. "collects/tests/racket/path.rktl" drdr:command-line #f "collects/tests/racket/pathlib.rktl" drdr:command-line (racket "-f" *) "collects/tests/racket/pconvert.rktl" drdr:command-line #f +"collects/tests/racket/place" responsible (tewk) +"collects/tests/racket/place/distributed/distributed.rkt" drdr:command-line (racket "-tm" *) +"collects/tests/racket/place/distributed/restarter.rkt" drdr:command-line (racket "-tm" *) "collects/tests/racket/place-chan-rand-help.rkt" responsible (tewk) "collects/tests/racket/place-chan-rand.rkt" responsible (tewk) drdr:random #t "collects/tests/racket/place-channel-fd.rkt" responsible (tewk) drdr:command-line (racket "-tm" *) diff --git a/collects/racket/place/define-remote-server.rkt b/collects/racket/place/define-remote-server.rkt new file mode 100644 index 0000000000..9e2ca51ee5 --- /dev/null +++ b/collects/racket/place/define-remote-server.rkt @@ -0,0 +1,150 @@ +#lang racket/base +(require (for-syntax racket/base) + (for-syntax syntax/stx) + racket/place + racket/match + racket/class + racket/stxparam + (for-syntax racket/pretty) + racket/place/distributed) + +(define-syntax define/provide + (syntax-rules () + [(_ (name x ...) body ...) + (begin (provide name) + (define (name x ...) body ...))] + [(_ name val) + (begin (provide name) + (define name val))])) + +(define (dplace/place-channel-get dest) + (cond + [(place-channel? dest) (place-channel-get dest)] + [else (send dest get-msg)])) + +(define (dplace/place-channel-put dest msg) + (cond + [(place-channel? dest) (place-channel-put dest msg)] + [else (send dest put-msg msg)])) + + +(define-syntax-rule (define-syntax-parameter-error x) + (define-syntax-parameter x (lambda (stx) (raise-syntax-error 'x "only allowed inside define-*-remote-server definition" stx)))) + +(define-syntax-parameter-error log-to-parent) + +(define-syntax (define-define-remote-server stx) + (syntax-case stx () + [(_ form-name) + #;(printf "FORM_NAME ~a ~a ~a\n" #'form-name (syntax->datum #'form-name) + (equal? (syntax->datum #'form-name) 'define-named-remote-server)) + (with-syntax ([receive-line + (cond + [(eq? (syntax->datum #'form-name) 'define-named-remote-server) + #'(list (list fname-symbol args (... ...)) src)] + [else + #'(list fname-symbol args (... ...))])] + [send-dest + (cond + [(eq? (syntax->datum #'form-name) 'define-named-remote-server) + #'src] + [else + #'ch])]) +(define x +#'(define-syntax (form-name stx) + (syntax-case stx () + [(_ name forms (... ...)) + (let () + + (define (is-id? id stx) + (equal? (syntax-e stx) id)) + (define (define? stx) (is-id? 'define-state (stx-car stx))) + + (define-values (states rpcs) + (for/fold ([states null] + [rpcs null]) ([f (syntax->list #'(forms (... ...)))]) + (cond + [(define? f) + (values (append states (list f)) rpcs)] + [else + (values states (append rpcs (list f)))] + ))) + + (define (id->string x) + (symbol->string (syntax->datum x))) + (define (string->id stx x) + (datum->syntax stx (string->symbol x))) + + + (define trans-rpcs + (for/list ([f rpcs]) + (syntax-case f () + [(define-type (fname args (... ...)) body (... ...)) + (with-syntax ([fname-symbol (string->id stx (format "~a-~a" (id->string #'name) (id->string #'fname)))] + [(receive (... ...)) + (cond + [(is-id? 'define-rpc #'define-type) #'((dplace/place-channel-get dest))] + [(is-id? 'define-cast #'define-type) #'()] + [else (raise "Bad define in define-remote-server")])]) + + #'(define/provide (fname-symbol dest args (... ...)) + (dplace/place-channel-put dest (list (quote fname) args (... ...))) + receive (... ...)))]))) + + (define trans-place + (with-syntax ([(states2 (... ...)) + (for/list ([s states]) + (syntax-case s () + [(_ rest (... ...)) + #'(define rest (... ...))]))] + [(cases (... ...)) + (for/list ([r rpcs]) + (syntax-case r () + [(define-type (fname args (... ...)) body (... ...)) + (let () + (with-syntax ([fname-symbol #'(quote fname)] + [(send-line (... ...)) + (cond + [(is-id? 'define-rpc #'define-type) #'((place-channel-put send-dest result))] + [(is-id? 'define-cast #'define-type) #'()] + [else (raise "Bad define in define-remote-server")])]) + #'[receive-line + (define result + (let () + body (... ...))) + send-line (... ...) + (loop)]))]))]) + #`(place ch + (let () + states2 (... ...) + (let loop () + (define msg (place-channel-get ch)) + (define (log-to-parent-real msg #:severity [severity 'info]) + (place-channel-put ch (log-message severity msg))) + (syntax-parameterize ([log-to-parent (make-rename-transformer #'log-to-parent-real)]) + (match msg + cases (... ...) + )) + loop) + )))) + (with-syntax ([mkname (string->id stx (format "make-~a" (id->string #'name)))]) + (define x + #`(begin + (require racket/place + racket/match) + #,@trans-rpcs + (define/provide (mkname) #,trans-place) + (void))) + ;(pretty-print (syntax->datum x)) + x))])) +) +;(pretty-print (syntax->datum x)) +x)])) + +(define-define-remote-server define-remote-server) +(define-define-remote-server define-named-remote-server) +(provide define-remote-server) +(provide define-named-remote-server) +(provide log-to-parent) + + diff --git a/collects/racket/place/distributed.rkt b/collects/racket/place/distributed.rkt new file mode 100644 index 0000000000..06c9074c75 --- /dev/null +++ b/collects/racket/place/distributed.rkt @@ -0,0 +1,1407 @@ +#lang racket/base +(require racket/list + racket/match + racket/tcp + racket/place + racket/class + racket/trait + racket/udp + racket/runtime-path + racket/date) + +(provide ssh-bin-path + racket-path + distributed-places-path + distributed-launch-path + get-current-module-path + + ;; New Design Pattern 2 API + master-event-loop + spawn-vm-supervise-dynamic-place-at + spawn-vm-supervise-place-thunk-at + spawn-vm-supervise-dynamic-place-at/2 + spawn-vm-supervise-place-thunk-at/2 + supervise-named-place-thunk-at + supervise-process-at + every-seconds + after-seconds + restart-every + + connect-to-named-place + + spawn-remote-racket-vm + node-send-exit + node-get-first-place + supervise-place-thunk-at + supervise-dynamic-place-at + dplace-put + dplace-get + + ;; low-level API + + ll-channel-get + ll-channel-put + write-flush + log-message + + ;; + start-spawned-node-router + + ;;Coercion Routines + ->string + ->path + ->number + ->length + + + ;; Old Design Pattern 1 API + dcg-get-cg + dcg-send + dcg-send-type + dcg-recv + dcg-kill + + dcg-send-new-dchannel + dcg-spawn-remote-dplace + + dchannel-put + dchannel-get + + launch-config + startup-config + (struct-out node-config) + (struct-out dcg) + + ;classes + event-container<%> + spawned-process% + place-socket-bridge% + node% + socket-connection% + remote-node% + remote-place% + remote-connection% + place% + connection% + respawn-and-fire% + after-seconds% + restarter% + ) + +(define-runtime-path distributed-launch-path "distributed/launch.rkt") + +(define DEFAULT-ROUTER-PORT 6340) + +(define (->path x) + (cond [(path? x) x] + [(string? x) (string->path x)])) + +(define (->number x) + (cond [(number? x) x] + [(string? x) (string->number x)])) + +(define (->string x) + (cond [(string? x) x] + [(number? x) (number->string x)] + [(symbol? x) (symbol->string x)] + [(path? x) (path->string x)] + [(bytes? x) (bytes->string/locale x)] + )) + +(define (->length x) + (cond [(string? x) (string-length x)] + [(bytes? x) (bytes-length x)] + [(list? x) (length x)])) + +(define-syntax-rule (get-current-module-path) + (let () + (define rmpn (resolved-module-path-name (variable-reference->resolved-module-path (#%variable-reference)))) + (cond + [(symbol? rmpn) rmpn] + [(path? rmpn) (path->string rmpn)]))) + +; returns the path to the racket executable on the current machine. +(define (racket-path) + (parameterize ([current-directory (find-system-path 'orig-dir)]) + (find-executable-path (find-system-path 'exec-file) #f))) + +; returns the path to the distributed.rkt file on the current machine. +(define (distributed-places-path) (get-current-module-path)) + +; find ssh-binary +(define (ssh-bin-path) + (define (exists? paths) + (and paths + (for/or ([p paths]) (and (file-exists? p) p)))) + + (define (fallback-paths) + (exists? + (case (system-type 'os) + [(unix macosx) + (list "/usr/local/bin/ssh" "/usr/bin/ssh" "/bin/ssh" "/opt/local/bin/ssh" "/opt/bin/ssh")] + [(windows) #f]))) + + (define (which cmd) + (define path (getenv "PATH")) + (and path + (exists? (map (lambda (x) (build-path x cmd)) (regexp-split (case (system-type 'os) + [(unix macosx) ":"] + [(windows) "#:;"]) + path))))) + (or (which "ssh") + (fallback-paths) + (raise "ssh binary not found"))) + + + +(define (write-flush msg [p (current-output-port)]) + ;(printf "WRITING ~v\n" msg) + (write msg p) + (flush-output p)) + +(define (tcp-connect/backoff rname rport #:times [times 4] #:start-seconds [start-seconds 1]) + (let loop ([t 0] + [wait-time start-seconds]) + (with-handlers ([exn? (lambda (e) + (cond [(t . < . times) + (printf "backing off ~a sec to ~a:~a\n" (expt 2 t) rname rport) + (sleep wait-time) + (loop (add1 t) (* 2 wait-time))] + [else (raise e)]))]) + (tcp-connect rname (->number rport))))) + +(define (tcp-connect/retry rname rport #:times [times 10] #:delay [delay 1]) + (let loop ([t 0]) + (with-handlers ([exn? (lambda (e) + (cond [(t . < . times) + (printf "waiting ~a sec to retry connection to ~a:~a\n" delay rname rport) + (sleep delay) + (loop (add1 t))] + [else (raise e)]))]) + (tcp-connect rname (->number rport))))) + +(define (print-log-message severity msg) + (printf "~a ~a ~a\n" (date->string (current-date) #t) severity msg) + (flush-output)) + + +;node configuration +(struct node-config (node-name node-port proc-count ssh-path racket-path distributed-path mod-path func-name conf-path conf-name) #:prefab) +;distributed communication group +(struct dcg (ch id n)) +;distributed communication group message +(struct dcgm (type src dest msg) #:prefab) + +(struct dchannel (ch) #:prefab) + +;dcg types +(define DCGM-TYPE-NORMAL 0) +(define DCGM-TYPE-DIE 1) +(define DCGM-TYPE-NEW-DCHANNEL 2) +(define DCGM-TYPE-NEW-INTER-DCHANNEL 3) +(define DCGM-TYPE-INTER-DCHANNEL 4) +(define DCGM-TYPE-KILL-DPLACE 5) +(define DCGM-TYPE-SPAWN-REMOTE-PROCESS 6) +(define DCGM-DPLACE-DIED 7) +(define DCGM-TYPE-LOG-TO-PARENT 8) +(define DCGM-TYPE-NEW-PLACE 9) +(define DCGM-TYPE-SET-OWNER 10) + + +(define (dchannel-put ch msg) + (unless (or (dchannel? ch) (place-channel? ch)) + (raise-mismatch-error 'dchannel-get "expected dchannel?, got " ch)) + (if (dchannel? ch) + (place-channel-put (dchannel-ch ch) msg) + (place-channel-put ch msg))) + +(define (dchannel-get ch) + (unless (or (dchannel? ch) (place-channel? ch)) + (raise-mismatch-error 'dchannel-get "expected dchannel?, got " ch)) + (if (dchannel? ch) + (place-channel-get (dchannel-ch ch)) + (place-channel-get ch))) + +(define (dcg-send-type c type dest msg) + (place-channel-put (dcg-ch c) (dcgm type (dcg-id c) dest msg))) + +(define (dcg-send c dest msg) + (dcg-send-type c DCGM-TYPE-NORMAL dest msg)) + +(define (dcg-get-cg ch) (apply dcg ch (place-channel-get ch))) + +(define (dcg-kill c dest) + (place-channel-put (dcg-ch c) (dcgm DCGM-TYPE-DIE (dcg-id c) dest "DIE"))) + +(define (dcg-send-new-dchannel c dest) + (define-values (e1 e2) (place-channel)) + (dcg-send-type c DCGM-TYPE-NEW-DCHANNEL dest (dchannel e2)) + (dchannel e1)) + +;; Contract: start-node-router : VectorOf[ (or/c place-channel socket-connection)] -> (void) +;; Purpose: Forward messages between channels and build new point-to-point subchannels +;; Example: +(define (dcg-spawn-remote-dplace c hostname modpath funcname #:listen-port [listen-port 6432]) + (define-values (e1 e2) (place-channel)) + (dcg-send-type c DCGM-TYPE-SPAWN-REMOTE-PROCESS (list hostname listen-port modpath funcname) e2) + e1) + +(define (dcg-recv c) + (dcgm-msg (place-channel-get (dcg-ch c)))) + +(define-syntax-rule (reduce-sum seq item body ...) + (for/fold ([sum 0]) ([item seq]) (+ sum (begin body ...)))) + +(define (total-node-count conf) (reduce-sum conf item (node-config-proc-count item))) + +;; Contract: start-node-router : VectorOf[ (or/c place-channel socket-connection)] -> (void) +;; +;; Purpose: Forward messages between channels and build new point-to-point subchannels +;; +;; Example: + +(define (start-spawned-node-router listener) + (define nc (new node% [listen-port listener])) + (send nc sync-events)) + + +(define (start-node-router chan-vec) + (define nc (new node% [chan-vec chan-vec])) + (send nc sync-events)) + +(define backlink + (trait->mixin + (trait + (field [router #f]) + (define/public (remove-from-router) + (and router (send router remove-ec this))) + (define/public (get-router) router) + (define/public (backlink _router) + (set! router _router)) + ))) + +(define event-container<%> + (interface () + register + )) + +(define-syntax-rule (for/filter/fold/cons tail xs body ...) + (for/fold ([n tail]) xs + (define r (let () body ...)) + (if r (cons r n) n))) + +(define (filter/list* . lst) + (define rl (reverse lst)) + (for/filter/fold/cons (car rl) ([x (cdr rl)]) x)) + +(define spawned-process% + (backlink + (class* + object% (event-container<%>) + (init-field cmdline-list) + (init-field [parent #f]) + (field [s #f] + [i #f] + [o #f] + [e #f] + [pid #f]) + + (let-values ([(_s _o _i _e) (apply subprocess #f #f #f cmdline-list)]) + (set! pid (subprocess-pid _s)) + (set! s _s) + (set! o (box _o)) + (set! i (box _i)) + (set! e (box _e))) + (printf "SPAWNED-PROCESS:~a ~a\n" pid cmdline-list) + + (define (mk-handler _port desc) + (define port (unbox _port)) + (if port + (wrap-evt port (lambda (e) + (define (print-out x) (printf "SPAWNED-PROCESS ~a:~a:~a ~a\n" pid desc (->length x) x) + (flush-output)) + (cond + [(not port) (print-out "IS #F")] + [else + (define bb (make-bytes 4096)) + (define bbl (read-bytes-avail!* bb port)) + (cond + [(eof-object? bbl) + (print-out "EOF") + (set-box! _port #f)] + [else + (print-out (subbytes bb 0 bbl))])]))) + #f)) + + (define/public (get-pid) pid) + (define/public (wait-for-die) (subprocess-wait s)) + (define/public (register nes) + (for/filter/fold/cons nes ([x (list s (list o "OUT") (list e "ERR"))]) + (cond + [(subprocess? x) (wrap-evt s (lambda (e) + (printf "SPAWNED-PROCESS ~a DIED\n" pid) + (and parent (send parent process-died this))))] + [(list? x) (apply mk-handler x)] + [else #f]))) + (super-new) + ))) + +(define place-socket-bridge% + (backlink + (class* + object% (event-container<%>) + (init-field pch + sch + id + node) + (define/public (register nes) + (cons + (wrap-evt + (if (dchannel? pch) (dchannel-ch pch) pch) + (lambda (e) + ; (printf "MSG ~v\n" e) + ; (flush-output) + (match e + [(dcgm 8 #;(== DCGM-TYPE-LOG-TO-PARENT) _ _ (list severity msg)) + (send node log-from-child #:severity severity msg)] + [else (put-msg e)]))) + nes)) + (define/public (get-sc-id) id) + (define/public (get-raw-msg) + (let loop () + (define msg (send sch read-message)) + (if (= (dcgm-type msg) DCGM-DPLACE-DIED) + (loop) + (dcgm-msg msg)))) + (define/public (put-msg msg) + (sconn-write-flush sch (dcgm DCGM-TYPE-INTER-DCHANNEL id id msg))) + (super-new) + ))) + +(define node% + (backlink + (class* + object% (event-container<%>) + (init-field [chan-vec #f]) + (init-field [listen-port #f]) + (init-field [socket-ports null]) + (init-field [sub-ecs null]) + (init-field [psbs null]) + (init-field [spawned-vms null]) + (init-field [named-places (make-hash)]) + (init-field [beacon #f]) + (init-field [owner #f]) + (field [id 0]) + (define/public (nextid) + (set! id (add1 id)) + id) + (define (add-socket-port pair) + (set! socket-ports (append socket-ports (list pair)))) + (define/public (add-sub-ec ec) + (set! sub-ecs (append sub-ecs (list ec)))) + (define (add-spawned-vm ec) + (set! spawned-vms (append spawned-vms (list ec)))) + (define (add-psb ec) + (set! psbs (append psbs (list ec)))) + (define (add-named-place name np) + (hash-set! named-places (->string name) np)) + (define (named-place-lookup name) + (hash-ref named-places (->string name) #f)) + (define (add-place-channel-socket-bridge pch sch id) + (add-psb (new place-socket-bridge% [pch pch] [sch sch] [id id] [node this]))) + (define (forward-mesg m src-channel) + (match m + [(dcgm 1 #;(== DCGM-TYPE-DIE) src dest "DIE") (exit 1)] + [(dcgm 2 #;(== DCGM-TYPE-NEW-DCHANNEL) src dest pch) + (define d (vector-ref chan-vec dest)) + (cond + [(is-a? d socket-connection%) + (define ch-id (nextid)) + (sconn-add-subchannel d ch-id pch) + (add-place-channel-socket-bridge pch d ch-id) + (sconn-write-flush d (dcgm DCGM-TYPE-NEW-INTER-DCHANNEL src dest ch-id))] + [(or (place-channel? d) (place? d)) + (place-channel-put d m)])] + [(dcgm 9 #;(== DCGM-TYPE-NEW-PLACE) -1 (and place-exec (list-rest type rest)) ch-id) + (match place-exec + [(list 'connect name) + (define np (named-place-lookup name)) + (cond + [np + (define nc (new connection% + [name-pl np] + [ch-id ch-id] + [sc src-channel] + [node this])) + (add-sub-ec nc)] + + [else + (sconn-write-flush src-channel (dcgm DCGM-TYPE-INTER-DCHANNEL ch-id ch-id + (format "ERROR: name not found ~a" name)))])] + + [else + (define np (new place% + [place-exec place-exec] + [ch-id ch-id] + [sc src-channel] + [node this])) + (match place-exec + [(list _ _ _ name) (add-named-place name np)] + [else (add-sub-ec np)])])] + [(dcgm 3 #;(== DCGM-TYPE-NEW-INTER-DCHANNEL) src dest ch-id) + (define s src-channel) + (define d (vector-ref chan-vec dest)) + (define-values (pch1 pch2) (place-channel)) + (sconn-add-subchannel s ch-id pch1) + (add-place-channel-socket-bridge pch1 s ch-id) + (place-channel-put d (dcgm DCGM-TYPE-NEW-DCHANNEL src dest pch2))] + [(dcgm 4 #;(== DCGM-TYPE-INTER-DCHANNEL) _ ch-id msg) + (define pch (sconn-lookup-subchannel src-channel ch-id)) + (cond + [(place-channel? pch) + ;(printf "SOCKET to PLACE CHANNEL ~a\n" msg) + (place-channel-put pch msg)] + [(is-a? pch connection%) + (send pch forward msg)])] + [(dcgm 6 #;(== DCGM-TYPE-SPAWN-REMOTE-PROCESS) src (list node-name node-port mod-path funcname) ch1) + (define vm + (new remote-node% + [host-name node-name] + [listen-port node-port] + [cmdline-list (list (ssh-bin-path) node-name (racket-path) "-tm" (->string distributed-launch-path) "spawn" (->string node-port))])) + (add-spawned-vm vm) + (send vm launch-place + (list 'dynamic-place mod-path funcname) + ;#:initial-message initial-message + #:one-sided-place ch1 + ;#:restart-on-exit restart-on-exit + )] + [(dcgm 7 #;(== DCGM-DPLACE-DIED) -1 -1 ch-id) + (printf "PLACE ~a died\n" ch-id)] + [(dcgm 8 #;(== DCGM-TYPE-LOG-TO-PARENT) _ _ (list severity msg)) + (log-from-child #:severity severity msg)] + [(dcgm 10 #;(== DCGM-TYPE-SET-OWNER) -1 -1 msg) + (printf "RECV DCGM-TYPE-SET-OWNER ~a\n" src-channel) + (set! owner src-channel)] + [(dcgm mtype srcs dest msg) + (define d (vector-ref chan-vec dest)) + (cond + [(is-a? d socket-connection%) + (sconn-write-flush d m)] + [(or (place-channel? d) (place? d)) + (place-channel-put d m)])] + [(? eof-object?) + (printf "connection died\n") + (flush-output) + (exit 1) + ])) + + (define us-buffer (make-bytes 4096)) + (define (register-beacon nes) + (cond + [(not beacon) nes] + [(equal? #t beacon) + (set! beacon (udp-open-socket)) + (udp-bind! beacon "255.255.255.255" DEFAULT-ROUTER-PORT) + (wrap-evt (udp-receive!-evt beacon us-buffer) + (match-lambda + [(list count host port) + (void)]))])) + + + (define/public (log-from-child msg #:severity [severity 'info]) + ;(printf "Received Log Message ~a ~a\n" severity msg) + (cond + [owner + ;(printf "Sent to owner\n") + (sconn-write-flush owner (log-message severity msg))] + [else (print-log-message severity msg)])) + + (define/public (register nes) + (let* + ([nes + (if chan-vec + (for/fold ([n nes]) ([x (in-vector chan-vec)]) + (cons + (cond + [(is-a? x socket-connection%) + (sconn-get-forward-event x forward-mesg)] + [(or (place-channel? x) (place? x)) + (wrap-evt x (lambda (e) + ;(printf "VECTOR PLACE MESSAGE ~a\n" e) + (forward-mesg e x)))]) + n)) + nes)] + [nes + (if listen-port + (cons + (wrap-evt listen-port (lambda (e) + (define-values (in out) (tcp-accept listen-port)) + (define-values (lh lp rh rp) (tcp-addresses in #t)) + (printf "INCOMING CONNECTION ~a:~a <- ~a:~a\n" lh lp rh rp) + (define sp (new socket-connection% [in in] [out out])) + (add-socket-port sp))) + nes) + nes)] + [nes + (if socket-ports + (for/fold ([n nes]) ([x socket-ports]) + (cons + (cond + [(is-a? x socket-connection%) + (sconn-get-forward-event x forward-mesg)] + [(or (place-channel? x) (place? x)) + (wrap-evt x (lambda (e) + ;(printf "SOCKET-PORT PLACE MESSAGE ~a\n" e) + (forward-mesg e x)))]) + n)) + nes)] + [nes + (if sub-ecs + (for/fold ([n nes]) ([x sub-ecs]) + (send x register n)) + nes)] + [nes + (if psbs + (for/fold ([n nes]) ([x psbs]) + (send x register n)) + nes)] + [nes + (if spawned-vms + (for/fold ([n nes]) ([x spawned-vms]) + (send x register n)) + nes)] + [nes (register-beacon nes)] + [nes + (cond + [named-places + (for/fold ([n nes]) ([x (in-hash-values named-places)]) + (send x register n))] + [else nes])]) + nes)) + + (define/public (sync-events) + (let loop () + (define l (register null)) + (apply sync/enable-break l) + + (loop ))) + + + (super-new) + ))) + + +;socket channel +(define (sconn-add-subchannel s ch-id ch) (send s add-subchannel ch-id ch)) +(define (sconn-lookup-subchannel s ch-id) (send s lookup-subchannel ch-id)) +(define (sconn-write-flush s x) (send s _write-flush x)) +(define (sconn-remove-subchannel s scid) (send s remove-subchannel scid)) +(define (sconn-get-forward-event s forwarder) (send s get-forward-event forwarder)) + +(define socket-connection% + (backlink + (class* object% (event-container<%>) + (init-field [host #f] + [port #f] + [retry-times 30] + [delay 1] + [background-connect #t] + [in #f] + [out #f]) + (field [subchannels null] + [connecting #f] + [ch #f]) + + (define (forward-mesg x) (void)) + + (define (tcp-connect/retry rname rport #:times [times 10] #:delay [delay 1]) + (let loop ([t 0]) + (with-handlers ([exn? (lambda (e) + (cond [(t . < . times) + (printf "try ~a waiting ~a sec to retry connection to ~a:~a\n" t delay rname rport) + (sleep delay) + (loop (add1 t))] + [else (raise e)]))]) + (tcp-connect rname (->number rport))))) + + (define (ensure-connected) + ;(printf "Waiting on connecting to ~a ~a\n" host port) + (when connecting + (match (channel-get ch) + [(list _in _out) + (set! in _in) + (set! out _out) + (set! connecting #f)]))) + + (define/public (add-subchannel id pch) + (set! subchannels (append subchannels (list (cons id pch))))) + (define/public (lookup-subchannel id) (cdr (assoc id subchannels))) + (define/public (_write-flush x) + (when (equal? out #f) (ensure-connected)) + (write-flush x out)) + (define/public (remove-subchannel id) + (set! subchannels (filter-map + (lambda (x) (and (not (= (car x) id)) x)) + subchannels))) + (define/public (addresses) (tcp-addresses in #t)) + (define/public (get-forward-event forwarder) + (when (equal? out #f) (ensure-connected)) + (wrap-evt in (lambda (e) + ;(printf "VECTOR SOCKET MESSAGE ~a\n" e) + (forwarder (read in) this)))) + + (define/public (read-message) + (when (equal? out #f) (ensure-connected)) + (read in)) + (define/public (register nes) + (error) + (cons (wrap-evt in void) nes)) + + (when (and host port background-connect) + (set! connecting #t) + (set! ch (make-channel)) + ;(printf "Delay connecting to ~a ~a\n" host port) + (thread + (lambda () + (channel-put + ch + (call-with-values + (lambda () (with-handlers ([exn:fail? (lambda (e) + (printf "OPPS ~a\n" e) + (values 'bozo #f))]) + (tcp-connect/retry host port #:times retry-times #:delay delay))) + list))))) + (when (and host port (not background-connect)) + (tcp-connect/retry host port #:times retry-times #:delay delay)) + (super-new) + ))) + +(define remote-node% + (backlink + (class* + object% (event-container<%>) + (init-field host-name) + (init-field listen-port) + (init-field [cmdline-list #f]) + (init-field [sc #f]) ;socket-connection + (init-field [restart-on-exit #f]) + (field [sp #f]) ;spawned-process + (field [id 0]) + (field [remote-places null]) + + (define/public (nextid) + (set! id (add1 id)) + id) + + (define (add-remote-place rp) + (set! remote-places (append remote-places(list rp)))) + (define (spawn-node) + (set! sp (new spawned-process% [cmdline-list cmdline-list] [parent this]))) + (define (setup-socket-connection) + (set! sc (new socket-connection% [host host-name] [port listen-port])) + (sconn-write-flush sc (dcgm DCGM-TYPE-SET-OWNER -1 -1 ""))) + (define (restart-node) + (spawn-node) + (setup-socket-connection)) + + (when (and cmdline-list (not sc)) + (spawn-node)) + (unless sc + (setup-socket-connection)) + + (define (find-place-by-sc-id scid) + (for/fold ([r #f]) ([rp remote-places]) + (if (= (send rp get-sc-id) scid) + rp + r))) + + (define (on-socket-event it in-port) + (match it + [(dcgm 7 #;(== DCGM-DPLACE-DIED) -1 -1 ch-id) + (printf "SPAWNED-PROCESS:~a PLACE DIED ~a:~a:~a\n" (send sp get-pid) host-name listen-port ch-id) + (cond + [(find-place-by-sc-id ch-id) => (lambda (rp) + (send rp place-died))] + [else (printf "remote-place for sc-id ~a not found\n" ch-id)])] + [(dcgm 4 #;(== DCGM-TYPE-INTER-DCHANNEL) _ ch-id msg) + (define pch (sconn-lookup-subchannel sc ch-id)) + (cond + [(place-channel? pch) + ;(printf "SOCKET to PLACE CHANNEL ~a\n" msg) + (place-channel-put pch msg)] + [(is-a? pch connection%) + (send pch forward msg)])] + [(dcgm 8 #;(== DCGM-TYPE-LOG-TO-PARENT) _ _ (list severity msg)) + (define parent (send this get-router)) + (cond + [parent + ;(printf "Sent to Parent ~a ~a \n" severity msg) + (send parent log-from-child #:severity severity msg)] + [else (print-log-message severity msg)])] + + [(? eof-object?) + (define-values (lh lp rh rp) (send sc addresses)) + (printf "EOF on vm socket connection pid to ~a ~a:~a CONNECTION ~a:~a -> ~a:~a\n" (send sp get-pid) host-name listen-port lh lp rh rp) + (set! sc #f)] + + [else (printf "received message ~a\n" it)])) + + (define/public (get-log-prefix) (format "PLACE ~a:~a" host-name listen-port)) + (define/public (process-died child) + (printf "Remote VM pid ~a ~a:~a died \n" (send sp get-pid) host-name listen-port) + (set! sp #f) + (cond + [restart-on-exit + (cond + [cmdline-list + (if (equal? restart-on-exit #t) + (restart-node) + (send restart-on-exit restart restart-node))] + [else + (printf "No restart cmdline arguments for ~a\n" + (get-log-prefix))])] + [else + (printf "No restart condition for ~a\n" + (get-log-prefix))])) + + (define/public (get-first-place) + (car remote-places)) + (define/public (get-first-place-channel) + (send (car remote-places) get-channel)) + + (define/public (drop-sc-id scid) + (sconn-remove-subchannel sc scid)) + + (define/public (launch-place place-exec #:restart-on-exit [restart-on-exit #f] #:one-sided-place [one-sided-place #f]) + (define rp (new remote-place% [vm this] [place-exec place-exec] [restart-on-exit restart-on-exit] + [one-sided-place one-sided-place])) + (add-remote-place rp) + rp) + + (define/public (remote-connect name #:restart-on-exit [restart-on-exit #f]) + (define rp (new remote-connection% [vm this] [name name] [restart-on-exit restart-on-exit])) + (add-remote-place rp) + rp) + + (define/public (spawn-remote-place place-exec dch) + (define ch-id (nextid)) + (sconn-add-subchannel sc ch-id dch) + (sconn-write-flush sc (dcgm DCGM-TYPE-NEW-PLACE -1 place-exec ch-id)) + (new place-socket-bridge% [pch dch] [sch sc] [id ch-id] [node this])) + + (define/public (spawn-remote-connection name dch) + (define ch-id (nextid)) + (sconn-add-subchannel sc ch-id dch) + (sconn-write-flush sc (dcgm DCGM-TYPE-NEW-PLACE -1 (list 'connect name) ch-id)) + (new place-socket-bridge% [pch dch] [sch sc] [id ch-id] [node this])) + + (define/public (send-exit) + (sconn-write-flush sc (dcgm DCGM-TYPE-DIE -1 -1 "DIE"))) + + (define/public (wait-for-die) + (send sp wait-for-die)) + + (define/public (register es) + (let* ([es (if sp (send sp register es) es)] + [es (for/fold ([nes es]) ([rp remote-places]) + (send rp register nes))] + [es (if sc (cons (sconn-get-forward-event sc on-socket-event) es) es)] + [es (if (and restart-on-exit + (not (equal? restart-on-exit #t))) + (send restart-on-exit register es) + es)]) + es)) + + (super-new) + ))) + +(define (node-send-exit node) (send node send-exit)) +(define (node-get-first-place node) (send node get-first-place)) + +(define remote-place% + (backlink + (class* + object% (event-container<%>) + (init-field vm) + (init-field [place-exec #f]) + (init-field [restart-on-exit #f]) + (init-field [one-sided-place #f]) + (init-field [on-channel/2 #f]) + (field [psb #f]) + (field [pc #f]) + (field [rpc #f]) + (field [running #f]) + (field [k #f]) + (field [handle-channel #t]) + + (cond + [one-sided-place + (set! rpc one-sided-place)] + [else + (define-values (pch1 pch2) (place-channel)) + (set! rpc pch1) + (set! pc pch2)]) + + (set! psb (send vm spawn-remote-place place-exec rpc)) + + (define (restart-place) + (send vm drop-sc-id (send psb get-sc-id)) + (set! psb (send vm spawn-remote-place place-exec rpc))) + + (define/public (stop) (void)) + (define/public (get-channel) pc) + (define/public (set-on-channel/2! proc) (set! on-channel/2 proc)) + (define/public (get-sc-id) (send psb get-sc-id)) + (define/public (set-handle-channel! x) (set! handle-channel x)) + (define/public (place-died) + (cond + [restart-on-exit + (if (equal? restart-on-exit #t) + (restart-place) + (send restart-on-exit restart restart-place))] + [else + (printf "No restart condition for ~a:~a\n" + (send vm get-log-prefix) + (send psb get-sc-id))])) + (define (on-channel-event e) + (printf "~a ~a\n" (send vm get-log-prefix) e)) + (define/public (register es) + (let* ([es (if (and handle-channel pc) + (cons (wrap-evt pc + (cond + [k + (lambda (e) + (call-with-continuation-prompt (lambda () + (begin0 + (k e) + (set! k #f)))))] + [on-channel/2 + (lambda (e) + (on-channel/2 pc e))] + [else + on-channel-event])) es) + es)] + [es (send psb register es)] + [es (if (and restart-on-exit + (not (equal? restart-on-exit #t))) + (send restart-on-exit register es) + es)]) + es)) + (define/public (set-continuation _k) (set! k _k)) + + (define/public (get-raw-msg) (send psb get-raw-msg)) + (define/public (get-msg) + (call-with-composable-continuation + (lambda (_k) + (set! k _k) + (abort-current-continuation (default-continuation-prompt-tag) void)))) + + (define/public (put-msg msg) (send psb put-msg msg)) + + (super-new) + ))) + +(define (dplace-get dest) + (cond + [(place-channel? dest) (place-channel-get dest)] + [else (send dest get-msg)])) + +(define (dplace-put dest msg) + (cond + [(place-channel? dest) (place-channel-put dest msg)] + [else (send dest put-msg msg)])) + +(define remote-connection% + (backlink + (class* + object% (event-container<%>) + (init-field vm) + (init-field name) + (init-field [restart-on-exit #f]) + (init-field [on-channel/2 #f]) + (field [psb #f]) + (field [pc #f]) + (field [running #f]) + (field [k #f]) + + (define-values (pch1 pch2) (place-channel)) + (set! psb (send vm spawn-remote-connection name pch1)) + (set! pc pch2) + + (define/public (stop) (void)) + (define/public (get-channel) pc) + (define/public (set-on-channel/2! proc) (set! on-channel/2 proc)) + (define/public (get-sc-id) (send psb get-sc-id)) + (define/public (place-died) + (printf "No restart condition for ~a:~a\n" + (send vm get-log-prefix) + (send psb get-sc-id))) + (define (on-channel-event e) + (printf "~a ~a\n" (send vm get-log-prefix) e)) + (define/public (register es) + (let* ([es (if pc (cons (wrap-evt pc + (cond + [k + (lambda (e) + (call-with-continuation-prompt (lambda () + (begin0 + (k e) + (set! k #f)))))] + [on-channel/2 + (lambda (e) + (on-channel/2 pc e))] + [else + on-channel-event])) es) es)] + [es (send psb register es)]) + es)) + (define/public (set-continuation _k) (set! k _k)) + + (define/public (get-raw-msg) (send psb get-raw-msg)) + (define/public (get-msg) + (call-with-composable-continuation + (lambda (_k) + (set! k _k) + (abort-current-continuation (default-continuation-prompt-tag) void)))) + (define/public (put-msg msg) (send psb put-msg msg)) + + (super-new) + ))) + +(define place% + (backlink + (class* + object% (event-container<%>) + (init-field place-exec + ch-id + sc + node) + (field [pd #f]) + (field [psb #f]) + (field [running #f]) + (define (default-on-place-dead e) + (set! pd #f) + (set! psb #f) + (sconn-write-flush sc (dcgm DCGM-DPLACE-DIED -1 -1 ch-id)) + (sconn-remove-subchannel sc ch-id)) + + (init-field [on-place-dead default-on-place-dead]) + + (set! pd + (match place-exec + ;place% is a named place + [(list 'dynamic-place place-path place-func name) + (dynamic-place (->path place-path) place-func)] + [(list 'place place-path place-func name) + ((dynamic-require (->path place-path) place-func))] + ;place% is a single connected place + [(list 'dynamic-place place-path place-func) + (dynamic-place (->path place-path) place-func)] + [(list 'place place-path place-func) + ((dynamic-require (->path place-path) place-func))])) + + (sconn-add-subchannel sc ch-id pd) + (set! psb (new place-socket-bridge% [pch pd] [sch sc] [id ch-id] [node node])) + (define/public (get-channel) pd) + (define/public (stop) + (cond + [pd + (place-kill pd) + (set! pd #f)] + [else (void)])) ;send place not running message + + (define/public (register es) + (let* ([es (if pd (cons (wrap-evt (place-dead-evt pd) on-place-dead) es) es)] + [es (if psb (send psb register es) es)]) + es)) + (super-new) + ))) + +(define connection% + (backlink + (class* + object% (event-container<%>) + (init-field name-pl + ch-id + sc + node) + (field [psb #f]) + + (define-values (pch1 pch2) (place-channel)) + + (define name-ch (send name-pl get-channel)) + + (init-field [on-place-dead #f]) + + (sconn-add-subchannel sc ch-id this) + (set! psb (new place-socket-bridge% [pch pch1] [sch sc] [id ch-id] [node node])) + + (define/public (forward msg) + (place-channel-put name-ch (list msg pch2))) + + (define/public (put msg) + (sconn-write-flush sc (dcgm DCGM-TYPE-INTER-DCHANNEL ch-id ch-id msg))) + (define/public (register es) (send psb register es)) + + (super-new) + ))) + + +(define (ll-channel-put ch msg) (send ch put-msg msg)) +(define (ll-channel-get ch) (send ch get-raw-msg)) + +(define respawn-and-fire% + (backlink + (class* + object% (event-container<%>) + (init-field seconds) + (init-field thunk) + (field [fire-time (current-inexact-milliseconds)]) + + (define/public (register es) + (if fire-time + (cons + (wrap-evt (alarm-evt fire-time) + (lambda (x) + (set! fire-time (+ (current-inexact-milliseconds) (* seconds 1000))) + (thunk))) + es) + es)) + + (super-new) + ))) + +(define after-seconds% + (backlink + (class* + object% (event-container<%>) + (init-field seconds) + (init-field thunk) + (init-field [fire-time (+ (current-inexact-milliseconds) (* seconds 1000))]) + + (define/public (register es) + (if fire-time + (cons + (wrap-evt (alarm-evt fire-time) + (lambda (x) + (set! fire-time #f) + (call-with-continuation-prompt thunk))) + es) + es)) + + (super-new) + ))) + +(define restarter% + (class* + after-seconds% (event-container<%>) + (inherit-field seconds thunk fire-time) + (init-field [retry #f]) + (init-field [on-final-fail #f]) + (super-new [fire-time #f] [thunk #f]) + (init-field [retry-reset (* 2 seconds)]) + (field [last-attempt 0]) + (field [retries 0]) + (define/public (restart restart-func) + (cond + [(and retry (>= retries retry)) + (printf "Already retried to restart ~a times\n" retry) + (and on-final-fail (on-final-fail))] + [(> (- (current-inexact-milliseconds) last-attempt) (* seconds 1000)) + (when (> (- (current-inexact-milliseconds) last-attempt) (* retry-reset 1000)) + (set! retries 0)) + (set! last-attempt (current-inexact-milliseconds)) + (set! retries (+ 1 retries)) + (set! fire-time #f) + (restart-func)] + [else + (set! thunk (lambda () (restart restart-func))) + (set! fire-time (+ (current-inexact-milliseconds) (* seconds 1000)))])) + )) + + +(define (startup-config conf conf-idx) + (start-node-router + (cond + ;master + [(= 0 conf-idx) + (define t-n-c (total-node-count conf)) + (define cv (make-vector t-n-c null)) + (build-down (node-config-node-port (first conf)) cv conf 0) + cv] + ;slave + [else + (listen/init-channels (node-config-node-port (list-ref conf conf-idx)))]))) + +;; Contract: build-down : port channel-vector conf conf-idx -> (void) +;; +;; Purpose: build up channel-vector by connecting to nodes greater than my-id +;; +;; Example: (build-down 6432 channel-vector conf 0) +;; +(define (build-down port cv conf conf-idx) + (define t-n-c (total-node-count conf)) + (match-define (node-config node-name _ node-cnt _ _ _ modpath funcname config-path confname) (list-ref conf conf-idx)) + (define (isself? rid) (equal? rid conf-idx)) + + (for/fold ([my-id #f] + [next-node-id 0]) + ([item conf] + [curr-conf-idx (in-naturals)]) + (match-define (node-config rname rport rcnt _ _ _ modpath funcname conf-path confname) item) + + (define (loopit my-id) + (values my-id (+ next-node-id rcnt))) + (define (remote-spawn) + (define sp (new socket-connection% [host rname] [port rport])) + (define msg (list my-id node-name node-cnt curr-conf-idx next-node-id rname rcnt conf)) + ;(printf "Sending ~v\n" msg) + (sconn-write-flush sp msg) + (for ([i (in-range rcnt)]) + (vector-set! cv (+ next-node-id i) sp)) + (loopit my-id)) + (define (local-spawn) + (for ([i (in-range rcnt)]) + (define sp (dynamic-place (->path modpath) funcname)) + (vector-set! cv (+ next-node-id i) sp) + (place-channel-put sp (list (+ next-node-id i) t-n-c))) + (loopit next-node-id)) + + (cond + [my-id (remote-spawn)] + [(isself? curr-conf-idx) (local-spawn)] + [(not my-id) (loopit my-id)]))) + + +;; Contract: listen/init-channels : port -> VectorOf[ socket-connection%] +;; +;; Purpose: build up channel-vector by listening for connect requests for nodes less than +;; myid. Spawn thread to build channel-vector by connecting to nodes greater than myid. +;; +;; Example: (listen/init-channels 6432) +;; +;; Node 1 Node 2 Node 3 +;; 1 2 3 4 5 6 +;; +;; +(define (listen/init-channels port) + (define listener (tcp-listen (->number port) 4 #t)) + (let loop ([cnt #f] + [thr #f] + [cv #f]) + (define-values (in out) (tcp-accept listener)) + (define sp (new socket-connection% [in in] [out out])) + (match-define (list sid sname scnt myidx myid myname mycnt conf) (read in)) + ;(printf "Listen ~a\n" (list sid sname scnt myidx myid myname mycnt conf)) + (let* + ([cnt (or cnt (- myidx 1))] + [cv (or cv (make-vector (total-node-count conf) null))] + [thr (or thr (thread (lambda () (build-down port cv conf myidx))))]) + + (for ([i (in-range scnt)]) + (vector-set! cv (+ sid i) sp)) + + (if (= 0 cnt) + (begin (thread-wait thr) cv) + (loop (sub1 cnt) thr cv))))) + +(define (supervise-process-at host #:listen-port [listen-port DEFAULT-ROUTER-PORT] + #:restart-on-exit [restart-on-exit #f] + . command-line-list) + (void) + ) + +(define (supervise-named-place-thunk-at vm name place-path place-func + #:listen-port [listen-port DEFAULT-ROUTER-PORT] + #:initial-message [initial-message #f] + #:restart-on-exit [restart-on-exit #f]) + (send vm launch-place + (list 'place (->string place-path) place-func (->string name)) + ;#:initial-message initial-message + #:restart-on-exit restart-on-exit + )) +(define (spawn-vm-supervise-dynamic-place-at host place-path place-func #:listen-port [listen-port DEFAULT-ROUTER-PORT] + #:initial-message [initial-message #f] + #:racket-path [racketpath (racket-path)] + #:ssh-bin-path [sshpath (ssh-bin-path)] + #:distributed-launch-path [distributedlaunchpath (->string distributed-launch-path)] + #:restart-on-exit [restart-on-exit #f]) + (define-values (vm pl) + (spawn-vm-supervise-place-at/exec host (list 'dynamic-place (->string place-path) place-func) #:listen-port listen-port + #:initial-message initial-message + #:racket-path racketpath + #:ssh-bin-path sshpath + #:distributed-launch-path distributedlaunchpath + #:restart-on-exit restart-on-exit)) + vm) + +(define (spawn-vm-supervise-place-thunk-at host place-path place-func #:listen-port [listen-port DEFAULT-ROUTER-PORT] + #:initial-message [initial-message #f] + #:racket-path [racketpath (racket-path)] + #:ssh-bin-path [sshpath (ssh-bin-path)] + #:distributed-launch-path [distributedlaunchpath (->string distributed-launch-path)] + #:restart-on-exit [restart-on-exit #f]) + (define-values (vm pl) + (spawn-vm-supervise-place-at/exec host (list 'place (->string place-path) place-func) #:listen-port listen-port + #:initial-message initial-message + #:racket-path racketpath + #:ssh-bin-path sshpath + #:distributed-launch-path distributedlaunchpath + #:restart-on-exit restart-on-exit)) + vm) + +(define (spawn-vm-supervise-dynamic-place-at/2 host place-path place-func #:listen-port [listen-port DEFAULT-ROUTER-PORT] + #:initial-message [initial-message #f] + #:racket-path [racketpath (racket-path)] + #:ssh-bin-path [sshpath (ssh-bin-path)] + #:distributed-launch-path [distributedlaunchpath (->string distributed-launch-path)] + #:restart-on-exit [restart-on-exit #f]) + (spawn-vm-supervise-place-at/exec host (list 'dynamic-place (->string place-path) place-func) #:listen-port listen-port + #:initial-message initial-message + #:racket-path racketpath + #:ssh-bin-path sshpath + #:distributed-launch-path distributedlaunchpath + #:restart-on-exit restart-on-exit)) + +(define (spawn-vm-supervise-place-thunk-at/2 host place-path place-func #:listen-port [listen-port DEFAULT-ROUTER-PORT] + #:initial-message [initial-message #f] + #:racket-path [racketpath (racket-path)] + #:ssh-bin-path [sshpath (ssh-bin-path)] + #:distributed-launch-path [distributedlaunchpath (->string distributed-launch-path)] + #:restart-on-exit [restart-on-exit #f]) + (spawn-vm-supervise-place-at/exec host (list 'place (->string place-path) place-func) #:listen-port listen-port + #:initial-message initial-message + #:racket-path racketpath + #:ssh-bin-path sshpath + #:distributed-launch-path distributedlaunchpath + #:restart-on-exit restart-on-exit)) + +(define (spawn-vm-supervise-place-at/exec host place-exec #:listen-port [listen-port DEFAULT-ROUTER-PORT] + #:initial-message [initial-message #f] + #:racket-path [racketpath (racket-path)] + #:ssh-bin-path [sshpath (ssh-bin-path)] + #:distributed-launch-path [distributedlaunchpath (->string distributed-launch-path)] + #:restart-on-exit [restart-on-exit #f]) + (define vm (spawn-remote-racket-vm host + #:listen-port listen-port + #:racket-path racketpath + #:ssh-bin-path sshpath + #:distributed-launch-path distributedlaunchpath)) + (define dp + (send vm launch-place + place-exec + ;#:initial-message initial-message + #:restart-on-exit restart-on-exit + )) + + (values vm dp)) + +(define (master-event-loop #:listen-port [listen-port DEFAULT-ROUTER-PORT] . event-containers) + (define listener (tcp-listen listen-port 4 #t)) + (define nc (new node% [listen-port listener])) + (for ([ec event-containers]) + (send nc add-sub-ec ec) + (send ec backlink nc)) + (send nc sync-events)) + + +(define (spawn-remote-racket-vm host #:listen-port [listen-port DEFAULT-ROUTER-PORT] + #:racket-path [racketpath (racket-path)] + #:ssh-bin-path [sshpath (ssh-bin-path)] + #:distributed-launch-path [distributedlaunchpath (->string distributed-launch-path)]) + (new remote-node% + [host-name host] + [listen-port listen-port] + [cmdline-list (list sshpath host racketpath "-tm" distributedlaunchpath "spawn" (->string listen-port))])) + +(define (supervise-dynamic-place-at remote-vm place-path place-func) + (send remote-vm launch-place (list 'dynamic-place (->string place-path) place-func))) + +(define (supervise-place-thunk-at remote-vm place-path place-func) + (send remote-vm launch-place (list 'place (->string place-path) place-func))) + +(define-syntax-rule (every-seconds _seconds _body ...) + (new respawn-and-fire% [seconds _seconds] [thunk (lambda () _body ...)])) + +(define-syntax-rule (after-seconds _seconds _body ...) + (new after-seconds% [seconds _seconds] [thunk (lambda () _body ...)])) + +(define (connect-to-named-place vm name) + (send vm remote-connect name)) + +(define (restart-every seconds #:retry [retry #f] #:on-fail-email [fail-email-address #f] + #:on-final-fail [on-final-fail #f]) + (new restarter% [seconds seconds] [retry retry] + [on-final-fail on-final-fail])) + +(define (log-message severity msg) + (dcgm DCGM-TYPE-LOG-TO-PARENT -1 -1 (list severity msg))) + + + + +;; Contract: node-config -> (void) +;; +;; Purpose: use ssh to launch remote nodes of distributed places +;; +;; Example: +(define (launch-config config) + ;FIXME kill safety + (define nodes + (for/list ([c config] + [i (in-naturals)]) + (list + (call-with-values + (lambda () + (match-define (node-config node-name node-port _ ssh-path racket-path distributed-path mod-path func-name config-path conf-name) c) + (subprocess #f #f #f (ssh-bin-path) node-name racket-path "-tm" + distributed-launch-path + "launch" + config-path + (symbol->string conf-name) + (number->string i))) + list) + c))) + + (define bb (make-bytes 4096)) + (define handlers + (let () + (define (mkhandler port config) + (let () + (define self + (wrap-evt port + (lambda (x) + (define bbl (read-bytes-avail!* bb x)) + (define (print-out x) + (printf "~a:~a:~a ~a\n" (node-config-node-name config) (node-config-node-port config) bbl x) + (flush-output)) + (cond [(eof-object? bbl) + (print-out "EOF") + (set! handlers (remove self handlers))] + [else + (print-out (subbytes bb 0 bbl))])))) + self)) + + (for/fold ([r null]) ([n nodes]) + (list* (mkhandler (second (first n)) (second n)) + (mkhandler (fourth (first n)) (second n)) + r)))) + (define normal-finish #f) + (dynamic-wind + (lambda () (void)) + (lambda () + (let loop () + (apply sync/enable-break handlers) + (unless (null? handlers) + (loop))) + (set! normal-finish #t)) + (lambda () + (unless normal-finish + (for ([n nodes]) + (printf "Killing ~a\n" n) + (define out (third (first n))) + (with-handlers ([exn:fail? (lambda (e) (printf "Error sending Ctrl-C: ~a\n" e))]) + (write-byte 3 out) + (flush-output out) + (sleep)) + (subprocess-kill (first (first n)) #f)))))) diff --git a/collects/racket/place/distributed/examples/hello-world.rkt b/collects/racket/place/distributed/examples/hello-world.rkt new file mode 100644 index 0000000000..8847c0e6bc --- /dev/null +++ b/collects/racket/place/distributed/examples/hello-world.rkt @@ -0,0 +1,30 @@ +#lang racket/base +(require racket/place/distributed + racket/place) + +(provide main + hello-world) + +(define (hello-world) + (place ch + (printf "hello-world received: ~a\n" (place-channel-get ch)) + (define HW "Hello World") + (place-channel-put ch (format "~a\n" HW)) + (printf "hello-world sent: ~a\n" HW))) + + +(define (main) + (define-values (vm pl) + (spawn-vm-supervise-place-thunk-at/2 "localhost" + #:listen-port 6344 + (get-current-module-path) + 'hello-world)) + (master-event-loop + vm + (after-seconds 2 + (dplace-put pl "Hello") + (printf "master-event-loop received: ~a\n" (dplace-get pl))) + + (after-seconds 6 + (exit 0)) + )) diff --git a/collects/racket/place/distributed/examples/logging/bank.rkt b/collects/racket/place/distributed/examples/logging/bank.rkt new file mode 100644 index 0000000000..824416d9b3 --- /dev/null +++ b/collects/racket/place/distributed/examples/logging/bank.rkt @@ -0,0 +1,36 @@ +#lang racket/base +(require racket/match + racket/place/define-remote-server) + +(define-remote-server + bank + + (define-state accounts (make-hash)) + (define-rpc (new-account who) + (match (hash-has-key? accounts who) + [#t '(already-exists)] + [else + (hash-set! accounts who 0) + (log-to-parent #:severity 'debug (format "Logging new account for ~a" who)) + (list 'created who)])) + (define-rpc (removeM who amount) + (cond + [(hash-ref accounts who (lambda () #f)) => + (lambda (balance) + (cond [(<= amount balance) + (define new-balance (- balance amount)) + (hash-set! accounts who new-balance) + (list 'ok new-balance)] + [else + (list 'insufficient-funds balance)]))] + [else + (list 'invalid-account who)])) + (define-rpc (add who amount) + (cond + [(hash-ref accounts who (lambda () #f)) => + (lambda (balance) + (define new-balance (+ balance amount)) + (hash-set! accounts who new-balance) + (list 'ok new-balance))] + [else + (list 'invalid-account who)]))) diff --git a/collects/racket/place/distributed/examples/logging/master.rkt b/collects/racket/place/distributed/examples/logging/master.rkt new file mode 100644 index 0000000000..388afe07d9 --- /dev/null +++ b/collects/racket/place/distributed/examples/logging/master.rkt @@ -0,0 +1,37 @@ +#lang racket/base +(require racket/place/distributed + racket/class + racket/place + racket/runtime-path + "bank.rkt" + "tuple.rkt") +(define-runtime-path bank-path "bank.rkt") +(define-runtime-path tuple-path "tuple.rkt") + +(provide main) + +(define (main) + (define remote-vm (spawn-remote-racket-vm "localhost" #:listen-port 6344)) + (define tuple-place (supervise-named-place-thunk-at remote-vm 'tuple-server tuple-path 'make-tuple-server)) + (define bank-place (supervise-place-thunk-at remote-vm bank-path 'make-bank)) + + (master-event-loop + remote-vm + (after-seconds 4 + (displayln (bank-new-account bank-place 'user1)) + (displayln (bank-add bank-place 'user1 10)) + (displayln (bank-removeM bank-place 'user1 5))) + + (after-seconds 2 + (define c (connect-to-named-place remote-vm 'tuple-server)) + (define d (connect-to-named-place remote-vm 'tuple-server)) + (displayln (tuple-server-set c "user0" 100)) + (displayln (tuple-server-set d "user2" 200)) + (displayln (tuple-server-get c "user0")) + (displayln (tuple-server-get d "user2")) + (displayln (tuple-server-get d "user0")) + (displayln (tuple-server-get c "user2"))) + (after-seconds 6 + (node-send-exit remote-vm)) + (after-seconds 8 + (exit 0)))) diff --git a/collects/racket/place/distributed/examples/logging/tuple.rkt b/collects/racket/place/distributed/examples/logging/tuple.rkt new file mode 100644 index 0000000000..41565682ab --- /dev/null +++ b/collects/racket/place/distributed/examples/logging/tuple.rkt @@ -0,0 +1,14 @@ +#lang racket/base +(require racket/match + racket/place/define-remote-server) + +(define-named-remote-server + tuple-server + + (define-state h (make-hash)) + (define-rpc (set k v) + (hash-set! h k v) + (log-to-parent #:severity 'debug (format "~a set to ~a" k v)) + v) + (define-rpc (get k) + (hash-ref h k #f))) diff --git a/collects/racket/place/distributed/examples/multiple/bank.rkt b/collects/racket/place/distributed/examples/multiple/bank.rkt new file mode 100644 index 0000000000..c1cef20b3f --- /dev/null +++ b/collects/racket/place/distributed/examples/multiple/bank.rkt @@ -0,0 +1,35 @@ +#lang racket/base +(require racket/match + racket/place/define-remote-server) + +(define-remote-server + bank + + (define-state accounts (make-hash)) + (define-rpc (new-account who) + (match (hash-has-key? accounts who) + [#t '(already-exists)] + [else + (hash-set! accounts who 0) + (list 'created who)])) + (define-rpc (removeM who amount) + (cond + [(hash-ref accounts who (lambda () #f)) => + (lambda (balance) + (cond [(<= amount balance) + (define new-balance (- balance amount)) + (hash-set! accounts who new-balance) + (list 'ok new-balance)] + [else + (list 'insufficient-funds balance)]))] + [else + (list 'invalid-account who)])) + (define-rpc (add who amount) + (cond + [(hash-ref accounts who (lambda () #f)) => + (lambda (balance) + (define new-balance (+ balance amount)) + (hash-set! accounts who new-balance) + (list 'ok new-balance))] + [else + (list 'invalid-account who)]))) diff --git a/collects/racket/place/distributed/examples/multiple/master.rkt b/collects/racket/place/distributed/examples/multiple/master.rkt new file mode 100644 index 0000000000..2deb3bf674 --- /dev/null +++ b/collects/racket/place/distributed/examples/multiple/master.rkt @@ -0,0 +1,39 @@ +#lang racket/base +(require racket/place/distributed + racket/class + racket/place + racket/runtime-path + "bank.rkt") + +(define-runtime-path bank-path "bank.rkt") +(define-runtime-path place-worker-path "place-worker.rkt") +(define-runtime-path process-worker-path "process-worker.rkt") + +(provide main + wait-place-thunk) + +(define (spawn-place-worker-at port message) + (spawn-vm-supervise-dynamic-place-at "localhost" #:listen-port port place-worker-path 'place-worker #:initial-message message #:restart-on-exit #f)) + +(define (wait-place-thunk) + (place ch + (printf "BEGINING SLEEP\n") + (sleep 5) + (printf "SLEEP DONE\n"))) + + +(define (main) + (define bank-vm (spawn-vm-supervise-place-thunk-at "localhost" #:listen-port 6344 bank-path 'make-bank)) + (define bank-place (send bank-vm get-first-place)) + (master-event-loop + (spawn-place-worker-at 6341 "ONE") + (spawn-place-worker-at 6342 "TWO") + (spawn-place-worker-at 6343 "THREE") + bank-vm + (spawn-vm-supervise-place-thunk-at "localhost" #:listen-port 6345 (get-current-module-path) 'wait-place-thunk #:restart-on-exit #t) + (every-seconds 3.3 (printf "Hello from every-seconds\n") (flush-output)) + (after-seconds 2 + (displayln (bank-new-account bank-place 'user0)) + (displayln (bank-add bank-place 'user0 10)) + (displayln (bank-removeM bank-place 'user0 5))) + )) diff --git a/collects/racket/place/distributed/examples/multiple/place-worker.rkt b/collects/racket/place/distributed/examples/multiple/place-worker.rkt new file mode 100644 index 0000000000..aab672693d --- /dev/null +++ b/collects/racket/place/distributed/examples/multiple/place-worker.rkt @@ -0,0 +1,30 @@ +#lang racket/base +(require racket/place) + +(provide place-worker + main) + +(define (place-worker ch) + (random-seed (current-seconds)) + ;(define id (place-channel-get ch)) + (define id "HI") + (for ([i (in-range (+ 5 (random 5)))]) + (displayln (list (current-seconds) id i)) + (flush-output) + (place-channel-put ch (list (current-seconds) id i)) + (sleep 3))) + +;(define-values (p1 p2) (place-channel)) +;(place-worker p1) + +(define (main . argv) + (define p (place ch + (random-seed (current-seconds)) + ;(define id (place-channel-get ch)) + (define id "HI") + (for ([i (in-range (+ 5 (random 5)))]) + (displayln (list (current-seconds) id i)) + (flush-output) + ;(place-channel-put ch (list (current-seconds) id i)) + #;(sleep 3)))) + (sync (handle-evt (place-dead-evt p) (lambda (e) (printf "DEAD\n"))))) diff --git a/collects/racket/place/distributed/examples/named/bank.rkt b/collects/racket/place/distributed/examples/named/bank.rkt new file mode 100644 index 0000000000..c1cef20b3f --- /dev/null +++ b/collects/racket/place/distributed/examples/named/bank.rkt @@ -0,0 +1,35 @@ +#lang racket/base +(require racket/match + racket/place/define-remote-server) + +(define-remote-server + bank + + (define-state accounts (make-hash)) + (define-rpc (new-account who) + (match (hash-has-key? accounts who) + [#t '(already-exists)] + [else + (hash-set! accounts who 0) + (list 'created who)])) + (define-rpc (removeM who amount) + (cond + [(hash-ref accounts who (lambda () #f)) => + (lambda (balance) + (cond [(<= amount balance) + (define new-balance (- balance amount)) + (hash-set! accounts who new-balance) + (list 'ok new-balance)] + [else + (list 'insufficient-funds balance)]))] + [else + (list 'invalid-account who)])) + (define-rpc (add who amount) + (cond + [(hash-ref accounts who (lambda () #f)) => + (lambda (balance) + (define new-balance (+ balance amount)) + (hash-set! accounts who new-balance) + (list 'ok new-balance))] + [else + (list 'invalid-account who)]))) diff --git a/collects/racket/place/distributed/examples/named/master.rkt b/collects/racket/place/distributed/examples/named/master.rkt new file mode 100644 index 0000000000..bf36adf060 --- /dev/null +++ b/collects/racket/place/distributed/examples/named/master.rkt @@ -0,0 +1,40 @@ +#lang racket/base +(require racket/place/distributed + racket/class + racket/place + racket/runtime-path + "bank.rkt" + "tuple.rkt") +(define-runtime-path bank-path "bank.rkt") +(define-runtime-path tuple-path "tuple.rkt") + +(provide main) + +(define (main) + (define remote-vm (spawn-remote-racket-vm "localhost" #:listen-port 6344)) + (define tuple-place (supervise-named-place-thunk-at remote-vm 'tuple-server tuple-path 'make-tuple-server)) + (define bank-place (supervise-place-thunk-at remote-vm bank-path 'make-bank)) + + (master-event-loop + remote-vm + (after-seconds 4 + (displayln (bank-new-account bank-place 'user0)) + (displayln (bank-add bank-place 'user0 10)) + (displayln (bank-removeM bank-place 'user0 5))) + + (after-seconds 2 + (define c (connect-to-named-place remote-vm 'tuple-server)) + (define d (connect-to-named-place remote-vm 'tuple-server)) + (tuple-server-hello c) + (tuple-server-hello d) + (displayln (tuple-server-set c "user0" 100)) + (displayln (tuple-server-set d "user2" 200)) + (displayln (tuple-server-get c "user0")) + (displayln (tuple-server-get d "user2")) + (displayln (tuple-server-get d "user0")) + (displayln (tuple-server-get c "user2")) + ) + (after-seconds 8 + (node-send-exit remote-vm)) + (after-seconds 10 + (exit 0)))) diff --git a/collects/racket/place/distributed/examples/named/tuple.rkt b/collects/racket/place/distributed/examples/named/tuple.rkt new file mode 100644 index 0000000000..ca1179783f --- /dev/null +++ b/collects/racket/place/distributed/examples/named/tuple.rkt @@ -0,0 +1,15 @@ +#lang racket/base +(require racket/match + racket/place/define-remote-server) + +(define-named-remote-server + tuple-server + + (define-state h (make-hash)) + (define-rpc (set k v) + (hash-set! h k v) + v) + (define-rpc (get k) + (hash-ref h k #f)) + (define-cast (hello) + (printf "Hello from define-cast\n")(flush-output))) diff --git a/collects/racket/place/distributed/examples/restart/master.rkt b/collects/racket/place/distributed/examples/restart/master.rkt new file mode 100644 index 0000000000..0fef5d0709 --- /dev/null +++ b/collects/racket/place/distributed/examples/restart/master.rkt @@ -0,0 +1,19 @@ +#lang racket/base +(require racket/place/distributed + racket/class + racket/place) + +(provide wait-place-thunk) +(provide main) + +(define (wait-place-thunk) + (place ch + (printf "BEGINING SLEEP\n") + (sleep 5) + (printf "SLEEP DONE\n"))) + +(define (main) + (master-event-loop + (spawn-vm-supervise-place-thunk-at "localhost" #:listen-port 6345 (get-current-module-path) 'wait-place-thunk #:restart-on-exit #t) + (after-seconds 50 + (exit 0)))) diff --git a/collects/racket/place/distributed/examples/restart/restarter.rkt b/collects/racket/place/distributed/examples/restart/restarter.rkt new file mode 100644 index 0000000000..35a239bbb5 --- /dev/null +++ b/collects/racket/place/distributed/examples/restart/restarter.rkt @@ -0,0 +1,18 @@ +#lang racket/base +(require racket/place/distributed + racket/class + racket/place) + +(provide wait-place-thunk) +(provide main) + +(define (wait-place-thunk) + (place ch + (printf "BEGINING SLEEP\n") + (sleep 1) + (printf "SLEEP DONE\n"))) + +(define (main) + (master-event-loop + (spawn-vm-supervise-place-thunk-at "localhost" #:listen-port 6345 (get-current-module-path) 'wait-place-thunk + #:restart-on-exit (restart-every 5 #:retry 3)))) diff --git a/collects/racket/place/distributed/info.rkt b/collects/racket/place/distributed/info.rkt new file mode 100644 index 0000000000..7e41f36e49 --- /dev/null +++ b/collects/racket/place/distributed/info.rkt @@ -0,0 +1,3 @@ +#lang setup/infotab + +(define compile-omit-paths '("examples")) diff --git a/collects/racket/place/distributed/launch.rkt b/collects/racket/place/distributed/launch.rkt new file mode 100644 index 0000000000..29f7f83fe7 --- /dev/null +++ b/collects/racket/place/distributed/launch.rkt @@ -0,0 +1,18 @@ +#lang racket/base +(require racket/match + racket/tcp + racket/place/distributed) + +(provide main) + +(define (main . args) + (match args + [(list "spawn" node-port) + (define listener (tcp-listen (->number node-port) 4 #t)) + (write-flush (list (->number node-port))) + (start-spawned-node-router listener)] + + ;; Used to launch Design Pattern 1, MPI style distributed system. + [(list "launch" mod-path conf-name i) + (startup-config (dynamic-require (->path mod-path) (string->symbol conf-name)) (->number i))])) + diff --git a/collects/scribblings/guide/distributed.scrbl b/collects/scribblings/guide/distributed.scrbl new file mode 100644 index 0000000000..da5d16e08d --- /dev/null +++ b/collects/scribblings/guide/distributed.scrbl @@ -0,0 +1,140 @@ +#lang scribble/doc +@(require scribble/manual + (except-in "guide-utils.rkt" log-message) + scribble/eval + scriblib/figure + racket/port + racket/contract + (for-label racket/place/distributed)) + +@(define (codeblockfromfile filename) + (call-with-input-file + filename + (lambda (i) + (codeblock (port->string i))))) + +@title[#:tag "distributed-places"]{Distributed Places} + +The @racketmodname[racket/place/distributed] library provides support for +distributed programming. + +The example bellow demonstrates how to launch a remote racket vm instance, +launch remote places on the new remote vm instance, and start an +event loop that monitors the remote vm instance. + +The example code can also be found in +@filepath{racket/distributed/examples/named/master.rkt}. + +@figure["named-example-master" "examples/named/master.rkt"]{ +@codeblockfromfile["../../racket/place/distributed/examples/named/master.rkt"]} + +The @racket[spawn-remote-racket-vm] primitive connects to +@tt{"localhost"} and starts a racloud node there that listens on port +6344 for further instructions. The handle to the new racloud node is +assigned to the @racket[remote-vm] variable. Localhost is used so that +the example can be run using only a single machine. However localhost +can be replaced by any host with ssh publickey access and racket. The +@racket[supervise-named-place-thunk-at] creates a new place on the +@racket[remote-vm]. The new place will be identified in the future by +its name symbol @racket['tuple-server]. A place descriptor is +expected to be returned by dynamically requiring +@racket['make-tuple-server] from the @racket[tuple-path] module and +invoking @racket['make-tuple-server]. + +The code for the tuple-server place exists in the file +@filepath{tuple.rkt}. The @filepath{tuple.rkt} file contains the use of +@racket[define-named-remote-server] form, which defines a RPC server +suitiable for invocation by @racket[supervise-named-place-thunk-at]. + + + +@figure["named-example" "examples/named/tuple.rkt"]{ +@codeblockfromfile["../../racket/place/distributed/examples/named/tuple.rkt"]} + + + +The @racket[define-named-remote-server] form takes an identifier and a +list of custom expressions as its arguments. From the identifier a +place-thunk function is created by prepending the @tt{make-} prefix. +In this case @racket[make-tuple-server]. The +@racket[make-tuple-server] identifier is the +@racket{compute-instance-place-function-name} given to the +@racket[supervise-named-place-thunk-at] form above. The +@racket[define-state] custom form translates into a simple +@racket[define] form, which is closed over by @racket[define-rpc] +forms. + +The @racket[define-rpc] form is expanded into two parts. The first +part is the client stub that calls the rpc function. The client +function name is formed by concatenating the +@racket[define-named-remote-server] identifier, @tt{tuple-server}. +with the RPC function name @tt{set} to form @racket[tuple-server-set]. +The RPC client functions take a destination argument which is a +@racket[remote-connection%] descriptor and then the RPC function +arguments. The RPC client function sends the RPC function name, +@racket[set], and the RPC arguments to the destination by calling an +internal function @racket[named-place-channel-put]. The RPC client +then calls @racket[named-place-channel-get] to wait for the RPC +response. + +The second expansion part of @racket[define-rpc] is the server +implementation of the RPC call. The server is implemented by a match +expression inside the @racket[make-tuple-server] function. The match +clause for @racket[tuple-server-set] matches on messages beginning +with the @racket['set] symbol. The server executes the RPC call with +the communicated arguments and sends the result back to the RPC +client. + +The @racket[define-rpc] form is similar to the @racket[define-rpc] form +except there is no reply message from the server to client + +@figure["define-named-remote-server-expansion" "Expansion of define-named-remote-server"]{ +@codeblock{ +'(begin + (require racket/place racket/match) + (define/provide + (tuple-server-set dest k v) + (named-place-channel-put dest (list 'set k v)) + (named-place-channel-get dest)) + (define/provide + (tuple-server-get dest k) + (named-place-channel-put dest (list 'get k)) + (named-place-channel-get dest)) + (define/provide + (tuple-server-hello dest) + (named-place-channel-put dest (list 'hello))) + (define/provide + (make-tuple-server) + (place + ch + (let () + (define h (make-hash)) + (let loop () + (define msg (place-channel-get ch)) + (define (log-to-parent-real msg #:severity (severity 'info)) + (place-channel-put ch (log-message severity msg))) + (syntax-parameterize + ((log-to-parent (make-rename-transformer #'log-to-parent-real))) + (match + msg + ((list (list 'set k v) src) + (define result (let () (hash-set! h k v) v)) + (place-channel-put src result) + (loop)) + ((list (list 'get k) src) + (define result (let () (hash-ref h k #f))) + (place-channel-put src result) + (loop)) + ((list (list 'hello) src) + (define result + (let () (printf "Hello from define-cast\n") (flush-output))) + (loop)))) + loop)))) + (void)) +} +} + + + + + diff --git a/collects/scribblings/guide/performance.scrbl b/collects/scribblings/guide/performance.scrbl index 26e469a932..71d06427cc 100644 --- a/collects/scribblings/guide/performance.scrbl +++ b/collects/scribblings/guide/performance.scrbl @@ -418,3 +418,4 @@ argument instead. @include-section["futures.scrbl"] @include-section["places.scrbl"] +@include-section["distributed.scrbl"] diff --git a/collects/scribblings/reference/concurrency.scrbl b/collects/scribblings/reference/concurrency.scrbl index cb8fa93d5b..15ac304602 100644 --- a/collects/scribblings/reference/concurrency.scrbl +++ b/collects/scribblings/reference/concurrency.scrbl @@ -18,3 +18,4 @@ support for parallelism to improve performance. @include-section["thread-local.scrbl"] @include-section["futures.scrbl"] @include-section["places.scrbl"] +@include-section["distributed.scrbl"] diff --git a/collects/scribblings/reference/distributed.scrbl b/collects/scribblings/reference/distributed.scrbl new file mode 100644 index 0000000000..f55656981b --- /dev/null +++ b/collects/scribblings/reference/distributed.scrbl @@ -0,0 +1,746 @@ +#lang scribble/manual +@(require scribble/eval + scribble/struct + scribble/decode + racket/contract + racket/place/distributed + racket/sandbox + racket/class) +@(require (for-label racket/place/distributed racket/class)) + + +@(define evaler (make-base-eval)) +@(interaction-eval #:eval evaler (require racket/place/distributed + racket/class + racket/place/define-remote-server)) + +@(define output-evaluator + (parameterize ([sandbox-output 'string] + [sandbox-error-output 'string]) + (make-evaluator 'racket/base))) + +@(define-syntax interaction-output + (syntax-rules () + [(_ #:eval evaluator e) + (begin + (interaction #:eval evaluator e) + (printf "K ~a\n" (get-output evaluator)))])) + +@title[#:tag "distributed-places"]{Distributed Places} + +@defmodule[racket/place/distributed] + +Distributed Places is a prototype of a distributed computing framework for +Racket. + +Distributed Places' distributed computing design is centered around +machine nodes that do computation. The user/programmer configures a +new distributed system using a declarative syntax and callbacks. A +node begins life with one initial place, the message router. @;{See +@figure-ref["node-places"]}. Once the node has been configured the +message router is activated by calling the @racket[master-event-loop] +function. The message router listens on a TCP port for incoming +connections from other nodes in the distributed system. Compute places +can be spawned within the node by sending place-spawn request messages +to the node's message router. + +The use of Distributed Places is predicated on a couple assumptions: + +@itemlist[ +@item{ .ssh/config and authorized_keys are configured correctly to + allow passwordless connection to remote hosts using public key + authentication.} +@item{The same user account is used across all nodes in the + distributed network.} +@item{All machines run the same version of Racket.} +] + +@examples[ +(module hello-world-example racket/base + (require racket/place/distributed + racket/place) + + (provide main + hello-world) + + (define (hello-world) + (place ch + (printf "hello-world received: ~a\n" (place-channel-get ch)) + (define HW "Hello World") + (place-channel-put ch (format "~a\n" HW)) + (printf "hello-world sent: ~a\n" HW))) + + + (define (main) + (define-values (vm pl) + (spawn-vm-supervise-place-thunk-at/2 "localhost" + #:listen-port 6344 + (get-current-module-path) + 'hello-world)) + (master-event-loop + vm + (after-seconds 2 + (dplace-put pl "Hello") + (printf "master-event-loop received: ~a\n" (dplace-get pl))) + + (after-seconds 6 + (exit 0)) + ))) + +(require 'hello-world-example) +] + +@defproc[(master-event-loop [ec events-container<%>?] ...+) void?]{ + Waits in an endless loop for one of many events to become ready. The + @racket[master-event-loop] procedure constructs a @racket[node%] + instance to serve as the message router for then node. The + @racket[master-event-loop] procedure then adds all the declared + @racket[events-container<%>]s to the @racket[node%] and finally calls + the never ending loop @racket[sync-events] method, which handles the + events for the node. +} + +@(define (p . l) (decode-paragraph l)) +@(define spawn-vm-note + (make-splice + (list + @p{This function returns a @racket[remote-node%] instance not a @racket[remote-place%] + Call @racket[(send vm get-first-place)] to obtain the @racket[remote-place%] instance.})) ) + +@defproc[(spawn-vm-supervise-dynamic-place-at + [hostname string?] + [compute-instance-module-path module-path?] + [compute-instance-place-function-name symbol?] + [#:listen-port port non-negative-integer? DEFAULT-ROUTER-PORT] + [#:initial-message initial-message any? #f] + [#:racket-path racketpath string-path? (racket-path)] + [#:ssh-bin-path sshpath string-path? (ssh-bin-path)] + [#:launcher-path launcherpath string-path? (->string distributed-launch-path)] + [#:restart-on-exit restart-on-exit boolean? #f]) remote-place?]{ +Spawns a new remote vm node at @racket[hostname] with one compute instance place specified by +the @racket[compute-instance-module-path] and @racket[compute-instance-place-function-name] +parameters. This procedure constructs the new remote-place by calling +@racket[(dynamic-place compute-instance-module-path compute-instance-place-function-name)]. +@|spawn-vm-note| +} + +@defproc[(spawn-vm-supervise-dynamic-place-at/2 + [hostname string?] + [compute-instance-module-path module-path?] + [compute-instance-place-function-name symbol?] + [#:listen-port port non-negative-integer? DEFAULT-ROUTER-PORT] + [#:initial-message initial-message any? #f] + [#:racket-path racketpath string-path? (racket-path)] + [#:ssh-bin-path sshpath string-path? (ssh-bin-path)] + [#:launcher-path launcherpath string-path? (->string distributed-launch-path)] + [#:restart-on-exit restart-on-exit boolean? #f]) (values remote-node%? remote-place%?)]{ +Spawns a new remote vm node at @racket[hostname] with one compute instance place specified by +the @racket[compute-instance-module-path] and @racket[compute-instance-place-function-name] +parameters. This procedure constructs the new remote-place by calling +@racket[(dynamic-place compute-instance-module-path compute-instance-place-function-name)]. +The new @racket[remote-vm%] and @racket[remote-place%] instances make up the two return values. +} + +@(define place-thunk-function + (make-splice + (list + @p{ +The @racket[compute-instance-thunk-function-name] procedure is +responsible for creating the place and returning the newly constructed +the place descriptor. The +@racket[compute-instance-thunk-function-name] procedure should +accomplish this by calling either @racket[dynamic-place] or +@racket[place] inside the thunk. + })) ) +@defproc[(spawn-vm-supervise-place-thunk-at + [hostname string?] + [compute-instance-module-path module-path?] + [compute-instance-thunk-function-name symbol?] + [#:listen-port port non-negative-integer? DEFAULT-ROUTER-PORT] + [#:initial-message initial-message any? #f] + [#:racket-path racketpath string-path? (racket-path)] + [#:ssh-bin-path sshpath string-path? (ssh-bin-path)] + [#:launcher-path launcherpath string-path? (->string distributed-launch-path)] + [#:restart-on-exit restart-on-exit boolean? #f]) remote-place%?]{ +Spawns a new remote vm node at @racket[hostname] with one compute instance place. +the @racket[compute-instance-module-path] and @racket[compute-instance-thunk-function-name] +parameters. This procedure constructs the new remote-place by calling +dynamically requiring the +@racket[compute-instance-thunk-function-name] and invoking the +@racket[compute-instance-thunk-function-name]. + +@racket[((dynamic-require compute-instance-module-path compute-instance-thunk-function-name))] + +@|place-thunk-function| +@|spawn-vm-note| +} +@defproc[(spawn-vm-supervise-place-thunk-at/2 + [hostname string?] + [compute-instance-module-path module-path?] + [compute-instance-thunk-function-name symbol?] + [#:listen-port port non-negative-integer? DEFAULT-ROUTER-PORT] + [#:initial-message initial-message any? #f] + [#:racket-path racketpath string-path? (racket-path)] + [#:ssh-bin-path sshpath string-path? (ssh-bin-path)] + [#:launcher-path launcherpath string-path? (->string distributed-launch-path)] + [#:restart-on-exit restart-on-exit boolean? #f]) (values remote-vm%? remote-place%?)]{ +Spawns a new remote vm node at @racket[hostname] with one compute instance place. +the @racket[compute-instance-module-path] and @racket[compute-instance-thunk-function-name] +parameters. This procedure constructs the new remote-place by calling +dynamically requiring the +@racket[compute-instance-thunk-function-name] and invoking the +@racket[compute-instance-thunk-function-name]. + +@racket[((dynamic-require compute-instance-module-path compute-instance-thunk-function-name))] + +@|place-thunk-function| +The new @racket[remote-vm%] and @racket[remote-place%] instances make up the two return values. +} + +@defproc[(spawn-remote-racket-vm + [hostname string?] + [#:listen-port port non-negative-integer? DEFAULT-ROUTER-PORT] + [#:racket-path racketpath string-path? (racket-path)] + [#:ssh-bin-path sshpath string-path? (ssh-bin-path)] + [#:launcher-path launcherpath string-path? (->string distributed-launch-path)]) remote-node%?]{ +Spawns a new remote vm node at @racket[hostname] and returns a @racket[remote-node%] handle. +} +@defproc[(supervise-dynamic-place-at + [remote-vm remote-vm?] + [compute-instance-module-path module-path?] + [compute-instance-place-function-name symbol?] + [#:restart-on-exit restart-on-exit boolean? #f]) remote-place%?]{ +Creates a new place on the @racket[remote-vm] by using +@racket[dynamic-place] to invoke +@racket[compute-instance-place-function-name] from the module +@racket[compute-instance-module-path]. +} + +@defproc[(supervise-place-thunk-at + [remote-vm remote-vm?] + [compute-instance-module-path module-path?] + [compute-instance-thunk-function-name symbol?] + [#:restart-on-exit restart-on-exit boolean? #f]) remote-place%?]{ +Creates a new place on the @racket[remote-vm] by executing the thunk +@racket[compute-instance-thunk-function-name] from the module +@racket[compute-instance-module-path]. + +@|place-thunk-function| +} + +@defproc[(supervise-process-at + [hostname string?] + [commandline-argument string?] ...+ + [#:listen-port port non-negative-integer? DEFAULT-ROUTER-PORT]) remote-process%?]{ +Spawns an attached external process at host @racket[hostname]. +} + +@defproc[(supervise-named-dynamic-place-at + [remote-vm remote-vm?] + [place-name symbol?] + [compute-instance-module-path module-path?] + [compute-instance-place-function-name symbol?] + [#:restart-on-exit restart-on-exit boolean? #f]) remote-place%?]{ +Creates a new place on the @racket[remote-vm] by using +@racket[dynamic-place] to invoke +@racket[compute-instance-place-function-name] from the module +@racket[compute-instance-module-path]. The @racket[place-name] symbol +is used to establish later connections to the named place. +} + +@defproc[(supervise-named-place-thunk-at + [remote-vm remote-vm?] + [place-name symbol?] + [compute-instance-module-path module-path?] + [compute-instance-thunk-function-name symbol?] + [#:restart-on-exit restart-on-exit boolean? #f]) remote-place%?]{ +Creates a new place on the @racket[remote-vm] by executing the thunk +@racket[compute-instance-thunk-function-name] from the module +@racket[compute-instance-module-path]. The @racket[place-name] symbol +is used to establish later connections to the named place. + + +@|place-thunk-function| +} + +@defform[(restart-every [seconds (and/c real? nonnegative?)] + [#:retry retry (or/c nonnegative-integer? #f) #f] + [#:on-final-fail on-final-fail (or/c #f (-> any/c)) #f])]{ + +Returns a @racket[restarter%] instance that should be supplied to a @racket[#:restart-on-exit] argument. +} + +@defform[(every-seconds seconds body ....)]{ +Returns a @racket[respawn-and-fire%] instance that should be supplied to a @racket[master-event-loop]. +The @racket[respawn-and-fire%] instance executes the body expressions every @racket[seconds]. +} + +@defform[(after-seconds seconds body ....)]{ +Returns a @racket[after-seconds%] instance that should be supplied to a @racket[master-event-loop]. +Executes the body expressions after a delay of @racket[seconds] from the start of the event loop. +} + +@defproc[(connect-to-named-place [vm remote-node%?] [name symbol?]) remote-connection%?]{ +Connects to a named place on the @racket[vm] named @racket[name] and returns a @racket[remote-connection%] object. +} + +@defproc[(log-message [severity symbol?] [msg string?]) void?]{ + Logs a message, which traversed the process tree until it reaches the root, where it is printed to the console. +} + +@definterface[event-container<%> ()]{ + All objects that are supplied to the @racket[master-event-loop] must + implement the @racket[event-container<%>] interface. The + @racket[master-event-loop] calls the @racket[register] method on each + supplied @racket[event-container<%>] to obtain a list of events the + event loop should wait for. + + @defmethod[(register [events (listof events?)]) (listof events?)]{ + Returns the list of events inside the @racket[event-container<%>] that + should be waited on by the @racket[master-event-loop]. + } + +The following classes all implement @racket[event-container<%>] and +can be supplied to a @racket[master-event-loop]: +@racket[spawned-process%], @racket[place-socket-bridge%], +@racket[node%], @racket[remote-node%], @racket[remote-place%], +@racket[place%] @racket[connection%], @racket[respawn-and-fire%], and +@racket[after-seconds%]. + +} + +@defclass[spawned-process% object% (event-container<%>) + (defmethod (get-pid) exact-positive-integer?) ]{ + +@defconstructor[([cmdline-list (listof (or/c string? path?))] + [parent remote-node%? #f] + )]{ +The @racket[cmdline-list] is a list of command line arguments of type @racket[string] and/or @racket[path]. + +The @racket[parent] argument is a @racket[remote-node%] instance that will be notified when the process dies via +a @racket[(send parent process-died this)] call. +} +} + +@examples[ #:eval evaler +(new spawned-process% [cmdline-list + (list (ssh-bin-path) "localhost" (racket-path) "-tm" distributed-launch-path "spawn" (->string 6340))]) +] + +@defclass[place-socket-bridge% object% (event-container<%>) + (defmethod (get-sc-id) exact-positive-integer?) ]{ + +@defconstructor[([pch place-channel?] + [sch socket-connection%?] + [id exact-positive-integer?] + )]{ +The @racket[pch] argument is a @racket[place-channel]. Messages +received on @racket[pch] are forwarded to the socket-connection% +@racket[sch] via a @racket[dcgm] message. e.g. +@racket[(sconn-write-flush sch (dcgm DCGM-TYPE-INTER-DCHANNEL id id msg))] +The @racket[id] is a @racket[exact-positive-integer] that identifies +the socket-connection subchannel for this inter-node place connection. +} +} + +@defclass[node% object% (event-container<%>)]{ + +The @racket[node%] instance controls a distributed places node. It +launches compute places and routes inter-node place messages in the +distributed system. The @racket[master-event-loop] form constructs a +@racket[node%] instance under the hood. Newly spawned nodes also have +a @racket[node%] instance in their initial place that serves as the +node's message router. + +@defconstructor[([listen-port tcp-listen-port? #f])]{ + Constructs a @racket[node%] that will listen on @racket[listen-port] for inter-node connections.} + +@defmethod[(sync-events) void?]{ + Starts the never ending event loop for this distributed places node. +} +} + +@(define place-exec-note + (make-splice + (list + @p{The @racket[place-exec] argument describes how the remote place should be launched.} + @itemize[@item{@racket[(list 'place place-module-path place-thunk)]} + @item{@racket[(list 'dynamic-place place-module-path place-func)]}] + @p{The difference between these two launching methods is that + the @racket['place] version of @racket[place-exec] expects a + thunk, zero argument function, to be exported by the module + @racket[place-module-path]. Executing the thunk is expected to + create a new place and return a place descriptor to the newly + created place. The @racket['dynamic-place] version of + @racket[place-exec] expects place-func to be a function taking a + single argument, the initial channel argument, and calls + @racket[dynamic-place] on behalf of the user and creates the new + place from the @racket[place-module-path] and + @racket[place-func].} +))) + +@(define one-sided-note + (make-splice + (list + @p{The @racket[#:one-sided-place] argument is an internal use + argument for launching remote places from within a place using + the old design pattern 1.}))) + +@(define restart-on-exit-note + (make-splice + (list + @p{The @racket[#:restart-on-exit] boolean argument instructs the + remote-place% instance to respawn the place on the remote node + should it exit or terminate at any time. This boolean needs to + be expanded to a restart criteria object in the future.}))) + + +@defclass[remote-node% object% (event-container<%>)]{ + + The @racket[node%] instance controls a distributed places node. It + launches compute places and routes inter-node place messages in the + distributed system. This is the remote api to a distributed places + node. Instances of @racket[remote-node%] are returned by + @racket[spawn-remote-racket-vm], + @racket[spawn-vm-supervise-dynamic-place-at], and + @racket[spawn-vm-supervise-place-thunk-at]. + + @defconstructor[([listen-port tcp-listen-port? #f] + [restart-on-exit boolean? #f])]{ + Constructs a @racket[node%] that will listen on + @racket[listen-port] for inter-node connections. + + When set to true the @racket[restart-on-exit] parameter causes the + specified node to be restarted when the ssh session spawning the node + dies. + } + + @defmethod[(get-first-place) remote-place%?]{ + Returns the @racket[remote-place%] object instance for the first place spawned on this node. + } + @defmethod[(get-first-place-channel) place-channel?]{ + Returns the communication channel for the first place spawned on this node. + } + @defmethod[(get-log-prefix) string?]{ + Returns @racket[(format "PLACE ~a:~a" host-name listen-port)] + } + + @defmethod[(launch-place + [place-exec list?] + [#:restart-on-exit restart-on-exit boolean? #f] + [#:one-sided-place one-sided-place boolean? #f]) remote-place%?]{ + Launches a place on the remote node represented by this @racket[remote-node%] instance. + @|place-exec-note| + @|one-sided-note| + @|restart-on-exit-note| + } + + @defmethod[(remote-connect [name string?]) remote-connection%]{ + Connects to a named place on the remote node represented by this @racket[remote-node%] instance. + } + + @defmethod[(send-exit) void?]{ + Sends a message instructing the remote node represented by this + @racket[remote-node%] instance to exit immediately + } +} + +@defclass[remote-place% object% (event-container<%>)]{ + +The @racket[remote-place%] instance provides a remote api to a place +running on a remote distributed places node. It launches a compute +places and routes inter-node place messages to the remote place. + +@defconstructor[([vm remote-node%?] + [place-exec list?] + [restart-on-exit #f] + [one-sided-place #f] + [on-channel/2 #f])]{ + Constructs a @racket[remote-place%] instance. + @|place-exec-note| + @|one-sided-note| + @|restart-on-exit-note| + + See @racket[set-on-channel/2!] for description of @racket[on-channel/2] argument. +} + +@defmethod[(set-on-channel/2! [callback (-> channel msg void?)]) void?]{ + Installs a handler function that handles messages from the remote place. + The @racket[setup/distributed-docs] module uses this callback to handle job completion messages. +} +} + +@defproc[(dplace-put [pl remote-place%?] [msg any/c]) void?]{ + This function is used inside @racket[master-event-loop] callbacks. + It sends messages to remote places. +} + +@defproc[(dplace-get [pl remote-place%?]) any/c]{ + This function is used inside @racket[master-event-loop] callbacks. + It takes the current delimited continuation and resumes it when a message arrives from @racket[pl]. +} + +@defclass[remote-connection% object% (event-container<%>)]{ + +The @racket[remote-connection%] instance provides a remote api to a named place +running on a remote distributed places node. It connects to a named compute +places and routes inter-node place messages to the remote place. + +@defconstructor[([vm remote-node%?] + [name string?] + [restart-on-exit #f] + [on-channel/2 #f])]{ + Constructs a @racket[remote-place%] instance. + @|restart-on-exit-note| + + See @racket[set-on-channel/2!] for description of @racket[on-channel/2] argument. +} + +@defmethod[(set-on-channel/2! [callback (-> channel msg void?)]) void?]{ + Installs a handler function that handles messages from the remote place. + The @racket[setup/distributed-docs] module uses this callback to handle job completion messages. +} +} + +@defclass[place% object% (event-container<%>)]{ + + The @racket[place%] instance represents a place launched on a + distributed places node at that node. It launches a compute places and + routes inter-node place messages to the place. + + @defconstructor[([vm remote-place%?] + [place-exec list?] + [ch-id exact-positive-integer?] + [sc socket-connection%?] + [on-place-dead (-> event void?) default-on-place-dead])]{ + Constructs a @racket[remote-place%] instance. + @|place-exec-note| + The @racket[ch-id] and @racket[sc] arguments are internally used to + establish routing between the remote node spawning this place and the + place itself. The @racket[on-place-dead] callback handles the event + when the newly spawned place terminates. + } + + @defmethod[(wait-for-die) void?]{ + Blocks and waits for the subprocess representing the @racket[remote-node%] to exit. + } +} + +@defproc[(node-send-exit [remote-node% node]) void?]{ +Sends @racket[node] a message telling it to exit immediately. +} +@defproc[(node-get-first-place [remote-node% node]) remote-place%?]{ +Returns the @racket[remote-place%] instance of the first place spawned at this node +} + +@defclass[connection% object% (event-container<%>)]{ + +The @racket[connection%] instance represents a connection to a +named-place instance running on the current node. It routes inter-node +place messages to the named place. + +@defconstructor[([vm remote-node%?] + [name string?] + [ch-id exact-positive-integer?] + [sc socket-connection%?])]{ + Constructs a @racket[remote-place%] instance. + @|place-exec-note| + The @racket[ch-id] and @racket[sc] arguments are internally used to + establish routing between the remote node and this named-place. + } +} + +@defclass[respawn-and-fire% object% (event-container<%>)]{ + + The @racket[respawn-and-fire%] instance represents a thunk that should + execute every @racket[n] seconds. + +@defconstructor[([seconds (and/c real? (not/c negative?))] + [thunk (-> void?)])]{ + Constructs a @racket[respawn-and-fire%] instance that when placed + inside a @racket[master-event-loop] construct causes the supplied + thunk to execute every @racket[n] seconds. +} +} + +@defclass[after-seconds% object% (event-container<%>)]{ + + The @racket[after-seconds%] instance represents a thunk that should + execute after @racket[n] seconds. + +@defconstructor[([seconds (and/c real? (not/c negative?))] + [thunk (-> void?)])]{ + Constructs an @racket[after-seconds%] instance that when placed + inside a @racket[master-event-loop] construct causes the supplied + thunk to execute after @racket[n] seconds. +} +} + +@defclass[restarter% after-seconds% (event-container<%>)]{ + + The @racket[restarter%] instance represents a restart strategy. + +@defconstructor[([seconds (and/c real? (not/c negative?))] + [retry (or/c #f nonnegative-integer?) #f] + [on-final-fail (or/c #f (-> any/c)) #f])]{ + Constructs an @racket[restarter%] instance that when supplied to a + @racket[#:restart-on-exit] argument, attempts to restart the process + every @racket[seconds]. The @racket[retry] argument specifies how + many time to attempt to restart the process before giving up. If the + process stays alive for @racket[(* 2 seconds)] the attempted retries + count is reset to @racket[0]. The @racket[on-final-fail] thunk is + called when the number of retries is exceeded +} +} + + + +@defform[(define-remote-server name forms ...)]{ + +Creates a @racket[make-name] function that spawns a place running a instance of the @racket[name] +remote server. The server sits in a loop waiting for rpc requests from the @racket[define-rpc] functions +documented below. + +@defform[(define-state id value)]{ + Expands to a @@racket[define], which is closed over by the @racket[define-rpc] functions + to form local state. +} + +@defform[(define-rpc (id args ...) body ...)]{ + Expands to a client rpc function @tt{name-id} which sends @racket[id] and @racket[args ...] to + the rpc server @racket[rpc-place] and waits for a response. + @racket[(define (name-id rpc-place args ...) body)] +} + +@defform[(define-cast (id args ...) body ...)]{ + Expands to a client rpc function @tt{name-id} which sends @racket[id] and @racket[args ...] to + the rpc server @racket[rpc-place] but does not receive any response. A cast is a one-way communication + technique. + @racket[(define (name-id rpc-place args ...) body)] +} +} + +@examples[ #:eval evaler +(define-named-remote-server + tuple-server + + (define-state h (make-hash)) + (define-rpc (set k v) + (hash-set! h k v) + v) + (define-rpc (get k) + (hash-ref h k #f)))] + +@examples[ #:eval evaler +(define-remote-server + bank + + (define-state accounts (make-hash)) + (define-rpc (new-account who) + (match (hash-has-key? accounts who) + [#t '(already-exists)] + [else + (hash-set! accounts who 0) + (list 'created who)])) + (define-rpc (removeM who amount) + (cond + [(hash-ref accounts who (lambda () #f)) => + (lambda (balance) + (cond [(<= amount balance) + (define new-balance (- balance amount)) + (hash-set! accounts who new-balance) + (list 'ok new-balance)] + [else + (list 'insufficient-funds balance)]))] + [else + (list 'invalid-account who)])) + (define-rpc (add who amount) + (cond + [(hash-ref accounts who (lambda () #f)) => + (lambda (balance) + (define new-balance (+ balance amount)) + (hash-set! accounts who new-balance) + (list 'ok new-balance))] + [else + (list 'invalid-account who)])))] + +@defthing[distributed-launch-path path?]{ +Contains the path to the distributed places launcher. +} + +@defproc[(ssh-bin-path) string?]{ +Returns the path to the ssh binary on the local system in string form. +} +@examples[ #:eval evaler +(ssh-bin-path) +] + +@defproc[(racket-path) path?]{ +Returns the path to the currently executing racket binary on the local system. +} +@examples[ #:eval evaler +(racket-path) +] + +@defproc[(distributed-places-path) string?]{ +Returns the path to the distributed-places module on the local system. +} +@examples[ #:eval evaler +(distributed-places-path) +] + +@defform[(get-current-module-path)]{ +Returns the path to the current module. +} +@examples[ #:eval evaler +(begin + (module my-module racket/base + (require racket/place/distributed) + (get-current-module-path)) + (require 'my-module)) +] + +@defproc[(->string) string?]{ +Coerces strings, numbers, symbols, and paths to a string. +} +@examples[ #:eval evaler +(->string "hello") +(->string 1) +(->string 'bye) +(->string (build-path "ridge")) +(->string #"bytes") +] + +@defproc[(->number) number?]{ +Coerces strings, numbers, to a number. +} +@examples[ #:eval evaler +(->number "100") +(->number 1) +] + +@defproc[(->path) path?]{ +Coerces paths and strings to a path. +} +@examples[ #:eval evaler +(->path "/usr/bin") +(->path (build-path "ridge")) +] + +@defproc[(->length) path?]{ +Returns the length of strings, bytes, and lists. +} +@examples[ #:eval evaler +(->length "Boo") +(->length #"Woo") +(->length (list 1 2 3 4)) +] + +@defproc[(write-flush [datum any?] [port port?]) (void)]{ +Writes @racket[datum] to @racket[port] and then flushes @racket[port]. +} +@examples[ #:eval evaler +(write-flush "Hello World" (current-output-port)) +] + +@(close-eval evaler) diff --git a/collects/tests/racket/place-channel.rkt b/collects/tests/racket/place-channel.rkt index eba62e30f2..953966a992 100644 --- a/collects/tests/racket/place-channel.rkt +++ b/collects/tests/racket/place-channel.rkt @@ -43,7 +43,8 @@ (printf "~s\n" res) (let ([ok? (equal? expect res)]) (unless ok? - (printf " BUT EXPECTED ~s\n" expect)) + (printf " BUT EXPECTED ~s\n" expect) + (eprintf "ERROR\n")) ok?))) (define (echo ch) (place-channel-put ch (place-channel-get ch))) diff --git a/collects/tests/racket/place/distributed/bank.rkt b/collects/tests/racket/place/distributed/bank.rkt new file mode 100644 index 0000000000..c1cef20b3f --- /dev/null +++ b/collects/tests/racket/place/distributed/bank.rkt @@ -0,0 +1,35 @@ +#lang racket/base +(require racket/match + racket/place/define-remote-server) + +(define-remote-server + bank + + (define-state accounts (make-hash)) + (define-rpc (new-account who) + (match (hash-has-key? accounts who) + [#t '(already-exists)] + [else + (hash-set! accounts who 0) + (list 'created who)])) + (define-rpc (removeM who amount) + (cond + [(hash-ref accounts who (lambda () #f)) => + (lambda (balance) + (cond [(<= amount balance) + (define new-balance (- balance amount)) + (hash-set! accounts who new-balance) + (list 'ok new-balance)] + [else + (list 'insufficient-funds balance)]))] + [else + (list 'invalid-account who)])) + (define-rpc (add who amount) + (cond + [(hash-ref accounts who (lambda () #f)) => + (lambda (balance) + (define new-balance (+ balance amount)) + (hash-set! accounts who new-balance) + (list 'ok new-balance))] + [else + (list 'invalid-account who)]))) diff --git a/collects/tests/racket/place/distributed/distributed.rkt b/collects/tests/racket/place/distributed/distributed.rkt new file mode 100644 index 0000000000..1a8bc4b575 --- /dev/null +++ b/collects/tests/racket/place/distributed/distributed.rkt @@ -0,0 +1,54 @@ +#lang racket/base +(require racket/place/distributed + racket/class + racket/place + racket/runtime-path + "bank.rkt" + "tuple.rkt") +(define-runtime-path bank-path "bank.rkt") +(define-runtime-path tuple-path "tuple.rkt") + +(provide main) + +(define (main) + (define (test expect fun . args) + (printf "~s ==> " (cons fun args)) + (flush-output) + (let ([res (if (procedure? fun) + (apply fun args) + (car args))]) + (printf "~s\n" res) + (let ([ok? (equal? expect res)]) + (unless ok? + (printf " BUT EXPECTED ~s\n" expect) + (eprintf "ERROR\n")) + ok?))) + + (define remote-vm (spawn-remote-racket-vm "localhost" #:listen-port 6344)) + (define tuple-place (supervise-named-place-thunk-at remote-vm 'tuple-server tuple-path 'make-tuple-server)) + (define bank-place (supervise-place-thunk-at remote-vm bank-path 'make-bank)) + + (master-event-loop + remote-vm + (after-seconds 2 + (define c (connect-to-named-place remote-vm 'tuple-server)) + (define d (connect-to-named-place remote-vm 'tuple-server)) + (tuple-server-hello c) + (tuple-server-hello d) + (test 100 tuple-server-set c "user0" 100) + (test 200 tuple-server-set d "user2" 200) + (test 100 tuple-server-get c "user0") + (test 200 tuple-server-get d "user2") + (test 100 tuple-server-get d "user0") + (test 200 tuple-server-get c "user2") + ) + (after-seconds 4 + (test '(created user1) bank-new-account bank-place 'user1) + (test '(ok 10) bank-add bank-place 'user1 10) + (test '(ok 5) bank-removeM bank-place 'user1 5) + ) + + (after-seconds 15 + (node-send-exit remote-vm)) + (after-seconds 20 + (exit 0)))) diff --git a/collects/tests/racket/place/distributed/restarter.rkt b/collects/tests/racket/place/distributed/restarter.rkt new file mode 100644 index 0000000000..e61ac3e71b --- /dev/null +++ b/collects/tests/racket/place/distributed/restarter.rkt @@ -0,0 +1,21 @@ +#lang racket/base +(require racket/place/distributed + racket/class + racket/place) + +(provide wait-place-thunk) +(provide main) + +(define (wait-place-thunk) + (place ch + (printf "BEGINING SLEEP\n") + (sleep 1) + (printf "SLEEP DONE\n"))) + +(define (main) + (master-event-loop + (spawn-vm-supervise-place-thunk-at "localhost" #:listen-port 6345 (get-current-module-path) 'wait-place-thunk + #:restart-on-exit (restart-every 5 #:retry 3 + #:on-final-fail (lambda () + (printf "Failed 3 times exititing\n") + (exit 1)))))) diff --git a/collects/tests/racket/place/distributed/tuple.rkt b/collects/tests/racket/place/distributed/tuple.rkt new file mode 100644 index 0000000000..ca1179783f --- /dev/null +++ b/collects/tests/racket/place/distributed/tuple.rkt @@ -0,0 +1,15 @@ +#lang racket/base +(require racket/match + racket/place/define-remote-server) + +(define-named-remote-server + tuple-server + + (define-state h (make-hash)) + (define-rpc (set k v) + (hash-set! h k v) + v) + (define-rpc (get k) + (hash-ref h k #f)) + (define-cast (hello) + (printf "Hello from define-cast\n")(flush-output)))