futures and GC cooperation

svn: r16848
This commit is contained in:
Matthew Flatt 2009-11-17 21:02:44 +00:00
parent 89a388c6f7
commit e1e2267973
4 changed files with 137 additions and 25 deletions

View File

@ -18,9 +18,11 @@ int g_print_prims = 0;
extern void *on_demand_jit_code;
#define THREAD_POOL_SIZE 2
#define THREAD_POOL_SIZE 1
#define INITIAL_C_STACK_SIZE 500000
static pthread_t g_pool_threads[THREAD_POOL_SIZE];
static int *g_fuel_pointers[THREAD_POOL_SIZE];
static unsigned long *g_stack_boundary_pointers[THREAD_POOL_SIZE];
static int g_num_avail_threads = 0;
static unsigned long g_cur_cpu_mask = 1;
static void *g_signal_handle = NULL;
@ -32,6 +34,16 @@ pthread_t g_rt_threadid = 0;
static pthread_mutex_t g_future_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t g_future_pending_cv = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t gc_ok_m = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t gc_ok_c = PTHREAD_COND_INITIALIZER;
static int gc_not_ok;
extern THREAD_LOCAL unsigned long GC_gen0_alloc_page_ptr;
static void start_gc_not_ok();
static void end_gc_not_ok();
static THREAD_LOCAL future_t *current_ft;
//Stuff for scheme runstack
//Some of these may mimic defines in thread.c, but are redefined here
//to avoid making any changes to that file for now (moving anything out into common
@ -170,6 +182,38 @@ void dump_state(void)
}
#endif
/**********************************************************************/
/* Semaphore helpers */
/**********************************************************************/
typedef struct sema_t {
int ready;
pthread_mutex_t m;
pthread_cond_t c;
} sema_t;
#define SEMA_INITIALIZER { 0, PTHREAD_MUTEX_INITIALIZER, \
PTHREAD_COND_INITIALIZER }
static void sema_wait(sema_t *s)
{
pthread_mutex_lock(&s->m);
while (!s->ready) {
pthread_cond_wait(&s->c, &s->m);
}
--s->ready;
pthread_mutex_unlock(&s->m);
}
static void sema_signal(sema_t *s)
{
pthread_mutex_lock(&s->m);
s->ready++;
pthread_cond_signal(&s->c);
pthread_mutex_unlock(&s->m);
}
static sema_t ready_sema = SEMA_INITIALIZER;
/**********************************************************************/
/* Plumbing for MzScheme initialization */
@ -179,7 +223,6 @@ void dump_state(void)
//primitives known
void scheme_init_futures(Scheme_Env *env)
{
START_XFORM_SKIP;
Scheme_Object *v;
Scheme_Env *newenv;
@ -237,7 +280,8 @@ void scheme_init_futures(Scheme_Env *env)
scheme_finish_primitive_module(newenv);
scheme_protect_primitive_provide(newenv, NULL);
END_XFORM_SKIP;
REGISTER_SO(g_future_queue);
}
@ -255,15 +299,57 @@ void futures_init(void)
//'queue up' and wait for futures to become available
pthread_attr_init(&attr);
pthread_attr_setstacksize(&attr, INITIAL_C_STACK_SIZE);
for (i = 0; i < THREAD_POOL_SIZE - 1; i++)
for (i = 0; i < THREAD_POOL_SIZE; i++)
{
pthread_create(&threadid, &attr, worker_thread_future_loop, NULL);
pthread_create(&threadid, &attr, worker_thread_future_loop, &i);
sema_wait(&ready_sema);
g_pool_threads[i] = threadid;
}
g_num_avail_threads = THREAD_POOL_SIZE;
}
static void start_gc_not_ok()
{
pthread_mutex_lock(&gc_ok_m);
gc_not_ok++;
pthread_mutex_unlock(&gc_ok_m);
}
static void end_gc_not_ok()
{
pthread_mutex_lock(&gc_ok_m);
--gc_not_ok;
pthread_cond_signal(&gc_ok_c);
pthread_mutex_unlock(&gc_ok_m);
}
void scheme_future_block_until_gc()
{
int i;
for (i = 0; i < THREAD_POOL_SIZE; i++) {
*(g_fuel_pointers[i]) = 0;
*(g_stack_boundary_pointers[i]) += INITIAL_C_STACK_SIZE;
}
asm("mfence");
pthread_mutex_lock(&gc_ok_m);
while (gc_not_ok) {
pthread_cond_wait(&gc_ok_c, &gc_ok_m);
}
pthread_mutex_unlock(&gc_ok_m);
}
void scheme_future_continue_after_gc()
{
int i;
for (i = 0; i < THREAD_POOL_SIZE; i++) {
*(g_fuel_pointers[i]) = 1;
*(g_stack_boundary_pointers[i]) -= INITIAL_C_STACK_SIZE;
}
}
/**********************************************************************/
/* Primitive implementations */
@ -312,8 +398,6 @@ void print_ms_and_us()
Scheme_Object *future(int argc, Scheme_Object *argv[])
{
START_XFORM_SKIP;
#ifdef DEBUG_FUTURES
LOG_THISCALL;
#endif
@ -368,7 +452,6 @@ Scheme_Object *future(int argc, Scheme_Object *argv[])
pthread_mutex_unlock(&g_future_queue_mutex);
return (Scheme_Object*)ft;
END_XFORM_SKIP;
}
@ -395,7 +478,6 @@ int future_ready(Scheme_Object *obj)
Scheme_Object *touch(int argc, Scheme_Object *argv[])
{
START_XFORM_SKIP;
Scheme_Object *retval = NULL;
void *rtcall_retval = NULL;
future_t *ft;
@ -472,7 +554,6 @@ Scheme_Object *touch(int argc, Scheme_Object *argv[])
}
return retval;
END_XFORM_SKIP;
}
@ -484,6 +565,8 @@ void *worker_thread_future_loop(void *arg)
START_XFORM_SKIP;
Scheme_Object *v;
Scheme_Object* (*jitcode)(Scheme_Object*, int, Scheme_Object**);
future_t *ft;
int id = *(int *)arg;
//Set processor affinity
/*pthread_mutex_lock(&g_future_queue_mutex);
@ -498,22 +581,34 @@ void *worker_thread_future_loop(void *arg)
pthread_mutex_unlock(&g_future_queue_mutex);
*/
future_t *ft;
scheme_fuel_counter = 1;
scheme_jit_stack_boundary = ((unsigned long)&v) - INITIAL_C_STACK_SIZE;
g_fuel_pointers[id] = &scheme_scheme_fuel_counter;
g_stack_boundary_pointers[id] = &scheme_scheme_jit_stack_boundary;
GC_gen0_alloc_page_ptr = 1; /* weirdly, disables inline allocation */
REGISTER_SO(current_ft);
REGISTER_SO(scheme_current_runstack);
REGISTER_SO(scheme_current_runstack_start);
sema_signal(&ready_sema);
wait_for_work:
//LOG("Waiting for new future work...");
start_gc_not_ok();
pthread_mutex_lock(&g_future_queue_mutex);
while (!(ft = get_pending_future()))
{
end_gc_not_ok();
pthread_cond_wait(&g_future_pending_cv, &g_future_queue_mutex);
start_gc_not_ok();
}
LOG("Got a signal that a future is pending...");
//Work is available for this thread
scheme_fuel_counter = 1;
scheme_jit_stack_boundary = ((unsigned long)&v) - INITIAL_C_STACK_SIZE;
ft->pending = 0;
ft->status = RUNNING;
ft->threadid = pthread_self();
@ -532,6 +627,8 @@ void *worker_thread_future_loop(void *arg)
jitcode = (Scheme_Object* (*)(Scheme_Object*, int, Scheme_Object**))(ft->code);
pthread_mutex_unlock(&g_future_queue_mutex);
current_ft = ft;
//Run the code
//Passing no arguments for now.
//The lambda passed to a future will always be a parameterless
@ -543,6 +640,8 @@ void *worker_thread_future_loop(void *arg)
v = jitcode(ft->orig_lambda, 0, NULL);
LOG("Finished running JIT code at %p.\n", ft->code);
ft = current_ft;
//Set the return val in the descriptor
pthread_mutex_lock(&g_future_queue_mutex);
ft->work_completed = 1;
@ -553,6 +652,8 @@ void *worker_thread_future_loop(void *arg)
scheme_signal_received_at(g_signal_handle);
pthread_mutex_unlock(&g_future_queue_mutex);
end_gc_not_ok();
goto wait_for_work;
return NULL;
@ -570,7 +671,6 @@ int future_do_runtimecall(
void *args,
void *retval)
{
START_XFORM_SKIP;
future_t *future;
//If already running on the main thread
//or no future is involved, do nothing
@ -605,7 +705,9 @@ int future_do_runtimecall(
scheme_signal_received_at(g_signal_handle);
//Wait for the signal that the RT call is finished
start_gc_not_ok();
pthread_cond_wait(&future->can_continue_cv, &g_future_queue_mutex);
end_gc_not_ok();
//Clear rt call fields before releasing the lock on the descriptor
future->rt_prim = NULL;
@ -615,7 +717,6 @@ int future_do_runtimecall(
retval = future->rt_prim_retval;
pthread_mutex_unlock(&g_future_queue_mutex);
return 1;
END_XFORM_SKIP;
}
@ -869,11 +970,9 @@ void *invoke_rtcall(future_t *future)
future_t *enqueue_future(void)
{
START_XFORM_SKIP;
future_t *last = get_last_future();
future_t *ft = (future_t*)malloc(sizeof(future_t));
memset(ft, 0, sizeof(future_t));
ft->so.type = scheme_future_type;
future_t *ft = MALLOC_ONE_TAGGED(future_t);
ft->so.type = scheme_future_type;
if (NULL == last)
{
g_future_queue = ft;
@ -885,7 +984,6 @@ future_t *enqueue_future(void)
ft->next = NULL;
return ft;
END_XFORM_SKIP;
}
@ -906,7 +1004,9 @@ future_t *get_pending_future(void)
future_t *get_my_future(void)
{
START_XFORM_SKIP;
return get_future_by_threadid(pthread_self());
END_XFORM_SKIP;
}
@ -963,7 +1063,6 @@ future_t *get_future(int futureid)
future_t *get_last_future(void)
{
START_XFORM_SKIP;
future_t *ft = g_future_queue;
if (NULL == ft)
{
@ -976,7 +1075,6 @@ future_t *get_last_future(void)
}
return ft;
END_XFORM_SKIP;
}

View File

@ -524,6 +524,9 @@ extern int rtcall_obj_int_pobj_obj(
#define LOG_RTCALL_ENV_ENV_VOID(a,b)
#endif
void scheme_future_block_until_gc();
void scheme_future_continue_after_gc();
#ifdef UNIT_TEST
//These forwarding decls only need to be here to make

View File

@ -2225,9 +2225,9 @@ future {
future_t *f = (future_t *)p;
gcMARK(f->runstack);
gcMARK(f->runstack_start);
gcMARK(f->orig_thread);
gcMARK(f->orig_lambda);
gcMARK(f->rt_prim_args);
gcMARK(f->rt_prim_result);
gcMARK(f->rt_prim_retval);
gcMARK(f->retval);
gcMARK(f->prev);
gcMARK(f->next);

View File

@ -41,6 +41,9 @@
#include "schpriv.h"
#include "schmach.h"
#include "schgc.h"
#ifdef FUTURES_ENABLED
# include "future.h"
#endif
#ifndef PALMOS_STUFF
# include <time.h>
#endif
@ -7342,6 +7345,10 @@ static void get_ready_for_GC()
{
start_this_gc_time = scheme_get_process_milliseconds();
#ifdef FUTURES_ENABLED
scheme_future_block_until_gc();
#endif
scheme_zero_unneeded_rands(scheme_current_thread);
scheme_clear_modidx_cache();
@ -7409,6 +7416,10 @@ static void done_with_GC()
end_this_gc_time = scheme_get_process_milliseconds();
scheme_total_gc_time += (end_this_gc_time - start_this_gc_time);
#ifdef FUTURES_ENABLED
scheme_future_continue_after_gc();
#endif
}
#ifdef MZ_PRECISE_GC