From b7639e5a15eb59f97c8ab41103651d14c4342006 Mon Sep 17 00:00:00 2001
From: Matthew Flatt
Date: Fri, 22 Apr 2011 15:58:36 -0600
Subject: [PATCH] clean up futures at place termination

---
 src/racket/src/env.c     |  1 +
 src/racket/src/future.c  | 67 +++++++++++++++++++++++++++++++++++++++-
 src/racket/src/schpriv.h |  1 +
 3 files changed, 68 insertions(+), 1 deletion(-)

diff --git a/src/racket/src/env.c b/src/racket/src/env.c
index 6aee773ad8..0403d0c36b 100644
--- a/src/racket/src/env.c
+++ b/src/racket/src/env.c
@@ -586,6 +586,7 @@ Scheme_Env *scheme_place_instance_init(void *stack_base) {
 }
 
 void scheme_place_instance_destroy() {
+  scheme_end_futures_per_place();
 #if defined(MZ_USE_PLACES)
   scheme_kill_green_thread_timer();
 #endif
diff --git a/src/racket/src/future.c b/src/racket/src/future.c
index e919a7da98..92cb86ea34 100644
--- a/src/racket/src/future.c
+++ b/src/racket/src/future.c
@@ -249,6 +249,10 @@ void scheme_init_futures_per_place()
 {
 }
 
+void scheme_end_futures_per_place()
+{
+}
+
 #else
 
 #include "future.h"
@@ -383,6 +387,7 @@ typedef struct Scheme_Future_State {
   mzrt_sema *gc_done_c;
 
   int gc_not_ok, wait_for_gc, need_gc_ok_post, need_gc_done_post;
+  int abort_all_futures;
 
   int *gc_counter_ptr;
 
@@ -394,6 +399,7 @@ typedef struct Scheme_Future_State {
 } Scheme_Future_State;
 
 typedef struct Scheme_Future_Thread_State {
+  mz_proc_thread *t;
   int id;
   int worker_gc_counter;
   mzrt_sema *worker_can_continue_sema;
@@ -438,6 +444,7 @@ static Scheme_Object *_apply_future_lw(future_t *ft);
 static Scheme_Object *apply_future_lw(future_t *ft);
 static int fsemaphore_ready(Scheme_Object *obj);
 static void init_fevent(Fevent_Buffer *b);
+static void free_fevent(Fevent_Buffer *b);
 
 READ_ONLY static int cpucount;
 static void init_cpucount(void);
@@ -637,6 +644,7 @@ static void init_future_thread(Scheme_Future_State *fs, int i)
   GC_CAN_IGNORE future_thread_params_t params;
   Scheme_Thread *skeleton;
   Scheme_Object **runstack_start;
+  mz_proc_thread *t;
 
   /* Create the worker thread pool.  These threads will
      'queue up' and wait for futures to become available. */
@@ -676,9 +684,11 @@ static void init_future_thread(Scheme_Future_State *fs, int i)
   params.runstack_start = runstack_start;
 
   mzrt_sema_create(&params.ready_sema, 0);
-  mz_proc_thread_create_w_stacksize(worker_thread_future_loop, &params, INITIAL_C_STACK_SIZE);
+  t = mz_proc_thread_create_w_stacksize(worker_thread_future_loop, &params, INITIAL_C_STACK_SIZE);
   mzrt_sema_wait(params.ready_sema);
   mzrt_sema_destroy(params.ready_sema);
+
+  fts->t = t;
 
   scheme_register_static(params.scheme_current_runstack_ptr, sizeof(void*));
   scheme_register_static(params.scheme_current_runstack_start_ptr, sizeof(void*));
@@ -688,8 +698,53 @@ static void init_future_thread(Scheme_Future_State *fs, int i)
 
   fs->pool_threads[i] = fts;
 }
 
+void scheme_end_futures_per_place()
+{
+  Scheme_Future_State *fs = scheme_future_state;
+
+  if (fs) {
+    int i;
+
+    mzrt_mutex_lock(fs->future_mutex);
+    fs->abort_all_futures = 1;
+    fs->wait_for_gc = 1;
+    mzrt_mutex_unlock(fs->future_mutex);
+
+    /* post enough semas to ensure that every future
+       wakes up and tries to disable GC: */
+    for (i = 0; i < fs->thread_pool_size; i++) {
+      if (fs->pool_threads[i]) {
+        mzrt_sema_post(fs->future_pending_sema);
+        mzrt_sema_post(fs->pool_threads[i]->worker_can_continue_sema);
+      }
+    }
+
+    scheme_future_block_until_gc();
+
+    /* wait for all future threads to end: */
+    for (i = 0; i < fs->thread_pool_size; i++) {
+      if (fs->pool_threads[i]) {
+        (void)mz_proc_thread_wait(fs->pool_threads[i]->t);
+
+        free_fevent(&fs->pool_threads[i]->fevents1);
+        free_fevent(&fs->pool_threads[i]->fevents2);
+
+        free(fs->pool_threads[i]);
+      }
+    }
+
+    free_fevent(&fs->runtime_fevents);
+
+    free(fs->pool_threads);
+    free(fs);
+  }
+}
+
 static void check_future_thread_creation(Scheme_Future_State *fs)
 {
+  if (!fs->future_threads_created && !fs->future_queue_count)
+    return;
+
   if (fs->future_threads_created < fs->thread_pool_size) {
     int count;
 
@@ -708,8 +763,10 @@ static void start_gc_not_ok(Scheme_Future_State *fs)
 /* must have mutex_lock */
 {
   while (fs->wait_for_gc) {
+    int quit = fs->abort_all_futures;
     fs->need_gc_done_post++;
     mzrt_mutex_unlock(fs->future_mutex);
+    if (quit) mz_proc_thread_exit(NULL);
     mzrt_sema_wait(fs->gc_done_c);
     mzrt_mutex_lock(fs->future_mutex);
   }
@@ -896,6 +953,14 @@ static void init_fevent(Fevent_Buffer *b) XFORM_SKIP_PROC
   memset(b->a, 0, FEVENT_BUFFER_SIZE * sizeof(Fevent));
 }
 
+static void free_fevent(Fevent_Buffer *b)
+{
+  if (b->a) {
+    free(b->a);
+    b->a = NULL;
+  }
+}
+
 static void record_fevent(int what, int fid) XFORM_SKIP_PROC
 /* call with the lock or in the runtime thread */
 {
diff --git a/src/racket/src/schpriv.h b/src/racket/src/schpriv.h
index 0e28d934d3..a50b1fee21 100644
--- a/src/racket/src/schpriv.h
+++ b/src/racket/src/schpriv.h
@@ -250,6 +250,7 @@ void scheme_init_places_once();
 void scheme_init_futures(Scheme_Env *env);
 void scheme_init_futures_once();
 void scheme_init_futures_per_place();
+void scheme_end_futures_per_place();
 void scheme_init_print_buffers_places(void);
 void scheme_init_string_places(void);
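The shutdown sequence in scheme_end_futures_per_place() follows a common pattern for
tearing down a semaphore-driven thread pool: set an abort flag under the lock, post the
wakeup semaphore once per worker so every sleeping thread is guaranteed to observe the
flag, then join each thread before freeing its per-thread state. Below is a minimal,
self-contained pthreads sketch of that pattern. It is an illustrative analogue only, not
Racket's code: the real implementation goes through the mzrt_* wrappers and routes each
worker's exit through the GC-synchronization check in start_gc_not_ok(), and names such
as worker and POOL_SIZE here are hypothetical.

  /* Sketch of the wake-all-then-join shutdown pattern (not Racket internals). */
  #include <pthread.h>
  #include <semaphore.h>
  #include <stdio.h>

  #define POOL_SIZE 4

  static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
  static sem_t pending;       /* plays the role of future_pending_sema */
  static int abort_all = 0;   /* plays the role of abort_all_futures */
  static pthread_t pool[POOL_SIZE];

  static void *worker(void *arg)
  {
    for (;;) {
      sem_wait(&pending);     /* sleep until work arrives or shutdown begins */
      pthread_mutex_lock(&lock);
      if (abort_all) {        /* like the quit check in start_gc_not_ok() */
        pthread_mutex_unlock(&lock);
        return NULL;          /* cf. mz_proc_thread_exit(NULL) */
      }
      pthread_mutex_unlock(&lock);
      /* ... run one unit of work here ... */
    }
  }

  int main(void)
  {
    int i;
    sem_init(&pending, 0, 0);
    for (i = 0; i < POOL_SIZE; i++)
      pthread_create(&pool[i], NULL, worker, NULL);

    /* shutdown: set the flag first, then post enough semaphores that
       every worker is guaranteed to wake up and see it */
    pthread_mutex_lock(&lock);
    abort_all = 1;
    pthread_mutex_unlock(&lock);
    for (i = 0; i < POOL_SIZE; i++)
      sem_post(&pending);

    for (i = 0; i < POOL_SIZE; i++)
      pthread_join(pool[i], NULL);   /* cf. mz_proc_thread_wait() */

    sem_destroy(&pending);
    printf("all workers joined\n");
    return 0;
  }

Built with cc -pthread, the sketch prints "all workers joined" once every worker has
exited. Because the flag is set before any semaphore is posted, each worker that consumes
a post necessarily sees the flag and exits, which is the same ordering guarantee the patch
relies on before calling mz_proc_thread_wait() on each pool thread.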