diff --git a/collects/racket/place.rkt b/collects/racket/place.rkt index 7fcfb1f5ae..d769b9bdaa 100644 --- a/collects/racket/place.rkt +++ b/collects/racket/place.rkt @@ -7,12 +7,15 @@ racket/fixnum racket/flonum racket/vector - (for-syntax racket/base)) + + (for-syntax racket/base + racket/syntax)) (provide place place-sleep place-wait place-kill + place-break place-channel place-channel-send place-channel-receive @@ -139,7 +142,7 @@ [(_ ch body ...) (with-syntax ([interal-def-name (syntax-local-lift-expression #'(lambda (ch) body ...))] - [funcname #'OBSCURE_FUNC_NAME_%#%]) + [funcname (datum->syntax stx (generate-temporary #'place/anon))]) (syntax-local-lift-provide #'(rename interal-def-name funcname)) #'(let ([module-path (resolved-module-path-name (variable-reference->resolved-module-path diff --git a/collects/setup/parallel-do.rkt b/collects/setup/parallel-do.rkt index fa1bc790d8..219a1df68d 100644 --- a/collects/setup/parallel-do.rkt +++ b/collects/setup/parallel-do.rkt @@ -51,6 +51,7 @@ spawn send/msg kill + break wait recv/msg read-all @@ -102,6 +103,7 @@ (close-output-port in) (close-input-port out) (subprocess-kill process-handle #t)) + (define/public (break) (kill)) (define/public (kill/respawn worker-cmdline-list [initialmsg #f]) (kill) (spawn id module-path funcname [initialmsg #f])) @@ -127,6 +129,7 @@ (define/public (get-id) id) (define/public (get-out) pl) (define/public (kill) #f) + (define/public (break) (place-break pl)) (define/public (wait) (place-wait pl)) (super-new))) @@ -142,6 +145,7 @@ (wrkr/spawn spawn id worker-cmdline-list initialcode initialmsg) (wrkr/send send/msg msg) (wrkr/kill kill) + (wrkr/break break) (wrkr/recv recv/msg) (wrkr/read-all read-all) (wrkr/id get-id) @@ -168,8 +172,8 @@ (find-system-path 'orig-dir)))))) (define (parallel-do-event-loop module-path funcname initialmsg jobqueue nprocs [stopat #f]) -; (define use-places (place-enabled?)) - (define use-places #f) + (define use-places (place-enabled?)) +; (define use-places #f) (define (spawn id) (define wrkr (if use-places (new PlaceWorker%) (new Worker%))) (wrkr/spawn wrkr id module-path funcname initialmsg) @@ -177,7 +181,7 @@ (define (jobs?) (queue/has jobqueue)) (define (empty?) (not (queue/has jobqueue))) (define workers #f) - (define no-breaks #f) + (define breaks #t) (dynamic-wind (lambda () (parameterize-break #f @@ -241,17 +245,20 @@ (DEBUG_COMM (printf "WAITING ON WORKERS TO RESPOND\n")) (begin0 (apply sync (map gen-node-handler inflight)) - (set! no-breaks #t))]))) + (set! breaks #f))]))) (lambda () - ;(printf "Asking all workers to die\n") - (for ([p workers]) (with-handlers ([exn:fail? void]) (wrkr/send p (list 'DIE)))) - ;(printf "Waiting for all workers to die")(flush-output) - (for ([p workers] - [i (in-naturals)]) - (wrkr/wait p) - ;(printf " ~a" (add1 i)) - (flush-output)) - #;(printf "\n")))) + (cond + [breaks + (for ([p workers]) (with-handlers ([exn:fail? void]) (wrkr/break p)))] + [else + ;(printf "Asking all workers to die\n") + (for ([p workers]) (with-handlers ([exn:fail? void]) (wrkr/send p (list 'DIE)))) + ;(printf "Waiting for all workers to die")(flush-output) + (for ([p workers] + [i (in-naturals)]) + (wrkr/wait p) + ;(printf " ~a" (add1 i)) (flush-output))(printf "\n") + )])))) (define ListQueue% (class* object% (WorkQueue<%>) (init-field queue create-job-thunk success-thunk failure-thunk) diff --git a/collects/tests/racket/place-channel.rkt b/collects/tests/racket/place-channel.rkt index 5bdf886bb9..3b819ba4eb 100644 --- a/collects/tests/racket/place-channel.rkt +++ b/collects/tests/racket/place-channel.rkt @@ -172,8 +172,6 @@ (channel-test-basic-types-master place-channel-send/receive pc6) (channel-test-basic-types-master big-sender pc6) - #| - |# (let ([try-graph (lambda (s) (let ([v (read (open-input-string s))]) @@ -188,7 +186,18 @@ (check-not-exn (λ () (place-channel-send pl (bytes->path #"/tmp/unix" 'unix)))) (check-not-exn (λ () (place-channel-send pl (bytes->path #"C:\\Windows" 'windows)))) - (place-wait pl) -) -) + (place-wait pl)) + +(let ([p (place/anon ch + (with-handlers ([exn:break? (lambda (x) (place-channel-send ch "OK") (printf "Place caught break"))]) + (place-channel-send ch "ALIVE") + (sync never-evt) + (place-channel-send ch "NOK")))]) + + (test "ALIVE" place-channel-receive p) + (place-break p) + (test "OK" place-channel-receive p) + (place-wait p))) + + ;(report-errs)