Parallel Places Build
This commit is contained in:
parent
ad0c22a77a
commit
bdbb9fe336
|
@ -62,7 +62,8 @@
|
|||
(when (ormap string-!empty? (list out err))
|
||||
(append-error cc "making" null out err "output"))
|
||||
(when last (printer (current-output-port) "made" "~a" (cc-name cc)))
|
||||
#t]))]
|
||||
#t]
|
||||
[else (eprintf "Failed trying to match:\n~v\n" result-type)]))]
|
||||
[else
|
||||
(match work
|
||||
[(list-rest (list cc file last) message)
|
||||
|
@ -205,4 +206,108 @@
|
|||
values ; identity function
|
||||
(build-parallel-build-worker-args)
|
||||
(make-object CollectsQueue% collects-tree setup-fprintf append-error)
|
||||
worker-count 999999999))
|
||||
worker-count 999999999)
|
||||
#;
|
||||
(places-parallel-build (make-object CollectsQueue% collects-tree setup-fprintf append-error) worker-count 999999999))
|
||||
|
||||
#|
|
||||
(require racket/place)
|
||||
(require (for-syntax racket/base))
|
||||
(define-syntax-rule (define-syntax-case (N a ...) b ...)
|
||||
(define-syntax (N stx)
|
||||
(syntax-case stx ()
|
||||
[(_ a ...) b ...])))
|
||||
|
||||
(define PlaceWorker% (class* object% (Worker<%>)
|
||||
(init-field [id 0]
|
||||
[pl null])
|
||||
|
||||
(define/public (send/msg msg) (place-channel-send pl msg))
|
||||
(define/public (recv/msg) (place-channel-recv pl))
|
||||
(define/public (get-id) id)
|
||||
(define/public (get-out) pl)
|
||||
(define/public (kill) #f)
|
||||
(define/public (wait) (place-wait pl))
|
||||
(super-new)))
|
||||
|
||||
(define-syntax-case (place/anon (ch) body ...)
|
||||
(with-syntax ([interal-def-name
|
||||
(syntax-local-lift-expression #'(lambda (ch) body ...))]
|
||||
[funcname #'OBSCURE_FUNC_NAME_%#%])
|
||||
(syntax-local-lift-provide #'(rename interal-def-name funcname))
|
||||
#'(let ([module-path (resolved-module-path-name
|
||||
(variable-reference->resolved-module-path
|
||||
(#%variable-reference)))])
|
||||
(place module-path (quote funcname)))))
|
||||
|
||||
(define (places-parallel-build jobqueue nprocs stopat)
|
||||
(define ps
|
||||
(for/list ([i (in-range nprocs)])
|
||||
(place/anon (ch)
|
||||
(let ([cmc ((dynamic-require 'compiler/cm 'make-caching-managed-compile-zo))])
|
||||
(let loop ()
|
||||
(match (place-channel-recv ch)
|
||||
[(list 'DIE) void]
|
||||
[(list name dir file)
|
||||
(let ([dir (bytes->path dir)]
|
||||
[file (bytes->path file)])
|
||||
(let ([out-str-port (open-output-string)]
|
||||
[err-str-port (open-output-string)])
|
||||
(define (send/msg msg)
|
||||
(place-channel-send ch msg))
|
||||
(define (send/resp type)
|
||||
(send/msg (list type (get-output-string out-str-port) (get-output-string err-str-port))))
|
||||
(define (lock-client cmd fn)
|
||||
(match cmd
|
||||
['lock
|
||||
(send/msg (list (list 'LOCK (path->bytes fn)) "" ""))
|
||||
(match (place-channel-recv ch)
|
||||
[(list 'locked) #t]
|
||||
[(list 'compiled) #f])]
|
||||
['unlock (send/msg (list (list 'UNLOCK (path->bytes fn)) "" ""))]))
|
||||
(with-handlers ([exn:fail? (lambda (x)
|
||||
(send/resp (list 'ERROR (exn-message x))))])
|
||||
(parameterize ([parallel-lock-client lock-client]
|
||||
[current-namespace (make-base-empty-namespace)]
|
||||
[current-directory dir]
|
||||
[current-load-relative-directory dir]
|
||||
[current-input-port (open-input-string "")]
|
||||
[current-output-port out-str-port]
|
||||
[current-error-port err-str-port])
|
||||
|
||||
(cmc (build-path dir file)))
|
||||
(send/resp 'DONE))))
|
||||
(loop)]))))))
|
||||
|
||||
|
||||
(define workers (for/list ([i (in-range nprocs)]
|
||||
[p ps])
|
||||
(make-object PlaceWorker% i p)))
|
||||
(define (jobs?) (queue/has jobqueue))
|
||||
(define (empty?) (not (queue/has jobqueue)))
|
||||
|
||||
(let loop ([idle workers]
|
||||
[inflight null]
|
||||
[count 0])
|
||||
(cond
|
||||
[(= count stopat) (printf "DONE AT LIMIT\n")]
|
||||
[(and (empty?) (null? inflight)) (set! workers idle)] ; ALL DONE
|
||||
[(and (jobs?) (pair? idle))
|
||||
(match-define (cons wrkr idle-rest) idle)
|
||||
(define-values (job cmd-list) (queue/get jobqueue (wrkr/id wrkr)))
|
||||
(wrkr/send wrkr cmd-list)
|
||||
(loop idle-rest (cons (list job wrkr) inflight) count)]
|
||||
|
||||
[else
|
||||
(define (gen-node-handler node-worker)
|
||||
(match-define (list node wrkr) node-worker)
|
||||
(handle-evt (wrkr/out wrkr) (λ (msg)
|
||||
(if (queue/work-done jobqueue node wrkr msg)
|
||||
(loop (cons wrkr idle) (remove node-worker inflight) (add1 count))
|
||||
(loop idle inflight count)))))
|
||||
|
||||
(apply sync (map gen-node-handler inflight))]))
|
||||
|
||||
(for ([p workers]) (wrkr/send p (list 'DIE)))
|
||||
(for ([p ps]) (place-wait p)))
|
||||
|#
|
||||
|
|
Loading…
Reference in New Issue
Block a user