svn: r16841
This commit is contained in:
parent
e333bf9ffc
commit
8d6705dde4
|
@ -18,17 +18,18 @@ int g_print_prims = 0;
|
|||
|
||||
extern void *on_demand_jit_code;
|
||||
|
||||
#define THREAD_POOL_SIZE 2
|
||||
#define THREAD_POOL_SIZE 2
|
||||
#define INITIAL_C_STACK_SIZE 500000
|
||||
static pthread_t g_pool_threads[THREAD_POOL_SIZE];
|
||||
static int g_num_avail_threads = 0;
|
||||
static unsigned long g_cur_cpu_mask = 1;
|
||||
static void *g_signal_handle = NULL;
|
||||
|
||||
future_t *g_future_queue = NULL;
|
||||
int g_next_futureid = 0;
|
||||
pthread_t g_rt_threadid = 0;
|
||||
|
||||
static pthread_mutex_t g_future_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
|
||||
static pthread_mutex_t g_future_pending_mutex = PTHREAD_MUTEX_INITIALIZER;
|
||||
static pthread_cond_t g_future_pending_cv = PTHREAD_COND_INITIALIZER;
|
||||
|
||||
//Stuff for scheme runstack
|
||||
|
@ -246,13 +247,17 @@ void futures_init(void)
|
|||
{
|
||||
int i;
|
||||
pthread_t threadid;
|
||||
GC_CAN_IGNORE pthread_attr_t attr;
|
||||
g_rt_threadid = pthread_self();
|
||||
g_signal_handle = scheme_get_signal_handle();
|
||||
|
||||
//Create the worker thread pool. These threads will
|
||||
//'queue up' and wait for futures to become available
|
||||
//'queue up' and wait for futures to become available
|
||||
pthread_attr_init(&attr);
|
||||
pthread_attr_setstacksize(&attr, INITIAL_C_STACK_SIZE);
|
||||
for (i = 0; i < THREAD_POOL_SIZE - 1; i++)
|
||||
{
|
||||
pthread_create(&threadid, NULL, worker_thread_future_loop, NULL);
|
||||
pthread_create(&threadid, &attr, worker_thread_future_loop, NULL);
|
||||
g_pool_threads[i] = threadid;
|
||||
}
|
||||
|
||||
|
@ -311,7 +316,6 @@ Scheme_Object *future(int argc, Scheme_Object *argv[])
|
|||
|
||||
#ifdef DEBUG_FUTURES
|
||||
LOG_THISCALL;
|
||||
dump_state();
|
||||
#endif
|
||||
|
||||
int init_runstack_size, main_runstack_size;
|
||||
|
@ -353,23 +357,15 @@ Scheme_Object *future(int argc, Scheme_Object *argv[])
|
|||
//when generating code
|
||||
if (ncd->code == on_demand_jit_code)
|
||||
{
|
||||
printf("JIT compiling code.\n");
|
||||
scheme_on_demand_generate_lambda(nc, 0, NULL);
|
||||
printf("Code pointer is now %p, and on_demand_jit_code is %p.\n", ncd->code, on_demand_jit_code);
|
||||
}
|
||||
else
|
||||
{
|
||||
printf("Code pointer was %p, and on_demand_jit_code was %p. Code is already JIT compiled.\n", ncd->code, on_demand_jit_code);
|
||||
}
|
||||
|
||||
//pthread_mutex_lock(&g_future_queue_mutex);
|
||||
ft->code = (void*)ncd->code;
|
||||
pthread_mutex_unlock(&g_future_queue_mutex);
|
||||
|
||||
//Signal that a future is pending
|
||||
pthread_mutex_lock(&g_future_pending_mutex);
|
||||
pthread_cond_signal(&g_future_pending_cv);
|
||||
pthread_mutex_unlock(&g_future_pending_mutex);
|
||||
pthread_mutex_unlock(&g_future_queue_mutex);
|
||||
|
||||
return scheme_make_integer(futureid);
|
||||
END_XFORM_SKIP;
|
||||
|
@ -382,6 +378,21 @@ Scheme_Object *num_processors(int argc, Scheme_Object *argv[])
|
|||
}
|
||||
|
||||
|
||||
int future_ready(Scheme_Object *obj)
|
||||
{
|
||||
int ret = 0;
|
||||
future_t *ft = (future_t*)obj;
|
||||
pthread_mutex_lock(&g_future_queue_mutex);
|
||||
if (ft->work_completed || ft->rt_prim != NULL)
|
||||
{
|
||||
ret = 1;
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&g_future_queue_mutex);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
Scheme_Object *touch(int argc, Scheme_Object *argv[])
|
||||
{
|
||||
START_XFORM_SKIP;
|
||||
|
@ -395,6 +406,16 @@ Scheme_Object *touch(int argc, Scheme_Object *argv[])
|
|||
#ifdef DEBUG_FUTURES
|
||||
LOG("touch (future %d)", futureid);
|
||||
dump_state();
|
||||
|
||||
if (SCHEME_INTP(argv[0]))
|
||||
{
|
||||
printf("Future id passed to touch was %d, is a Scheme integer.\n", futureid);
|
||||
}
|
||||
else
|
||||
{
|
||||
printf("Arg passed to touch was a %d.\n", SCHEME_TYPE(argv[0]));
|
||||
}
|
||||
fflush(stdout);
|
||||
#endif
|
||||
|
||||
pthread_mutex_lock(&g_future_queue_mutex);
|
||||
|
@ -404,11 +425,15 @@ Scheme_Object *touch(int argc, Scheme_Object *argv[])
|
|||
//Spin waiting for primitive calls or a return value from
|
||||
//the worker thread
|
||||
wait_for_rtcall_or_completion:
|
||||
scheme_block_until(future_ready, NULL, (Scheme_Object*)ft, 0);
|
||||
pthread_mutex_lock(&g_future_queue_mutex);
|
||||
if (ft->work_completed)
|
||||
{
|
||||
retval = ft->retval;
|
||||
|
||||
printf("Successfully touched future %d\n", ft->id);
|
||||
fflush(stdout);
|
||||
|
||||
//Destroy the future descriptor
|
||||
if (ft->prev == NULL)
|
||||
{
|
||||
|
@ -453,10 +478,6 @@ Scheme_Object *touch(int argc, Scheme_Object *argv[])
|
|||
pthread_cond_signal(&ft->can_continue_cv);
|
||||
pthread_mutex_unlock(&g_future_queue_mutex);
|
||||
|
||||
#ifdef DEBUG_FUTURES
|
||||
dump_state();
|
||||
#endif
|
||||
|
||||
goto wait_for_rtcall_or_completion;
|
||||
}
|
||||
else
|
||||
|
@ -480,7 +501,7 @@ void *worker_thread_future_loop(void *arg)
|
|||
Scheme_Object* (*jitcode)(Scheme_Object*, int, Scheme_Object**);
|
||||
|
||||
//Set processor affinity
|
||||
pthread_mutex_lock(&g_future_queue_mutex);
|
||||
/*pthread_mutex_lock(&g_future_queue_mutex);
|
||||
if (pthread_setaffinity_np(pthread_self(), sizeof(g_cur_cpu_mask), &g_cur_cpu_mask))
|
||||
{
|
||||
printf(
|
||||
|
@ -490,17 +511,24 @@ void *worker_thread_future_loop(void *arg)
|
|||
}
|
||||
|
||||
pthread_mutex_unlock(&g_future_queue_mutex);
|
||||
*/
|
||||
|
||||
future_t *ft;
|
||||
wait_for_work:
|
||||
//LOG("Waiting for new future work...");
|
||||
pthread_mutex_lock(&g_future_pending_mutex);
|
||||
pthread_cond_wait(&g_future_pending_cv, &g_future_pending_mutex);
|
||||
pthread_mutex_lock(&g_future_queue_mutex);
|
||||
while (!(ft = get_pending_future()))
|
||||
{
|
||||
pthread_cond_wait(&g_future_pending_cv, &g_future_queue_mutex);
|
||||
}
|
||||
|
||||
//LOG("Got a signal that a future is pending...");
|
||||
LOG("Got a signal that a future is pending...");
|
||||
|
||||
//Work is available for this thread
|
||||
pthread_mutex_lock(&g_future_queue_mutex);
|
||||
future_t *ft = get_pending_future();
|
||||
|
||||
scheme_fuel_counter = 1;
|
||||
scheme_jit_stack_boundary = ((unsigned long)&v) - INITIAL_C_STACK_SIZE;
|
||||
|
||||
ft->pending = 0;
|
||||
ft->status = RUNNING;
|
||||
ft->threadid = pthread_self();
|
||||
|
@ -518,7 +546,6 @@ void *worker_thread_future_loop(void *arg)
|
|||
|
||||
jitcode = (Scheme_Object* (*)(Scheme_Object*, int, Scheme_Object**))(ft->code);
|
||||
pthread_mutex_unlock(&g_future_queue_mutex);
|
||||
pthread_mutex_unlock(&g_future_pending_mutex);
|
||||
|
||||
//Run the code
|
||||
//Passing no arguments for now.
|
||||
|
@ -538,6 +565,7 @@ void *worker_thread_future_loop(void *arg)
|
|||
|
||||
//Update the status
|
||||
ft->status = FINISHED;
|
||||
scheme_signal_received_at(g_signal_handle);
|
||||
pthread_mutex_unlock(&g_future_queue_mutex);
|
||||
|
||||
goto wait_for_work;
|
||||
|
@ -557,8 +585,8 @@ int future_do_runtimecall(
|
|||
void *args,
|
||||
void *retval)
|
||||
{
|
||||
START_XFORM_SKIP;
|
||||
future_t *future;
|
||||
START_XFORM_SKIP;
|
||||
future_t *future;
|
||||
//If already running on the main thread
|
||||
//or no future is involved, do nothing
|
||||
//and return FALSE
|
||||
|
@ -589,6 +617,8 @@ int future_do_runtimecall(
|
|||
//Update the future's status to waiting
|
||||
future->status = WAITING_FOR_PRIM;
|
||||
|
||||
scheme_signal_received_at(g_signal_handle);
|
||||
|
||||
//Wait for the signal that the RT call is finished
|
||||
pthread_cond_wait(&future->can_continue_cv, &g_future_queue_mutex);
|
||||
|
||||
|
@ -662,7 +692,14 @@ int rtcall_obj_int_pobj_obj(
|
|||
|
||||
#ifdef DEBUG_FUTURES
|
||||
//debug_save_context();
|
||||
#endif
|
||||
|
||||
#endif
|
||||
|
||||
printf("scheme_fuel_counter = %d\n", scheme_fuel_counter);
|
||||
printf("scheme_jit_stack_boundary = %p\n", scheme_jit_stack_boundary);
|
||||
printf("scheme_current_runstack = %p\n", scheme_current_runstack);
|
||||
printf("scheme_current_runstack_start = %p\n", scheme_current_runstack_start);
|
||||
printf("stack address = %p\n", &future);
|
||||
|
||||
data.prim = f;
|
||||
data.a = a;
|
||||
|
|
|
@ -478,7 +478,7 @@ extern int rtcall_obj_int_pobj_obj(
|
|||
|
||||
#endif
|
||||
|
||||
#ifdef LOG_ARGS
|
||||
#if 1
|
||||
#define LOG(a...) do { fprintf(stderr, "%x:%s:%s:%d ", (unsigned) pthread_self(), __FILE__, __FUNCTION__, __LINE__); fprintf(stderr, a); fprintf(stderr, "\n"); fflush(stdout); } while(0)
|
||||
#define LOG_THISCALL LOG(__FUNCTION__)
|
||||
|
||||
|
|
|
@ -2201,6 +2201,21 @@ static Scheme_Object *ts_scheme_apply_from_native(Scheme_Object *rator, int argc
|
|||
return _scheme_apply_from_native(rator, argc, argv);
|
||||
}
|
||||
|
||||
static Scheme_Object *ts_scheme_tail_apply_from_native(Scheme_Object *rator, int argc, Scheme_Object **argv)
|
||||
{
|
||||
/* RTCALL_OBJ_INT_POBJ_OBJ(_scheme_tail_apply_from_native, rator, argc, argv); */
|
||||
Scheme_Object *retptr;
|
||||
if (rtcall_obj_int_pobj_obj(_scheme_tail_apply_from_native,
|
||||
rator,
|
||||
argc,
|
||||
argv,
|
||||
&retptr)) {
|
||||
return retptr;
|
||||
}
|
||||
|
||||
return _scheme_tail_apply_from_native(rator, argc, argv);
|
||||
}
|
||||
|
||||
static void ts_on_demand(void)
|
||||
{
|
||||
/* RTCALL_VOID_VOID(on_demand); */
|
||||
|
@ -2400,7 +2415,7 @@ static int generate_finish_tail_call(mz_jit_state *jitter, int direct_native)
|
|||
if (direct_native > 1) { /* => some_args_already_in_place */
|
||||
(void)mz_finish(_scheme_tail_apply_from_native_fixup_args);
|
||||
} else {
|
||||
(void)mz_finish(_scheme_tail_apply_from_native);
|
||||
(void)mz_finish(ts_scheme_tail_apply_from_native);
|
||||
}
|
||||
CHECK_LIMIT();
|
||||
/* Return: */
|
||||
|
|
|
@ -68,7 +68,7 @@ MZ_EXTERN Scheme_Object *scheme_current_break_cell();
|
|||
/*========================================================================*/
|
||||
|
||||
#ifndef LINK_EXTENSIONS_BY_TABLE
|
||||
# ifndef MZ_USE_PLACES
|
||||
# if !defined(MZ_USE_PLACES) || !defined(FUTURES_ENABLED)
|
||||
MZ_EXTERN THREAD_LOCAL Scheme_Thread *scheme_current_thread;
|
||||
# endif
|
||||
MZ_EXTERN THREAD_LOCAL volatile int scheme_fuel_counter;
|
||||
|
|
Loading…
Reference in New Issue
Block a user