Fix sync/timeout hang due to place channels
This commit is contained in:
parent
f3a21c6aba
commit
666c0fdb57
|
@ -183,6 +183,16 @@
|
|||
|
||||
|
||||
(define (main)
|
||||
(let ()
|
||||
(define pl (place ch
|
||||
(for ([i 100000])
|
||||
(place-channel-get ch)
|
||||
(place-channel-put ch (list "foo" 1 "bar")))))
|
||||
|
||||
(for ([i 100000])
|
||||
(place-channel-put pl (list "hello" 4 "hello"))
|
||||
(sync/timeout 1 pl)))
|
||||
|
||||
(let ()
|
||||
(define flx (make-shared-fxvector 10 0))
|
||||
(define flv (make-shared-flvector 10 0.0))
|
||||
|
|
|
@ -1148,7 +1148,9 @@ typedef struct Scheme_Thread {
|
|||
|
||||
#ifdef MZ_PRECISE_GC
|
||||
struct GC_Thread_Info *gc_info; /* managed by the GC */
|
||||
void *place_channel_msg_in_flight;
|
||||
#endif
|
||||
|
||||
} Scheme_Thread;
|
||||
|
||||
#include "schthread.h"
|
||||
|
|
|
@ -2589,9 +2589,11 @@ Scheme_Object *scheme_places_deserialize(Scheme_Object *so, void *msg_memory) {
|
|||
if (GC_message_objects_size(msg_memory) < 1024) {
|
||||
new_so = do_places_deep_copy(so, mzPDC_UNCOPY, 1, NULL, NULL);
|
||||
GC_dispose_short_message_allocator(msg_memory);
|
||||
msg_memory = NULL;
|
||||
}
|
||||
else {
|
||||
GC_adopt_message_allocator(msg_memory);
|
||||
msg_memory = NULL;
|
||||
#if !defined(SHARED_TABLES)
|
||||
new_so = do_places_deep_copy(so, mzPDC_DESER, 1, NULL, NULL);
|
||||
#endif
|
||||
|
@ -3086,7 +3088,7 @@ static void register_place_object_with_channel(Scheme_Place_Async_Channel *ch, S
|
|||
}
|
||||
}
|
||||
|
||||
static Scheme_Object *scheme_place_async_try_receive(Scheme_Place_Async_Channel *ch) {
|
||||
static Scheme_Object *scheme_place_async_try_receive_raw(Scheme_Place_Async_Channel *ch, void **msg_memory_ptr) {
|
||||
Scheme_Object *msg = NULL;
|
||||
void *msg_memory = NULL;
|
||||
intptr_t sz;
|
||||
|
@ -3113,9 +3115,24 @@ static Scheme_Object *scheme_place_async_try_receive(Scheme_Place_Async_Channel
|
|||
}
|
||||
mzrt_mutex_unlock(ch->lock);
|
||||
|
||||
*msg_memory_ptr = msg_memory;
|
||||
return msg;
|
||||
}
|
||||
|
||||
static void cleanup_msg_memmory(void *msg_memory) {
|
||||
if (msg_memory)
|
||||
GC_destroy_orphan_msg_memory(msg_memory);
|
||||
}
|
||||
|
||||
static Scheme_Object *scheme_place_async_try_receive(Scheme_Place_Async_Channel *ch) {
|
||||
Scheme_Object *msg = NULL;
|
||||
void *msg_memory = NULL;
|
||||
BEGIN_ESCAPEABLE();
|
||||
msg = scheme_place_async_try_receive_raw(ch, &msg_memory);
|
||||
if (msg) {
|
||||
return scheme_places_deserialize(msg, msg_memory);
|
||||
msg = scheme_places_deserialize(msg, msg_memory);
|
||||
}
|
||||
END_ESCAPEABLE();
|
||||
return msg;
|
||||
}
|
||||
|
||||
|
@ -3129,20 +3146,43 @@ static int scheme_place_async_ch_ready(Scheme_Place_Async_Channel *ch) {
|
|||
mzrt_mutex_unlock(ch->lock);
|
||||
return ready;
|
||||
}
|
||||
static Scheme_Object *place_channel_finish_ready(void *d, int argc, struct Scheme_Object *argv[]) {
|
||||
Scheme_Object *o;
|
||||
Scheme_Object *msg;
|
||||
void *msg_memory;
|
||||
|
||||
o = argv[0];
|
||||
msg = SCHEME_CAR(o);
|
||||
msg_memory = SCHEME_CDR(o);
|
||||
|
||||
if (msg) {
|
||||
Scheme_Thread *p = scheme_current_thread;
|
||||
p->place_channel_msg_in_flight = NULL;
|
||||
return scheme_places_deserialize(msg, msg_memory);
|
||||
}
|
||||
return msg;
|
||||
}
|
||||
|
||||
static int place_channel_ready(Scheme_Object *so, Scheme_Schedule_Info *sinfo) {
|
||||
Scheme_Place_Bi_Channel *ch;
|
||||
Scheme_Object *msg = NULL;
|
||||
Scheme_Object *o;
|
||||
Scheme_Object *wrapper;
|
||||
void *msg_memory = NULL;
|
||||
if (SAME_TYPE(SCHEME_TYPE(so), scheme_place_type)) {
|
||||
ch = (Scheme_Place_Bi_Channel *) ((Scheme_Place *) so)->channel;
|
||||
}
|
||||
else {
|
||||
ch = (Scheme_Place_Bi_Channel *)so;
|
||||
}
|
||||
|
||||
msg = scheme_place_async_try_receive((Scheme_Place_Async_Channel *) ch->recvch);
|
||||
|
||||
msg = scheme_place_async_try_receive_raw((Scheme_Place_Async_Channel *) ch->recvch, &msg_memory);
|
||||
if (msg != NULL) {
|
||||
scheme_set_sync_target(sinfo, msg, NULL, NULL, 0, 0, NULL);
|
||||
Scheme_Thread *p = scheme_current_thread;
|
||||
p->place_channel_msg_in_flight = msg_memory;
|
||||
o = scheme_make_pair(msg, msg_memory);
|
||||
wrapper = scheme_make_closed_prim(place_channel_finish_ready, NULL);
|
||||
scheme_set_sync_target(sinfo, o, wrapper, NULL, 0, 0, NULL);
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
|
|
|
@ -6381,6 +6381,21 @@ void scheme_post_syncing_nacks(Syncing *syncing)
|
|||
}
|
||||
}
|
||||
|
||||
static void escape_during_sync(Syncing *syncing) {
|
||||
#ifdef MZ_PRECISE_GC
|
||||
Scheme_Thread *p = scheme_current_thread;
|
||||
#endif
|
||||
|
||||
scheme_post_syncing_nacks(syncing);
|
||||
|
||||
#ifdef MZ_PRECISE_GC
|
||||
if (p->place_channel_msg_in_flight) {
|
||||
GC_destroy_orphan_msg_memory(p->place_channel_msg_in_flight);
|
||||
p->place_channel_msg_in_flight = NULL;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
static Scheme_Object *do_sync(const char *name, int argc, Scheme_Object *argv[],
|
||||
int with_break, int with_timeout, int _tailok)
|
||||
{
|
||||
|
@ -6478,7 +6493,7 @@ static Scheme_Object *do_sync(const char *name, int argc, Scheme_Object *argv[],
|
|||
syncing->disable_break = scheme_current_thread;
|
||||
}
|
||||
|
||||
BEGIN_ESCAPEABLE(scheme_post_syncing_nacks, syncing);
|
||||
BEGIN_ESCAPEABLE(escape_during_sync, syncing);
|
||||
scheme_block_until((Scheme_Ready_Fun)syncing_ready, syncing_needs_wakeup,
|
||||
(Scheme_Object *)syncing, timeout);
|
||||
END_ESCAPEABLE();
|
||||
|
|
Loading…
Reference in New Issue
Block a user