diff --git a/collects/tests/racket/place-channel.rktl b/collects/tests/racket/place-channel.rktl index ef03be323c..4db723f771 100644 --- a/collects/tests/racket/place-channel.rktl +++ b/collects/tests/racket/place-channel.rktl @@ -101,7 +101,7 @@ END (define-values (pc5 pc6) (place-channel)) (place-channel-send pl pc5) - (test "Ready5" sync (handle-evt pc6 (lambda (p) (place-channel-recv p)))) + (test "Ready5" sync pc6) (place-wait pl) ) diff --git a/src/racket/src/places.c b/src/racket/src/places.c index 07deb8f6f0..2091ccc2dc 100644 --- a/src/racket/src/places.c +++ b/src/racket/src/places.c @@ -26,11 +26,12 @@ static Scheme_Object *scheme_place_channel(int argc, Scheme_Object *args[]); static Scheme_Place_Async_Channel *scheme_place_async_channel_create(); static Scheme_Place_Bi_Channel *scheme_place_bi_channel_create(); static Scheme_Place_Bi_Channel *scheme_place_bi_peer_channel_create(Scheme_Place_Bi_Channel *orig); -static int scheme_place_channel_ready(Scheme_Object *so); +static int scheme_place_channel_ready(Scheme_Object *so, Scheme_Schedule_Info *sinfo); -static void scheme_place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *o, void *msg_memory); -static Scheme_Object *scheme_place_async_recv(Scheme_Place_Async_Channel *ch, void **msg_memory); + +static void scheme_place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *o); +static Scheme_Object *scheme_place_async_recv(Scheme_Place_Async_Channel *ch); static Scheme_Object *scheme_places_deep_copy_to_master(Scheme_Object *so); /* Scheme_Object *scheme_places_deep_copy(Scheme_Object *so); */ @@ -1186,7 +1187,6 @@ Scheme_Object *scheme_places_deserialize(Scheme_Object *so, void *msg_memory) { Scheme_Object *scheme_place_send(int argc, Scheme_Object *args[]) { if (argc == 2) { - Scheme_Object *mso; Scheme_Place_Bi_Channel *ch; if (SAME_TYPE(SCHEME_TYPE(args[0]), scheme_place_type)) { ch = (Scheme_Place_Bi_Channel *) ((Scheme_Place *) args[0])->channel; @@ -1198,11 +1198,7 @@ Scheme_Object *scheme_place_send(int argc, Scheme_Object *args[]) { ch = NULL; scheme_wrong_type("place-channel-send", "place-channel", 0, argc, args); } - { - void *msg_memory = NULL; - mso = scheme_places_serialize(args[1], &msg_memory); - scheme_place_async_send((Scheme_Place_Async_Channel *) ch->sendch, mso, msg_memory); - } + scheme_place_async_send((Scheme_Place_Async_Channel *) ch->sendch, args[1]); } else { scheme_wrong_count_m("place-channel-send", 2, 2, argc, args, 0); @@ -1212,7 +1208,6 @@ Scheme_Object *scheme_place_send(int argc, Scheme_Object *args[]) { Scheme_Object *scheme_place_recv(int argc, Scheme_Object *args[]) { if (argc == 1) { - Scheme_Object *mso; Scheme_Place_Bi_Channel *ch; if (SAME_TYPE(SCHEME_TYPE(args[0]), scheme_place_type)) { ch = (Scheme_Place_Bi_Channel *) ((Scheme_Place *) args[0])->channel; @@ -1224,11 +1219,7 @@ Scheme_Object *scheme_place_recv(int argc, Scheme_Object *args[]) { ch = NULL; scheme_wrong_type("place-channel-recv", "place-channel", 0, argc, args); } - { - void *msg_memory = NULL; - mso = scheme_place_async_recv((Scheme_Place_Async_Channel *) ch->recvch, &msg_memory); - return scheme_places_deserialize(mso, msg_memory); - } + return scheme_place_async_recv((Scheme_Place_Async_Channel *) ch->recvch); } else { scheme_wrong_count_m("place-channel-recv", 1, 1, argc, args, 0); @@ -1437,8 +1428,13 @@ static Scheme_Object *scheme_place_channel_p(int argc, Scheme_Object *args[]) return SAME_TYPE(SCHEME_TYPE(args[0]), scheme_place_bi_channel_type) ? scheme_true : scheme_false; } -static void scheme_place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *o, void *msg_memory) { +static void scheme_place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *uo) { + void *msg_memory = NULL; + Scheme_Object *o; int cnt; + + o = scheme_places_serialize(uo, &msg_memory); + mzrt_mutex_lock(ch->lock); { cnt = ch->count; @@ -1482,6 +1478,34 @@ static void scheme_place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Objec } } +static Scheme_Object *scheme_place_async_try_recv(Scheme_Place_Async_Channel *ch) { + Scheme_Object *msg = NULL; + void *msg_memory = NULL; + + mzrt_mutex_lock(ch->lock); + { + void *signaldescr; + signaldescr = scheme_get_signal_handle(); + ch->wakeup_signal = signaldescr; + if (ch->count > 0) { /* GET MSG */ + msg = ch->msgs[ch->out]; + msg_memory = ch->msg_memory[ch->out]; + + ch->msgs[ch->out] = NULL; + ch->msg_memory[ch->out] = NULL; + + --ch->count; + ch->out = (++ch->out % ch->size); + } + } + mzrt_mutex_unlock(ch->lock); + + if (msg) { + return scheme_places_deserialize(msg, msg_memory); + } + return msg; +} + static int scheme_place_async_ch_ready(Scheme_Place_Async_Channel *ch) { int ready = 0; mzrt_mutex_lock(ch->lock); @@ -1495,35 +1519,28 @@ static int scheme_place_async_ch_ready(Scheme_Place_Async_Channel *ch) { return ready; } -static int scheme_place_channel_ready(Scheme_Object *so) { +static int scheme_place_channel_ready(Scheme_Object *so, Scheme_Schedule_Info *sinfo) { Scheme_Place_Bi_Channel *ch; + Scheme_Object *msg = NULL; if (SAME_TYPE(SCHEME_TYPE(so), scheme_place_type)) { ch = (Scheme_Place_Bi_Channel *) ((Scheme_Place *) so)->channel; } else { ch = (Scheme_Place_Bi_Channel *)so; } - - return scheme_place_async_ch_ready((Scheme_Place_Async_Channel *) ch->recvch); + + msg = scheme_place_async_try_recv((Scheme_Place_Async_Channel *) ch->recvch); + if (msg != NULL) { + scheme_set_sync_target(sinfo, msg, NULL, NULL, 0, 0, NULL); + return 1; + } + return 0; } -static Scheme_Object *scheme_place_async_recv(Scheme_Place_Async_Channel *ch, void **msg_memory) { +static Scheme_Object *scheme_place_async_recv(Scheme_Place_Async_Channel *ch) { Scheme_Object *msg = NULL; while(1) { - mzrt_mutex_lock(ch->lock); - { - if (ch->count > 0) { /* GET MSG */ - msg = ch->msgs[ch->out]; - *msg_memory = ch->msg_memory[ch->out]; - - ch->msgs[ch->out] = NULL; - ch->msg_memory[ch->out] = NULL; - - --ch->count; - ch->out = (++ch->out % ch->size); - } - } - mzrt_mutex_unlock(ch->lock); + msg = scheme_place_async_try_recv(ch); if(msg) break; else { void *signaldescr;