[Distributed Places] add solo connections and tcp-connection-died handler

This commit is contained in:
Kevin Tew 2012-04-05 14:03:09 -06:00
parent ee463056a8
commit 033cdc1413

View File

@ -335,6 +335,7 @@
#f))
(define/public (get-pid) pid)
(define/public (kill [force #t]) (subprocess-kill s force))
(define/public (wait-for-die) (subprocess-wait s))
(define/public (register nes)
(for/filter/fold/cons nes ([x (list s (list o "OUT") (list e "ERR"))])
@ -389,6 +390,7 @@
(init-field [sub-ecs null])
(init-field [psbs null])
(init-field [spawned-nodes (make-hash)])
(init-field [solo-nodes (make-hash)])
(init-field [named-places (make-hash)])
(init-field [beacon #f])
(init-field [owner #f])
@ -406,6 +408,10 @@
(hash-set! spawned-nodes key ec))
(define (find-spawned-node key)
(hash-ref spawned-nodes key #f))
(define (add-solo-node key ec)
(hash-set! solo-nodes key ec))
(define (find-solo-node key)
(hash-ref solo-nodes key #f))
(define (add-psb ec)
(set! psbs (append psbs (list ec))))
(define (add-named-place name np)
@ -427,7 +433,8 @@
(add-place-channel-socket-bridge pch d ch-id)
(sconn-write-flush d (dcgm DCGM-TYPE-NEW-INTER-DCHANNEL src dest ch-id))]
[(or (place-channel? d) (place? d))
(place-channel-put d m)])]
(place-channel-put d m)]
[else (raise (format "Unexpected channel type ~a" d))])]
[(dcgm 9 #;(== DCGM-TYPE-NEW-PLACE) -1 (and place-exec (list-rest type rest)) ch-id)
(match place-exec
[(list 'connect name)
@ -470,8 +477,7 @@
(send pch forward msg)]
[(th-place-channel? pch)
(th-place-channel-put pch msg)]
[else
(raise "OOPS\n")])]
[else (raise (format "Unexpected channel type ~a" pch))])]
[(dcgm 6 #;(== DCGM-TYPE-SPAWN-REMOTE-PROCESS) src (list node-name node-port mod-path funcname) ch1)
(define node (spawn-remote-racket-node node-name #:listen-port node-port))
(for ([x (in-hash-values spawned-nodes)])
@ -492,11 +498,15 @@
(set! owner src-channel)]
[(dcgm #;50 (== DCGM-NEW-NODE-CONNECT) -1 -1 (list node-name node-port))
(add-spawned-node (list node-name node-port) (new remote-node% [host-name node-name] [listen-port node-port]))]
[(dcgm #;100 (== DCGM-CONTROL-NEW-NODE) -1 -1 (list node-name node-port))
[(dcgm #;100 (== DCGM-CONTROL-NEW-NODE) -1 solo (list node-name node-port))
(define node (spawn-remote-racket-node node-name #:listen-port node-port))
(for ([x (in-hash-values spawned-nodes)])
(send x notify-of-new-node node-name node-port))
(add-spawned-node (list node-name node-port) node)]
(cond
[solo
(add-solo-node (list node-name node-port) node)]
[else
(for ([x (in-hash-values spawned-nodes)])
(send x notify-of-new-node node-name node-port))
(add-spawned-node (list node-name node-port) node)])]
[(dcgm #;101 (== DCGM-CONTROL-NEW-PLACE) dest -1 place-exec)
(define node (find-spawned-node dest))
(send node launch-place place-exec)]
@ -547,11 +557,9 @@
[(is-a? x socket-connection%)
(sconn-get-forward-event x forward-mesg)]
[(or (place-channel? x) (place? x))
(wrap-evt x (lambda (e)
(forward-mesg e x)))]
(wrap-evt x (lambda (e) (forward-mesg e x)))]
[(channel? x)
(wrap-evt x (lambda (e)
(forward-mesg e x)))]
(wrap-evt x (lambda (e) (forward-mesg e x)))]
[else (raise (format "Unexpected channel type ~a" x))])
n))
@ -596,6 +604,11 @@
(for/fold ([n nes]) ([x (in-hash-values spawned-nodes)])
(send x register n))
nes)]
[nes
(if solo-nodes
(for/fold ([n nes]) ([x (in-hash-values solo-nodes)])
(send x register n))
nes)]
[nes (register-beacon nes)]
[nes
(cond
@ -642,7 +655,8 @@
[delay 1]
[background-connect #t]
[in #f]
[out #f])
[out #f]
[remote-node #f])
(field [subchannels null]
[connecting #f]
[ch #f])
@ -667,13 +681,20 @@
(set! out _out)
(set! connecting #f)])))
(define (handle-error e)
(cond
[remote-node => (lambda (n)
(send n tcp-connection-died host port))]
[else (raise (format "TCP connection to ~a:~a failed.\n" host port))]))
(define/public (add-subchannel id pch)
(set! subchannels (append subchannels (list (cons id pch)))))
(define/public (lookup-subchannel id) (cdr (assoc id subchannels)))
(define/public (_write-flush x)
(when (equal? out #f) (ensure-connected))
;(printf/f "SC ~a ~a\n" x out)
(write-flush x out))
(with-handlers ([exn:fail? handle-error])
(write-flush x out)))
(define/public (remove-subchannel id)
(set! subchannels (filter-map
(lambda (x) (and (not (= (car x) id)) x))
@ -682,7 +703,10 @@
(define/public (get-forward-event forwarder)
(when (equal? out #f) (ensure-connected))
(wrap-evt in (lambda (e)
(forwarder (read in) this))))
(forwarder
(with-handlers ([exn:fail? handle-error])
(read in))
this))))
(define/public (read-message)
(when (equal? out #f) (ensure-connected))
@ -730,9 +754,10 @@
(define (add-remote-place rp)
(set! remote-places (append remote-places(list rp))))
(define (spawn-node)
(set! sp (new spawned-process% [cmdline-list cmdline-list] [parent this])))
(and cmdline-list
(set! sp (new spawned-process% [cmdline-list cmdline-list] [parent this]))))
(define (setup-socket-connection)
(set! sc (new socket-connection% [host host-name] [port listen-port]))
(set! sc (new socket-connection% [host host-name] [port listen-port] [remote-node this]))
(sconn-write-flush sc (dcgm DCGM-TYPE-SET-OWNER -1 -1 "")))
(define (restart-node)
(spawn-node)
@ -774,8 +799,7 @@
(th-place-channel-put pch msg)]
[(async-bi-channel? pch)
(async-bi-channel-put pch msg)]
[else
(raise "OOPS\n")])]
[else (raise (format "Unexpected channel type ~a" pch))])]
[(dcgm 8 #;(== DCGM-TYPE-LOG-TO-PARENT) _ _ (list severity msg))
(define parent (send this get-router))
(cond
@ -791,6 +815,19 @@
[else (log-debug (format"received message ~a" it))]))
(define/public (get-log-prefix) (format "PLACE ~a:~a" host-name listen-port))
(define/public (tcp-connection-died host port)
(log-debug (format "TCP connection~a:~a died, restarting node/connection" host-name listen-port))
(and sp (send sp kill))
(set! sp #f)
(cond
[cmdline-list (process-died null)]
[restart-on-exit
(if (equal? restart-on-exit #t)
(restart-node)
(send restart-on-exit restart restart-node))]
[else
(log-debug (format "No restart condition for ~a" (get-log-prefix)))]))
(define/public (process-died child)
(log-debug (format "Remote node pid ~a ~a:~a died" (get-sp-pid) host-name listen-port))
(set! sp #f)
@ -799,8 +836,8 @@
(cond
[cmdline-list
(if (equal? restart-on-exit #t)
(restart-node)
(send restart-on-exit restart restart-node))]
(restart-node)
(send restart-on-exit restart restart-node))]
[else
(log-debug (format "No restart cmdline arguments for ~a" (get-log-prefix)))])]
[else
@ -1452,14 +1489,19 @@
[(channel? ch) channel-put])
ch msg))
(define/provide (mr-spawn-remote-node mrch host #:listen-port [listen-port DEFAULT-ROUTER-PORT])
(*channel-put mrch (dcgm DCGM-CONTROL-NEW-NODE -1 -1 (list host listen-port))))
(define/provide (mr-spawn-remote-node mrch host #:listen-port [listen-port DEFAULT-ROUTER-PORT]
#:solo [solo #f])
(*channel-put mrch (dcgm DCGM-CONTROL-NEW-NODE -1 solo (list host listen-port))))
(define/provide (mr-supervise-named-dynamic-place-at mrch dest name path func)
(*channel-put mrch (dcgm DCGM-CONTROL-NEW-PLACE dest -1 (list 'dynamic-place path func name))))
(define/provide (mr-connect-to mrch dest name)
(define-values (ch1 ch2) (make-async-bi-channel))
(define-values (ch1 ch2)
(cond
[(channel? mrch) (make-async-bi-channel)]
[(place-channel? mrch) (place-channel)]
[else (raise (format "Unexpected channel type ~a" mrch))]))
(*channel-put mrch (dcgm DCGM-CONTROL-NEW-CONNECTION dest -1 (list name ch2)))
ch1)