[Places] clean up parallel-do, docs now build with places
This commit is contained in:
parent
586478a241
commit
3f1a6ee94a
|
@ -1,55 +0,0 @@
|
||||||
#lang racket/base
|
|
||||||
(require compiler/cm
|
|
||||||
racket/match
|
|
||||||
racket/fasl
|
|
||||||
racket/serialize)
|
|
||||||
|
|
||||||
(define prev-uncaught-exception-handler (uncaught-exception-handler))
|
|
||||||
(uncaught-exception-handler (lambda (x)
|
|
||||||
(when (exn:break? x) (exit 1))
|
|
||||||
(prev-uncaught-exception-handler x)))
|
|
||||||
|
|
||||||
(let ([cmc (make-caching-managed-compile-zo)]
|
|
||||||
[worker-id (deserialize (fasl->s-exp (read)))])
|
|
||||||
(let loop ()
|
|
||||||
(match (read)
|
|
||||||
[(list 'DIE) void]
|
|
||||||
[(list name dir file)
|
|
||||||
(let ([dir (bytes->path dir)]
|
|
||||||
[file (bytes->path file)])
|
|
||||||
(let ([out-str-port (open-output-string)]
|
|
||||||
[err-str-port (open-output-string)])
|
|
||||||
(let ([cip (current-input-port)]
|
|
||||||
[cop (current-output-port)]
|
|
||||||
[cep (current-error-port)])
|
|
||||||
(define (send/msg msg)
|
|
||||||
(write msg cop)
|
|
||||||
(flush-output cop))
|
|
||||||
(define (send/resp type)
|
|
||||||
(send/msg (list type (get-output-string out-str-port) (get-output-string err-str-port))))
|
|
||||||
(define (pp x)
|
|
||||||
(fprintf cep "COMPILING ~a ~a ~a ~a\n" worker-id name file x))
|
|
||||||
(define (lock-client cmd fn)
|
|
||||||
(match cmd
|
|
||||||
['lock
|
|
||||||
(send/msg (list (list 'LOCK (path->bytes fn)) "" ""))
|
|
||||||
(match (read cip)
|
|
||||||
[(list 'locked) #t]
|
|
||||||
[(list 'compiled) #f])]
|
|
||||||
['unlock (send/msg (list (list 'UNLOCK (path->bytes fn)) "" ""))]))
|
|
||||||
(with-handlers ([exn:fail? (lambda (x)
|
|
||||||
(send/resp (list 'ERROR (exn-message x))))])
|
|
||||||
(parameterize ([parallel-lock-client lock-client]
|
|
||||||
[current-namespace (make-base-empty-namespace)]
|
|
||||||
[current-directory dir]
|
|
||||||
[current-load-relative-directory dir]
|
|
||||||
[current-input-port (open-input-string "")]
|
|
||||||
[current-output-port out-str-port]
|
|
||||||
[current-error-port err-str-port]
|
|
||||||
;[manager-compile-notify-handler pp]
|
|
||||||
)
|
|
||||||
|
|
||||||
(cmc (build-path dir file)))
|
|
||||||
(send/resp 'DONE))))
|
|
||||||
(flush-output)
|
|
||||||
(loop))])))
|
|
|
@ -4,6 +4,8 @@
|
||||||
racket/list
|
racket/list
|
||||||
racket/match
|
racket/match
|
||||||
racket/path
|
racket/path
|
||||||
|
racket/fasl
|
||||||
|
racket/serialize
|
||||||
setup/collects
|
setup/collects
|
||||||
setup/parallel-do
|
setup/parallel-do
|
||||||
racket/class
|
racket/class
|
||||||
|
@ -12,9 +14,14 @@
|
||||||
racket/place
|
racket/place
|
||||||
(for-syntax racket/base))
|
(for-syntax racket/base))
|
||||||
|
|
||||||
|
|
||||||
(provide parallel-compile
|
(provide parallel-compile
|
||||||
parallel-compile-files)
|
parallel-compile-files)
|
||||||
|
|
||||||
|
(define-syntax-rule (DEBUG_COMM a ...)
|
||||||
|
(void)
|
||||||
|
; (begin a ...)
|
||||||
|
)
|
||||||
|
|
||||||
(define Lock-Manager% (class object%
|
(define Lock-Manager% (class object%
|
||||||
(field (locks (make-hash)))
|
(field (locks (make-hash)))
|
||||||
|
@ -181,125 +188,71 @@
|
||||||
(define/public (get-results) (void))
|
(define/public (get-results) (void))
|
||||||
(super-new)))
|
(super-new)))
|
||||||
|
|
||||||
|
(define (parallel-build work-queue worker-count)
|
||||||
|
(parallel-do
|
||||||
|
worker-count
|
||||||
|
(lambda (workerid) (list workerid))
|
||||||
|
work-queue
|
||||||
|
(define-worker (parallel-compile-worker worker-id)
|
||||||
|
(DEBUG_COMM (eprintf "WORKER ~a\n" worker-id))
|
||||||
|
(define prev-uncaught-exception-handler (uncaught-exception-handler))
|
||||||
|
(uncaught-exception-handler (lambda (x)
|
||||||
|
(when (exn:break? x) (exit 1))
|
||||||
|
(prev-uncaught-exception-handler x)))
|
||||||
|
|
||||||
(define (build-parallel-build-worker-args)
|
(define cmc (make-caching-managed-compile-zo))
|
||||||
(list (find-exe #f)
|
(match-message-loop
|
||||||
"-X"
|
[(list name _dir _file)
|
||||||
(path->string (current-collects-path))
|
(DEBUG_COMM (eprintf "COMPILING ~a ~a ~a ~a\n" worker-id name _file _dir))
|
||||||
"-l"
|
(define dir (bytes->path _dir))
|
||||||
"setup/parallel-build-worker.rkt"))
|
(define file (bytes->path _file))
|
||||||
|
(define out-str-port (open-output-string))
|
||||||
|
(define err-str-port (open-output-string))
|
||||||
|
(define cip (current-input-port))
|
||||||
|
(define cop (current-output-port))
|
||||||
|
(define cep (current-error-port))
|
||||||
|
(define (send/recv msg) (send/msg msg) (recv/req))
|
||||||
|
(define (send/resp type)
|
||||||
|
(send/msg (list type (get-output-string out-str-port) (get-output-string err-str-port))))
|
||||||
|
(define (pp x) (fprintf cep "COMPILING ~a ~a ~a ~a\n" worker-id name file x))
|
||||||
|
(define (lock-client cmd fn)
|
||||||
|
(match cmd
|
||||||
|
['lock
|
||||||
|
(DEBUG_COMM (eprintf "REQUESTING LOCK ~a ~a ~a ~a\n" worker-id name _file _dir))
|
||||||
|
(match (send/recv (list (list 'LOCK (path->bytes fn)) "" ""))
|
||||||
|
[(list 'locked) #t]
|
||||||
|
[(list 'compiled) #f]
|
||||||
|
[(list 'DIE) (worker/die 1)]
|
||||||
|
[x (send/error (format "DIDNT MATCH B ~v\n" x))]
|
||||||
|
[else (send/error (format "DIDNT MATCH B\n"))])]
|
||||||
|
['unlock
|
||||||
|
(DEBUG_COMM (eprintf "UNLOCKING ~a ~a ~a ~a\n" worker-id name _file _dir))
|
||||||
|
(send/msg (list (list 'UNLOCK (path->bytes fn)) "" ""))]
|
||||||
|
[x (send/error (format "DIDNT MATCH C ~v\n" x))]
|
||||||
|
[else (send/error (format "DIDNT MATCH C\n"))]))
|
||||||
|
(with-handlers ([exn:fail? (lambda (x)
|
||||||
|
(send/resp (list 'ERROR (exn-message x))))])
|
||||||
|
(parameterize ([parallel-lock-client lock-client]
|
||||||
|
[current-namespace (make-base-empty-namespace)]
|
||||||
|
[current-directory dir]
|
||||||
|
[current-load-relative-directory dir]
|
||||||
|
[current-input-port (open-input-string "")]
|
||||||
|
[current-output-port out-str-port]
|
||||||
|
[current-error-port err-str-port]
|
||||||
|
;[manager-compile-notify-handler pp]
|
||||||
|
)
|
||||||
|
|
||||||
|
(cmc (build-path dir file)))
|
||||||
|
(send/resp 'DONE))]
|
||||||
|
[x (send/error (format "DIDNT MATCH A ~v\n" x))]
|
||||||
|
[else (send/error (format "DIDNT MATCH A\n"))]))))
|
||||||
|
|
||||||
(define (parallel-compile-files list-of-files
|
(define (parallel-compile-files list-of-files
|
||||||
#:worker-count [worker-count (processor-count)]
|
#:worker-count [worker-count (processor-count)]
|
||||||
#:handler [handler void])
|
#:handler [handler void])
|
||||||
|
(parallel-build (make-object FileListQueue% list-of-files handler) worker-count))
|
||||||
(parallel-do-event-loop #f
|
|
||||||
values ; identity function
|
|
||||||
(build-parallel-build-worker-args)
|
|
||||||
(make-object FileListQueue% list-of-files handler)
|
|
||||||
worker-count 999999999))
|
|
||||||
|
|
||||||
(define (parallel-compile worker-count setup-fprintf append-error collects-tree)
|
(define (parallel-compile worker-count setup-fprintf append-error collects-tree)
|
||||||
(setup-fprintf (current-output-port) #f "--- parallel build using ~a processor cores ---" worker-count)
|
(setup-fprintf (current-output-port) #f "--- parallel build using ~a processor cores ---" worker-count)
|
||||||
(define collects-queue (make-object CollectsQueue% collects-tree setup-fprintf append-error))
|
(define collects-queue (make-object CollectsQueue% collects-tree setup-fprintf append-error))
|
||||||
(if (place-enabled?)
|
(parallel-build collects-queue worker-count))
|
||||||
(places-parallel-build collects-queue worker-count 999999999)
|
|
||||||
(parallel-do-event-loop #f values (build-parallel-build-worker-args) collects-queue worker-count 999999999)))
|
|
||||||
|
|
||||||
(define-syntax-rule (define-syntax-case (N a ...) b ...)
|
|
||||||
(define-syntax (N stx)
|
|
||||||
(syntax-case stx ()
|
|
||||||
[(_ a ...) b ...])))
|
|
||||||
|
|
||||||
(define PlaceWorker% (class* object% (Worker<%>)
|
|
||||||
(init-field [id 0]
|
|
||||||
[pl null])
|
|
||||||
|
|
||||||
(define/public (send/msg msg) (place-channel-send pl msg))
|
|
||||||
(define/public (recv/msg) (place-channel-receive pl))
|
|
||||||
(define/public (get-id) id)
|
|
||||||
(define/public (get-out) pl)
|
|
||||||
(define/public (kill) #f)
|
|
||||||
(define/public (wait) (place-wait pl))
|
|
||||||
(super-new)))
|
|
||||||
|
|
||||||
(define-syntax-case (place/anon (ch) body ...)
|
|
||||||
(with-syntax ([interal-def-name
|
|
||||||
(syntax-local-lift-expression #'(lambda (ch) body ...))]
|
|
||||||
[funcname #'OBSCURE_FUNC_NAME_%#%])
|
|
||||||
(syntax-local-lift-provide #'(rename interal-def-name funcname))
|
|
||||||
#'(let ([module-path (resolved-module-path-name
|
|
||||||
(variable-reference->resolved-module-path
|
|
||||||
(#%variable-reference)))])
|
|
||||||
(place module-path (quote funcname)))))
|
|
||||||
|
|
||||||
(define (places-parallel-build jobqueue nprocs stopat)
|
|
||||||
(define ps
|
|
||||||
(for/list ([i (in-range nprocs)])
|
|
||||||
(place/anon (ch)
|
|
||||||
(let ([cmc ((dynamic-require 'compiler/cm 'make-caching-managed-compile-zo))])
|
|
||||||
(let loop ()
|
|
||||||
(match (place-channel-receive ch)
|
|
||||||
[(list 'DIE) void]
|
|
||||||
[(list name dir file)
|
|
||||||
(let ([dir (bytes->path dir)]
|
|
||||||
[file (bytes->path file)])
|
|
||||||
(let ([out-str-port (open-output-string)]
|
|
||||||
[err-str-port (open-output-string)])
|
|
||||||
(define (send/msg msg)
|
|
||||||
(place-channel-send ch msg))
|
|
||||||
(define (send/resp type)
|
|
||||||
(send/msg (list type (get-output-string out-str-port) (get-output-string err-str-port))))
|
|
||||||
(define (lock-client cmd fn)
|
|
||||||
(match cmd
|
|
||||||
['lock
|
|
||||||
(send/msg (list (list 'LOCK (path->bytes fn)) "" ""))
|
|
||||||
(match (place-channel-receive ch)
|
|
||||||
[(list 'locked) #t]
|
|
||||||
[(list 'compiled) #f])]
|
|
||||||
['unlock (send/msg (list (list 'UNLOCK (path->bytes fn)) "" ""))]))
|
|
||||||
(with-handlers ([exn:fail? (lambda (x)
|
|
||||||
(send/resp (list 'ERROR (exn-message x))))])
|
|
||||||
(parameterize ([parallel-lock-client lock-client]
|
|
||||||
[current-namespace (make-base-empty-namespace)]
|
|
||||||
[current-directory dir]
|
|
||||||
[current-load-relative-directory dir]
|
|
||||||
[current-input-port (open-input-string "")]
|
|
||||||
[current-output-port out-str-port]
|
|
||||||
[current-error-port err-str-port])
|
|
||||||
|
|
||||||
(cmc (build-path dir file)))
|
|
||||||
(send/resp 'DONE))))
|
|
||||||
(loop)]))))))
|
|
||||||
|
|
||||||
|
|
||||||
(define workers (for/list ([i (in-range nprocs)]
|
|
||||||
[p ps])
|
|
||||||
(make-object PlaceWorker% i p)))
|
|
||||||
(define (jobs?) (queue/has jobqueue))
|
|
||||||
(define (empty?) (not (queue/has jobqueue)))
|
|
||||||
|
|
||||||
(let loop ([idle workers]
|
|
||||||
[inflight null]
|
|
||||||
[count 0])
|
|
||||||
(cond
|
|
||||||
[(= count stopat) (printf "DONE AT LIMIT\n")]
|
|
||||||
[(and (empty?) (null? inflight)) (set! workers idle)] ; ALL DONE
|
|
||||||
[(and (jobs?) (pair? idle))
|
|
||||||
(match-define (cons wrkr idle-rest) idle)
|
|
||||||
(define-values (job cmd-list) (queue/get jobqueue (wrkr/id wrkr)))
|
|
||||||
(wrkr/send wrkr cmd-list)
|
|
||||||
(loop idle-rest (cons (list job wrkr) inflight) count)]
|
|
||||||
|
|
||||||
[else
|
|
||||||
(define (gen-node-handler node-worker)
|
|
||||||
(match-define (list node wrkr) node-worker)
|
|
||||||
(handle-evt (wrkr/out wrkr) (λ (msg)
|
|
||||||
(if (queue/work-done jobqueue node wrkr msg)
|
|
||||||
(loop (cons wrkr idle) (remove node-worker inflight) (add1 count))
|
|
||||||
(loop idle inflight count)))))
|
|
||||||
|
|
||||||
(apply sync (map gen-node-handler inflight))]))
|
|
||||||
|
|
||||||
(for ([p workers]) (wrkr/send p (list 'DIE)))
|
|
||||||
(for ([p ps]) (place-wait p)))
|
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
|
|
||||||
(require racket/file
|
(require racket/file
|
||||||
racket/future
|
racket/future
|
||||||
|
racket/place
|
||||||
racket/port
|
racket/port
|
||||||
racket/fasl
|
racket/fasl
|
||||||
racket/match
|
racket/match
|
||||||
|
@ -13,16 +14,17 @@
|
||||||
racket/base))
|
racket/base))
|
||||||
|
|
||||||
(provide parallel-do
|
(provide parallel-do
|
||||||
parallel-do-event-loop
|
|
||||||
current-executable-path
|
current-executable-path
|
||||||
current-collects-path
|
current-collects-path
|
||||||
match-message-loop
|
match-message-loop
|
||||||
send/success
|
send/success
|
||||||
send/error
|
send/error
|
||||||
|
send/msg
|
||||||
|
recv/req
|
||||||
|
worker/die
|
||||||
WorkQueue<%>
|
WorkQueue<%>
|
||||||
Worker<%>
|
define/class/generics
|
||||||
wrkr/send
|
ListQueue)
|
||||||
define/class/generics)
|
|
||||||
|
|
||||||
(define-syntax-rule (mk-generic func clss method args ...)
|
(define-syntax-rule (mk-generic func clss method args ...)
|
||||||
(begin
|
(begin
|
||||||
|
@ -40,11 +42,18 @@
|
||||||
(mk-generic func class method args ...)
|
(mk-generic func class method args ...)
|
||||||
(provide func)) ...))
|
(provide func)) ...))
|
||||||
|
|
||||||
|
(define-syntax-rule (DEBUG_COMM a ...)
|
||||||
|
(void)
|
||||||
|
; (begin a ...)
|
||||||
|
)
|
||||||
|
|
||||||
(define Worker<%> (interface ()
|
(define Worker<%> (interface ()
|
||||||
|
spawn
|
||||||
send/msg
|
send/msg
|
||||||
kill
|
kill
|
||||||
|
wait
|
||||||
recv/msg
|
recv/msg
|
||||||
|
read-all
|
||||||
get-id
|
get-id
|
||||||
get-out))
|
get-out))
|
||||||
|
|
||||||
|
@ -53,37 +62,74 @@
|
||||||
[process-handle null]
|
[process-handle null]
|
||||||
[out null]
|
[out null]
|
||||||
[in null]
|
[in null]
|
||||||
[err null])
|
[err null]
|
||||||
|
[module-path null]
|
||||||
|
[funcname null])
|
||||||
|
|
||||||
(define/public (spawn _id worker-cmdline-list [initialcode #f] [initialmsg #f])
|
(define/public (spawn _id _module-path _funcname [initialmsg #f])
|
||||||
|
(set! module-path _module-path)
|
||||||
|
(set! funcname _funcname)
|
||||||
|
(define worker-cmdline-list (list (current-executable-path) "-X" (path->string (current-collects-path)) "-e" "(eval(read))"))
|
||||||
|
(define dynamic-require-cmd `((dynamic-require (string->path ,module-path) (quote ,funcname)) #f))
|
||||||
(let-values ([(_process-handle _out _in _err) (apply subprocess #f #f (current-error-port) worker-cmdline-list)])
|
(let-values ([(_process-handle _out _in _err) (apply subprocess #f #f (current-error-port) worker-cmdline-list)])
|
||||||
(set! id _id)
|
(set! id _id)
|
||||||
(set! process-handle _process-handle)
|
(set! process-handle _process-handle)
|
||||||
(set! out _out)
|
(set! out _out)
|
||||||
(set! in _in)
|
(set! in _in)
|
||||||
(set! err _err)
|
(set! err _err)
|
||||||
(when initialcode (send/msg initialcode))
|
(send/msg dynamic-require-cmd)
|
||||||
(when initialmsg (send/msg (s-exp->fasl (serialize (initialmsg id)))))))
|
(when initialmsg (send/msg (s-exp->fasl (serialize (initialmsg id)))))))
|
||||||
|
(define/public (send/msg msg)
|
||||||
(define/public (send/msg msg) (write msg in) (flush-output in))
|
(with-handlers ([exn:fail?
|
||||||
(define/public (recv/msg) (read out))
|
(lambda (x)
|
||||||
|
(eprintf "CONTROLLER SEND MESSAGE ERROR TO WORKER ~a ~a\n" id (exn-message x))
|
||||||
|
(exit 1))])
|
||||||
|
(DEBUG_COMM (eprintf "CSENDING ~v ~v\n" id msg))
|
||||||
|
(write msg in) (flush-output in)))
|
||||||
|
(define/public (recv/msg)
|
||||||
|
(with-handlers ([exn:fail?
|
||||||
|
(lambda (x)
|
||||||
|
(eprintf "CONTROLLER RECEIVE MESSAGE ERROR FROM WORKER ~a ~a\n" id (exn-message x))
|
||||||
|
(exit 1))])
|
||||||
|
(define r (read out))
|
||||||
|
(DEBUG_COMM (eprintf "CRECEIVNG ~v ~v\n" id r))
|
||||||
|
r))
|
||||||
|
(define/public (read-all) (port->string out))
|
||||||
(define/public (get-id) id)
|
(define/public (get-id) id)
|
||||||
(define/public (get-out) out)
|
(define/public (get-out) out)
|
||||||
(define/public (kill)
|
(define/public (kill)
|
||||||
(eprintf "KILLING WORKER ~a\n" id)
|
(DEBUG_COMM (eprintf "KILLING WORKER ~a\n" id))
|
||||||
(close-output-port in)
|
(close-output-port in)
|
||||||
(close-input-port out)
|
(close-input-port out)
|
||||||
(subprocess-kill process-handle #t))
|
(subprocess-kill process-handle #t))
|
||||||
(define/public (kill/respawn worker-cmdline-list [initialcode #f] [initialmsg #f])
|
(define/public (kill/respawn worker-cmdline-list [initialmsg #f])
|
||||||
(kill)
|
(kill)
|
||||||
(spawn id worker-cmdline-list [initialcode #f] [initialmsg #f]))
|
(spawn id module-path funcname [initialmsg #f]))
|
||||||
(define/public (wait) (subprocess-wait process-handle))
|
(define/public (wait) (subprocess-wait process-handle))
|
||||||
(super-new)))
|
(super-new)))
|
||||||
|
|
||||||
(define (wrkr/spawn id worker-cmdline-list [initialcode #f] [initialmsg #f])
|
(define PlaceWorker% (class* object% (Worker<%>)
|
||||||
(define wrkr (new Worker%))
|
(init-field [id 0]
|
||||||
(send wrkr spawn id worker-cmdline-list initialcode initialmsg)
|
[pl null])
|
||||||
wrkr)
|
|
||||||
|
(define/public (spawn _id module-path funcname [initialmsg #f])
|
||||||
|
(set! id _id)
|
||||||
|
(set! pl (place (string->path module-path) funcname))
|
||||||
|
(when initialmsg (send/msg (s-exp->fasl (serialize (initialmsg id))))))
|
||||||
|
(define/public (send/msg msg)
|
||||||
|
(DEBUG_COMM (eprintf "CSENDING ~v ~v\n" pl msg))
|
||||||
|
(place-channel-send pl msg))
|
||||||
|
(define/public (recv/msg)
|
||||||
|
(define r (place-channel-receive pl))
|
||||||
|
(DEBUG_COMM (eprintf "CRECEIVNG ~v ~v\n" pl r))
|
||||||
|
r)
|
||||||
|
(define/public (read-all) "")
|
||||||
|
(define/public (get-id) id)
|
||||||
|
(define/public (get-out) pl)
|
||||||
|
(define/public (kill) #f)
|
||||||
|
(define/public (wait) (place-wait pl))
|
||||||
|
(super-new)))
|
||||||
|
|
||||||
|
|
||||||
(define WorkQueue<%> (interface ()
|
(define WorkQueue<%> (interface ()
|
||||||
get-job
|
get-job
|
||||||
|
@ -93,18 +139,22 @@
|
||||||
get-results))
|
get-results))
|
||||||
|
|
||||||
(define/class/generics/provide Worker<%>
|
(define/class/generics/provide Worker<%>
|
||||||
|
(wrkr/spawn spawn id worker-cmdline-list initialcode initialmsg)
|
||||||
(wrkr/send send/msg msg)
|
(wrkr/send send/msg msg)
|
||||||
(wrkr/kill kill)
|
(wrkr/kill kill)
|
||||||
(wrkr/recv recv/msg)
|
(wrkr/recv recv/msg)
|
||||||
|
(wrkr/read-all read-all)
|
||||||
(wrkr/id get-id)
|
(wrkr/id get-id)
|
||||||
(wrkr/out get-out))
|
(wrkr/out get-out)
|
||||||
|
(wrkr/wait wait))
|
||||||
|
|
||||||
|
|
||||||
(define/class/generics/provide WorkQueue<%>
|
(define/class/generics/provide WorkQueue<%>
|
||||||
(queue/get get-job wrkrid)
|
(queue/get get-job wrkrid)
|
||||||
(queue/work-done work-done node wrkr msg)
|
(queue/work-done work-done node wrkr msg)
|
||||||
(queue/has has-jobs?)
|
(queue/has has-jobs?)
|
||||||
(queue/count jobs-cnt))
|
(queue/count jobs-cnt)
|
||||||
|
(queue/results get-results))
|
||||||
|
|
||||||
(define (current-executable-path)
|
(define (current-executable-path)
|
||||||
(parameterize ([current-directory (find-system-path 'orig-dir)])
|
(parameterize ([current-directory (find-system-path 'orig-dir)])
|
||||||
|
@ -117,12 +167,15 @@
|
||||||
(path->complete-path p (or (path-only (current-executable-path))
|
(path->complete-path p (or (path-only (current-executable-path))
|
||||||
(find-system-path 'orig-dir))))))
|
(find-system-path 'orig-dir))))))
|
||||||
|
|
||||||
(define (parallel-do-event-loop initialcode initialmsg worker-cmdline-list jobqueue nprocs stopat)
|
(define (parallel-do-event-loop module-path funcname initialmsg jobqueue nprocs [stopat #f])
|
||||||
(define (spawn id) (wrkr/spawn id worker-cmdline-list initialcode initialmsg))
|
(define use-places (place-enabled?))
|
||||||
|
(define (spawn id)
|
||||||
|
(define wrkr (if use-places (new PlaceWorker%) (new Worker%)))
|
||||||
|
(wrkr/spawn wrkr id module-path funcname initialmsg)
|
||||||
|
wrkr)
|
||||||
(define (jobs?) (queue/has jobqueue))
|
(define (jobs?) (queue/has jobqueue))
|
||||||
(define (empty?) (not (queue/has jobqueue)))
|
(define (empty?) (not (queue/has jobqueue)))
|
||||||
(define workers #f)
|
(define workers #f)
|
||||||
|
|
||||||
(dynamic-wind
|
(dynamic-wind
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(parameterize-break #f
|
(parameterize-break #f
|
||||||
|
@ -141,54 +194,60 @@
|
||||||
(cond
|
(cond
|
||||||
[(error-threshold error-count)]
|
[(error-threshold error-count)]
|
||||||
;; Reached stopat count STOP
|
;; Reached stopat count STOP
|
||||||
[(= count stopat) (printf "DONE AT LIMIT\n")]
|
[(and stopat (= count stopat)) (printf "DONE AT LIMIT\n")]
|
||||||
;; Queue empty and all workers idle, we are all done
|
;; Queue empty and all workers idle, we are all done
|
||||||
[(and (empty?) (null? inflight)) (set! workers idle)]
|
[(and (empty?) (null? inflight)) (parameterize-break #f (set! workers idle))] ; ALL DONE
|
||||||
;; Send work to idle worker
|
;; Send work to idle worker
|
||||||
[(and (jobs?) (pair? idle))
|
[(and (jobs?) (pair? idle))
|
||||||
(match idle [(cons wrkr idle-rest)
|
(match-define (cons wrkr idle-rest) idle)
|
||||||
(let-values ([(job cmd-list) (queue/get jobqueue (wrkr/id wrkr))])
|
(define-values (job cmd-list) (queue/get jobqueue (wrkr/id wrkr)))
|
||||||
(let retry-loop ([wrkr wrkr]
|
(let retry-loop ([wrkr wrkr]
|
||||||
[error-count error-count])
|
[error-count error-count])
|
||||||
(error-threshold error-count)
|
(error-threshold error-count)
|
||||||
(with-handlers* ([exn:fail? (lambda (e)
|
(with-handlers* ([exn:fail? (lambda (e)
|
||||||
(printf "MASTER WRITE ERROR - writing to worker: ~a\n" (exn-message e))
|
(printf "MASTER WRITE ERROR - writing to worker: ~v ~a\n" (wrkr/id wrkr) (exn-message e))
|
||||||
(wrkr/kill wrkr)
|
(wrkr/kill wrkr)
|
||||||
(retry-loop (spawn (wrkr/id wrkr)) (add1 error-count)))])
|
(retry-loop (spawn (wrkr/id wrkr)) (add1 error-count)))])
|
||||||
(wrkr/send wrkr cmd-list))
|
(wrkr/send wrkr cmd-list))
|
||||||
(loop idle-rest (cons (list job wrkr) inflight) count error-count)))])]
|
(loop idle-rest (cons (list job wrkr) inflight) count error-count))]
|
||||||
|
|
||||||
[else
|
[else
|
||||||
(define (kill/remove-dead-worker node-worker wrkr)
|
(define (kill/remove-dead-worker node-worker wrkr)
|
||||||
|
(DEBUG_COMM (printf "KILLING ~v\n" (wrkr/id wrkr)))
|
||||||
(wrkr/kill wrkr)
|
(wrkr/kill wrkr)
|
||||||
(loop (cons (spawn (wrkr/id wrkr)) idle)
|
(loop (cons (spawn (wrkr/id wrkr)) idle)
|
||||||
(remove node-worker inflight)
|
(remove node-worker inflight)
|
||||||
count
|
count
|
||||||
(add1 error-count)))
|
(add1 error-count)))
|
||||||
(apply sync (for/list ([node-worker inflight])
|
(define (gen-node-handler node-worker)
|
||||||
(match node-worker [(list node wrkr)
|
(match node-worker
|
||||||
(define out (wrkr/out wrkr))
|
[(list node wrkr)
|
||||||
(handle-evt out (λ (e)
|
(handle-evt (wrkr/out wrkr) (λ (e)
|
||||||
(with-handlers* ([exn:fail? (lambda (e)
|
(with-handlers* ([exn:fail? (lambda (e)
|
||||||
(printf "MASTER READ ERROR - reading from worker: ~a\n" (exn-message e))
|
(printf "MASTER READ ERROR - reading from worker: ~v ~a\n" (wrkr/id wrkr) (exn-message e))
|
||||||
(kill/remove-dead-worker node-worker wrkr))])
|
(kill/remove-dead-worker node-worker wrkr))])
|
||||||
(let ([msg (wrkr/recv wrkr)])
|
(let ([msg (if use-places e (wrkr/recv wrkr))])
|
||||||
(if (pair? msg)
|
(if (pair? msg)
|
||||||
(if (queue/work-done jobqueue node wrkr msg)
|
(if (queue/work-done jobqueue node wrkr msg)
|
||||||
(loop (cons wrkr idle)
|
(loop (cons wrkr idle) (remove node-worker inflight) (add1 count) error-count)
|
||||||
(remove node-worker inflight)
|
(loop idle inflight count error-count))
|
||||||
(add1 count)
|
(begin
|
||||||
error-count)
|
(kill/remove-dead-worker node-worker wrkr)
|
||||||
(loop idle inflight count error-count))
|
(queue/work-done jobqueue node wrkr (string-append msg (wrkr/read-all wrkr)))))))))]
|
||||||
(begin
|
|
||||||
(queue/work-done jobqueue node wrkr (string-append msg (port->string out)))
|
|
||||||
(kill/remove-dead-worker node-worker wrkr)))))))]
|
|
||||||
[else
|
[else
|
||||||
(eprintf "parallel-do-event-loop match node-worker failed.\n")
|
(eprintf "parallel-do-event-loop match node-worker failed.\n")
|
||||||
(eprintf "trying to match:\n~a\n" node-worker)])))])))
|
(eprintf "trying to match:\n~a\n" node-worker)]))
|
||||||
(lambda ()
|
(DEBUG_COMM (printf "WAITING ON WORKERS TO RESPOND\n"))
|
||||||
|
(apply sync (map gen-node-handler inflight))])))
|
||||||
|
(lambda ()
|
||||||
|
;(printf "Asking all workers to die\n")
|
||||||
(for ([p workers]) (with-handlers ([exn? void]) (wrkr/send p (list 'DIE))))
|
(for ([p workers]) (with-handlers ([exn? void]) (wrkr/send p (list 'DIE))))
|
||||||
(for ([p workers]) (send p wait)))))
|
;(printf "Waiting for all workers to die")(flush-output)
|
||||||
|
(for ([p workers]
|
||||||
|
[i (in-naturals)])
|
||||||
|
(wrkr/wait p)
|
||||||
|
;(printf " ~a" (add1 i))
|
||||||
|
(flush-output))
|
||||||
|
#;(printf "\n"))))
|
||||||
|
|
||||||
(define ListQueue% (class* object% (WorkQueue<%>)
|
(define ListQueue% (class* object% (WorkQueue<%>)
|
||||||
(init-field queue create-job-thunk success-thunk failure-thunk)
|
(init-field queue create-job-thunk success-thunk failure-thunk)
|
||||||
|
@ -206,50 +265,72 @@
|
||||||
(set! queue t)
|
(set! queue t)
|
||||||
(values h (create-job-thunk h))]))
|
(values h (create-job-thunk h))]))
|
||||||
(define/public (has-jobs?) (not (null? queue)))
|
(define/public (has-jobs?) (not (null? queue)))
|
||||||
(define/public (get-results) results)
|
(define/public (get-results) (reverse results))
|
||||||
(define/public (jobs-cnt) (length queue))
|
(define/public (jobs-cnt) (length queue))
|
||||||
(super-new)))
|
(super-new)))
|
||||||
|
|
||||||
(define match-message-loop
|
(define (ListQueue list-of-work create-job-thunk job-success-thunk job-failure-thunk)
|
||||||
(lambda (stx)
|
(make-object ListQueue% list-of-work create-job-thunk job-success-thunk job-failure-thunk))
|
||||||
(raise-syntax-error 'match-message-loop "only allowed inside a parallel worker definition" stx)))
|
|
||||||
(define-syntax-parameter send/success
|
(define-syntax-rule (define-parallel-keyword-error d x)
|
||||||
(lambda (stx)
|
(d x (lambda (stx) (raise-syntax-error 'x "only allowed inside parallel worker definition" stx))))
|
||||||
(raise-syntax-error 'send/success "only allowed inside parallel worker definition" stx)))
|
(define-syntax-rule (define-syntax-parameter-error x) (define-parallel-keyword-error define-syntax-parameter x))
|
||||||
(define-syntax-parameter send/error
|
|
||||||
(lambda (stx)
|
(define-parallel-keyword-error define match-message-loop)
|
||||||
(raise-syntax-error 'send/error "only allowed inside parallel worker definition" stx)))
|
(define-syntax-parameter-error send/msg)
|
||||||
|
(define-syntax-parameter-error send/success)
|
||||||
|
(define-syntax-parameter-error send/error)
|
||||||
|
(define-syntax-parameter-error recv/req)
|
||||||
|
(define-syntax-parameter-error worker/die)
|
||||||
|
|
||||||
|
|
||||||
(define-for-syntax (gen-worker-body globals-list globals-body work-body)
|
(define-for-syntax (gen-worker-body globals-list globals-body work-body channel)
|
||||||
(with-syntax ([globals-list globals-list]
|
(with-syntax ([globals-list globals-list]
|
||||||
[(globals-body ...) globals-body]
|
[(globals-body ...) globals-body]
|
||||||
[(work work-body ...) work-body])
|
[([work work-body ...] ...) work-body]
|
||||||
|
[ch channel])
|
||||||
#'(begin
|
#'(begin
|
||||||
(define orig-err (current-error-port))
|
(define orig-err (current-error-port))
|
||||||
(define orig-out (current-output-port))
|
(define orig-out (current-output-port))
|
||||||
|
(define orig-in (current-input-port))
|
||||||
|
(define (raw-send msg)
|
||||||
|
(cond
|
||||||
|
[ch (place-channel-send ch msg)]
|
||||||
|
[else (write msg orig-out)
|
||||||
|
(flush-output orig-out)]))
|
||||||
|
(define (raw-recv)
|
||||||
|
(cond
|
||||||
|
[ch (place-channel-receive ch)]
|
||||||
|
[else (read orig-in)]))
|
||||||
(define (pdo-send msg)
|
(define (pdo-send msg)
|
||||||
(with-handlers ([exn:fail?
|
(with-handlers ([exn:fail?
|
||||||
(lambda (x)
|
(lambda (x)
|
||||||
(fprintf orig-err "WORKER SEND MESSAGE ERROR ~a\n" (exn-message x))
|
(fprintf orig-err "WORKER SEND MESSAGE ERROR ~a\n" (exn-message x))
|
||||||
(exit 1))])
|
(exit 1))])
|
||||||
(write msg orig-out)
|
(DEBUG_COMM (fprintf orig-err "WSENDING ~v\n" msg))
|
||||||
(flush-output orig-out)))
|
(raw-send msg)))
|
||||||
(define (pdo-recv)
|
(define (pdo-recv)
|
||||||
(with-handlers ([exn:fail?
|
(with-handlers ([exn:fail?
|
||||||
(lambda (x)
|
(lambda (x)
|
||||||
(fprintf orig-err "WORKER RECEIVE MESSAGE ERROR ~a\n" (exn-message x))
|
(fprintf orig-err "WORKER RECEIVE MESSAGE ERROR ~a\n" (exn-message x))
|
||||||
(exit 1))])
|
(exit 1))])
|
||||||
(read)))
|
(define r (raw-recv))
|
||||||
|
(DEBUG_COMM (fprintf orig-err "WRECVEIVED ~v\n" r))
|
||||||
|
r))
|
||||||
(match (deserialize (fasl->s-exp (pdo-recv)))
|
(match (deserialize (fasl->s-exp (pdo-recv)))
|
||||||
[globals-list
|
[globals-list
|
||||||
globals-body ...
|
globals-body ...
|
||||||
(let loop ()
|
(let/ec die-k
|
||||||
|
(let loop ([i 0])
|
||||||
|
(DEBUG_COMM (fprintf orig-err "WAITING ON CONTROLLER TO RESPOND ~v ~v\n" orig-in i))
|
||||||
(match (pdo-recv)
|
(match (pdo-recv)
|
||||||
[(list 'DIE) void]
|
[(list 'DIE) void]
|
||||||
[work
|
[work
|
||||||
(let ([out-str-port (open-output-string)]
|
(let ([out-str-port (open-output-string)]
|
||||||
[err-str-port (open-output-string)])
|
[err-str-port (open-output-string)])
|
||||||
|
(define (recv/reqp) (pdo-recv))
|
||||||
|
(define (send/msgp msg)
|
||||||
|
(pdo-send msg))
|
||||||
(define (send/resp type)
|
(define (send/resp type)
|
||||||
(pdo-send (list type (get-output-string out-str-port) (get-output-string err-str-port))))
|
(pdo-send (list type (get-output-string out-str-port) (get-output-string err-str-port))))
|
||||||
(define (send/successp result)
|
(define (send/successp result)
|
||||||
|
@ -259,43 +340,55 @@
|
||||||
(with-handlers ([exn:fail? (lambda (x) (send/errorp (exn-message x)) (loop))])
|
(with-handlers ([exn:fail? (lambda (x) (send/errorp (exn-message x)) (loop))])
|
||||||
(parameterize ([current-output-port out-str-port]
|
(parameterize ([current-output-port out-str-port]
|
||||||
[current-error-port err-str-port])
|
[current-error-port err-str-port])
|
||||||
(syntax-parameterize ([send/success (make-rename-transformer #'send/successp)]
|
(syntax-parameterize ([send/msg (make-rename-transformer #'send/msgp)]
|
||||||
[send/error (make-rename-transformer #'send/errorp)])
|
[send/success (make-rename-transformer #'send/successp)]
|
||||||
|
[send/error (make-rename-transformer #'send/errorp)]
|
||||||
|
[recv/req (make-rename-transformer #'recv/reqp)]
|
||||||
|
[worker/die (make-rename-transformer #'die-k)])
|
||||||
work-body ...
|
work-body ...
|
||||||
(loop)))))]))]))))
|
(loop (add1 i))))))] ...)))]))))
|
||||||
|
|
||||||
(define-syntax (lambda-worker stx)
|
(define-syntax (lambda-worker stx)
|
||||||
(syntax-parse stx #:literals(match-message-loop)
|
(syntax-parse stx #:literals(match-message-loop)
|
||||||
[(_ (globals-list:id ...)
|
[(_ (globals-list:id ...)
|
||||||
globals-body:expr ...
|
globals-body:expr ...
|
||||||
(match-message-loop
|
(match-message-loop
|
||||||
[work:id work-body:expr ...]))
|
[work:expr work-body:expr ...] ...))
|
||||||
|
|
||||||
(with-syntax ([body (gen-worker-body #'(list globals-list ...) #'(globals-body ...) #'(work work-body ...))])
|
(with-syntax ([body (gen-worker-body #'(list globals-list ...) #'(globals-body ...) #'([work work-body ...] ...) #'ch)])
|
||||||
#'(lambda ()
|
#'(lambda (ch) body))]))
|
||||||
body))]))
|
|
||||||
|
|
||||||
(define-syntax (parallel-do stx)
|
(define-syntax (parallel-do stx)
|
||||||
(syntax-case stx ()
|
(syntax-case stx (define-worker)
|
||||||
[(_ worker-count initalmsg list-of-work create-job-thunk job-success-thunk job-failure-thunk workerthunk)
|
[(_ worker-count initalmsg workqueue (define-worker (name args ...) body ...))
|
||||||
(begin
|
(with-syntax ([interal-def-name (syntax-local-lift-expression #'(lambda-worker (args ...) body ...))])
|
||||||
(define (gen-parallel-do-event-loop-syntax cmdline initial-stdin-data)
|
(syntax-local-lift-provide #'(rename interal-def-name name)))
|
||||||
(with-syntax ([cmdline cmdline]
|
#'(let ([wq workqueue])
|
||||||
[initial-stdin-data initial-stdin-data])
|
(define module-path (path->string (resolved-module-path-name (variable-reference->resolved-module-path (#%variable-reference)))))
|
||||||
#`(begin
|
(parallel-do-event-loop module-path 'name initalmsg wq worker-count)
|
||||||
;(printf "CMDLINE ~v\n" cmdline)
|
(queue/results wq))]))
|
||||||
;(printf "INITIALTHUNK ~v\n" initial-stdin-data)
|
|
||||||
(let ([jobqueue (make-object ListQueue% list-of-work create-job-thunk job-success-thunk job-failure-thunk)])
|
(define-syntax-rule (define-syntax-case (N a ...) b ...)
|
||||||
(parallel-do-event-loop initial-stdin-data initalmsg cmdline jobqueue worker-count 999999999)
|
(define-syntax (N stx)
|
||||||
(reverse (send jobqueue get-results))))))
|
(syntax-case stx ()
|
||||||
(define (gen-dynamic-require-current-module funcname)
|
[(_ a ...) b ...])))
|
||||||
(with-syntax ([funcname funcname])
|
|
||||||
#'(let ([module-path (path->string (resolved-module-path-name (variable-reference->resolved-module-path (#%variable-reference))))])
|
(define-for-syntax (gen-create-place stx)
|
||||||
`((dynamic-require (string->path ,module-path) (quote funcname))))))
|
(syntax-case stx ()
|
||||||
(syntax-case #'workerthunk (define-worker)
|
[(_ ch body ...)
|
||||||
[(define-worker (name args ...) body ...)
|
(with-syntax ([interal-def-name
|
||||||
(with-syntax ([interal-def-name (syntax-local-lift-expression #'(lambda-worker (args ...) body ...))])
|
(syntax-local-lift-expression #'(lambda (ch) body ...))]
|
||||||
(syntax-local-lift-provide #'(rename interal-def-name name)))
|
[funcname #'OBSCURE_FUNC_NAME_%#%])
|
||||||
(gen-parallel-do-event-loop-syntax
|
(syntax-local-lift-provide #'(rename interal-def-name funcname))
|
||||||
#'(list (current-executable-path) "-X" (path->string (current-collects-path)) "-e" "(eval(read))")
|
#'(let ([module-path (resolved-module-path-name
|
||||||
(gen-dynamic-require-current-module #'name))]))]))
|
(variable-reference->resolved-module-path
|
||||||
|
(#%variable-reference)))])
|
||||||
|
(place module-path (quote funcname))))]))
|
||||||
|
|
||||||
|
(define-syntax (place/thunk stx)
|
||||||
|
(with-syntax ([create-place (gen-create-place stx)])
|
||||||
|
#'(lambda () create-place)))
|
||||||
|
|
||||||
|
(define-syntax (place/anon stx)
|
||||||
|
(gen-create-place stx))
|
||||||
|
|
||||||
|
|
|
@ -139,10 +139,11 @@
|
||||||
(parallel-do
|
(parallel-do
|
||||||
worker-count
|
worker-count
|
||||||
(lambda (workerid) (list workerid program-name (verbose) only-dirs latex-dest auto-main? auto-user?))
|
(lambda (workerid) (list workerid program-name (verbose) only-dirs latex-dest auto-main? auto-user?))
|
||||||
docs
|
(ListQueue
|
||||||
(lambda (x) (s-exp->fasl (serialize x)))
|
docs
|
||||||
(lambda (work r outstr errstr) (printf "~a" outstr) (printf "~a" errstr) (deserialize (fasl->s-exp r)))
|
(lambda (x) (s-exp->fasl (serialize x)))
|
||||||
(lambda (work errmsg outstr errstr) (parallel-do-error-handler setup-printf work errmsg outstr errstr))
|
(lambda (work r outstr errstr) (printf "~a" outstr) (printf "~a" errstr) (deserialize (fasl->s-exp r)))
|
||||||
|
(lambda (work errmsg outstr errstr) (parallel-do-error-handler setup-printf work errmsg outstr errstr)))
|
||||||
(define-worker (get-doc-info-worker workerid program-name verbosev only-dirs latex-dest auto-main? auto-user?)
|
(define-worker (get-doc-info-worker workerid program-name verbosev only-dirs latex-dest auto-main? auto-user?)
|
||||||
(define ((get-doc-info-local program-name only-dirs latex-dest auto-main? auto-user?) doc)
|
(define ((get-doc-info-local program-name only-dirs latex-dest auto-main? auto-user?) doc)
|
||||||
(define (setup-printf subpart formatstr . rest)
|
(define (setup-printf subpart formatstr . rest)
|
||||||
|
@ -321,15 +322,16 @@
|
||||||
(parallel-do
|
(parallel-do
|
||||||
worker-count
|
worker-count
|
||||||
(lambda (workerid) (list workerid (verbose) latex-dest))
|
(lambda (workerid) (list workerid (verbose) latex-dest))
|
||||||
need-rerun
|
(ListQueue
|
||||||
(lambda (i)
|
need-rerun
|
||||||
(say-rendering i)
|
(lambda (i)
|
||||||
(s-exp->fasl (serialize (info-doc i))))
|
(say-rendering i)
|
||||||
(lambda (i r outstr errstr)
|
(s-exp->fasl (serialize (info-doc i))))
|
||||||
(printf "~a" outstr)
|
(lambda (i r outstr errstr)
|
||||||
(printf "~a" errstr)
|
(printf "~a" outstr)
|
||||||
(update-info i (deserialize (fasl->s-exp r))))
|
(printf "~a" errstr)
|
||||||
(lambda (i errmsg outstr errstr) (parallel-do-error-handler setup-printf (info-doc i) errmsg outstr errstr))
|
(update-info i (deserialize (fasl->s-exp r))))
|
||||||
|
(lambda (i errmsg outstr errstr) (parallel-do-error-handler setup-printf (info-doc i) errmsg outstr errstr)))
|
||||||
(define-worker (build-again!-worker2 workerid verbosev latex-dest)
|
(define-worker (build-again!-worker2 workerid verbosev latex-dest)
|
||||||
(define (with-record-error cc go fail-k)
|
(define (with-record-error cc go fail-k)
|
||||||
(with-handlers ([exn:fail?
|
(with-handlers ([exn:fail?
|
||||||
|
|
Loading…
Reference in New Issue
Block a user