diff --git a/src/mzscheme/src/future.c b/src/mzscheme/src/future.c index f611fa1ec9..4e2ff3bae0 100644 --- a/src/mzscheme/src/future.c +++ b/src/mzscheme/src/future.c @@ -18,17 +18,18 @@ int g_print_prims = 0; extern void *on_demand_jit_code; -#define THREAD_POOL_SIZE 2 +#define THREAD_POOL_SIZE 2 +#define INITIAL_C_STACK_SIZE 500000 static pthread_t g_pool_threads[THREAD_POOL_SIZE]; static int g_num_avail_threads = 0; static unsigned long g_cur_cpu_mask = 1; +static void *g_signal_handle = NULL; future_t *g_future_queue = NULL; int g_next_futureid = 0; pthread_t g_rt_threadid = 0; static pthread_mutex_t g_future_queue_mutex = PTHREAD_MUTEX_INITIALIZER; -static pthread_mutex_t g_future_pending_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t g_future_pending_cv = PTHREAD_COND_INITIALIZER; //Stuff for scheme runstack @@ -246,13 +247,17 @@ void futures_init(void) { int i; pthread_t threadid; + GC_CAN_IGNORE pthread_attr_t attr; g_rt_threadid = pthread_self(); + g_signal_handle = scheme_get_signal_handle(); //Create the worker thread pool. These threads will - //'queue up' and wait for futures to become available + //'queue up' and wait for futures to become available + pthread_attr_init(&attr); + pthread_attr_setstacksize(&attr, INITIAL_C_STACK_SIZE); for (i = 0; i < THREAD_POOL_SIZE - 1; i++) { - pthread_create(&threadid, NULL, worker_thread_future_loop, NULL); + pthread_create(&threadid, &attr, worker_thread_future_loop, NULL); g_pool_threads[i] = threadid; } @@ -311,7 +316,6 @@ Scheme_Object *future(int argc, Scheme_Object *argv[]) #ifdef DEBUG_FUTURES LOG_THISCALL; - dump_state(); #endif int init_runstack_size, main_runstack_size; @@ -353,23 +357,15 @@ Scheme_Object *future(int argc, Scheme_Object *argv[]) //when generating code if (ncd->code == on_demand_jit_code) { - printf("JIT compiling code.\n"); scheme_on_demand_generate_lambda(nc, 0, NULL); - printf("Code pointer is now %p, and on_demand_jit_code is %p.\n", ncd->code, on_demand_jit_code); - } - else - { - printf("Code pointer was %p, and on_demand_jit_code was %p. Code is already JIT compiled.\n", ncd->code, on_demand_jit_code); } //pthread_mutex_lock(&g_future_queue_mutex); ft->code = (void*)ncd->code; - pthread_mutex_unlock(&g_future_queue_mutex); //Signal that a future is pending - pthread_mutex_lock(&g_future_pending_mutex); pthread_cond_signal(&g_future_pending_cv); - pthread_mutex_unlock(&g_future_pending_mutex); + pthread_mutex_unlock(&g_future_queue_mutex); return scheme_make_integer(futureid); END_XFORM_SKIP; @@ -382,6 +378,21 @@ Scheme_Object *num_processors(int argc, Scheme_Object *argv[]) } +int future_ready(Scheme_Object *obj) +{ + int ret = 0; + future_t *ft = (future_t*)obj; + pthread_mutex_lock(&g_future_queue_mutex); + if (ft->work_completed || ft->rt_prim != NULL) + { + ret = 1; + } + + pthread_mutex_unlock(&g_future_queue_mutex); + return ret; +} + + Scheme_Object *touch(int argc, Scheme_Object *argv[]) { START_XFORM_SKIP; @@ -395,6 +406,16 @@ Scheme_Object *touch(int argc, Scheme_Object *argv[]) #ifdef DEBUG_FUTURES LOG("touch (future %d)", futureid); dump_state(); + + if (SCHEME_INTP(argv[0])) + { + printf("Future id passed to touch was %d, is a Scheme integer.\n", futureid); + } + else + { + printf("Arg passed to touch was a %d.\n", SCHEME_TYPE(argv[0])); + } + fflush(stdout); #endif pthread_mutex_lock(&g_future_queue_mutex); @@ -404,11 +425,15 @@ Scheme_Object *touch(int argc, Scheme_Object *argv[]) //Spin waiting for primitive calls or a return value from //the worker thread wait_for_rtcall_or_completion: + scheme_block_until(future_ready, NULL, (Scheme_Object*)ft, 0); pthread_mutex_lock(&g_future_queue_mutex); if (ft->work_completed) { retval = ft->retval; + printf("Successfully touched future %d\n", ft->id); + fflush(stdout); + //Destroy the future descriptor if (ft->prev == NULL) { @@ -453,10 +478,6 @@ Scheme_Object *touch(int argc, Scheme_Object *argv[]) pthread_cond_signal(&ft->can_continue_cv); pthread_mutex_unlock(&g_future_queue_mutex); - #ifdef DEBUG_FUTURES - dump_state(); - #endif - goto wait_for_rtcall_or_completion; } else @@ -480,7 +501,7 @@ void *worker_thread_future_loop(void *arg) Scheme_Object* (*jitcode)(Scheme_Object*, int, Scheme_Object**); //Set processor affinity - pthread_mutex_lock(&g_future_queue_mutex); + /*pthread_mutex_lock(&g_future_queue_mutex); if (pthread_setaffinity_np(pthread_self(), sizeof(g_cur_cpu_mask), &g_cur_cpu_mask)) { printf( @@ -490,17 +511,24 @@ void *worker_thread_future_loop(void *arg) } pthread_mutex_unlock(&g_future_queue_mutex); + */ + future_t *ft; wait_for_work: //LOG("Waiting for new future work..."); - pthread_mutex_lock(&g_future_pending_mutex); - pthread_cond_wait(&g_future_pending_cv, &g_future_pending_mutex); + pthread_mutex_lock(&g_future_queue_mutex); + while (!(ft = get_pending_future())) + { + pthread_cond_wait(&g_future_pending_cv, &g_future_queue_mutex); + } - //LOG("Got a signal that a future is pending..."); + LOG("Got a signal that a future is pending..."); //Work is available for this thread - pthread_mutex_lock(&g_future_queue_mutex); - future_t *ft = get_pending_future(); + + scheme_fuel_counter = 1; + scheme_jit_stack_boundary = ((unsigned long)&v) - INITIAL_C_STACK_SIZE; + ft->pending = 0; ft->status = RUNNING; ft->threadid = pthread_self(); @@ -518,7 +546,6 @@ void *worker_thread_future_loop(void *arg) jitcode = (Scheme_Object* (*)(Scheme_Object*, int, Scheme_Object**))(ft->code); pthread_mutex_unlock(&g_future_queue_mutex); - pthread_mutex_unlock(&g_future_pending_mutex); //Run the code //Passing no arguments for now. @@ -538,6 +565,7 @@ void *worker_thread_future_loop(void *arg) //Update the status ft->status = FINISHED; + scheme_signal_received_at(g_signal_handle); pthread_mutex_unlock(&g_future_queue_mutex); goto wait_for_work; @@ -557,8 +585,8 @@ int future_do_runtimecall( void *args, void *retval) { - START_XFORM_SKIP; - future_t *future; + START_XFORM_SKIP; + future_t *future; //If already running on the main thread //or no future is involved, do nothing //and return FALSE @@ -589,6 +617,8 @@ int future_do_runtimecall( //Update the future's status to waiting future->status = WAITING_FOR_PRIM; + scheme_signal_received_at(g_signal_handle); + //Wait for the signal that the RT call is finished pthread_cond_wait(&future->can_continue_cv, &g_future_queue_mutex); @@ -662,7 +692,14 @@ int rtcall_obj_int_pobj_obj( #ifdef DEBUG_FUTURES //debug_save_context(); - #endif + + #endif + + printf("scheme_fuel_counter = %d\n", scheme_fuel_counter); + printf("scheme_jit_stack_boundary = %p\n", scheme_jit_stack_boundary); + printf("scheme_current_runstack = %p\n", scheme_current_runstack); + printf("scheme_current_runstack_start = %p\n", scheme_current_runstack_start); + printf("stack address = %p\n", &future); data.prim = f; data.a = a; diff --git a/src/mzscheme/src/future.h b/src/mzscheme/src/future.h index 4b074004ca..01f066187c 100644 --- a/src/mzscheme/src/future.h +++ b/src/mzscheme/src/future.h @@ -478,7 +478,7 @@ extern int rtcall_obj_int_pobj_obj( #endif -#ifdef LOG_ARGS +#if 1 #define LOG(a...) do { fprintf(stderr, "%x:%s:%s:%d ", (unsigned) pthread_self(), __FILE__, __FUNCTION__, __LINE__); fprintf(stderr, a); fprintf(stderr, "\n"); fflush(stdout); } while(0) #define LOG_THISCALL LOG(__FUNCTION__) diff --git a/src/mzscheme/src/jit.c b/src/mzscheme/src/jit.c index 690f54ec4d..0f73cf4cf9 100644 --- a/src/mzscheme/src/jit.c +++ b/src/mzscheme/src/jit.c @@ -2201,6 +2201,21 @@ static Scheme_Object *ts_scheme_apply_from_native(Scheme_Object *rator, int argc return _scheme_apply_from_native(rator, argc, argv); } +static Scheme_Object *ts_scheme_tail_apply_from_native(Scheme_Object *rator, int argc, Scheme_Object **argv) +{ + /* RTCALL_OBJ_INT_POBJ_OBJ(_scheme_tail_apply_from_native, rator, argc, argv); */ + Scheme_Object *retptr; + if (rtcall_obj_int_pobj_obj(_scheme_tail_apply_from_native, + rator, + argc, + argv, + &retptr)) { + return retptr; + } + + return _scheme_tail_apply_from_native(rator, argc, argv); +} + static void ts_on_demand(void) { /* RTCALL_VOID_VOID(on_demand); */ @@ -2400,7 +2415,7 @@ static int generate_finish_tail_call(mz_jit_state *jitter, int direct_native) if (direct_native > 1) { /* => some_args_already_in_place */ (void)mz_finish(_scheme_tail_apply_from_native_fixup_args); } else { - (void)mz_finish(_scheme_tail_apply_from_native); + (void)mz_finish(ts_scheme_tail_apply_from_native); } CHECK_LIMIT(); /* Return: */ diff --git a/src/mzscheme/src/schemef.h b/src/mzscheme/src/schemef.h index 825c6177b6..6ee569f92c 100644 --- a/src/mzscheme/src/schemef.h +++ b/src/mzscheme/src/schemef.h @@ -68,7 +68,7 @@ MZ_EXTERN Scheme_Object *scheme_current_break_cell(); /*========================================================================*/ #ifndef LINK_EXTENSIONS_BY_TABLE -# ifndef MZ_USE_PLACES +# if !defined(MZ_USE_PLACES) || !defined(FUTURES_ENABLED) MZ_EXTERN THREAD_LOCAL Scheme_Thread *scheme_current_thread; # endif MZ_EXTERN THREAD_LOCAL volatile int scheme_fuel_counter;