From 4d7584d9b7e5472cb6239ec146f9911a893c6364 Mon Sep 17 00:00:00 2001 From: Kevin Tew Date: Wed, 13 Jul 2011 08:33:03 -0600 Subject: [PATCH] fix indentation --- collects/setup/parallel-build.rkt | 319 +++++++++++++++--------------- collects/setup/parallel-do.rkt | 189 +++++++++--------- 2 files changed, 257 insertions(+), 251 deletions(-) diff --git a/collects/setup/parallel-build.rkt b/collects/setup/parallel-build.rkt index 26bdf22481..b6dd2007ba 100644 --- a/collects/setup/parallel-build.rkt +++ b/collects/setup/parallel-build.rkt @@ -23,24 +23,25 @@ ; (begin a ...) ) -(define lock-manager% (class object% - (field (locks (make-hash))) - (define/public (lock fn wrkr) - (let ([v (hash-ref locks fn #f)]) - (hash-set! locks fn - (if v - (match v [(list w waitlst) (list w (append waitlst (list wrkr)))]) - (begin - (wrkr/send wrkr (list 'locked)) - (list wrkr null)))) - (not v))) - (define/public (unlock fn) - (match (hash-ref locks fn) - [(list w waitlst) - (for ([x (second (hash-ref locks fn))]) - (wrkr/send x (list 'compiled))) - (hash-remove! locks fn)])) - (super-new))) +(define lock-manager% + (class object% + (field (locks (make-hash))) + (define/public (lock fn wrkr) + (let ([v (hash-ref locks fn #f)]) + (hash-set! locks fn + (if v + (match v [(list w waitlst) (list w (append waitlst (list wrkr)))]) + (begin + (wrkr/send wrkr (list 'locked)) + (list wrkr null)))) + (not v))) + (define/public (unlock fn) + (match (hash-ref locks fn) + [(list w waitlst) + (for ([x (second (hash-ref locks fn))]) + (wrkr/send x (list 'compiled))) + (hash-remove! locks fn)])) + (super-new))) (define/class/generics lock-manager% (lm/lock lock fn wrkr) @@ -50,152 +51,154 @@ (cond [(path? x) (path->bytes x)] [(string? x) (string->bytes/locale x)])) -(define collects-queue% (class* object% (work-queue<%>) - (init-field cclst printer append-error) - (field (lock-mgr (new lock-manager%))) - (field (hash (make-hash))) - (inspect #f) +(define collects-queue% + (class* object% (work-queue<%>) + (init-field cclst printer append-error) + (field (lock-mgr (new lock-manager%))) + (field (hash (make-hash))) + (inspect #f) - (define/public (work-done work wrkr msg) - (match (list work msg) - [(list (list cc file last) (list result-type out err)) - (begin0 - (match result-type - [(list 'ERROR msg) - (append-error cc "making" (exn msg (current-continuation-marks)) out err "error") + (define/public (work-done work wrkr msg) + (match (list work msg) + [(list (list cc file last) (list result-type out err)) + (begin0 + (match result-type + [(list 'ERROR msg) + (append-error cc "making" (exn msg (current-continuation-marks)) out err "error") + #t] + [(list 'LOCK fn) (lm/lock lock-mgr fn wrkr) #f] + [(list 'UNLOCK fn) (lm/unlock lock-mgr fn) #f] + ['DONE + (define (string-!empty? s) (not (zero? (string-length s)))) + (when (ormap string-!empty? (list out err)) + (append-error cc "making" null out err "output")) + ;(when last (printer (current-output-port) "made" "~a" (cc-name cc))) + #t] + [else (eprintf "Failed trying to match:\n~v\n" result-type)]))] + [else + (match work + [(list-rest (list cc file last) message) + (append-error cc "making" null "" "" "error") + (eprintf "work-done match cc failed.\n") + (eprintf "trying to match:\n~a\n" (list work msg)) #t] + [else + (eprintf "work-done match cc failed.\n") + (eprintf "trying to match:\n~a\n" (list work msg)) + (eprintf "FATAL\n") + (exit 1)])])) + + ;; assigns a collection to each worker to be compiled + ;; when it runs out of collections, steals work from other workers collections + (define/public (get-job workerid) + (define (say-making x) + (unless (null? x) + (printer (current-output-port) "making" "~a" (cc-name (car (car x)))))) + (define (find-job-in-cc cc id) + (define (retry) (get-job workerid)) + (define (build-job cc file last) + (values + (list cc file last) + (list (->bytes (cc-name cc)) + (->bytes (cc-path cc)) + (->bytes file)))) + (match cc + [(list) + (hash-remove! hash id) (retry)] + [(list (list cc (list) (list))) ;empty collect + (hash-remove! hash id) (retry)] + [(cons (list cc (list) (list)) tail) ;empty parent collect + (say-making tail) + (hash-set! hash id tail) (retry)] + [(cons (list cc (list) subs) tail) ;empty srcs list + (define nl (append subs tail)) + (say-making nl) + (hash-set! hash id nl) (retry)] + [(cons (list cc (list file) subs) tail) + (define nl (append subs tail)) + (hash-set! hash id nl) + (say-making nl) + (build-job cc file #t)] + [(cons (list cc (cons file ft) subs) tail) + (hash-set! hash id (cons (list cc ft subs) tail)) + (build-job cc file #f)] + [else + (eprintf "get-job match cc failed.\n") + (eprintf "trying to match:\n~v\n" cc)])) + + + ; find a cc + (cond + ; lookup already assigned cc + [(hash-ref hash workerid #f) => (lambda (x) + (find-job-in-cc x workerid))] + ; get next cc from cclst + [(pair? cclst) + (define workercc (list (car cclst))) + (say-making workercc) + (set! cclst (cdr cclst)) + (hash-set! hash workerid workercc) + (find-job-in-cc workercc workerid)] + ; try to steal work from another workers cc + [(hash-iterate-first hash) => (lambda (x) + (find-job-in-cc (hash-iterate-value hash x) + (hash-iterate-key hash x)))])) + ; no work left + ; should never get here, get-job only called when the queue has work + + (define/public (has-jobs?) + (define (hasjob? cct) + (let loop ([cct cct]) + (ormap (lambda (x) (or ((length (second x)) . > . 0) (loop (third x)))) cct))) + + (or (hasjob? cclst) + (for/or ([cct (in-hash-values hash)]) + (hasjob? cct)))) + + (define/public (jobs-cnt) + (define (count-cct cct) + (let loop ([cct cct]) + (apply + (map (lambda (x) (+ (length (second x)) (loop (third x)))) cct)))) + + (+ (count-cct cclst) + (for/fold ([cnt 0]) ([cct (in-hash-values hash)]) + (+ cnt (count-cct cct))))) + (define/public (get-results) (void)) + (super-new))) + +(define file-list-queue% + (class* object% (work-queue<%>) + (init-field filelist handler) + (field (lock-mgr (new lock-manager%))) + (inspect #f) + + (define/public (work-done work wrkr msg) + (match msg + [(list result-type out err) + (match result-type [(list 'LOCK fn) (lm/lock lock-mgr fn wrkr) #f] [(list 'UNLOCK fn) (lm/unlock lock-mgr fn) #f] + [(list 'ERROR msg) (handler 'error work msg out err) #t] ['DONE (define (string-!empty? s) (not (zero? (string-length s)))) - (when (ormap string-!empty? (list out err)) - (append-error cc "making" null out err "output")) - ;(when last (printer (current-output-port) "made" "~a" (cc-name cc))) - #t] - [else (eprintf "Failed trying to match:\n~v\n" result-type)]))] - [else - (match work - [(list-rest (list cc file last) message) - (append-error cc "making" null "" "" "error") - (eprintf "work-done match cc failed.\n") - (eprintf "trying to match:\n~a\n" (list work msg)) - #t] - [else - (eprintf "work-done match cc failed.\n") - (eprintf "trying to match:\n~a\n" (list work msg)) - (eprintf "FATAL\n") - (exit 1)])])) - - ;; assigns a collection to each worker to be compiled - ;; when it runs out of collections, steals work from other workers collections - (define/public (get-job workerid) - (define (say-making x) - (unless (null? x) - (printer (current-output-port) "making" "~a" (cc-name (car (car x)))))) - (define (find-job-in-cc cc id) - (define (retry) (get-job workerid)) - (define (build-job cc file last) - (values - (list cc file last) - (list (->bytes (cc-name cc)) - (->bytes (cc-path cc)) - (->bytes file)))) - (match cc - [(list) - (hash-remove! hash id) (retry)] - [(list (list cc (list) (list))) ;empty collect - (hash-remove! hash id) (retry)] - [(cons (list cc (list) (list)) tail) ;empty parent collect - (say-making tail) - (hash-set! hash id tail) (retry)] - [(cons (list cc (list) subs) tail) ;empty srcs list - (define nl (append subs tail)) - (say-making nl) - (hash-set! hash id nl) (retry)] - [(cons (list cc (list file) subs) tail) - (define nl (append subs tail)) - (hash-set! hash id nl) - (say-making nl) - (build-job cc file #t)] - [(cons (list cc (cons file ft) subs) tail) - (hash-set! hash id (cons (list cc ft subs) tail)) - (build-job cc file #f)] - [else - (eprintf "get-job match cc failed.\n") - (eprintf "trying to match:\n~v\n" cc)])) - - - ; find a cc - (cond - ; lookup already assigned cc - [(hash-ref hash workerid #f) => (lambda (x) - (find-job-in-cc x workerid))] - ; get next cc from cclst - [(pair? cclst) - (define workercc (list (car cclst))) - (say-making workercc) - (set! cclst (cdr cclst)) - (hash-set! hash workerid workercc) - (find-job-in-cc workercc workerid)] - ; try to steal work from another workers cc - [(hash-iterate-first hash) => (lambda (x) - (find-job-in-cc (hash-iterate-value hash x) - (hash-iterate-key hash x)))])) - ; no work left - ; should never get here, get-job only called when the queue has work - - (define/public (has-jobs?) - (define (hasjob? cct) - (let loop ([cct cct]) - (ormap (lambda (x) (or ((length (second x)) . > . 0) (loop (third x)))) cct))) - - (or (hasjob? cclst) - (for/or ([cct (in-hash-values hash)]) - (hasjob? cct)))) - - (define/public (jobs-cnt) - (define (count-cct cct) - (let loop ([cct cct]) - (apply + (map (lambda (x) (+ (length (second x)) (loop (third x)))) cct)))) - - (+ (count-cct cclst) - (for/fold ([cnt 0]) ([cct (in-hash-values hash)]) - (+ cnt (count-cct cct))))) - (define/public (get-results) (void)) - (super-new))) - -(define file-list-queue% (class* object% (work-queue<%>) - (init-field filelist handler) - (field (lock-mgr (new lock-manager%))) - (inspect #f) - - (define/public (work-done work wrkr msg) - (match msg - [(list result-type out err) - (match result-type - [(list 'LOCK fn) (lm/lock lock-mgr fn wrkr) #f] - [(list 'UNLOCK fn) (lm/unlock lock-mgr fn) #f] - [(list 'ERROR msg) (handler 'error work msg out err) #t] - ['DONE - (define (string-!empty? s) (not (zero? (string-length s)))) - (if (ormap string-!empty? (list out err)) - (handler 'output work "" out err) - (handler 'done work "" "" "")) - #t])] - [else - (handler 'fatalerror (format "Error matching work: ~a queue ~a" work filelist) "" "") #t])) - - (define/public (get-job workerid) - (match filelist - [(cons hd tail) - (define-values (dir file b) (split-path hd)) - (set! filelist tail) - (values hd (list (->bytes hd) (->bytes dir) (->bytes file)))] - [(list) null])) - (define/public (has-jobs?) (not (null? filelist))) - (define/public (jobs-cnt) (length filelist)) - (define/public (get-results) (void)) - (super-new))) + (if (ormap string-!empty? (list out err)) + (handler 'output work "" out err) + (handler 'done work "" "" "")) + #t])] + [else + (handler 'fatalerror (format "Error matching work: ~a queue ~a" work filelist) "" "") #t])) + + (define/public (get-job workerid) + (match filelist + [(cons hd tail) + (define-values (dir file b) (split-path hd)) + (set! filelist tail) + (values hd (list (->bytes hd) (->bytes dir) (->bytes file)))] + [(list) null])) + (define/public (has-jobs?) (not (null? filelist))) + (define/public (jobs-cnt) (length filelist)) + (define/public (get-results) (void)) + (super-new))) (define (parallel-build work-queue worker-count) (parallel-do diff --git a/collects/setup/parallel-do.rkt b/collects/setup/parallel-do.rkt index 4a6be7c183..6aa8d0981b 100644 --- a/collects/setup/parallel-do.rkt +++ b/collects/setup/parallel-do.rkt @@ -58,88 +58,90 @@ get-id get-out)) -(define worker% (class* object% (worker<%>) - (field [id 0] - [process-handle null] - [out null] - [in null] - [err null] - [module-path null] - [funcname null]) +(define worker% + (class* object% (worker<%>) + (field [id 0] + [process-handle null] + [out null] + [in null] + [err null] + [module-path null] + [funcname null]) - (define/public (spawn _id _module-path _funcname [initialmsg #f]) - (set! module-path _module-path) - (set! funcname _funcname) - (define worker-cmdline-list (list (current-executable-path) "-X" (path->string (current-collects-path)) "-e" "(eval(read))")) - (define dynamic-require-cmd `((dynamic-require (string->path ,module-path) (quote ,funcname)) #f)) - (let-values ([(_process-handle _out _in _err) (apply subprocess #f #f (current-error-port) worker-cmdline-list)]) - (set! id _id) - (set! process-handle _process-handle) - (set! out _out) - (set! in _in) - (set! err _err) - (send/msg dynamic-require-cmd) - (when initialmsg (send/msg (s-exp->fasl (serialize (initialmsg id))))))) - (define/public (send/msg msg) - (with-handlers ([exn:fail? - (lambda (x) - (eprintf "While sending message to parallel-do worker: ~a ~a\n" id (exn-message x)) - (exit 1))]) - (DEBUG_COMM (eprintf "CSENDING ~v ~v\n" id msg)) - (write msg in) (flush-output in))) - (define/public (recv/msg) - (with-handlers ([exn:fail? - (lambda (x) - (eprintf "While receiving message from parallel-do worker ~a ~a\n" id (exn-message x)) - (exit 1))]) - (define r (read out)) - (DEBUG_COMM (eprintf "CRECEIVNG ~v ~v\n" id r)) - r)) - (define/public (read-all) (port->string out)) - (define/public (get-id) id) - (define/public (get-out) out) - (define/public (kill) - (DEBUG_COMM (eprintf "KILLING WORKER ~a\n" id)) - (close-output-port in) - (close-input-port out) - (subprocess-kill process-handle #t)) - (define/public (break) (kill)) - (define/public (kill/respawn worker-cmdline-list [initialmsg #f]) - (kill) - (spawn id module-path funcname [initialmsg #f])) - (define/public (wait) (subprocess-wait process-handle)) - (super-new))) + (define/public (spawn _id _module-path _funcname [initialmsg #f]) + (set! module-path _module-path) + (set! funcname _funcname) + (define worker-cmdline-list (list (current-executable-path) "-X" (path->string (current-collects-path)) "-e" "(eval(read))")) + (define dynamic-require-cmd `((dynamic-require (string->path ,module-path) (quote ,funcname)) #f)) + (let-values ([(_process-handle _out _in _err) (apply subprocess #f #f (current-error-port) worker-cmdline-list)]) + (set! id _id) + (set! process-handle _process-handle) + (set! out _out) + (set! in _in) + (set! err _err) + (send/msg dynamic-require-cmd) + (when initialmsg (send/msg (s-exp->fasl (serialize (initialmsg id))))))) + (define/public (send/msg msg) + (with-handlers ([exn:fail? + (lambda (x) + (eprintf "While sending message to parallel-do worker: ~a ~a\n" id (exn-message x)) + (exit 1))]) + (DEBUG_COMM (eprintf "CSENDING ~v ~v\n" id msg)) + (write msg in) (flush-output in))) + (define/public (recv/msg) + (with-handlers ([exn:fail? + (lambda (x) + (eprintf "While receiving message from parallel-do worker ~a ~a\n" id (exn-message x)) + (exit 1))]) + (define r (read out)) + (DEBUG_COMM (eprintf "CRECEIVNG ~v ~v\n" id r)) + r)) + (define/public (read-all) (port->string out)) + (define/public (get-id) id) + (define/public (get-out) out) + (define/public (kill) + (DEBUG_COMM (eprintf "KILLING WORKER ~a\n" id)) + (close-output-port in) + (close-input-port out) + (subprocess-kill process-handle #t)) + (define/public (break) (kill)) + (define/public (kill/respawn worker-cmdline-list [initialmsg #f]) + (kill) + (spawn id module-path funcname [initialmsg #f])) + (define/public (wait) (subprocess-wait process-handle)) + (super-new))) -(define place-worker% (class* object% (worker<%>) - (init-field [id 0] - [pl null]) - - (define/public (spawn _id module-path funcname [initialmsg #f]) +(define place-worker% + (class* object% (worker<%>) + (init-field [id 0] + [pl null]) + + (define/public (spawn _id module-path funcname [initialmsg #f]) (set! id _id) (set! pl (dynamic-place (string->path module-path) funcname)) (when initialmsg (send/msg (s-exp->fasl (serialize (initialmsg id)))))) - (define/public (send/msg msg) - (DEBUG_COMM (eprintf "CSENDING ~v ~v\n" pl msg)) - (place-channel-put pl msg)) - (define/public (recv/msg) - (define r (place-channel-get pl)) - (DEBUG_COMM (eprintf "CRECEIVNG ~v ~v\n" pl r)) - r) - (define/public (read-all) "") - (define/public (get-id) id) - (define/public (get-out) pl) - (define/public (kill) #f) - (define/public (break) (place-break pl)) - (define/public (wait) (place-wait pl)) - (super-new))) + (define/public (send/msg msg) + (DEBUG_COMM (eprintf "CSENDING ~v ~v\n" pl msg)) + (place-channel-put pl msg)) + (define/public (recv/msg) + (define r (place-channel-get pl)) + (DEBUG_COMM (eprintf "CRECEIVNG ~v ~v\n" pl r)) + r) + (define/public (read-all) "") + (define/public (get-id) id) + (define/public (get-out) pl) + (define/public (kill) #f) + (define/public (break) (place-break pl)) + (define/public (wait) (place-wait pl)) + (super-new))) -(define work-queue<%> (interface () - get-job - work-done - has-jobs? - jobs-cnt - get-results)) + (define work-queue<%> (interface () + get-job + work-done + has-jobs? + jobs-cnt + get-results)) (define/class/generics/provide worker<%> (wrkr/spawn spawn id worker-cmdline-list initialcode initialmsg) @@ -260,25 +262,26 @@ ;(printf " ~a" (add1 i)) (flush-output))(printf "\n") )])))) -(define list-queue% (class* object% (work-queue<%>) - (init-field queue create-job-thunk success-thunk failure-thunk) - (field [results null]) +(define list-queue% + (class* object% (work-queue<%>) + (init-field queue create-job-thunk success-thunk failure-thunk) + (field [results null]) - (define/public (work-done work workerid msg) - (match msg - [(list (list 'DONE result) stdout stderr) - (set! results (cons (success-thunk work result stdout stderr) results))] - [(list (list 'ERROR errmsg) stdout stderr) - (failure-thunk work errmsg stdout stderr)])) - (define/public (get-job workerid) - (match queue - [(cons h t) - (set! queue t) - (values h (create-job-thunk h))])) - (define/public (has-jobs?) (not (null? queue))) - (define/public (get-results) (reverse results)) - (define/public (jobs-cnt) (length queue)) - (super-new))) + (define/public (work-done work workerid msg) + (match msg + [(list (list 'DONE result) stdout stderr) + (set! results (cons (success-thunk work result stdout stderr) results))] + [(list (list 'ERROR errmsg) stdout stderr) + (failure-thunk work errmsg stdout stderr)])) + (define/public (get-job workerid) + (match queue + [(cons h t) + (set! queue t) + (values h (create-job-thunk h))])) + (define/public (has-jobs?) (not (null? queue))) + (define/public (get-results) (reverse results)) + (define/public (jobs-cnt) (length queue)) + (super-new))) (define (list-queue list-of-work create-job-thunk job-success-thunk job-failure-thunk) (make-object list-queue% list-of-work create-job-thunk job-success-thunk job-failure-thunk))