Parallel-build fixes for ctrl-c/break
This commit is contained in:
parent
3a9eee936a
commit
85098c5290
|
@ -33,56 +33,64 @@
|
|||
(eprintf "KILLING WORKER ~a ~a ~n" i nw)
|
||||
(close-input-port o)
|
||||
(close-output-port in))
|
||||
(define workers (for/list ([i (in-range nprocs)]) (spawn i)))
|
||||
(define workers #f)
|
||||
(define (jobs? queue)
|
||||
(has-jobs? jobqueue queue))
|
||||
(define (empty? queue)
|
||||
(not (has-jobs? jobqueue queue)))
|
||||
|
||||
(letrec ([loop (match-lambda*
|
||||
;; QUEUE IDLE INFLIGHT COUNT
|
||||
;; Reached stopat count STOP
|
||||
[(list queue idle inflight (? (lambda (x) (= x stopat)))) (printf "DONE AT LIMIT~n")]
|
||||
;; Send work to idle worker
|
||||
[(list (? jobs? queue) (cons worker idle) inflight count)
|
||||
(let-values ([(queue-state job cmd-list) (get-job jobqueue queue (first worker))])
|
||||
(let retry-loop ([worker worker])
|
||||
(match worker
|
||||
[(list i s o in e)
|
||||
(with-handlers* ([exn:fail? (lambda (nw)
|
||||
(kill-worker i nw i o)
|
||||
(retry-loop (spawn i)))])
|
||||
(send/msg cmd-list in))])
|
||||
(loop queue-state idle (cons (list job worker) inflight) count)))]
|
||||
;; Queue empty and all workers idle, we are all done
|
||||
[(list (? empty?) idle (list) count) (void)]
|
||||
;; Wait for reply from worker
|
||||
[(list queue idle inflight count)
|
||||
(apply sync (map (λ (node-worker) (match node-worker
|
||||
[(list node worker)
|
||||
(match worker
|
||||
[(list i s o in e)
|
||||
(handle-evt o (λ (e)
|
||||
(let ([msg
|
||||
(with-handlers* ([exn:fail? (lambda (nw)
|
||||
(printf "READ ERROR - reading worker: ~a ~n" nw)
|
||||
(kill-worker i nw i o)
|
||||
(loop queue
|
||||
(cons (spawn i) idle)
|
||||
(remove node-worker inflight)
|
||||
count))])
|
||||
(read o))])
|
||||
;(list count i (length idle) (length inflight) (length queue))
|
||||
(loop (work-done jobqueue queue node i msg)
|
||||
(cons worker idle)
|
||||
(remove node-worker inflight)
|
||||
(+ count 1)))))])]))
|
||||
|
||||
inflight))])])
|
||||
(loop (initial-queue jobqueue) workers null 0))
|
||||
|
||||
(for ([p workers]) (send/msg (list 'DIE) (fourth p)))
|
||||
(for ([p workers]) (subprocess-wait (second p))))
|
||||
|
||||
(parameterize-break #f
|
||||
(set! workers (for/list ([i (in-range nprocs)]) (spawn i))))
|
||||
|
||||
(dynamic-wind
|
||||
(lambda () (void))
|
||||
(lambda ()
|
||||
(letrec ([loop (match-lambda*
|
||||
;; QUEUE IDLE INFLIGHT COUNT
|
||||
;; Reached stopat count STOP
|
||||
[(list queue idle inflight (? (lambda (x) (= x stopat)))) (printf "DONE AT LIMIT~n")]
|
||||
;; Send work to idle worker
|
||||
[(list (? jobs? queue) (cons worker idle) inflight count)
|
||||
(let-values ([(queue-state job cmd-list) (get-job jobqueue queue (first worker))])
|
||||
(let retry-loop ([worker worker])
|
||||
(match worker
|
||||
[(list i s o in e)
|
||||
(with-handlers* ([exn:fail? (lambda (nw)
|
||||
(kill-worker i nw i o)
|
||||
(retry-loop (spawn i)))])
|
||||
(send/msg cmd-list in))])
|
||||
(loop queue-state idle (cons (list job worker) inflight) count)))]
|
||||
;; Queue empty and all workers idle, we are all done
|
||||
[(list (? empty?) idle (list) count) (void)]
|
||||
;; Wait for reply from worker
|
||||
[(list queue idle inflight count)
|
||||
(apply sync (map (λ (node-worker) (match node-worker
|
||||
[(list node worker)
|
||||
(match worker
|
||||
[(list i s o in e)
|
||||
(handle-evt o (λ (e)
|
||||
(let ([msg
|
||||
(with-handlers* ([exn:fail? (lambda (nw)
|
||||
(printf "READ ERROR - reading worker: ~a ~n" nw)
|
||||
(kill-worker i nw i o)
|
||||
(loop queue
|
||||
(cons (spawn i) idle)
|
||||
(remove node-worker inflight)
|
||||
count))])
|
||||
(read o))])
|
||||
;(list count i (length idle) (length inflight) (length queue))
|
||||
(loop (work-done jobqueue queue node i msg)
|
||||
(cons worker idle)
|
||||
(remove node-worker inflight)
|
||||
(+ count 1)))))])]))
|
||||
|
||||
inflight))])])
|
||||
(loop (initial-queue jobqueue) workers null 0)))
|
||||
(lambda ()
|
||||
(for ([p workers])
|
||||
(with-handlers ([exn? void])
|
||||
(send/msg (list 'DIE) (fourth p))))
|
||||
(for ([p workers]) (subprocess-wait (second p))))))
|
||||
|
||||
|
||||
(define-struct collects-queue (cclst hash collects-dir printer) #:transparent
|
||||
|
|
Loading…
Reference in New Issue
Block a user