From ee463056a8e25b1aeee655f4da3dc44b799497cc Mon Sep 17 00:00:00 2001 From: Kevin Tew Date: Wed, 4 Apr 2012 13:01:31 -0600 Subject: [PATCH] [Distributed Places] fully connected network patch --- collects/meta/props | 1 + .../racket/place/define-remote-server.rkt | 5 + collects/racket/place/distributed.rkt | 194 +++++++++++++++--- .../racket/place/private/async-bi-channel.rkt | 39 ++++ 4 files changed, 210 insertions(+), 29 deletions(-) create mode 100644 collects/racket/place/private/async-bi-channel.rkt diff --git a/collects/meta/props b/collects/meta/props index d4afc92c34..73b61cc7e7 100755 --- a/collects/meta/props +++ b/collects/meta/props @@ -1181,6 +1181,7 @@ path/s is either such a string or a list of them. "collects/racket/match" responsible (samth) "collects/racket/match.rkt" responsible (samth) "collects/racket/place" responsible (tewk) +"collects/racket/place/distributed/examples/hello-world.rkt" drdr:command-line #f "collects/racket/slice.rkt" responsible (jay) "collects/racklog" responsible (jay) "collects/rackunit" responsible (jay noel ryanc) diff --git a/collects/racket/place/define-remote-server.rkt b/collects/racket/place/define-remote-server.rkt index 3dcbdb1c14..feab9e3a22 100644 --- a/collects/racket/place/define-remote-server.rkt +++ b/collects/racket/place/define-remote-server.rkt @@ -4,6 +4,7 @@ racket/place racket/place/private/th-place racket/place/private/coercion + racket/place/private/async-bi-channel racket/match racket/class racket/stxparam @@ -23,12 +24,16 @@ (cond [(place-channel? dest) (place-channel-get dest)] [(th-place-channel? dest) (th-place-channel-get dest)] + [(channel? dest) (channel-get dest)] + [(async-bi-channel? dest) (async-bi-channel-get dest)] [else (send dest get-msg)])) (define (dplace/place-channel-put dest msg) (cond [(place-channel? dest) (place-channel-put dest msg)] [(th-place-channel? dest) (th-place-channel-put dest msg)] + [(channel? dest) (channel-put dest msg)] + [(async-bi-channel? dest) (async-bi-channel-put dest msg)] [else (send dest put-msg msg)])) diff --git a/collects/racket/place/distributed.rkt b/collects/racket/place/distributed.rkt index 42a64ecf03..53899ecc03 100644 --- a/collects/racket/place/distributed.rkt +++ b/collects/racket/place/distributed.rkt @@ -5,6 +5,7 @@ racket/place racket/place/private/th-place racket/place/private/coercion + racket/place/private/async-bi-channel racket/class racket/trait racket/udp @@ -12,6 +13,15 @@ racket/date syntax/location) +(define-syntax define/provide + (syntax-rules () + [(_ (name x ...) body ...) + (begin (provide name) + (define (name x ...) body ...))] + [(_ name val) + (begin (provide name) + (define name val))])) + (provide ssh-bin-path racket-path distributed-launch-path @@ -46,6 +56,7 @@ ll-channel-get ll-channel-put write-flush + printf/f log-message start-spawned-node-router @@ -67,6 +78,8 @@ (struct-out node-config) (struct-out dcg) + ;v3 api + ;classes event-container<%> spawned-process% @@ -137,6 +150,10 @@ (write msg p) (flush-output p)) +(define (printf/f . args) + (apply printf args) + (flush-output)) + (define (tcp-connect/backoff rname rport #:times [times 4] #:start-seconds [start-seconds 1]) (let loop ([t 0] [wait-time start-seconds]) @@ -185,6 +202,12 @@ (define DCGM-TYPE-NEW-PLACE 9) (define DCGM-TYPE-SET-OWNER 10) +(define DCGM-NEW-NODE-CONNECT 50) + +(define DCGM-CONTROL-NEW-NODE 100) +(define DCGM-CONTROL-NEW-PLACE 101) +(define DCGM-CONTROL-NEW-CONNECTION 102) + (define (dchannel-put ch msg) (unless (or (dchannel? ch) (place-channel? ch)) @@ -332,6 +355,7 @@ sch id node) + (define/public (register nes) (cons (wrap-evt @@ -350,6 +374,7 @@ (loop) (dcgm-msg msg)))) (define/public (put-msg msg) + ;(printf/f "PSB3 ~a ~a ~a\n" sch id msg) (sconn-write-flush sch (dcgm DCGM-TYPE-INTER-DCHANNEL id id msg))) (super-new) ))) @@ -363,11 +388,13 @@ (init-field [socket-ports null]) (init-field [sub-ecs null]) (init-field [psbs null]) - (init-field [spawned-nodes null]) + (init-field [spawned-nodes (make-hash)]) (init-field [named-places (make-hash)]) (init-field [beacon #f]) (init-field [owner #f]) + (init-field [nodes #f]) (field [id 0]) + (define/public (nextid) (set! id (add1 id)) id) @@ -375,8 +402,10 @@ (set! socket-ports (append socket-ports (list pair)))) (define/public (add-sub-ec ec) (set! sub-ecs (append sub-ecs (list ec)))) - (define (add-spawned-node ec) - (set! spawned-nodes (append spawned-nodes (list ec)))) + (define (add-spawned-node key ec) + (hash-set! spawned-nodes key ec)) + (define (find-spawned-node key) + (hash-ref spawned-nodes key #f)) (define (add-psb ec) (set! psbs (append psbs (list ec)))) (define (add-named-place name np) @@ -386,6 +415,7 @@ (define (add-place-channel-socket-bridge pch sch id) (add-psb (new place-socket-bridge% [pch pch] [sch sch] [id id] [node this]))) (define (forward-mesg m src-channel) + ;(printf/f "FORWARD MESSAGE ~a ~a\n" src-channel m) (match m [(dcgm 1 #;(== DCGM-TYPE-DIE) src dest "DIE") (exit 1)] [(dcgm 2 #;(== DCGM-TYPE-NEW-DCHANNEL) src dest pch) @@ -432,20 +462,21 @@ (place-channel-put d (dcgm DCGM-TYPE-NEW-DCHANNEL src dest pch2))] [(dcgm 4 #;(== DCGM-TYPE-INTER-DCHANNEL) _ ch-id msg) (define pch (sconn-lookup-subchannel src-channel ch-id)) + ;(printf/f "4 ~a ~a ~a ~a\n" src-channel ch-id pch msg) (cond [(place-channel? pch) (place-channel-put pch msg)] [(is-a? pch connection%) (send pch forward msg)] [(th-place-channel? pch) - (th-place-channel-put pch msg)])] + (th-place-channel-put pch msg)] + [else + (raise "OOPS\n")])] [(dcgm 6 #;(== DCGM-TYPE-SPAWN-REMOTE-PROCESS) src (list node-name node-port mod-path funcname) ch1) - (define node - (new remote-node% - [host-name node-name] - [listen-port node-port] - [cmdline-list (list (ssh-bin-path) node-name (racket-path) "-tm" (->string distributed-launch-path) "spawn" (->string node-port))])) - (add-spawned-node node) + (define node (spawn-remote-racket-node node-name #:listen-port node-port)) + (for ([x (in-hash-values spawned-nodes)]) + (send x notify-of-new-node node-name node-port)) + (add-spawned-node (list node-name node-port) node) (send node launch-place (list 'dynamic-place mod-path funcname) ;#:initial-message initial-message @@ -459,6 +490,20 @@ [(dcgm 10 #;(== DCGM-TYPE-SET-OWNER) -1 -1 msg) (log-debug (format "RECV DCGM-TYPE-SET-OWNER ~a" src-channel)) (set! owner src-channel)] + [(dcgm #;50 (== DCGM-NEW-NODE-CONNECT) -1 -1 (list node-name node-port)) + (add-spawned-node (list node-name node-port) (new remote-node% [host-name node-name] [listen-port node-port]))] + [(dcgm #;100 (== DCGM-CONTROL-NEW-NODE) -1 -1 (list node-name node-port)) + (define node (spawn-remote-racket-node node-name #:listen-port node-port)) + (for ([x (in-hash-values spawned-nodes)]) + (send x notify-of-new-node node-name node-port)) + (add-spawned-node (list node-name node-port) node)] + [(dcgm #;101 (== DCGM-CONTROL-NEW-PLACE) dest -1 place-exec) + (define node (find-spawned-node dest)) + (send node launch-place place-exec)] + [(dcgm #;102 (== DCGM-CONTROL-NEW-CONNECTION) dest -1 (list name ch)) + (define node (find-spawned-node dest)) + (send node remote-connect name #:one-sided ch)] + [(dcgm mtype srcs dest msg) (define d (vector-ref chan-vec dest)) (cond @@ -484,7 +529,6 @@ [(list count host port) (void)]))])) - (define/public (log-from-child msg #:severity [severity 'info]) ;(printf "Received Log Message ~a ~a\n" severity msg) (cond @@ -504,7 +548,12 @@ (sconn-get-forward-event x forward-mesg)] [(or (place-channel? x) (place? x)) (wrap-evt x (lambda (e) - (forward-mesg e x)))]) + (forward-mesg e x)))] + [(channel? x) + (wrap-evt x (lambda (e) + (forward-mesg e x)))] + [else (raise (format "Unexpected channel type ~a" x))]) + n)) nes)] [nes @@ -528,7 +577,8 @@ [(or (place-channel? x) (place? x)) (wrap-evt x (lambda (e) ;(printf "SOCKET-PORT PLACE MESSAGE ~a\n" e) - (forward-mesg e x)))]) + (forward-mesg e x)))] + [else (raise (format "Unexpected channel type ~a" x))]) n)) nes)] [nes @@ -543,15 +593,15 @@ nes)] [nes (if spawned-nodes - (for/fold ([n nes]) ([x spawned-nodes]) + (for/fold ([n nes]) ([x (in-hash-values spawned-nodes)]) (send x register n)) nes)] [nes (register-beacon nes)] [nes (cond [named-places - (for/fold ([n nes]) ([x (in-hash-values named-places)]) - (send x register n))] + (for/fold ([n nes]) ([x (in-hash-values named-places)]) + (send x register n))] [else nes])]) nes)) @@ -562,6 +612,15 @@ (loop ))) + (when nodes + (for ([n nodes]) + (define n-host-name (send n get-host-name)) + (define n-port (send n get-listen-port)) + + (for ([sn (in-hash-values spawned-nodes)]) + (send sn notify-of-new-node n-host-name n-port)) + (add-spawned-node (list n-host-name n-port) n))) + (super-new) ))) @@ -613,6 +672,7 @@ (define/public (lookup-subchannel id) (cdr (assoc id subchannels))) (define/public (_write-flush x) (when (equal? out #f) (ensure-connected)) + ;(printf/f "SC ~a ~a\n" x out) (write-flush x out)) (define/public (remove-subchannel id) (set! subchannels (filter-map @@ -661,6 +721,8 @@ (field [id 0]) (field [remote-places null]) + (define/public (get-host-name) host-name) + (define/public (get-listen-port) listen-port) (define/public (nextid) (set! id (add1 id)) id) @@ -702,11 +764,18 @@ [else (raise (format "remote-place for sc-id ~a not found\n" ch-id))])] [(dcgm 4 #;(== DCGM-TYPE-INTER-DCHANNEL) _ ch-id msg) (define pch (sconn-lookup-subchannel sc ch-id)) + ;(printf/f "44 ~a ~a ~a ~a\n" in-port ch-id pch msg) (cond [(place-channel? pch) (place-channel-put pch msg)] [(is-a? pch connection%) - (send pch forward msg)])] + (send pch forward msg)] + [(th-place-channel? pch) + (th-place-channel-put pch msg)] + [(async-bi-channel? pch) + (async-bi-channel-put pch msg)] + [else + (raise "OOPS\n")])] [(dcgm 8 #;(== DCGM-TYPE-LOG-TO-PARENT) _ _ (list severity msg)) (define parent (send this get-router)) (cond @@ -747,12 +816,13 @@ (define/public (launch-place place-exec #:restart-on-exit [restart-on-exit #f] #:one-sided-place? [one-sided-place? #f]) (define rp (new remote-place% [node this] [place-exec place-exec] [restart-on-exit restart-on-exit] - [one-sided-place? one-sided-place?])) + [one-sided one-sided-place?])) (add-remote-place rp) rp) - (define/public (remote-connect name #:restart-on-exit [restart-on-exit #f]) - (define rp (new remote-connection% [node this] [name name] [restart-on-exit restart-on-exit])) + (define/public (remote-connect name #:restart-on-exit [restart-on-exit #f] #:one-sided [one-sided #f]) + (define rp (new remote-connection% [node this] [name name] [restart-on-exit restart-on-exit] + [one-sided one-sided])) (add-remote-place rp) rp) @@ -785,6 +855,9 @@ es)]) es)) + (define/public (notify-of-new-node node-name node-port) + (sconn-write-flush sc (dcgm DCGM-NEW-NODE-CONNECT -1 -1 (list node-name node-port)))) + (super-new) ))) @@ -808,7 +881,7 @@ (init-field node) (init-field [place-exec #f]) (init-field [name #f]) - (init-field [one-sided-place #f]) + (init-field [one-sided #f]) (init-field [restart-on-exit #f]) (init-field [on-channel #f]) (init-field [on-channel/2 #f]) @@ -826,17 +899,17 @@ (raise "named place connections that restart on exit are not possible")) (cond - [one-sided-place - (set! rpc one-sided-place) - (set! psb (send vm spawn-remote-place place-exec rpc))] + [one-sided + (set! rpc one-sided)] [else (define-values (pch1 pch2) (place-channel)) (set! rpc pch1) - (set! pc pch2) - (set! psb - (if place-exec - (send vm spawn-remote-place place-exec rpc) - (send vm spawn-remote-connection name rpc)))]) + (set! pc pch2)]) + + (set! psb + (if place-exec + (send node spawn-remote-place place-exec rpc) + (send node spawn-remote-connection name rpc))) (define (restart-place) (send node drop-sc-id (send psb get-sc-id)) @@ -1357,3 +1430,66 @@ (flush-output out) (sleep)) (subprocess-kill (first (first n)) #f)))))) + +;; +;; API Version 3 +;; + +;;;(define (start-message-router/place #:listen-port [listen-port DEFAULT-ROUTER-PORT]) +;;; (define mr +;;; (place ch +;;; (match (place-channel-get ch) +;;; [(list listen-port) +;;; (define listener (tcp-listen listen-port 4 #t)) +;;; (define mrn (new node% [listen-port listener] +;;; [chan-vec (vector ch)])) +;;; (send mrn sync-events)]))) +;;; (place-channel-put mr (list listen-port))) + +(define (*channel-put ch msg) + ((cond + [(place-channel? ch) place-channel-put] + [(channel? ch) channel-put]) + ch msg)) + +(define/provide (mr-spawn-remote-node mrch host #:listen-port [listen-port DEFAULT-ROUTER-PORT]) + (*channel-put mrch (dcgm DCGM-CONTROL-NEW-NODE -1 -1 (list host listen-port)))) + +(define/provide (mr-supervise-named-dynamic-place-at mrch dest name path func) + (*channel-put mrch (dcgm DCGM-CONTROL-NEW-PLACE dest -1 (list 'dynamic-place path func name)))) + +(define/provide (mr-connect-to mrch dest name) + (define-values (ch1 ch2) (make-async-bi-channel)) + (*channel-put mrch (dcgm DCGM-CONTROL-NEW-CONNECTION dest -1 (list name ch2))) + ch1) + +(define/provide (start-message-router/thread #:listen-port [listen-port DEFAULT-ROUTER-PORT] + #:nodes [nodes null]) + (define ch (make-channel)) + (define mr + (thread + (lambda () + (define listener (tcp-listen listen-port 4 #t)) + (define mrn (new node% [listen-port listener] + [nodes nodes] + [chan-vec (vector ch)])) + (send mrn sync-events)))) + (values mr ch)) + +(define (spawn-node-at host #:listen-port [listen-port DEFAULT-ROUTER-PORT]) + (define ch (make-channel)) + (thread + (lambda () (channel-put ch (spawn-remote-racket-node host #:listen-port listen-port)))) + ch) + +(define/provide (spawn-nodes/join nodes-desc) + (channels-join + (for/list ([n nodes-desc]) + (match-define (list host listen-port) n) + (spawn-node-at host #:listen-port listen-port)))) + +(define/provide (channels-join chs) + (for/list ([x chs]) + (channel-get x))) + + diff --git a/collects/racket/place/private/async-bi-channel.rkt b/collects/racket/place/private/async-bi-channel.rkt new file mode 100644 index 0000000000..d110a13873 --- /dev/null +++ b/collects/racket/place/private/async-bi-channel.rkt @@ -0,0 +1,39 @@ +#lang racket/base + +(provide make-async-bi-channel + async-bi-channel-put + async-bi-channel-get + async-bi-channel?) + + +(define (make-async-channel) + (define ch (make-channel)) + (values + (thread + (lambda () + (let loop () + (let ([v (thread-receive)]) + (channel-put ch v) + (loop))))) + ch)) + +(define-struct Async-Bi-Channel (in out) + #:property prop:evt (lambda (x) (Async-Bi-Channel-out x))) + +(define async-bi-channel? Async-Bi-Channel?) + +(define (make-async-bi-channel) + (define-values (ch1s ch1r) (make-async-channel)) + (define-values (ch2s ch2r) (make-async-channel)) + + (values + (Async-Bi-Channel ch1s ch2r) + (Async-Bi-Channel ch2s ch1r))) + + + +(define (async-bi-channel-put ch msg) + (void (thread-send (Async-Bi-Channel-in ch) msg #f))) + +(define (async-bi-channel-get ch) + (channel-get (Async-Bi-Channel-out ch)))