fix future resume in runtime thread with extra stack space

Also, fix the possibility that a future can shift from RUNNING
to PENDING while the runtime thread is checking PENDING then
RUNNING.
This commit is contained in:
Matthew Flatt 2011-12-03 20:50:17 -07:00
parent 333ffd7ede
commit 885bf6555f
4 changed files with 94 additions and 93 deletions

View File

@ -7487,7 +7487,9 @@ static void *apply_lwc_k()
return scheme_apply_lightweight_continuation(lw, result, p->ku.k.i1, p->ku.k.i2);
}
int scheme_can_apply_lightweight_continuation(Scheme_Lightweight_Continuation *lw)
static Scheme_Object *can_apply_lwc_k(void);
static int can_apply_lightweight_continuation(Scheme_Lightweight_Continuation *lw, int did_overflow)
{
#ifdef DO_STACK_CHECK
/* enough room on C stack? */
@ -7498,7 +7500,15 @@ int scheme_can_apply_lightweight_continuation(Scheme_Lightweight_Continuation *l
# define SCHEME_PLUS_STACK_DELTA(x) ((x) - size)
# include "mzstkchk.h"
{
return 0;
if (did_overflow)
return 0;
else {
scheme_current_thread->ku.k.p1 = lw;
if (SCHEME_TRUEP(scheme_handle_stack_overflow(can_apply_lwc_k)))
return 2;
else
return 0;
}
}
}
@ -7508,6 +7518,31 @@ int scheme_can_apply_lightweight_continuation(Scheme_Lightweight_Continuation *l
#endif
}
int scheme_can_apply_lightweight_continuation(Scheme_Lightweight_Continuation *lw, int check_overflow)
/* result value 2 => need to handle stack overflow to have enough room */
{
if (check_overflow)
return can_apply_lightweight_continuation(lw, 0);
else
/* assume that we can apply the continuation, though
overflow handling may be needed (i.e., assume that the runtime
thread's stack size is > than a future thread's stack) */
return 1;
}
static Scheme_Object *can_apply_lwc_k(void)
{
Scheme_Thread *p = scheme_current_thread;
Scheme_Lightweight_Continuation *lwc = (Scheme_Lightweight_Continuation *)p->ku.k.p1;
p->ku.k.p1 = NULL;
if (can_apply_lightweight_continuation(lwc, 1))
return scheme_true;
else
return scheme_false;
}
Scheme_Object *scheme_apply_lightweight_continuation(Scheme_Lightweight_Continuation *lw,
Scheme_Object *result,
int result_is_rs_argv,

View File

@ -291,7 +291,7 @@ static void future_do_runtimecall(struct Scheme_Future_Thread_State *fts,
int can_suspend);
static int capture_future_continuation(struct Scheme_Future_State *fs, future_t *ft, void **storage, int need_lock);
#define INITIAL_C_STACK_SIZE 500000
#define FUTURE_C_STACK_SIZE 500000
#define FUTURE_RUNSTACK_SIZE 2000
#define FEVENT_BUFFER_SIZE 512
@ -667,7 +667,7 @@ static void init_future_thread(Scheme_Future_State *fs, int i)
params.runstack_start = runstack_start;
mzrt_sema_create(&params.ready_sema, 0);
t = mz_proc_thread_create_w_stacksize(worker_thread_future_loop, &params, INITIAL_C_STACK_SIZE);
t = mz_proc_thread_create_w_stacksize(worker_thread_future_loop, &params, FUTURE_C_STACK_SIZE);
mzrt_sema_wait(params.ready_sema);
mzrt_sema_destroy(params.ready_sema);
@ -823,7 +823,7 @@ void scheme_future_block_until_gc()
*(fs->pool_threads[i]->need_gc_pointer) = 1;
if (*(fs->pool_threads[i]->fuel_pointer)) {
*(fs->pool_threads[i]->fuel_pointer) = 0;
*(fs->pool_threads[i]->stack_boundary_pointer) += INITIAL_C_STACK_SIZE;
*(fs->pool_threads[i]->stack_boundary_pointer) += FUTURE_C_STACK_SIZE;
}
}
}
@ -888,7 +888,7 @@ void scheme_future_continue_after_gc()
if (!fs->pool_threads[i]->thread->current_ft
|| scheme_custodian_is_available(fs->pool_threads[i]->thread->current_ft->cust)) {
*(fs->pool_threads[i]->fuel_pointer) = 1;
*(fs->pool_threads[i]->stack_boundary_pointer) -= INITIAL_C_STACK_SIZE;
*(fs->pool_threads[i]->stack_boundary_pointer) -= FUTURE_C_STACK_SIZE;
} else {
/* leave fuel exhausted, which will force the thread into a slow
path when it resumes to suspend the computation */
@ -1665,6 +1665,16 @@ static void trigger_added_touches(Scheme_Future_State *fs, future_t *ft)
}
}
static Scheme_Object *shallower_apply_future_lw_k(void)
{
Scheme_Thread *p = scheme_current_thread;
future_t *ft = (future_t *)p->ku.k.p1;
p->ku.k.p1 = NULL;
return apply_future_lw(ft);
}
static void future_in_runtime(Scheme_Future_State *fs, future_t * volatile ft, int what)
{
mz_jmp_buf newbuf, * volatile savebuf;
@ -1685,7 +1695,11 @@ static void future_in_runtime(Scheme_Future_State *fs, future_t * volatile ft, i
retval = NULL;
} else {
if (ft->suspended_lw) {
retval = apply_future_lw(ft);
if (scheme_can_apply_lightweight_continuation(ft->suspended_lw, 1) > 1) {
p->ku.k.p1 = ft;
retval = scheme_handle_stack_overflow(shallower_apply_future_lw_k);
} else
retval = apply_future_lw(ft);
} else {
retval = scheme_apply_multi(ft->orig_lambda, 0, NULL);
}
@ -1742,45 +1756,37 @@ Scheme_Object *general_touch(int argc, Scheme_Object *argv[])
dump_state();
#endif
mzrt_mutex_lock(fs->future_mutex);
if ((((ft->status == PENDING)
&& prefer_to_apply_future_in_runtime())
|| (ft->status == PENDING_OVERSIZE)
|| (ft->status == SUSPENDED))
&& (!ft->suspended_lw
|| scheme_can_apply_lightweight_continuation(ft->suspended_lw))) {
int what = FEVENT_START_WORK;
if (ft->status == PENDING_OVERSIZE) {
what = FEVENT_START_RTONLY_WORK;
} else if (ft->status != SUSPENDED) {
dequeue_future(fs, ft);
}
ft->status = RUNNING;
mzrt_mutex_unlock(fs->future_mutex);
future_in_runtime(fs, ft, what);
retval = ft->retval;
receive_special_result(ft, retval, 0);
flush_future_logs(fs);
return retval;
}
mzrt_mutex_unlock(fs->future_mutex);
/* Spin waiting for primitive calls or a return value from
the worker thread */
while (1) {
if (!future_ready((Scheme_Object *)ft)) {
record_fevent(FEVENT_TOUCH_PAUSE, ft->id);
scheme_block_until(future_ready, NULL, (Scheme_Object*)ft, 0);
record_fevent(FEVENT_TOUCH_RESUME, ft->id);
}
mzrt_mutex_lock(fs->future_mutex);
if ((ft->status == RUNNING)
if ((((ft->status == PENDING)
&& prefer_to_apply_future_in_runtime())
|| (ft->status == PENDING_OVERSIZE)
|| (ft->status == SUSPENDED))
&& (!ft->suspended_lw
|| scheme_can_apply_lightweight_continuation(ft->suspended_lw, 0)))
{
int what = FEVENT_START_WORK;
if (ft->status == PENDING_OVERSIZE) {
what = FEVENT_START_RTONLY_WORK;
} else if (ft->status != SUSPENDED) {
dequeue_future(fs, ft);
}
ft->status = RUNNING;
mzrt_mutex_unlock(fs->future_mutex);
future_in_runtime(fs, ft, what);
retval = ft->retval;
receive_special_result(ft, retval, 0);
flush_future_logs(fs);
return retval;
}
else if ((ft->status == RUNNING)
|| (ft->status == WAITING_FOR_FSEMA)
|| (ft->status == HANDLING_PRIM))
{
@ -1811,7 +1817,7 @@ Scheme_Object *general_touch(int argc, Scheme_Object *argv[])
{
ft->maybe_suspended_lw = 0;
if (ft->suspended_lw) {
if (scheme_can_apply_lightweight_continuation(ft->suspended_lw)
if (scheme_can_apply_lightweight_continuation(ft->suspended_lw, 0)
&& prefer_to_apply_future_in_runtime()) {
if (ft->status != SUSPENDED)
dequeue_future(fs, ft);
@ -1833,6 +1839,12 @@ Scheme_Object *general_touch(int argc, Scheme_Object *argv[])
{
mzrt_mutex_unlock(fs->future_mutex);
}
scheme_thread_block(0.0); /* to ensure check for futures work */
record_fevent(FEVENT_TOUCH_PAUSE, ft->id);
scheme_block_until(future_ready, NULL, (Scheme_Object*)ft, 0);
record_fevent(FEVENT_TOUCH_RESUME, ft->id);
}
if (!retval) {
@ -2008,7 +2020,7 @@ void *worker_thread_future_loop(void *arg)
scheme_current_thread = fts->thread;
scheme_fuel_counter = 1;
scheme_jit_stack_boundary = ((uintptr_t)&v) - INITIAL_C_STACK_SIZE;
scheme_jit_stack_boundary = ((uintptr_t)&v) - FUTURE_C_STACK_SIZE;
fts->need_gc_pointer = &scheme_future_need_gc_pause;
fts->fuel_pointer = &scheme_fuel_counter;
@ -2369,7 +2381,7 @@ void scheme_check_future_work()
if (!*(fs->pool_threads[i]->fuel_pointer)
&& !fs->pool_threads[i]->thread->current_ft) {
*(fs->pool_threads[i]->fuel_pointer) = 1;
*(fs->pool_threads[i]->stack_boundary_pointer) -= INITIAL_C_STACK_SIZE;
*(fs->pool_threads[i]->stack_boundary_pointer) -= FUTURE_C_STACK_SIZE;
}
}
}

View File

@ -3,27 +3,6 @@
#ifdef MZ_USE_FUTURES
#ifndef UNIT_TEST
#include "schpriv.h"
typedef Scheme_Object*(*prim_t)(int, Scheme_Object**);
#else
#define Scheme_Object void
#define Scheme_Bucket void
#define Scheme_Env void
#define Scheme_Type int
#define scheme_void NULL
#define scheme_false 0x0
#define START_XFORM_SKIP
#define END_XFORM_SKIP
#define MZ_MARK_STACK_TYPE intptr_t
#define Scheme_Native_Closure_Data void
typedef Scheme_Object*(*prim_t)(int, Scheme_Object**);
void scheme_add_global(char *name, int arity, Scheme_Env *env);
int scheme_make_prim_w_arity(prim_t func, char *name, int arg1, int arg2);
#endif
#include <stdio.h>
typedef Scheme_Object **(*prim_on_demand_t)(Scheme_Object **, Scheme_Object **);
typedef Scheme_Object* (*prim_obj_int_pobj_obj_t)(Scheme_Object*, int, Scheme_Object**);
typedef Scheme_Object* (*prim_int_pobj_obj_t)(int, Scheme_Object**);
@ -197,7 +176,6 @@ typedef struct fsemaphore_t {
future_t *queue_end;
} fsemaphore_t;
/* Primitive instrumentation stuff */
/* Exceptions */
@ -222,16 +200,6 @@ void scheme_wrong_type_from_ft(const char *who, const char *expected_type, int w
extern Scheme_Object *scheme_ts_scheme_force_value_same_mark(Scheme_Object *v);
/* Helper macros for argument marshaling */
#ifdef MZ_USE_FUTURES
#define IS_WORKER_THREAD (g_rt_threadid != 0 && pthread_self() != g_rt_threadid)
#define ASSERT_CORRECT_THREAD if (g_rt_threadid != 0 && pthread_self() != g_rt_threadid) \
{ \
printf("%s invoked on wrong thread!\n", __FUNCTION__); \
/*GDB_BREAK;*/ \
}
extern Scheme_Object **scheme_rtcall_on_demand(const char *who, int src_type, prim_on_demand_t f, Scheme_Object **argv);
extern uintptr_t scheme_rtcall_alloc(const char *who, int src_type);
extern void scheme_rtcall_new_mark_segment(Scheme_Thread *p);
@ -239,12 +207,6 @@ extern void scheme_rtcall_allocate_values(const char *who, int src_type, int cou
prim_allocate_values_t f);
extern Scheme_Object *scheme_rtcall_make_fsemaphore(const char *who, int src_type, Scheme_Object *ready);
extern Scheme_Object *scheme_rtcall_make_future(const char *who, int src_type, Scheme_Object *proc);
#else
#define IS_WORKER_THREAD 0
#define ASSERT_CORRECT_THREAD
#endif
void scheme_future_block_until_gc();
void scheme_future_continue_after_gc();
@ -252,15 +214,6 @@ void scheme_check_future_work();
void scheme_future_gc_pause();
void scheme_future_check_custodians();
#ifdef UNIT_TEST
/* These forwarding decls only need to be here to make
primitives visible to test cases written in C */
extern int future_begin_invoke(void *code);
extern Scheme_Object *touch(int argc, Scheme_Object **argv);
extern Scheme_Object *future_touch(int futureid);
#endif
#else
#endif /* MZ_USE_FUTURES */
/* always defined: */

View File

@ -2397,7 +2397,8 @@ Scheme_Object *scheme_apply_lightweight_continuation(Scheme_Lightweight_Continua
Scheme_Object **scheme_adjust_runstack_argument(Scheme_Lightweight_Continuation *captured,
Scheme_Object **arg);
int scheme_can_apply_lightweight_continuation(Scheme_Lightweight_Continuation *captured);
int scheme_can_apply_lightweight_continuation(Scheme_Lightweight_Continuation *captured,
int check_overflow);
int scheme_push_marks_from_thread(Scheme_Thread *p2, Scheme_Cont_Frame_Data *d);
int scheme_push_marks_from_lightweight_continuation(Scheme_Lightweight_Continuation *captured,