fix some synchronization problems in futures
svn: r16916
This commit is contained in:
parent
d5f50056b1
commit
d2e5807811
|
@ -78,7 +78,6 @@ static pthread_cond_t g_future_pending_cv = PTHREAD_COND_INITIALIZER;
|
|||
|
||||
THREAD_LOCAL_DECL(static pthread_cond_t worker_can_continue_cv);
|
||||
|
||||
static pthread_mutex_t gc_ok_m = PTHREAD_MUTEX_INITIALIZER;
|
||||
static pthread_cond_t gc_ok_c = PTHREAD_COND_INITIALIZER;
|
||||
static pthread_cond_t gc_done_c = PTHREAD_COND_INITIALIZER;
|
||||
static int gc_not_ok, wait_for_gc;
|
||||
|
@ -99,8 +98,8 @@ static void register_traversers(void);
|
|||
#endif
|
||||
extern void scheme_on_demand_generate_lambda(Scheme_Native_Closure *nc, int argc, Scheme_Object **argv);
|
||||
|
||||
static void start_gc_not_ok(int with_lock);
|
||||
static void end_gc_not_ok(future_t *ft, int with_lock);
|
||||
static void start_gc_not_ok();
|
||||
static void end_gc_not_ok(future_t *ft);
|
||||
|
||||
static void future_do_runtimecall(void *func, int is_atomic);
|
||||
|
||||
|
@ -338,18 +337,14 @@ void futures_init(void)
|
|||
g_num_avail_threads = THREAD_POOL_SIZE;
|
||||
}
|
||||
|
||||
static void start_gc_not_ok(int with_lock)
|
||||
static void start_gc_not_ok()
|
||||
{
|
||||
if (with_lock)
|
||||
pthread_mutex_lock(&gc_ok_m);
|
||||
|
||||
while (wait_for_gc) {
|
||||
pthread_cond_wait(&gc_done_c, &gc_ok_m);
|
||||
pthread_cond_wait(&gc_done_c, &g_future_queue_mutex);
|
||||
}
|
||||
|
||||
gc_not_ok++;
|
||||
if (with_lock)
|
||||
pthread_mutex_unlock(&gc_ok_m);
|
||||
|
||||
#ifdef MZ_PRECISE_GC
|
||||
if (worker_gc_counter != *gc_counter_ptr) {
|
||||
GC_gen0_alloc_page_ptr = 0; /* forces future to ask for memory */
|
||||
|
@ -358,7 +353,7 @@ static void start_gc_not_ok(int with_lock)
|
|||
#endif
|
||||
}
|
||||
|
||||
static void end_gc_not_ok(future_t *ft, int with_lock)
|
||||
static void end_gc_not_ok(future_t *ft)
|
||||
{
|
||||
if (ft) {
|
||||
scheme_set_runstack_limits(ft->runstack_start,
|
||||
|
@ -369,21 +364,17 @@ static void end_gc_not_ok(future_t *ft, int with_lock)
|
|||
|
||||
/* FIXME: clear scheme_current_thread->ku.multiple.array ? */
|
||||
|
||||
if (with_lock)
|
||||
pthread_mutex_lock(&gc_ok_m);
|
||||
--gc_not_ok;
|
||||
pthread_cond_signal(&gc_ok_c);
|
||||
if (with_lock)
|
||||
pthread_mutex_unlock(&gc_ok_m);
|
||||
}
|
||||
|
||||
void scheme_future_block_until_gc()
|
||||
{
|
||||
int i;
|
||||
|
||||
pthread_mutex_lock(&gc_ok_m);
|
||||
pthread_mutex_lock(&g_future_queue_mutex);
|
||||
wait_for_gc = 1;
|
||||
pthread_mutex_unlock(&gc_ok_m);
|
||||
pthread_mutex_unlock(&g_future_queue_mutex);
|
||||
|
||||
for (i = 0; i < THREAD_POOL_SIZE; i++) {
|
||||
if (g_fuel_pointers[i] != NULL)
|
||||
|
@ -395,11 +386,11 @@ void scheme_future_block_until_gc()
|
|||
}
|
||||
asm("mfence");
|
||||
|
||||
pthread_mutex_lock(&gc_ok_m);
|
||||
pthread_mutex_lock(&g_future_queue_mutex);
|
||||
while (gc_not_ok) {
|
||||
pthread_cond_wait(&gc_ok_c, &gc_ok_m);
|
||||
pthread_cond_wait(&gc_ok_c, &g_future_queue_mutex);
|
||||
}
|
||||
pthread_mutex_unlock(&gc_ok_m);
|
||||
pthread_mutex_unlock(&g_future_queue_mutex);
|
||||
}
|
||||
|
||||
void scheme_future_continue_after_gc()
|
||||
|
@ -416,19 +407,21 @@ void scheme_future_continue_after_gc()
|
|||
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&gc_ok_m);
|
||||
pthread_mutex_lock(&g_future_queue_mutex);
|
||||
wait_for_gc = 0;
|
||||
pthread_cond_broadcast(&gc_done_c);
|
||||
pthread_mutex_unlock(&gc_ok_m);
|
||||
pthread_mutex_unlock(&g_future_queue_mutex);
|
||||
}
|
||||
|
||||
void scheme_future_gc_pause()
|
||||
/* Called in future thread */
|
||||
{
|
||||
pthread_mutex_lock(&gc_ok_m);
|
||||
end_gc_not_ok(current_ft, 0);
|
||||
start_gc_not_ok(0); /* waits until wait_for_gc is 0 */
|
||||
pthread_mutex_unlock(&gc_ok_m);
|
||||
future_t *future = current_ft;
|
||||
future->runstack = MZ_RUNSTACK;
|
||||
pthread_mutex_lock(&g_future_queue_mutex);
|
||||
end_gc_not_ok(future);
|
||||
start_gc_not_ok(); /* waits until wait_for_gc is 0 */
|
||||
pthread_mutex_unlock(&g_future_queue_mutex);
|
||||
}
|
||||
|
||||
/**********************************************************************/
|
||||
|
@ -712,13 +705,13 @@ void *worker_thread_future_loop(void *arg)
|
|||
sema_signal(&ready_sema);
|
||||
|
||||
wait_for_work:
|
||||
start_gc_not_ok(1);
|
||||
pthread_mutex_lock(&g_future_queue_mutex);
|
||||
start_gc_not_ok();
|
||||
while (!(ft = get_pending_future()))
|
||||
{
|
||||
end_gc_not_ok(NULL, 1);
|
||||
end_gc_not_ok(NULL);
|
||||
pthread_cond_wait(&g_future_pending_cv, &g_future_queue_mutex);
|
||||
start_gc_not_ok(1);
|
||||
start_gc_not_ok();
|
||||
}
|
||||
|
||||
LOG("Got a signal that a future is pending...");
|
||||
|
@ -781,9 +774,9 @@ void *worker_thread_future_loop(void *arg)
|
|||
dequeue_future(ft);
|
||||
|
||||
scheme_signal_received_at(g_signal_handle);
|
||||
pthread_mutex_unlock(&g_future_queue_mutex);
|
||||
|
||||
end_gc_not_ok(NULL, 1);
|
||||
end_gc_not_ok(NULL);
|
||||
pthread_mutex_unlock(&g_future_queue_mutex);
|
||||
|
||||
goto wait_for_work;
|
||||
|
||||
|
@ -866,9 +859,9 @@ void future_do_runtimecall(void *func,
|
|||
|
||||
//Wait for the signal that the RT call is finished
|
||||
future->can_continue_cv = &worker_can_continue_cv;
|
||||
end_gc_not_ok(future, 1);
|
||||
end_gc_not_ok(future);
|
||||
pthread_cond_wait(&worker_can_continue_cv, &g_future_queue_mutex);
|
||||
start_gc_not_ok(1);
|
||||
start_gc_not_ok();
|
||||
|
||||
//Fetch the future instance again, in case the GC has moved the pointer
|
||||
future = current_ft;
|
||||
|
@ -990,7 +983,8 @@ static void do_invoke_rtcall(future_t *future)
|
|||
src = future->source_of_request;
|
||||
if (future->source_type == FSRC_RATOR) {
|
||||
int len;
|
||||
src = scheme_get_proc_name(future->arg_s0, &len, 1);
|
||||
if (SCHEME_PROCP(future->arg_s0))
|
||||
src = scheme_get_proc_name(future->arg_s0, &len, 1);
|
||||
} else if (future->source_type == FSRC_PRIM) {
|
||||
const char *src2;
|
||||
src2 = scheme_look_for_primitive(future->prim_func);
|
||||
|
|
|
@ -2246,6 +2246,8 @@ static int generate_pause_for_gc_and_retry(mz_jit_state *jitter,
|
|||
GC_CAN_IGNORE jit_insn *refslow = 0, *refpause;
|
||||
int i;
|
||||
|
||||
mz_rs_sync();
|
||||
|
||||
/* expose gc_reg to GC */
|
||||
mz_tl_sti_p(tl_jit_future_storage, gc_reg, JIT_R1);
|
||||
|
||||
|
@ -2265,6 +2267,7 @@ static int generate_pause_for_gc_and_retry(mz_jit_state *jitter,
|
|||
register back. */
|
||||
if (i == 1) {
|
||||
mz_patch_branch(refpause);
|
||||
JIT_UPDATE_THREAD_RSPTR_FOR_BRANCH_IF_NEEDED();
|
||||
jit_prepare(0);
|
||||
mz_finish(scheme_future_gc_pause);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user