shorten some lines

This commit is contained in:
Kevin Tew 2010-11-02 14:11:26 -06:00
parent c7700dbe06
commit 3ec54a61c2

View File

@ -74,54 +74,49 @@
(exit 1)) (exit 1))
#f)) #f))
(letrec ([loop (match-lambda* (letrec ([loop (match-lambda*
;; QUEUE IDLE INFLIGHT COUNT ;; QUEUE IDLE INFLIGHT COUNT
;; Reached stopat count STOP ;; Reached stopat count STOP
[(list idle inflight count (? error-threshold error-count)) (void)] [(list idle inflight count (? error-threshold error-count)) (void)]
[(list idle inflight (? (lambda (x) (= x stopat))) error-count) (printf "DONE AT LIMIT\n")] [(list idle inflight (? (lambda (x) (= x stopat))) error-count) (printf "DONE AT LIMIT\n")]
;; Send work to idle worker ;; Send work to idle worker
[(list (and (? jobs?) (cons wrkr idle)) inflight count error-count) [(list (and (? jobs?) (cons wrkr idle)) inflight count error-count)
(let-values ([(job cmd-list) (get-job jobqueue (worker-id wrkr))]) (let-values ([(job cmd-list) (get-job jobqueue (worker-id wrkr))])
(let retry-loop ([wrkr wrkr] (let retry-loop ([wrkr wrkr]
[error-count error-count]) [error-count error-count])
(error-threshold error-count) (error-threshold error-count)
(match wrkr (match wrkr
[(worker i s o in e) [(worker i s o in e)
(with-handlers* ([exn:fail? (lambda (e) (with-handlers* ([exn:fail? (lambda (e)
(printf "MASTER WRITE ERROR - writing to worker: ~a\n" (exn-message e)) (printf "MASTER WRITE ERROR - writing to worker: ~a\n" (exn-message e))
(kill-worker wrkr) (kill-worker wrkr)
(retry-loop (spawn i) (add1 error-count)))]) (retry-loop (spawn i) (add1 error-count)))])
(send/msg cmd-list in))]) (send/msg cmd-list in))])
(loop idle (cons (list job wrkr) inflight) count error-count)))] (loop idle (cons (list job wrkr) inflight) count error-count)))]
;; Queue empty and all workers idle, we are all done ;; Queue empty and all workers idle, we are all done
[(list (and (? empty?) idle) (list) count error-count) [(list (and (? empty?) idle) (list) count error-count) (set! workers idle)]
(set! workers idle)] ;; Wait for reply from worker
;; Wait for reply from worker [(list idle inflight count error-count)
[(list idle inflight count error-count) (apply sync (map (λ (node-worker) (match node-worker
(apply sync (map (λ (node-worker) (match node-worker [(list node (and wrkr (worker id sh out in err)))
[(list node (and wrkr (worker id sh out in err))) (handle-evt out (λ (e)
(handle-evt out (λ (e) (let ([msg (with-handlers* ([exn:fail? (lambda (e)
(let ([msg (printf "MASTER READ ERROR - reading from worker: ~a\n" (exn-message e))
(with-handlers* ([exn:fail? (lambda (e) (kill-worker wrkr)
(printf "MASTER READ ERROR - reading from worker: ~a\n" (exn-message e)) (loop (cons (spawn id) idle)
(kill-worker wrkr) (remove node-worker inflight)
(loop (cons (spawn id) idle) count
(remove node-worker inflight) (add1 error-count)))])
count (read out))])
(add1 error-count)))]) (work-done jobqueue node id msg)
(read out))]) (loop (cons wrkr idle)
(work-done jobqueue node id msg) (remove node-worker inflight)
(loop (add1 count)
(cons wrkr idle) error-count))))]))
(remove node-worker inflight)
(add1 count) inflight))])])
error-count))))]))
inflight))])])
(loop workers null 0 0))) (loop workers null 0 0)))
(lambda () (lambda ()
(for ([p workers]) (for ([p workers]) (with-handlers ([exn? void]) (send/msg (list 'DIE) (worker-in p))))
(with-handlers ([exn? void])
(send/msg (list 'DIE) (worker-in p))))
(for ([p workers]) (subprocess-wait (worker-process-handle p)))))) (for ([p workers]) (subprocess-wait (worker-process-handle p))))))
(define (parallel-do-default-error-handler work error-message outstr errstr) (define (parallel-do-default-error-handler work error-message outstr errstr)
@ -129,9 +124,8 @@
(printf "STDOUT\n~a=====\n" outstr) (printf "STDOUT\n~a=====\n" outstr)
(printf "STDERR\n~a=====\n" errstr)) (printf "STDERR\n~a=====\n" errstr))
(define-struct list-queue (queue results create-job-thunk success-thunk failure-thunk) #:transparent (define-struct list-queue (queue results create-job-thunk success-thunk failure-thunk)
#:mutable #:transparent #:mutable #:property prop:jobqueue
#:property prop:jobqueue
(define-methods jobqueue (define-methods jobqueue
(define (work-done jobqueue work workerid msg) (define (work-done jobqueue work workerid msg)
(match msg (match msg