diff --git a/collects/setup/parallel-do.rkt b/collects/setup/parallel-do.rkt index cac9403426..dbf340fa18 100644 --- a/collects/setup/parallel-do.rkt +++ b/collects/setup/parallel-do.rkt @@ -74,54 +74,49 @@ (exit 1)) #f)) (letrec ([loop (match-lambda* - ;; QUEUE IDLE INFLIGHT COUNT - ;; Reached stopat count STOP - [(list idle inflight count (? error-threshold error-count)) (void)] - [(list idle inflight (? (lambda (x) (= x stopat))) error-count) (printf "DONE AT LIMIT\n")] - ;; Send work to idle worker - [(list (and (? jobs?) (cons wrkr idle)) inflight count error-count) - (let-values ([(job cmd-list) (get-job jobqueue (worker-id wrkr))]) - (let retry-loop ([wrkr wrkr] - [error-count error-count]) - (error-threshold error-count) - (match wrkr - [(worker i s o in e) - (with-handlers* ([exn:fail? (lambda (e) - (printf "MASTER WRITE ERROR - writing to worker: ~a\n" (exn-message e)) - (kill-worker wrkr) - (retry-loop (spawn i) (add1 error-count)))]) - (send/msg cmd-list in))]) - (loop idle (cons (list job wrkr) inflight) count error-count)))] - ;; Queue empty and all workers idle, we are all done - [(list (and (? empty?) idle) (list) count error-count) - (set! workers idle)] - ;; Wait for reply from worker - [(list idle inflight count error-count) - (apply sync (map (λ (node-worker) (match node-worker - [(list node (and wrkr (worker id sh out in err))) - (handle-evt out (λ (e) - (let ([msg - (with-handlers* ([exn:fail? (lambda (e) - (printf "MASTER READ ERROR - reading from worker: ~a\n" (exn-message e)) - (kill-worker wrkr) - (loop (cons (spawn id) idle) - (remove node-worker inflight) - count - (add1 error-count)))]) - (read out))]) - (work-done jobqueue node id msg) - (loop - (cons wrkr idle) - (remove node-worker inflight) - (add1 count) - error-count))))])) - - inflight))])]) + ;; QUEUE IDLE INFLIGHT COUNT + ;; Reached stopat count STOP + [(list idle inflight count (? error-threshold error-count)) (void)] + [(list idle inflight (? (lambda (x) (= x stopat))) error-count) (printf "DONE AT LIMIT\n")] + ;; Send work to idle worker + [(list (and (? jobs?) (cons wrkr idle)) inflight count error-count) + (let-values ([(job cmd-list) (get-job jobqueue (worker-id wrkr))]) + (let retry-loop ([wrkr wrkr] + [error-count error-count]) + (error-threshold error-count) + (match wrkr + [(worker i s o in e) + (with-handlers* ([exn:fail? (lambda (e) + (printf "MASTER WRITE ERROR - writing to worker: ~a\n" (exn-message e)) + (kill-worker wrkr) + (retry-loop (spawn i) (add1 error-count)))]) + (send/msg cmd-list in))]) + (loop idle (cons (list job wrkr) inflight) count error-count)))] + ;; Queue empty and all workers idle, we are all done + [(list (and (? empty?) idle) (list) count error-count) (set! workers idle)] + ;; Wait for reply from worker + [(list idle inflight count error-count) + (apply sync (map (λ (node-worker) (match node-worker + [(list node (and wrkr (worker id sh out in err))) + (handle-evt out (λ (e) + (let ([msg (with-handlers* ([exn:fail? (lambda (e) + (printf "MASTER READ ERROR - reading from worker: ~a\n" (exn-message e)) + (kill-worker wrkr) + (loop (cons (spawn id) idle) + (remove node-worker inflight) + count + (add1 error-count)))]) + (read out))]) + (work-done jobqueue node id msg) + (loop (cons wrkr idle) + (remove node-worker inflight) + (add1 count) + error-count))))])) + + inflight))])]) (loop workers null 0 0))) (lambda () - (for ([p workers]) - (with-handlers ([exn? void]) - (send/msg (list 'DIE) (worker-in p)))) + (for ([p workers]) (with-handlers ([exn? void]) (send/msg (list 'DIE) (worker-in p)))) (for ([p workers]) (subprocess-wait (worker-process-handle p)))))) (define (parallel-do-default-error-handler work error-message outstr errstr) @@ -129,9 +124,8 @@ (printf "STDOUT\n~a=====\n" outstr) (printf "STDERR\n~a=====\n" errstr)) -(define-struct list-queue (queue results create-job-thunk success-thunk failure-thunk) #:transparent - #:mutable - #:property prop:jobqueue +(define-struct list-queue (queue results create-job-thunk success-thunk failure-thunk) + #:transparent #:mutable #:property prop:jobqueue (define-methods jobqueue (define (work-done jobqueue work workerid msg) (match msg