refactor some parallel-build code

- simpler and more reliable break protocol
- use logging instead of `eprintf' for non-exception errors
- avoid runtime code duplication in parallel-do macro
- fix some non-tail calls that should be tail calls
- print relevant place in "making" lines
- minor source formatting
This commit is contained in:
Matthew Flatt 2011-08-19 09:50:29 -06:00
parent 1b0abe85c7
commit 3f447b39e2
3 changed files with 259 additions and 183 deletions

View File

@ -74,26 +74,29 @@
(append-error cc "making" null out err "output"))
;(when last (printer (current-output-port) "made" "~a" (cc-name cc)))
#t]
[else (eprintf "Failed trying to match:\n~v\n" result-type)]))]
[else (eprintf "Failed trying to match:\n~e\n" result-type)]))]
[else
(match work
[(list-rest (list cc file last) message)
(append-error cc "making" null "" "" "error")
(eprintf "work-done match cc failed.\n")
(eprintf "trying to match:\n~a\n" (list work msg))
(eprintf "trying to match:\n~e\n" (list work msg))
#t]
[else
(eprintf "work-done match cc failed.\n")
(eprintf "trying to match:\n~a\n" (list work msg))
(eprintf "trying to match:\n~e\n" (list work msg))
(eprintf "FATAL\n")
(exit 1)])]))
;; assigns a collection to each worker to be compiled
;; when it runs out of collections, steals work from other workers collections
(define/public (get-job workerid)
(define (say-making x)
(define (say-making id x)
(unless (null? x)
(printer (current-output-port) "making" "~a" (cc-name (car (car x))))))
(printer (current-output-port)
(format "~a making" id)
"~a"
(cc-name (car (car x))))))
(define (find-job-in-cc cc id)
(define (retry) (get-job workerid))
(define (build-job cc file last)
@ -108,16 +111,16 @@
[(list (list cc (list) (list))) ;empty collect
(hash-remove! hash id) (retry)]
[(cons (list cc (list) (list)) tail) ;empty parent collect
(say-making tail)
(say-making id tail)
(hash-set! hash id tail) (retry)]
[(cons (list cc (list) subs) tail) ;empty srcs list
(define nl (append subs tail))
(say-making nl)
(say-making id nl)
(hash-set! hash id nl) (retry)]
[(cons (list cc (list file) subs) tail)
(define nl (append subs tail))
(hash-set! hash id nl)
(say-making nl)
(say-making id nl)
(build-job cc file #t)]
[(cons (list cc (cons file ft) subs) tail)
(hash-set! hash id (cons (list cc ft subs) tail))
@ -135,7 +138,7 @@
; get next cc from cclst
[(pair? cclst)
(define workercc (list (car cclst)))
(say-making workercc)
(say-making workerid workercc)
(set! cclst (cdr cclst))
(hash-set! hash workerid workercc)
(find-job-in-cc workercc workerid)]
@ -208,7 +211,8 @@
(define-worker (parallel-compile-worker worker-id)
(DEBUG_COMM (eprintf "WORKER ~a\n" worker-id))
(define prev-uncaught-exception-handler (uncaught-exception-handler))
(uncaught-exception-handler (lambda (x)
(uncaught-exception-handler
(lambda (x)
(when (exn:break? x) (exit 1))
(prev-uncaught-exception-handler x)))

View File

@ -47,7 +47,8 @@
; (begin a ...)
)
(define worker<%> (interface ()
(define worker<%>
(interface ()
spawn
send/msg
kill
@ -135,8 +136,8 @@
(define/public (wait) (place-wait pl))
(super-new)))
(define work-queue<%> (interface ()
(define work-queue<%>
(interface ()
get-job
work-done
has-jobs?
@ -174,93 +175,128 @@
(find-system-path 'orig-dir))))))
(define (parallel-do-event-loop module-path funcname initialmsg work-queue nprocs [stopat #f])
(define use-places (place-enabled?))
; (define use-places #f)
(define use-places? (place-enabled?)) ; set to #f to use processes instead of places
(define (spawn id)
(define wrkr (if use-places (new place-worker%) (new worker%)))
;; spawns a new worker
(define wrkr (if use-places? (new place-worker%) (new worker%)))
(wrkr/spawn wrkr id module-path funcname initialmsg)
wrkr)
(define workers null)
(define (spawn! id)
;; spawn a worker and add it to the list;
;; disable breaks because we want to make sure
;; that a new worker is added to the list of workers
;; before a break exception is raised:
(parameterize-break
#f
(let ([w (spawn id)])
(set! workers (cons w workers))
w)))
(define (unspawn! wkr)
(wrkr/kill wkr)
(set! workers (remq wkr workers)))
(define (jobs?) (queue/has work-queue))
(define (empty?) (not (queue/has work-queue)))
(define workers #f)
(define breaks #t)
;; If any exception (including a break exception) happens before
;; the work loop ends, then send a break to interrupt each worker;
;; the `normal-finish?' flag is set to #t when the working loop ends
;; normally.
(define normal-finish? #f)
(define log-exn (lambda (exn [msg #f])
(log-error (let ([s (if (exn? exn)
(exn-message exn)
(format "exception: ~v" exn))])
(if msg
(format "~a; ~a" msg s)
s)))))
(dynamic-wind
(lambda () (void))
(lambda ()
(parameterize-break #f
(set! workers (for/list ([i (in-range nprocs)]) (spawn i)))))
(lambda ()
(define (error-threshold x)
(if (x . >= . 4)
(begin
(eprintf "Error count reached ~a, exiting\n" x)
(exit 1))
#f))
(define (check-error-threshold x)
(when (x . >= . 4)
(error 'parallel-do "error count reached ~a, exiting" x)))
(for/list ([i (in-range nprocs)])
(spawn! i))
(let loop ([idle workers]
[inflight null]
[count 0]
[error-count 0])
(check-error-threshold error-count)
(cond
[(error-threshold error-count)]
;; Reached stopat count STOP
[(and stopat (= count stopat)) (printf "DONE AT LIMIT\n")]
[(and stopat (= count stopat)) ; ???
(log-error "done at limit")]
;; Queue empty and all workers idle, we are all done
[(and (empty?) (null? inflight)) (parameterize-break #f (set! workers idle))] ; ALL DONE
[(and (empty?) (null? inflight))
;; done
(void)]
;; Send work to idle worker
[(and (jobs?) (pair? idle))
(match-define (cons wrkr idle-rest) idle)
(define-values (job cmd-list) (queue/get work-queue (wrkr/id wrkr)))
(let retry-loop ([wrkr wrkr]
[error-count error-count])
(error-threshold error-count)
(check-error-threshold error-count)
(with-handlers* ([exn:fail? (lambda (e)
(printf "Error writing to worker: ~v ~a\n" (wrkr/id wrkr) (exn-message e))
(wrkr/kill wrkr)
(retry-loop (spawn (wrkr/id wrkr)) (add1 error-count)))])
(log-exn e (format "error writing to worker: ~v"
(wrkr/id wrkr)))
(unspawn! wrkr)
(retry-loop (spawn! (wrkr/id wrkr)) (add1 error-count)))])
(wrkr/send wrkr cmd-list))
(loop idle-rest (cons (list job wrkr) inflight) count error-count))]
[else
(define (kill/remove-dead-worker node-worker wrkr)
(DEBUG_COMM (printf "KILLING ~v\n" (wrkr/id wrkr)))
(wrkr/kill wrkr)
(loop (cons (spawn (wrkr/id wrkr)) idle)
(unspawn! wrkr)
(loop (cons (spawn! (wrkr/id wrkr)) idle)
(remove node-worker inflight)
count
(add1 error-count)))
(define (gen-node-handler node-worker)
(match node-worker
[(list node wrkr)
(handle-evt (wrkr/out wrkr) (λ (e)
(handle-evt
(wrkr/out wrkr)
(λ (e)
(let ([msg
(with-handlers* ([exn:fail? (lambda (e)
(printf "Error reading from worker: ~v ~a\n" (wrkr/id wrkr) (exn-message e))
(log-exn e (format "error reading from worker: ~v"
(wrkr/id wrkr)))
(kill/remove-dead-worker node-worker wrkr))])
(let ([msg (if use-places e (wrkr/recv wrkr))])
(if use-places? e (wrkr/recv wrkr)))])
(if (pair? msg)
(if (queue/work-done work-queue node wrkr msg)
(loop (cons wrkr idle) (remove node-worker inflight) (add1 count) error-count)
(loop idle inflight count error-count))
(begin
(kill/remove-dead-worker node-worker wrkr)
(queue/work-done work-queue node wrkr (string-append msg (wrkr/read-all wrkr)))))))))]
(queue/work-done work-queue node wrkr (string-append msg (wrkr/read-all wrkr)))
(kill/remove-dead-worker node-worker wrkr))))))]
[else
(eprintf "parallel-do-event-loop match node-worker failed.\n")
(eprintf "trying to match:\n~a\n" node-worker)]))
(log-error (format "parallel-do-event-loop match node-worker failed trying to match: ~e"
node-worker))]))
(DEBUG_COMM (printf "WAITING ON WORKERS TO RESPOND\n"))
(begin0
(apply sync (map gen-node-handler inflight))
(set! breaks #f))])))
(apply sync (map gen-node-handler inflight))]))
;; Ask workers to stop:
(for ([p workers])
(wrkr/send p (list 'DIE)))
;; Finish normally:
(set! normal-finish? #t))
(lambda ()
(cond
[breaks
(for ([p workers]) (with-handlers ([exn:fail? void]) (wrkr/break p)))]
[else
;(printf "Asking all workers to die\n")
(for ([p workers]) (with-handlers ([exn:fail? void]) (wrkr/send p (list 'DIE))))
;(printf "Waiting for all workers to die")(flush-output)
(for ([p workers]
[i (in-naturals)])
(wrkr/wait p)
;(printf " ~a" (add1 i)) (flush-output))(printf "\n")
)]))))
(unless normal-finish?
;; There was an exception, so tell workers to stop:
(for ([p workers])
(with-handlers ([exn? log-exn])
(wrkr/break p))))
;; Wait for workers to complete:
(for ([p workers])
(with-handlers ([exn? log-exn])
(wrkr/wait p))))))
(define list-queue%
(class* object% (work-queue<%>)
@ -277,7 +313,7 @@
(match queue
[(cons h t)
(set! queue t)
(values h (create-job-thunk h))]))
(values h (create-job-thunk h workerid))]))
(define/public (has-jobs?) (not (null? queue)))
(define/public (get-results) (reverse results))
(define/public (jobs-cnt) (length queue))
@ -297,13 +333,33 @@
(define-syntax-parameter-error recv/req)
(define-syntax-parameter-error worker/die)
(define-for-syntax (gen-worker-body globals-list globals-body work-body channel)
(with-syntax ([globals-list globals-list]
[(globals-body ...) globals-body]
[([work work-body ...] ...) work-body]
[ch channel])
#'(begin
#'(do-worker
ch
(lambda (msg per-loop-body)
;; single starting message:
(match msg
[globals-list
globals-body ...
;; bind per-worker-set procedures:
(per-loop-body
(lambda (send/msgp recv/reqp die-k)
(syntax-parameterize ([send/msg (make-rename-transformer #'send/msgp)]
[recv/req (make-rename-transformer #'recv/reqp)]
[worker/die (make-rename-transformer #'die-k)])
;; message handler:
(lambda (msg send/successp send/errorp)
(syntax-parameterize ([send/success (make-rename-transformer #'send/successp)]
[send/error (make-rename-transformer #'send/errorp)])
(match msg
[work work-body ...]
...))))))])))))
(define (do-worker ch setup-proc)
(define orig-err (current-error-port))
(define orig-out (current-output-port))
(define orig-in (current-input-port))
@ -319,32 +375,30 @@
(define (pdo-send msg)
(with-handlers ([exn:fail?
(lambda (x)
(fprintf orig-err "WORKER SEND MESSAGE ERROR ~a\n" (exn-message x))
(log-error (format "WORKER SEND MESSAGE ERROR: ~a" (exn-message x)))
(exit 1))])
(DEBUG_COMM (fprintf orig-err "WSENDING ~v\n" msg))
(raw-send msg)))
(define (pdo-recv)
(with-handlers ([exn:fail?
(lambda (x)
(fprintf orig-err "WORKER RECEIVE MESSAGE ERROR ~a\n" (exn-message x))
(log-error (format "WORKER RECEIVE MESSAGE ERROR: ~a" (exn-message x)))
(exit 1))])
(define r (raw-recv))
(DEBUG_COMM (fprintf orig-err "WRECVEIVED ~v\n" r))
r))
(match (deserialize (fasl->s-exp (pdo-recv)))
[globals-list
globals-body ...
(setup-proc (deserialize (fasl->s-exp (pdo-recv)))
(lambda (set-proc)
(let/ec die-k
(let loop ([i 0])
(DEBUG_COMM (fprintf orig-err "WAITING ON CONTROLLER TO RESPOND ~v ~v\n" orig-in i))
(match (pdo-recv)
[(list 'DIE) void]
[work
(let ([out-str-port (open-output-string)]
[err-str-port (open-output-string)])
(define (recv/reqp) (pdo-recv))
(define (send/msgp msg)
(pdo-send msg))
(let ([msg-proc (set-proc send/msgp recv/reqp die-k)])
(let loop ([i 0])
(DEBUG_COMM (fprintf orig-err "WAITING ON CONTROLLER TO RESPOND ~v ~v\n" orig-in i))
(let ([out-str-port (open-output-string)]
[err-str-port (open-output-string)])
(define (send/resp type)
(pdo-send (list type (get-output-string out-str-port) (get-output-string err-str-port))))
(define (send/successp result)
@ -354,16 +408,14 @@
(with-handlers ([exn:fail? (lambda (x) (send/errorp (exn-message x)) (loop (add1 i)))])
(parameterize ([current-output-port out-str-port]
[current-error-port err-str-port])
(syntax-parameterize ([send/msg (make-rename-transformer #'send/msgp)]
[send/success (make-rename-transformer #'send/successp)]
[send/error (make-rename-transformer #'send/errorp)]
[recv/req (make-rename-transformer #'recv/reqp)]
[worker/die (make-rename-transformer #'die-k)])
work-body ...
(loop (add1 i))))))] ...)))]))))
(let ([msg (pdo-recv)])
(match msg
[(list 'DIE) (void)]
[_ (msg-proc msg send/successp send/errorp)
(loop (add1 i))])))))))))))
(define-syntax (lambda-worker stx)
(syntax-parse stx #:literals(match-message-loop)
(syntax-parse stx #:literals (match-message-loop)
[(_ (globals-list:id ...)
globals-body:expr ...
(match-message-loop
@ -375,9 +427,10 @@
(define-syntax (parallel-do stx)
(syntax-case stx (define-worker)
[(_ worker-count initalmsg work-queue (define-worker (name args ...) body ...))
(begin
(with-syntax ([interal-def-name (syntax-local-lift-expression #'(lambda-worker (args ...) body ...))])
(syntax-local-lift-provide #'(rename interal-def-name name)))
#'(let ([wq work-queue])
(define module-path (path->string (resolved-module-path-name (variable-reference->resolved-module-path (#%variable-reference)))))
(parallel-do-event-loop module-path 'name initalmsg wq worker-count)
(queue/results wq))]))
(queue/results wq)))]))

View File

@ -136,20 +136,28 @@
(and (ormap can-build*? docs)
(filter values
(if (not (worker-count . > . 1))
(map (get-doc-info only-dirs latex-dest auto-main? auto-user? with-record-error setup-printf) docs)
(map (get-doc-info only-dirs latex-dest auto-main? auto-user?
with-record-error setup-printf #f)
docs)
(parallel-do
worker-count
(lambda (workerid) (list workerid program-name (verbose) only-dirs latex-dest auto-main? auto-user?))
(lambda (workerid)
(list workerid program-name (verbose) only-dirs latex-dest auto-main? auto-user?))
(list-queue
docs
(lambda (x) (s-exp->fasl (serialize x)))
(lambda (work r outstr errstr) (printf "~a" outstr) (printf "~a" errstr) (deserialize (fasl->s-exp r)))
(lambda (work errmsg outstr errstr) (parallel-do-error-handler setup-printf work errmsg outstr errstr)))
(define-worker (get-doc-info-worker workerid program-name verbosev only-dirs latex-dest auto-main? auto-user?)
(define ((get-doc-info-local program-name only-dirs latex-dest auto-main? auto-user?) doc)
(lambda (x workerid) (s-exp->fasl (serialize x)))
(lambda (work r outstr errstr)
(printf "~a" outstr)
(printf "~a" errstr)
(deserialize (fasl->s-exp r)))
(lambda (work errmsg outstr errstr)
(parallel-do-error-handler setup-printf work errmsg outstr errstr)))
(define-worker (get-doc-info-worker workerid program-name verbosev only-dirs latex-dest
auto-main? auto-user?)
(define ((get-doc-info-local program-name only-dirs latex-dest auto-main? auto-user?)
doc)
(define (setup-printf subpart formatstr . rest)
(let ([task
(if subpart
(let ([task (if subpart
(format "~a: " subpart)
"")])
(printf "~a: ~a~a\n" program-name task (apply format formatstr rest))))
@ -159,13 +167,16 @@
(eprintf "~a\n" (exn-message exn))
(raise exn))])
(go)))
(s-exp->fasl (serialize ((get-doc-info only-dirs latex-dest auto-main? auto-user? with-record-error setup-printf)
(s-exp->fasl (serialize
((get-doc-info only-dirs latex-dest auto-main? auto-user?
with-record-error setup-printf workerid)
(deserialize (fasl->s-exp doc))))))
(verbose verbosev)
(match-message-loop
[doc (send/success ((get-doc-info-local program-name only-dirs latex-dest auto-main? auto-user?) doc))])))))))
[doc (send/success
((get-doc-info-local program-name only-dirs latex-dest auto-main? auto-user?)
doc))])))))))
(define (make-loop first? iter)
(let ([ht (make-hash)]
@ -299,8 +310,11 @@
(set-info-need-run?! i #f)
i)))
infos)])
(define (say-rendering i)
(setup-printf (if (info-rendered? i) "re-rendering" "rendering") "~a"
(define (say-rendering i workerid)
(setup-printf (string-append
(if workerid (format "~a " workerid) "")
(if (info-rendered? i) "re-rendering" "rendering") )
"~a"
(path->relative-string/setup (doc-src-file (info-doc i)))))
(define (update-info info response)
(match response
@ -318,21 +332,22 @@
(set-info-time! info (/ (current-inexact-milliseconds) 1000))]))
(if (not (worker-count . > . 1))
(map (lambda (i)
(say-rendering i)
(say-rendering i #f)
(update-info i (build-again! latex-dest i with-record-error))) need-rerun)
(parallel-do
worker-count
(lambda (workerid) (list workerid (verbose) latex-dest))
(list-queue
need-rerun
(lambda (i)
(say-rendering i)
(lambda (i workerid)
(say-rendering i workerid)
(s-exp->fasl (serialize (info-doc i))))
(lambda (i r outstr errstr)
(printf "~a" outstr)
(printf "~a" errstr)
(update-info i (deserialize (fasl->s-exp r))))
(lambda (i errmsg outstr errstr) (parallel-do-error-handler setup-printf (info-doc i) errmsg outstr errstr)))
(lambda (i errmsg outstr errstr)
(parallel-do-error-handler setup-printf (info-doc i) errmsg outstr errstr)))
(define-worker (build-again!-worker2 workerid verbosev latex-dest)
(define (with-record-error cc go fail-k)
(with-handlers ([exn:fail?
@ -342,7 +357,8 @@
(go)))
(verbose verbosev)
(match-message-loop
[info (send/success
[info
(send/success
(s-exp->fasl (serialize (build-again! latex-dest (deserialize (fasl->s-exp info)) with-record-error))))])))))
;; If we only build 1, then it reaches it own fixpoint
;; even if the info doesn't seem to converge immediately.
@ -481,7 +497,7 @@
[else t])))
(define ((get-doc-info only-dirs latex-dest auto-main? auto-user?
with-record-error setup-printf)
with-record-error setup-printf workerid)
doc)
(let* ([info-out-file (sxref-path latex-dest doc "out.sxref")]
[info-in-file (sxref-path latex-dest doc "in.sxref")]
@ -543,7 +559,9 @@
(memq 'depends-all (doc-flags doc)))))])
(when (or (not up-to-date?) (verbose))
(setup-printf
(cond [up-to-date? "using"] [can-run? "running"] [else "skipping"])
(string-append
(if workerid (format "~a " workerid) "")
(cond [up-to-date? "using"] [can-run? "running"] [else "skipping"]))
"~a"
(path->relative-string/setup (doc-src-file doc))))
@ -552,12 +570,13 @@
(render-time
"use"
(with-handlers ([exn:fail? (lambda (exn)
(fprintf (current-error-port) "get-doc-info ERROR ~a\n" (exn-message exn))
(log-error (format "get-doc-info error: ~a"
(exn-message exn)))
(delete-file info-out-file)
(delete-file info-in-file)
((get-doc-info only-dirs latex-dest auto-main?
auto-user? with-record-error
setup-printf)
setup-printf workerid)
doc))])
(let* ([v-in (load-sxref info-in-file)]
[v-out (load-sxref info-out-file)])