diff --git a/racket/src/racket/src/place.c b/racket/src/racket/src/place.c index 134032c5bc..6baf58467c 100644 --- a/racket/src/racket/src/place.c +++ b/racket/src/racket/src/place.c @@ -3203,8 +3203,7 @@ static Scheme_Object *GC_master_make_vector(int size) { static void place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *uo) { void *msg_memory = NULL; Scheme_Object *o, *master_chain = NULL, *invalid_object = NULL; - intptr_t sz; - int cnt; + intptr_t sz, cnt; o = places_serialize(uo, &msg_memory, &master_chain, &invalid_object); /* uo needs to stay live until `master_chain` is registered in `ch` */ @@ -3220,40 +3219,59 @@ static void place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *uo) else bad_place_message(uo); } + { + intptr_t msg_size; + msg_size = GC_message_allocator_size(msg_memory); + log_place_event("id %d: put message of %" PRIdPTR " bytes", "put", 1, msg_size); + } + mzrt_mutex_lock(ch->lock); { cnt = ch->count; if (ch->count == ch->size) { /* GROW QUEUE */ - Scheme_Object **new_msgs, **new_chains; - void **new_msg_memory; + Scheme_Object **new_msgs = NULL, **new_chains = NULL; + void **new_msg_memory = NULL; + intptr_t sz = 0; - new_msgs = GC_master_malloc(sizeof(Scheme_Object*) * ch->size * 2); - new_msg_memory = GC_master_malloc(sizeof(void*) * ch->size * 2); - new_chains = GC_master_malloc(sizeof(Scheme_Object*) * ch->size * 2); + /* Can't allocate while holding the lock, so release lock and loop: */ + while (ch->count == ch->size) { + if ((sz == ch->size) && new_msgs) { + if (ch->out < ch->in) { + memcpy(new_msgs, ch->msgs + ch->out, sizeof(Scheme_Object *) * (ch->in - ch->out)); + memcpy(new_msg_memory, ch->msg_memory + ch->out, sizeof(void*) * (ch->in - ch->out)); + memcpy(new_chains, ch->msg_chains + ch->out, sizeof(void*) * (ch->in - ch->out)); + } + else { + int s1 = (ch->size - ch->out); + memcpy(new_msgs, ch->msgs + ch->out, sizeof(Scheme_Object *) * s1); + memcpy(new_msgs + s1, ch->msgs, sizeof(Scheme_Object *) * ch->in); - if (ch->out < ch->in) { - memcpy(new_msgs, ch->msgs + ch->out, sizeof(Scheme_Object *) * (ch->in - ch->out)); - memcpy(new_msg_memory, ch->msg_memory + ch->out, sizeof(void*) * (ch->in - ch->out)); - memcpy(new_chains, ch->msg_chains + ch->out, sizeof(void*) * (ch->in - ch->out)); + memcpy(new_msg_memory, ch->msg_memory + ch->out, sizeof(void*) * s1); + memcpy(new_msg_memory + s1, ch->msg_memory, sizeof(void*) * ch->in); + + memcpy(new_chains, ch->msg_chains + ch->out, sizeof(Scheme_Object *) * s1); + memcpy(new_chains + s1, ch->msg_chains, sizeof(Scheme_Object *) * ch->in); + } + + ch->msgs = new_msgs; + ch->msg_memory = new_msg_memory; + ch->msg_chains = new_chains; + ch->in = ch->size; + ch->out = 0; + ch->size *= 2; + + break; + } else { + sz = ch->size; + mzrt_mutex_unlock(ch->lock); + + new_msgs = GC_master_malloc(sizeof(Scheme_Object*) * sz * 2); + new_msg_memory = GC_master_malloc(sizeof(void*) * ch->size * 2); + new_chains = GC_master_malloc(sizeof(Scheme_Object*) * ch->size * 2); + + mzrt_mutex_lock(ch->lock); + } } - else { - int s1 = (ch->size - ch->out); - memcpy(new_msgs, ch->msgs + ch->out, sizeof(Scheme_Object *) * s1); - memcpy(new_msgs + s1, ch->msgs, sizeof(Scheme_Object *) * ch->in); - - memcpy(new_msg_memory, ch->msg_memory + ch->out, sizeof(void*) * s1); - memcpy(new_msg_memory + s1, ch->msg_memory, sizeof(void*) * ch->in); - - memcpy(new_chains, ch->msg_chains + ch->out, sizeof(Scheme_Object *) * s1); - memcpy(new_chains + s1, ch->msg_chains, sizeof(Scheme_Object *) * ch->in); - } - - ch->msgs = new_msgs; - ch->msg_memory = new_msg_memory; - ch->msg_chains = new_chains; - ch->in = ch->size; - ch->out = 0; - ch->size *= 2; } ch->msgs[ch->in] = o; @@ -3271,12 +3289,6 @@ static void place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *uo) /* make sure `uo` is treated as live until here: */ if (!uo) scheme_signal_error("?"); - { - intptr_t msg_size; - msg_size = GC_message_allocator_size(msg_memory); - log_place_event("id %d: put message of %" PRIdPTR " bytes", "put", 1, msg_size); - } - if (!cnt && ch->wakeup_signal) { /*wake up possibly sleeping single receiver */ if (SCHEME_PLACE_OBJECTP(ch->wakeup_signal)) { @@ -3381,71 +3393,93 @@ static void place_object_dec_refcount(Scheme_Object *o) { } } -static void register_place_object_with_channel(Scheme_Place_Async_Channel *ch, Scheme_Object *o) { - if (ch->wakeup_signal == o) { - return; - } - else if (!ch->wakeup_signal) { - place_object_inc_refcount(o); - ch->wakeup_signal = o; - } - else if (SCHEME_PLACE_OBJECTP(ch->wakeup_signal) - && ( (Scheme_Place_Object *) ch->wakeup_signal)->signal_handle == NULL) { - place_object_dec_refcount(ch->wakeup_signal); - place_object_inc_refcount(o); - ch->wakeup_signal = o; - } - else if (SCHEME_VECTORP(ch->wakeup_signal)) { - int i = 0; - Scheme_Object *v = ch->wakeup_signal; - int size = SCHEME_VEC_SIZE(v); - /* already registered? */ - for (i = 0; i < size; i++) { - Scheme_Object *vo = SCHEME_VEC_ELS(v)[i]; - if (vo == o) { - return; - } +static void lock_and_register_place_object_with_channel(Scheme_Place_Async_Channel *ch, Scheme_Object *o) +{ + Scheme_Object *avail_vector; + + mzrt_mutex_lock(ch->lock); + + /* loop in case we need to release the lock temporarily to allocate: */ + while (1) { + if (ch->wakeup_signal == o) { + return; } - /* look for unused slot in wakeup vector */ - for (i = 0; i < size; i++) { - Scheme_Object *vo = SCHEME_VEC_ELS(v)[i]; - if (!vo) { - place_object_inc_refcount(o); - SCHEME_VEC_ELS(v)[i] = o; - return; - } - else if (SCHEME_PLACE_OBJECTP(vo) && - ((Scheme_Place_Object *)vo)->signal_handle == NULL) { - place_object_dec_refcount(vo); - place_object_inc_refcount(o); - SCHEME_VEC_ELS(v)[i] = o; - return; - } - } - /* fall through to here, need to grow wakeup vector */ - { - Scheme_Object *nv; - nv = GC_master_make_vector(size*2); - for (i = 0; i < size; i++) { - SCHEME_VEC_ELS(nv)[i] = SCHEME_VEC_ELS(v)[i]; - } + else if (!ch->wakeup_signal) { place_object_inc_refcount(o); - SCHEME_VEC_ELS(nv)[size+1] = o; - ch->wakeup_signal = nv; + ch->wakeup_signal = o; + return; + } + else if (SCHEME_PLACE_OBJECTP(ch->wakeup_signal) + && ( (Scheme_Place_Object *) ch->wakeup_signal)->signal_handle == NULL) { + place_object_dec_refcount(ch->wakeup_signal); + place_object_inc_refcount(o); + ch->wakeup_signal = o; + return; + } + else if (SCHEME_VECTORP(ch->wakeup_signal)) { + int i = 0; + Scheme_Object *v = ch->wakeup_signal; + int size = SCHEME_VEC_SIZE(v); + /* already registered? */ + for (i = 0; i < size; i++) { + Scheme_Object *vo = SCHEME_VEC_ELS(v)[i]; + if (vo == o) + return; + } + /* look for unused slot in wakeup vector */ + for (i = 0; i < size; i++) { + Scheme_Object *vo = SCHEME_VEC_ELS(v)[i]; + if (!vo) { + place_object_inc_refcount(o); + SCHEME_VEC_ELS(v)[i] = o; + return; + } + else if (SCHEME_PLACE_OBJECTP(vo) && + ((Scheme_Place_Object *)vo)->signal_handle == NULL) { + place_object_dec_refcount(vo); + place_object_inc_refcount(o); + SCHEME_VEC_ELS(v)[i] = o; + return; + } + } + /* fall through to here, need to grow wakeup vector; + must do so without the lock */ + { + if (avail_vector && (SCHEME_VEC_SIZE(avail_vector) == size*2)) { + Scheme_Object *nv; + nv = avail_vector; + for (i = 0; i < size; i++) { + SCHEME_VEC_ELS(nv)[i] = SCHEME_VEC_ELS(v)[i]; + } + place_object_inc_refcount(o); + SCHEME_VEC_ELS(nv)[size+1] = o; + ch->wakeup_signal = nv; + } else { + mzrt_mutex_unlock(ch->lock); + avail_vector = GC_master_make_vector(size*2); + mzrt_mutex_lock(ch->lock); + } + } + } + /* grow from single wakeup to multiple wakeups */ + else if (SCHEME_PLACE_OBJECTP(ch->wakeup_signal)) { + if (avail_vector && (SCHEME_VEC_SIZE(avail_vector) == 2)) { + Scheme_Object *v; + v = avail_vector; + SCHEME_VEC_ELS(v)[0] = ch->wakeup_signal; + place_object_inc_refcount(o); + SCHEME_VEC_ELS(v)[1] = o; + ch->wakeup_signal = v; + } else { + mzrt_mutex_unlock(ch->lock); + avail_vector = GC_master_make_vector(2); + mzrt_mutex_lock(ch->lock); + } + } + else { + printf("Oops not a valid ch->wakeup_signal\n"); + exit(1); } - } - /* grow from single wakeup to multiple wakeups */ - else if (SCHEME_PLACE_OBJECTP(ch->wakeup_signal)) { - Scheme_Object *v; - v = GC_master_make_vector(2); - SCHEME_VEC_ELS(v)[0] = ch->wakeup_signal; - place_object_inc_refcount(o); - SCHEME_VEC_ELS(v)[1] = o; - ch->wakeup_signal = v; - } - else { - printf("Oops not a valid ch->wakeup_signal\n"); - exit(1); } } @@ -3457,9 +3491,8 @@ static Scheme_Object *scheme_place_async_try_receive_raw(Scheme_Place_Async_Chan void *msg_memory = NULL; intptr_t sz; - mzrt_mutex_lock(ch->lock); + lock_and_register_place_object_with_channel(ch, (Scheme_Object *) place_object); { - register_place_object_with_channel(ch, (Scheme_Object *) place_object); if (ch->count > 0) { /* GET MSG */ msg = ch->msgs[ch->out]; msg_memory = ch->msg_memory[ch->out]; @@ -3516,9 +3549,8 @@ static Scheme_Object *scheme_place_async_try_receive(Scheme_Place_Async_Channel static int scheme_place_async_ch_ready(Scheme_Place_Async_Channel *ch) { int ready = 0; - mzrt_mutex_lock(ch->lock); + lock_and_register_place_object_with_channel(ch, (Scheme_Object *) place_object); { - register_place_object_with_channel(ch, (Scheme_Object *) place_object); if (ch->count > 0) ready = 1; if (!ch->wr_ref) ready = 1; } diff --git a/racket/src/racket/src/schpriv.h b/racket/src/racket/src/schpriv.h index f21352b5ad..b7d0291fdb 100644 --- a/racket/src/racket/src/schpriv.h +++ b/racket/src/racket/src/schpriv.h @@ -4136,7 +4136,7 @@ typedef struct Scheme_Place_Async_Channel { intptr_t delta; intptr_t wr_ref, rd_ref; /* ref counts on readers and writers */ #if defined(MZ_USE_PLACES) - mzrt_mutex *lock; + mzrt_mutex *lock; /* no allocation while this lock is held */ #endif Scheme_Object **msgs; void **msg_memory; @@ -4177,7 +4177,7 @@ typedef struct Scheme_Place { typedef struct Scheme_Place_Object { Scheme_Object so; #if defined(MZ_USE_PLACES) - mzrt_mutex *lock; + mzrt_mutex *lock; /* no allocation or place-channel locks while this lock is held */ mzrt_sema *pause; #endif char die;