places: no allocation while low-level locks are held

This commit is contained in:
Matthew Flatt 2015-01-20 17:39:57 -07:00
parent 48290d3360
commit 5b20690876
2 changed files with 135 additions and 103 deletions

View File

@ -3203,8 +3203,7 @@ static Scheme_Object *GC_master_make_vector(int size) {
static void place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *uo) {
void *msg_memory = NULL;
Scheme_Object *o, *master_chain = NULL, *invalid_object = NULL;
intptr_t sz;
int cnt;
intptr_t sz, cnt;
o = places_serialize(uo, &msg_memory, &master_chain, &invalid_object);
/* uo needs to stay live until `master_chain` is registered in `ch` */
@ -3220,17 +3219,23 @@ static void place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *uo)
else bad_place_message(uo);
}
{
intptr_t msg_size;
msg_size = GC_message_allocator_size(msg_memory);
log_place_event("id %d: put message of %" PRIdPTR " bytes", "put", 1, msg_size);
}
mzrt_mutex_lock(ch->lock);
{
cnt = ch->count;
if (ch->count == ch->size) { /* GROW QUEUE */
Scheme_Object **new_msgs, **new_chains;
void **new_msg_memory;
new_msgs = GC_master_malloc(sizeof(Scheme_Object*) * ch->size * 2);
new_msg_memory = GC_master_malloc(sizeof(void*) * ch->size * 2);
new_chains = GC_master_malloc(sizeof(Scheme_Object*) * ch->size * 2);
Scheme_Object **new_msgs = NULL, **new_chains = NULL;
void **new_msg_memory = NULL;
intptr_t sz = 0;
/* Can't allocate while holding the lock, so release lock and loop: */
while (ch->count == ch->size) {
if ((sz == ch->size) && new_msgs) {
if (ch->out < ch->in) {
memcpy(new_msgs, ch->msgs + ch->out, sizeof(Scheme_Object *) * (ch->in - ch->out));
memcpy(new_msg_memory, ch->msg_memory + ch->out, sizeof(void*) * (ch->in - ch->out));
@ -3254,6 +3259,19 @@ static void place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *uo)
ch->in = ch->size;
ch->out = 0;
ch->size *= 2;
break;
} else {
sz = ch->size;
mzrt_mutex_unlock(ch->lock);
new_msgs = GC_master_malloc(sizeof(Scheme_Object*) * sz * 2);
new_msg_memory = GC_master_malloc(sizeof(void*) * ch->size * 2);
new_chains = GC_master_malloc(sizeof(Scheme_Object*) * ch->size * 2);
mzrt_mutex_lock(ch->lock);
}
}
}
ch->msgs[ch->in] = o;
@ -3271,12 +3289,6 @@ static void place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *uo)
/* make sure `uo` is treated as live until here: */
if (!uo) scheme_signal_error("?");
{
intptr_t msg_size;
msg_size = GC_message_allocator_size(msg_memory);
log_place_event("id %d: put message of %" PRIdPTR " bytes", "put", 1, msg_size);
}
if (!cnt && ch->wakeup_signal) {
/*wake up possibly sleeping single receiver */
if (SCHEME_PLACE_OBJECTP(ch->wakeup_signal)) {
@ -3381,19 +3393,28 @@ static void place_object_dec_refcount(Scheme_Object *o) {
}
}
static void register_place_object_with_channel(Scheme_Place_Async_Channel *ch, Scheme_Object *o) {
static void lock_and_register_place_object_with_channel(Scheme_Place_Async_Channel *ch, Scheme_Object *o)
{
Scheme_Object *avail_vector;
mzrt_mutex_lock(ch->lock);
/* loop in case we need to release the lock temporarily to allocate: */
while (1) {
if (ch->wakeup_signal == o) {
return;
}
else if (!ch->wakeup_signal) {
place_object_inc_refcount(o);
ch->wakeup_signal = o;
return;
}
else if (SCHEME_PLACE_OBJECTP(ch->wakeup_signal)
&& ( (Scheme_Place_Object *) ch->wakeup_signal)->signal_handle == NULL) {
place_object_dec_refcount(ch->wakeup_signal);
place_object_inc_refcount(o);
ch->wakeup_signal = o;
return;
}
else if (SCHEME_VECTORP(ch->wakeup_signal)) {
int i = 0;
@ -3402,10 +3423,9 @@ static void register_place_object_with_channel(Scheme_Place_Async_Channel *ch, S
/* already registered? */
for (i = 0; i < size; i++) {
Scheme_Object *vo = SCHEME_VEC_ELS(v)[i];
if (vo == o) {
if (vo == o)
return;
}
}
/* look for unused slot in wakeup vector */
for (i = 0; i < size; i++) {
Scheme_Object *vo = SCHEME_VEC_ELS(v)[i];
@ -3422,32 +3442,46 @@ static void register_place_object_with_channel(Scheme_Place_Async_Channel *ch, S
return;
}
}
/* fall through to here, need to grow wakeup vector */
/* fall through to here, need to grow wakeup vector;
must do so without the lock */
{
if (avail_vector && (SCHEME_VEC_SIZE(avail_vector) == size*2)) {
Scheme_Object *nv;
nv = GC_master_make_vector(size*2);
nv = avail_vector;
for (i = 0; i < size; i++) {
SCHEME_VEC_ELS(nv)[i] = SCHEME_VEC_ELS(v)[i];
}
place_object_inc_refcount(o);
SCHEME_VEC_ELS(nv)[size+1] = o;
ch->wakeup_signal = nv;
} else {
mzrt_mutex_unlock(ch->lock);
avail_vector = GC_master_make_vector(size*2);
mzrt_mutex_lock(ch->lock);
}
}
}
/* grow from single wakeup to multiple wakeups */
else if (SCHEME_PLACE_OBJECTP(ch->wakeup_signal)) {
if (avail_vector && (SCHEME_VEC_SIZE(avail_vector) == 2)) {
Scheme_Object *v;
v = GC_master_make_vector(2);
v = avail_vector;
SCHEME_VEC_ELS(v)[0] = ch->wakeup_signal;
place_object_inc_refcount(o);
SCHEME_VEC_ELS(v)[1] = o;
ch->wakeup_signal = v;
} else {
mzrt_mutex_unlock(ch->lock);
avail_vector = GC_master_make_vector(2);
mzrt_mutex_lock(ch->lock);
}
}
else {
printf("Oops not a valid ch->wakeup_signal\n");
exit(1);
}
}
}
static Scheme_Object *scheme_place_async_try_receive_raw(Scheme_Place_Async_Channel *ch, void **msg_memory_ptr,
int *_no_writers)
@ -3457,9 +3491,8 @@ static Scheme_Object *scheme_place_async_try_receive_raw(Scheme_Place_Async_Chan
void *msg_memory = NULL;
intptr_t sz;
mzrt_mutex_lock(ch->lock);
lock_and_register_place_object_with_channel(ch, (Scheme_Object *) place_object);
{
register_place_object_with_channel(ch, (Scheme_Object *) place_object);
if (ch->count > 0) { /* GET MSG */
msg = ch->msgs[ch->out];
msg_memory = ch->msg_memory[ch->out];
@ -3516,9 +3549,8 @@ static Scheme_Object *scheme_place_async_try_receive(Scheme_Place_Async_Channel
static int scheme_place_async_ch_ready(Scheme_Place_Async_Channel *ch) {
int ready = 0;
mzrt_mutex_lock(ch->lock);
lock_and_register_place_object_with_channel(ch, (Scheme_Object *) place_object);
{
register_place_object_with_channel(ch, (Scheme_Object *) place_object);
if (ch->count > 0) ready = 1;
if (!ch->wr_ref) ready = 1;
}

View File

@ -4136,7 +4136,7 @@ typedef struct Scheme_Place_Async_Channel {
intptr_t delta;
intptr_t wr_ref, rd_ref; /* ref counts on readers and writers */
#if defined(MZ_USE_PLACES)
mzrt_mutex *lock;
mzrt_mutex *lock; /* no allocation while this lock is held */
#endif
Scheme_Object **msgs;
void **msg_memory;
@ -4177,7 +4177,7 @@ typedef struct Scheme_Place {
typedef struct Scheme_Place_Object {
Scheme_Object so;
#if defined(MZ_USE_PLACES)
mzrt_mutex *lock;
mzrt_mutex *lock; /* no allocation or place-channel locks while this lock is held */
mzrt_sema *pause;
#endif
char die;