From 033cdc141378535c50dfdfd35c2a721e5333041f Mon Sep 17 00:00:00 2001 From: Kevin Tew Date: Thu, 5 Apr 2012 14:03:09 -0600 Subject: [PATCH] [Distributed Places] add solo connections and tcp-connection-died handler --- collects/racket/place/distributed.rkt | 88 ++++++++++++++++++++------- 1 file changed, 65 insertions(+), 23 deletions(-) diff --git a/collects/racket/place/distributed.rkt b/collects/racket/place/distributed.rkt index 53899ecc03..6a7f02ccc5 100644 --- a/collects/racket/place/distributed.rkt +++ b/collects/racket/place/distributed.rkt @@ -335,6 +335,7 @@ #f)) (define/public (get-pid) pid) + (define/public (kill [force #t]) (subprocess-kill s force)) (define/public (wait-for-die) (subprocess-wait s)) (define/public (register nes) (for/filter/fold/cons nes ([x (list s (list o "OUT") (list e "ERR"))]) @@ -389,6 +390,7 @@ (init-field [sub-ecs null]) (init-field [psbs null]) (init-field [spawned-nodes (make-hash)]) + (init-field [solo-nodes (make-hash)]) (init-field [named-places (make-hash)]) (init-field [beacon #f]) (init-field [owner #f]) @@ -406,6 +408,10 @@ (hash-set! spawned-nodes key ec)) (define (find-spawned-node key) (hash-ref spawned-nodes key #f)) + (define (add-solo-node key ec) + (hash-set! solo-nodes key ec)) + (define (find-solo-node key) + (hash-ref solo-nodes key #f)) (define (add-psb ec) (set! psbs (append psbs (list ec)))) (define (add-named-place name np) @@ -427,7 +433,8 @@ (add-place-channel-socket-bridge pch d ch-id) (sconn-write-flush d (dcgm DCGM-TYPE-NEW-INTER-DCHANNEL src dest ch-id))] [(or (place-channel? d) (place? d)) - (place-channel-put d m)])] + (place-channel-put d m)] + [else (raise (format "Unexpected channel type ~a" d))])] [(dcgm 9 #;(== DCGM-TYPE-NEW-PLACE) -1 (and place-exec (list-rest type rest)) ch-id) (match place-exec [(list 'connect name) @@ -470,8 +477,7 @@ (send pch forward msg)] [(th-place-channel? pch) (th-place-channel-put pch msg)] - [else - (raise "OOPS\n")])] + [else (raise (format "Unexpected channel type ~a" pch))])] [(dcgm 6 #;(== DCGM-TYPE-SPAWN-REMOTE-PROCESS) src (list node-name node-port mod-path funcname) ch1) (define node (spawn-remote-racket-node node-name #:listen-port node-port)) (for ([x (in-hash-values spawned-nodes)]) @@ -492,11 +498,15 @@ (set! owner src-channel)] [(dcgm #;50 (== DCGM-NEW-NODE-CONNECT) -1 -1 (list node-name node-port)) (add-spawned-node (list node-name node-port) (new remote-node% [host-name node-name] [listen-port node-port]))] - [(dcgm #;100 (== DCGM-CONTROL-NEW-NODE) -1 -1 (list node-name node-port)) + [(dcgm #;100 (== DCGM-CONTROL-NEW-NODE) -1 solo (list node-name node-port)) (define node (spawn-remote-racket-node node-name #:listen-port node-port)) - (for ([x (in-hash-values spawned-nodes)]) - (send x notify-of-new-node node-name node-port)) - (add-spawned-node (list node-name node-port) node)] + (cond + [solo + (add-solo-node (list node-name node-port) node)] + [else + (for ([x (in-hash-values spawned-nodes)]) + (send x notify-of-new-node node-name node-port)) + (add-spawned-node (list node-name node-port) node)])] [(dcgm #;101 (== DCGM-CONTROL-NEW-PLACE) dest -1 place-exec) (define node (find-spawned-node dest)) (send node launch-place place-exec)] @@ -547,11 +557,9 @@ [(is-a? x socket-connection%) (sconn-get-forward-event x forward-mesg)] [(or (place-channel? x) (place? x)) - (wrap-evt x (lambda (e) - (forward-mesg e x)))] + (wrap-evt x (lambda (e) (forward-mesg e x)))] [(channel? x) - (wrap-evt x (lambda (e) - (forward-mesg e x)))] + (wrap-evt x (lambda (e) (forward-mesg e x)))] [else (raise (format "Unexpected channel type ~a" x))]) n)) @@ -596,6 +604,11 @@ (for/fold ([n nes]) ([x (in-hash-values spawned-nodes)]) (send x register n)) nes)] + [nes + (if solo-nodes + (for/fold ([n nes]) ([x (in-hash-values solo-nodes)]) + (send x register n)) + nes)] [nes (register-beacon nes)] [nes (cond @@ -642,7 +655,8 @@ [delay 1] [background-connect #t] [in #f] - [out #f]) + [out #f] + [remote-node #f]) (field [subchannels null] [connecting #f] [ch #f]) @@ -667,13 +681,20 @@ (set! out _out) (set! connecting #f)]))) + (define (handle-error e) + (cond + [remote-node => (lambda (n) + (send n tcp-connection-died host port))] + [else (raise (format "TCP connection to ~a:~a failed.\n" host port))])) + (define/public (add-subchannel id pch) (set! subchannels (append subchannels (list (cons id pch))))) (define/public (lookup-subchannel id) (cdr (assoc id subchannels))) (define/public (_write-flush x) (when (equal? out #f) (ensure-connected)) ;(printf/f "SC ~a ~a\n" x out) - (write-flush x out)) + (with-handlers ([exn:fail? handle-error]) + (write-flush x out))) (define/public (remove-subchannel id) (set! subchannels (filter-map (lambda (x) (and (not (= (car x) id)) x)) @@ -682,7 +703,10 @@ (define/public (get-forward-event forwarder) (when (equal? out #f) (ensure-connected)) (wrap-evt in (lambda (e) - (forwarder (read in) this)))) + (forwarder + (with-handlers ([exn:fail? handle-error]) + (read in)) + this)))) (define/public (read-message) (when (equal? out #f) (ensure-connected)) @@ -730,9 +754,10 @@ (define (add-remote-place rp) (set! remote-places (append remote-places(list rp)))) (define (spawn-node) - (set! sp (new spawned-process% [cmdline-list cmdline-list] [parent this]))) + (and cmdline-list + (set! sp (new spawned-process% [cmdline-list cmdline-list] [parent this])))) (define (setup-socket-connection) - (set! sc (new socket-connection% [host host-name] [port listen-port])) + (set! sc (new socket-connection% [host host-name] [port listen-port] [remote-node this])) (sconn-write-flush sc (dcgm DCGM-TYPE-SET-OWNER -1 -1 ""))) (define (restart-node) (spawn-node) @@ -774,8 +799,7 @@ (th-place-channel-put pch msg)] [(async-bi-channel? pch) (async-bi-channel-put pch msg)] - [else - (raise "OOPS\n")])] + [else (raise (format "Unexpected channel type ~a" pch))])] [(dcgm 8 #;(== DCGM-TYPE-LOG-TO-PARENT) _ _ (list severity msg)) (define parent (send this get-router)) (cond @@ -791,6 +815,19 @@ [else (log-debug (format"received message ~a" it))])) (define/public (get-log-prefix) (format "PLACE ~a:~a" host-name listen-port)) + (define/public (tcp-connection-died host port) + (log-debug (format "TCP connection~a:~a died, restarting node/connection" host-name listen-port)) + (and sp (send sp kill)) + (set! sp #f) + (cond + [cmdline-list (process-died null)] + [restart-on-exit + (if (equal? restart-on-exit #t) + (restart-node) + (send restart-on-exit restart restart-node))] + [else + (log-debug (format "No restart condition for ~a" (get-log-prefix)))])) + (define/public (process-died child) (log-debug (format "Remote node pid ~a ~a:~a died" (get-sp-pid) host-name listen-port)) (set! sp #f) @@ -799,8 +836,8 @@ (cond [cmdline-list (if (equal? restart-on-exit #t) - (restart-node) - (send restart-on-exit restart restart-node))] + (restart-node) + (send restart-on-exit restart restart-node))] [else (log-debug (format "No restart cmdline arguments for ~a" (get-log-prefix)))])] [else @@ -1452,14 +1489,19 @@ [(channel? ch) channel-put]) ch msg)) -(define/provide (mr-spawn-remote-node mrch host #:listen-port [listen-port DEFAULT-ROUTER-PORT]) - (*channel-put mrch (dcgm DCGM-CONTROL-NEW-NODE -1 -1 (list host listen-port)))) +(define/provide (mr-spawn-remote-node mrch host #:listen-port [listen-port DEFAULT-ROUTER-PORT] + #:solo [solo #f]) + (*channel-put mrch (dcgm DCGM-CONTROL-NEW-NODE -1 solo (list host listen-port)))) (define/provide (mr-supervise-named-dynamic-place-at mrch dest name path func) (*channel-put mrch (dcgm DCGM-CONTROL-NEW-PLACE dest -1 (list 'dynamic-place path func name)))) (define/provide (mr-connect-to mrch dest name) - (define-values (ch1 ch2) (make-async-bi-channel)) + (define-values (ch1 ch2) + (cond + [(channel? mrch) (make-async-bi-channel)] + [(place-channel? mrch) (place-channel)] + [else (raise (format "Unexpected channel type ~a" mrch))])) (*channel-put mrch (dcgm DCGM-CONTROL-NEW-CONNECTION dest -1 (list name ch2))) ch1)