191 lines
9.4 KiB
Racket
191 lines
9.4 KiB
Racket
#lang racket/base
|
|
|
|
(require racket/future
|
|
racket/list
|
|
racket/match
|
|
setup/collects
|
|
unstable/generics)
|
|
|
|
(provide parallel-compile)
|
|
|
|
;; Generic interface that a job source must implement so that
;; `process-comp` can schedule its work.  `jobqueue` is the implementing
;; object; `queue` is an opaque queue state threaded through the
;; scheduler loop.
;;
;;  work-done     - called with the finished `work` item, the id of the
;;                  worker that ran it and the message read back from that
;;                  worker; its result is used as the next queue state.
;;  get-job       - returns three values: the next queue state, the job to
;;                  record as in flight, and the command to send to the
;;                  worker.
;;  has-jobs?     - true while there is still work to hand out.
;;  jobs-cnt      - number of jobs still waiting.
;;  job-desc      - description of a work item.
;;  initial-queue - queue state the scheduler starts from.
(define-generics (jobqueue prop:jobqueue jobqueue?)
  (work-done jobqueue queue work workerid msg)
  (get-job jobqueue queue workerid)
  (has-jobs? jobqueue queue)
  (jobs-cnt jobqueue queue)
  (job-desc jobqueue work)
  (initial-queue jobqueue))
|
|
|
|
;; process-comp : jobqueue exact-nonnegative-integer number -> any
;;
;; Spawns `nprocs` worker subprocesses running parallel-build-worker.rkt
;; and feeds them jobs taken from `jobqueue` until the queue reports no
;; more jobs and nothing is in flight, or until `stopat` jobs have
;; completed.  On exit (normal or not) every tracked worker is sent
;; (DIE) and waited for.
;;
;; A worker is represented as (list id subprocess stdout-in stdin-out err).
(define (process-comp jobqueue nprocs stopat)
  (define process-worker-filename
    (path->string (build-path (collection-path "setup") "parallel-build-worker.rkt")))

  ;; Resolve the running executable relative to the directory the process
  ;; was started from.
  (define executable
    (parameterize ([current-directory (find-system-path 'orig-dir)])
      (find-executable-path (find-system-path 'exec-file) #f)))

  ;; Write one datum to `ch` and flush so the peer sees it immediately.
  (define (send/msg x ch)
    (write x ch)
    (flush-output ch))

  ;; Start worker number `i`; the first message a worker receives is its id.
  (define (spawn i)
    (let-values ([(s o in e) (subprocess #f #f (current-error-port) executable process-worker-filename)])
      (send/msg i in)
      (list i s o in e)))

  ;; Report the failure `nw` of worker `i` and close both of its pipes.
  (define (kill-worker i nw o in)
    (eprintf "KILLING WORKER ~a ~a ~n" i nw)
    (close-input-port o)
    (close-output-port in))

  (define workers #f)

  ;; Retire the failed worker `dead` (failure value `nw`), start a
  ;; replacement with the same id and record it in `workers` so that the
  ;; cleanup thunk shuts down the replacement rather than the dead process.
  (define (respawn! dead nw)
    (match dead
      [(list i s o in e)
       (kill-worker i nw o in)
       ;; The pipes are closed, so nothing else can ask the process to
       ;; exit; force it so it is not left running.
       (subprocess-kill s #t)
       (let ([fresh (spawn i)])
         (set! workers
               (map (lambda (w) (if (= (first w) i) fresh w)) workers))
         fresh)]))

  ;; Send `cmd-list` to `worker`, replacing the worker and retrying for as
  ;; long as the write fails.  Returns the worker that accepted the job.
  (define (send-job worker cmd-list)
    (let retry ([w worker])
      (let ([failure (with-handlers* ([exn:fail? values])
                       (send/msg cmd-list (fourth w))
                       #f)])
        (if failure
            (retry (respawn! w failure))
            w))))

  (define (jobs? queue)
    (has-jobs? jobqueue queue))
  (define (empty? queue)
    (not (has-jobs? jobqueue queue)))

  ;; Do not allow a break to interrupt worker creation half way.
  (parameterize-break #f
    (set! workers (for/list ([i (in-range nprocs)]) (spawn i))))

  (dynamic-wind
    (lambda () (void))
    (lambda ()
      (letrec ([loop
                (match-lambda*
                  ;; QUEUE IDLE INFLIGHT COUNT
                  ;; Reached stopat count STOP
                  [(list queue idle inflight (? (lambda (x) (= x stopat))))
                   (printf "DONE AT LIMIT~n")]
                  ;; Send work to idle worker
                  [(list (? jobs? queue) (cons worker idle) inflight count)
                   (let-values ([(queue-state job cmd-list)
                                 (get-job jobqueue queue (first worker))])
                     ;; Record the worker that actually received the job,
                     ;; which may be a replacement for `worker`.
                     (let ([live (send-job worker cmd-list)])
                       (loop queue-state idle (cons (list job live) inflight) count)))]
                  ;; Queue empty and all workers idle, we are all done
                  [(list (? empty?) idle (list) count) (void)]
                  ;; Wait for reply from worker
                  [(list queue idle inflight count)
                   (when (null? inflight)
                     ;; Jobs remain but no worker is idle or busy; `sync`
                     ;; would otherwise be applied to no events at all.
                     (error 'process-comp
                            "jobs remain but there are no workers (nprocs = ~a)"
                            nprocs))
                   (apply sync
                          (map (λ (node-worker)
                                 (match node-worker
                                   [(list node (and worker (list i s o in e)))
                                    (handle-evt
                                     o
                                     (λ (ready)
                                       ;; The handler only reports and hands
                                       ;; the exception back; recovery runs
                                       ;; below so that `loop` is continued
                                       ;; exactly once.
                                       (let ([msg (with-handlers* ([exn:fail?
                                                                    (lambda (nw)
                                                                      (printf "READ ERROR - reading worker: ~a ~n" nw)
                                                                      nw)])
                                                    (read o))])
                                         (if (exn? msg)
                                             ;; NOTE(review): the job that was
                                             ;; in flight on the failed worker
                                             ;; is dropped, not requeued.
                                             (loop queue
                                                   (cons (respawn! worker msg) idle)
                                                   (remove node-worker inflight)
                                                   count)
                                             (loop (work-done jobqueue queue node i msg)
                                                   (cons worker idle)
                                                   (remove node-worker inflight)
                                                   (+ count 1))))))]))
                               inflight))])])
        (loop (initial-queue jobqueue) workers null 0)))
    (lambda ()
      ;; Best effort: a worker whose pipe is already closed is skipped.
      (for ([p workers])
        (with-handlers ([exn? void])
          (send/msg (list 'DIE) (fourth p))))
      (for ([p workers]) (subprocess-wait (second p))))))
|
|
|
|
|
|
;; A jobqueue over a tree of collections.
;;
;;  cclst        - collections not yet assigned to a worker.  Each element
;;                 has the shape (list cc files subs), where `files` is the
;;                 list of source files still to hand out and `subs` is a
;;                 list of elements of the same shape.
;;  hash         - mutable hash from worker id to the list of collection
;;                 entries that worker is currently working through.
;;  collects-dir - stored at construction; not read by the methods below.
;;  printer      - procedure called as (printer port tag format-string arg ...)
;;                 to report progress and errors.
;;
;; The `queue` argument of the methods is ignored; all state lives in the
;; struct's mutable fields.
;;
;; NOTE(review): `cc-name` and `cc-path` are not defined in this file;
;; presumably they come from one of the required modules - confirm.
(define-struct collects-queue (cclst hash collects-dir printer) #:transparent
  #:mutable
  #:property prop:jobqueue
  (define-methods jobqueue
    (define (initial-queue jobqueue) null)

    ;; Report the outcome of compiling `file` of collection `cc`.  `msg` is
    ;; the worker's reply (list result-type out err), where result-type is
    ;; 'DONE or (list 'ERROR message).  Any captured output is echoed to
    ;; the error port.
    (define (work-done jobqueue queue work workerid msg)
      (match (list work msg)
        [(list (list cc file) (list result-type out err))
         (let ([cc-name (cc-name cc)])
           (match result-type
             [(list 'ERROR msg)
              ((collects-queue-printer jobqueue) (current-error-port) "ERROR" "~a ~a: ~a" cc-name file msg)]
             ['DONE (void)])
           (when (or (not (zero? (string-length out))) (not (zero? (string-length err))))
             ((collects-queue-printer jobqueue) (current-error-port) "build-output" "~a ~a" cc-name file)
             (eprintf "STDOUT:~n~a=====~n" out)
             (eprintf "STDERR:~n~a=====~n" err)))]))

    ;; assigns a collection to each worker to be compiled
    ;; when it runs out of collections, steals work from other workers collections
    ;;
    ;; Returns three values: the next queue state (always null), the job
    ;; (list cc file), and the command (list cc-name cc-path-bytes file-bytes).
    (define (get-job jobqueue queue workerid)
      ;; First (key . value) pair of `hash`, or #f when it is empty.
      (define (hash/first-pair hash)
        (match (hash-iterate-first hash)
          [#f #f]
          [x (cons (hash-iterate-key hash x) (hash-iterate-value hash x))]))
      ;; Like hash-ref!, except that a #f result from `thunk` is returned
      ;; without being stored.
      (define (hash-ref!/true hash key thunk)
        (hash-ref hash key (lambda ()
                             (match (thunk)
                               [#f #f]
                               [x (hash-set! hash key x) x]))))
      ;; Pop the next unassigned collection as a one-element list, or #f.
      (define (take-cc)
        (match (collects-queue-cclst jobqueue)
          [(list) #f]
          [(cons h t)
           (set-collects-queue-cclst! jobqueue t)
           (list h)]))
      (let ([w-hash (collects-queue-hash jobqueue)])
        (define (build-job cc file)
          (define (->bytes x)
            (cond [(path? x) (path->bytes x)]
                  [(string? x) (string->bytes/locale x)]))
          (let* ([cc-name (cc-name cc)]
                 [cc-path (cc-path cc)])
            ;(printf "JOB ~a ~a ~a ~a~n" workerid cc-name cc-path file)
            (values null (list cc file) (list cc-name (->bytes cc-path) (->bytes file)))))
        (let retry ()
          ;; `cc` is the list of entries owned by worker `id`.  Exhausted
          ;; entries are dropped or replaced by their sub-collections and
          ;; the search restarts; otherwise one file is taken.
          (define (find-job-in-cc cc id)
            (match cc
              [(list)
               (hash-remove! w-hash id) (retry)]
              [(list (list cc (list) (list))) ;empty collect
               (hash-remove! w-hash id) (retry)]
              [(cons (list cc (list) (list)) tail) ;empty parent collect
               (hash-set! w-hash id tail) (retry)]
              [(cons (list cc (list) subs) tail) ;empty srcs list
               (hash-set! w-hash id (append subs tail)) (retry)]
              [(cons (list cc (list file) subs) tail)
               ;; Last file of this collection: announce it as made.
               (hash-set! w-hash id (append subs tail))
               ((collects-queue-printer jobqueue) (current-output-port) "made" "~a" (cc-name cc))
               (build-job cc file)]
              [(cons (list cc (cons file ft) subs) tail)
               (hash-set! w-hash id (cons (list cc ft subs) tail))
               (build-job cc file)]))
          (match (hash-ref!/true w-hash workerid take-cc)
            [#f
             ;; Nothing left to assign: take from the first worker that
             ;; still has an entry in the table.
             (match (hash/first-pair w-hash)
               [(cons id cc) (find-job-in-cc cc id)])]
            [cc (find-job-in-cc cc workerid)]))))

    ;; True when any unassigned or assigned collection entry (or one of its
    ;; sub-collections) still has a source file to hand out.
    (define (has-jobs? jobqueue queue)
      (define (hasjob? cct)
        (ormap (lambda (x) (or (pair? (second x)) (hasjob? (third x)))) cct))
      (or (hasjob? (collects-queue-cclst jobqueue))
          (for/or ([cct (in-hash-values (collects-queue-hash jobqueue))])
            (hasjob? cct))))

    (define (job-desc jobqueue work)
      work)

    ;; Total number of source files not yet handed out.
    (define (jobs-cnt jobqueue queue)
      (define (count-cct cct)
        (let loop ([cct cct])
          (apply + (map (lambda (x) (+ (length (second x)) (loop (third x)))) cct))))

      (+ (count-cct (collects-queue-cclst jobqueue))
         (for/fold ([cnt 0]) ([cct (in-hash-values (collects-queue-hash jobqueue))])
           (+ cnt (count-cct cct)))))))
|
|
|
|
;; parallel-compile : worker-count setup-fprintf collects -> any
;;
;; Entry point: announces the parallel build through `setup-fprintf`, wraps
;; `collects` in a collects-queue and runs it on `worker-count` worker
;; processes.  The job limit handed to `process-comp` is 999999999.
(define (parallel-compile worker-count setup-fprintf collects)
  (define collects-dir (find-system-path 'collects-dir))
  (setup-fprintf (current-output-port) #f "--- parallel build using ~a processor cores ---" worker-count)
  (define queue
    (make-collects-queue collects (make-hash) collects-dir setup-fprintf))
  (process-comp queue worker-count 999999999))
|
|
|