diff --git a/src/mzscheme/src/future.c b/src/mzscheme/src/future.c index 08b8975a22..e2dab8196d 100644 --- a/src/mzscheme/src/future.c +++ b/src/mzscheme/src/future.c @@ -18,9 +18,11 @@ int g_print_prims = 0; extern void *on_demand_jit_code; -#define THREAD_POOL_SIZE 2 +#define THREAD_POOL_SIZE 1 #define INITIAL_C_STACK_SIZE 500000 static pthread_t g_pool_threads[THREAD_POOL_SIZE]; +static int *g_fuel_pointers[THREAD_POOL_SIZE]; +static unsigned long *g_stack_boundary_pointers[THREAD_POOL_SIZE]; static int g_num_avail_threads = 0; static unsigned long g_cur_cpu_mask = 1; static void *g_signal_handle = NULL; @@ -32,6 +34,16 @@ pthread_t g_rt_threadid = 0; static pthread_mutex_t g_future_queue_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t g_future_pending_cv = PTHREAD_COND_INITIALIZER; +static pthread_mutex_t gc_ok_m = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t gc_ok_c = PTHREAD_COND_INITIALIZER; +static int gc_not_ok; +extern THREAD_LOCAL unsigned long GC_gen0_alloc_page_ptr; + +static void start_gc_not_ok(); +static void end_gc_not_ok(); + +static THREAD_LOCAL future_t *current_ft; + //Stuff for scheme runstack //Some of these may mimic defines in thread.c, but are redefined here //to avoid making any changes to that file for now (moving anything out into common @@ -170,6 +182,38 @@ void dump_state(void) } #endif +/**********************************************************************/ +/* Semaphore helpers */ +/**********************************************************************/ + +typedef struct sema_t { + int ready; + pthread_mutex_t m; + pthread_cond_t c; +} sema_t; + +#define SEMA_INITIALIZER { 0, PTHREAD_MUTEX_INITIALIZER, \ + PTHREAD_COND_INITIALIZER } + +static void sema_wait(sema_t *s) +{ + pthread_mutex_lock(&s->m); + while (!s->ready) { + pthread_cond_wait(&s->c, &s->m); + } + --s->ready; + pthread_mutex_unlock(&s->m); +} + +static void sema_signal(sema_t *s) +{ + pthread_mutex_lock(&s->m); + s->ready++; + pthread_cond_signal(&s->c); + pthread_mutex_unlock(&s->m); +} + +static sema_t ready_sema = SEMA_INITIALIZER; /**********************************************************************/ /* Plumbing for MzScheme initialization */ @@ -179,7 +223,6 @@ void dump_state(void) //primitives known void scheme_init_futures(Scheme_Env *env) { - START_XFORM_SKIP; Scheme_Object *v; Scheme_Env *newenv; @@ -237,7 +280,8 @@ void scheme_init_futures(Scheme_Env *env) scheme_finish_primitive_module(newenv); scheme_protect_primitive_provide(newenv, NULL); - END_XFORM_SKIP; + + REGISTER_SO(g_future_queue); } @@ -255,15 +299,57 @@ void futures_init(void) //'queue up' and wait for futures to become available pthread_attr_init(&attr); pthread_attr_setstacksize(&attr, INITIAL_C_STACK_SIZE); - for (i = 0; i < THREAD_POOL_SIZE - 1; i++) + for (i = 0; i < THREAD_POOL_SIZE; i++) { - pthread_create(&threadid, &attr, worker_thread_future_loop, NULL); + pthread_create(&threadid, &attr, worker_thread_future_loop, &i); + sema_wait(&ready_sema); g_pool_threads[i] = threadid; } g_num_avail_threads = THREAD_POOL_SIZE; } +static void start_gc_not_ok() +{ + pthread_mutex_lock(&gc_ok_m); + gc_not_ok++; + pthread_mutex_unlock(&gc_ok_m); +} + +static void end_gc_not_ok() +{ + pthread_mutex_lock(&gc_ok_m); + --gc_not_ok; + pthread_cond_signal(&gc_ok_c); + pthread_mutex_unlock(&gc_ok_m); +} + +void scheme_future_block_until_gc() +{ + int i; + + for (i = 0; i < THREAD_POOL_SIZE; i++) { + *(g_fuel_pointers[i]) = 0; + *(g_stack_boundary_pointers[i]) += INITIAL_C_STACK_SIZE; + } + asm("mfence"); + + pthread_mutex_lock(&gc_ok_m); + while (gc_not_ok) { + pthread_cond_wait(&gc_ok_c, &gc_ok_m); + } + pthread_mutex_unlock(&gc_ok_m); +} + +void scheme_future_continue_after_gc() +{ + int i; + + for (i = 0; i < THREAD_POOL_SIZE; i++) { + *(g_fuel_pointers[i]) = 1; + *(g_stack_boundary_pointers[i]) -= INITIAL_C_STACK_SIZE; + } +} /**********************************************************************/ /* Primitive implementations */ @@ -312,8 +398,6 @@ void print_ms_and_us() Scheme_Object *future(int argc, Scheme_Object *argv[]) { - START_XFORM_SKIP; - #ifdef DEBUG_FUTURES LOG_THISCALL; #endif @@ -368,7 +452,6 @@ Scheme_Object *future(int argc, Scheme_Object *argv[]) pthread_mutex_unlock(&g_future_queue_mutex); return (Scheme_Object*)ft; - END_XFORM_SKIP; } @@ -395,7 +478,6 @@ int future_ready(Scheme_Object *obj) Scheme_Object *touch(int argc, Scheme_Object *argv[]) { - START_XFORM_SKIP; Scheme_Object *retval = NULL; void *rtcall_retval = NULL; future_t *ft; @@ -472,7 +554,6 @@ Scheme_Object *touch(int argc, Scheme_Object *argv[]) } return retval; - END_XFORM_SKIP; } @@ -484,6 +565,8 @@ void *worker_thread_future_loop(void *arg) START_XFORM_SKIP; Scheme_Object *v; Scheme_Object* (*jitcode)(Scheme_Object*, int, Scheme_Object**); + future_t *ft; + int id = *(int *)arg; //Set processor affinity /*pthread_mutex_lock(&g_future_queue_mutex); @@ -498,22 +581,34 @@ void *worker_thread_future_loop(void *arg) pthread_mutex_unlock(&g_future_queue_mutex); */ - future_t *ft; + scheme_fuel_counter = 1; + scheme_jit_stack_boundary = ((unsigned long)&v) - INITIAL_C_STACK_SIZE; + + g_fuel_pointers[id] = &scheme_scheme_fuel_counter; + g_stack_boundary_pointers[id] = &scheme_scheme_jit_stack_boundary; + + GC_gen0_alloc_page_ptr = 1; /* weirdly, disables inline allocation */ + + REGISTER_SO(current_ft); + REGISTER_SO(scheme_current_runstack); + REGISTER_SO(scheme_current_runstack_start); + sema_signal(&ready_sema); + wait_for_work: //LOG("Waiting for new future work..."); + start_gc_not_ok(); pthread_mutex_lock(&g_future_queue_mutex); while (!(ft = get_pending_future())) { + end_gc_not_ok(); pthread_cond_wait(&g_future_pending_cv, &g_future_queue_mutex); + start_gc_not_ok(); } LOG("Got a signal that a future is pending..."); //Work is available for this thread - scheme_fuel_counter = 1; - scheme_jit_stack_boundary = ((unsigned long)&v) - INITIAL_C_STACK_SIZE; - ft->pending = 0; ft->status = RUNNING; ft->threadid = pthread_self(); @@ -532,6 +627,8 @@ void *worker_thread_future_loop(void *arg) jitcode = (Scheme_Object* (*)(Scheme_Object*, int, Scheme_Object**))(ft->code); pthread_mutex_unlock(&g_future_queue_mutex); + current_ft = ft; + //Run the code //Passing no arguments for now. //The lambda passed to a future will always be a parameterless @@ -543,6 +640,8 @@ void *worker_thread_future_loop(void *arg) v = jitcode(ft->orig_lambda, 0, NULL); LOG("Finished running JIT code at %p.\n", ft->code); + ft = current_ft; + //Set the return val in the descriptor pthread_mutex_lock(&g_future_queue_mutex); ft->work_completed = 1; @@ -553,6 +652,8 @@ void *worker_thread_future_loop(void *arg) scheme_signal_received_at(g_signal_handle); pthread_mutex_unlock(&g_future_queue_mutex); + end_gc_not_ok(); + goto wait_for_work; return NULL; @@ -570,7 +671,6 @@ int future_do_runtimecall( void *args, void *retval) { - START_XFORM_SKIP; future_t *future; //If already running on the main thread //or no future is involved, do nothing @@ -605,7 +705,9 @@ int future_do_runtimecall( scheme_signal_received_at(g_signal_handle); //Wait for the signal that the RT call is finished + start_gc_not_ok(); pthread_cond_wait(&future->can_continue_cv, &g_future_queue_mutex); + end_gc_not_ok(); //Clear rt call fields before releasing the lock on the descriptor future->rt_prim = NULL; @@ -615,7 +717,6 @@ int future_do_runtimecall( retval = future->rt_prim_retval; pthread_mutex_unlock(&g_future_queue_mutex); return 1; - END_XFORM_SKIP; } @@ -869,11 +970,9 @@ void *invoke_rtcall(future_t *future) future_t *enqueue_future(void) { - START_XFORM_SKIP; future_t *last = get_last_future(); - future_t *ft = (future_t*)malloc(sizeof(future_t)); - memset(ft, 0, sizeof(future_t)); - ft->so.type = scheme_future_type; + future_t *ft = MALLOC_ONE_TAGGED(future_t); + ft->so.type = scheme_future_type; if (NULL == last) { g_future_queue = ft; @@ -885,7 +984,6 @@ future_t *enqueue_future(void) ft->next = NULL; return ft; - END_XFORM_SKIP; } @@ -906,7 +1004,9 @@ future_t *get_pending_future(void) future_t *get_my_future(void) { + START_XFORM_SKIP; return get_future_by_threadid(pthread_self()); + END_XFORM_SKIP; } @@ -963,7 +1063,6 @@ future_t *get_future(int futureid) future_t *get_last_future(void) { - START_XFORM_SKIP; future_t *ft = g_future_queue; if (NULL == ft) { @@ -976,7 +1075,6 @@ future_t *get_last_future(void) } return ft; - END_XFORM_SKIP; } diff --git a/src/mzscheme/src/future.h b/src/mzscheme/src/future.h index 01f066187c..d3e49e2d33 100644 --- a/src/mzscheme/src/future.h +++ b/src/mzscheme/src/future.h @@ -524,6 +524,9 @@ extern int rtcall_obj_int_pobj_obj( #define LOG_RTCALL_ENV_ENV_VOID(a,b) #endif +void scheme_future_block_until_gc(); +void scheme_future_continue_after_gc(); + #ifdef UNIT_TEST //These forwarding decls only need to be here to make diff --git a/src/mzscheme/src/mzmarksrc.c b/src/mzscheme/src/mzmarksrc.c index 2747210bd8..61c6e0bb96 100644 --- a/src/mzscheme/src/mzmarksrc.c +++ b/src/mzscheme/src/mzmarksrc.c @@ -2225,9 +2225,9 @@ future { future_t *f = (future_t *)p; gcMARK(f->runstack); gcMARK(f->runstack_start); - gcMARK(f->orig_thread); + gcMARK(f->orig_lambda); gcMARK(f->rt_prim_args); - gcMARK(f->rt_prim_result); + gcMARK(f->rt_prim_retval); gcMARK(f->retval); gcMARK(f->prev); gcMARK(f->next); diff --git a/src/mzscheme/src/thread.c b/src/mzscheme/src/thread.c index 2654994753..d70b0d88bf 100644 --- a/src/mzscheme/src/thread.c +++ b/src/mzscheme/src/thread.c @@ -41,6 +41,9 @@ #include "schpriv.h" #include "schmach.h" #include "schgc.h" +#ifdef FUTURES_ENABLED +# include "future.h" +#endif #ifndef PALMOS_STUFF # include #endif @@ -7342,6 +7345,10 @@ static void get_ready_for_GC() { start_this_gc_time = scheme_get_process_milliseconds(); +#ifdef FUTURES_ENABLED + scheme_future_block_until_gc(); +#endif + scheme_zero_unneeded_rands(scheme_current_thread); scheme_clear_modidx_cache(); @@ -7409,6 +7416,10 @@ static void done_with_GC() end_this_gc_time = scheme_get_process_milliseconds(); scheme_total_gc_time += (end_this_gc_time - start_this_gc_time); + +#ifdef FUTURES_ENABLED + scheme_future_continue_after_gc(); +#endif } #ifdef MZ_PRECISE_GC