clean up futures at place termination
This commit is contained in:
parent
0754ad0114
commit
b7639e5a15
|
@ -586,6 +586,7 @@ Scheme_Env *scheme_place_instance_init(void *stack_base) {
|
|||
}
|
||||
|
||||
void scheme_place_instance_destroy() {
|
||||
scheme_end_futures_per_place();
|
||||
#if defined(MZ_USE_PLACES)
|
||||
scheme_kill_green_thread_timer();
|
||||
#endif
|
||||
|
|
|
@ -249,6 +249,10 @@ void scheme_init_futures_per_place()
|
|||
{
|
||||
}
|
||||
|
||||
void scheme_end_futures_per_place()
|
||||
{
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
#include "future.h"
|
||||
|
@ -383,6 +387,7 @@ typedef struct Scheme_Future_State {
|
|||
mzrt_sema *gc_done_c;
|
||||
|
||||
int gc_not_ok, wait_for_gc, need_gc_ok_post, need_gc_done_post;
|
||||
int abort_all_futures;
|
||||
|
||||
int *gc_counter_ptr;
|
||||
|
||||
|
@ -394,6 +399,7 @@ typedef struct Scheme_Future_State {
|
|||
} Scheme_Future_State;
|
||||
|
||||
typedef struct Scheme_Future_Thread_State {
|
||||
mz_proc_thread *t;
|
||||
int id;
|
||||
int worker_gc_counter;
|
||||
mzrt_sema *worker_can_continue_sema;
|
||||
|
@ -438,6 +444,7 @@ static Scheme_Object *_apply_future_lw(future_t *ft);
|
|||
static Scheme_Object *apply_future_lw(future_t *ft);
|
||||
static int fsemaphore_ready(Scheme_Object *obj);
|
||||
static void init_fevent(Fevent_Buffer *b);
|
||||
static void free_fevent(Fevent_Buffer *b);
|
||||
|
||||
READ_ONLY static int cpucount;
|
||||
static void init_cpucount(void);
|
||||
|
@ -637,6 +644,7 @@ static void init_future_thread(Scheme_Future_State *fs, int i)
|
|||
GC_CAN_IGNORE future_thread_params_t params;
|
||||
Scheme_Thread *skeleton;
|
||||
Scheme_Object **runstack_start;
|
||||
mz_proc_thread *t;
|
||||
|
||||
/* Create the worker thread pool. These threads will
|
||||
'queue up' and wait for futures to become available. */
|
||||
|
@ -676,9 +684,11 @@ static void init_future_thread(Scheme_Future_State *fs, int i)
|
|||
params.runstack_start = runstack_start;
|
||||
|
||||
mzrt_sema_create(¶ms.ready_sema, 0);
|
||||
mz_proc_thread_create_w_stacksize(worker_thread_future_loop, ¶ms, INITIAL_C_STACK_SIZE);
|
||||
t = mz_proc_thread_create_w_stacksize(worker_thread_future_loop, ¶ms, INITIAL_C_STACK_SIZE);
|
||||
mzrt_sema_wait(params.ready_sema);
|
||||
mzrt_sema_destroy(params.ready_sema);
|
||||
|
||||
fts->t = t;
|
||||
|
||||
scheme_register_static(params.scheme_current_runstack_ptr, sizeof(void*));
|
||||
scheme_register_static(params.scheme_current_runstack_start_ptr, sizeof(void*));
|
||||
|
@ -688,8 +698,53 @@ static void init_future_thread(Scheme_Future_State *fs, int i)
|
|||
fs->pool_threads[i] = fts;
|
||||
}
|
||||
|
||||
void scheme_end_futures_per_place()
|
||||
{
|
||||
Scheme_Future_State *fs = scheme_future_state;
|
||||
|
||||
if (fs) {
|
||||
int i;
|
||||
|
||||
mzrt_mutex_lock(fs->future_mutex);
|
||||
fs->abort_all_futures = 1;
|
||||
fs->wait_for_gc = 1;
|
||||
mzrt_mutex_unlock(fs->future_mutex);
|
||||
|
||||
/* post enough semas to ensure that every future
|
||||
wakes up and tries to disable GC: */
|
||||
for (i = 0; i < fs->thread_pool_size; i++) {
|
||||
if (fs->pool_threads[i]) {
|
||||
mzrt_sema_post(fs->future_pending_sema);
|
||||
mzrt_sema_post(fs->pool_threads[i]->worker_can_continue_sema);
|
||||
}
|
||||
}
|
||||
|
||||
scheme_future_block_until_gc();
|
||||
|
||||
/* wait for all future threads to end: */
|
||||
for (i = 0; i < fs->thread_pool_size; i++) {
|
||||
if (fs->pool_threads[i]) {
|
||||
(void)mz_proc_thread_wait(fs->pool_threads[i]->t);
|
||||
|
||||
free_fevent(&fs->pool_threads[i]->fevents1);
|
||||
free_fevent(&fs->pool_threads[i]->fevents2);
|
||||
|
||||
free(fs->pool_threads[i]);
|
||||
}
|
||||
}
|
||||
|
||||
free_fevent(&fs->runtime_fevents);
|
||||
|
||||
free(fs->pool_threads);
|
||||
free(fs);
|
||||
}
|
||||
}
|
||||
|
||||
static void check_future_thread_creation(Scheme_Future_State *fs)
|
||||
{
|
||||
if (!fs->future_threads_created && !fs->future_queue_count)
|
||||
return;
|
||||
|
||||
if (fs->future_threads_created < fs->thread_pool_size) {
|
||||
int count;
|
||||
|
||||
|
@ -708,8 +763,10 @@ static void start_gc_not_ok(Scheme_Future_State *fs)
|
|||
/* must have mutex_lock */
|
||||
{
|
||||
while (fs->wait_for_gc) {
|
||||
int quit = fs->abort_all_futures;
|
||||
fs->need_gc_done_post++;
|
||||
mzrt_mutex_unlock(fs->future_mutex);
|
||||
if (quit) mz_proc_thread_exit(NULL);
|
||||
mzrt_sema_wait(fs->gc_done_c);
|
||||
mzrt_mutex_lock(fs->future_mutex);
|
||||
}
|
||||
|
@ -896,6 +953,14 @@ static void init_fevent(Fevent_Buffer *b) XFORM_SKIP_PROC
|
|||
memset(b->a, 0, FEVENT_BUFFER_SIZE * sizeof(Fevent));
|
||||
}
|
||||
|
||||
static void free_fevent(Fevent_Buffer *b)
|
||||
{
|
||||
if (b->a) {
|
||||
free(b->a);
|
||||
b->a = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static void record_fevent(int what, int fid) XFORM_SKIP_PROC
|
||||
/* call with the lock or in the runtime thread */
|
||||
{
|
||||
|
|
|
@ -250,6 +250,7 @@ void scheme_init_places_once();
|
|||
void scheme_init_futures(Scheme_Env *env);
|
||||
void scheme_init_futures_once();
|
||||
void scheme_init_futures_per_place();
|
||||
void scheme_end_futures_per_place();
|
||||
|
||||
void scheme_init_print_buffers_places(void);
|
||||
void scheme_init_string_places(void);
|
||||
|
|
Loading…
Reference in New Issue
Block a user