Improved place/distributed error reporting

This commit is contained in:
Kevin Tew 2012-12-16 16:46:29 -07:00
parent 6d9740fcf1
commit ff9dcb62e8

View File

@ -106,6 +106,9 @@
port-no?
)
;(define klogger displayln)
(define klogger (lambda (x) (log-debug x)))
(define in-message-router-mark (cons #f #f))
(define (call-in-message-router thunk)
(with-continuation-mark in-message-router-mark #f
@ -171,7 +174,7 @@
[wait-time start-seconds])
(with-handlers ([exn? (lambda (e)
(cond [(t . < . times)
(log-debug (format "backing off ~a sec to ~a:~a" (expt 2 t) rname rport))
(klogger (format "backing off ~a sec to ~a:~a" (expt 2 t) rname rport))
(sleep wait-time)
(loop (add1 t) (* 2 wait-time))]
[else (raise e)]))])
@ -181,7 +184,7 @@
(let loop ([t 0])
(with-handlers ([exn? (lambda (e)
(cond [(t . < . times)
(log-debug (format "waiting ~a sec to retry connection to ~a:~a" delay rname rport))
(klogger (format "waiting ~a sec to retry connection to ~a:~a" delay rname rport))
(sleep delay)
(loop (add1 t))]
[else (raise e)]))])
@ -316,13 +319,13 @@
(set! o (box _o))
(set! i (box _i))
(set! e (box _e)))
(log-debug (format"SPAWNED-PROCESS:~a ~a" pid cmdline-list))
(klogger (format"SPAWNED-PROCESS:~a ~a" pid cmdline-list))
(define (mk-handler _port desc)
(define port (unbox _port))
(if port
(wrap-evt port (lambda (e)
(define (print-out x) (log-debug (format "SPAWNED-PROCESS ~a:~a:~a ~a" pid desc (->length x) x))
(define (print-out x) (klogger (format "SPAWNED-PROCESS ~a:~a:~a ~a" pid desc (->length x) x))
(flush-output))
(cond
[(not port) (print-out "IS #F")]
@ -344,7 +347,7 @@
(for/filter/fold/cons nes ([x (list s (list o "OUT") (list e "ERR"))])
(cond
[(subprocess? x) (wrap-evt s (lambda (e)
(log-debug (format "SPAWNED-PROCESS ~a DIED" pid))
(klogger (format "SPAWNED-PROCESS ~a DIED" pid))
(and parent (send parent process-died this))))]
[(list? x) (apply mk-handler x)]
[else #f])))
@ -379,11 +382,18 @@
(send node forward-mesg e pch)]
[else (put-msg e)])))
nes))
(define/public (drain-place-channel)
(let loop ()
(when (apply sync/timeout/enable-break 0 (register null))
(loop))))
(define/public (get-sc-id) id)
(define/public (get-raw-msg)
(let loop ()
(define msg (send sch read-message))
(cond
[(eof-object? msg)
(printf "EOF from remote end during place-socket-bridge% get-raw-msg\n")
msg]
[(= (dcgm-type msg) DCGM-DPLACE-DIED)
(set! msg-queue (append msg-queue (list msg)))
(loop)]
@ -489,15 +499,18 @@
[else
(sconn-write-flush src-channel (dcgm DCGM-TYPE-INTER-DCHANNEL ch-id ch-id
(format "ERROR: name not found ~a" name)))])]
[else
(define np (new place%
[else
(with-handlers ([exn? (lambda (e)
(printf/f "Error starting place command ~a ~a\n" place-exec e)
#;(raise e))])
(define np (new place%
[place-exec place-exec]
[ch-id ch-id]
[sc src-channel]
[node this]))
(match place-exec
[(list _ _ _ name) (add-named-place name np)]
[else (add-sub-ec np)])])]
(match place-exec
[(list _ _ _ name) (add-named-place name np)]
[else (add-sub-ec np)]))])]
[(dcgm 3 #;(== DCGM-TYPE-NEW-INTER-DCHANNEL) src dest ch-id)
(define s src-channel)
(define d (vector-ref chan-vec dest))
@ -508,14 +521,17 @@
[(dcgm 4 #;(== DCGM-TYPE-INTER-DCHANNEL) _ ch-id msg)
(define pch (sconn-lookup-subchannel src-channel ch-id))
;(printf/f "4 ~a ~a ~a ~a\n" src-channel ch-id pch msg)
(cond
[(place-channel? pch)
(place-channel-put pch msg)]
[(is-a? pch connection%)
(send pch forward msg)]
[(th-place-channel? pch)
(th-place-channel-put pch msg)]
[else (raise (format "Unexpected channel type2 ~a" pch))])]
(match pch
[#f (raise (format "Unknown channel ch-id ~a in message ~a" ch-id m))]
[else
(cond
[(place-channel? pch)
(place-channel-put pch msg)]
[(is-a? pch connection%)
(send pch forward msg)]
[(th-place-channel? pch)
(th-place-channel-put pch msg)]
[else (raise (format "Unexpected channel type2 ~a" pch))])])]
[(dcgm 6 #;(== DCGM-TYPE-SPAWN-REMOTE-PROCESS) src (list node-name node-port mod-path funcname) ch1)
(define node (spawn-remote-racket-node node-name #:listen-port node-port))
(for ([x (in-hash-values spawned-nodes)])
@ -528,11 +544,11 @@
;#:restart-on-exit restart-on-exit
)]
[(dcgm 7 #;(== DCGM-DPLACE-DIED) -1 -1 ch-id)
(log-debug (format"PLACE ~a died" ch-id))]
(klogger (format"PLACE ~a died" ch-id))]
[(dcgm 8 #;(== DCGM-TYPE-LOG-TO-PARENT) _ _ (list severity msg))
(log-from-child #:severity severity msg)]
[(dcgm 11 #;(== DCGM-TYPE-SET-OWNER) -1 -1 msg)
(log-debug (format "RECV DCGM-TYPE-SET-OWNER ~a" src-channel))
(klogger (format "RECV DCGM-TYPE-SET-OWNER ~a" src-channel))
(set! owner src-channel)]
[(dcgm #;50 (== DCGM-NEW-NODE-CONNECT) -1 -1 (list node-name node-port))
(define node (find-spawned-node (list node-name node-port)))
@ -576,7 +592,7 @@
[(or (place-channel? d) (place? d))
(place-channel-put d m)])]
[(? eof-object?)
(log-debug (format "connection died"))
(klogger (format "connection died"))
(flush-output)
(exit 1)
]))
@ -624,7 +640,7 @@
(wrap-evt listen-port (lambda (e)
(define-values (in out) (tcp-accept listen-port))
(define-values (lh lp rh rp) (tcp-addresses in #t))
(log-debug (format "INCOMING CONNECTION ~a:~a <- ~a:~a" lh lp rh rp))
(klogger (format "INCOMING CONNECTION ~a:~a <- ~a:~a" lh lp rh rp))
(define sp (new socket-connection% [in in] [out out]))
(add-socket-port sp)))
nes)
@ -723,7 +739,7 @@
(let loop ([t 0])
(with-handlers ([exn? (lambda (e)
(cond [(t . < . times)
(log-debug (format "try ~a waiting ~a sec to retry connection to ~a:~a" t delay rname rport))
(klogger (format "try ~a waiting ~a sec to retry connection to ~a:~a" t delay rname rport))
(sleep delay)
(loop (add1 t))]
[else (raise e)]))])
@ -745,7 +761,10 @@
(define/public (add-subchannel id pch)
(set! subchannels (append subchannels (list (cons id pch)))))
(define/public (lookup-subchannel id) (cdr (assoc id subchannels)))
(define/public (lookup-subchannel id)
(match (assoc id subchannels)
[(cons _id _pch) _pch]
[else #f]))
(define/public (_write-flush x)
(when (equal? out #f) (ensure-connected))
;(printf/f "SC ~a ~a\n" x out)
@ -761,7 +780,9 @@
(wrap-evt in (lambda (e)
(forwarder
(with-handlers ([exn:fail? handle-error])
(read in))
(define r (read in))
;(printf/f "SC IN ~a\n" r)
r)
this))))
(define/public (read-message)
@ -846,7 +867,7 @@
(define (on-socket-event it in-port)
(match it
[(dcgm 7 #;(== DCGM-DPLACE-DIED) -1 -1 ch-id)
(log-debug (format "SPAWNED-PROCESS:~a PLACE DIED ~a:~a:~a" (get-sp-pid) host-name listen-port ch-id))
(klogger (format "SPAWNED-PROCESS:~a PLACE DIED ~a:~a:~a" (get-sp-pid) host-name listen-port ch-id))
(cond
[(find-place-by-sc-id ch-id) => (lambda (rp)
(send rp place-died))]
@ -854,16 +875,19 @@
[(dcgm 4 #;(== DCGM-TYPE-INTER-DCHANNEL) _ ch-id msg)
(define pch (sconn-lookup-subchannel sc ch-id))
;(printf/f "44 ~a ~a ~a ~a\n" in-port ch-id pch msg)
(cond
[(place-channel? pch)
(place-channel-put pch msg)]
[(is-a? pch connection%)
(send pch forward msg)]
[(th-place-channel? pch)
(th-place-channel-put pch msg)]
[(async-bi-channel? pch)
(async-bi-channel-put pch msg)]
[else (raise (format "Unexpected channel type5 ~a" pch))])]
(match pch
[#f (raise (format "Unknown channel ch-id ~a in message ~a" ch-id it))]
[else
(cond
[(place-channel? pch)
(place-channel-put pch msg)]
[(is-a? pch connection%)
(send pch forward msg)]
[(th-place-channel? pch)
(th-place-channel-put pch msg)]
[(async-bi-channel? pch)
(async-bi-channel-put pch msg)]
[else (raise (format "Unexpected channel type ~a" pch))])])]
[(dcgm 8 #;(== DCGM-TYPE-LOG-TO-PARENT) _ _ (list severity msg))
(define parent (send this get-router))
(cond
@ -873,14 +897,14 @@
[(? eof-object?)
(define-values (lh lp rh rp) (send sc addresses))
(log-debug (format "EOF on node socket connection pid to ~a ~a:~a CONNECTION ~a:~a -> ~a:~a" (get-sp-pid) host-name listen-port lh lp rh rp))
(klogger (format "EOF on node socket connection pid to ~a ~a:~a CONNECTION ~a:~a -> ~a:~a" (get-sp-pid) host-name listen-port lh lp rh rp))
(set! sc #f)]
[else (log-debug (format"received message ~a from ~a" it in-port))]))
[else (klogger (format"received message ~a from ~a" it in-port))]))
(define/public (get-log-prefix) (format "PLACE ~a:~a" host-name listen-port))
(define/public (tcp-connection-died host port e)
(log-debug (format "TCP connection ~a:~a died, ~a, restarting node/connection" host-name listen-port e))
(klogger (format "TCP connection ~a:~a died, ~a, restarting node/connection" host-name listen-port e))
(and sp (send sp kill))
(set! sp #f)
(cond
@ -890,10 +914,10 @@
(restart-node)
(send restart-on-exit restart restart-node))]
[else
(log-debug (format "No restart condition for ~a" (get-log-prefix)))]))
(klogger (format "No restart condition for ~a" (get-log-prefix)))]))
(define/public (process-died child)
(log-debug (format "Remote node pid ~a ~a:~a died" (get-sp-pid) host-name listen-port))
(klogger (format "Remote node pid ~a ~a:~a died" (get-sp-pid) host-name listen-port))
(set! sp #f)
(cond
[restart-on-exit
@ -903,9 +927,9 @@
(restart-node)
(send restart-on-exit restart restart-node))]
[else
(log-debug (format "No restart cmdline arguments for ~a" (get-log-prefix)))])]
(klogger (format "No restart cmdline arguments for ~a" (get-log-prefix)))])]
[else
(log-debug (format "No restart condition for ~a" (get-log-prefix)))]))
(klogger (format "No restart condition for ~a" (get-log-prefix)))]))
(define/public (get-first-place)
(car remote-places))
@ -1024,11 +1048,11 @@
[(procedure? restart-on-exit) (restart-on-exit)]
[else (send restart-on-exit restart restart-place)])]
[else
(log-debug (format "No restart condition for ~a:~a"
(klogger (format "No restart condition for ~a:~a"
(send node get-log-prefix)
(send psb get-sc-id)))]))
(define (on-channel-event e)
(log-debug (format "~a ~a" (send node get-log-prefix) e)))
(klogger (format "~a ~a" (send node get-log-prefix) e)))
(define/public (register es)
(let* ([es (if pc (cons (wrap-evt pc
(cond
@ -1079,6 +1103,8 @@
(field [running #f])
(define (default-on-place-dead e)
(set! pd #f)
;drain operation might be needed if sync returns dead place evt before ready place channel
;(send psb drain-place-channel)
;(set! psb #f)
(sconn-write-flush sc (dcgm DCGM-DPLACE-DIED -1 -1 ch-id))
(sconn-remove-subchannel sc ch-id))
@ -1227,7 +1253,7 @@
(define/public (restart restart-func)
(cond
[(and retry (>= retries retry))
(log-debug (format "Already retried to restart ~a times" retry))
(klogger (format "Already retried to restart ~a times" retry))
(and on-final-fail (on-final-fail))]
[(> (- (current-inexact-milliseconds) last-attempt) (* seconds 1000))
(when (> (- (current-inexact-milliseconds) last-attempt) (* retry-reset 1000))