places: no allocation while low-level locks are held
This commit is contained in:
parent
48290d3360
commit
5b20690876
|
@ -3203,8 +3203,7 @@ static Scheme_Object *GC_master_make_vector(int size) {
|
|||
static void place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *uo) {
|
||||
void *msg_memory = NULL;
|
||||
Scheme_Object *o, *master_chain = NULL, *invalid_object = NULL;
|
||||
intptr_t sz;
|
||||
int cnt;
|
||||
intptr_t sz, cnt;
|
||||
|
||||
o = places_serialize(uo, &msg_memory, &master_chain, &invalid_object);
|
||||
/* uo needs to stay live until `master_chain` is registered in `ch` */
|
||||
|
@ -3220,17 +3219,23 @@ static void place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *uo)
|
|||
else bad_place_message(uo);
|
||||
}
|
||||
|
||||
{
|
||||
intptr_t msg_size;
|
||||
msg_size = GC_message_allocator_size(msg_memory);
|
||||
log_place_event("id %d: put message of %" PRIdPTR " bytes", "put", 1, msg_size);
|
||||
}
|
||||
|
||||
mzrt_mutex_lock(ch->lock);
|
||||
{
|
||||
cnt = ch->count;
|
||||
if (ch->count == ch->size) { /* GROW QUEUE */
|
||||
Scheme_Object **new_msgs, **new_chains;
|
||||
void **new_msg_memory;
|
||||
|
||||
new_msgs = GC_master_malloc(sizeof(Scheme_Object*) * ch->size * 2);
|
||||
new_msg_memory = GC_master_malloc(sizeof(void*) * ch->size * 2);
|
||||
new_chains = GC_master_malloc(sizeof(Scheme_Object*) * ch->size * 2);
|
||||
Scheme_Object **new_msgs = NULL, **new_chains = NULL;
|
||||
void **new_msg_memory = NULL;
|
||||
intptr_t sz = 0;
|
||||
|
||||
/* Can't allocate while holding the lock, so release lock and loop: */
|
||||
while (ch->count == ch->size) {
|
||||
if ((sz == ch->size) && new_msgs) {
|
||||
if (ch->out < ch->in) {
|
||||
memcpy(new_msgs, ch->msgs + ch->out, sizeof(Scheme_Object *) * (ch->in - ch->out));
|
||||
memcpy(new_msg_memory, ch->msg_memory + ch->out, sizeof(void*) * (ch->in - ch->out));
|
||||
|
@ -3254,6 +3259,19 @@ static void place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *uo)
|
|||
ch->in = ch->size;
|
||||
ch->out = 0;
|
||||
ch->size *= 2;
|
||||
|
||||
break;
|
||||
} else {
|
||||
sz = ch->size;
|
||||
mzrt_mutex_unlock(ch->lock);
|
||||
|
||||
new_msgs = GC_master_malloc(sizeof(Scheme_Object*) * sz * 2);
|
||||
new_msg_memory = GC_master_malloc(sizeof(void*) * ch->size * 2);
|
||||
new_chains = GC_master_malloc(sizeof(Scheme_Object*) * ch->size * 2);
|
||||
|
||||
mzrt_mutex_lock(ch->lock);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ch->msgs[ch->in] = o;
|
||||
|
@ -3271,12 +3289,6 @@ static void place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *uo)
|
|||
/* make sure `uo` is treated as live until here: */
|
||||
if (!uo) scheme_signal_error("?");
|
||||
|
||||
{
|
||||
intptr_t msg_size;
|
||||
msg_size = GC_message_allocator_size(msg_memory);
|
||||
log_place_event("id %d: put message of %" PRIdPTR " bytes", "put", 1, msg_size);
|
||||
}
|
||||
|
||||
if (!cnt && ch->wakeup_signal) {
|
||||
/*wake up possibly sleeping single receiver */
|
||||
if (SCHEME_PLACE_OBJECTP(ch->wakeup_signal)) {
|
||||
|
@ -3381,19 +3393,28 @@ static void place_object_dec_refcount(Scheme_Object *o) {
|
|||
}
|
||||
}
|
||||
|
||||
static void register_place_object_with_channel(Scheme_Place_Async_Channel *ch, Scheme_Object *o) {
|
||||
static void lock_and_register_place_object_with_channel(Scheme_Place_Async_Channel *ch, Scheme_Object *o)
|
||||
{
|
||||
Scheme_Object *avail_vector;
|
||||
|
||||
mzrt_mutex_lock(ch->lock);
|
||||
|
||||
/* loop in case we need to release the lock temporarily to allocate: */
|
||||
while (1) {
|
||||
if (ch->wakeup_signal == o) {
|
||||
return;
|
||||
}
|
||||
else if (!ch->wakeup_signal) {
|
||||
place_object_inc_refcount(o);
|
||||
ch->wakeup_signal = o;
|
||||
return;
|
||||
}
|
||||
else if (SCHEME_PLACE_OBJECTP(ch->wakeup_signal)
|
||||
&& ( (Scheme_Place_Object *) ch->wakeup_signal)->signal_handle == NULL) {
|
||||
place_object_dec_refcount(ch->wakeup_signal);
|
||||
place_object_inc_refcount(o);
|
||||
ch->wakeup_signal = o;
|
||||
return;
|
||||
}
|
||||
else if (SCHEME_VECTORP(ch->wakeup_signal)) {
|
||||
int i = 0;
|
||||
|
@ -3402,10 +3423,9 @@ static void register_place_object_with_channel(Scheme_Place_Async_Channel *ch, S
|
|||
/* already registered? */
|
||||
for (i = 0; i < size; i++) {
|
||||
Scheme_Object *vo = SCHEME_VEC_ELS(v)[i];
|
||||
if (vo == o) {
|
||||
if (vo == o)
|
||||
return;
|
||||
}
|
||||
}
|
||||
/* look for unused slot in wakeup vector */
|
||||
for (i = 0; i < size; i++) {
|
||||
Scheme_Object *vo = SCHEME_VEC_ELS(v)[i];
|
||||
|
@ -3422,31 +3442,45 @@ static void register_place_object_with_channel(Scheme_Place_Async_Channel *ch, S
|
|||
return;
|
||||
}
|
||||
}
|
||||
/* fall through to here, need to grow wakeup vector */
|
||||
/* fall through to here, need to grow wakeup vector;
|
||||
must do so without the lock */
|
||||
{
|
||||
if (avail_vector && (SCHEME_VEC_SIZE(avail_vector) == size*2)) {
|
||||
Scheme_Object *nv;
|
||||
nv = GC_master_make_vector(size*2);
|
||||
nv = avail_vector;
|
||||
for (i = 0; i < size; i++) {
|
||||
SCHEME_VEC_ELS(nv)[i] = SCHEME_VEC_ELS(v)[i];
|
||||
}
|
||||
place_object_inc_refcount(o);
|
||||
SCHEME_VEC_ELS(nv)[size+1] = o;
|
||||
ch->wakeup_signal = nv;
|
||||
} else {
|
||||
mzrt_mutex_unlock(ch->lock);
|
||||
avail_vector = GC_master_make_vector(size*2);
|
||||
mzrt_mutex_lock(ch->lock);
|
||||
}
|
||||
}
|
||||
}
|
||||
/* grow from single wakeup to multiple wakeups */
|
||||
else if (SCHEME_PLACE_OBJECTP(ch->wakeup_signal)) {
|
||||
if (avail_vector && (SCHEME_VEC_SIZE(avail_vector) == 2)) {
|
||||
Scheme_Object *v;
|
||||
v = GC_master_make_vector(2);
|
||||
v = avail_vector;
|
||||
SCHEME_VEC_ELS(v)[0] = ch->wakeup_signal;
|
||||
place_object_inc_refcount(o);
|
||||
SCHEME_VEC_ELS(v)[1] = o;
|
||||
ch->wakeup_signal = v;
|
||||
} else {
|
||||
mzrt_mutex_unlock(ch->lock);
|
||||
avail_vector = GC_master_make_vector(2);
|
||||
mzrt_mutex_lock(ch->lock);
|
||||
}
|
||||
}
|
||||
else {
|
||||
printf("Oops not a valid ch->wakeup_signal\n");
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static Scheme_Object *scheme_place_async_try_receive_raw(Scheme_Place_Async_Channel *ch, void **msg_memory_ptr,
|
||||
|
@ -3457,9 +3491,8 @@ static Scheme_Object *scheme_place_async_try_receive_raw(Scheme_Place_Async_Chan
|
|||
void *msg_memory = NULL;
|
||||
intptr_t sz;
|
||||
|
||||
mzrt_mutex_lock(ch->lock);
|
||||
lock_and_register_place_object_with_channel(ch, (Scheme_Object *) place_object);
|
||||
{
|
||||
register_place_object_with_channel(ch, (Scheme_Object *) place_object);
|
||||
if (ch->count > 0) { /* GET MSG */
|
||||
msg = ch->msgs[ch->out];
|
||||
msg_memory = ch->msg_memory[ch->out];
|
||||
|
@ -3516,9 +3549,8 @@ static Scheme_Object *scheme_place_async_try_receive(Scheme_Place_Async_Channel
|
|||
|
||||
static int scheme_place_async_ch_ready(Scheme_Place_Async_Channel *ch) {
|
||||
int ready = 0;
|
||||
mzrt_mutex_lock(ch->lock);
|
||||
lock_and_register_place_object_with_channel(ch, (Scheme_Object *) place_object);
|
||||
{
|
||||
register_place_object_with_channel(ch, (Scheme_Object *) place_object);
|
||||
if (ch->count > 0) ready = 1;
|
||||
if (!ch->wr_ref) ready = 1;
|
||||
}
|
||||
|
|
|
@ -4136,7 +4136,7 @@ typedef struct Scheme_Place_Async_Channel {
|
|||
intptr_t delta;
|
||||
intptr_t wr_ref, rd_ref; /* ref counts on readers and writers */
|
||||
#if defined(MZ_USE_PLACES)
|
||||
mzrt_mutex *lock;
|
||||
mzrt_mutex *lock; /* no allocation while this lock is held */
|
||||
#endif
|
||||
Scheme_Object **msgs;
|
||||
void **msg_memory;
|
||||
|
@ -4177,7 +4177,7 @@ typedef struct Scheme_Place {
|
|||
typedef struct Scheme_Place_Object {
|
||||
Scheme_Object so;
|
||||
#if defined(MZ_USE_PLACES)
|
||||
mzrt_mutex *lock;
|
||||
mzrt_mutex *lock; /* no allocation or place-channel locks while this lock is held */
|
||||
mzrt_sema *pause;
|
||||
#endif
|
||||
char die;
|
||||
|
|
Loading…
Reference in New Issue
Block a user