calling sync on a place-channel now returns the channel message just like standard racket channels

This commit is contained in:
Kevin Tew 2011-02-15 12:57:48 -07:00
parent 21c6a9f5c8
commit 156153f692
2 changed files with 52 additions and 35 deletions

View File

@ -101,7 +101,7 @@ END
(define-values (pc5 pc6) (place-channel))
(place-channel-send pl pc5)
(test "Ready5" sync (handle-evt pc6 (lambda (p) (place-channel-recv p))))
(test "Ready5" sync pc6)
(place-wait pl)
)

View File

@ -26,11 +26,12 @@ static Scheme_Object *scheme_place_channel(int argc, Scheme_Object *args[]);
static Scheme_Place_Async_Channel *scheme_place_async_channel_create();
static Scheme_Place_Bi_Channel *scheme_place_bi_channel_create();
static Scheme_Place_Bi_Channel *scheme_place_bi_peer_channel_create(Scheme_Place_Bi_Channel *orig);
static int scheme_place_channel_ready(Scheme_Object *so);
static int scheme_place_channel_ready(Scheme_Object *so, Scheme_Schedule_Info *sinfo);
static void scheme_place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *o, void *msg_memory);
static Scheme_Object *scheme_place_async_recv(Scheme_Place_Async_Channel *ch, void **msg_memory);
static void scheme_place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *o);
static Scheme_Object *scheme_place_async_recv(Scheme_Place_Async_Channel *ch);
static Scheme_Object *scheme_places_deep_copy_to_master(Scheme_Object *so);
/* Scheme_Object *scheme_places_deep_copy(Scheme_Object *so); */
@ -1186,7 +1187,6 @@ Scheme_Object *scheme_places_deserialize(Scheme_Object *so, void *msg_memory) {
Scheme_Object *scheme_place_send(int argc, Scheme_Object *args[]) {
if (argc == 2) {
Scheme_Object *mso;
Scheme_Place_Bi_Channel *ch;
if (SAME_TYPE(SCHEME_TYPE(args[0]), scheme_place_type)) {
ch = (Scheme_Place_Bi_Channel *) ((Scheme_Place *) args[0])->channel;
@ -1198,11 +1198,7 @@ Scheme_Object *scheme_place_send(int argc, Scheme_Object *args[]) {
ch = NULL;
scheme_wrong_type("place-channel-send", "place-channel", 0, argc, args);
}
{
void *msg_memory = NULL;
mso = scheme_places_serialize(args[1], &msg_memory);
scheme_place_async_send((Scheme_Place_Async_Channel *) ch->sendch, mso, msg_memory);
}
scheme_place_async_send((Scheme_Place_Async_Channel *) ch->sendch, args[1]);
}
else {
scheme_wrong_count_m("place-channel-send", 2, 2, argc, args, 0);
@ -1212,7 +1208,6 @@ Scheme_Object *scheme_place_send(int argc, Scheme_Object *args[]) {
Scheme_Object *scheme_place_recv(int argc, Scheme_Object *args[]) {
if (argc == 1) {
Scheme_Object *mso;
Scheme_Place_Bi_Channel *ch;
if (SAME_TYPE(SCHEME_TYPE(args[0]), scheme_place_type)) {
ch = (Scheme_Place_Bi_Channel *) ((Scheme_Place *) args[0])->channel;
@ -1224,11 +1219,7 @@ Scheme_Object *scheme_place_recv(int argc, Scheme_Object *args[]) {
ch = NULL;
scheme_wrong_type("place-channel-recv", "place-channel", 0, argc, args);
}
{
void *msg_memory = NULL;
mso = scheme_place_async_recv((Scheme_Place_Async_Channel *) ch->recvch, &msg_memory);
return scheme_places_deserialize(mso, msg_memory);
}
return scheme_place_async_recv((Scheme_Place_Async_Channel *) ch->recvch);
}
else {
scheme_wrong_count_m("place-channel-recv", 1, 1, argc, args, 0);
@ -1437,8 +1428,13 @@ static Scheme_Object *scheme_place_channel_p(int argc, Scheme_Object *args[])
return SAME_TYPE(SCHEME_TYPE(args[0]), scheme_place_bi_channel_type) ? scheme_true : scheme_false;
}
static void scheme_place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *o, void *msg_memory) {
static void scheme_place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *uo) {
void *msg_memory = NULL;
Scheme_Object *o;
int cnt;
o = scheme_places_serialize(uo, &msg_memory);
mzrt_mutex_lock(ch->lock);
{
cnt = ch->count;
@ -1482,6 +1478,34 @@ static void scheme_place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Objec
}
}
static Scheme_Object *scheme_place_async_try_recv(Scheme_Place_Async_Channel *ch) {
Scheme_Object *msg = NULL;
void *msg_memory = NULL;
mzrt_mutex_lock(ch->lock);
{
void *signaldescr;
signaldescr = scheme_get_signal_handle();
ch->wakeup_signal = signaldescr;
if (ch->count > 0) { /* GET MSG */
msg = ch->msgs[ch->out];
msg_memory = ch->msg_memory[ch->out];
ch->msgs[ch->out] = NULL;
ch->msg_memory[ch->out] = NULL;
--ch->count;
ch->out = (++ch->out % ch->size);
}
}
mzrt_mutex_unlock(ch->lock);
if (msg) {
return scheme_places_deserialize(msg, msg_memory);
}
return msg;
}
static int scheme_place_async_ch_ready(Scheme_Place_Async_Channel *ch) {
int ready = 0;
mzrt_mutex_lock(ch->lock);
@ -1495,35 +1519,28 @@ static int scheme_place_async_ch_ready(Scheme_Place_Async_Channel *ch) {
return ready;
}
static int scheme_place_channel_ready(Scheme_Object *so) {
static int scheme_place_channel_ready(Scheme_Object *so, Scheme_Schedule_Info *sinfo) {
Scheme_Place_Bi_Channel *ch;
Scheme_Object *msg = NULL;
if (SAME_TYPE(SCHEME_TYPE(so), scheme_place_type)) {
ch = (Scheme_Place_Bi_Channel *) ((Scheme_Place *) so)->channel;
}
else {
ch = (Scheme_Place_Bi_Channel *)so;
}
return scheme_place_async_ch_ready((Scheme_Place_Async_Channel *) ch->recvch);
msg = scheme_place_async_try_recv((Scheme_Place_Async_Channel *) ch->recvch);
if (msg != NULL) {
scheme_set_sync_target(sinfo, msg, NULL, NULL, 0, 0, NULL);
return 1;
}
return 0;
}
static Scheme_Object *scheme_place_async_recv(Scheme_Place_Async_Channel *ch, void **msg_memory) {
static Scheme_Object *scheme_place_async_recv(Scheme_Place_Async_Channel *ch) {
Scheme_Object *msg = NULL;
while(1) {
mzrt_mutex_lock(ch->lock);
{
if (ch->count > 0) { /* GET MSG */
msg = ch->msgs[ch->out];
*msg_memory = ch->msg_memory[ch->out];
ch->msgs[ch->out] = NULL;
ch->msg_memory[ch->out] = NULL;
--ch->count;
ch->out = (++ch->out % ch->size);
}
}
mzrt_mutex_unlock(ch->lock);
msg = scheme_place_async_try_recv(ch);
if(msg) break;
else {
void *signaldescr;