[Places] added place-break primitive

This commit is contained in:
Kevin Tew 2011-04-26 15:03:15 -06:00
parent 8679afed60
commit a589ea42b6
6 changed files with 83 additions and 12 deletions

View File

@ -59,6 +59,7 @@
(define (th-place-sleep n) (sleep n))
(define (th-place-wait pl) (thread-wait (TH-place-th pl)) 0)
(define (th-place-kill pl) (custodian-shutdown-all (TH-place-cust pl)))
(define (th-place-break pl) (break-thread (TH-place-th pl)))
(define (th-place-channel)
(define-values (as ar) (make-th-async-channel))
(define-values (bs br) (make-th-async-channel))
@ -117,6 +118,7 @@
(define-pl place-sleep pl-place-sleep th-place-sleep)
(define-pl place-wait pl-place-wait th-place-wait)
(define-pl place-kill pl-place-kill th-place-kill)
(define-pl place-break pl-place-break th-place-break)
(define-pl place-channel pl-place-channel th-place-channel)
(define-pl place-channel-send pl-place-channel-send th-place-channel-send)
(define-pl place-channel-receive pl-place-channel-receive th-place-channel-receive)

View File

@ -119,6 +119,13 @@ racket
blocking until the place completes if it has not already completed.
}
@defproc[(place-break [p place?]) void?]{
Sends place @racket[p] a break signal; see @secref["breakhandler"].
}
@defproc[(place-kill [p place?]) void?]{
Terminates the place indicated by @racket[p],
}
@defproc[(place-channel) (values place-channel? place-channel?)]{

View File

@ -82,14 +82,14 @@
(define/public (send/msg msg)
(with-handlers ([exn:fail?
(lambda (x)
(eprintf "CONTROLLER SEND MESSAGE ERROR TO WORKER ~a ~a\n" id (exn-message x))
(eprintf "While sending message to parallel-do worker: ~a ~a\n" id (exn-message x))
(exit 1))])
(DEBUG_COMM (eprintf "CSENDING ~v ~v\n" id msg))
(write msg in) (flush-output in)))
(define/public (recv/msg)
(with-handlers ([exn:fail?
(lambda (x)
(eprintf "CONTROLLER RECEIVE MESSAGE ERROR FROM WORKER ~a ~a\n" id (exn-message x))
(eprintf "While receiving message from parallel-do worker ~a ~a\n" id (exn-message x))
(exit 1))])
(define r (read out))
(DEBUG_COMM (eprintf "CRECEIVNG ~v ~v\n" id r))
@ -177,6 +177,7 @@
(define (jobs?) (queue/has jobqueue))
(define (empty?) (not (queue/has jobqueue)))
(define workers #f)
(define no-breaks #f)
(dynamic-wind
(lambda ()
(parameterize-break #f
@ -206,7 +207,7 @@
[error-count error-count])
(error-threshold error-count)
(with-handlers* ([exn:fail? (lambda (e)
(printf "MASTER WRITE ERROR - writing to worker: ~v ~a\n" (wrkr/id wrkr) (exn-message e))
(printf "Error writing to worker: ~v ~a\n" (wrkr/id wrkr) (exn-message e))
(wrkr/kill wrkr)
(retry-loop (spawn (wrkr/id wrkr)) (add1 error-count)))])
(wrkr/send wrkr cmd-list))
@ -224,7 +225,7 @@
[(list node wrkr)
(handle-evt (wrkr/out wrkr) (λ (e)
(with-handlers* ([exn:fail? (lambda (e)
(printf "MASTER READ ERROR - reading from worker: ~v ~a\n" (wrkr/id wrkr) (exn-message e))
(printf "Error reading from worker: ~v ~a\n" (wrkr/id wrkr) (exn-message e))
(kill/remove-dead-worker node-worker wrkr))])
(let ([msg (if use-places e (wrkr/recv wrkr))])
(if (pair? msg)
@ -238,7 +239,9 @@
(eprintf "parallel-do-event-loop match node-worker failed.\n")
(eprintf "trying to match:\n~a\n" node-worker)]))
(DEBUG_COMM (printf "WAITING ON WORKERS TO RESPOND\n"))
(apply sync (map gen-node-handler inflight))])))
(begin0
(apply sync (map gen-node-handler inflight))
(set! no-breaks #t))])))
(lambda ()
;(printf "Asking all workers to die\n")
(for ([p workers]) (with-handlers ([exn:fail? void]) (wrkr/send p (list 'DIE))))

View File

@ -24,6 +24,7 @@ THREAD_LOCAL_DECL(void *place_object);
static Scheme_Object *scheme_place(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_place_wait(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_place_kill(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_place_break(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_place_sleep(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_place_p(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_place_send(int argc, Scheme_Object *args[]);
@ -92,6 +93,7 @@ void scheme_init_place(Scheme_Env *env)
PLACE_PRIM_W_ARITY("place-sleep", scheme_place_sleep, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place-wait", scheme_place_wait, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place-kill", scheme_place_kill, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place-break", scheme_place_break, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place?", scheme_place_p, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place-channel", scheme_place_channel, 0, 0, plenv);
PLACE_PRIM_W_ARITY("place-channel-send", scheme_place_send, 1, 2, plenv);
@ -176,7 +178,9 @@ Scheme_Object *scheme_place_sleep(int argc, Scheme_Object *args[]) {
* it is shared acrosss place boundaries and
* must be allocated with malloc and free*/
typedef struct Scheme_Place_Object {
int die;
mzrt_mutex *lock;
char die;
char pbreak;
mz_jmp_buf *exit_buf;
void *signal_handle;
/*Thread_Local_Variables *tlvs; */
@ -194,8 +198,10 @@ Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) {
place = MALLOC_ONE_TAGGED(Scheme_Place);
place->so.type = scheme_place_type;
place_obj = malloc(sizeof(Scheme_Place_Object));
mzrt_mutex_create(&place_obj->lock);
place->place_obj = place_obj;
place_obj->die = 0;
place_obj->pbreak = 0;
mzrt_sema_create(&ready, 0);
@ -266,12 +272,36 @@ Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) {
static int place_kill(Scheme_Place *place) {
Scheme_Place_Object *place_obj;
place_obj = (Scheme_Place_Object*) place->place_obj;
place_obj->die = 1;
{
mzrt_mutex_lock(place_obj->lock);
place_obj->die = 1;
mzrt_mutex_unlock(place_obj->lock);
}
scheme_signal_received_at(place_obj->signal_handle);
scheme_remove_managed(place->mref, (Scheme_Object *)place);
return 0;
}
static int place_break(Scheme_Place *place) {
Scheme_Place_Object *place_obj;
place_obj = (Scheme_Place_Object*) place->place_obj;
{
mzrt_mutex_lock(place_obj->lock);
place_obj->pbreak = 1;
mzrt_mutex_unlock(place_obj->lock);
}
scheme_signal_received_at(place_obj->signal_handle);
return 0;
}
static int cust_kill_place(Scheme_Object *pl, void *notused) {
return place_kill((Scheme_Place *)pl);
}
@ -289,6 +319,19 @@ static Scheme_Object *scheme_place_kill(int argc, Scheme_Object *args[]) {
return scheme_make_integer(place_kill(place));
}
static Scheme_Object *scheme_place_break(int argc, Scheme_Object *args[]) {
Scheme_Place *place;
place = (Scheme_Place *) args[0];
if (argc != 1) {
scheme_wrong_count_m("place-break", 1, 1, argc, args, 0);
}
if (!SAME_TYPE(SCHEME_TYPE(args[0]), scheme_place_type)) {
scheme_wrong_type("place-break", "place", 0, argc, args);
}
return scheme_make_integer(place_break(place));
}
# if defined(MZ_PLACES_WAITPID)
/*============= SIGCHLD SIGNAL HANDLING =============*/
@ -1115,11 +1158,27 @@ static void *place_start_proc(void *data_arg) {
return rc;
}
void scheme_place_check_for_killed() {
void scheme_place_check_for_interruption() {
Scheme_Place_Object *place_obj;
char local_die;
char local_break;
place_obj = (Scheme_Place_Object *) place_object;
if (place_obj && place_obj->die) {
scheme_longjmp(*place_obj->exit_buf, 1);
if (place_obj) {
{
mzrt_mutex_lock(place_obj->lock);
local_die = place_obj->die;
local_break = place_obj->pbreak;
place_obj->pbreak = 0;
mzrt_mutex_unlock(place_obj->lock);
}
if (local_die)
scheme_longjmp(*place_obj->exit_buf, 1);
if (local_break)
scheme_break_thread(NULL);
}
}

View File

@ -3672,7 +3672,7 @@ typedef struct Scheme_Place {
Scheme_Env *scheme_place_instance_init();
void scheme_place_instance_destroy();
void scheme_kill_green_thread_timer();
void scheme_place_check_for_killed();
void scheme_place_check_for_interruption();
/*========================================================================*/
/* engine */
/*========================================================================*/

View File

@ -4164,7 +4164,7 @@ void scheme_thread_block(float sleep_time)
#endif
#if defined(MZ_USE_PLACES)
if (!do_atomic)
scheme_place_check_for_killed();
scheme_place_check_for_interruption();
#endif
if (sleep_end > 0) {