convert parallel-build to classes
This commit is contained in:
parent
264e7f2c15
commit
e3d7ffbe82
|
@ -6,36 +6,67 @@
|
|||
racket/path
|
||||
setup/collects
|
||||
setup/parallel-do
|
||||
unstable/generics)
|
||||
racket/class)
|
||||
|
||||
;; Lock-Manager% -- arbitrates named locks between worker objects.
;;
;; `locks` maps a key `fn` to (list owner waiters): the worker that was
;; granted the lock, and the workers that requested it while it was held,
;; in arrival order.
(define Lock-Manager%
  (class object%
    (field (locks (make-hash)))

    ;; lock : key worker-object -> boolean
    ;; When nobody holds `fn`, sends 'locked to `wrkr`, records it as the
    ;; owner and returns #t.  Otherwise appends `wrkr` to the waiter list of
    ;; the existing entry and returns #f.
    (define/public (lock fn wrkr)
      (match (hash-ref locks fn #f)
        [#f
         ;; Notify first, then record ownership (same order as before).
         (send wrkr send/msg 'locked)
         (hash-set! locks fn (list wrkr null))
         #t]
        [(list owner waiters)
         (hash-set! locks fn (list owner (append waiters (list wrkr))))
         #f]))

    ;; unlock : key -> void
    ;; Sends 'compiled to every worker queued on `fn` and drops the entry.
    ;; Unlocking a key that has no entry is a no-op rather than a
    ;; `hash-ref` "no value found for key" failure.
    (define/public (unlock fn)
      (match (hash-ref locks fn #f)
        [#f (void)]
        [(list _ waiters)
         ;; Plain `send`, matching how `lock` talks to workers; any object
         ;; with a `send/msg` method is accepted.
         (for ([waiter (in-list waiters)])
           (send waiter send/msg 'compiled))
         (hash-remove! locks fn)]))

    (super-new)))
|
||||
|
||||
(provide parallel-compile
|
||||
parallel-build-worker)
|
||||
|
||||
(define-struct collects-queue (cclst hash collects-dir printer append-error) #:transparent
|
||||
#:mutable
|
||||
#:property prop:jobqueue
|
||||
(define-methods jobqueue
|
||||
(define (work-done jobqueue work workerid msg)
|
||||
(match (list work msg)
|
||||
[(list (list cc file last) (list result-type out err))
|
||||
(let ([cc-name (cc-name cc)])
|
||||
(match result-type
|
||||
[(list 'ERROR msg)
|
||||
((collects-queue-append-error jobqueue) cc "making" (exn msg (current-continuation-marks)) out err "error")]
|
||||
['DONE
|
||||
(when (or (not (zero? (string-length out))) (not (zero? (string-length err))))
|
||||
((collects-queue-append-error jobqueue) cc "making" null out err "output"))])
|
||||
(when last ((collects-queue-printer jobqueue) (current-output-port) "made" "~a" cc-name )))]
|
||||
[else
|
||||
(match work
|
||||
[(list-rest (list cc file last) message)
|
||||
((collects-queue-append-error jobqueue) cc "making" null "" "" "error")
|
||||
(eprintf "work-done match cc failed.\n")
|
||||
(eprintf "trying to match:\n~a\n" (list work msg))])]))
|
||||
|
||||
(define CollectsQueue% (class* object% (WorkQueue<%>)
|
||||
(init-field cclst collects-dir printer append-error)
|
||||
(field (lock-mgr (new Lock-Manager%)))
|
||||
(field (hash (make-hash)))
|
||||
(inspect #f)
|
||||
|
||||
(define/public (work-done work wrkr msg)
|
||||
(match (list work msg)
|
||||
[(list (list cc file last) (list result-type out err))
|
||||
(begin0
|
||||
(match result-type
|
||||
[(list 'ERROR msg)
|
||||
(append-error cc "making" (exn msg (current-continuation-marks)) out err "error")
|
||||
#t]
|
||||
;[(list 'LOCK fn) (lock fn wrkr) #f]
|
||||
;[(list 'UNLOCK fn) (unlock fn) #f]
|
||||
['DONE
|
||||
(define (string-!empty? s) (not (zero? (string-length s))))
|
||||
(when (ormap string-!empty? (list out err))
|
||||
(append-error cc "making" null out err "output"))
|
||||
#t])
|
||||
(when last (printer (current-output-port) "made" "~a" (cc-name cc))))]
|
||||
[else
|
||||
(match work
|
||||
[(list-rest (list cc file last) message)
|
||||
(append-error cc "making" null "" "" "error")
|
||||
(eprintf "work-done match cc failed.\n")
|
||||
(eprintf "trying to match:\n~a\n" (list work msg))
|
||||
#t]
|
||||
[else
|
||||
(eprintf "work-done match cc failed.\n")
|
||||
(eprintf "trying to match:\n~a\n" (list work msg))
|
||||
(eprintf "FATAL\n")
|
||||
(exit 1)])]))
|
||||
|
||||
;; assigns a collection to each worker to be compiled
|
||||
;; when it runs out of collections, steals work from other workers collections
|
||||
(define (get-job jobqueue workerid)
|
||||
(define/public (get-job workerid)
|
||||
(define (hash/first-pair hash)
|
||||
(match (hash-iterate-first hash)
|
||||
[#f #f]
|
||||
|
@ -46,12 +77,12 @@
|
|||
[#f #f]
|
||||
[x (hash-set! hash key x) x]))))
|
||||
(define (take-cc)
|
||||
(match (collects-queue-cclst jobqueue)
|
||||
(match cclst
|
||||
[(list) #f]
|
||||
[(cons h t)
|
||||
(set-collects-queue-cclst! jobqueue t)
|
||||
(set! cclst t)
|
||||
(list h)]))
|
||||
(let ([w-hash (collects-queue-hash jobqueue)])
|
||||
(let ([w-hash hash])
|
||||
(define (build-job cc file last)
|
||||
(define (->bytes x)
|
||||
(cond [(path? x) (path->bytes x)]
|
||||
|
@ -87,22 +118,26 @@
|
|||
(match (hash/first-pair w-hash)
|
||||
[(cons id cc) (find-job-in-cc cc id)])]
|
||||
[cc (find-job-in-cc cc workerid)]))))
|
||||
(define (has-jobs? jobqueue)
|
||||
|
||||
(define/public (has-jobs?)
|
||||
(define (hasjob? cct)
|
||||
(let loop ([cct cct])
|
||||
(ormap (lambda (x) (or ((length (second x)) . > . 0) (loop (third x)))) cct)))
|
||||
|
||||
(or (hasjob? (collects-queue-cclst jobqueue))
|
||||
(for/or ([cct (in-hash-values (collects-queue-hash jobqueue))])
|
||||
(or (hasjob? cclst)
|
||||
(for/or ([cct (in-hash-values hash)])
|
||||
(hasjob? cct))))
|
||||
(define (jobs-cnt jobqueue)
|
||||
|
||||
(define/public (jobs-cnt)
|
||||
(define (count-cct cct)
|
||||
(let loop ([cct cct])
|
||||
(apply + (map (lambda (x) (+ (length (second x)) (loop (third x)))) cct))))
|
||||
|
||||
(+ (count-cct (collects-queue-cclst jobqueue))
|
||||
(for/fold ([cnt 0]) ([cct (in-hash-values (collects-queue-hash jobqueue))])
|
||||
(+ cnt (count-cct cct)))))))
|
||||
(+ (count-cct cclst)
|
||||
(for/fold ([cnt 0]) ([cct (in-hash-values hash)])
|
||||
(+ cnt (count-cct cct)))))
|
||||
(define/public (get-results) (void))
|
||||
(super-new)))
|
||||
|
||||
(define (parallel-compile worker-count setup-fprintf append-error collects-tree)
|
||||
(let ([collects-dir (current-collects-path)])
|
||||
|
@ -114,10 +149,15 @@
|
|||
(path->string collects-dir)
|
||||
"-l"
|
||||
"setup/parallel-build-worker.rkt")
|
||||
(make-collects-queue collects-tree (make-hash) collects-dir setup-fprintf append-error)
|
||||
(make-object CollectsQueue% collects-tree collects-dir setup-fprintf append-error)
|
||||
worker-count 999999999)))
|
||||
|
||||
(define (parallel-build-worker)
|
||||
(define prev-uncaught-exception-handler (uncaught-exception-handler))
|
||||
(uncaught-exception-handler (lambda (x)
|
||||
(when (exn:break? x) (exit 1))
|
||||
(prev-uncaught-exception-handler x)))
|
||||
|
||||
(let ([cmc (make-caching-managed-compile-zo)]
|
||||
[worker-id (read)])
|
||||
(let loop ()
|
||||
|
|
|
@ -6,8 +6,8 @@
|
|||
racket/fasl
|
||||
racket/match
|
||||
racket/path
|
||||
racket/class
|
||||
racket/serialize
|
||||
unstable/generics
|
||||
racket/stxparam
|
||||
(for-syntax syntax/parse
|
||||
racket/base))
|
||||
|
@ -20,16 +20,71 @@
|
|||
match-message-loop
|
||||
send/success
|
||||
send/error
|
||||
jobqueue
|
||||
prop:jobqueue)
|
||||
WorkQueue<%>
|
||||
wrkr/send
|
||||
define/class/generics)
|
||||
|
||||
;; Worker% -- handle on one worker subprocess.
;;
;; Fields start out as placeholders (id 0, everything else null) and are
;; filled in by `spawn`:
;;   id             -- identifier passed to `spawn`
;;   process-handle -- subprocess handle returned by `subprocess`
;;   out            -- port the master reads worker replies from
;;   in             -- port the master writes messages to
;;   err            -- fourth value returned by `subprocess`
(define Worker%
  (class object%
    (field [id 0]
           [process-handle null]
           [out null]
           [in null]
           [err null])

    ;; Simple accessors.
    (define/public (get-id) id)
    (define/public (get-out) out)

    ;; Writes `msg` to the worker and flushes so it is delivered immediately.
    (define/public (send/msg msg)
      (write msg in)
      (flush-output in))

    ;; Reads one datum from the worker's output.
    (define/public (recv/msg)
      (read out))

    ;; Starts the subprocess given by `worker-cmdline-list` (stderr goes to
    ;; the current error port), stores the handle and ports, then sends
    ;; `initialcode` and the fasl-encoded, serialized result of
    ;; `(initialmsg id)` when those arguments are not #f.
    (define/public (spawn _id worker-cmdline-list initialcode initialmsg)
      (define-values (handle from-worker to-worker worker-err)
        (apply subprocess #f #f (current-error-port) worker-cmdline-list))
      (set! id _id)
      (set! process-handle handle)
      (set! out from-worker)
      (set! in to-worker)
      (set! err worker-err)
      (when initialcode
        (send/msg initialcode))
      (when initialmsg
        (send/msg (s-exp->fasl (serialize (initialmsg id))))))

    ;; Logs the kill, closes both communication ports and force-kills the
    ;; subprocess.
    (define/public (kill)
      (eprintf "KILLING WORKER ~a\n" id)
      (close-output-port in)
      (close-input-port out)
      (subprocess-kill process-handle #t))

    ;; Blocks until the subprocess has exited.
    (define/public (wait)
      (subprocess-wait process-handle))

    (super-new)))
|
||||
|
||||
;; WorkQueue<%> -- method names every job queue class must provide:
;;   get-job      -- hand out the next job
;;   work-done    -- receive a worker's reply for a job
;;   has-jobs?    -- is there still work to hand out?
;;   jobs-cnt     -- number of remaining jobs
;;   get-results  -- results accumulated so far
(define WorkQueue<%>
  (interface ()
    get-job
    work-done
    has-jobs?
    jobs-cnt
    get-results))
|
||||
|
||||
;; (mk-generic func clss method args ...)
;; Defines `func` as an ordinary procedure taking an object followed by
;; `args ...` and invoking `method` on it through a generic created once for
;; class or interface `clss`.  The generic itself is bound to a
;; macro-introduced (hygienic) name, so separate uses do not collide.
(define-syntax mk-generic
  (syntax-rules ()
    [(_ func clss method args ...)
     (begin
       (define method-generic (generic clss method))
       (define (func obj args ...)
         (send-generic obj method-generic args ...)))]))
|
||||
|
||||
;; (define/class/generics clss (func method args ...) ...)
;; Expands into one `mk-generic` form per (func method args ...) clause, all
;; against the same class or interface `clss`.
(define-syntax define/class/generics
  (syntax-rules ()
    [(_ clss (func method args ...) ...)
     (begin
       (mk-generic func clss method args ...)
       ...)]))
|
||||
|
||||
;; Procedure wrappers around Worker% methods: each clause below defines the
;; first name as a function taking a worker object plus the listed arguments
;; and dispatching to the second name via a generic on Worker%.
(define/class/generics Worker%
|
||||
(wrkr/send send/msg msg)
|
||||
(wrkr/kill kill)
|
||||
(wrkr/recv recv/msg)
|
||||
(wrkr/id get-id)
|
||||
(wrkr/out get-out)
|
||||
(wrkr/spawn spawn id worker-cmdline-list initialcode initialmsg))
|
||||
|
||||
;; Procedure wrappers around WorkQueue<%> methods, usable on any object whose
;; class implements the interface.  No wrapper is generated here for
;; `get-results`.
(define/class/generics WorkQueue<%>
|
||||
(queue/get get-job wrkrid)
|
||||
(queue/work-done work-done node wrkr msg)
|
||||
(queue/has has-jobs?)
|
||||
(queue/count jobs-cnt))
|
||||
|
||||
;; Generic interface `jobqueue` (struct property `prop:jobqueue`, predicate
;; `jobqueue?`) with four methods, each taking the queue as first argument.
;; NOTE(review): this looks like the struct-based counterpart of the
;; WorkQueue<%> interface shown in the same diff -- confirm which one is live.
(define-generics (jobqueue prop:jobqueue jobqueue?)
|
||||
(work-done jobqueue work workerid msg)
|
||||
(get-job jobqueue workerid)
|
||||
(has-jobs? jobqueue)
|
||||
(jobs-cnt jobqueue))
|
||||
|
||||
;; Record describing one worker: its id, subprocess handle, and the
;; out / in / err values obtained when the subprocess was started.
(define-struct worker (id process-handle out in err))
|
||||
;; current-executable-path : -> (or/c path? #f)
;; Looks up the running executable (`(find-system-path 'exec-file)`) with
;; `find-executable-path`, evaluated with `current-directory` set to the
;; directory recorded as 'orig-dir.
(define (current-executable-path)
  (define exec-file (find-system-path 'exec-file))
  (define orig-dir (find-system-path 'orig-dir))
  (parameterize ([current-directory orig-dir])
    (find-executable-path exec-file #f)))
|
||||
|
@ -40,27 +95,13 @@
|
|||
(path->complete-path p (or (path-only (current-executable-path))
|
||||
(find-system-path 'orig-dir))))))
|
||||
|
||||
|
||||
(define (parallel-do-event-loop initialcode initialmsg worker-cmdline-list jobqueue nprocs stopat)
|
||||
(define (send/msg x ch)
|
||||
(write x ch)
|
||||
(flush-output ch))
|
||||
(define (spawn id)
|
||||
(let-values ([(process-handle out in err) (apply subprocess #f #f (current-error-port) worker-cmdline-list)])
|
||||
(when initialcode
|
||||
(send/msg initialcode in))
|
||||
(when initialmsg
|
||||
(send/msg (s-exp->fasl (serialize (initialmsg id))) in))
|
||||
(make-worker id process-handle out in err)))
|
||||
(define (kill-worker wrkr)
|
||||
(match wrkr
|
||||
[(worker id process-handle out in err)
|
||||
(eprintf "KILLING WORKER ~a ~a\n" id wrkr)
|
||||
(close-output-port in)
|
||||
(close-input-port out)
|
||||
(subprocess-kill process-handle #t)]))
|
||||
(define (jobs? x) (has-jobs? jobqueue))
|
||||
(define (empty? x) (not (has-jobs? jobqueue )))
|
||||
(define wrkr (new Worker%))
|
||||
(wrkr/spawn wrkr id worker-cmdline-list initialcode initialmsg)
|
||||
wrkr)
|
||||
(define (jobs?) (queue/has jobqueue))
|
||||
(define (empty?) (not (queue/has jobqueue)))
|
||||
(define workers #f)
|
||||
|
||||
(dynamic-wind
|
||||
|
@ -74,91 +115,86 @@
|
|||
(eprintf "Error count reached ~a, exiting\n" x)
|
||||
(exit 1))
|
||||
#f))
|
||||
(letrec ([loop (match-lambda*
|
||||
;; QUEUE IDLE INFLIGHT COUNT
|
||||
;; Reached stopat count STOP
|
||||
[(list idle inflight count (? error-threshold error-count)) (void)]
|
||||
[(list idle inflight (? (lambda (x) (= x stopat))) error-count) (printf "DONE AT LIMIT\n")]
|
||||
;; Send work to idle worker
|
||||
[(list (and (? jobs?) (cons wrkr idle)) inflight count error-count)
|
||||
(let-values ([(job cmd-list) (get-job jobqueue (worker-id wrkr))])
|
||||
(let retry-loop ([wrkr wrkr]
|
||||
[error-count error-count])
|
||||
(error-threshold error-count)
|
||||
(match wrkr
|
||||
[(worker i s o in e)
|
||||
(let loop ([idle workers]
|
||||
[inflight null]
|
||||
[count 0]
|
||||
[error-count 0])
|
||||
(cond
|
||||
[(error-threshold error-count)]
|
||||
;; Reached stopat count STOP
|
||||
[(= count stopat) (printf "DONE AT LIMIT\n")]
|
||||
;; Queue empty and all workers idle, we are all done
|
||||
[(and (empty?) (null? inflight)) (set! workers idle)]
|
||||
;; Send work to idle worker
|
||||
[(and (jobs?) (pair? idle))
|
||||
(match idle [(cons wrkr idle-rest)
|
||||
(let-values ([(job cmd-list) (queue/get jobqueue (wrkr/id wrkr))])
|
||||
(let retry-loop ([wrkr wrkr]
|
||||
[error-count error-count])
|
||||
(error-threshold error-count)
|
||||
(with-handlers* ([exn:fail? (lambda (e)
|
||||
(printf "MASTER WRITE ERROR - writing to worker: ~a\n" (exn-message e))
|
||||
(wrkr/kill wrkr)
|
||||
(retry-loop (spawn (wrkr/id wrkr)) (add1 error-count)))])
|
||||
(wrkr/send wrkr cmd-list))
|
||||
(loop idle-rest (cons (list job wrkr) inflight) count error-count)))])]
|
||||
|
||||
[else
|
||||
(define (kill/remove-dead-worker node-worker wrkr)
|
||||
(wrkr/kill wrkr)
|
||||
(loop (cons (spawn (wrkr/id wrkr)) idle)
|
||||
(remove node-worker inflight)
|
||||
count
|
||||
(add1 error-count)))
|
||||
(apply sync (for/list ([node-worker inflight])
|
||||
(match node-worker [(list node wrkr)
|
||||
(define out (wrkr/out wrkr))
|
||||
(handle-evt out (λ (e)
|
||||
(with-handlers* ([exn:fail? (lambda (e)
|
||||
(printf "MASTER WRITE ERROR - writing to worker: ~a\n" (exn-message e))
|
||||
(kill-worker wrkr)
|
||||
(retry-loop (spawn i) (add1 error-count)))])
|
||||
(send/msg cmd-list in))])
|
||||
(loop idle (cons (list job wrkr) inflight) count error-count)))]
|
||||
;; Queue empty and all workers idle, we are all done
|
||||
[(list (and (? empty?) idle) (list) count error-count) (set! workers idle)]
|
||||
;; Wait for reply from worker
|
||||
[(list idle inflight count error-count)
|
||||
(define (remove-dead-worker id node-worker)
|
||||
(loop (cons (spawn id) idle)
|
||||
(remove node-worker inflight)
|
||||
count
|
||||
(add1 error-count)))
|
||||
|
||||
(apply sync (map (λ (node-worker) (match node-worker
|
||||
[(list node (and wrkr (worker id sh out in err)))
|
||||
(handle-evt out (λ (e)
|
||||
(let ([msg (with-handlers* ([exn:fail? (lambda (e)
|
||||
(printf "MASTER READ ERROR - reading from worker: ~a\n" (exn-message e))
|
||||
(kill-worker wrkr)
|
||||
(remove-dead-worker id node-worker))])
|
||||
(let ([read-msg (read out)])
|
||||
(if (pair? read-msg)
|
||||
read-msg
|
||||
(begin
|
||||
(work-done jobqueue node id (string-append read-msg (port->string out)))
|
||||
(kill-worker wrkr)
|
||||
(remove-dead-worker id node-worker)))))])
|
||||
(work-done jobqueue node id msg)
|
||||
(loop (cons wrkr idle)
|
||||
(remove node-worker inflight)
|
||||
(add1 count)
|
||||
error-count))))]
|
||||
[else
|
||||
(eprintf "parallel-do-event-loop match node-worker failed.\n")
|
||||
(eprintf "trying to match:\n~a\n" node-worker)]))
|
||||
|
||||
inflight))]
|
||||
[x
|
||||
(eprintf "parallel-do-event-loop match-lambda* failed.\n")
|
||||
(eprintf "trying to match:\n~a\n" x)])])
|
||||
(loop workers null 0 0)))
|
||||
(printf "MASTER READ ERROR - reading from worker: ~a\n" (exn-message e))
|
||||
(kill/remove-dead-worker node-worker wrkr))])
|
||||
(let ([msg (wrkr/recv wrkr)])
|
||||
(if (pair? msg)
|
||||
(if (queue/work-done jobqueue node wrkr msg)
|
||||
(loop (cons wrkr idle)
|
||||
(remove node-worker inflight)
|
||||
(add1 count)
|
||||
error-count)
|
||||
(loop idle inflight count error-count))
|
||||
(begin
|
||||
(queue/work-done jobqueue node wrkr (string-append msg (port->string out)))
|
||||
(kill/remove-dead-worker node-worker wrkr)))))))]
|
||||
[else
|
||||
(eprintf "parallel-do-event-loop match node-worker failed.\n")
|
||||
(eprintf "trying to match:\n~a\n" node-worker)])))])))
|
||||
(lambda ()
|
||||
(for ([p workers]) (with-handlers ([exn? void]) (send/msg (list 'DIE) (worker-in p))))
|
||||
(for ([p workers]) (subprocess-wait (worker-process-handle p))))))
|
||||
(for ([p workers]) (with-handlers ([exn? void]) (wrkr/send p (list 'DIE))))
|
||||
(for ([p workers]) (send p wait)))))
|
||||
|
||||
;; parallel-do-default-error-handler : work error-message outstr errstr -> void
;; Prints a worker failure report to the current output port: the error
;; message, then the captured stdout and stderr text, each followed by a
;; "=====" rule.  `work` is accepted but not used.
(define (parallel-do-default-error-handler work error-message outstr errstr)
  (define report-port (current-output-port))
  (fprintf report-port "WORKER ERROR ~a\n" error-message)
  (fprintf report-port "STDOUT\n~a=====\n" outstr)
  (fprintf report-port "STDERR\n~a=====\n" errstr))
|
||||
|
||||
;; list-queue -- mutable, transparent struct implementing the `jobqueue`
;; generic interface on top of a plain list of work items.
;;   queue            -- remaining work items
;;   results          -- accumulated success results, most recent first
;;   create-job-thunk -- maps a work item to the second value of get-job
;;   success-thunk    -- called with work, result, stdout, stderr on 'DONE
;;   failure-thunk    -- called with work, errmsg, stdout, stderr on 'ERROR
;; NOTE(review): appears to be the struct-based counterpart of ListQueue% in
;; this diff -- confirm which one is live.
(define-struct list-queue (queue results create-job-thunk success-thunk failure-thunk)
|
||||
#:transparent #:mutable #:property prop:jobqueue
|
||||
(define-methods jobqueue
|
||||
;; Dispatches on the reply shape; any other shape is a match failure.
(define (work-done jobqueue work workerid msg)
|
||||
(match msg
|
||||
[(list (list 'DONE result) stdout stderr)
|
||||
(let ([result ((list-queue-success-thunk jobqueue) work result stdout stderr)])
|
||||
(set-list-queue-results! jobqueue (cons result (list-queue-results jobqueue))))]
|
||||
[(list (list 'ERROR errmsg) stdout stderr)
|
||||
((list-queue-failure-thunk jobqueue) work errmsg stdout stderr)]))
|
||||
;; Pops the head item and returns it together with its job description.
;; Only a non-empty queue is matched.
(define (get-job jobqueue workerid)
|
||||
(match (list-queue-queue jobqueue)
|
||||
[(cons h t)
|
||||
(set-list-queue-queue! jobqueue t)
|
||||
(values h ((list-queue-create-job-thunk jobqueue) h))]))
|
||||
(define (has-jobs? jobqueue)
|
||||
(not (null? (list-queue-queue jobqueue))))
|
||||
(define (jobs-cnt jobqueue)
|
||||
(length (list-queue-queue jobqueue)))))
|
||||
|
||||
;; ListQueue% -- WorkQueue<%> implementation backed by a plain list.
;;
;; Init fields:
;;   queue            -- list of work items still to be handed out
;;   create-job-thunk -- maps a work item to the second value of get-job
;;   success-thunk    -- called with work, result, stdout, stderr on 'DONE;
;;                       its return value is pushed onto `results`
;;   failure-thunk    -- called with work, errmsg, stdout, stderr on 'ERROR
;; `results` holds success results, most recent first.
(define ListQueue%
  (class* object% (WorkQueue<%>)
    (init-field queue create-job-thunk success-thunk failure-thunk)
    (field [results null])

    ;; work-done : work workerid msg -> #t
    ;; Records a 'DONE reply or reports an 'ERROR reply.  Any other message
    ;; shape is still a match failure, as before.
    ;; Always returns #t: the event loop only retires the job and returns
    ;; the worker to the idle list when work-done yields a true value, so
    ;; the result of `failure-thunk` (which may be #f) must not leak out.
    (define/public (work-done work workerid msg)
      (match msg
        [(list (list 'DONE result) stdout stderr)
         (set! results (cons (success-thunk work result stdout stderr) results))]
        [(list (list 'ERROR errmsg) stdout stderr)
         (failure-thunk work errmsg stdout stderr)])
      #t)

    ;; get-job : workerid -> (values work job)
    ;; Pops the head of the queue.  Only a non-empty queue is matched, so
    ;; callers are expected to check `has-jobs?` first.
    (define/public (get-job workerid)
      (match queue
        [(cons h t)
         (set! queue t)
         (values h (create-job-thunk h))]))

    (define/public (has-jobs?) (not (null? queue)))
    (define/public (get-results) results)
    (define/public (jobs-cnt) (length queue))

    (super-new)))
|
||||
|
||||
(define match-message-loop
|
||||
(lambda (stx)
|
||||
|
@ -235,9 +271,9 @@
|
|||
#`(begin
|
||||
;(printf "CMDLINE ~v\n" cmdline)
|
||||
;(printf "INITIALTHUNK ~v\n" initial-stdin-data)
|
||||
(let ([jobqueue (make-list-queue list-of-work null create-job-thunk job-success-thunk job-failure-thunk)])
|
||||
(let ([jobqueue (make-object ListQueue% list-of-work create-job-thunk job-success-thunk job-failure-thunk)])
|
||||
(parallel-do-event-loop initial-stdin-data initalmsg cmdline jobqueue worker-count 999999999)
|
||||
(reverse (list-queue-results jobqueue))))))
|
||||
(reverse (send jobqueue get-results))))))
|
||||
(define (gen-dynamic-require-current-module funcname)
|
||||
(with-syntax ([funcname funcname])
|
||||
#'(let ([module-path (path->string (resolved-module-path-name (variable-reference->resolved-module-path (#%variable-reference))))])
|
||||
|
@ -248,8 +284,4 @@
|
|||
(syntax-local-lift-provide #'(rename interal-def-name name)))
|
||||
(gen-parallel-do-event-loop-syntax
|
||||
#'(list (current-executable-path) "-X" (path->string (current-collects-path)) "-e" "(eval(read))")
|
||||
(gen-dynamic-require-current-module #'name))]
|
||||
[funcname
|
||||
(gen-parallel-do-event-loop-syntax
|
||||
#'(list (current-executable-path) "-X" (path->string (current-collects-path)) "-e" "(eval(read))")
|
||||
(gen-dynamic-require-current-module #'funcname))]))]))
|
||||
(gen-dynamic-require-current-module #'name))]))]))
|
||||
|
|
Loading…
Reference in New Issue
Block a user