Hold reference to original thread during BEGIN_ESCAPABLE in place-channel sync
Fixes several bugs related to killed threads that were in the middle of syncing on place-channels.
This commit is contained in:
parent
c1dbaff6b8
commit
d4d5ca70fb
|
@ -183,15 +183,37 @@
|
||||||
|
|
||||||
|
|
||||||
(define (main)
|
(define (main)
|
||||||
|
|
||||||
|
;test breaks in BEGIN_ESCAPABLE during scheme_place_async_try_receive
|
||||||
(let ()
|
(let ()
|
||||||
(define pl (place ch
|
(for ([i 25])
|
||||||
(for ([i 100000])
|
(let ()
|
||||||
(place-channel-get ch)
|
(define-values (ch1 ch2) (place-channel))
|
||||||
(place-channel-put ch (list "foo" 1 "bar")))))
|
(define mt
|
||||||
|
(thread
|
||||||
(for ([i 100000])
|
(lambda ()
|
||||||
(place-channel-put pl (list "hello" 4 "hello"))
|
(let loop () (loop)))))
|
||||||
(sync/timeout 1 pl)))
|
(define t
|
||||||
|
(thread
|
||||||
|
(lambda ()
|
||||||
|
(for ([i 100000])
|
||||||
|
;(place-channel-put ch1 (list "foo" 1 "bar"))
|
||||||
|
(place-channel-put ch1 (make-list 7000 'foo))
|
||||||
|
))))
|
||||||
|
(define t2
|
||||||
|
(thread
|
||||||
|
(lambda ()
|
||||||
|
(time (for ([i 100000])
|
||||||
|
(sync ch2))))))
|
||||||
|
|
||||||
|
(define ti (/ (+ (random 100) 1) 100))
|
||||||
|
(sleep ti)
|
||||||
|
(kill-thread mt)
|
||||||
|
(kill-thread t2)
|
||||||
|
(kill-thread t)
|
||||||
|
;(displayln (exact->inexact ti))
|
||||||
|
(thread-wait t)
|
||||||
|
(thread-wait t2))))
|
||||||
|
|
||||||
(let ()
|
(let ()
|
||||||
(define flx (make-shared-fxvector 10 0))
|
(define flx (make-shared-fxvector 10 0))
|
||||||
|
|
|
@ -306,6 +306,7 @@ static int mark_syncing_MARK(void *p, struct NewGC *gc) {
|
||||||
gcMARK2(w->reposts, gc);
|
gcMARK2(w->reposts, gc);
|
||||||
gcMARK2(w->accepts, gc);
|
gcMARK2(w->accepts, gc);
|
||||||
gcMARK2(w->disable_break, gc);
|
gcMARK2(w->disable_break, gc);
|
||||||
|
gcMARK2(w->thread, gc);
|
||||||
|
|
||||||
return
|
return
|
||||||
gcBYTES_TO_WORDS(sizeof(Syncing));
|
gcBYTES_TO_WORDS(sizeof(Syncing));
|
||||||
|
@ -320,6 +321,7 @@ static int mark_syncing_FIXUP(void *p, struct NewGC *gc) {
|
||||||
gcFIXUP2(w->reposts, gc);
|
gcFIXUP2(w->reposts, gc);
|
||||||
gcFIXUP2(w->accepts, gc);
|
gcFIXUP2(w->accepts, gc);
|
||||||
gcFIXUP2(w->disable_break, gc);
|
gcFIXUP2(w->disable_break, gc);
|
||||||
|
gcFIXUP2(w->thread, gc);
|
||||||
|
|
||||||
return
|
return
|
||||||
gcBYTES_TO_WORDS(sizeof(Syncing));
|
gcBYTES_TO_WORDS(sizeof(Syncing));
|
||||||
|
|
|
@ -1915,6 +1915,7 @@ mark_syncing {
|
||||||
gcMARK2(w->reposts, gc);
|
gcMARK2(w->reposts, gc);
|
||||||
gcMARK2(w->accepts, gc);
|
gcMARK2(w->accepts, gc);
|
||||||
gcMARK2(w->disable_break, gc);
|
gcMARK2(w->disable_break, gc);
|
||||||
|
gcMARK2(w->thread, gc);
|
||||||
|
|
||||||
size:
|
size:
|
||||||
gcBYTES_TO_WORDS(sizeof(Syncing));
|
gcBYTES_TO_WORDS(sizeof(Syncing));
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "schpriv.h"
|
#include "schpriv.h"
|
||||||
|
|
||||||
static Scheme_Object* scheme_place_enabled(int argc, Scheme_Object *args[]);
|
static Scheme_Object* scheme_place_enabled(int argc, Scheme_Object *args[]);
|
||||||
static Scheme_Object* scheme_place_shared(int argc, Scheme_Object *args[]);
|
static Scheme_Object* scheme_place_shared(int argc, Scheme_Object *args[]);
|
||||||
|
|
||||||
|
@ -3119,18 +3118,22 @@ static Scheme_Object *scheme_place_async_try_receive_raw(Scheme_Place_Async_Chan
|
||||||
return msg;
|
return msg;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cleanup_msg_memmory(void *msg_memory) {
|
static void cleanup_msg_memmory(void *thread) {
|
||||||
if (msg_memory)
|
Scheme_Thread *p = thread;
|
||||||
GC_destroy_orphan_msg_memory(msg_memory);
|
if (p->place_channel_msg_in_flight) {
|
||||||
|
GC_destroy_orphan_msg_memory(p->place_channel_msg_in_flight);
|
||||||
|
p->place_channel_msg_in_flight = NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static Scheme_Object *scheme_place_async_try_receive(Scheme_Place_Async_Channel *ch) {
|
static Scheme_Object *scheme_place_async_try_receive(Scheme_Place_Async_Channel *ch) {
|
||||||
Scheme_Object *msg = NULL;
|
Scheme_Object *msg = NULL;
|
||||||
void *msg_memory = NULL;
|
Scheme_Thread *p = scheme_current_thread;
|
||||||
BEGIN_ESCAPEABLE(cleanup_msg_memmory, msg_memory);
|
BEGIN_ESCAPEABLE(cleanup_msg_memmory, p);
|
||||||
msg = scheme_place_async_try_receive_raw(ch, &msg_memory);
|
msg = scheme_place_async_try_receive_raw(ch, &p->place_channel_msg_in_flight);
|
||||||
if (msg) {
|
if (msg) {
|
||||||
msg = scheme_places_deserialize(msg, msg_memory);
|
msg = scheme_places_deserialize(msg, p->place_channel_msg_in_flight);
|
||||||
|
p->place_channel_msg_in_flight = NULL;
|
||||||
}
|
}
|
||||||
END_ESCAPEABLE();
|
END_ESCAPEABLE();
|
||||||
return msg;
|
return msg;
|
||||||
|
@ -3157,8 +3160,10 @@ static Scheme_Object *place_channel_finish_ready(void *d, int argc, struct Schem
|
||||||
|
|
||||||
if (msg) {
|
if (msg) {
|
||||||
Scheme_Thread *p = scheme_current_thread;
|
Scheme_Thread *p = scheme_current_thread;
|
||||||
p->place_channel_msg_in_flight = NULL;
|
BEGIN_ESCAPEABLE(cleanup_msg_memmory, p);
|
||||||
return scheme_places_deserialize(msg, msg_memory);
|
msg = scheme_places_deserialize(msg, p->place_channel_msg_in_flight);
|
||||||
|
p->place_channel_msg_in_flight = NULL;
|
||||||
|
END_ESCAPEABLE();
|
||||||
}
|
}
|
||||||
return msg;
|
return msg;
|
||||||
}
|
}
|
||||||
|
@ -3178,9 +3183,9 @@ static int place_channel_ready(Scheme_Object *so, Scheme_Schedule_Info *sinfo) {
|
||||||
|
|
||||||
msg = scheme_place_async_try_receive_raw((Scheme_Place_Async_Channel *) ch->recvch, &msg_memory);
|
msg = scheme_place_async_try_receive_raw((Scheme_Place_Async_Channel *) ch->recvch, &msg_memory);
|
||||||
if (msg != NULL) {
|
if (msg != NULL) {
|
||||||
Scheme_Thread *p = scheme_current_thread;
|
Scheme_Thread *p = ((Syncing *)(sinfo->current_syncing))->thread;
|
||||||
p->place_channel_msg_in_flight = msg_memory;
|
p->place_channel_msg_in_flight = msg_memory;
|
||||||
o = scheme_make_pair(msg, msg_memory);
|
o = scheme_make_raw_pair(msg, msg_memory);
|
||||||
wrapper = scheme_make_closed_prim(place_channel_finish_ready, NULL);
|
wrapper = scheme_make_closed_prim(place_channel_finish_ready, NULL);
|
||||||
scheme_set_sync_target(sinfo, o, wrapper, NULL, 0, 0, NULL);
|
scheme_set_sync_target(sinfo, o, wrapper, NULL, 0, 0, NULL);
|
||||||
return 1;
|
return 1;
|
||||||
|
|
|
@ -1720,6 +1720,7 @@ typedef struct Syncing {
|
||||||
Scheme_Accept_Sync *accepts;
|
Scheme_Accept_Sync *accepts;
|
||||||
|
|
||||||
Scheme_Thread *disable_break; /* when result is set */
|
Scheme_Thread *disable_break; /* when result is set */
|
||||||
|
Scheme_Thread *thread; /* set when syncing to allow in flight place message cleanup */
|
||||||
} Syncing;
|
} Syncing;
|
||||||
|
|
||||||
int scheme_wait_semas_chs(int n, Scheme_Object **o, int just_try, Syncing *syncing);
|
int scheme_wait_semas_chs(int n, Scheme_Object **o, int just_try, Syncing *syncing);
|
||||||
|
|
|
@ -5888,6 +5888,7 @@ static Syncing *make_syncing(Evt_Set *evt_set, float timeout, double start_time)
|
||||||
pos = scheme_rand((Scheme_Random_State *)rand_state);
|
pos = scheme_rand((Scheme_Random_State *)rand_state);
|
||||||
syncing->start_pos = (pos % evt_set->argc);
|
syncing->start_pos = (pos % evt_set->argc);
|
||||||
}
|
}
|
||||||
|
syncing->thread = scheme_current_thread;
|
||||||
|
|
||||||
return syncing;
|
return syncing;
|
||||||
}
|
}
|
||||||
|
@ -6383,16 +6384,17 @@ void scheme_post_syncing_nacks(Syncing *syncing)
|
||||||
|
|
||||||
static void escape_during_sync(Syncing *syncing) {
|
static void escape_during_sync(Syncing *syncing) {
|
||||||
#ifdef MZ_PRECISE_GC
|
#ifdef MZ_PRECISE_GC
|
||||||
Scheme_Thread *p = scheme_current_thread;
|
Scheme_Thread *p = syncing->thread;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
scheme_post_syncing_nacks(syncing);
|
scheme_post_syncing_nacks(syncing);
|
||||||
|
|
||||||
#ifdef MZ_PRECISE_GC
|
#ifdef MZ_PRECISE_GC
|
||||||
if (p->place_channel_msg_in_flight) {
|
if (p && p->place_channel_msg_in_flight) {
|
||||||
GC_destroy_orphan_msg_memory(p->place_channel_msg_in_flight);
|
GC_destroy_orphan_msg_memory(p->place_channel_msg_in_flight);
|
||||||
p->place_channel_msg_in_flight = NULL;
|
p->place_channel_msg_in_flight = NULL;
|
||||||
}
|
}
|
||||||
|
syncing->thread = NULL;
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user