places: no allocation while low-level locks are held

This commit is contained in:
Matthew Flatt 2015-01-20 17:39:57 -07:00
parent 48290d3360
commit 5b20690876
2 changed files with 135 additions and 103 deletions

View File

@ -3203,8 +3203,7 @@ static Scheme_Object *GC_master_make_vector(int size) {
static void place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *uo) { static void place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *uo) {
void *msg_memory = NULL; void *msg_memory = NULL;
Scheme_Object *o, *master_chain = NULL, *invalid_object = NULL; Scheme_Object *o, *master_chain = NULL, *invalid_object = NULL;
intptr_t sz; intptr_t sz, cnt;
int cnt;
o = places_serialize(uo, &msg_memory, &master_chain, &invalid_object); o = places_serialize(uo, &msg_memory, &master_chain, &invalid_object);
/* uo needs to stay live until `master_chain` is registered in `ch` */ /* uo needs to stay live until `master_chain` is registered in `ch` */
@ -3220,40 +3219,59 @@ static void place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *uo)
else bad_place_message(uo); else bad_place_message(uo);
} }
{
intptr_t msg_size;
msg_size = GC_message_allocator_size(msg_memory);
log_place_event("id %d: put message of %" PRIdPTR " bytes", "put", 1, msg_size);
}
mzrt_mutex_lock(ch->lock); mzrt_mutex_lock(ch->lock);
{ {
cnt = ch->count; cnt = ch->count;
if (ch->count == ch->size) { /* GROW QUEUE */ if (ch->count == ch->size) { /* GROW QUEUE */
Scheme_Object **new_msgs, **new_chains; Scheme_Object **new_msgs = NULL, **new_chains = NULL;
void **new_msg_memory; void **new_msg_memory = NULL;
intptr_t sz = 0;
new_msgs = GC_master_malloc(sizeof(Scheme_Object*) * ch->size * 2); /* Can't allocate while holding the lock, so release lock and loop: */
new_msg_memory = GC_master_malloc(sizeof(void*) * ch->size * 2); while (ch->count == ch->size) {
new_chains = GC_master_malloc(sizeof(Scheme_Object*) * ch->size * 2); if ((sz == ch->size) && new_msgs) {
if (ch->out < ch->in) {
memcpy(new_msgs, ch->msgs + ch->out, sizeof(Scheme_Object *) * (ch->in - ch->out));
memcpy(new_msg_memory, ch->msg_memory + ch->out, sizeof(void*) * (ch->in - ch->out));
memcpy(new_chains, ch->msg_chains + ch->out, sizeof(void*) * (ch->in - ch->out));
}
else {
int s1 = (ch->size - ch->out);
memcpy(new_msgs, ch->msgs + ch->out, sizeof(Scheme_Object *) * s1);
memcpy(new_msgs + s1, ch->msgs, sizeof(Scheme_Object *) * ch->in);
if (ch->out < ch->in) { memcpy(new_msg_memory, ch->msg_memory + ch->out, sizeof(void*) * s1);
memcpy(new_msgs, ch->msgs + ch->out, sizeof(Scheme_Object *) * (ch->in - ch->out)); memcpy(new_msg_memory + s1, ch->msg_memory, sizeof(void*) * ch->in);
memcpy(new_msg_memory, ch->msg_memory + ch->out, sizeof(void*) * (ch->in - ch->out));
memcpy(new_chains, ch->msg_chains + ch->out, sizeof(void*) * (ch->in - ch->out)); memcpy(new_chains, ch->msg_chains + ch->out, sizeof(Scheme_Object *) * s1);
memcpy(new_chains + s1, ch->msg_chains, sizeof(Scheme_Object *) * ch->in);
}
ch->msgs = new_msgs;
ch->msg_memory = new_msg_memory;
ch->msg_chains = new_chains;
ch->in = ch->size;
ch->out = 0;
ch->size *= 2;
break;
} else {
sz = ch->size;
mzrt_mutex_unlock(ch->lock);
new_msgs = GC_master_malloc(sizeof(Scheme_Object*) * sz * 2);
new_msg_memory = GC_master_malloc(sizeof(void*) * ch->size * 2);
new_chains = GC_master_malloc(sizeof(Scheme_Object*) * ch->size * 2);
mzrt_mutex_lock(ch->lock);
}
} }
else {
int s1 = (ch->size - ch->out);
memcpy(new_msgs, ch->msgs + ch->out, sizeof(Scheme_Object *) * s1);
memcpy(new_msgs + s1, ch->msgs, sizeof(Scheme_Object *) * ch->in);
memcpy(new_msg_memory, ch->msg_memory + ch->out, sizeof(void*) * s1);
memcpy(new_msg_memory + s1, ch->msg_memory, sizeof(void*) * ch->in);
memcpy(new_chains, ch->msg_chains + ch->out, sizeof(Scheme_Object *) * s1);
memcpy(new_chains + s1, ch->msg_chains, sizeof(Scheme_Object *) * ch->in);
}
ch->msgs = new_msgs;
ch->msg_memory = new_msg_memory;
ch->msg_chains = new_chains;
ch->in = ch->size;
ch->out = 0;
ch->size *= 2;
} }
ch->msgs[ch->in] = o; ch->msgs[ch->in] = o;
@ -3271,12 +3289,6 @@ static void place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *uo)
/* make sure `uo` is treated as live until here: */ /* make sure `uo` is treated as live until here: */
if (!uo) scheme_signal_error("?"); if (!uo) scheme_signal_error("?");
{
intptr_t msg_size;
msg_size = GC_message_allocator_size(msg_memory);
log_place_event("id %d: put message of %" PRIdPTR " bytes", "put", 1, msg_size);
}
if (!cnt && ch->wakeup_signal) { if (!cnt && ch->wakeup_signal) {
/*wake up possibly sleeping single receiver */ /*wake up possibly sleeping single receiver */
if (SCHEME_PLACE_OBJECTP(ch->wakeup_signal)) { if (SCHEME_PLACE_OBJECTP(ch->wakeup_signal)) {
@ -3381,71 +3393,93 @@ static void place_object_dec_refcount(Scheme_Object *o) {
} }
} }
static void register_place_object_with_channel(Scheme_Place_Async_Channel *ch, Scheme_Object *o) { static void lock_and_register_place_object_with_channel(Scheme_Place_Async_Channel *ch, Scheme_Object *o)
if (ch->wakeup_signal == o) { {
return; Scheme_Object *avail_vector;
}
else if (!ch->wakeup_signal) { mzrt_mutex_lock(ch->lock);
place_object_inc_refcount(o);
ch->wakeup_signal = o; /* loop in case we need to release the lock temporarily to allocate: */
} while (1) {
else if (SCHEME_PLACE_OBJECTP(ch->wakeup_signal) if (ch->wakeup_signal == o) {
&& ( (Scheme_Place_Object *) ch->wakeup_signal)->signal_handle == NULL) { return;
place_object_dec_refcount(ch->wakeup_signal);
place_object_inc_refcount(o);
ch->wakeup_signal = o;
}
else if (SCHEME_VECTORP(ch->wakeup_signal)) {
int i = 0;
Scheme_Object *v = ch->wakeup_signal;
int size = SCHEME_VEC_SIZE(v);
/* already registered? */
for (i = 0; i < size; i++) {
Scheme_Object *vo = SCHEME_VEC_ELS(v)[i];
if (vo == o) {
return;
}
} }
/* look for unused slot in wakeup vector */ else if (!ch->wakeup_signal) {
for (i = 0; i < size; i++) {
Scheme_Object *vo = SCHEME_VEC_ELS(v)[i];
if (!vo) {
place_object_inc_refcount(o);
SCHEME_VEC_ELS(v)[i] = o;
return;
}
else if (SCHEME_PLACE_OBJECTP(vo) &&
((Scheme_Place_Object *)vo)->signal_handle == NULL) {
place_object_dec_refcount(vo);
place_object_inc_refcount(o);
SCHEME_VEC_ELS(v)[i] = o;
return;
}
}
/* fall through to here, need to grow wakeup vector */
{
Scheme_Object *nv;
nv = GC_master_make_vector(size*2);
for (i = 0; i < size; i++) {
SCHEME_VEC_ELS(nv)[i] = SCHEME_VEC_ELS(v)[i];
}
place_object_inc_refcount(o); place_object_inc_refcount(o);
SCHEME_VEC_ELS(nv)[size+1] = o; ch->wakeup_signal = o;
ch->wakeup_signal = nv; return;
}
else if (SCHEME_PLACE_OBJECTP(ch->wakeup_signal)
&& ( (Scheme_Place_Object *) ch->wakeup_signal)->signal_handle == NULL) {
place_object_dec_refcount(ch->wakeup_signal);
place_object_inc_refcount(o);
ch->wakeup_signal = o;
return;
}
else if (SCHEME_VECTORP(ch->wakeup_signal)) {
int i = 0;
Scheme_Object *v = ch->wakeup_signal;
int size = SCHEME_VEC_SIZE(v);
/* already registered? */
for (i = 0; i < size; i++) {
Scheme_Object *vo = SCHEME_VEC_ELS(v)[i];
if (vo == o)
return;
}
/* look for unused slot in wakeup vector */
for (i = 0; i < size; i++) {
Scheme_Object *vo = SCHEME_VEC_ELS(v)[i];
if (!vo) {
place_object_inc_refcount(o);
SCHEME_VEC_ELS(v)[i] = o;
return;
}
else if (SCHEME_PLACE_OBJECTP(vo) &&
((Scheme_Place_Object *)vo)->signal_handle == NULL) {
place_object_dec_refcount(vo);
place_object_inc_refcount(o);
SCHEME_VEC_ELS(v)[i] = o;
return;
}
}
/* fall through to here, need to grow wakeup vector;
must do so without the lock */
{
if (avail_vector && (SCHEME_VEC_SIZE(avail_vector) == size*2)) {
Scheme_Object *nv;
nv = avail_vector;
for (i = 0; i < size; i++) {
SCHEME_VEC_ELS(nv)[i] = SCHEME_VEC_ELS(v)[i];
}
place_object_inc_refcount(o);
SCHEME_VEC_ELS(nv)[size+1] = o;
ch->wakeup_signal = nv;
} else {
mzrt_mutex_unlock(ch->lock);
avail_vector = GC_master_make_vector(size*2);
mzrt_mutex_lock(ch->lock);
}
}
}
/* grow from single wakeup to multiple wakeups */
else if (SCHEME_PLACE_OBJECTP(ch->wakeup_signal)) {
if (avail_vector && (SCHEME_VEC_SIZE(avail_vector) == 2)) {
Scheme_Object *v;
v = avail_vector;
SCHEME_VEC_ELS(v)[0] = ch->wakeup_signal;
place_object_inc_refcount(o);
SCHEME_VEC_ELS(v)[1] = o;
ch->wakeup_signal = v;
} else {
mzrt_mutex_unlock(ch->lock);
avail_vector = GC_master_make_vector(2);
mzrt_mutex_lock(ch->lock);
}
}
else {
printf("Oops not a valid ch->wakeup_signal\n");
exit(1);
} }
}
/* grow from single wakeup to multiple wakeups */
else if (SCHEME_PLACE_OBJECTP(ch->wakeup_signal)) {
Scheme_Object *v;
v = GC_master_make_vector(2);
SCHEME_VEC_ELS(v)[0] = ch->wakeup_signal;
place_object_inc_refcount(o);
SCHEME_VEC_ELS(v)[1] = o;
ch->wakeup_signal = v;
}
else {
printf("Oops not a valid ch->wakeup_signal\n");
exit(1);
} }
} }
@ -3457,9 +3491,8 @@ static Scheme_Object *scheme_place_async_try_receive_raw(Scheme_Place_Async_Chan
void *msg_memory = NULL; void *msg_memory = NULL;
intptr_t sz; intptr_t sz;
mzrt_mutex_lock(ch->lock); lock_and_register_place_object_with_channel(ch, (Scheme_Object *) place_object);
{ {
register_place_object_with_channel(ch, (Scheme_Object *) place_object);
if (ch->count > 0) { /* GET MSG */ if (ch->count > 0) { /* GET MSG */
msg = ch->msgs[ch->out]; msg = ch->msgs[ch->out];
msg_memory = ch->msg_memory[ch->out]; msg_memory = ch->msg_memory[ch->out];
@ -3516,9 +3549,8 @@ static Scheme_Object *scheme_place_async_try_receive(Scheme_Place_Async_Channel
static int scheme_place_async_ch_ready(Scheme_Place_Async_Channel *ch) { static int scheme_place_async_ch_ready(Scheme_Place_Async_Channel *ch) {
int ready = 0; int ready = 0;
mzrt_mutex_lock(ch->lock); lock_and_register_place_object_with_channel(ch, (Scheme_Object *) place_object);
{ {
register_place_object_with_channel(ch, (Scheme_Object *) place_object);
if (ch->count > 0) ready = 1; if (ch->count > 0) ready = 1;
if (!ch->wr_ref) ready = 1; if (!ch->wr_ref) ready = 1;
} }

View File

@ -4136,7 +4136,7 @@ typedef struct Scheme_Place_Async_Channel {
intptr_t delta; intptr_t delta;
intptr_t wr_ref, rd_ref; /* ref counts on readers and writers */ intptr_t wr_ref, rd_ref; /* ref counts on readers and writers */
#if defined(MZ_USE_PLACES) #if defined(MZ_USE_PLACES)
mzrt_mutex *lock; mzrt_mutex *lock; /* no allocation while this lock is held */
#endif #endif
Scheme_Object **msgs; Scheme_Object **msgs;
void **msg_memory; void **msg_memory;
@ -4177,7 +4177,7 @@ typedef struct Scheme_Place {
typedef struct Scheme_Place_Object { typedef struct Scheme_Place_Object {
Scheme_Object so; Scheme_Object so;
#if defined(MZ_USE_PLACES) #if defined(MZ_USE_PLACES)
mzrt_mutex *lock; mzrt_mutex *lock; /* no allocation or place-channel locks while this lock is held */
mzrt_sema *pause; mzrt_sema *pause;
#endif #endif
char die; char die;