diff --git a/collects/racket/place/distributed.rkt b/collects/racket/place/distributed.rkt index 41fb12f057..979cd1fc8d 100644 --- a/collects/racket/place/distributed.rkt +++ b/collects/racket/place/distributed.rkt @@ -106,6 +106,9 @@ port-no? ) +;(define klogger displayln) +(define klogger (lambda (x) (log-debug x))) + (define in-message-router-mark (cons #f #f)) (define (call-in-message-router thunk) (with-continuation-mark in-message-router-mark #f @@ -171,7 +174,7 @@ [wait-time start-seconds]) (with-handlers ([exn? (lambda (e) (cond [(t . < . times) - (log-debug (format "backing off ~a sec to ~a:~a" (expt 2 t) rname rport)) + (klogger (format "backing off ~a sec to ~a:~a" (expt 2 t) rname rport)) (sleep wait-time) (loop (add1 t) (* 2 wait-time))] [else (raise e)]))]) @@ -181,7 +184,7 @@ (let loop ([t 0]) (with-handlers ([exn? (lambda (e) (cond [(t . < . times) - (log-debug (format "waiting ~a sec to retry connection to ~a:~a" delay rname rport)) + (klogger (format "waiting ~a sec to retry connection to ~a:~a" delay rname rport)) (sleep delay) (loop (add1 t))] [else (raise e)]))]) @@ -316,13 +319,13 @@ (set! o (box _o)) (set! i (box _i)) (set! e (box _e))) - (log-debug (format"SPAWNED-PROCESS:~a ~a" pid cmdline-list)) + (klogger (format"SPAWNED-PROCESS:~a ~a" pid cmdline-list)) (define (mk-handler _port desc) (define port (unbox _port)) (if port (wrap-evt port (lambda (e) - (define (print-out x) (log-debug (format "SPAWNED-PROCESS ~a:~a:~a ~a" pid desc (->length x) x)) + (define (print-out x) (klogger (format "SPAWNED-PROCESS ~a:~a:~a ~a" pid desc (->length x) x)) (flush-output)) (cond [(not port) (print-out "IS #F")] @@ -344,7 +347,7 @@ (for/filter/fold/cons nes ([x (list s (list o "OUT") (list e "ERR"))]) (cond [(subprocess? x) (wrap-evt s (lambda (e) - (log-debug (format "SPAWNED-PROCESS ~a DIED" pid)) + (klogger (format "SPAWNED-PROCESS ~a DIED" pid)) (and parent (send parent process-died this))))] [(list? x) (apply mk-handler x)] [else #f]))) @@ -379,11 +382,18 @@ (send node forward-mesg e pch)] [else (put-msg e)]))) nes)) + (define/public (drain-place-channel) + (let loop () + (when (apply sync/timeout/enable-break 0 (register null)) + (loop)))) (define/public (get-sc-id) id) (define/public (get-raw-msg) (let loop () (define msg (send sch read-message)) (cond + [(eof-object? msg) + (printf "EOF from remote end during place-socket-bridge% get-raw-msg\n") + msg] [(= (dcgm-type msg) DCGM-DPLACE-DIED) (set! msg-queue (append msg-queue (list msg))) (loop)] @@ -489,15 +499,18 @@ [else (sconn-write-flush src-channel (dcgm DCGM-TYPE-INTER-DCHANNEL ch-id ch-id (format "ERROR: name not found ~a" name)))])] - [else - (define np (new place% + [else + (with-handlers ([exn? (lambda (e) + (printf/f "Error starting place command ~a ~a\n" place-exec e) + #;(raise e))]) + (define np (new place% [place-exec place-exec] [ch-id ch-id] [sc src-channel] [node this])) - (match place-exec - [(list _ _ _ name) (add-named-place name np)] - [else (add-sub-ec np)])])] + (match place-exec + [(list _ _ _ name) (add-named-place name np)] + [else (add-sub-ec np)]))])] [(dcgm 3 #;(== DCGM-TYPE-NEW-INTER-DCHANNEL) src dest ch-id) (define s src-channel) (define d (vector-ref chan-vec dest)) @@ -508,14 +521,17 @@ [(dcgm 4 #;(== DCGM-TYPE-INTER-DCHANNEL) _ ch-id msg) (define pch (sconn-lookup-subchannel src-channel ch-id)) ;(printf/f "4 ~a ~a ~a ~a\n" src-channel ch-id pch msg) - (cond - [(place-channel? pch) - (place-channel-put pch msg)] - [(is-a? pch connection%) - (send pch forward msg)] - [(th-place-channel? pch) - (th-place-channel-put pch msg)] - [else (raise (format "Unexpected channel type2 ~a" pch))])] + (match pch + [#f (raise (format "Unknown channel ch-id ~a in message ~a" ch-id m))] + [else + (cond + [(place-channel? pch) + (place-channel-put pch msg)] + [(is-a? pch connection%) + (send pch forward msg)] + [(th-place-channel? pch) + (th-place-channel-put pch msg)] + [else (raise (format "Unexpected channel type2 ~a" pch))])])] [(dcgm 6 #;(== DCGM-TYPE-SPAWN-REMOTE-PROCESS) src (list node-name node-port mod-path funcname) ch1) (define node (spawn-remote-racket-node node-name #:listen-port node-port)) (for ([x (in-hash-values spawned-nodes)]) @@ -528,11 +544,11 @@ ;#:restart-on-exit restart-on-exit )] [(dcgm 7 #;(== DCGM-DPLACE-DIED) -1 -1 ch-id) - (log-debug (format"PLACE ~a died" ch-id))] + (klogger (format"PLACE ~a died" ch-id))] [(dcgm 8 #;(== DCGM-TYPE-LOG-TO-PARENT) _ _ (list severity msg)) (log-from-child #:severity severity msg)] [(dcgm 11 #;(== DCGM-TYPE-SET-OWNER) -1 -1 msg) - (log-debug (format "RECV DCGM-TYPE-SET-OWNER ~a" src-channel)) + (klogger (format "RECV DCGM-TYPE-SET-OWNER ~a" src-channel)) (set! owner src-channel)] [(dcgm #;50 (== DCGM-NEW-NODE-CONNECT) -1 -1 (list node-name node-port)) (define node (find-spawned-node (list node-name node-port))) @@ -576,7 +592,7 @@ [(or (place-channel? d) (place? d)) (place-channel-put d m)])] [(? eof-object?) - (log-debug (format "connection died")) + (klogger (format "connection died")) (flush-output) (exit 1) ])) @@ -624,7 +640,7 @@ (wrap-evt listen-port (lambda (e) (define-values (in out) (tcp-accept listen-port)) (define-values (lh lp rh rp) (tcp-addresses in #t)) - (log-debug (format "INCOMING CONNECTION ~a:~a <- ~a:~a" lh lp rh rp)) + (klogger (format "INCOMING CONNECTION ~a:~a <- ~a:~a" lh lp rh rp)) (define sp (new socket-connection% [in in] [out out])) (add-socket-port sp))) nes) @@ -723,7 +739,7 @@ (let loop ([t 0]) (with-handlers ([exn? (lambda (e) (cond [(t . < . times) - (log-debug (format "try ~a waiting ~a sec to retry connection to ~a:~a" t delay rname rport)) + (klogger (format "try ~a waiting ~a sec to retry connection to ~a:~a" t delay rname rport)) (sleep delay) (loop (add1 t))] [else (raise e)]))]) @@ -745,7 +761,10 @@ (define/public (add-subchannel id pch) (set! subchannels (append subchannels (list (cons id pch))))) - (define/public (lookup-subchannel id) (cdr (assoc id subchannels))) + (define/public (lookup-subchannel id) + (match (assoc id subchannels) + [(cons _id _pch) _pch] + [else #f])) (define/public (_write-flush x) (when (equal? out #f) (ensure-connected)) ;(printf/f "SC ~a ~a\n" x out) @@ -761,7 +780,9 @@ (wrap-evt in (lambda (e) (forwarder (with-handlers ([exn:fail? handle-error]) - (read in)) + (define r (read in)) + ;(printf/f "SC IN ~a\n" r) + r) this)))) (define/public (read-message) @@ -846,7 +867,7 @@ (define (on-socket-event it in-port) (match it [(dcgm 7 #;(== DCGM-DPLACE-DIED) -1 -1 ch-id) - (log-debug (format "SPAWNED-PROCESS:~a PLACE DIED ~a:~a:~a" (get-sp-pid) host-name listen-port ch-id)) + (klogger (format "SPAWNED-PROCESS:~a PLACE DIED ~a:~a:~a" (get-sp-pid) host-name listen-port ch-id)) (cond [(find-place-by-sc-id ch-id) => (lambda (rp) (send rp place-died))] @@ -854,16 +875,19 @@ [(dcgm 4 #;(== DCGM-TYPE-INTER-DCHANNEL) _ ch-id msg) (define pch (sconn-lookup-subchannel sc ch-id)) ;(printf/f "44 ~a ~a ~a ~a\n" in-port ch-id pch msg) - (cond - [(place-channel? pch) - (place-channel-put pch msg)] - [(is-a? pch connection%) - (send pch forward msg)] - [(th-place-channel? pch) - (th-place-channel-put pch msg)] - [(async-bi-channel? pch) - (async-bi-channel-put pch msg)] - [else (raise (format "Unexpected channel type5 ~a" pch))])] + (match pch + [#f (raise (format "Unknown channel ch-id ~a in message ~a" ch-id it))] + [else + (cond + [(place-channel? pch) + (place-channel-put pch msg)] + [(is-a? pch connection%) + (send pch forward msg)] + [(th-place-channel? pch) + (th-place-channel-put pch msg)] + [(async-bi-channel? pch) + (async-bi-channel-put pch msg)] + [else (raise (format "Unexpected channel type ~a" pch))])])] [(dcgm 8 #;(== DCGM-TYPE-LOG-TO-PARENT) _ _ (list severity msg)) (define parent (send this get-router)) (cond @@ -873,14 +897,14 @@ [(? eof-object?) (define-values (lh lp rh rp) (send sc addresses)) - (log-debug (format "EOF on node socket connection pid to ~a ~a:~a CONNECTION ~a:~a -> ~a:~a" (get-sp-pid) host-name listen-port lh lp rh rp)) + (klogger (format "EOF on node socket connection pid to ~a ~a:~a CONNECTION ~a:~a -> ~a:~a" (get-sp-pid) host-name listen-port lh lp rh rp)) (set! sc #f)] - [else (log-debug (format"received message ~a from ~a" it in-port))])) + [else (klogger (format"received message ~a from ~a" it in-port))])) (define/public (get-log-prefix) (format "PLACE ~a:~a" host-name listen-port)) (define/public (tcp-connection-died host port e) - (log-debug (format "TCP connection ~a:~a died, ~a, restarting node/connection" host-name listen-port e)) + (klogger (format "TCP connection ~a:~a died, ~a, restarting node/connection" host-name listen-port e)) (and sp (send sp kill)) (set! sp #f) (cond @@ -890,10 +914,10 @@ (restart-node) (send restart-on-exit restart restart-node))] [else - (log-debug (format "No restart condition for ~a" (get-log-prefix)))])) + (klogger (format "No restart condition for ~a" (get-log-prefix)))])) (define/public (process-died child) - (log-debug (format "Remote node pid ~a ~a:~a died" (get-sp-pid) host-name listen-port)) + (klogger (format "Remote node pid ~a ~a:~a died" (get-sp-pid) host-name listen-port)) (set! sp #f) (cond [restart-on-exit @@ -903,9 +927,9 @@ (restart-node) (send restart-on-exit restart restart-node))] [else - (log-debug (format "No restart cmdline arguments for ~a" (get-log-prefix)))])] + (klogger (format "No restart cmdline arguments for ~a" (get-log-prefix)))])] [else - (log-debug (format "No restart condition for ~a" (get-log-prefix)))])) + (klogger (format "No restart condition for ~a" (get-log-prefix)))])) (define/public (get-first-place) (car remote-places)) @@ -1024,11 +1048,11 @@ [(procedure? restart-on-exit) (restart-on-exit)] [else (send restart-on-exit restart restart-place)])] [else - (log-debug (format "No restart condition for ~a:~a" + (klogger (format "No restart condition for ~a:~a" (send node get-log-prefix) (send psb get-sc-id)))])) (define (on-channel-event e) - (log-debug (format "~a ~a" (send node get-log-prefix) e))) + (klogger (format "~a ~a" (send node get-log-prefix) e))) (define/public (register es) (let* ([es (if pc (cons (wrap-evt pc (cond @@ -1079,6 +1103,8 @@ (field [running #f]) (define (default-on-place-dead e) (set! pd #f) + ;drain operation might be needed if sync returns dead place evt before ready place channel + ;(send psb drain-place-channel) ;(set! psb #f) (sconn-write-flush sc (dcgm DCGM-DPLACE-DIED -1 -1 ch-id)) (sconn-remove-subchannel sc ch-id)) @@ -1227,7 +1253,7 @@ (define/public (restart restart-func) (cond [(and retry (>= retries retry)) - (log-debug (format "Already retried to restart ~a times" retry)) + (klogger (format "Already retried to restart ~a times" retry)) (and on-final-fail (on-final-fail))] [(> (- (current-inexact-milliseconds) last-attempt) (* seconds 1000)) (when (> (- (current-inexact-milliseconds) last-attempt) (* retry-reset 1000))