From d4d5ca70fb051af165233d96a291dc71a98fe2f8 Mon Sep 17 00:00:00 2001 From: Kevin Tew Date: Wed, 21 Mar 2012 11:18:43 -0600 Subject: [PATCH] Hold reference to original thread during BEGIN_ESCAPABLE in place-channel sync Fixes several bugs related to killed threads that were in the middle of syncing on place-channels. --- collects/tests/racket/place-channel.rkt | 38 +++++++++++++++++++------ src/racket/src/mzmark_thread.inc | 2 ++ src/racket/src/mzmarksrc.c | 1 + src/racket/src/place.c | 29 +++++++++++-------- src/racket/src/schpriv.h | 1 + src/racket/src/thread.c | 6 ++-- 6 files changed, 55 insertions(+), 22 deletions(-) diff --git a/collects/tests/racket/place-channel.rkt b/collects/tests/racket/place-channel.rkt index fc540d9aa9..ac9c7b182d 100644 --- a/collects/tests/racket/place-channel.rkt +++ b/collects/tests/racket/place-channel.rkt @@ -183,15 +183,37 @@ (define (main) + + ;test breaks in BEGIN_ESCAPABLE during scheme_place_async_try_receive (let () - (define pl (place ch - (for ([i 100000]) - (place-channel-get ch) - (place-channel-put ch (list "foo" 1 "bar"))))) - - (for ([i 100000]) - (place-channel-put pl (list "hello" 4 "hello")) - (sync/timeout 1 pl))) + (for ([i 25]) + (let () + (define-values (ch1 ch2) (place-channel)) + (define mt + (thread + (lambda () + (let loop () (loop))))) + (define t + (thread + (lambda () + (for ([i 100000]) + ;(place-channel-put ch1 (list "foo" 1 "bar")) + (place-channel-put ch1 (make-list 7000 'foo)) + )))) + (define t2 + (thread + (lambda () + (time (for ([i 100000]) + (sync ch2)))))) + + (define ti (/ (+ (random 100) 1) 100)) + (sleep ti) + (kill-thread mt) + (kill-thread t2) + (kill-thread t) + ;(displayln (exact->inexact ti)) + (thread-wait t) + (thread-wait t2)))) (let () (define flx (make-shared-fxvector 10 0)) diff --git a/src/racket/src/mzmark_thread.inc b/src/racket/src/mzmark_thread.inc index b0e7708b0d..588d77df50 100644 --- a/src/racket/src/mzmark_thread.inc +++ b/src/racket/src/mzmark_thread.inc @@ -306,6 +306,7 @@ static int mark_syncing_MARK(void *p, struct NewGC *gc) { gcMARK2(w->reposts, gc); gcMARK2(w->accepts, gc); gcMARK2(w->disable_break, gc); + gcMARK2(w->thread, gc); return gcBYTES_TO_WORDS(sizeof(Syncing)); @@ -320,6 +321,7 @@ static int mark_syncing_FIXUP(void *p, struct NewGC *gc) { gcFIXUP2(w->reposts, gc); gcFIXUP2(w->accepts, gc); gcFIXUP2(w->disable_break, gc); + gcFIXUP2(w->thread, gc); return gcBYTES_TO_WORDS(sizeof(Syncing)); diff --git a/src/racket/src/mzmarksrc.c b/src/racket/src/mzmarksrc.c index cc9a96c68a..fd5d147f5b 100644 --- a/src/racket/src/mzmarksrc.c +++ b/src/racket/src/mzmarksrc.c @@ -1915,6 +1915,7 @@ mark_syncing { gcMARK2(w->reposts, gc); gcMARK2(w->accepts, gc); gcMARK2(w->disable_break, gc); + gcMARK2(w->thread, gc); size: gcBYTES_TO_WORDS(sizeof(Syncing)); diff --git a/src/racket/src/place.c b/src/racket/src/place.c index d023703489..1f7d41fe33 100644 --- a/src/racket/src/place.c +++ b/src/racket/src/place.c @@ -19,7 +19,6 @@ */ #include "schpriv.h" - static Scheme_Object* scheme_place_enabled(int argc, Scheme_Object *args[]); static Scheme_Object* scheme_place_shared(int argc, Scheme_Object *args[]); @@ -3119,18 +3118,22 @@ static Scheme_Object *scheme_place_async_try_receive_raw(Scheme_Place_Async_Chan return msg; } -static void cleanup_msg_memmory(void *msg_memory) { - if (msg_memory) - GC_destroy_orphan_msg_memory(msg_memory); +static void cleanup_msg_memmory(void *thread) { + Scheme_Thread *p = thread; + if (p->place_channel_msg_in_flight) { + GC_destroy_orphan_msg_memory(p->place_channel_msg_in_flight); + p->place_channel_msg_in_flight = NULL; + } } static Scheme_Object *scheme_place_async_try_receive(Scheme_Place_Async_Channel *ch) { Scheme_Object *msg = NULL; - void *msg_memory = NULL; - BEGIN_ESCAPEABLE(cleanup_msg_memmory, msg_memory); - msg = scheme_place_async_try_receive_raw(ch, &msg_memory); + Scheme_Thread *p = scheme_current_thread; + BEGIN_ESCAPEABLE(cleanup_msg_memmory, p); + msg = scheme_place_async_try_receive_raw(ch, &p->place_channel_msg_in_flight); if (msg) { - msg = scheme_places_deserialize(msg, msg_memory); + msg = scheme_places_deserialize(msg, p->place_channel_msg_in_flight); + p->place_channel_msg_in_flight = NULL; } END_ESCAPEABLE(); return msg; @@ -3157,8 +3160,10 @@ static Scheme_Object *place_channel_finish_ready(void *d, int argc, struct Schem if (msg) { Scheme_Thread *p = scheme_current_thread; - p->place_channel_msg_in_flight = NULL; - return scheme_places_deserialize(msg, msg_memory); + BEGIN_ESCAPEABLE(cleanup_msg_memmory, p); + msg = scheme_places_deserialize(msg, p->place_channel_msg_in_flight); + p->place_channel_msg_in_flight = NULL; + END_ESCAPEABLE(); } return msg; } @@ -3178,9 +3183,9 @@ static int place_channel_ready(Scheme_Object *so, Scheme_Schedule_Info *sinfo) { msg = scheme_place_async_try_receive_raw((Scheme_Place_Async_Channel *) ch->recvch, &msg_memory); if (msg != NULL) { - Scheme_Thread *p = scheme_current_thread; + Scheme_Thread *p = ((Syncing *)(sinfo->current_syncing))->thread; p->place_channel_msg_in_flight = msg_memory; - o = scheme_make_pair(msg, msg_memory); + o = scheme_make_raw_pair(msg, msg_memory); wrapper = scheme_make_closed_prim(place_channel_finish_ready, NULL); scheme_set_sync_target(sinfo, o, wrapper, NULL, 0, 0, NULL); return 1; diff --git a/src/racket/src/schpriv.h b/src/racket/src/schpriv.h index a8d592b5d7..91398f10d8 100644 --- a/src/racket/src/schpriv.h +++ b/src/racket/src/schpriv.h @@ -1720,6 +1720,7 @@ typedef struct Syncing { Scheme_Accept_Sync *accepts; Scheme_Thread *disable_break; /* when result is set */ + Scheme_Thread *thread; /* set when syncing to allow in flight place message cleanup */ } Syncing; int scheme_wait_semas_chs(int n, Scheme_Object **o, int just_try, Syncing *syncing); diff --git a/src/racket/src/thread.c b/src/racket/src/thread.c index 7bfe3c81bb..e7efb98c6e 100644 --- a/src/racket/src/thread.c +++ b/src/racket/src/thread.c @@ -5888,6 +5888,7 @@ static Syncing *make_syncing(Evt_Set *evt_set, float timeout, double start_time) pos = scheme_rand((Scheme_Random_State *)rand_state); syncing->start_pos = (pos % evt_set->argc); } + syncing->thread = scheme_current_thread; return syncing; } @@ -6383,16 +6384,17 @@ void scheme_post_syncing_nacks(Syncing *syncing) static void escape_during_sync(Syncing *syncing) { #ifdef MZ_PRECISE_GC - Scheme_Thread *p = scheme_current_thread; + Scheme_Thread *p = syncing->thread; #endif scheme_post_syncing_nacks(syncing); #ifdef MZ_PRECISE_GC - if (p->place_channel_msg_in_flight) { + if (p && p->place_channel_msg_in_flight) { GC_destroy_orphan_msg_memory(p->place_channel_msg_in_flight); p->place_channel_msg_in_flight = NULL; } + syncing->thread = NULL; #endif }