diff --git a/collects/racket/place.rkt b/collects/racket/place.rkt index eedaa88b9c..89f751d47c 100644 --- a/collects/racket/place.rkt +++ b/collects/racket/place.rkt @@ -59,6 +59,7 @@ (define (th-place-sleep n) (sleep n)) (define (th-place-wait pl) (thread-wait (TH-place-th pl)) 0) (define (th-place-kill pl) (custodian-shutdown-all (TH-place-cust pl))) +(define (th-place-break pl) (break-thread (TH-place-th pl))) (define (th-place-channel) (define-values (as ar) (make-th-async-channel)) (define-values (bs br) (make-th-async-channel)) @@ -117,6 +118,7 @@ (define-pl place-sleep pl-place-sleep th-place-sleep) (define-pl place-wait pl-place-wait th-place-wait) (define-pl place-kill pl-place-kill th-place-kill) +(define-pl place-break pl-place-break th-place-break) (define-pl place-channel pl-place-channel th-place-channel) (define-pl place-channel-send pl-place-channel-send th-place-channel-send) (define-pl place-channel-receive pl-place-channel-receive th-place-channel-receive) diff --git a/collects/scribblings/reference/places.scrbl b/collects/scribblings/reference/places.scrbl index e39645ffd7..953593b553 100644 --- a/collects/scribblings/reference/places.scrbl +++ b/collects/scribblings/reference/places.scrbl @@ -119,6 +119,13 @@ racket blocking until the place completes if it has not already completed. } +@defproc[(place-break [p place?]) void?]{ + Sends place @racket[p] a break signal; see @secref["breakhandler"]. +} + +@defproc[(place-kill [p place?]) void?]{ + Terminates the place indicated by @racket[p], +} @defproc[(place-channel) (values place-channel? place-channel?)]{ diff --git a/collects/setup/parallel-do.rkt b/collects/setup/parallel-do.rkt index 2716c2affe..86f01fde16 100644 --- a/collects/setup/parallel-do.rkt +++ b/collects/setup/parallel-do.rkt @@ -82,14 +82,14 @@ (define/public (send/msg msg) (with-handlers ([exn:fail? (lambda (x) - (eprintf "CONTROLLER SEND MESSAGE ERROR TO WORKER ~a ~a\n" id (exn-message x)) + (eprintf "While sending message to parallel-do worker: ~a ~a\n" id (exn-message x)) (exit 1))]) (DEBUG_COMM (eprintf "CSENDING ~v ~v\n" id msg)) (write msg in) (flush-output in))) (define/public (recv/msg) (with-handlers ([exn:fail? (lambda (x) - (eprintf "CONTROLLER RECEIVE MESSAGE ERROR FROM WORKER ~a ~a\n" id (exn-message x)) + (eprintf "While receiving message from parallel-do worker ~a ~a\n" id (exn-message x)) (exit 1))]) (define r (read out)) (DEBUG_COMM (eprintf "CRECEIVNG ~v ~v\n" id r)) @@ -177,6 +177,7 @@ (define (jobs?) (queue/has jobqueue)) (define (empty?) (not (queue/has jobqueue))) (define workers #f) + (define no-breaks #f) (dynamic-wind (lambda () (parameterize-break #f @@ -206,7 +207,7 @@ [error-count error-count]) (error-threshold error-count) (with-handlers* ([exn:fail? (lambda (e) - (printf "MASTER WRITE ERROR - writing to worker: ~v ~a\n" (wrkr/id wrkr) (exn-message e)) + (printf "Error writing to worker: ~v ~a\n" (wrkr/id wrkr) (exn-message e)) (wrkr/kill wrkr) (retry-loop (spawn (wrkr/id wrkr)) (add1 error-count)))]) (wrkr/send wrkr cmd-list)) @@ -224,7 +225,7 @@ [(list node wrkr) (handle-evt (wrkr/out wrkr) (λ (e) (with-handlers* ([exn:fail? (lambda (e) - (printf "MASTER READ ERROR - reading from worker: ~v ~a\n" (wrkr/id wrkr) (exn-message e)) + (printf "Error reading from worker: ~v ~a\n" (wrkr/id wrkr) (exn-message e)) (kill/remove-dead-worker node-worker wrkr))]) (let ([msg (if use-places e (wrkr/recv wrkr))]) (if (pair? msg) @@ -238,7 +239,9 @@ (eprintf "parallel-do-event-loop match node-worker failed.\n") (eprintf "trying to match:\n~a\n" node-worker)])) (DEBUG_COMM (printf "WAITING ON WORKERS TO RESPOND\n")) - (apply sync (map gen-node-handler inflight))]))) + (begin0 + (apply sync (map gen-node-handler inflight)) + (set! no-breaks #t))]))) (lambda () ;(printf "Asking all workers to die\n") (for ([p workers]) (with-handlers ([exn:fail? void]) (wrkr/send p (list 'DIE)))) diff --git a/src/racket/src/places.c b/src/racket/src/places.c index 692183aaac..60c6a9ebe1 100644 --- a/src/racket/src/places.c +++ b/src/racket/src/places.c @@ -24,6 +24,7 @@ THREAD_LOCAL_DECL(void *place_object); static Scheme_Object *scheme_place(int argc, Scheme_Object *args[]); static Scheme_Object *scheme_place_wait(int argc, Scheme_Object *args[]); static Scheme_Object *scheme_place_kill(int argc, Scheme_Object *args[]); +static Scheme_Object *scheme_place_break(int argc, Scheme_Object *args[]); static Scheme_Object *scheme_place_sleep(int argc, Scheme_Object *args[]); static Scheme_Object *scheme_place_p(int argc, Scheme_Object *args[]); static Scheme_Object *scheme_place_send(int argc, Scheme_Object *args[]); @@ -92,6 +93,7 @@ void scheme_init_place(Scheme_Env *env) PLACE_PRIM_W_ARITY("place-sleep", scheme_place_sleep, 1, 1, plenv); PLACE_PRIM_W_ARITY("place-wait", scheme_place_wait, 1, 1, plenv); PLACE_PRIM_W_ARITY("place-kill", scheme_place_kill, 1, 1, plenv); + PLACE_PRIM_W_ARITY("place-break", scheme_place_break, 1, 1, plenv); PLACE_PRIM_W_ARITY("place?", scheme_place_p, 1, 1, plenv); PLACE_PRIM_W_ARITY("place-channel", scheme_place_channel, 0, 0, plenv); PLACE_PRIM_W_ARITY("place-channel-send", scheme_place_send, 1, 2, plenv); @@ -176,7 +178,9 @@ Scheme_Object *scheme_place_sleep(int argc, Scheme_Object *args[]) { * it is shared acrosss place boundaries and * must be allocated with malloc and free*/ typedef struct Scheme_Place_Object { - int die; + mzrt_mutex *lock; + char die; + char pbreak; mz_jmp_buf *exit_buf; void *signal_handle; /*Thread_Local_Variables *tlvs; */ @@ -194,8 +198,10 @@ Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) { place = MALLOC_ONE_TAGGED(Scheme_Place); place->so.type = scheme_place_type; place_obj = malloc(sizeof(Scheme_Place_Object)); + mzrt_mutex_create(&place_obj->lock); place->place_obj = place_obj; place_obj->die = 0; + place_obj->pbreak = 0; mzrt_sema_create(&ready, 0); @@ -266,12 +272,36 @@ Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) { static int place_kill(Scheme_Place *place) { Scheme_Place_Object *place_obj; place_obj = (Scheme_Place_Object*) place->place_obj; - place_obj->die = 1; + + { + mzrt_mutex_lock(place_obj->lock); + + place_obj->die = 1; + + mzrt_mutex_unlock(place_obj->lock); + } + scheme_signal_received_at(place_obj->signal_handle); scheme_remove_managed(place->mref, (Scheme_Object *)place); return 0; } +static int place_break(Scheme_Place *place) { + Scheme_Place_Object *place_obj; + place_obj = (Scheme_Place_Object*) place->place_obj; + + { + mzrt_mutex_lock(place_obj->lock); + + place_obj->pbreak = 1; + + mzrt_mutex_unlock(place_obj->lock); + } + + scheme_signal_received_at(place_obj->signal_handle); + return 0; +} + static int cust_kill_place(Scheme_Object *pl, void *notused) { return place_kill((Scheme_Place *)pl); } @@ -289,6 +319,19 @@ static Scheme_Object *scheme_place_kill(int argc, Scheme_Object *args[]) { return scheme_make_integer(place_kill(place)); } +static Scheme_Object *scheme_place_break(int argc, Scheme_Object *args[]) { + Scheme_Place *place; + place = (Scheme_Place *) args[0]; + + if (argc != 1) { + scheme_wrong_count_m("place-break", 1, 1, argc, args, 0); + } + if (!SAME_TYPE(SCHEME_TYPE(args[0]), scheme_place_type)) { + scheme_wrong_type("place-break", "place", 0, argc, args); + } + return scheme_make_integer(place_break(place)); +} + # if defined(MZ_PLACES_WAITPID) /*============= SIGCHLD SIGNAL HANDLING =============*/ @@ -1115,11 +1158,27 @@ static void *place_start_proc(void *data_arg) { return rc; } -void scheme_place_check_for_killed() { +void scheme_place_check_for_interruption() { Scheme_Place_Object *place_obj; + char local_die; + char local_break; + place_obj = (Scheme_Place_Object *) place_object; - if (place_obj && place_obj->die) { - scheme_longjmp(*place_obj->exit_buf, 1); + if (place_obj) { + { + mzrt_mutex_lock(place_obj->lock); + + local_die = place_obj->die; + local_break = place_obj->pbreak; + place_obj->pbreak = 0; + + mzrt_mutex_unlock(place_obj->lock); + } + + if (local_die) + scheme_longjmp(*place_obj->exit_buf, 1); + if (local_break) + scheme_break_thread(NULL); } } diff --git a/src/racket/src/schpriv.h b/src/racket/src/schpriv.h index c43b7b275b..3dc8dfe883 100644 --- a/src/racket/src/schpriv.h +++ b/src/racket/src/schpriv.h @@ -3672,7 +3672,7 @@ typedef struct Scheme_Place { Scheme_Env *scheme_place_instance_init(); void scheme_place_instance_destroy(); void scheme_kill_green_thread_timer(); -void scheme_place_check_for_killed(); +void scheme_place_check_for_interruption(); /*========================================================================*/ /* engine */ /*========================================================================*/ diff --git a/src/racket/src/thread.c b/src/racket/src/thread.c index 17a28b6370..3c3371d4dd 100644 --- a/src/racket/src/thread.c +++ b/src/racket/src/thread.c @@ -4164,7 +4164,7 @@ void scheme_thread_block(float sleep_time) #endif #if defined(MZ_USE_PLACES) if (!do_atomic) - scheme_place_check_for_killed(); + scheme_place_check_for_interruption(); #endif if (sleep_end > 0) {