From 3883594af3c724b02fcb7fa5c6dbf750eb755f02 Mon Sep 17 00:00:00 2001 From: Matthew Flatt Date: Sat, 26 Nov 2011 10:32:24 -0700 Subject: [PATCH] fix future bugs mostly related to `touch' within a future --- src/racket/src/future.c | 80 ++++++++++++++++++++++++++--------------- 1 file changed, 51 insertions(+), 29 deletions(-) diff --git a/src/racket/src/future.c b/src/racket/src/future.c index 8fda0eb14f..dd7887d008 100644 --- a/src/racket/src/future.c +++ b/src/racket/src/future.c @@ -289,7 +289,7 @@ static void future_do_runtimecall(struct Scheme_Future_Thread_State *fts, void *func, int is_atomic, int can_suspend); -static int capture_future_continuation(future_t *ft, void **storage); +static int capture_future_continuation(struct Scheme_Future_State *fs, future_t *ft, void **storage, int need_lock); #define INITIAL_C_STACK_SIZE 500000 #define FUTURE_RUNSTACK_SIZE 2000 @@ -1305,6 +1305,8 @@ Scheme_Object *scheme_fsemaphore_count(int argc, Scheme_Object **argv) } static void requeue_future_within_lock(future_t *future, Scheme_Future_State *fs) + XFORM_SKIP_PROC +/* called in any thread with lock held */ { if (scheme_custodian_is_available(future->cust)) { future->status = PENDING; @@ -1457,16 +1459,19 @@ Scheme_Object *scheme_fsemaphore_wait(int argc, Scheme_Object **argv) /* Try to capture it locally (on this thread) */ if (GC_gen0_alloc_page_ptr - && capture_future_continuation(future, storage)) { - /* This will set fts->thread->current_ft to NULL */ + && capture_future_continuation(fs, future, storage, 0)) { + /* capture sets fts->thread->current_ft to NULL */ mzrt_mutex_lock(fs->future_mutex); future->status = WAITING_FOR_FSEMA; } else { /* Can't capture the continuation locally, so ask the runtime thread to do it */ mzrt_mutex_lock(fs->future_mutex); - future->next_waiting_lwc = fs->future_waiting_lwc; - fs->future_waiting_lwc = future; + if (!future->in_queue_waiting_for_lwc) { + future->next_waiting_lwc = fs->future_waiting_lwc; + fs->future_waiting_lwc = future; + future->in_queue_waiting_for_lwc = 1; + } future->want_lw = 1; } @@ -1595,6 +1600,8 @@ static void dequeue_future(Scheme_Future_State *fs, future_t *ft) } static void complete_rtcall(Scheme_Future_State *fs, future_t *future) + XFORM_SKIP_PROC +/* called in any thread with lock held */ { if (future->suspended_lw) { /* Re-enqueue the future so that some future thread can continue */ @@ -1611,6 +1618,8 @@ static void complete_rtcall(Scheme_Future_State *fs, future_t *future) } static void direct_future_to_future_touch(Scheme_Future_State *fs, future_t *ft, future_t *t_ft) + XFORM_SKIP_PROC +/* called in any thread with lock held */ { Scheme_Object *retval = ft->retval; @@ -1643,6 +1652,7 @@ static void trigger_added_touches(Scheme_Future_State *fs, future_t *ft) { if (ft->touching) { Scheme_Object *touching = ft->touching; + ft->touching = NULL; while (!SCHEME_NULLP(touching)) { Scheme_Object *wb = SCHEME_CAR(touching); future_t *t_ft = (future_t *)SCHEME_WEAK_BOX_VAL(wb); @@ -2178,15 +2188,12 @@ static Scheme_Object *apply_future_lw(future_t *ft) return (Scheme_Object *)scheme_top_level_do(apply_future_lw_k, 0); } -static int capture_future_continuation(future_t *ft, void **storage) +static int capture_future_continuation(Scheme_Future_State *fs, future_t *ft, void **storage, int need_lock) XFORM_SKIP_PROC /* This function explicitly cooperates with the GC by storing the pointers it needs to save across a collection in `storage', so it can be used in a future thread. If future-thread-local - allocation fails, the result is 0. - - It also grabs the future-modification lock as needed to modify the - future. */ + allocation fails, the result is 0. */ { Scheme_Lightweight_Continuation *lw; Scheme_Object **arg_S; @@ -2198,14 +2205,26 @@ static int capture_future_continuation(future_t *ft, void **storage) ft = (future_t *)storage[2]; - ft->suspended_lw = lw; - ft->maybe_suspended_lw = 1; + if (need_lock) { + mzrt_mutex_lock(fs->future_mutex); + if (!ft->want_lw) { + /* Future can already continue. This can only happen + if ft was blocked on another future, and the other + future decided that it could continue while we were + trying to grab the continuation. Drop the captured + continuation. */ + return 1; + } + } + ft->want_lw = 0; ft->fts->thread->current_ft = NULL; /* tells worker thread that it no longer needs to handle the future */ - + ft->suspended_lw = lw; + ft->maybe_suspended_lw = 1; + if (ft->arg_S0) { arg_S = scheme_adjust_runstack_argument(lw, ft->arg_S0); ft->arg_S0 = arg_S; @@ -2230,6 +2249,7 @@ void scheme_check_future_work() for allocation). */ future_t *ft, *other_ft; Scheme_Future_State *fs = scheme_future_state; + mzrt_sema *can_continue_sema; int more; if (!fs) return; @@ -2248,7 +2268,6 @@ void scheme_check_future_work() if (ft) { fs->future_waiting_atomic = ft->next_waiting_atomic; ft->next_waiting_atomic = NULL; - ft->in_queue_waiting_for_lwc = 0; if ((ft->status == WAITING_FOR_PRIM) && ft->rt_prim_is_atomic) { ft->status = HANDLING_PRIM; ft->want_lw = 0; /* we expect to handle it quickly, @@ -2283,30 +2302,22 @@ void scheme_check_future_work() if (other_ft) { /* Chain to `ft' from `other_ft': */ Scheme_Object *wb, *pr; - int was_done; wb = scheme_make_weak_box((Scheme_Object *)ft); pr = scheme_make_pair(wb, scheme_null); mzrt_mutex_lock(fs->future_mutex); if (other_ft->status == FINISHED) { - /* Completed while we tried to allocated a chain link. */ + /* Completed while we tried to allocate a chain link. */ ft->status = HANDLING_PRIM; - ft->want_lw = 0; - was_done = 1; + direct_future_to_future_touch(fs, other_ft, ft); } else { /* enqueue */ if (other_ft->touching) SCHEME_CDR(pr) = other_ft->touching; other_ft->touching = pr; - was_done = 0; } mzrt_mutex_unlock(fs->future_mutex); - - if (was_done) { - /* other_ft is done: */ - direct_future_to_future_touch(fs, other_ft, ft); - } } } @@ -2317,18 +2328,29 @@ void scheme_check_future_work() if (ft) { fs->future_waiting_lwc = ft->next_waiting_lwc; ft->next_waiting_lwc = NULL; + ft->in_queue_waiting_for_lwc = 0; if (!ft->want_lw) ft = NULL; + else { + /* If ft is touching another future, then the other + future may resume ft while we grab the continuation. + Withold ft->can_continue_sema for now, so that we can + capture the continuation, and then double-check + afterward whether the continuation wants a lwc: */ + can_continue_sema = ft->can_continue_sema; + ft->can_continue_sema = NULL; + } } mzrt_mutex_unlock(fs->future_mutex); if (ft) { void *storage[3]; - if (capture_future_continuation(ft, storage)) { + if (capture_future_continuation(fs, ft, storage, 1)) { + /* capture performs mzrt_mutex_lock(fs->future_mutex) on success. */ /* Signal the waiting worker thread that it can continue doing other things: */ - mzrt_mutex_lock(fs->future_mutex); + ft->can_continue_sema = can_continue_sema; if (ft->can_continue_sema) { mzrt_sema_post(ft->can_continue_sema); ft->can_continue_sema = NULL; @@ -2405,7 +2427,7 @@ static void future_do_runtimecall(Scheme_Future_Thread_State *fts, if (prefer_to_suspend && GC_gen0_alloc_page_ptr - && capture_future_continuation(future, storage)) { + && capture_future_continuation(fs, future, storage, 0)) { /* this future thread will suspend handling the future continuation until the result of the blocking call is ready; fts->thread->current_ft was set to NULL */ @@ -2439,7 +2461,6 @@ static void future_do_runtimecall(Scheme_Future_Thread_State *fts, fs->future_waiting_lwc = future; future->in_queue_waiting_for_lwc = 1; } - future->want_lw = 1; } } @@ -3034,7 +3055,8 @@ static void invoke_rtcall(Scheme_Future_State * volatile fs, future_t * volatile /**********************************************************************/ future_t *enqueue_future(Scheme_Future_State *fs, future_t *ft) -/* Called in runtime thread */ + XFORM_SKIP_PROC +/* called in any thread with lock held */ { if (fs->future_queue_end) { fs->future_queue_end->next = ft;