From e3d7ffbe8295927eba5eb465a6b575fae1bd2b28 Mon Sep 17 00:00:00 2001 From: Kevin Tew Date: Mon, 10 Jan 2011 14:16:09 -0700 Subject: [PATCH] convert parallel-build to classes --- collects/setup/parallel-build.rkt | 108 +++++++++---- collects/setup/parallel-do.rkt | 258 +++++++++++++++++------------- 2 files changed, 219 insertions(+), 147 deletions(-) diff --git a/collects/setup/parallel-build.rkt b/collects/setup/parallel-build.rkt index 0bf98f9ed6..1eb9f656ed 100644 --- a/collects/setup/parallel-build.rkt +++ b/collects/setup/parallel-build.rkt @@ -6,36 +6,67 @@ racket/path setup/collects setup/parallel-do - unstable/generics) + racket/class) + +(define Lock-Manager% (class object% + (field (locks (make-hash))) + (define/public (lock fn wrkr) + (let ([v (hash-ref locks fn #f)]) + (hash-set! locks fn + (if v + (match v [(list w waitlst) (list w (append waitlst (list wrkr)))]) + (begin + (send wrkr send/msg 'locked) + (list wrkr null)))) + (not v))) + (define/public (unlock fn) + (for ([x (second (hash-ref locks fn))]) + (wrkr/send x 'compiled)) + (hash-remove! locks fn)) + (super-new))) (provide parallel-compile parallel-build-worker) -(define-struct collects-queue (cclst hash collects-dir printer append-error) #:transparent - #:mutable - #:property prop:jobqueue - (define-methods jobqueue - (define (work-done jobqueue work workerid msg) - (match (list work msg) - [(list (list cc file last) (list result-type out err)) - (let ([cc-name (cc-name cc)]) - (match result-type - [(list 'ERROR msg) - ((collects-queue-append-error jobqueue) cc "making" (exn msg (current-continuation-marks)) out err "error")] - ['DONE - (when (or (not (zero? (string-length out))) (not (zero? (string-length err)))) - ((collects-queue-append-error jobqueue) cc "making" null out err "output"))]) - (when last ((collects-queue-printer jobqueue) (current-output-port) "made" "~a" cc-name )))] - [else - (match work - [(list-rest (list cc file last) message) - ((collects-queue-append-error jobqueue) cc "making" null "" "" "error") - (eprintf "work-done match cc failed.\n") - (eprintf "trying to match:\n~a\n" (list work msg))])])) + +(define CollectsQueue% (class* object% (WorkQueue<%>) + (init-field cclst collects-dir printer append-error) + (field (lock-mgr (new Lock-Manager%))) + (field (hash (make-hash))) + (inspect #f) + + (define/public (work-done work wrkr msg) + (match (list work msg) + [(list (list cc file last) (list result-type out err)) + (begin0 + (match result-type + [(list 'ERROR msg) + (append-error cc "making" (exn msg (current-continuation-marks)) out err "error") + #t] + ;[(list 'LOCK fn) (lock fn wrkr) #f] + ;[(list 'UNLOCK fn) (unlock fn) #f] + ['DONE + (define (string-!empty? s) (not (zero? (string-length s)))) + (when (ormap string-!empty? (list out err)) + (append-error cc "making" null out err "output")) + #t]) + (when last (printer (current-output-port) "made" "~a" (cc-name cc))))] + [else + (match work + [(list-rest (list cc file last) message) + (append-error cc "making" null "" "" "error") + (eprintf "work-done match cc failed.\n") + (eprintf "trying to match:\n~a\n" (list work msg)) + #t] + [else + (eprintf "work-done match cc failed.\n") + (eprintf "trying to match:\n~a\n" (list work msg)) + (eprintf "FATAL\n") + (exit 1)])])) ;; assigns a collection to each worker to be compiled ;; when it runs out of collections, steals work from other workers collections - (define (get-job jobqueue workerid) + (define/public (get-job workerid) (define (hash/first-pair hash) (match (hash-iterate-first hash) [#f #f] @@ -46,12 +77,12 @@ [#f #f] [x (hash-set! hash key x) x])))) (define (take-cc) - (match (collects-queue-cclst jobqueue) + (match cclst [(list) #f] [(cons h t) - (set-collects-queue-cclst! jobqueue t) + (set! cclst t) (list h)])) - (let ([w-hash (collects-queue-hash jobqueue)]) + (let ([w-hash hash]) (define (build-job cc file last) (define (->bytes x) (cond [(path? x) (path->bytes x)] @@ -87,22 +118,26 @@ (match (hash/first-pair w-hash) [(cons id cc) (find-job-in-cc cc id)])] [cc (find-job-in-cc cc workerid)])))) - (define (has-jobs? jobqueue) + + (define/public (has-jobs?) (define (hasjob? cct) (let loop ([cct cct]) (ormap (lambda (x) (or ((length (second x)) . > . 0) (loop (third x)))) cct))) - (or (hasjob? (collects-queue-cclst jobqueue)) - (for/or ([cct (in-hash-values (collects-queue-hash jobqueue))]) + (or (hasjob? cclst) + (for/or ([cct (in-hash-values hash)]) (hasjob? cct)))) - (define (jobs-cnt jobqueue) + + (define/public (jobs-cnt) (define (count-cct cct) (let loop ([cct cct]) (apply + (map (lambda (x) (+ (length (second x)) (loop (third x)))) cct)))) - (+ (count-cct (collects-queue-cclst jobqueue)) - (for/fold ([cnt 0]) ([cct (in-hash-values (collects-queue-hash jobqueue))]) - (+ cnt (count-cct cct))))))) + (+ (count-cct cclst) + (for/fold ([cnt 0]) ([cct (in-hash-values hash)]) + (+ cnt (count-cct cct))))) + (define/public (get-results) (void)) + (super-new))) (define (parallel-compile worker-count setup-fprintf append-error collects-tree) (let ([collects-dir (current-collects-path)]) @@ -114,10 +149,15 @@ (path->string collects-dir) "-l" "setup/parallel-build-worker.rkt") - (make-collects-queue collects-tree (make-hash) collects-dir setup-fprintf append-error) + (make-object CollectsQueue% collects-tree collects-dir setup-fprintf append-error) worker-count 999999999))) (define (parallel-build-worker) + (define prev-uncaught-exception-handler (uncaught-exception-handler)) + (uncaught-exception-handler (lambda (x) + (when (exn:break? x) (exit 1)) + (prev-uncaught-exception-handler x))) + (let ([cmc (make-caching-managed-compile-zo)] [worker-id (read)]) (let loop () diff --git a/collects/setup/parallel-do.rkt b/collects/setup/parallel-do.rkt index fc7629e063..785d28b104 100644 --- a/collects/setup/parallel-do.rkt +++ b/collects/setup/parallel-do.rkt @@ -6,8 +6,8 @@ racket/fasl racket/match racket/path + racket/class racket/serialize - unstable/generics racket/stxparam (for-syntax syntax/parse racket/base)) @@ -20,16 +20,71 @@ match-message-loop send/success send/error - jobqueue - prop:jobqueue) + WorkQueue<%> + wrkr/send + define/class/generics) + +(define Worker% (class object% + (field [id 0] + [process-handle null] + [out null] + [in null] + [err null]) + + (define/public (spawn _id worker-cmdline-list initialcode initialmsg) + (let-values ([(_process-handle _out _in _err) (apply subprocess #f #f (current-error-port) worker-cmdline-list)]) + (set! id _id) + (set! process-handle _process-handle) + (set! out _out) + (set! in _in) + (set! err _err) + (when initialcode (send/msg initialcode)) + (when initialmsg (send/msg (s-exp->fasl (serialize (initialmsg id))))))) + + (define/public (send/msg msg) (write msg in) (flush-output in)) + (define/public (recv/msg) (read out)) + (define/public (get-id) id) + (define/public (get-out) out) + (define/public (kill) + (eprintf "KILLING WORKER ~a\n" id) + (close-output-port in) + (close-input-port out) + (subprocess-kill process-handle #t)) + (define/public (wait) (subprocess-wait process-handle)) + (super-new))) + +(define WorkQueue<%> (interface () + get-job + work-done + has-jobs? + jobs-cnt + get-results)) + +(define-syntax-rule (mk-generic func clss method args ...) + (begin + (define g (generic clss method)) + (define (func obj args ...) + (send-generic obj g args ...)))) + +(define-syntax-rule (define/class/generics class (func method args ...) ...) + (begin + (mk-generic func class method args ...) ...)) + +(define/class/generics Worker% + (wrkr/send send/msg msg) + (wrkr/kill kill) + (wrkr/recv recv/msg) + (wrkr/id get-id) + (wrkr/out get-out) + (wrkr/spawn spawn id worker-cmdline-list initialcode initialmsg)) + +(define/class/generics WorkQueue<%> + (queue/get get-job wrkrid) + (queue/work-done work-done node wrkr msg) + (queue/has has-jobs?) + (queue/count jobs-cnt)) -(define-generics (jobqueue prop:jobqueue jobqueue?) - (work-done jobqueue work workerid msg) - (get-job jobqueue workerid) - (has-jobs? jobqueue) - (jobs-cnt jobqueue)) -(define-struct worker (id process-handle out in err)) (define (current-executable-path) (parameterize ([current-directory (find-system-path 'orig-dir)]) (find-executable-path (find-system-path 'exec-file) #f))) @@ -40,27 +95,13 @@ (path->complete-path p (or (path-only (current-executable-path)) (find-system-path 'orig-dir)))))) - (define (parallel-do-event-loop initialcode initialmsg worker-cmdline-list jobqueue nprocs stopat) - (define (send/msg x ch) - (write x ch) - (flush-output ch)) (define (spawn id) - (let-values ([(process-handle out in err) (apply subprocess #f #f (current-error-port) worker-cmdline-list)]) - (when initialcode - (send/msg initialcode in)) - (when initialmsg - (send/msg (s-exp->fasl (serialize (initialmsg id))) in)) - (make-worker id process-handle out in err))) - (define (kill-worker wrkr) - (match wrkr - [(worker id process-handle out in err) - (eprintf "KILLING WORKER ~a ~a\n" id wrkr) - (close-output-port in) - (close-input-port out) - (subprocess-kill process-handle #t)])) - (define (jobs? x) (has-jobs? jobqueue)) - (define (empty? x) (not (has-jobs? jobqueue ))) + (define wrkr (new Worker%)) + (wrkr/spawn wrkr id worker-cmdline-list initialcode initialmsg) + wrkr) + (define (jobs?) (queue/has jobqueue)) + (define (empty?) (not (queue/has jobqueue))) (define workers #f) (dynamic-wind @@ -74,91 +115,86 @@ (eprintf "Error count reached ~a, exiting\n" x) (exit 1)) #f)) - (letrec ([loop (match-lambda* - ;; QUEUE IDLE INFLIGHT COUNT - ;; Reached stopat count STOP - [(list idle inflight count (? error-threshold error-count)) (void)] - [(list idle inflight (? (lambda (x) (= x stopat))) error-count) (printf "DONE AT LIMIT\n")] - ;; Send work to idle worker - [(list (and (? jobs?) (cons wrkr idle)) inflight count error-count) - (let-values ([(job cmd-list) (get-job jobqueue (worker-id wrkr))]) - (let retry-loop ([wrkr wrkr] - [error-count error-count]) - (error-threshold error-count) - (match wrkr - [(worker i s o in e) + (let loop ([idle workers] + [inflight null] + [count 0] + [error-count 0]) + (cond + [(error-threshold error-count)] + ;; Reached stopat count STOP + [(= count stopat) (printf "DONE AT LIMIT\n")] + ;; Queue empty and all workers idle, we are all done + [(and (empty?) (null? inflight)) (set! workers idle)] + ;; Send work to idle worker + [(and (jobs?) (pair? idle)) + (match idle [(cons wrkr idle-rest) + (let-values ([(job cmd-list) (queue/get jobqueue (wrkr/id wrkr))]) + (let retry-loop ([wrkr wrkr] + [error-count error-count]) + (error-threshold error-count) + (with-handlers* ([exn:fail? (lambda (e) + (printf "MASTER WRITE ERROR - writing to worker: ~a\n" (exn-message e)) + (wrkr/kill wrkr) + (retry-loop (spawn (wrkr/id wrkr)) (add1 error-count)))]) + (wrkr/send wrkr cmd-list)) + (loop idle-rest (cons (list job wrkr) inflight) count error-count)))])] + + [else + (define (kill/remove-dead-worker node-worker wrkr) + (wrkr/kill wrkr) + (loop (cons (spawn (wrkr/id wrkr)) idle) + (remove node-worker inflight) + count + (add1 error-count))) + (apply sync (for/list ([node-worker inflight]) + (match node-worker [(list node wrkr) + (define out (wrkr/out wrkr)) + (handle-evt out (λ (e) (with-handlers* ([exn:fail? (lambda (e) - (printf "MASTER WRITE ERROR - writing to worker: ~a\n" (exn-message e)) - (kill-worker wrkr) - (retry-loop (spawn i) (add1 error-count)))]) - (send/msg cmd-list in))]) - (loop idle (cons (list job wrkr) inflight) count error-count)))] - ;; Queue empty and all workers idle, we are all done - [(list (and (? empty?) idle) (list) count error-count) (set! workers idle)] - ;; Wait for reply from worker - [(list idle inflight count error-count) - (define (remove-dead-worker id node-worker) - (loop (cons (spawn id) idle) - (remove node-worker inflight) - count - (add1 error-count))) - - (apply sync (map (λ (node-worker) (match node-worker - [(list node (and wrkr (worker id sh out in err))) - (handle-evt out (λ (e) - (let ([msg (with-handlers* ([exn:fail? (lambda (e) - (printf "MASTER READ ERROR - reading from worker: ~a\n" (exn-message e)) - (kill-worker wrkr) - (remove-dead-worker id node-worker))]) - (let ([read-msg (read out)]) - (if (pair? read-msg) - read-msg - (begin - (work-done jobqueue node id (string-append read-msg (port->string out))) - (kill-worker wrkr) - (remove-dead-worker id node-worker)))))]) - (work-done jobqueue node id msg) - (loop (cons wrkr idle) - (remove node-worker inflight) - (add1 count) - error-count))))] - [else - (eprintf "parallel-do-event-loop match node-worker failed.\n") - (eprintf "trying to match:\n~a\n" node-worker)])) - - inflight))] - [x - (eprintf "parallel-do-event-loop match-lambda* failed.\n") - (eprintf "trying to match:\n~a\n" x)])]) - (loop workers null 0 0))) + (printf "MASTER READ ERROR - reading from worker: ~a\n" (exn-message e)) + (kill/remove-dead-worker node-worker wrkr))]) + (let ([msg (wrkr/recv wrkr)]) + (if (pair? msg) + (if (queue/work-done jobqueue node wrkr msg) + (loop (cons wrkr idle) + (remove node-worker inflight) + (add1 count) + error-count) + (loop idle inflight count error-count)) + (begin + (queue/work-done jobqueue node wrkr (string-append msg (port->string out))) + (kill/remove-dead-worker node-worker wrkr)))))))] + [else + (eprintf "parallel-do-event-loop match node-worker failed.\n") + (eprintf "trying to match:\n~a\n" node-worker)])))]))) (lambda () - (for ([p workers]) (with-handlers ([exn? void]) (send/msg (list 'DIE) (worker-in p)))) - (for ([p workers]) (subprocess-wait (worker-process-handle p)))))) + (for ([p workers]) (with-handlers ([exn? void]) (wrkr/send p (list 'DIE)))) + (for ([p workers]) (send p wait))))) (define (parallel-do-default-error-handler work error-message outstr errstr) (printf "WORKER ERROR ~a\n" error-message) (printf "STDOUT\n~a=====\n" outstr) (printf "STDERR\n~a=====\n" errstr)) - -(define-struct list-queue (queue results create-job-thunk success-thunk failure-thunk) - #:transparent #:mutable #:property prop:jobqueue - (define-methods jobqueue - (define (work-done jobqueue work workerid msg) - (match msg - [(list (list 'DONE result) stdout stderr) - (let ([result ((list-queue-success-thunk jobqueue) work result stdout stderr)]) - (set-list-queue-results! jobqueue (cons result (list-queue-results jobqueue))))] - [(list (list 'ERROR errmsg) stdout stderr) - ((list-queue-failure-thunk jobqueue) work errmsg stdout stderr)])) - (define (get-job jobqueue workerid) - (match (list-queue-queue jobqueue) - [(cons h t) - (set-list-queue-queue! jobqueue t) - (values h ((list-queue-create-job-thunk jobqueue) h))])) - (define (has-jobs? jobqueue) - (not (null? (list-queue-queue jobqueue)))) - (define (jobs-cnt jobqueue) - (length (list-queue-queue jobqueue))))) + +(define ListQueue% (class* object% (WorkQueue<%>) + (init-field queue create-job-thunk success-thunk failure-thunk) + (field [results null]) + + (define/public (work-done work workerid msg) + (match msg + [(list (list 'DONE result) stdout stderr) + (set! results (cons (success-thunk work result stdout stderr) results))] + [(list (list 'ERROR errmsg) stdout stderr) + (failure-thunk work errmsg stdout stderr)])) + (define/public (get-job workerid) + (match queue + [(cons h t) + (set! queue t) + (values h (create-job-thunk h))])) + (define/public (has-jobs?) (not (null? queue))) + (define/public (get-results) results) + (define/public (jobs-cnt) (length queue)) + (super-new))) (define match-message-loop (lambda (stx) @@ -235,9 +271,9 @@ #`(begin ;(printf "CMDLINE ~v\n" cmdline) ;(printf "INITIALTHUNK ~v\n" initial-stdin-data) - (let ([jobqueue (make-list-queue list-of-work null create-job-thunk job-success-thunk job-failure-thunk)]) + (let ([jobqueue (make-object ListQueue% list-of-work create-job-thunk job-success-thunk job-failure-thunk)]) (parallel-do-event-loop initial-stdin-data initalmsg cmdline jobqueue worker-count 999999999) - (reverse (list-queue-results jobqueue)))))) + (reverse (send jobqueue get-results)))))) (define (gen-dynamic-require-current-module funcname) (with-syntax ([funcname funcname]) #'(let ([module-path (path->string (resolved-module-path-name (variable-reference->resolved-module-path (#%variable-reference))))]) @@ -248,8 +284,4 @@ (syntax-local-lift-provide #'(rename interal-def-name name))) (gen-parallel-do-event-loop-syntax #'(list (current-executable-path) "-X" (path->string (current-collects-path)) "-e" "(eval(read))") - (gen-dynamic-require-current-module #'name))] - [funcname - (gen-parallel-do-event-loop-syntax - #'(list (current-executable-path) "-X" (path->string (current-collects-path)) "-e" "(eval(read))") - (gen-dynamic-require-current-module #'funcname))]))])) + (gen-dynamic-require-current-module #'name))]))]))