fix future bugs mostly related to `touch' within a future

This commit is contained in:
Matthew Flatt 2011-11-26 10:32:24 -07:00
parent 93bcc58a0c
commit 3883594af3

View File

@ -289,7 +289,7 @@ static void future_do_runtimecall(struct Scheme_Future_Thread_State *fts,
void *func,
int is_atomic,
int can_suspend);
static int capture_future_continuation(future_t *ft, void **storage);
static int capture_future_continuation(struct Scheme_Future_State *fs, future_t *ft, void **storage, int need_lock);
#define INITIAL_C_STACK_SIZE 500000
#define FUTURE_RUNSTACK_SIZE 2000
@ -1305,6 +1305,8 @@ Scheme_Object *scheme_fsemaphore_count(int argc, Scheme_Object **argv)
}
static void requeue_future_within_lock(future_t *future, Scheme_Future_State *fs)
XFORM_SKIP_PROC
/* called in any thread with lock held */
{
if (scheme_custodian_is_available(future->cust)) {
future->status = PENDING;
@ -1457,16 +1459,19 @@ Scheme_Object *scheme_fsemaphore_wait(int argc, Scheme_Object **argv)
/* Try to capture it locally (on this thread) */
if (GC_gen0_alloc_page_ptr
&& capture_future_continuation(future, storage)) {
/* This will set fts->thread->current_ft to NULL */
&& capture_future_continuation(fs, future, storage, 0)) {
/* capture sets fts->thread->current_ft to NULL */
mzrt_mutex_lock(fs->future_mutex);
future->status = WAITING_FOR_FSEMA;
} else {
/* Can't capture the continuation locally, so ask the runtime
thread to do it */
mzrt_mutex_lock(fs->future_mutex);
future->next_waiting_lwc = fs->future_waiting_lwc;
fs->future_waiting_lwc = future;
if (!future->in_queue_waiting_for_lwc) {
future->next_waiting_lwc = fs->future_waiting_lwc;
fs->future_waiting_lwc = future;
future->in_queue_waiting_for_lwc = 1;
}
future->want_lw = 1;
}
@ -1595,6 +1600,8 @@ static void dequeue_future(Scheme_Future_State *fs, future_t *ft)
}
static void complete_rtcall(Scheme_Future_State *fs, future_t *future)
XFORM_SKIP_PROC
/* called in any thread with lock held */
{
if (future->suspended_lw) {
/* Re-enqueue the future so that some future thread can continue */
@ -1611,6 +1618,8 @@ static void complete_rtcall(Scheme_Future_State *fs, future_t *future)
}
static void direct_future_to_future_touch(Scheme_Future_State *fs, future_t *ft, future_t *t_ft)
XFORM_SKIP_PROC
/* called in any thread with lock held */
{
Scheme_Object *retval = ft->retval;
@ -1643,6 +1652,7 @@ static void trigger_added_touches(Scheme_Future_State *fs, future_t *ft)
{
if (ft->touching) {
Scheme_Object *touching = ft->touching;
ft->touching = NULL;
while (!SCHEME_NULLP(touching)) {
Scheme_Object *wb = SCHEME_CAR(touching);
future_t *t_ft = (future_t *)SCHEME_WEAK_BOX_VAL(wb);
@ -2178,15 +2188,12 @@ static Scheme_Object *apply_future_lw(future_t *ft)
return (Scheme_Object *)scheme_top_level_do(apply_future_lw_k, 0);
}
static int capture_future_continuation(future_t *ft, void **storage)
static int capture_future_continuation(Scheme_Future_State *fs, future_t *ft, void **storage, int need_lock)
XFORM_SKIP_PROC
/* This function explicitly cooperates with the GC by storing the
pointers it needs to save across a collection in `storage', so
it can be used in a future thread. If future-thread-local
allocation fails, the result is 0.
It also grabs the future-modification lock as needed to modify the
future. */
allocation fails, the result is 0. */
{
Scheme_Lightweight_Continuation *lw;
Scheme_Object **arg_S;
@ -2198,14 +2205,26 @@ static int capture_future_continuation(future_t *ft, void **storage)
ft = (future_t *)storage[2];
ft->suspended_lw = lw;
ft->maybe_suspended_lw = 1;
if (need_lock) {
mzrt_mutex_lock(fs->future_mutex);
if (!ft->want_lw) {
/* Future can already continue. This can only happen
if ft was blocked on another future, and the other
future decided that it could continue while we were
trying to grab the continuation. Drop the captured
continuation. */
return 1;
}
}
ft->want_lw = 0;
ft->fts->thread->current_ft = NULL; /* tells worker thread that it no longer
needs to handle the future */
ft->suspended_lw = lw;
ft->maybe_suspended_lw = 1;
if (ft->arg_S0) {
arg_S = scheme_adjust_runstack_argument(lw, ft->arg_S0);
ft->arg_S0 = arg_S;
@ -2230,6 +2249,7 @@ void scheme_check_future_work()
for allocation). */
future_t *ft, *other_ft;
Scheme_Future_State *fs = scheme_future_state;
mzrt_sema *can_continue_sema;
int more;
if (!fs) return;
@ -2248,7 +2268,6 @@ void scheme_check_future_work()
if (ft) {
fs->future_waiting_atomic = ft->next_waiting_atomic;
ft->next_waiting_atomic = NULL;
ft->in_queue_waiting_for_lwc = 0;
if ((ft->status == WAITING_FOR_PRIM) && ft->rt_prim_is_atomic) {
ft->status = HANDLING_PRIM;
ft->want_lw = 0; /* we expect to handle it quickly,
@ -2283,30 +2302,22 @@ void scheme_check_future_work()
if (other_ft) {
/* Chain to `ft' from `other_ft': */
Scheme_Object *wb, *pr;
int was_done;
wb = scheme_make_weak_box((Scheme_Object *)ft);
pr = scheme_make_pair(wb, scheme_null);
mzrt_mutex_lock(fs->future_mutex);
if (other_ft->status == FINISHED) {
/* Completed while we tried to allocated a chain link. */
/* Completed while we tried to allocate a chain link. */
ft->status = HANDLING_PRIM;
ft->want_lw = 0;
was_done = 1;
direct_future_to_future_touch(fs, other_ft, ft);
} else {
/* enqueue */
if (other_ft->touching)
SCHEME_CDR(pr) = other_ft->touching;
other_ft->touching = pr;
was_done = 0;
}
mzrt_mutex_unlock(fs->future_mutex);
if (was_done) {
/* other_ft is done: */
direct_future_to_future_touch(fs, other_ft, ft);
}
}
}
@ -2317,18 +2328,29 @@ void scheme_check_future_work()
if (ft) {
fs->future_waiting_lwc = ft->next_waiting_lwc;
ft->next_waiting_lwc = NULL;
ft->in_queue_waiting_for_lwc = 0;
if (!ft->want_lw)
ft = NULL;
else {
/* If ft is touching another future, then the other
future may resume ft while we grab the continuation.
Withold ft->can_continue_sema for now, so that we can
capture the continuation, and then double-check
afterward whether the continuation wants a lwc: */
can_continue_sema = ft->can_continue_sema;
ft->can_continue_sema = NULL;
}
}
mzrt_mutex_unlock(fs->future_mutex);
if (ft) {
void *storage[3];
if (capture_future_continuation(ft, storage)) {
if (capture_future_continuation(fs, ft, storage, 1)) {
/* capture performs mzrt_mutex_lock(fs->future_mutex) on success. */
/* Signal the waiting worker thread that it
can continue doing other things: */
mzrt_mutex_lock(fs->future_mutex);
ft->can_continue_sema = can_continue_sema;
if (ft->can_continue_sema) {
mzrt_sema_post(ft->can_continue_sema);
ft->can_continue_sema = NULL;
@ -2405,7 +2427,7 @@ static void future_do_runtimecall(Scheme_Future_Thread_State *fts,
if (prefer_to_suspend
&& GC_gen0_alloc_page_ptr
&& capture_future_continuation(future, storage)) {
&& capture_future_continuation(fs, future, storage, 0)) {
/* this future thread will suspend handling the future
continuation until the result of the blocking call is ready;
fts->thread->current_ft was set to NULL */
@ -2439,7 +2461,6 @@ static void future_do_runtimecall(Scheme_Future_Thread_State *fts,
fs->future_waiting_lwc = future;
future->in_queue_waiting_for_lwc = 1;
}
future->want_lw = 1;
}
}
@ -3034,7 +3055,8 @@ static void invoke_rtcall(Scheme_Future_State * volatile fs, future_t * volatile
/**********************************************************************/
future_t *enqueue_future(Scheme_Future_State *fs, future_t *ft)
/* Called in runtime thread */
XFORM_SKIP_PROC
/* called in any thread with lock held */
{
if (fs->future_queue_end) {
fs->future_queue_end->next = ft;