fix indentation
This commit is contained in:
parent
8aca195b35
commit
4d7584d9b7
|
@ -23,24 +23,25 @@
|
||||||
; (begin a ...)
|
; (begin a ...)
|
||||||
)
|
)
|
||||||
|
|
||||||
(define lock-manager% (class object%
|
(define lock-manager%
|
||||||
(field (locks (make-hash)))
|
(class object%
|
||||||
(define/public (lock fn wrkr)
|
(field (locks (make-hash)))
|
||||||
(let ([v (hash-ref locks fn #f)])
|
(define/public (lock fn wrkr)
|
||||||
(hash-set! locks fn
|
(let ([v (hash-ref locks fn #f)])
|
||||||
(if v
|
(hash-set! locks fn
|
||||||
(match v [(list w waitlst) (list w (append waitlst (list wrkr)))])
|
(if v
|
||||||
(begin
|
(match v [(list w waitlst) (list w (append waitlst (list wrkr)))])
|
||||||
(wrkr/send wrkr (list 'locked))
|
(begin
|
||||||
(list wrkr null))))
|
(wrkr/send wrkr (list 'locked))
|
||||||
(not v)))
|
(list wrkr null))))
|
||||||
(define/public (unlock fn)
|
(not v)))
|
||||||
(match (hash-ref locks fn)
|
(define/public (unlock fn)
|
||||||
[(list w waitlst)
|
(match (hash-ref locks fn)
|
||||||
(for ([x (second (hash-ref locks fn))])
|
[(list w waitlst)
|
||||||
(wrkr/send x (list 'compiled)))
|
(for ([x (second (hash-ref locks fn))])
|
||||||
(hash-remove! locks fn)]))
|
(wrkr/send x (list 'compiled)))
|
||||||
(super-new)))
|
(hash-remove! locks fn)]))
|
||||||
|
(super-new)))
|
||||||
|
|
||||||
(define/class/generics lock-manager%
|
(define/class/generics lock-manager%
|
||||||
(lm/lock lock fn wrkr)
|
(lm/lock lock fn wrkr)
|
||||||
|
@ -50,152 +51,154 @@
|
||||||
(cond [(path? x) (path->bytes x)]
|
(cond [(path? x) (path->bytes x)]
|
||||||
[(string? x) (string->bytes/locale x)]))
|
[(string? x) (string->bytes/locale x)]))
|
||||||
|
|
||||||
(define collects-queue% (class* object% (work-queue<%>)
|
(define collects-queue%
|
||||||
(init-field cclst printer append-error)
|
(class* object% (work-queue<%>)
|
||||||
(field (lock-mgr (new lock-manager%)))
|
(init-field cclst printer append-error)
|
||||||
(field (hash (make-hash)))
|
(field (lock-mgr (new lock-manager%)))
|
||||||
(inspect #f)
|
(field (hash (make-hash)))
|
||||||
|
(inspect #f)
|
||||||
|
|
||||||
(define/public (work-done work wrkr msg)
|
(define/public (work-done work wrkr msg)
|
||||||
(match (list work msg)
|
(match (list work msg)
|
||||||
[(list (list cc file last) (list result-type out err))
|
[(list (list cc file last) (list result-type out err))
|
||||||
(begin0
|
(begin0
|
||||||
(match result-type
|
(match result-type
|
||||||
[(list 'ERROR msg)
|
[(list 'ERROR msg)
|
||||||
(append-error cc "making" (exn msg (current-continuation-marks)) out err "error")
|
(append-error cc "making" (exn msg (current-continuation-marks)) out err "error")
|
||||||
|
#t]
|
||||||
|
[(list 'LOCK fn) (lm/lock lock-mgr fn wrkr) #f]
|
||||||
|
[(list 'UNLOCK fn) (lm/unlock lock-mgr fn) #f]
|
||||||
|
['DONE
|
||||||
|
(define (string-!empty? s) (not (zero? (string-length s))))
|
||||||
|
(when (ormap string-!empty? (list out err))
|
||||||
|
(append-error cc "making" null out err "output"))
|
||||||
|
;(when last (printer (current-output-port) "made" "~a" (cc-name cc)))
|
||||||
|
#t]
|
||||||
|
[else (eprintf "Failed trying to match:\n~v\n" result-type)]))]
|
||||||
|
[else
|
||||||
|
(match work
|
||||||
|
[(list-rest (list cc file last) message)
|
||||||
|
(append-error cc "making" null "" "" "error")
|
||||||
|
(eprintf "work-done match cc failed.\n")
|
||||||
|
(eprintf "trying to match:\n~a\n" (list work msg))
|
||||||
#t]
|
#t]
|
||||||
|
[else
|
||||||
|
(eprintf "work-done match cc failed.\n")
|
||||||
|
(eprintf "trying to match:\n~a\n" (list work msg))
|
||||||
|
(eprintf "FATAL\n")
|
||||||
|
(exit 1)])]))
|
||||||
|
|
||||||
|
;; assigns a collection to each worker to be compiled
|
||||||
|
;; when it runs out of collections, steals work from other workers collections
|
||||||
|
(define/public (get-job workerid)
|
||||||
|
(define (say-making x)
|
||||||
|
(unless (null? x)
|
||||||
|
(printer (current-output-port) "making" "~a" (cc-name (car (car x))))))
|
||||||
|
(define (find-job-in-cc cc id)
|
||||||
|
(define (retry) (get-job workerid))
|
||||||
|
(define (build-job cc file last)
|
||||||
|
(values
|
||||||
|
(list cc file last)
|
||||||
|
(list (->bytes (cc-name cc))
|
||||||
|
(->bytes (cc-path cc))
|
||||||
|
(->bytes file))))
|
||||||
|
(match cc
|
||||||
|
[(list)
|
||||||
|
(hash-remove! hash id) (retry)]
|
||||||
|
[(list (list cc (list) (list))) ;empty collect
|
||||||
|
(hash-remove! hash id) (retry)]
|
||||||
|
[(cons (list cc (list) (list)) tail) ;empty parent collect
|
||||||
|
(say-making tail)
|
||||||
|
(hash-set! hash id tail) (retry)]
|
||||||
|
[(cons (list cc (list) subs) tail) ;empty srcs list
|
||||||
|
(define nl (append subs tail))
|
||||||
|
(say-making nl)
|
||||||
|
(hash-set! hash id nl) (retry)]
|
||||||
|
[(cons (list cc (list file) subs) tail)
|
||||||
|
(define nl (append subs tail))
|
||||||
|
(hash-set! hash id nl)
|
||||||
|
(say-making nl)
|
||||||
|
(build-job cc file #t)]
|
||||||
|
[(cons (list cc (cons file ft) subs) tail)
|
||||||
|
(hash-set! hash id (cons (list cc ft subs) tail))
|
||||||
|
(build-job cc file #f)]
|
||||||
|
[else
|
||||||
|
(eprintf "get-job match cc failed.\n")
|
||||||
|
(eprintf "trying to match:\n~v\n" cc)]))
|
||||||
|
|
||||||
|
|
||||||
|
; find a cc
|
||||||
|
(cond
|
||||||
|
; lookup already assigned cc
|
||||||
|
[(hash-ref hash workerid #f) => (lambda (x)
|
||||||
|
(find-job-in-cc x workerid))]
|
||||||
|
; get next cc from cclst
|
||||||
|
[(pair? cclst)
|
||||||
|
(define workercc (list (car cclst)))
|
||||||
|
(say-making workercc)
|
||||||
|
(set! cclst (cdr cclst))
|
||||||
|
(hash-set! hash workerid workercc)
|
||||||
|
(find-job-in-cc workercc workerid)]
|
||||||
|
; try to steal work from another workers cc
|
||||||
|
[(hash-iterate-first hash) => (lambda (x)
|
||||||
|
(find-job-in-cc (hash-iterate-value hash x)
|
||||||
|
(hash-iterate-key hash x)))]))
|
||||||
|
; no work left
|
||||||
|
; should never get here, get-job only called when the queue has work
|
||||||
|
|
||||||
|
(define/public (has-jobs?)
|
||||||
|
(define (hasjob? cct)
|
||||||
|
(let loop ([cct cct])
|
||||||
|
(ormap (lambda (x) (or ((length (second x)) . > . 0) (loop (third x)))) cct)))
|
||||||
|
|
||||||
|
(or (hasjob? cclst)
|
||||||
|
(for/or ([cct (in-hash-values hash)])
|
||||||
|
(hasjob? cct))))
|
||||||
|
|
||||||
|
(define/public (jobs-cnt)
|
||||||
|
(define (count-cct cct)
|
||||||
|
(let loop ([cct cct])
|
||||||
|
(apply + (map (lambda (x) (+ (length (second x)) (loop (third x)))) cct))))
|
||||||
|
|
||||||
|
(+ (count-cct cclst)
|
||||||
|
(for/fold ([cnt 0]) ([cct (in-hash-values hash)])
|
||||||
|
(+ cnt (count-cct cct)))))
|
||||||
|
(define/public (get-results) (void))
|
||||||
|
(super-new)))
|
||||||
|
|
||||||
|
(define file-list-queue%
|
||||||
|
(class* object% (work-queue<%>)
|
||||||
|
(init-field filelist handler)
|
||||||
|
(field (lock-mgr (new lock-manager%)))
|
||||||
|
(inspect #f)
|
||||||
|
|
||||||
|
(define/public (work-done work wrkr msg)
|
||||||
|
(match msg
|
||||||
|
[(list result-type out err)
|
||||||
|
(match result-type
|
||||||
[(list 'LOCK fn) (lm/lock lock-mgr fn wrkr) #f]
|
[(list 'LOCK fn) (lm/lock lock-mgr fn wrkr) #f]
|
||||||
[(list 'UNLOCK fn) (lm/unlock lock-mgr fn) #f]
|
[(list 'UNLOCK fn) (lm/unlock lock-mgr fn) #f]
|
||||||
|
[(list 'ERROR msg) (handler 'error work msg out err) #t]
|
||||||
['DONE
|
['DONE
|
||||||
(define (string-!empty? s) (not (zero? (string-length s))))
|
(define (string-!empty? s) (not (zero? (string-length s))))
|
||||||
(when (ormap string-!empty? (list out err))
|
(if (ormap string-!empty? (list out err))
|
||||||
(append-error cc "making" null out err "output"))
|
(handler 'output work "" out err)
|
||||||
;(when last (printer (current-output-port) "made" "~a" (cc-name cc)))
|
(handler 'done work "" "" ""))
|
||||||
#t]
|
#t])]
|
||||||
[else (eprintf "Failed trying to match:\n~v\n" result-type)]))]
|
[else
|
||||||
[else
|
(handler 'fatalerror (format "Error matching work: ~a queue ~a" work filelist) "" "") #t]))
|
||||||
(match work
|
|
||||||
[(list-rest (list cc file last) message)
|
(define/public (get-job workerid)
|
||||||
(append-error cc "making" null "" "" "error")
|
(match filelist
|
||||||
(eprintf "work-done match cc failed.\n")
|
[(cons hd tail)
|
||||||
(eprintf "trying to match:\n~a\n" (list work msg))
|
(define-values (dir file b) (split-path hd))
|
||||||
#t]
|
(set! filelist tail)
|
||||||
[else
|
(values hd (list (->bytes hd) (->bytes dir) (->bytes file)))]
|
||||||
(eprintf "work-done match cc failed.\n")
|
[(list) null]))
|
||||||
(eprintf "trying to match:\n~a\n" (list work msg))
|
(define/public (has-jobs?) (not (null? filelist)))
|
||||||
(eprintf "FATAL\n")
|
(define/public (jobs-cnt) (length filelist))
|
||||||
(exit 1)])]))
|
(define/public (get-results) (void))
|
||||||
|
(super-new)))
|
||||||
;; assigns a collection to each worker to be compiled
|
|
||||||
;; when it runs out of collections, steals work from other workers collections
|
|
||||||
(define/public (get-job workerid)
|
|
||||||
(define (say-making x)
|
|
||||||
(unless (null? x)
|
|
||||||
(printer (current-output-port) "making" "~a" (cc-name (car (car x))))))
|
|
||||||
(define (find-job-in-cc cc id)
|
|
||||||
(define (retry) (get-job workerid))
|
|
||||||
(define (build-job cc file last)
|
|
||||||
(values
|
|
||||||
(list cc file last)
|
|
||||||
(list (->bytes (cc-name cc))
|
|
||||||
(->bytes (cc-path cc))
|
|
||||||
(->bytes file))))
|
|
||||||
(match cc
|
|
||||||
[(list)
|
|
||||||
(hash-remove! hash id) (retry)]
|
|
||||||
[(list (list cc (list) (list))) ;empty collect
|
|
||||||
(hash-remove! hash id) (retry)]
|
|
||||||
[(cons (list cc (list) (list)) tail) ;empty parent collect
|
|
||||||
(say-making tail)
|
|
||||||
(hash-set! hash id tail) (retry)]
|
|
||||||
[(cons (list cc (list) subs) tail) ;empty srcs list
|
|
||||||
(define nl (append subs tail))
|
|
||||||
(say-making nl)
|
|
||||||
(hash-set! hash id nl) (retry)]
|
|
||||||
[(cons (list cc (list file) subs) tail)
|
|
||||||
(define nl (append subs tail))
|
|
||||||
(hash-set! hash id nl)
|
|
||||||
(say-making nl)
|
|
||||||
(build-job cc file #t)]
|
|
||||||
[(cons (list cc (cons file ft) subs) tail)
|
|
||||||
(hash-set! hash id (cons (list cc ft subs) tail))
|
|
||||||
(build-job cc file #f)]
|
|
||||||
[else
|
|
||||||
(eprintf "get-job match cc failed.\n")
|
|
||||||
(eprintf "trying to match:\n~v\n" cc)]))
|
|
||||||
|
|
||||||
|
|
||||||
; find a cc
|
|
||||||
(cond
|
|
||||||
; lookup already assigned cc
|
|
||||||
[(hash-ref hash workerid #f) => (lambda (x)
|
|
||||||
(find-job-in-cc x workerid))]
|
|
||||||
; get next cc from cclst
|
|
||||||
[(pair? cclst)
|
|
||||||
(define workercc (list (car cclst)))
|
|
||||||
(say-making workercc)
|
|
||||||
(set! cclst (cdr cclst))
|
|
||||||
(hash-set! hash workerid workercc)
|
|
||||||
(find-job-in-cc workercc workerid)]
|
|
||||||
; try to steal work from another workers cc
|
|
||||||
[(hash-iterate-first hash) => (lambda (x)
|
|
||||||
(find-job-in-cc (hash-iterate-value hash x)
|
|
||||||
(hash-iterate-key hash x)))]))
|
|
||||||
; no work left
|
|
||||||
; should never get here, get-job only called when the queue has work
|
|
||||||
|
|
||||||
(define/public (has-jobs?)
|
|
||||||
(define (hasjob? cct)
|
|
||||||
(let loop ([cct cct])
|
|
||||||
(ormap (lambda (x) (or ((length (second x)) . > . 0) (loop (third x)))) cct)))
|
|
||||||
|
|
||||||
(or (hasjob? cclst)
|
|
||||||
(for/or ([cct (in-hash-values hash)])
|
|
||||||
(hasjob? cct))))
|
|
||||||
|
|
||||||
(define/public (jobs-cnt)
|
|
||||||
(define (count-cct cct)
|
|
||||||
(let loop ([cct cct])
|
|
||||||
(apply + (map (lambda (x) (+ (length (second x)) (loop (third x)))) cct))))
|
|
||||||
|
|
||||||
(+ (count-cct cclst)
|
|
||||||
(for/fold ([cnt 0]) ([cct (in-hash-values hash)])
|
|
||||||
(+ cnt (count-cct cct)))))
|
|
||||||
(define/public (get-results) (void))
|
|
||||||
(super-new)))
|
|
||||||
|
|
||||||
(define file-list-queue% (class* object% (work-queue<%>)
|
|
||||||
(init-field filelist handler)
|
|
||||||
(field (lock-mgr (new lock-manager%)))
|
|
||||||
(inspect #f)
|
|
||||||
|
|
||||||
(define/public (work-done work wrkr msg)
|
|
||||||
(match msg
|
|
||||||
[(list result-type out err)
|
|
||||||
(match result-type
|
|
||||||
[(list 'LOCK fn) (lm/lock lock-mgr fn wrkr) #f]
|
|
||||||
[(list 'UNLOCK fn) (lm/unlock lock-mgr fn) #f]
|
|
||||||
[(list 'ERROR msg) (handler 'error work msg out err) #t]
|
|
||||||
['DONE
|
|
||||||
(define (string-!empty? s) (not (zero? (string-length s))))
|
|
||||||
(if (ormap string-!empty? (list out err))
|
|
||||||
(handler 'output work "" out err)
|
|
||||||
(handler 'done work "" "" ""))
|
|
||||||
#t])]
|
|
||||||
[else
|
|
||||||
(handler 'fatalerror (format "Error matching work: ~a queue ~a" work filelist) "" "") #t]))
|
|
||||||
|
|
||||||
(define/public (get-job workerid)
|
|
||||||
(match filelist
|
|
||||||
[(cons hd tail)
|
|
||||||
(define-values (dir file b) (split-path hd))
|
|
||||||
(set! filelist tail)
|
|
||||||
(values hd (list (->bytes hd) (->bytes dir) (->bytes file)))]
|
|
||||||
[(list) null]))
|
|
||||||
(define/public (has-jobs?) (not (null? filelist)))
|
|
||||||
(define/public (jobs-cnt) (length filelist))
|
|
||||||
(define/public (get-results) (void))
|
|
||||||
(super-new)))
|
|
||||||
|
|
||||||
(define (parallel-build work-queue worker-count)
|
(define (parallel-build work-queue worker-count)
|
||||||
(parallel-do
|
(parallel-do
|
||||||
|
|
|
@ -58,88 +58,90 @@
|
||||||
get-id
|
get-id
|
||||||
get-out))
|
get-out))
|
||||||
|
|
||||||
(define worker% (class* object% (worker<%>)
|
(define worker%
|
||||||
(field [id 0]
|
(class* object% (worker<%>)
|
||||||
[process-handle null]
|
(field [id 0]
|
||||||
[out null]
|
[process-handle null]
|
||||||
[in null]
|
[out null]
|
||||||
[err null]
|
[in null]
|
||||||
[module-path null]
|
[err null]
|
||||||
[funcname null])
|
[module-path null]
|
||||||
|
[funcname null])
|
||||||
|
|
||||||
(define/public (spawn _id _module-path _funcname [initialmsg #f])
|
(define/public (spawn _id _module-path _funcname [initialmsg #f])
|
||||||
(set! module-path _module-path)
|
(set! module-path _module-path)
|
||||||
(set! funcname _funcname)
|
(set! funcname _funcname)
|
||||||
(define worker-cmdline-list (list (current-executable-path) "-X" (path->string (current-collects-path)) "-e" "(eval(read))"))
|
(define worker-cmdline-list (list (current-executable-path) "-X" (path->string (current-collects-path)) "-e" "(eval(read))"))
|
||||||
(define dynamic-require-cmd `((dynamic-require (string->path ,module-path) (quote ,funcname)) #f))
|
(define dynamic-require-cmd `((dynamic-require (string->path ,module-path) (quote ,funcname)) #f))
|
||||||
(let-values ([(_process-handle _out _in _err) (apply subprocess #f #f (current-error-port) worker-cmdline-list)])
|
(let-values ([(_process-handle _out _in _err) (apply subprocess #f #f (current-error-port) worker-cmdline-list)])
|
||||||
(set! id _id)
|
(set! id _id)
|
||||||
(set! process-handle _process-handle)
|
(set! process-handle _process-handle)
|
||||||
(set! out _out)
|
(set! out _out)
|
||||||
(set! in _in)
|
(set! in _in)
|
||||||
(set! err _err)
|
(set! err _err)
|
||||||
(send/msg dynamic-require-cmd)
|
(send/msg dynamic-require-cmd)
|
||||||
(when initialmsg (send/msg (s-exp->fasl (serialize (initialmsg id)))))))
|
(when initialmsg (send/msg (s-exp->fasl (serialize (initialmsg id)))))))
|
||||||
(define/public (send/msg msg)
|
(define/public (send/msg msg)
|
||||||
(with-handlers ([exn:fail?
|
(with-handlers ([exn:fail?
|
||||||
(lambda (x)
|
(lambda (x)
|
||||||
(eprintf "While sending message to parallel-do worker: ~a ~a\n" id (exn-message x))
|
(eprintf "While sending message to parallel-do worker: ~a ~a\n" id (exn-message x))
|
||||||
(exit 1))])
|
(exit 1))])
|
||||||
(DEBUG_COMM (eprintf "CSENDING ~v ~v\n" id msg))
|
(DEBUG_COMM (eprintf "CSENDING ~v ~v\n" id msg))
|
||||||
(write msg in) (flush-output in)))
|
(write msg in) (flush-output in)))
|
||||||
(define/public (recv/msg)
|
(define/public (recv/msg)
|
||||||
(with-handlers ([exn:fail?
|
(with-handlers ([exn:fail?
|
||||||
(lambda (x)
|
(lambda (x)
|
||||||
(eprintf "While receiving message from parallel-do worker ~a ~a\n" id (exn-message x))
|
(eprintf "While receiving message from parallel-do worker ~a ~a\n" id (exn-message x))
|
||||||
(exit 1))])
|
(exit 1))])
|
||||||
(define r (read out))
|
(define r (read out))
|
||||||
(DEBUG_COMM (eprintf "CRECEIVNG ~v ~v\n" id r))
|
(DEBUG_COMM (eprintf "CRECEIVNG ~v ~v\n" id r))
|
||||||
r))
|
r))
|
||||||
(define/public (read-all) (port->string out))
|
(define/public (read-all) (port->string out))
|
||||||
(define/public (get-id) id)
|
(define/public (get-id) id)
|
||||||
(define/public (get-out) out)
|
(define/public (get-out) out)
|
||||||
(define/public (kill)
|
(define/public (kill)
|
||||||
(DEBUG_COMM (eprintf "KILLING WORKER ~a\n" id))
|
(DEBUG_COMM (eprintf "KILLING WORKER ~a\n" id))
|
||||||
(close-output-port in)
|
(close-output-port in)
|
||||||
(close-input-port out)
|
(close-input-port out)
|
||||||
(subprocess-kill process-handle #t))
|
(subprocess-kill process-handle #t))
|
||||||
(define/public (break) (kill))
|
(define/public (break) (kill))
|
||||||
(define/public (kill/respawn worker-cmdline-list [initialmsg #f])
|
(define/public (kill/respawn worker-cmdline-list [initialmsg #f])
|
||||||
(kill)
|
(kill)
|
||||||
(spawn id module-path funcname [initialmsg #f]))
|
(spawn id module-path funcname [initialmsg #f]))
|
||||||
(define/public (wait) (subprocess-wait process-handle))
|
(define/public (wait) (subprocess-wait process-handle))
|
||||||
(super-new)))
|
(super-new)))
|
||||||
|
|
||||||
(define place-worker% (class* object% (worker<%>)
|
(define place-worker%
|
||||||
(init-field [id 0]
|
(class* object% (worker<%>)
|
||||||
[pl null])
|
(init-field [id 0]
|
||||||
|
[pl null])
|
||||||
(define/public (spawn _id module-path funcname [initialmsg #f])
|
|
||||||
|
(define/public (spawn _id module-path funcname [initialmsg #f])
|
||||||
(set! id _id)
|
(set! id _id)
|
||||||
(set! pl (dynamic-place (string->path module-path) funcname))
|
(set! pl (dynamic-place (string->path module-path) funcname))
|
||||||
(when initialmsg (send/msg (s-exp->fasl (serialize (initialmsg id))))))
|
(when initialmsg (send/msg (s-exp->fasl (serialize (initialmsg id))))))
|
||||||
(define/public (send/msg msg)
|
(define/public (send/msg msg)
|
||||||
(DEBUG_COMM (eprintf "CSENDING ~v ~v\n" pl msg))
|
(DEBUG_COMM (eprintf "CSENDING ~v ~v\n" pl msg))
|
||||||
(place-channel-put pl msg))
|
(place-channel-put pl msg))
|
||||||
(define/public (recv/msg)
|
(define/public (recv/msg)
|
||||||
(define r (place-channel-get pl))
|
(define r (place-channel-get pl))
|
||||||
(DEBUG_COMM (eprintf "CRECEIVNG ~v ~v\n" pl r))
|
(DEBUG_COMM (eprintf "CRECEIVNG ~v ~v\n" pl r))
|
||||||
r)
|
r)
|
||||||
(define/public (read-all) "")
|
(define/public (read-all) "")
|
||||||
(define/public (get-id) id)
|
(define/public (get-id) id)
|
||||||
(define/public (get-out) pl)
|
(define/public (get-out) pl)
|
||||||
(define/public (kill) #f)
|
(define/public (kill) #f)
|
||||||
(define/public (break) (place-break pl))
|
(define/public (break) (place-break pl))
|
||||||
(define/public (wait) (place-wait pl))
|
(define/public (wait) (place-wait pl))
|
||||||
(super-new)))
|
(super-new)))
|
||||||
|
|
||||||
|
|
||||||
(define work-queue<%> (interface ()
|
(define work-queue<%> (interface ()
|
||||||
get-job
|
get-job
|
||||||
work-done
|
work-done
|
||||||
has-jobs?
|
has-jobs?
|
||||||
jobs-cnt
|
jobs-cnt
|
||||||
get-results))
|
get-results))
|
||||||
|
|
||||||
(define/class/generics/provide worker<%>
|
(define/class/generics/provide worker<%>
|
||||||
(wrkr/spawn spawn id worker-cmdline-list initialcode initialmsg)
|
(wrkr/spawn spawn id worker-cmdline-list initialcode initialmsg)
|
||||||
|
@ -260,25 +262,26 @@
|
||||||
;(printf " ~a" (add1 i)) (flush-output))(printf "\n")
|
;(printf " ~a" (add1 i)) (flush-output))(printf "\n")
|
||||||
)]))))
|
)]))))
|
||||||
|
|
||||||
(define list-queue% (class* object% (work-queue<%>)
|
(define list-queue%
|
||||||
(init-field queue create-job-thunk success-thunk failure-thunk)
|
(class* object% (work-queue<%>)
|
||||||
(field [results null])
|
(init-field queue create-job-thunk success-thunk failure-thunk)
|
||||||
|
(field [results null])
|
||||||
|
|
||||||
(define/public (work-done work workerid msg)
|
(define/public (work-done work workerid msg)
|
||||||
(match msg
|
(match msg
|
||||||
[(list (list 'DONE result) stdout stderr)
|
[(list (list 'DONE result) stdout stderr)
|
||||||
(set! results (cons (success-thunk work result stdout stderr) results))]
|
(set! results (cons (success-thunk work result stdout stderr) results))]
|
||||||
[(list (list 'ERROR errmsg) stdout stderr)
|
[(list (list 'ERROR errmsg) stdout stderr)
|
||||||
(failure-thunk work errmsg stdout stderr)]))
|
(failure-thunk work errmsg stdout stderr)]))
|
||||||
(define/public (get-job workerid)
|
(define/public (get-job workerid)
|
||||||
(match queue
|
(match queue
|
||||||
[(cons h t)
|
[(cons h t)
|
||||||
(set! queue t)
|
(set! queue t)
|
||||||
(values h (create-job-thunk h))]))
|
(values h (create-job-thunk h))]))
|
||||||
(define/public (has-jobs?) (not (null? queue)))
|
(define/public (has-jobs?) (not (null? queue)))
|
||||||
(define/public (get-results) (reverse results))
|
(define/public (get-results) (reverse results))
|
||||||
(define/public (jobs-cnt) (length queue))
|
(define/public (jobs-cnt) (length queue))
|
||||||
(super-new)))
|
(super-new)))
|
||||||
|
|
||||||
(define (list-queue list-of-work create-job-thunk job-success-thunk job-failure-thunk)
|
(define (list-queue list-of-work create-job-thunk job-success-thunk job-failure-thunk)
|
||||||
(make-object list-queue% list-of-work create-job-thunk job-success-thunk job-failure-thunk))
|
(make-object list-queue% list-of-work create-job-thunk job-success-thunk job-failure-thunk))
|
||||||
|
|
Loading…
Reference in New Issue
Block a user