From 885bf6555f50bea0dbd3000a2490db12df996464 Mon Sep 17 00:00:00 2001 From: Matthew Flatt Date: Sat, 3 Dec 2011 20:50:17 -0700 Subject: [PATCH] fix future resume in runtime thread with extra stack space Also, fix the possibility that a future can shift from RUNNING to PENDING while the runtime thread is checking PENDING then RUNNING. --- src/racket/src/fun.c | 39 +++++++++++++++- src/racket/src/future.c | 98 ++++++++++++++++++++++------------------ src/racket/src/future.h | 47 ------------------- src/racket/src/schpriv.h | 3 +- 4 files changed, 94 insertions(+), 93 deletions(-) diff --git a/src/racket/src/fun.c b/src/racket/src/fun.c index 8db326b661..c639e44a90 100644 --- a/src/racket/src/fun.c +++ b/src/racket/src/fun.c @@ -7487,7 +7487,9 @@ static void *apply_lwc_k() return scheme_apply_lightweight_continuation(lw, result, p->ku.k.i1, p->ku.k.i2); } -int scheme_can_apply_lightweight_continuation(Scheme_Lightweight_Continuation *lw) +static Scheme_Object *can_apply_lwc_k(void); + +static int can_apply_lightweight_continuation(Scheme_Lightweight_Continuation *lw, int did_overflow) { #ifdef DO_STACK_CHECK /* enough room on C stack? */ @@ -7498,7 +7500,15 @@ int scheme_can_apply_lightweight_continuation(Scheme_Lightweight_Continuation *l # define SCHEME_PLUS_STACK_DELTA(x) ((x) - size) # include "mzstkchk.h" { - return 0; + if (did_overflow) + return 0; + else { + scheme_current_thread->ku.k.p1 = lw; + if (SCHEME_TRUEP(scheme_handle_stack_overflow(can_apply_lwc_k))) + return 2; + else + return 0; + } } } @@ -7508,6 +7518,31 @@ int scheme_can_apply_lightweight_continuation(Scheme_Lightweight_Continuation *l #endif } +int scheme_can_apply_lightweight_continuation(Scheme_Lightweight_Continuation *lw, int check_overflow) +/* result value 2 => need to handle stack overflow to have enough room */ +{ + if (check_overflow) + return can_apply_lightweight_continuation(lw, 0); + else + /* assume that we can apply the continuation, though + overflow handling may be needed (i.e., assume that the runtime + thread's stack size is > than a future thread's stack) */ + return 1; +} + +static Scheme_Object *can_apply_lwc_k(void) +{ + Scheme_Thread *p = scheme_current_thread; + Scheme_Lightweight_Continuation *lwc = (Scheme_Lightweight_Continuation *)p->ku.k.p1; + + p->ku.k.p1 = NULL; + + if (can_apply_lightweight_continuation(lwc, 1)) + return scheme_true; + else + return scheme_false; +} + Scheme_Object *scheme_apply_lightweight_continuation(Scheme_Lightweight_Continuation *lw, Scheme_Object *result, int result_is_rs_argv, diff --git a/src/racket/src/future.c b/src/racket/src/future.c index 1a7c17cb6a..2212ba47ca 100644 --- a/src/racket/src/future.c +++ b/src/racket/src/future.c @@ -291,7 +291,7 @@ static void future_do_runtimecall(struct Scheme_Future_Thread_State *fts, int can_suspend); static int capture_future_continuation(struct Scheme_Future_State *fs, future_t *ft, void **storage, int need_lock); -#define INITIAL_C_STACK_SIZE 500000 +#define FUTURE_C_STACK_SIZE 500000 #define FUTURE_RUNSTACK_SIZE 2000 #define FEVENT_BUFFER_SIZE 512 @@ -667,7 +667,7 @@ static void init_future_thread(Scheme_Future_State *fs, int i) params.runstack_start = runstack_start; mzrt_sema_create(¶ms.ready_sema, 0); - t = mz_proc_thread_create_w_stacksize(worker_thread_future_loop, ¶ms, INITIAL_C_STACK_SIZE); + t = mz_proc_thread_create_w_stacksize(worker_thread_future_loop, ¶ms, FUTURE_C_STACK_SIZE); mzrt_sema_wait(params.ready_sema); mzrt_sema_destroy(params.ready_sema); @@ -823,7 +823,7 @@ void scheme_future_block_until_gc() *(fs->pool_threads[i]->need_gc_pointer) = 1; if (*(fs->pool_threads[i]->fuel_pointer)) { *(fs->pool_threads[i]->fuel_pointer) = 0; - *(fs->pool_threads[i]->stack_boundary_pointer) += INITIAL_C_STACK_SIZE; + *(fs->pool_threads[i]->stack_boundary_pointer) += FUTURE_C_STACK_SIZE; } } } @@ -888,7 +888,7 @@ void scheme_future_continue_after_gc() if (!fs->pool_threads[i]->thread->current_ft || scheme_custodian_is_available(fs->pool_threads[i]->thread->current_ft->cust)) { *(fs->pool_threads[i]->fuel_pointer) = 1; - *(fs->pool_threads[i]->stack_boundary_pointer) -= INITIAL_C_STACK_SIZE; + *(fs->pool_threads[i]->stack_boundary_pointer) -= FUTURE_C_STACK_SIZE; } else { /* leave fuel exhausted, which will force the thread into a slow path when it resumes to suspend the computation */ @@ -1665,6 +1665,16 @@ static void trigger_added_touches(Scheme_Future_State *fs, future_t *ft) } } +static Scheme_Object *shallower_apply_future_lw_k(void) +{ + Scheme_Thread *p = scheme_current_thread; + future_t *ft = (future_t *)p->ku.k.p1; + + p->ku.k.p1 = NULL; + + return apply_future_lw(ft); +} + static void future_in_runtime(Scheme_Future_State *fs, future_t * volatile ft, int what) { mz_jmp_buf newbuf, * volatile savebuf; @@ -1685,7 +1695,11 @@ static void future_in_runtime(Scheme_Future_State *fs, future_t * volatile ft, i retval = NULL; } else { if (ft->suspended_lw) { - retval = apply_future_lw(ft); + if (scheme_can_apply_lightweight_continuation(ft->suspended_lw, 1) > 1) { + p->ku.k.p1 = ft; + retval = scheme_handle_stack_overflow(shallower_apply_future_lw_k); + } else + retval = apply_future_lw(ft); } else { retval = scheme_apply_multi(ft->orig_lambda, 0, NULL); } @@ -1742,45 +1756,37 @@ Scheme_Object *general_touch(int argc, Scheme_Object *argv[]) dump_state(); #endif - mzrt_mutex_lock(fs->future_mutex); - if ((((ft->status == PENDING) - && prefer_to_apply_future_in_runtime()) - || (ft->status == PENDING_OVERSIZE) - || (ft->status == SUSPENDED)) - && (!ft->suspended_lw - || scheme_can_apply_lightweight_continuation(ft->suspended_lw))) { - int what = FEVENT_START_WORK; - if (ft->status == PENDING_OVERSIZE) { - what = FEVENT_START_RTONLY_WORK; - } else if (ft->status != SUSPENDED) { - dequeue_future(fs, ft); - } - ft->status = RUNNING; - mzrt_mutex_unlock(fs->future_mutex); - - future_in_runtime(fs, ft, what); - - retval = ft->retval; - - receive_special_result(ft, retval, 0); - - flush_future_logs(fs); - - return retval; - } - mzrt_mutex_unlock(fs->future_mutex); - /* Spin waiting for primitive calls or a return value from the worker thread */ while (1) { - if (!future_ready((Scheme_Object *)ft)) { - record_fevent(FEVENT_TOUCH_PAUSE, ft->id); - scheme_block_until(future_ready, NULL, (Scheme_Object*)ft, 0); - record_fevent(FEVENT_TOUCH_RESUME, ft->id); - } - mzrt_mutex_lock(fs->future_mutex); - if ((ft->status == RUNNING) + if ((((ft->status == PENDING) + && prefer_to_apply_future_in_runtime()) + || (ft->status == PENDING_OVERSIZE) + || (ft->status == SUSPENDED)) + && (!ft->suspended_lw + || scheme_can_apply_lightweight_continuation(ft->suspended_lw, 0))) + { + int what = FEVENT_START_WORK; + if (ft->status == PENDING_OVERSIZE) { + what = FEVENT_START_RTONLY_WORK; + } else if (ft->status != SUSPENDED) { + dequeue_future(fs, ft); + } + ft->status = RUNNING; + mzrt_mutex_unlock(fs->future_mutex); + + future_in_runtime(fs, ft, what); + + retval = ft->retval; + + receive_special_result(ft, retval, 0); + + flush_future_logs(fs); + + return retval; + } + else if ((ft->status == RUNNING) || (ft->status == WAITING_FOR_FSEMA) || (ft->status == HANDLING_PRIM)) { @@ -1811,7 +1817,7 @@ Scheme_Object *general_touch(int argc, Scheme_Object *argv[]) { ft->maybe_suspended_lw = 0; if (ft->suspended_lw) { - if (scheme_can_apply_lightweight_continuation(ft->suspended_lw) + if (scheme_can_apply_lightweight_continuation(ft->suspended_lw, 0) && prefer_to_apply_future_in_runtime()) { if (ft->status != SUSPENDED) dequeue_future(fs, ft); @@ -1833,6 +1839,12 @@ Scheme_Object *general_touch(int argc, Scheme_Object *argv[]) { mzrt_mutex_unlock(fs->future_mutex); } + + scheme_thread_block(0.0); /* to ensure check for futures work */ + + record_fevent(FEVENT_TOUCH_PAUSE, ft->id); + scheme_block_until(future_ready, NULL, (Scheme_Object*)ft, 0); + record_fevent(FEVENT_TOUCH_RESUME, ft->id); } if (!retval) { @@ -2008,7 +2020,7 @@ void *worker_thread_future_loop(void *arg) scheme_current_thread = fts->thread; scheme_fuel_counter = 1; - scheme_jit_stack_boundary = ((uintptr_t)&v) - INITIAL_C_STACK_SIZE; + scheme_jit_stack_boundary = ((uintptr_t)&v) - FUTURE_C_STACK_SIZE; fts->need_gc_pointer = &scheme_future_need_gc_pause; fts->fuel_pointer = &scheme_fuel_counter; @@ -2369,7 +2381,7 @@ void scheme_check_future_work() if (!*(fs->pool_threads[i]->fuel_pointer) && !fs->pool_threads[i]->thread->current_ft) { *(fs->pool_threads[i]->fuel_pointer) = 1; - *(fs->pool_threads[i]->stack_boundary_pointer) -= INITIAL_C_STACK_SIZE; + *(fs->pool_threads[i]->stack_boundary_pointer) -= FUTURE_C_STACK_SIZE; } } } diff --git a/src/racket/src/future.h b/src/racket/src/future.h index d7f7aa2e56..d185f75cd3 100644 --- a/src/racket/src/future.h +++ b/src/racket/src/future.h @@ -3,27 +3,6 @@ #ifdef MZ_USE_FUTURES -#ifndef UNIT_TEST -#include "schpriv.h" -typedef Scheme_Object*(*prim_t)(int, Scheme_Object**); -#else -#define Scheme_Object void -#define Scheme_Bucket void -#define Scheme_Env void -#define Scheme_Type int -#define scheme_void NULL -#define scheme_false 0x0 -#define START_XFORM_SKIP -#define END_XFORM_SKIP -#define MZ_MARK_STACK_TYPE intptr_t -#define Scheme_Native_Closure_Data void -typedef Scheme_Object*(*prim_t)(int, Scheme_Object**); -void scheme_add_global(char *name, int arity, Scheme_Env *env); -int scheme_make_prim_w_arity(prim_t func, char *name, int arg1, int arg2); -#endif - -#include - typedef Scheme_Object **(*prim_on_demand_t)(Scheme_Object **, Scheme_Object **); typedef Scheme_Object* (*prim_obj_int_pobj_obj_t)(Scheme_Object*, int, Scheme_Object**); typedef Scheme_Object* (*prim_int_pobj_obj_t)(int, Scheme_Object**); @@ -197,7 +176,6 @@ typedef struct fsemaphore_t { future_t *queue_end; } fsemaphore_t; - /* Primitive instrumentation stuff */ /* Exceptions */ @@ -222,16 +200,6 @@ void scheme_wrong_type_from_ft(const char *who, const char *expected_type, int w extern Scheme_Object *scheme_ts_scheme_force_value_same_mark(Scheme_Object *v); -/* Helper macros for argument marshaling */ -#ifdef MZ_USE_FUTURES - -#define IS_WORKER_THREAD (g_rt_threadid != 0 && pthread_self() != g_rt_threadid) -#define ASSERT_CORRECT_THREAD if (g_rt_threadid != 0 && pthread_self() != g_rt_threadid) \ - { \ - printf("%s invoked on wrong thread!\n", __FUNCTION__); \ - /*GDB_BREAK;*/ \ - } - extern Scheme_Object **scheme_rtcall_on_demand(const char *who, int src_type, prim_on_demand_t f, Scheme_Object **argv); extern uintptr_t scheme_rtcall_alloc(const char *who, int src_type); extern void scheme_rtcall_new_mark_segment(Scheme_Thread *p); @@ -239,12 +207,6 @@ extern void scheme_rtcall_allocate_values(const char *who, int src_type, int cou prim_allocate_values_t f); extern Scheme_Object *scheme_rtcall_make_fsemaphore(const char *who, int src_type, Scheme_Object *ready); extern Scheme_Object *scheme_rtcall_make_future(const char *who, int src_type, Scheme_Object *proc); -#else - -#define IS_WORKER_THREAD 0 -#define ASSERT_CORRECT_THREAD - -#endif void scheme_future_block_until_gc(); void scheme_future_continue_after_gc(); @@ -252,15 +214,6 @@ void scheme_check_future_work(); void scheme_future_gc_pause(); void scheme_future_check_custodians(); -#ifdef UNIT_TEST -/* These forwarding decls only need to be here to make - primitives visible to test cases written in C */ -extern int future_begin_invoke(void *code); -extern Scheme_Object *touch(int argc, Scheme_Object **argv); -extern Scheme_Object *future_touch(int futureid); -#endif - -#else #endif /* MZ_USE_FUTURES */ /* always defined: */ diff --git a/src/racket/src/schpriv.h b/src/racket/src/schpriv.h index 6ad3dd53bc..b208819527 100644 --- a/src/racket/src/schpriv.h +++ b/src/racket/src/schpriv.h @@ -2397,7 +2397,8 @@ Scheme_Object *scheme_apply_lightweight_continuation(Scheme_Lightweight_Continua Scheme_Object **scheme_adjust_runstack_argument(Scheme_Lightweight_Continuation *captured, Scheme_Object **arg); -int scheme_can_apply_lightweight_continuation(Scheme_Lightweight_Continuation *captured); +int scheme_can_apply_lightweight_continuation(Scheme_Lightweight_Continuation *captured, + int check_overflow); int scheme_push_marks_from_thread(Scheme_Thread *p2, Scheme_Cont_Frame_Data *d); int scheme_push_marks_from_lightweight_continuation(Scheme_Lightweight_Continuation *captured,