#lang racket/base (require racket/file racket/future racket/place racket/port racket/fasl racket/match racket/path racket/class racket/serialize racket/stxparam (for-syntax syntax/parse racket/base)) (provide parallel-do current-executable-path current-collects-path match-message-loop send/success send/error send/msg recv/req worker/die work-queue<%> define/class/generics list-queue) (define-syntax-rule (mk-generic func clss method args ...) (begin (define g (generic clss method)) (define (func obj args ...) (send-generic obj g args ...)))) (define-syntax-rule (define/class/generics class (func method args ...) ...) (begin (mk-generic func class method args ...) ...)) (define-syntax-rule (define/class/generics/provide class (func method args ...) ...) (begin (begin (mk-generic func class method args ...) (provide func)) ...)) (define-syntax-rule (DEBUG_COMM a ...) (void) ; (begin a ...) ) (define worker<%> (interface () spawn send/msg kill break wait recv/msg read-all get-id get-out)) (define worker% (class* object% (worker<%>) (field [id 0] [process-handle null] [out null] [in null] [err null] [module-path null] [funcname null]) (define/public (spawn _id _module-path _funcname [initialmsg #f]) (set! module-path _module-path) (set! funcname _funcname) (define worker-cmdline-list (list (current-executable-path) "-X" (path->string (current-collects-path)) "-e" "(eval(read))")) (define dynamic-require-cmd `((dynamic-require (string->path ,module-path) (quote ,funcname)) #f)) (let-values ([(_process-handle _out _in _err) (apply subprocess #f #f (current-error-port) worker-cmdline-list)]) (set! id _id) (set! process-handle _process-handle) (set! out _out) (set! in _in) (set! err _err) (send/msg dynamic-require-cmd) (when initialmsg (send/msg (s-exp->fasl (serialize (initialmsg id))))))) (define/public (send/msg msg) (with-handlers ([exn:fail? (lambda (x) (eprintf "While sending message to parallel-do worker: ~a ~a\n" id (exn-message x)) (exit 1))]) (DEBUG_COMM (eprintf "CSENDING ~v ~v\n" id msg)) (write msg in) (flush-output in))) (define/public (recv/msg) (with-handlers ([exn:fail? (lambda (x) (eprintf "While receiving message from parallel-do worker ~a ~a\n" id (exn-message x)) (exit 1))]) (define r (read out)) (DEBUG_COMM (eprintf "CRECEIVNG ~v ~v\n" id r)) r)) (define/public (read-all) (port->string out)) (define/public (get-id) id) (define/public (get-out) out) (define/public (kill) (DEBUG_COMM (eprintf "KILLING WORKER ~a\n" id)) (close-output-port in) (close-input-port out) (subprocess-kill process-handle #t)) (define/public (break) (kill)) (define/public (kill/respawn worker-cmdline-list [initialmsg #f]) (kill) (spawn id module-path funcname [initialmsg #f])) (define/public (wait) (subprocess-wait process-handle)) (super-new))) (define place-worker% (class* object% (worker<%>) (init-field [id 0] [pl null]) (define/public (spawn _id module-path funcname [initialmsg #f]) (set! id _id) (set! pl (dynamic-place (string->path module-path) funcname)) (when initialmsg (send/msg (s-exp->fasl (serialize (initialmsg id)))))) (define/public (send/msg msg) (DEBUG_COMM (eprintf "CSENDING ~v ~v\n" pl msg)) (place-channel-put pl msg)) (define/public (recv/msg) (define r (place-channel-get pl)) (DEBUG_COMM (eprintf "CRECEIVNG ~v ~v\n" pl r)) r) (define/public (read-all) "") (define/public (get-id) id) (define/public (get-out) pl) (define/public (kill) #f) (define/public (break) (place-break pl)) (define/public (wait) (place-wait pl)) (super-new))) (define work-queue<%> (interface () get-job work-done has-jobs? jobs-cnt get-results)) (define/class/generics/provide worker<%> (wrkr/spawn spawn id worker-cmdline-list initialcode initialmsg) (wrkr/send send/msg msg) (wrkr/kill kill) (wrkr/break break) (wrkr/recv recv/msg) (wrkr/read-all read-all) (wrkr/id get-id) (wrkr/out get-out) (wrkr/wait wait)) (define/class/generics/provide work-queue<%> (queue/get get-job wrkrid) (queue/work-done work-done node wrkr msg) (queue/has has-jobs?) (queue/count jobs-cnt) (queue/results get-results)) (define (current-executable-path) (parameterize ([current-directory (find-system-path 'orig-dir)]) (find-executable-path (find-system-path 'exec-file) #f))) (define (current-collects-path) (let ([p (find-system-path 'collects-dir)]) (if (complete-path? p) p (path->complete-path p (or (path-only (current-executable-path)) (find-system-path 'orig-dir)))))) (define (parallel-do-event-loop module-path funcname initialmsg work-queue nprocs [stopat #f]) (define use-places? (place-enabled?)) ; set to #f to use processes instead of places (define (spawn id) ;; spawns a new worker (define wrkr (if use-places? (new place-worker%) (new worker%))) (wrkr/spawn wrkr id module-path funcname initialmsg) wrkr) (define workers null) (define (spawn! id) ;; spawn a worker and add it to the list; ;; disable breaks because we want to make sure ;; that a new worker is added to the list of workers ;; before a break exception is raised: (parameterize-break #f (let ([w (spawn id)]) (set! workers (cons w workers)) w))) (define (unspawn! wkr) (wrkr/kill wkr) (set! workers (remq wkr workers))) (define (jobs?) (queue/has work-queue)) (define (empty?) (not (queue/has work-queue))) ;; If any exception (including a break exception) happens before ;; the work loop ends, then send a break to interrupt each worker; ;; the `normal-finish?' flag is set to #t when the working loop ends ;; normally. (define normal-finish? #f) (define log-exn (lambda (exn [msg #f]) (log-error (let ([s (if (exn? exn) (exn-message exn) (format "exception: ~v" exn))]) (if msg (format "~a; ~a" msg s) s))))) (dynamic-wind (lambda () (void)) (lambda () (define (check-error-threshold x) (when (x . >= . 4) (error 'parallel-do "error count reached ~a, exiting" x))) (for/list ([i (in-range nprocs)]) (spawn! i)) (let loop ([idle workers] [inflight null] [count 0] [error-count 0]) (check-error-threshold error-count) (cond ;; Reached stopat count STOP [(and stopat (= count stopat)) ; ??? (log-error "done at limit")] ;; Queue empty and all workers idle, we are all done [(and (empty?) (null? inflight)) ;; done (void)] ;; Send work to idle worker [(and (jobs?) (pair? idle)) (match-define (cons wrkr idle-rest) idle) (define-values (job cmd-list) (queue/get work-queue (wrkr/id wrkr))) (let retry-loop ([wrkr wrkr] [error-count error-count]) (check-error-threshold error-count) (with-handlers* ([exn:fail? (lambda (e) (log-exn e (format "error writing to worker: ~v" (wrkr/id wrkr))) (unspawn! wrkr) (retry-loop (spawn! (wrkr/id wrkr)) (add1 error-count)))]) (wrkr/send wrkr cmd-list)) (loop idle-rest (cons (list job wrkr) inflight) count error-count))] [else (define (kill/remove-dead-worker node-worker wrkr) (DEBUG_COMM (printf "KILLING ~v\n" (wrkr/id wrkr))) (unspawn! wrkr) (loop (cons (spawn! (wrkr/id wrkr)) idle) (remove node-worker inflight) count (add1 error-count))) (define (gen-node-handler node-worker) (match node-worker [(list node wrkr) (handle-evt (wrkr/out wrkr) (λ (e) (let ([msg (with-handlers* ([exn:fail? (lambda (e) (log-exn e (format "error reading from worker: ~v" (wrkr/id wrkr))) (kill/remove-dead-worker node-worker wrkr))]) (if use-places? e (wrkr/recv wrkr)))]) (if (pair? msg) (if (queue/work-done work-queue node wrkr msg) (loop (cons wrkr idle) (remove node-worker inflight) (add1 count) error-count) (loop idle inflight count error-count)) (begin (queue/work-done work-queue node wrkr (string-append msg (wrkr/read-all wrkr))) (kill/remove-dead-worker node-worker wrkr))))))] [else (log-error (format "parallel-do-event-loop match node-worker failed trying to match: ~e" node-worker))])) (DEBUG_COMM (printf "WAITING ON WORKERS TO RESPOND\n")) (apply sync (map gen-node-handler inflight))])) ;; Ask workers to stop: (for ([p workers]) (wrkr/send p (list 'DIE))) ;; Finish normally: (set! normal-finish? #t)) (lambda () (unless normal-finish? ;; There was an exception, so tell workers to stop: (for ([p workers]) (with-handlers ([exn? log-exn]) (wrkr/break p)))) ;; Wait for workers to complete: (for ([p workers]) (with-handlers ([exn? log-exn]) (wrkr/wait p)))))) (define list-queue% (class* object% (work-queue<%>) (init-field queue create-job-thunk success-thunk failure-thunk) (field [results null]) (define/public (work-done work workerid msg) (match msg [(list (list 'DONE result) stdout stderr) (set! results (cons (success-thunk work result stdout stderr) results))] [(list (list 'ERROR errmsg) stdout stderr) (failure-thunk work errmsg stdout stderr)])) (define/public (get-job workerid) (match queue [(cons h t) (set! queue t) (values h (create-job-thunk h workerid))])) (define/public (has-jobs?) (not (null? queue))) (define/public (get-results) (reverse results)) (define/public (jobs-cnt) (length queue)) (super-new))) (define (list-queue list-of-work create-job-thunk job-success-thunk job-failure-thunk) (make-object list-queue% list-of-work create-job-thunk job-success-thunk job-failure-thunk)) (define-syntax-rule (define-parallel-keyword-error d x) (d x (lambda (stx) (raise-syntax-error 'x "only allowed inside parallel worker definition" stx)))) (define-syntax-rule (define-syntax-parameter-error x) (define-parallel-keyword-error define-syntax-parameter x)) (define-parallel-keyword-error define match-message-loop) (define-syntax-parameter-error send/msg) (define-syntax-parameter-error send/success) (define-syntax-parameter-error send/error) (define-syntax-parameter-error recv/req) (define-syntax-parameter-error worker/die) (define-for-syntax (gen-worker-body globals-list globals-body work-body channel) (with-syntax ([globals-list globals-list] [(globals-body ...) globals-body] [([work work-body ...] ...) work-body] [ch channel]) #'(do-worker ch (lambda (msg per-loop-body) ;; single starting message: (match msg [globals-list globals-body ... ;; bind per-worker-set procedures: (per-loop-body (lambda (send/msgp recv/reqp die-k) (syntax-parameterize ([send/msg (make-rename-transformer #'send/msgp)] [recv/req (make-rename-transformer #'recv/reqp)] [worker/die (make-rename-transformer #'die-k)]) ;; message handler: (lambda (msg send/successp send/errorp) (syntax-parameterize ([send/success (make-rename-transformer #'send/successp)] [send/error (make-rename-transformer #'send/errorp)]) (match msg [work work-body ...] ...))))))]))))) (define (do-worker ch setup-proc) (define orig-err (current-error-port)) (define orig-out (current-output-port)) (define orig-in (current-input-port)) (define (raw-send msg) (cond [ch (place-channel-put ch msg)] [else (write msg orig-out) (flush-output orig-out)])) (define (raw-recv) (cond [ch (place-channel-get ch)] [else (read orig-in)])) (define (pdo-send msg) (with-handlers ([exn:fail? (lambda (x) (log-error (format "WORKER SEND MESSAGE ERROR: ~a" (exn-message x))) (exit 1))]) (DEBUG_COMM (fprintf orig-err "WSENDING ~v\n" msg)) (raw-send msg))) (define (pdo-recv) (with-handlers ([exn:fail? (lambda (x) (log-error (format "WORKER RECEIVE MESSAGE ERROR: ~a" (exn-message x))) (exit 1))]) (define r (raw-recv)) (DEBUG_COMM (fprintf orig-err "WRECVEIVED ~v\n" r)) r)) (setup-proc (deserialize (fasl->s-exp (pdo-recv))) (lambda (set-proc) (let/ec die-k (define (recv/reqp) (pdo-recv)) (define (send/msgp msg) (pdo-send msg)) (let ([msg-proc (set-proc send/msgp recv/reqp die-k)]) (let loop ([i 0]) (DEBUG_COMM (fprintf orig-err "WAITING ON CONTROLLER TO RESPOND ~v ~v\n" orig-in i)) (let ([out-str-port (open-output-string)] [err-str-port (open-output-string)]) (define (send/resp type) (pdo-send (list type (get-output-string out-str-port) (get-output-string err-str-port)))) (define (send/successp result) (send/resp (list 'DONE result))) (define (send/errorp message) (send/resp (list 'ERROR message))) (with-handlers ([exn:fail? (lambda (x) (send/errorp (exn-message x)) (loop (add1 i)))]) (parameterize ([current-output-port out-str-port] [current-error-port err-str-port]) (let ([msg (pdo-recv)]) (match msg [(list 'DIE) (void)] [_ (msg-proc msg send/successp send/errorp) (loop (add1 i))]))))))))))) (define-syntax (lambda-worker stx) (syntax-parse stx #:literals (match-message-loop) [(_ (globals-list:id ...) globals-body:expr ... (match-message-loop [work:expr work-body:expr ...] ...)) (with-syntax ([body (gen-worker-body #'(list globals-list ...) #'(globals-body ...) #'([work work-body ...] ...) #'ch)]) #'(lambda (ch) body))])) (define-syntax (parallel-do stx) (syntax-case stx (define-worker) [(_ worker-count initalmsg work-queue (define-worker (name args ...) body ...)) (begin (with-syntax ([interal-def-name (syntax-local-lift-expression #'(lambda-worker (args ...) body ...))]) (syntax-local-lift-provide #'(rename interal-def-name name))) #'(let ([wq work-queue]) (define module-path (path->string (resolved-module-path-name (variable-reference->resolved-module-path (#%variable-reference))))) (parallel-do-event-loop module-path 'name initalmsg wq worker-count) (queue/results wq)))]))