Parallel build: improved error handling

This commit is contained in:
Kevin Tew 2010-07-30 11:42:18 -06:00
parent 0e8af6bc5d
commit 6c2e1fa34f

View File

@ -67,27 +67,36 @@
(parameterize-break #f (parameterize-break #f
(set! workers (for/list ([i (in-range nprocs)]) (spawn i))))) (set! workers (for/list ([i (in-range nprocs)]) (spawn i)))))
(lambda () (lambda ()
(define (error-threshold x)
(if (x . >= . 4)
(begin
(eprintf "Error count reached ~a, exiting~n" x)
(exit 1))
#f))
(letrec ([loop (match-lambda* (letrec ([loop (match-lambda*
;; QUEUE IDLE INFLIGHT COUNT ;; QUEUE IDLE INFLIGHT COUNT
;; Reached stopat count STOP ;; Reached stopat count STOP
[(list idle inflight (? (lambda (x) (= x stopat)))) (printf "DONE AT LIMIT~n")] [(list idle inflight count (? error-threshold error-count)) (void)]
[(list idle inflight (? (lambda (x) (= x stopat))) error-count) (printf "DONE AT LIMIT~n")]
;; Send work to idle worker ;; Send work to idle worker
[(list (and (? jobs?) (cons wrkr idle)) inflight count) [(list (and (? jobs?) (cons wrkr idle)) inflight count error-count)
(let-values ([(job cmd-list) (get-job jobqueue (worker-id wrkr))]) (let-values ([(job cmd-list) (get-job jobqueue (worker-id wrkr))])
(let retry-loop ([wrkr wrkr]) (let retry-loop ([wrkr wrkr]
[error-count error-count])
(error-threshold error-count)
(match wrkr (match wrkr
[(worker i s o in e) [(worker i s o in e)
(with-handlers* ([exn:fail? (lambda (e) (with-handlers* ([exn:fail? (lambda (e)
(printf "MASTER WRITE ERROR - writing to worker: ~a~n" (exn-message e)) (printf "MASTER WRITE ERROR - writing to worker: ~a~n" (exn-message e))
(kill-worker wrkr) (kill-worker wrkr)
(retry-loop (spawn i)))]) (retry-loop (spawn i) (add1 error-count)))])
(send/msg cmd-list in))]) (send/msg cmd-list in))])
(loop idle (cons (list job wrkr) inflight) count)))] (loop idle (cons (list job wrkr) inflight) count error-count)))]
;; Queue empty and all workers idle, we are all done ;; Queue empty and all workers idle, we are all done
[(list (and (? empty?) idle) (list) count) [(list (and (? empty?) idle) (list) count error-count)
(set! workers idle)] (set! workers idle)]
;; Wait for reply from worker ;; Wait for reply from worker
[(list idle inflight count) [(list idle inflight count error-count)
(apply sync (map (λ (node-worker) (match node-worker (apply sync (map (λ (node-worker) (match node-worker
[(list node (and wrkr (worker id sh out in err))) [(list node (and wrkr (worker id sh out in err)))
(handle-evt out (λ (e) (handle-evt out (λ (e)
@ -97,16 +106,18 @@
(kill-worker wrkr) (kill-worker wrkr)
(loop (cons (spawn id) idle) (loop (cons (spawn id) idle)
(remove node-worker inflight) (remove node-worker inflight)
count))]) count
(add1 error-count)))])
(read out))]) (read out))])
(work-done jobqueue node id msg) (work-done jobqueue node id msg)
(loop (loop
(cons wrkr idle) (cons wrkr idle)
(remove node-worker inflight) (remove node-worker inflight)
(+ count 1)))))])) (add1 count)
error-count))))]))
inflight))])]) inflight))])])
(loop workers null 0))) (loop workers null 0 0)))
(lambda () (lambda ()
(for ([p workers]) (for ([p workers])
(with-handlers ([exn? void]) (with-handlers ([exn? void])
@ -160,13 +171,15 @@
(define (pdo-send msg) (define (pdo-send msg)
(with-handlers ([exn:fail? (with-handlers ([exn:fail?
(lambda (x) (lambda (x)
(fprintf orig-err "WORKER SEND MESSAGE ERROR ~a~n" (exn-message x)))]) (fprintf orig-err "WORKER SEND MESSAGE ERROR ~a~n" (exn-message x))
(exit 1))])
(write msg orig-out) (write msg orig-out)
(flush-output orig-out))) (flush-output orig-out)))
(define (pdo-recv) (define (pdo-recv)
(with-handlers ([exn:fail? (with-handlers ([exn:fail?
(lambda (x) (lambda (x)
(fprintf orig-err "WORKER RECEIVE MESSAGE ERROR ~a~n" (exn-message x)))]) (fprintf orig-err "WORKER RECEIVE MESSAGE ERROR ~a~n" (exn-message x))
(exit 1))])
(read))) (read)))
(match (deserialize (fasl->s-exp (pdo-recv))) (match (deserialize (fasl->s-exp (pdo-recv)))
[globals-list [globals-list