diff --git a/collects/setup/parallel-build-worker.rkt b/collects/setup/parallel-build-worker.rkt deleted file mode 100644 index ef1e6e1ec2..0000000000 --- a/collects/setup/parallel-build-worker.rkt +++ /dev/null @@ -1,55 +0,0 @@ -#lang racket/base -(require compiler/cm - racket/match - racket/fasl - racket/serialize) - -(define prev-uncaught-exception-handler (uncaught-exception-handler)) -(uncaught-exception-handler (lambda (x) - (when (exn:break? x) (exit 1)) - (prev-uncaught-exception-handler x))) - -(let ([cmc (make-caching-managed-compile-zo)] - [worker-id (deserialize (fasl->s-exp (read)))]) - (let loop () - (match (read) - [(list 'DIE) void] - [(list name dir file) - (let ([dir (bytes->path dir)] - [file (bytes->path file)]) - (let ([out-str-port (open-output-string)] - [err-str-port (open-output-string)]) - (let ([cip (current-input-port)] - [cop (current-output-port)] - [cep (current-error-port)]) - (define (send/msg msg) - (write msg cop) - (flush-output cop)) - (define (send/resp type) - (send/msg (list type (get-output-string out-str-port) (get-output-string err-str-port)))) - (define (pp x) - (fprintf cep "COMPILING ~a ~a ~a ~a\n" worker-id name file x)) - (define (lock-client cmd fn) - (match cmd - ['lock - (send/msg (list (list 'LOCK (path->bytes fn)) "" "")) - (match (read cip) - [(list 'locked) #t] - [(list 'compiled) #f])] - ['unlock (send/msg (list (list 'UNLOCK (path->bytes fn)) "" ""))])) - (with-handlers ([exn:fail? (lambda (x) - (send/resp (list 'ERROR (exn-message x))))]) - (parameterize ([parallel-lock-client lock-client] - [current-namespace (make-base-empty-namespace)] - [current-directory dir] - [current-load-relative-directory dir] - [current-input-port (open-input-string "")] - [current-output-port out-str-port] - [current-error-port err-str-port] - ;[manager-compile-notify-handler pp] - ) - - (cmc (build-path dir file))) - (send/resp 'DONE)))) - (flush-output) - (loop))]))) diff --git a/collects/setup/parallel-build.rkt b/collects/setup/parallel-build.rkt index 732d0c8933..7688b6669a 100644 --- a/collects/setup/parallel-build.rkt +++ b/collects/setup/parallel-build.rkt @@ -4,6 +4,8 @@ racket/list racket/match racket/path + racket/fasl + racket/serialize setup/collects setup/parallel-do racket/class @@ -12,9 +14,14 @@ racket/place (for-syntax racket/base)) + (provide parallel-compile parallel-compile-files) +(define-syntax-rule (DEBUG_COMM a ...) + (void) +; (begin a ...) +) (define Lock-Manager% (class object% (field (locks (make-hash))) @@ -181,125 +188,71 @@ (define/public (get-results) (void)) (super-new))) +(define (parallel-build work-queue worker-count) + (parallel-do + worker-count + (lambda (workerid) (list workerid)) + work-queue + (define-worker (parallel-compile-worker worker-id) + (DEBUG_COMM (eprintf "WORKER ~a\n" worker-id)) + (define prev-uncaught-exception-handler (uncaught-exception-handler)) + (uncaught-exception-handler (lambda (x) + (when (exn:break? x) (exit 1)) + (prev-uncaught-exception-handler x))) -(define (build-parallel-build-worker-args) - (list (find-exe #f) - "-X" - (path->string (current-collects-path)) - "-l" - "setup/parallel-build-worker.rkt")) + (define cmc (make-caching-managed-compile-zo)) + (match-message-loop + [(list name _dir _file) + (DEBUG_COMM (eprintf "COMPILING ~a ~a ~a ~a\n" worker-id name _file _dir)) + (define dir (bytes->path _dir)) + (define file (bytes->path _file)) + (define out-str-port (open-output-string)) + (define err-str-port (open-output-string)) + (define cip (current-input-port)) + (define cop (current-output-port)) + (define cep (current-error-port)) + (define (send/recv msg) (send/msg msg) (recv/req)) + (define (send/resp type) + (send/msg (list type (get-output-string out-str-port) (get-output-string err-str-port)))) + (define (pp x) (fprintf cep "COMPILING ~a ~a ~a ~a\n" worker-id name file x)) + (define (lock-client cmd fn) + (match cmd + ['lock + (DEBUG_COMM (eprintf "REQUESTING LOCK ~a ~a ~a ~a\n" worker-id name _file _dir)) + (match (send/recv (list (list 'LOCK (path->bytes fn)) "" "")) + [(list 'locked) #t] + [(list 'compiled) #f] + [(list 'DIE) (worker/die 1)] + [x (send/error (format "DIDNT MATCH B ~v\n" x))] + [else (send/error (format "DIDNT MATCH B\n"))])] + ['unlock + (DEBUG_COMM (eprintf "UNLOCKING ~a ~a ~a ~a\n" worker-id name _file _dir)) + (send/msg (list (list 'UNLOCK (path->bytes fn)) "" ""))] + [x (send/error (format "DIDNT MATCH C ~v\n" x))] + [else (send/error (format "DIDNT MATCH C\n"))])) + (with-handlers ([exn:fail? (lambda (x) + (send/resp (list 'ERROR (exn-message x))))]) + (parameterize ([parallel-lock-client lock-client] + [current-namespace (make-base-empty-namespace)] + [current-directory dir] + [current-load-relative-directory dir] + [current-input-port (open-input-string "")] + [current-output-port out-str-port] + [current-error-port err-str-port] + ;[manager-compile-notify-handler pp] + ) + + (cmc (build-path dir file))) + (send/resp 'DONE))] + [x (send/error (format "DIDNT MATCH A ~v\n" x))] + [else (send/error (format "DIDNT MATCH A\n"))])))) (define (parallel-compile-files list-of-files #:worker-count [worker-count (processor-count)] #:handler [handler void]) - - (parallel-do-event-loop #f - values ; identity function - (build-parallel-build-worker-args) - (make-object FileListQueue% list-of-files handler) - worker-count 999999999)) + (parallel-build (make-object FileListQueue% list-of-files handler) worker-count)) (define (parallel-compile worker-count setup-fprintf append-error collects-tree) (setup-fprintf (current-output-port) #f "--- parallel build using ~a processor cores ---" worker-count) (define collects-queue (make-object CollectsQueue% collects-tree setup-fprintf append-error)) - (if (place-enabled?) - (places-parallel-build collects-queue worker-count 999999999) - (parallel-do-event-loop #f values (build-parallel-build-worker-args) collects-queue worker-count 999999999))) - -(define-syntax-rule (define-syntax-case (N a ...) b ...) - (define-syntax (N stx) - (syntax-case stx () - [(_ a ...) b ...]))) - -(define PlaceWorker% (class* object% (Worker<%>) - (init-field [id 0] - [pl null]) - - (define/public (send/msg msg) (place-channel-send pl msg)) - (define/public (recv/msg) (place-channel-receive pl)) - (define/public (get-id) id) - (define/public (get-out) pl) - (define/public (kill) #f) - (define/public (wait) (place-wait pl)) - (super-new))) - -(define-syntax-case (place/anon (ch) body ...) - (with-syntax ([interal-def-name - (syntax-local-lift-expression #'(lambda (ch) body ...))] - [funcname #'OBSCURE_FUNC_NAME_%#%]) - (syntax-local-lift-provide #'(rename interal-def-name funcname)) - #'(let ([module-path (resolved-module-path-name - (variable-reference->resolved-module-path - (#%variable-reference)))]) - (place module-path (quote funcname))))) - -(define (places-parallel-build jobqueue nprocs stopat) - (define ps - (for/list ([i (in-range nprocs)]) - (place/anon (ch) - (let ([cmc ((dynamic-require 'compiler/cm 'make-caching-managed-compile-zo))]) - (let loop () - (match (place-channel-receive ch) - [(list 'DIE) void] - [(list name dir file) - (let ([dir (bytes->path dir)] - [file (bytes->path file)]) - (let ([out-str-port (open-output-string)] - [err-str-port (open-output-string)]) - (define (send/msg msg) - (place-channel-send ch msg)) - (define (send/resp type) - (send/msg (list type (get-output-string out-str-port) (get-output-string err-str-port)))) - (define (lock-client cmd fn) - (match cmd - ['lock - (send/msg (list (list 'LOCK (path->bytes fn)) "" "")) - (match (place-channel-receive ch) - [(list 'locked) #t] - [(list 'compiled) #f])] - ['unlock (send/msg (list (list 'UNLOCK (path->bytes fn)) "" ""))])) - (with-handlers ([exn:fail? (lambda (x) - (send/resp (list 'ERROR (exn-message x))))]) - (parameterize ([parallel-lock-client lock-client] - [current-namespace (make-base-empty-namespace)] - [current-directory dir] - [current-load-relative-directory dir] - [current-input-port (open-input-string "")] - [current-output-port out-str-port] - [current-error-port err-str-port]) - - (cmc (build-path dir file))) - (send/resp 'DONE)))) - (loop)])))))) - - - (define workers (for/list ([i (in-range nprocs)] - [p ps]) - (make-object PlaceWorker% i p))) - (define (jobs?) (queue/has jobqueue)) - (define (empty?) (not (queue/has jobqueue))) - - (let loop ([idle workers] - [inflight null] - [count 0]) - (cond - [(= count stopat) (printf "DONE AT LIMIT\n")] - [(and (empty?) (null? inflight)) (set! workers idle)] ; ALL DONE - [(and (jobs?) (pair? idle)) - (match-define (cons wrkr idle-rest) idle) - (define-values (job cmd-list) (queue/get jobqueue (wrkr/id wrkr))) - (wrkr/send wrkr cmd-list) - (loop idle-rest (cons (list job wrkr) inflight) count)] - - [else - (define (gen-node-handler node-worker) - (match-define (list node wrkr) node-worker) - (handle-evt (wrkr/out wrkr) (λ (msg) - (if (queue/work-done jobqueue node wrkr msg) - (loop (cons wrkr idle) (remove node-worker inflight) (add1 count)) - (loop idle inflight count))))) - - (apply sync (map gen-node-handler inflight))])) - - (for ([p workers]) (wrkr/send p (list 'DIE))) - (for ([p ps]) (place-wait p))) + (parallel-build collects-queue worker-count)) diff --git a/collects/setup/parallel-do.rkt b/collects/setup/parallel-do.rkt index a80aaca184..92282c5f88 100644 --- a/collects/setup/parallel-do.rkt +++ b/collects/setup/parallel-do.rkt @@ -2,6 +2,7 @@ (require racket/file racket/future + racket/place racket/port racket/fasl racket/match @@ -13,16 +14,17 @@ racket/base)) (provide parallel-do - parallel-do-event-loop current-executable-path current-collects-path match-message-loop send/success send/error + send/msg + recv/req + worker/die WorkQueue<%> - Worker<%> - wrkr/send - define/class/generics) + define/class/generics + ListQueue) (define-syntax-rule (mk-generic func clss method args ...) (begin @@ -40,11 +42,18 @@ (mk-generic func class method args ...) (provide func)) ...)) +(define-syntax-rule (DEBUG_COMM a ...) + (void) +; (begin a ...) +) (define Worker<%> (interface () + spawn send/msg kill + wait recv/msg + read-all get-id get-out)) @@ -53,37 +62,74 @@ [process-handle null] [out null] [in null] - [err null]) + [err null] + [module-path null] + [funcname null]) - (define/public (spawn _id worker-cmdline-list [initialcode #f] [initialmsg #f]) + (define/public (spawn _id _module-path _funcname [initialmsg #f]) + (set! module-path _module-path) + (set! funcname _funcname) + (define worker-cmdline-list (list (current-executable-path) "-X" (path->string (current-collects-path)) "-e" "(eval(read))")) + (define dynamic-require-cmd `((dynamic-require (string->path ,module-path) (quote ,funcname)) #f)) (let-values ([(_process-handle _out _in _err) (apply subprocess #f #f (current-error-port) worker-cmdline-list)]) (set! id _id) (set! process-handle _process-handle) (set! out _out) (set! in _in) (set! err _err) - (when initialcode (send/msg initialcode)) + (send/msg dynamic-require-cmd) (when initialmsg (send/msg (s-exp->fasl (serialize (initialmsg id))))))) - - (define/public (send/msg msg) (write msg in) (flush-output in)) - (define/public (recv/msg) (read out)) + (define/public (send/msg msg) + (with-handlers ([exn:fail? + (lambda (x) + (eprintf "CONTROLLER SEND MESSAGE ERROR TO WORKER ~a ~a\n" id (exn-message x)) + (exit 1))]) + (DEBUG_COMM (eprintf "CSENDING ~v ~v\n" id msg)) + (write msg in) (flush-output in))) + (define/public (recv/msg) + (with-handlers ([exn:fail? + (lambda (x) + (eprintf "CONTROLLER RECEIVE MESSAGE ERROR FROM WORKER ~a ~a\n" id (exn-message x)) + (exit 1))]) + (define r (read out)) + (DEBUG_COMM (eprintf "CRECEIVNG ~v ~v\n" id r)) + r)) + (define/public (read-all) (port->string out)) (define/public (get-id) id) (define/public (get-out) out) (define/public (kill) - (eprintf "KILLING WORKER ~a\n" id) + (DEBUG_COMM (eprintf "KILLING WORKER ~a\n" id)) (close-output-port in) (close-input-port out) (subprocess-kill process-handle #t)) - (define/public (kill/respawn worker-cmdline-list [initialcode #f] [initialmsg #f]) + (define/public (kill/respawn worker-cmdline-list [initialmsg #f]) (kill) - (spawn id worker-cmdline-list [initialcode #f] [initialmsg #f])) + (spawn id module-path funcname [initialmsg #f])) (define/public (wait) (subprocess-wait process-handle)) (super-new))) -(define (wrkr/spawn id worker-cmdline-list [initialcode #f] [initialmsg #f]) - (define wrkr (new Worker%)) - (send wrkr spawn id worker-cmdline-list initialcode initialmsg) - wrkr) +(define PlaceWorker% (class* object% (Worker<%>) + (init-field [id 0] + [pl null]) + + (define/public (spawn _id module-path funcname [initialmsg #f]) + (set! id _id) + (set! pl (place (string->path module-path) funcname)) + (when initialmsg (send/msg (s-exp->fasl (serialize (initialmsg id)))))) + (define/public (send/msg msg) + (DEBUG_COMM (eprintf "CSENDING ~v ~v\n" pl msg)) + (place-channel-send pl msg)) + (define/public (recv/msg) + (define r (place-channel-receive pl)) + (DEBUG_COMM (eprintf "CRECEIVNG ~v ~v\n" pl r)) + r) + (define/public (read-all) "") + (define/public (get-id) id) + (define/public (get-out) pl) + (define/public (kill) #f) + (define/public (wait) (place-wait pl)) + (super-new))) + (define WorkQueue<%> (interface () get-job @@ -93,18 +139,22 @@ get-results)) (define/class/generics/provide Worker<%> + (wrkr/spawn spawn id worker-cmdline-list initialcode initialmsg) (wrkr/send send/msg msg) (wrkr/kill kill) (wrkr/recv recv/msg) + (wrkr/read-all read-all) (wrkr/id get-id) - (wrkr/out get-out)) + (wrkr/out get-out) + (wrkr/wait wait)) (define/class/generics/provide WorkQueue<%> (queue/get get-job wrkrid) (queue/work-done work-done node wrkr msg) (queue/has has-jobs?) - (queue/count jobs-cnt)) + (queue/count jobs-cnt) + (queue/results get-results)) (define (current-executable-path) (parameterize ([current-directory (find-system-path 'orig-dir)]) @@ -117,12 +167,15 @@ (path->complete-path p (or (path-only (current-executable-path)) (find-system-path 'orig-dir)))))) -(define (parallel-do-event-loop initialcode initialmsg worker-cmdline-list jobqueue nprocs stopat) - (define (spawn id) (wrkr/spawn id worker-cmdline-list initialcode initialmsg)) +(define (parallel-do-event-loop module-path funcname initialmsg jobqueue nprocs [stopat #f]) + (define use-places (place-enabled?)) + (define (spawn id) + (define wrkr (if use-places (new PlaceWorker%) (new Worker%))) + (wrkr/spawn wrkr id module-path funcname initialmsg) + wrkr) (define (jobs?) (queue/has jobqueue)) (define (empty?) (not (queue/has jobqueue))) (define workers #f) - (dynamic-wind (lambda () (parameterize-break #f @@ -141,54 +194,60 @@ (cond [(error-threshold error-count)] ;; Reached stopat count STOP - [(= count stopat) (printf "DONE AT LIMIT\n")] + [(and stopat (= count stopat)) (printf "DONE AT LIMIT\n")] ;; Queue empty and all workers idle, we are all done - [(and (empty?) (null? inflight)) (set! workers idle)] + [(and (empty?) (null? inflight)) (parameterize-break #f (set! workers idle))] ; ALL DONE ;; Send work to idle worker [(and (jobs?) (pair? idle)) - (match idle [(cons wrkr idle-rest) - (let-values ([(job cmd-list) (queue/get jobqueue (wrkr/id wrkr))]) - (let retry-loop ([wrkr wrkr] - [error-count error-count]) - (error-threshold error-count) - (with-handlers* ([exn:fail? (lambda (e) - (printf "MASTER WRITE ERROR - writing to worker: ~a\n" (exn-message e)) - (wrkr/kill wrkr) - (retry-loop (spawn (wrkr/id wrkr)) (add1 error-count)))]) - (wrkr/send wrkr cmd-list)) - (loop idle-rest (cons (list job wrkr) inflight) count error-count)))])] - + (match-define (cons wrkr idle-rest) idle) + (define-values (job cmd-list) (queue/get jobqueue (wrkr/id wrkr))) + (let retry-loop ([wrkr wrkr] + [error-count error-count]) + (error-threshold error-count) + (with-handlers* ([exn:fail? (lambda (e) + (printf "MASTER WRITE ERROR - writing to worker: ~v ~a\n" (wrkr/id wrkr) (exn-message e)) + (wrkr/kill wrkr) + (retry-loop (spawn (wrkr/id wrkr)) (add1 error-count)))]) + (wrkr/send wrkr cmd-list)) + (loop idle-rest (cons (list job wrkr) inflight) count error-count))] [else (define (kill/remove-dead-worker node-worker wrkr) + (DEBUG_COMM (printf "KILLING ~v\n" (wrkr/id wrkr))) (wrkr/kill wrkr) (loop (cons (spawn (wrkr/id wrkr)) idle) (remove node-worker inflight) count (add1 error-count))) - (apply sync (for/list ([node-worker inflight]) - (match node-worker [(list node wrkr) - (define out (wrkr/out wrkr)) - (handle-evt out (λ (e) - (with-handlers* ([exn:fail? (lambda (e) - (printf "MASTER READ ERROR - reading from worker: ~a\n" (exn-message e)) - (kill/remove-dead-worker node-worker wrkr))]) - (let ([msg (wrkr/recv wrkr)]) - (if (pair? msg) - (if (queue/work-done jobqueue node wrkr msg) - (loop (cons wrkr idle) - (remove node-worker inflight) - (add1 count) - error-count) - (loop idle inflight count error-count)) - (begin - (queue/work-done jobqueue node wrkr (string-append msg (port->string out))) - (kill/remove-dead-worker node-worker wrkr)))))))] + (define (gen-node-handler node-worker) + (match node-worker + [(list node wrkr) + (handle-evt (wrkr/out wrkr) (λ (e) + (with-handlers* ([exn:fail? (lambda (e) + (printf "MASTER READ ERROR - reading from worker: ~v ~a\n" (wrkr/id wrkr) (exn-message e)) + (kill/remove-dead-worker node-worker wrkr))]) + (let ([msg (if use-places e (wrkr/recv wrkr))]) + (if (pair? msg) + (if (queue/work-done jobqueue node wrkr msg) + (loop (cons wrkr idle) (remove node-worker inflight) (add1 count) error-count) + (loop idle inflight count error-count)) + (begin + (kill/remove-dead-worker node-worker wrkr) + (queue/work-done jobqueue node wrkr (string-append msg (wrkr/read-all wrkr)))))))))] [else (eprintf "parallel-do-event-loop match node-worker failed.\n") - (eprintf "trying to match:\n~a\n" node-worker)])))]))) - (lambda () + (eprintf "trying to match:\n~a\n" node-worker)])) + (DEBUG_COMM (printf "WAITING ON WORKERS TO RESPOND\n")) + (apply sync (map gen-node-handler inflight))]))) + (lambda () + ;(printf "Asking all workers to die\n") (for ([p workers]) (with-handlers ([exn? void]) (wrkr/send p (list 'DIE)))) - (for ([p workers]) (send p wait))))) + ;(printf "Waiting for all workers to die")(flush-output) + (for ([p workers] + [i (in-naturals)]) + (wrkr/wait p) + ;(printf " ~a" (add1 i)) + (flush-output)) + #;(printf "\n")))) (define ListQueue% (class* object% (WorkQueue<%>) (init-field queue create-job-thunk success-thunk failure-thunk) @@ -206,50 +265,72 @@ (set! queue t) (values h (create-job-thunk h))])) (define/public (has-jobs?) (not (null? queue))) - (define/public (get-results) results) + (define/public (get-results) (reverse results)) (define/public (jobs-cnt) (length queue)) (super-new))) -(define match-message-loop - (lambda (stx) - (raise-syntax-error 'match-message-loop "only allowed inside a parallel worker definition" stx))) -(define-syntax-parameter send/success - (lambda (stx) - (raise-syntax-error 'send/success "only allowed inside parallel worker definition" stx))) -(define-syntax-parameter send/error - (lambda (stx) - (raise-syntax-error 'send/error "only allowed inside parallel worker definition" stx))) +(define (ListQueue list-of-work create-job-thunk job-success-thunk job-failure-thunk) + (make-object ListQueue% list-of-work create-job-thunk job-success-thunk job-failure-thunk)) + +(define-syntax-rule (define-parallel-keyword-error d x) + (d x (lambda (stx) (raise-syntax-error 'x "only allowed inside parallel worker definition" stx)))) +(define-syntax-rule (define-syntax-parameter-error x) (define-parallel-keyword-error define-syntax-parameter x)) + +(define-parallel-keyword-error define match-message-loop) +(define-syntax-parameter-error send/msg) +(define-syntax-parameter-error send/success) +(define-syntax-parameter-error send/error) +(define-syntax-parameter-error recv/req) +(define-syntax-parameter-error worker/die) -(define-for-syntax (gen-worker-body globals-list globals-body work-body) +(define-for-syntax (gen-worker-body globals-list globals-body work-body channel) (with-syntax ([globals-list globals-list] [(globals-body ...) globals-body] - [(work work-body ...) work-body]) + [([work work-body ...] ...) work-body] + [ch channel]) #'(begin (define orig-err (current-error-port)) (define orig-out (current-output-port)) + (define orig-in (current-input-port)) + (define (raw-send msg) + (cond + [ch (place-channel-send ch msg)] + [else (write msg orig-out) + (flush-output orig-out)])) + (define (raw-recv) + (cond + [ch (place-channel-receive ch)] + [else (read orig-in)])) (define (pdo-send msg) (with-handlers ([exn:fail? (lambda (x) (fprintf orig-err "WORKER SEND MESSAGE ERROR ~a\n" (exn-message x)) (exit 1))]) - (write msg orig-out) - (flush-output orig-out))) + (DEBUG_COMM (fprintf orig-err "WSENDING ~v\n" msg)) + (raw-send msg))) (define (pdo-recv) (with-handlers ([exn:fail? (lambda (x) (fprintf orig-err "WORKER RECEIVE MESSAGE ERROR ~a\n" (exn-message x)) (exit 1))]) - (read))) + (define r (raw-recv)) + (DEBUG_COMM (fprintf orig-err "WRECVEIVED ~v\n" r)) + r)) (match (deserialize (fasl->s-exp (pdo-recv))) [globals-list globals-body ... - (let loop () + (let/ec die-k + (let loop ([i 0]) + (DEBUG_COMM (fprintf orig-err "WAITING ON CONTROLLER TO RESPOND ~v ~v\n" orig-in i)) (match (pdo-recv) [(list 'DIE) void] [work (let ([out-str-port (open-output-string)] [err-str-port (open-output-string)]) + (define (recv/reqp) (pdo-recv)) + (define (send/msgp msg) + (pdo-send msg)) (define (send/resp type) (pdo-send (list type (get-output-string out-str-port) (get-output-string err-str-port)))) (define (send/successp result) @@ -259,43 +340,55 @@ (with-handlers ([exn:fail? (lambda (x) (send/errorp (exn-message x)) (loop))]) (parameterize ([current-output-port out-str-port] [current-error-port err-str-port]) - (syntax-parameterize ([send/success (make-rename-transformer #'send/successp)] - [send/error (make-rename-transformer #'send/errorp)]) + (syntax-parameterize ([send/msg (make-rename-transformer #'send/msgp)] + [send/success (make-rename-transformer #'send/successp)] + [send/error (make-rename-transformer #'send/errorp)] + [recv/req (make-rename-transformer #'recv/reqp)] + [worker/die (make-rename-transformer #'die-k)]) work-body ... - (loop)))))]))])))) + (loop (add1 i))))))] ...)))])))) (define-syntax (lambda-worker stx) (syntax-parse stx #:literals(match-message-loop) [(_ (globals-list:id ...) globals-body:expr ... (match-message-loop - [work:id work-body:expr ...])) + [work:expr work-body:expr ...] ...)) - (with-syntax ([body (gen-worker-body #'(list globals-list ...) #'(globals-body ...) #'(work work-body ...))]) - #'(lambda () - body))])) + (with-syntax ([body (gen-worker-body #'(list globals-list ...) #'(globals-body ...) #'([work work-body ...] ...) #'ch)]) + #'(lambda (ch) body))])) (define-syntax (parallel-do stx) - (syntax-case stx () - [(_ worker-count initalmsg list-of-work create-job-thunk job-success-thunk job-failure-thunk workerthunk) - (begin - (define (gen-parallel-do-event-loop-syntax cmdline initial-stdin-data) - (with-syntax ([cmdline cmdline] - [initial-stdin-data initial-stdin-data]) - #`(begin - ;(printf "CMDLINE ~v\n" cmdline) - ;(printf "INITIALTHUNK ~v\n" initial-stdin-data) - (let ([jobqueue (make-object ListQueue% list-of-work create-job-thunk job-success-thunk job-failure-thunk)]) - (parallel-do-event-loop initial-stdin-data initalmsg cmdline jobqueue worker-count 999999999) - (reverse (send jobqueue get-results)))))) - (define (gen-dynamic-require-current-module funcname) - (with-syntax ([funcname funcname]) - #'(let ([module-path (path->string (resolved-module-path-name (variable-reference->resolved-module-path (#%variable-reference))))]) - `((dynamic-require (string->path ,module-path) (quote funcname)))))) - (syntax-case #'workerthunk (define-worker) - [(define-worker (name args ...) body ...) - (with-syntax ([interal-def-name (syntax-local-lift-expression #'(lambda-worker (args ...) body ...))]) - (syntax-local-lift-provide #'(rename interal-def-name name))) - (gen-parallel-do-event-loop-syntax - #'(list (current-executable-path) "-X" (path->string (current-collects-path)) "-e" "(eval(read))") - (gen-dynamic-require-current-module #'name))]))])) + (syntax-case stx (define-worker) + [(_ worker-count initalmsg workqueue (define-worker (name args ...) body ...)) + (with-syntax ([interal-def-name (syntax-local-lift-expression #'(lambda-worker (args ...) body ...))]) + (syntax-local-lift-provide #'(rename interal-def-name name))) + #'(let ([wq workqueue]) + (define module-path (path->string (resolved-module-path-name (variable-reference->resolved-module-path (#%variable-reference))))) + (parallel-do-event-loop module-path 'name initalmsg wq worker-count) + (queue/results wq))])) + +(define-syntax-rule (define-syntax-case (N a ...) b ...) + (define-syntax (N stx) + (syntax-case stx () + [(_ a ...) b ...]))) + +(define-for-syntax (gen-create-place stx) + (syntax-case stx () + [(_ ch body ...) + (with-syntax ([interal-def-name + (syntax-local-lift-expression #'(lambda (ch) body ...))] + [funcname #'OBSCURE_FUNC_NAME_%#%]) + (syntax-local-lift-provide #'(rename interal-def-name funcname)) + #'(let ([module-path (resolved-module-path-name + (variable-reference->resolved-module-path + (#%variable-reference)))]) + (place module-path (quote funcname))))])) + +(define-syntax (place/thunk stx) + (with-syntax ([create-place (gen-create-place stx)]) + #'(lambda () create-place))) + +(define-syntax (place/anon stx) + (gen-create-place stx)) + diff --git a/collects/setup/scribble.rkt b/collects/setup/scribble.rkt index 946590bade..e7c745ad42 100644 --- a/collects/setup/scribble.rkt +++ b/collects/setup/scribble.rkt @@ -139,10 +139,11 @@ (parallel-do worker-count (lambda (workerid) (list workerid program-name (verbose) only-dirs latex-dest auto-main? auto-user?)) - docs - (lambda (x) (s-exp->fasl (serialize x))) - (lambda (work r outstr errstr) (printf "~a" outstr) (printf "~a" errstr) (deserialize (fasl->s-exp r))) - (lambda (work errmsg outstr errstr) (parallel-do-error-handler setup-printf work errmsg outstr errstr)) + (ListQueue + docs + (lambda (x) (s-exp->fasl (serialize x))) + (lambda (work r outstr errstr) (printf "~a" outstr) (printf "~a" errstr) (deserialize (fasl->s-exp r))) + (lambda (work errmsg outstr errstr) (parallel-do-error-handler setup-printf work errmsg outstr errstr))) (define-worker (get-doc-info-worker workerid program-name verbosev only-dirs latex-dest auto-main? auto-user?) (define ((get-doc-info-local program-name only-dirs latex-dest auto-main? auto-user?) doc) (define (setup-printf subpart formatstr . rest) @@ -321,15 +322,16 @@ (parallel-do worker-count (lambda (workerid) (list workerid (verbose) latex-dest)) - need-rerun - (lambda (i) - (say-rendering i) - (s-exp->fasl (serialize (info-doc i)))) - (lambda (i r outstr errstr) - (printf "~a" outstr) - (printf "~a" errstr) - (update-info i (deserialize (fasl->s-exp r)))) - (lambda (i errmsg outstr errstr) (parallel-do-error-handler setup-printf (info-doc i) errmsg outstr errstr)) + (ListQueue + need-rerun + (lambda (i) + (say-rendering i) + (s-exp->fasl (serialize (info-doc i)))) + (lambda (i r outstr errstr) + (printf "~a" outstr) + (printf "~a" errstr) + (update-info i (deserialize (fasl->s-exp r)))) + (lambda (i errmsg outstr errstr) (parallel-do-error-handler setup-printf (info-doc i) errmsg outstr errstr))) (define-worker (build-again!-worker2 workerid verbosev latex-dest) (define (with-record-error cc go fail-k) (with-handlers ([exn:fail?