Turn places build back on, provide place-break.
This commit is contained in:
parent
3bf438f580
commit
46dc051931
|
@ -7,12 +7,15 @@
|
|||
racket/fixnum
|
||||
racket/flonum
|
||||
racket/vector
|
||||
(for-syntax racket/base))
|
||||
|
||||
(for-syntax racket/base
|
||||
racket/syntax))
|
||||
|
||||
(provide place
|
||||
place-sleep
|
||||
place-wait
|
||||
place-kill
|
||||
place-break
|
||||
place-channel
|
||||
place-channel-send
|
||||
place-channel-receive
|
||||
|
@ -139,7 +142,7 @@
|
|||
[(_ ch body ...)
|
||||
(with-syntax ([interal-def-name
|
||||
(syntax-local-lift-expression #'(lambda (ch) body ...))]
|
||||
[funcname #'OBSCURE_FUNC_NAME_%#%])
|
||||
[funcname (datum->syntax stx (generate-temporary #'place/anon))])
|
||||
(syntax-local-lift-provide #'(rename interal-def-name funcname))
|
||||
#'(let ([module-path (resolved-module-path-name
|
||||
(variable-reference->resolved-module-path
|
||||
|
|
|
@ -51,6 +51,7 @@
|
|||
spawn
|
||||
send/msg
|
||||
kill
|
||||
break
|
||||
wait
|
||||
recv/msg
|
||||
read-all
|
||||
|
@ -102,6 +103,7 @@
|
|||
(close-output-port in)
|
||||
(close-input-port out)
|
||||
(subprocess-kill process-handle #t))
|
||||
(define/public (break) (kill))
|
||||
(define/public (kill/respawn worker-cmdline-list [initialmsg #f])
|
||||
(kill)
|
||||
(spawn id module-path funcname [initialmsg #f]))
|
||||
|
@ -127,6 +129,7 @@
|
|||
(define/public (get-id) id)
|
||||
(define/public (get-out) pl)
|
||||
(define/public (kill) #f)
|
||||
(define/public (break) (place-break pl))
|
||||
(define/public (wait) (place-wait pl))
|
||||
(super-new)))
|
||||
|
||||
|
@ -142,6 +145,7 @@
|
|||
(wrkr/spawn spawn id worker-cmdline-list initialcode initialmsg)
|
||||
(wrkr/send send/msg msg)
|
||||
(wrkr/kill kill)
|
||||
(wrkr/break break)
|
||||
(wrkr/recv recv/msg)
|
||||
(wrkr/read-all read-all)
|
||||
(wrkr/id get-id)
|
||||
|
@ -168,8 +172,8 @@
|
|||
(find-system-path 'orig-dir))))))
|
||||
|
||||
(define (parallel-do-event-loop module-path funcname initialmsg jobqueue nprocs [stopat #f])
|
||||
; (define use-places (place-enabled?))
|
||||
(define use-places #f)
|
||||
(define use-places (place-enabled?))
|
||||
; (define use-places #f)
|
||||
(define (spawn id)
|
||||
(define wrkr (if use-places (new PlaceWorker%) (new Worker%)))
|
||||
(wrkr/spawn wrkr id module-path funcname initialmsg)
|
||||
|
@ -177,7 +181,7 @@
|
|||
(define (jobs?) (queue/has jobqueue))
|
||||
(define (empty?) (not (queue/has jobqueue)))
|
||||
(define workers #f)
|
||||
(define no-breaks #f)
|
||||
(define breaks #t)
|
||||
(dynamic-wind
|
||||
(lambda ()
|
||||
(parameterize-break #f
|
||||
|
@ -241,17 +245,20 @@
|
|||
(DEBUG_COMM (printf "WAITING ON WORKERS TO RESPOND\n"))
|
||||
(begin0
|
||||
(apply sync (map gen-node-handler inflight))
|
||||
(set! no-breaks #t))])))
|
||||
(set! breaks #f))])))
|
||||
(lambda ()
|
||||
;(printf "Asking all workers to die\n")
|
||||
(for ([p workers]) (with-handlers ([exn:fail? void]) (wrkr/send p (list 'DIE))))
|
||||
;(printf "Waiting for all workers to die")(flush-output)
|
||||
(for ([p workers]
|
||||
[i (in-naturals)])
|
||||
(wrkr/wait p)
|
||||
;(printf " ~a" (add1 i))
|
||||
(flush-output))
|
||||
#;(printf "\n"))))
|
||||
(cond
|
||||
[breaks
|
||||
(for ([p workers]) (with-handlers ([exn:fail? void]) (wrkr/break p)))]
|
||||
[else
|
||||
;(printf "Asking all workers to die\n")
|
||||
(for ([p workers]) (with-handlers ([exn:fail? void]) (wrkr/send p (list 'DIE))))
|
||||
;(printf "Waiting for all workers to die")(flush-output)
|
||||
(for ([p workers]
|
||||
[i (in-naturals)])
|
||||
(wrkr/wait p)
|
||||
;(printf " ~a" (add1 i)) (flush-output))(printf "\n")
|
||||
)]))))
|
||||
|
||||
(define ListQueue% (class* object% (WorkQueue<%>)
|
||||
(init-field queue create-job-thunk success-thunk failure-thunk)
|
||||
|
|
|
@ -172,8 +172,6 @@
|
|||
(channel-test-basic-types-master place-channel-send/receive pc6)
|
||||
(channel-test-basic-types-master big-sender pc6)
|
||||
|
||||
#|
|
||||
|#
|
||||
(let ([try-graph
|
||||
(lambda (s)
|
||||
(let ([v (read (open-input-string s))])
|
||||
|
@ -188,7 +186,18 @@
|
|||
(check-not-exn (λ () (place-channel-send pl (bytes->path #"/tmp/unix" 'unix))))
|
||||
(check-not-exn (λ () (place-channel-send pl (bytes->path #"C:\\Windows" 'windows))))
|
||||
|
||||
(place-wait pl)
|
||||
)
|
||||
)
|
||||
(place-wait pl))
|
||||
|
||||
(let ([p (place/anon ch
|
||||
(with-handlers ([exn:break? (lambda (x) (place-channel-send ch "OK") (printf "Place caught break"))])
|
||||
(place-channel-send ch "ALIVE")
|
||||
(sync never-evt)
|
||||
(place-channel-send ch "NOK")))])
|
||||
|
||||
(test "ALIVE" place-channel-receive p)
|
||||
(place-break p)
|
||||
(test "OK" place-channel-receive p)
|
||||
(place-wait p)))
|
||||
|
||||
|
||||
;(report-errs)
|
||||
|
|
Loading…
Reference in New Issue
Block a user