From 666c0fdb57485e9fb0af6b3b870012819602919b Mon Sep 17 00:00:00 2001 From: Kevin Tew Date: Tue, 13 Mar 2012 11:46:15 -0600 Subject: [PATCH] Fix sync/timeout hang due to place channels --- collects/tests/racket/place-channel.rkt | 10 +++++ src/racket/include/scheme.h | 2 + src/racket/src/place.c | 50 ++++++++++++++++++++++--- src/racket/src/thread.c | 17 ++++++++- 4 files changed, 73 insertions(+), 6 deletions(-) diff --git a/collects/tests/racket/place-channel.rkt b/collects/tests/racket/place-channel.rkt index 953966a992..fc540d9aa9 100644 --- a/collects/tests/racket/place-channel.rkt +++ b/collects/tests/racket/place-channel.rkt @@ -183,6 +183,16 @@ (define (main) + (let () + (define pl (place ch + (for ([i 100000]) + (place-channel-get ch) + (place-channel-put ch (list "foo" 1 "bar"))))) + + (for ([i 100000]) + (place-channel-put pl (list "hello" 4 "hello")) + (sync/timeout 1 pl))) + (let () (define flx (make-shared-fxvector 10 0)) (define flv (make-shared-flvector 10 0.0)) diff --git a/src/racket/include/scheme.h b/src/racket/include/scheme.h index 25a3b91df5..1a5a58ba37 100644 --- a/src/racket/include/scheme.h +++ b/src/racket/include/scheme.h @@ -1148,7 +1148,9 @@ typedef struct Scheme_Thread { #ifdef MZ_PRECISE_GC struct GC_Thread_Info *gc_info; /* managed by the GC */ + void *place_channel_msg_in_flight; #endif + } Scheme_Thread; #include "schthread.h" diff --git a/src/racket/src/place.c b/src/racket/src/place.c index 20a00f92a3..22d5a60c73 100644 --- a/src/racket/src/place.c +++ b/src/racket/src/place.c @@ -2589,9 +2589,11 @@ Scheme_Object *scheme_places_deserialize(Scheme_Object *so, void *msg_memory) { if (GC_message_objects_size(msg_memory) < 1024) { new_so = do_places_deep_copy(so, mzPDC_UNCOPY, 1, NULL, NULL); GC_dispose_short_message_allocator(msg_memory); + msg_memory = NULL; } else { GC_adopt_message_allocator(msg_memory); + msg_memory = NULL; #if !defined(SHARED_TABLES) new_so = do_places_deep_copy(so, mzPDC_DESER, 1, NULL, NULL); #endif @@ -3086,7 +3088,7 @@ static void register_place_object_with_channel(Scheme_Place_Async_Channel *ch, S } } -static Scheme_Object *scheme_place_async_try_receive(Scheme_Place_Async_Channel *ch) { +static Scheme_Object *scheme_place_async_try_receive_raw(Scheme_Place_Async_Channel *ch, void **msg_memory_ptr) { Scheme_Object *msg = NULL; void *msg_memory = NULL; intptr_t sz; @@ -3113,9 +3115,24 @@ static Scheme_Object *scheme_place_async_try_receive(Scheme_Place_Async_Channel } mzrt_mutex_unlock(ch->lock); + *msg_memory_ptr = msg_memory; + return msg; +} + +static void cleanup_msg_memmory(void *msg_memory) { + if (msg_memory) + GC_destroy_orphan_msg_memory(msg_memory); +} + +static Scheme_Object *scheme_place_async_try_receive(Scheme_Place_Async_Channel *ch) { + Scheme_Object *msg = NULL; + void *msg_memory = NULL; + BEGIN_ESCAPEABLE(); + msg = scheme_place_async_try_receive_raw(ch, &msg_memory); if (msg) { - return scheme_places_deserialize(msg, msg_memory); + msg = scheme_places_deserialize(msg, msg_memory); } + END_ESCAPEABLE(); return msg; } @@ -3129,20 +3146,43 @@ static int scheme_place_async_ch_ready(Scheme_Place_Async_Channel *ch) { mzrt_mutex_unlock(ch->lock); return ready; } +static Scheme_Object *place_channel_finish_ready(void *d, int argc, struct Scheme_Object *argv[]) { + Scheme_Object *o; + Scheme_Object *msg; + void *msg_memory; + + o = argv[0]; + msg = SCHEME_CAR(o); + msg_memory = SCHEME_CDR(o); + + if (msg) { + Scheme_Thread *p = scheme_current_thread; + p->place_channel_msg_in_flight = NULL; + return scheme_places_deserialize(msg, msg_memory); + } + return msg; +} static int place_channel_ready(Scheme_Object *so, Scheme_Schedule_Info *sinfo) { Scheme_Place_Bi_Channel *ch; Scheme_Object *msg = NULL; + Scheme_Object *o; + Scheme_Object *wrapper; + void *msg_memory = NULL; if (SAME_TYPE(SCHEME_TYPE(so), scheme_place_type)) { ch = (Scheme_Place_Bi_Channel *) ((Scheme_Place *) so)->channel; } else { ch = (Scheme_Place_Bi_Channel *)so; } - - msg = scheme_place_async_try_receive((Scheme_Place_Async_Channel *) ch->recvch); + + msg = scheme_place_async_try_receive_raw((Scheme_Place_Async_Channel *) ch->recvch, &msg_memory); if (msg != NULL) { - scheme_set_sync_target(sinfo, msg, NULL, NULL, 0, 0, NULL); + Scheme_Thread *p = scheme_current_thread; + p->place_channel_msg_in_flight = msg_memory; + o = scheme_make_pair(msg, msg_memory); + wrapper = scheme_make_closed_prim(place_channel_finish_ready, NULL); + scheme_set_sync_target(sinfo, o, wrapper, NULL, 0, 0, NULL); return 1; } return 0; diff --git a/src/racket/src/thread.c b/src/racket/src/thread.c index 5b41a38ce2..7bfe3c81bb 100644 --- a/src/racket/src/thread.c +++ b/src/racket/src/thread.c @@ -6381,6 +6381,21 @@ void scheme_post_syncing_nacks(Syncing *syncing) } } +static void escape_during_sync(Syncing *syncing) { +#ifdef MZ_PRECISE_GC + Scheme_Thread *p = scheme_current_thread; +#endif + +scheme_post_syncing_nacks(syncing); + +#ifdef MZ_PRECISE_GC + if (p->place_channel_msg_in_flight) { + GC_destroy_orphan_msg_memory(p->place_channel_msg_in_flight); + p->place_channel_msg_in_flight = NULL; + } +#endif +} + static Scheme_Object *do_sync(const char *name, int argc, Scheme_Object *argv[], int with_break, int with_timeout, int _tailok) { @@ -6478,7 +6493,7 @@ static Scheme_Object *do_sync(const char *name, int argc, Scheme_Object *argv[], syncing->disable_break = scheme_current_thread; } - BEGIN_ESCAPEABLE(scheme_post_syncing_nacks, syncing); + BEGIN_ESCAPEABLE(escape_during_sync, syncing); scheme_block_until((Scheme_Ready_Fun)syncing_ready, syncing_needs_wakeup, (Scheme_Object *)syncing, timeout); END_ESCAPEABLE();