[Distributed Places] fully connected network patch

This commit is contained in:
Kevin Tew 2012-04-04 13:01:31 -06:00
parent 394373ab2d
commit ee463056a8
4 changed files with 210 additions and 29 deletions

View File

@ -1181,6 +1181,7 @@ path/s is either such a string or a list of them.
"collects/racket/match" responsible (samth)
"collects/racket/match.rkt" responsible (samth)
"collects/racket/place" responsible (tewk)
"collects/racket/place/distributed/examples/hello-world.rkt" drdr:command-line #f
"collects/racket/slice.rkt" responsible (jay)
"collects/racklog" responsible (jay)
"collects/rackunit" responsible (jay noel ryanc)

View File

@ -4,6 +4,7 @@
racket/place
racket/place/private/th-place
racket/place/private/coercion
racket/place/private/async-bi-channel
racket/match
racket/class
racket/stxparam
@ -23,12 +24,16 @@
(cond
[(place-channel? dest) (place-channel-get dest)]
[(th-place-channel? dest) (th-place-channel-get dest)]
[(channel? dest) (channel-get dest)]
[(async-bi-channel? dest) (async-bi-channel-get dest)]
[else (send dest get-msg)]))
(define (dplace/place-channel-put dest msg)
(cond
[(place-channel? dest) (place-channel-put dest msg)]
[(th-place-channel? dest) (th-place-channel-put dest msg)]
[(channel? dest) (channel-put dest msg)]
[(async-bi-channel? dest) (async-bi-channel-put dest msg)]
[else (send dest put-msg msg)]))

View File

@ -5,6 +5,7 @@
racket/place
racket/place/private/th-place
racket/place/private/coercion
racket/place/private/async-bi-channel
racket/class
racket/trait
racket/udp
@ -12,6 +13,15 @@
racket/date
syntax/location)
(define-syntax define/provide
(syntax-rules ()
[(_ (name x ...) body ...)
(begin (provide name)
(define (name x ...) body ...))]
[(_ name val)
(begin (provide name)
(define name val))]))
(provide ssh-bin-path
racket-path
distributed-launch-path
@ -46,6 +56,7 @@
ll-channel-get
ll-channel-put
write-flush
printf/f
log-message
start-spawned-node-router
@ -67,6 +78,8 @@
(struct-out node-config)
(struct-out dcg)
;v3 api
;classes
event-container<%>
spawned-process%
@ -137,6 +150,10 @@
(write msg p)
(flush-output p))
(define (printf/f . args)
(apply printf args)
(flush-output))
(define (tcp-connect/backoff rname rport #:times [times 4] #:start-seconds [start-seconds 1])
(let loop ([t 0]
[wait-time start-seconds])
@ -185,6 +202,12 @@
(define DCGM-TYPE-NEW-PLACE 9)
(define DCGM-TYPE-SET-OWNER 10)
(define DCGM-NEW-NODE-CONNECT 50)
(define DCGM-CONTROL-NEW-NODE 100)
(define DCGM-CONTROL-NEW-PLACE 101)
(define DCGM-CONTROL-NEW-CONNECTION 102)
(define (dchannel-put ch msg)
(unless (or (dchannel? ch) (place-channel? ch))
@ -332,6 +355,7 @@
sch
id
node)
(define/public (register nes)
(cons
(wrap-evt
@ -350,6 +374,7 @@
(loop)
(dcgm-msg msg))))
(define/public (put-msg msg)
;(printf/f "PSB3 ~a ~a ~a\n" sch id msg)
(sconn-write-flush sch (dcgm DCGM-TYPE-INTER-DCHANNEL id id msg)))
(super-new)
)))
@ -363,11 +388,13 @@
(init-field [socket-ports null])
(init-field [sub-ecs null])
(init-field [psbs null])
(init-field [spawned-nodes null])
(init-field [spawned-nodes (make-hash)])
(init-field [named-places (make-hash)])
(init-field [beacon #f])
(init-field [owner #f])
(init-field [nodes #f])
(field [id 0])
(define/public (nextid)
(set! id (add1 id))
id)
@ -375,8 +402,10 @@
(set! socket-ports (append socket-ports (list pair))))
(define/public (add-sub-ec ec)
(set! sub-ecs (append sub-ecs (list ec))))
(define (add-spawned-node ec)
(set! spawned-nodes (append spawned-nodes (list ec))))
(define (add-spawned-node key ec)
(hash-set! spawned-nodes key ec))
(define (find-spawned-node key)
(hash-ref spawned-nodes key #f))
(define (add-psb ec)
(set! psbs (append psbs (list ec))))
(define (add-named-place name np)
@ -386,6 +415,7 @@
(define (add-place-channel-socket-bridge pch sch id)
(add-psb (new place-socket-bridge% [pch pch] [sch sch] [id id] [node this])))
(define (forward-mesg m src-channel)
;(printf/f "FORWARD MESSAGE ~a ~a\n" src-channel m)
(match m
[(dcgm 1 #;(== DCGM-TYPE-DIE) src dest "DIE") (exit 1)]
[(dcgm 2 #;(== DCGM-TYPE-NEW-DCHANNEL) src dest pch)
@ -432,20 +462,21 @@
(place-channel-put d (dcgm DCGM-TYPE-NEW-DCHANNEL src dest pch2))]
[(dcgm 4 #;(== DCGM-TYPE-INTER-DCHANNEL) _ ch-id msg)
(define pch (sconn-lookup-subchannel src-channel ch-id))
;(printf/f "4 ~a ~a ~a ~a\n" src-channel ch-id pch msg)
(cond
[(place-channel? pch)
(place-channel-put pch msg)]
[(is-a? pch connection%)
(send pch forward msg)]
[(th-place-channel? pch)
(th-place-channel-put pch msg)])]
(th-place-channel-put pch msg)]
[else
(raise "OOPS\n")])]
[(dcgm 6 #;(== DCGM-TYPE-SPAWN-REMOTE-PROCESS) src (list node-name node-port mod-path funcname) ch1)
(define node
(new remote-node%
[host-name node-name]
[listen-port node-port]
[cmdline-list (list (ssh-bin-path) node-name (racket-path) "-tm" (->string distributed-launch-path) "spawn" (->string node-port))]))
(add-spawned-node node)
(define node (spawn-remote-racket-node node-name #:listen-port node-port))
(for ([x (in-hash-values spawned-nodes)])
(send x notify-of-new-node node-name node-port))
(add-spawned-node (list node-name node-port) node)
(send node launch-place
(list 'dynamic-place mod-path funcname)
;#:initial-message initial-message
@ -459,6 +490,20 @@
[(dcgm 10 #;(== DCGM-TYPE-SET-OWNER) -1 -1 msg)
(log-debug (format "RECV DCGM-TYPE-SET-OWNER ~a" src-channel))
(set! owner src-channel)]
[(dcgm #;50 (== DCGM-NEW-NODE-CONNECT) -1 -1 (list node-name node-port))
(add-spawned-node (list node-name node-port) (new remote-node% [host-name node-name] [listen-port node-port]))]
[(dcgm #;100 (== DCGM-CONTROL-NEW-NODE) -1 -1 (list node-name node-port))
(define node (spawn-remote-racket-node node-name #:listen-port node-port))
(for ([x (in-hash-values spawned-nodes)])
(send x notify-of-new-node node-name node-port))
(add-spawned-node (list node-name node-port) node)]
[(dcgm #;101 (== DCGM-CONTROL-NEW-PLACE) dest -1 place-exec)
(define node (find-spawned-node dest))
(send node launch-place place-exec)]
[(dcgm #;102 (== DCGM-CONTROL-NEW-CONNECTION) dest -1 (list name ch))
(define node (find-spawned-node dest))
(send node remote-connect name #:one-sided ch)]
[(dcgm mtype srcs dest msg)
(define d (vector-ref chan-vec dest))
(cond
@ -484,7 +529,6 @@
[(list count host port)
(void)]))]))
(define/public (log-from-child msg #:severity [severity 'info])
;(printf "Received Log Message ~a ~a\n" severity msg)
(cond
@ -504,7 +548,12 @@
(sconn-get-forward-event x forward-mesg)]
[(or (place-channel? x) (place? x))
(wrap-evt x (lambda (e)
(forward-mesg e x)))])
(forward-mesg e x)))]
[(channel? x)
(wrap-evt x (lambda (e)
(forward-mesg e x)))]
[else (raise (format "Unexpected channel type ~a" x))])
n))
nes)]
[nes
@ -528,7 +577,8 @@
[(or (place-channel? x) (place? x))
(wrap-evt x (lambda (e)
;(printf "SOCKET-PORT PLACE MESSAGE ~a\n" e)
(forward-mesg e x)))])
(forward-mesg e x)))]
[else (raise (format "Unexpected channel type ~a" x))])
n))
nes)]
[nes
@ -543,15 +593,15 @@
nes)]
[nes
(if spawned-nodes
(for/fold ([n nes]) ([x spawned-nodes])
(for/fold ([n nes]) ([x (in-hash-values spawned-nodes)])
(send x register n))
nes)]
[nes (register-beacon nes)]
[nes
(cond
[named-places
(for/fold ([n nes]) ([x (in-hash-values named-places)])
(send x register n))]
(for/fold ([n nes]) ([x (in-hash-values named-places)])
(send x register n))]
[else nes])])
nes))
@ -562,6 +612,15 @@
(loop )))
(when nodes
(for ([n nodes])
(define n-host-name (send n get-host-name))
(define n-port (send n get-listen-port))
(for ([sn (in-hash-values spawned-nodes)])
(send sn notify-of-new-node n-host-name n-port))
(add-spawned-node (list n-host-name n-port) n)))
(super-new)
)))
@ -613,6 +672,7 @@
(define/public (lookup-subchannel id) (cdr (assoc id subchannels)))
(define/public (_write-flush x)
(when (equal? out #f) (ensure-connected))
;(printf/f "SC ~a ~a\n" x out)
(write-flush x out))
(define/public (remove-subchannel id)
(set! subchannels (filter-map
@ -661,6 +721,8 @@
(field [id 0])
(field [remote-places null])
(define/public (get-host-name) host-name)
(define/public (get-listen-port) listen-port)
(define/public (nextid)
(set! id (add1 id))
id)
@ -702,11 +764,18 @@
[else (raise (format "remote-place for sc-id ~a not found\n" ch-id))])]
[(dcgm 4 #;(== DCGM-TYPE-INTER-DCHANNEL) _ ch-id msg)
(define pch (sconn-lookup-subchannel sc ch-id))
;(printf/f "44 ~a ~a ~a ~a\n" in-port ch-id pch msg)
(cond
[(place-channel? pch)
(place-channel-put pch msg)]
[(is-a? pch connection%)
(send pch forward msg)])]
(send pch forward msg)]
[(th-place-channel? pch)
(th-place-channel-put pch msg)]
[(async-bi-channel? pch)
(async-bi-channel-put pch msg)]
[else
(raise "OOPS\n")])]
[(dcgm 8 #;(== DCGM-TYPE-LOG-TO-PARENT) _ _ (list severity msg))
(define parent (send this get-router))
(cond
@ -747,12 +816,13 @@
(define/public (launch-place place-exec #:restart-on-exit [restart-on-exit #f] #:one-sided-place? [one-sided-place? #f])
(define rp (new remote-place% [node this] [place-exec place-exec] [restart-on-exit restart-on-exit]
[one-sided-place? one-sided-place?]))
[one-sided one-sided-place?]))
(add-remote-place rp)
rp)
(define/public (remote-connect name #:restart-on-exit [restart-on-exit #f])
(define rp (new remote-connection% [node this] [name name] [restart-on-exit restart-on-exit]))
(define/public (remote-connect name #:restart-on-exit [restart-on-exit #f] #:one-sided [one-sided #f])
(define rp (new remote-connection% [node this] [name name] [restart-on-exit restart-on-exit]
[one-sided one-sided]))
(add-remote-place rp)
rp)
@ -785,6 +855,9 @@
es)])
es))
(define/public (notify-of-new-node node-name node-port)
(sconn-write-flush sc (dcgm DCGM-NEW-NODE-CONNECT -1 -1 (list node-name node-port))))
(super-new)
)))
@ -808,7 +881,7 @@
(init-field node)
(init-field [place-exec #f])
(init-field [name #f])
(init-field [one-sided-place #f])
(init-field [one-sided #f])
(init-field [restart-on-exit #f])
(init-field [on-channel #f])
(init-field [on-channel/2 #f])
@ -826,17 +899,17 @@
(raise "named place connections that restart on exit are not possible"))
(cond
[one-sided-place
(set! rpc one-sided-place)
(set! psb (send vm spawn-remote-place place-exec rpc))]
[one-sided
(set! rpc one-sided)]
[else
(define-values (pch1 pch2) (place-channel))
(set! rpc pch1)
(set! pc pch2)
(set! psb
(if place-exec
(send vm spawn-remote-place place-exec rpc)
(send vm spawn-remote-connection name rpc)))])
(set! pc pch2)])
(set! psb
(if place-exec
(send node spawn-remote-place place-exec rpc)
(send node spawn-remote-connection name rpc)))
(define (restart-place)
(send node drop-sc-id (send psb get-sc-id))
@ -1357,3 +1430,66 @@
(flush-output out)
(sleep))
(subprocess-kill (first (first n)) #f))))))
;;
;; API Version 3
;;
;;;(define (start-message-router/place #:listen-port [listen-port DEFAULT-ROUTER-PORT])
;;; (define mr
;;; (place ch
;;; (match (place-channel-get ch)
;;; [(list listen-port)
;;; (define listener (tcp-listen listen-port 4 #t))
;;; (define mrn (new node% [listen-port listener]
;;; [chan-vec (vector ch)]))
;;; (send mrn sync-events)])))
;;; (place-channel-put mr (list listen-port)))
(define (*channel-put ch msg)
((cond
[(place-channel? ch) place-channel-put]
[(channel? ch) channel-put])
ch msg))
(define/provide (mr-spawn-remote-node mrch host #:listen-port [listen-port DEFAULT-ROUTER-PORT])
(*channel-put mrch (dcgm DCGM-CONTROL-NEW-NODE -1 -1 (list host listen-port))))
(define/provide (mr-supervise-named-dynamic-place-at mrch dest name path func)
(*channel-put mrch (dcgm DCGM-CONTROL-NEW-PLACE dest -1 (list 'dynamic-place path func name))))
(define/provide (mr-connect-to mrch dest name)
(define-values (ch1 ch2) (make-async-bi-channel))
(*channel-put mrch (dcgm DCGM-CONTROL-NEW-CONNECTION dest -1 (list name ch2)))
ch1)
(define/provide (start-message-router/thread #:listen-port [listen-port DEFAULT-ROUTER-PORT]
#:nodes [nodes null])
(define ch (make-channel))
(define mr
(thread
(lambda ()
(define listener (tcp-listen listen-port 4 #t))
(define mrn (new node% [listen-port listener]
[nodes nodes]
[chan-vec (vector ch)]))
(send mrn sync-events))))
(values mr ch))
(define (spawn-node-at host #:listen-port [listen-port DEFAULT-ROUTER-PORT])
(define ch (make-channel))
(thread
(lambda () (channel-put ch (spawn-remote-racket-node host #:listen-port listen-port))))
ch)
(define/provide (spawn-nodes/join nodes-desc)
(channels-join
(for/list ([n nodes-desc])
(match-define (list host listen-port) n)
(spawn-node-at host #:listen-port listen-port))))
(define/provide (channels-join chs)
(for/list ([x chs])
(channel-get x)))

View File

@ -0,0 +1,39 @@
#lang racket/base
(provide make-async-bi-channel
async-bi-channel-put
async-bi-channel-get
async-bi-channel?)
(define (make-async-channel)
(define ch (make-channel))
(values
(thread
(lambda ()
(let loop ()
(let ([v (thread-receive)])
(channel-put ch v)
(loop)))))
ch))
(define-struct Async-Bi-Channel (in out)
#:property prop:evt (lambda (x) (Async-Bi-Channel-out x)))
(define async-bi-channel? Async-Bi-Channel?)
(define (make-async-bi-channel)
(define-values (ch1s ch1r) (make-async-channel))
(define-values (ch2s ch2r) (make-async-channel))
(values
(Async-Bi-Channel ch1s ch2r)
(Async-Bi-Channel ch2s ch1r)))
(define (async-bi-channel-put ch msg)
(void (thread-send (Async-Bi-Channel-in ch) msg #f)))
(define (async-bi-channel-get ch)
(channel-get (Async-Bi-Channel-out ch)))