places: no allocation while low-level locks are held
This commit is contained in:
parent
48290d3360
commit
5b20690876
|
@ -3203,8 +3203,7 @@ static Scheme_Object *GC_master_make_vector(int size) {
|
||||||
static void place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *uo) {
|
static void place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *uo) {
|
||||||
void *msg_memory = NULL;
|
void *msg_memory = NULL;
|
||||||
Scheme_Object *o, *master_chain = NULL, *invalid_object = NULL;
|
Scheme_Object *o, *master_chain = NULL, *invalid_object = NULL;
|
||||||
intptr_t sz;
|
intptr_t sz, cnt;
|
||||||
int cnt;
|
|
||||||
|
|
||||||
o = places_serialize(uo, &msg_memory, &master_chain, &invalid_object);
|
o = places_serialize(uo, &msg_memory, &master_chain, &invalid_object);
|
||||||
/* uo needs to stay live until `master_chain` is registered in `ch` */
|
/* uo needs to stay live until `master_chain` is registered in `ch` */
|
||||||
|
@ -3220,40 +3219,59 @@ static void place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *uo)
|
||||||
else bad_place_message(uo);
|
else bad_place_message(uo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
intptr_t msg_size;
|
||||||
|
msg_size = GC_message_allocator_size(msg_memory);
|
||||||
|
log_place_event("id %d: put message of %" PRIdPTR " bytes", "put", 1, msg_size);
|
||||||
|
}
|
||||||
|
|
||||||
mzrt_mutex_lock(ch->lock);
|
mzrt_mutex_lock(ch->lock);
|
||||||
{
|
{
|
||||||
cnt = ch->count;
|
cnt = ch->count;
|
||||||
if (ch->count == ch->size) { /* GROW QUEUE */
|
if (ch->count == ch->size) { /* GROW QUEUE */
|
||||||
Scheme_Object **new_msgs, **new_chains;
|
Scheme_Object **new_msgs = NULL, **new_chains = NULL;
|
||||||
void **new_msg_memory;
|
void **new_msg_memory = NULL;
|
||||||
|
intptr_t sz = 0;
|
||||||
|
|
||||||
new_msgs = GC_master_malloc(sizeof(Scheme_Object*) * ch->size * 2);
|
/* Can't allocate while holding the lock, so release lock and loop: */
|
||||||
new_msg_memory = GC_master_malloc(sizeof(void*) * ch->size * 2);
|
while (ch->count == ch->size) {
|
||||||
new_chains = GC_master_malloc(sizeof(Scheme_Object*) * ch->size * 2);
|
if ((sz == ch->size) && new_msgs) {
|
||||||
|
if (ch->out < ch->in) {
|
||||||
|
memcpy(new_msgs, ch->msgs + ch->out, sizeof(Scheme_Object *) * (ch->in - ch->out));
|
||||||
|
memcpy(new_msg_memory, ch->msg_memory + ch->out, sizeof(void*) * (ch->in - ch->out));
|
||||||
|
memcpy(new_chains, ch->msg_chains + ch->out, sizeof(void*) * (ch->in - ch->out));
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
int s1 = (ch->size - ch->out);
|
||||||
|
memcpy(new_msgs, ch->msgs + ch->out, sizeof(Scheme_Object *) * s1);
|
||||||
|
memcpy(new_msgs + s1, ch->msgs, sizeof(Scheme_Object *) * ch->in);
|
||||||
|
|
||||||
if (ch->out < ch->in) {
|
memcpy(new_msg_memory, ch->msg_memory + ch->out, sizeof(void*) * s1);
|
||||||
memcpy(new_msgs, ch->msgs + ch->out, sizeof(Scheme_Object *) * (ch->in - ch->out));
|
memcpy(new_msg_memory + s1, ch->msg_memory, sizeof(void*) * ch->in);
|
||||||
memcpy(new_msg_memory, ch->msg_memory + ch->out, sizeof(void*) * (ch->in - ch->out));
|
|
||||||
memcpy(new_chains, ch->msg_chains + ch->out, sizeof(void*) * (ch->in - ch->out));
|
memcpy(new_chains, ch->msg_chains + ch->out, sizeof(Scheme_Object *) * s1);
|
||||||
|
memcpy(new_chains + s1, ch->msg_chains, sizeof(Scheme_Object *) * ch->in);
|
||||||
|
}
|
||||||
|
|
||||||
|
ch->msgs = new_msgs;
|
||||||
|
ch->msg_memory = new_msg_memory;
|
||||||
|
ch->msg_chains = new_chains;
|
||||||
|
ch->in = ch->size;
|
||||||
|
ch->out = 0;
|
||||||
|
ch->size *= 2;
|
||||||
|
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
sz = ch->size;
|
||||||
|
mzrt_mutex_unlock(ch->lock);
|
||||||
|
|
||||||
|
new_msgs = GC_master_malloc(sizeof(Scheme_Object*) * sz * 2);
|
||||||
|
new_msg_memory = GC_master_malloc(sizeof(void*) * ch->size * 2);
|
||||||
|
new_chains = GC_master_malloc(sizeof(Scheme_Object*) * ch->size * 2);
|
||||||
|
|
||||||
|
mzrt_mutex_lock(ch->lock);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else {
|
|
||||||
int s1 = (ch->size - ch->out);
|
|
||||||
memcpy(new_msgs, ch->msgs + ch->out, sizeof(Scheme_Object *) * s1);
|
|
||||||
memcpy(new_msgs + s1, ch->msgs, sizeof(Scheme_Object *) * ch->in);
|
|
||||||
|
|
||||||
memcpy(new_msg_memory, ch->msg_memory + ch->out, sizeof(void*) * s1);
|
|
||||||
memcpy(new_msg_memory + s1, ch->msg_memory, sizeof(void*) * ch->in);
|
|
||||||
|
|
||||||
memcpy(new_chains, ch->msg_chains + ch->out, sizeof(Scheme_Object *) * s1);
|
|
||||||
memcpy(new_chains + s1, ch->msg_chains, sizeof(Scheme_Object *) * ch->in);
|
|
||||||
}
|
|
||||||
|
|
||||||
ch->msgs = new_msgs;
|
|
||||||
ch->msg_memory = new_msg_memory;
|
|
||||||
ch->msg_chains = new_chains;
|
|
||||||
ch->in = ch->size;
|
|
||||||
ch->out = 0;
|
|
||||||
ch->size *= 2;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ch->msgs[ch->in] = o;
|
ch->msgs[ch->in] = o;
|
||||||
|
@ -3271,12 +3289,6 @@ static void place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *uo)
|
||||||
/* make sure `uo` is treated as live until here: */
|
/* make sure `uo` is treated as live until here: */
|
||||||
if (!uo) scheme_signal_error("?");
|
if (!uo) scheme_signal_error("?");
|
||||||
|
|
||||||
{
|
|
||||||
intptr_t msg_size;
|
|
||||||
msg_size = GC_message_allocator_size(msg_memory);
|
|
||||||
log_place_event("id %d: put message of %" PRIdPTR " bytes", "put", 1, msg_size);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!cnt && ch->wakeup_signal) {
|
if (!cnt && ch->wakeup_signal) {
|
||||||
/*wake up possibly sleeping single receiver */
|
/*wake up possibly sleeping single receiver */
|
||||||
if (SCHEME_PLACE_OBJECTP(ch->wakeup_signal)) {
|
if (SCHEME_PLACE_OBJECTP(ch->wakeup_signal)) {
|
||||||
|
@ -3381,71 +3393,93 @@ static void place_object_dec_refcount(Scheme_Object *o) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void register_place_object_with_channel(Scheme_Place_Async_Channel *ch, Scheme_Object *o) {
|
static void lock_and_register_place_object_with_channel(Scheme_Place_Async_Channel *ch, Scheme_Object *o)
|
||||||
if (ch->wakeup_signal == o) {
|
{
|
||||||
return;
|
Scheme_Object *avail_vector;
|
||||||
}
|
|
||||||
else if (!ch->wakeup_signal) {
|
mzrt_mutex_lock(ch->lock);
|
||||||
place_object_inc_refcount(o);
|
|
||||||
ch->wakeup_signal = o;
|
/* loop in case we need to release the lock temporarily to allocate: */
|
||||||
}
|
while (1) {
|
||||||
else if (SCHEME_PLACE_OBJECTP(ch->wakeup_signal)
|
if (ch->wakeup_signal == o) {
|
||||||
&& ( (Scheme_Place_Object *) ch->wakeup_signal)->signal_handle == NULL) {
|
return;
|
||||||
place_object_dec_refcount(ch->wakeup_signal);
|
|
||||||
place_object_inc_refcount(o);
|
|
||||||
ch->wakeup_signal = o;
|
|
||||||
}
|
|
||||||
else if (SCHEME_VECTORP(ch->wakeup_signal)) {
|
|
||||||
int i = 0;
|
|
||||||
Scheme_Object *v = ch->wakeup_signal;
|
|
||||||
int size = SCHEME_VEC_SIZE(v);
|
|
||||||
/* already registered? */
|
|
||||||
for (i = 0; i < size; i++) {
|
|
||||||
Scheme_Object *vo = SCHEME_VEC_ELS(v)[i];
|
|
||||||
if (vo == o) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
/* look for unused slot in wakeup vector */
|
else if (!ch->wakeup_signal) {
|
||||||
for (i = 0; i < size; i++) {
|
|
||||||
Scheme_Object *vo = SCHEME_VEC_ELS(v)[i];
|
|
||||||
if (!vo) {
|
|
||||||
place_object_inc_refcount(o);
|
|
||||||
SCHEME_VEC_ELS(v)[i] = o;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
else if (SCHEME_PLACE_OBJECTP(vo) &&
|
|
||||||
((Scheme_Place_Object *)vo)->signal_handle == NULL) {
|
|
||||||
place_object_dec_refcount(vo);
|
|
||||||
place_object_inc_refcount(o);
|
|
||||||
SCHEME_VEC_ELS(v)[i] = o;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/* fall through to here, need to grow wakeup vector */
|
|
||||||
{
|
|
||||||
Scheme_Object *nv;
|
|
||||||
nv = GC_master_make_vector(size*2);
|
|
||||||
for (i = 0; i < size; i++) {
|
|
||||||
SCHEME_VEC_ELS(nv)[i] = SCHEME_VEC_ELS(v)[i];
|
|
||||||
}
|
|
||||||
place_object_inc_refcount(o);
|
place_object_inc_refcount(o);
|
||||||
SCHEME_VEC_ELS(nv)[size+1] = o;
|
ch->wakeup_signal = o;
|
||||||
ch->wakeup_signal = nv;
|
return;
|
||||||
|
}
|
||||||
|
else if (SCHEME_PLACE_OBJECTP(ch->wakeup_signal)
|
||||||
|
&& ( (Scheme_Place_Object *) ch->wakeup_signal)->signal_handle == NULL) {
|
||||||
|
place_object_dec_refcount(ch->wakeup_signal);
|
||||||
|
place_object_inc_refcount(o);
|
||||||
|
ch->wakeup_signal = o;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
else if (SCHEME_VECTORP(ch->wakeup_signal)) {
|
||||||
|
int i = 0;
|
||||||
|
Scheme_Object *v = ch->wakeup_signal;
|
||||||
|
int size = SCHEME_VEC_SIZE(v);
|
||||||
|
/* already registered? */
|
||||||
|
for (i = 0; i < size; i++) {
|
||||||
|
Scheme_Object *vo = SCHEME_VEC_ELS(v)[i];
|
||||||
|
if (vo == o)
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
/* look for unused slot in wakeup vector */
|
||||||
|
for (i = 0; i < size; i++) {
|
||||||
|
Scheme_Object *vo = SCHEME_VEC_ELS(v)[i];
|
||||||
|
if (!vo) {
|
||||||
|
place_object_inc_refcount(o);
|
||||||
|
SCHEME_VEC_ELS(v)[i] = o;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
else if (SCHEME_PLACE_OBJECTP(vo) &&
|
||||||
|
((Scheme_Place_Object *)vo)->signal_handle == NULL) {
|
||||||
|
place_object_dec_refcount(vo);
|
||||||
|
place_object_inc_refcount(o);
|
||||||
|
SCHEME_VEC_ELS(v)[i] = o;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/* fall through to here, need to grow wakeup vector;
|
||||||
|
must do so without the lock */
|
||||||
|
{
|
||||||
|
if (avail_vector && (SCHEME_VEC_SIZE(avail_vector) == size*2)) {
|
||||||
|
Scheme_Object *nv;
|
||||||
|
nv = avail_vector;
|
||||||
|
for (i = 0; i < size; i++) {
|
||||||
|
SCHEME_VEC_ELS(nv)[i] = SCHEME_VEC_ELS(v)[i];
|
||||||
|
}
|
||||||
|
place_object_inc_refcount(o);
|
||||||
|
SCHEME_VEC_ELS(nv)[size+1] = o;
|
||||||
|
ch->wakeup_signal = nv;
|
||||||
|
} else {
|
||||||
|
mzrt_mutex_unlock(ch->lock);
|
||||||
|
avail_vector = GC_master_make_vector(size*2);
|
||||||
|
mzrt_mutex_lock(ch->lock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/* grow from single wakeup to multiple wakeups */
|
||||||
|
else if (SCHEME_PLACE_OBJECTP(ch->wakeup_signal)) {
|
||||||
|
if (avail_vector && (SCHEME_VEC_SIZE(avail_vector) == 2)) {
|
||||||
|
Scheme_Object *v;
|
||||||
|
v = avail_vector;
|
||||||
|
SCHEME_VEC_ELS(v)[0] = ch->wakeup_signal;
|
||||||
|
place_object_inc_refcount(o);
|
||||||
|
SCHEME_VEC_ELS(v)[1] = o;
|
||||||
|
ch->wakeup_signal = v;
|
||||||
|
} else {
|
||||||
|
mzrt_mutex_unlock(ch->lock);
|
||||||
|
avail_vector = GC_master_make_vector(2);
|
||||||
|
mzrt_mutex_lock(ch->lock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
printf("Oops not a valid ch->wakeup_signal\n");
|
||||||
|
exit(1);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
/* grow from single wakeup to multiple wakeups */
|
|
||||||
else if (SCHEME_PLACE_OBJECTP(ch->wakeup_signal)) {
|
|
||||||
Scheme_Object *v;
|
|
||||||
v = GC_master_make_vector(2);
|
|
||||||
SCHEME_VEC_ELS(v)[0] = ch->wakeup_signal;
|
|
||||||
place_object_inc_refcount(o);
|
|
||||||
SCHEME_VEC_ELS(v)[1] = o;
|
|
||||||
ch->wakeup_signal = v;
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
printf("Oops not a valid ch->wakeup_signal\n");
|
|
||||||
exit(1);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3457,9 +3491,8 @@ static Scheme_Object *scheme_place_async_try_receive_raw(Scheme_Place_Async_Chan
|
||||||
void *msg_memory = NULL;
|
void *msg_memory = NULL;
|
||||||
intptr_t sz;
|
intptr_t sz;
|
||||||
|
|
||||||
mzrt_mutex_lock(ch->lock);
|
lock_and_register_place_object_with_channel(ch, (Scheme_Object *) place_object);
|
||||||
{
|
{
|
||||||
register_place_object_with_channel(ch, (Scheme_Object *) place_object);
|
|
||||||
if (ch->count > 0) { /* GET MSG */
|
if (ch->count > 0) { /* GET MSG */
|
||||||
msg = ch->msgs[ch->out];
|
msg = ch->msgs[ch->out];
|
||||||
msg_memory = ch->msg_memory[ch->out];
|
msg_memory = ch->msg_memory[ch->out];
|
||||||
|
@ -3516,9 +3549,8 @@ static Scheme_Object *scheme_place_async_try_receive(Scheme_Place_Async_Channel
|
||||||
|
|
||||||
static int scheme_place_async_ch_ready(Scheme_Place_Async_Channel *ch) {
|
static int scheme_place_async_ch_ready(Scheme_Place_Async_Channel *ch) {
|
||||||
int ready = 0;
|
int ready = 0;
|
||||||
mzrt_mutex_lock(ch->lock);
|
lock_and_register_place_object_with_channel(ch, (Scheme_Object *) place_object);
|
||||||
{
|
{
|
||||||
register_place_object_with_channel(ch, (Scheme_Object *) place_object);
|
|
||||||
if (ch->count > 0) ready = 1;
|
if (ch->count > 0) ready = 1;
|
||||||
if (!ch->wr_ref) ready = 1;
|
if (!ch->wr_ref) ready = 1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -4136,7 +4136,7 @@ typedef struct Scheme_Place_Async_Channel {
|
||||||
intptr_t delta;
|
intptr_t delta;
|
||||||
intptr_t wr_ref, rd_ref; /* ref counts on readers and writers */
|
intptr_t wr_ref, rd_ref; /* ref counts on readers and writers */
|
||||||
#if defined(MZ_USE_PLACES)
|
#if defined(MZ_USE_PLACES)
|
||||||
mzrt_mutex *lock;
|
mzrt_mutex *lock; /* no allocation while this lock is held */
|
||||||
#endif
|
#endif
|
||||||
Scheme_Object **msgs;
|
Scheme_Object **msgs;
|
||||||
void **msg_memory;
|
void **msg_memory;
|
||||||
|
@ -4177,7 +4177,7 @@ typedef struct Scheme_Place {
|
||||||
typedef struct Scheme_Place_Object {
|
typedef struct Scheme_Place_Object {
|
||||||
Scheme_Object so;
|
Scheme_Object so;
|
||||||
#if defined(MZ_USE_PLACES)
|
#if defined(MZ_USE_PLACES)
|
||||||
mzrt_mutex *lock;
|
mzrt_mutex *lock; /* no allocation or place-channel locks while this lock is held */
|
||||||
mzrt_sema *pause;
|
mzrt_sema *pause;
|
||||||
#endif
|
#endif
|
||||||
char die;
|
char die;
|
||||||
|
|
Loading…
Reference in New Issue
Block a user