fix some O(N) problems for N threads

Repairs include GC setup, thread finalization, and place
synchronization.
Matthew Flatt 2012-08-10 11:49:47 -06:00
parent b5f0c97762
commit c89fa0cda7
7 changed files with 123 additions and 104 deletions
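
The common pattern behind these repairs is to stop scanning every custodian-managed object when only a few threads or places need attention: the commit adds a doubly-linked all_child_places list, a gc_prep_thread_chain of threads that have run since the last GC, and a stored slot hint for custodian removal. As a rough illustration only (the Node/link_node/unlink_node names below are hypothetical, not from the Racket sources), the intrusive-list shape gives O(1) insertion and removal:

/* Illustrative only: O(1) link/unlink on an intrusive doubly-linked list,
   the same shape as the all_child_places list added in this commit. */
typedef struct Node {
  struct Node *prev, *next;
  /* payload ... */
} Node;

static Node *list_head;

static void link_node(Node *n) {
  n->prev = NULL;
  n->next = list_head;
  if (n->next)
    n->next->prev = n;
  list_head = n;
}

static void unlink_node(Node *n) {
  if (n->next)
    n->next->prev = n->prev;
  if (n->prev)
    n->prev->next = n->next;
  else
    list_head = n->next;
  n->prev = n->next = NULL;
}
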

View File

@@ -1046,6 +1046,8 @@ typedef struct Scheme_Thread {
Scheme_Object *dead_box; /* contains non-zero when the thread is dead */
Scheme_Object *running_box; /* contains pointer to thread when it's running */
struct Scheme_Thread *gc_prep_chain;
struct Scheme_Thread *nester, *nestee;
struct future_t *current_ft;

View File

@@ -230,6 +230,7 @@ typedef struct Thread_Local_Variables {
struct Scheme_Thread *scheme_current_thread_;
struct Scheme_Thread *scheme_main_thread_;
struct Scheme_Thread *scheme_first_thread_;
struct Scheme_Thread *gc_prep_thread_chain_;
struct Scheme_Thread_Set *scheme_thread_set_top_;
struct Scheme_Current_LWC *scheme_current_lwc_;
int num_running_threads_;
@@ -333,6 +334,7 @@ typedef struct Thread_Local_Variables {
int place_evts_array_size_;
struct Evt **place_evts_;
struct Scheme_Place_Object *place_object_;
struct Scheme_Place *all_child_places_;
struct Scheme_Object **reusable_ifs_stack_;
struct Scheme_Object *empty_self_shift_cache_;
struct Scheme_Bucket_Table *scheme_module_code_cache_;
@@ -575,6 +577,7 @@ XFORM_GC_VARIABLE_STACK_THROUGH_THREAD_LOCAL;
#define scheme_current_thread XOA (scheme_get_thread_local_variables()->scheme_current_thread_)
#define scheme_main_thread XOA (scheme_get_thread_local_variables()->scheme_main_thread_)
#define scheme_first_thread XOA (scheme_get_thread_local_variables()->scheme_first_thread_)
#define gc_prep_thread_chain XOA (scheme_get_thread_local_variables()->gc_prep_thread_chain_)
#define scheme_thread_set_top XOA (scheme_get_thread_local_variables()->scheme_thread_set_top_)
#define scheme_current_lwc XOA (scheme_get_thread_local_variables()->scheme_current_lwc_)
#define num_running_threads XOA (scheme_get_thread_local_variables()->num_running_threads_)
@@ -678,6 +681,7 @@ XFORM_GC_VARIABLE_STACK_THROUGH_THREAD_LOCAL;
#define place_evts_array_size XOA (scheme_get_thread_local_variables()->place_evts_array_size_)
#define place_evts XOA (scheme_get_thread_local_variables()->place_evts_)
#define place_object XOA (scheme_get_thread_local_variables()->place_object_)
#define all_child_places XOA (scheme_get_thread_local_variables()->all_child_places_)
#define reusable_ifs_stack XOA (scheme_get_thread_local_variables()->reusable_ifs_stack_)
#define empty_self_shift_cache XOA (scheme_get_thread_local_variables()->empty_self_shift_cache_)
#define scheme_module_code_cache XOA (scheme_get_thread_local_variables()->scheme_module_code_cache_)

View File

@@ -57,6 +57,8 @@ static int place_val_MARK(void *p, struct NewGC *gc) {
gcMARK2(pr->mref, gc);
gcMARK2(pr->pumper_threads, gc);
gcMARK2(pr->place_obj, gc);
gcMARK2(pr->prev, gc);
gcMARK2(pr->next, gc);
return
gcBYTES_TO_WORDS(sizeof(Scheme_Place));
@@ -68,6 +70,8 @@ static int place_val_FIXUP(void *p, struct NewGC *gc) {
gcFIXUP2(pr->mref, gc);
gcFIXUP2(pr->pumper_threads, gc);
gcFIXUP2(pr->place_obj, gc);
gcFIXUP2(pr->prev, gc);
gcFIXUP2(pr->next, gc);
return
gcBYTES_TO_WORDS(sizeof(Scheme_Place));

View File

@@ -1497,6 +1497,8 @@ place_val {
gcMARK2(pr->mref, gc);
gcMARK2(pr->pumper_threads, gc);
gcMARK2(pr->place_obj, gc);
gcMARK2(pr->prev, gc);
gcMARK2(pr->next, gc);
size:
gcBYTES_TO_WORDS(sizeof(Scheme_Place));

View File

@@ -45,6 +45,7 @@ static mzrt_mutex *id_counter_mutex;
SHARED_OK mz_proc_thread *scheme_master_proc_thread;
THREAD_LOCAL_DECL(static struct Scheme_Place_Object *place_object);
THREAD_LOCAL_DECL(static Scheme_Place *all_child_places);
THREAD_LOCAL_DECL(static uintptr_t force_gc_for_place_accounting);
static Scheme_Object *scheme_place(int argc, Scheme_Object *args[]);
static Scheme_Object *place_pumper_threads(int argc, Scheme_Object *args[]);
@@ -144,6 +145,8 @@ void scheme_init_place(Scheme_Env *env)
PLACE_PRIM_W_ARITY("place-dead-evt", make_place_dead, 1, 1, plenv);
scheme_finish_primitive_module(plenv);
REGISTER_SO(all_child_places);
}
static Scheme_Object* scheme_place_enabled(int argc, Scheme_Object *args[]) {
@@ -457,6 +460,11 @@ Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) {
place_data->ready = NULL;
place_data->place_obj = NULL;
place->next = all_child_places;
if (place->next)
place->next->prev = place;
all_child_places = place;
{
Scheme_Custodian_Reference *mref;
@@ -528,6 +536,14 @@ static void do_place_kill(Scheme_Place *place)
}
scheme_remove_managed(place->mref, (Scheme_Object *)place);
if (place->next)
place->next->prev = place->prev;
if (place->prev)
place->prev->next = place->next;
else
all_child_places = place->next;
if (!refcount) {
destroy_place_object_locks(place_obj);
}
@@ -2206,7 +2222,7 @@ static void *place_start_proc(void *data_arg) {
return rc;
}
void scheme_pause_one_place(Scheme_Place *p)
static void pause_one_place(Scheme_Place *p)
{
Scheme_Place_Object *place_obj = p->place_obj;
@@ -2234,7 +2250,7 @@ static void resume_one_place_with_lock(Scheme_Place_Object *place_obj)
}
}
void scheme_resume_one_place(Scheme_Place *p)
static void resume_one_place(Scheme_Place *p)
{
Scheme_Place_Object *place_obj = p->place_obj;
@@ -2245,6 +2261,24 @@ void scheme_resume_one_place(Scheme_Place *p)
}
}
static void pause_all_child_places()
{
Scheme_Place *p = all_child_places;
while (p) {
pause_one_place(p);
p = p->next;
}
}
static void resume_all_child_places()
{
Scheme_Place *p = all_child_places;
while (p) {
resume_one_place(p);
p = p->next;
}
}
void destroy_place_object_locks(Scheme_Place_Object *place_obj) {
mzrt_mutex_destroy(place_obj->lock);
if (place_obj->pause)
@@ -2279,10 +2313,10 @@ void scheme_place_check_for_interruption()
mzrt_mutex_unlock(place_obj->lock);
if (local_pause) {
scheme_pause_all_places();
pause_all_child_places();
mzrt_sema_wait(local_pause);
mzrt_sema_destroy(local_pause);
scheme_resume_all_places();
resume_all_child_places();
} else
break;
}
@@ -2311,7 +2345,7 @@ void scheme_place_set_memory_use(intptr_t mem_use)
custodian limits that will kill this place; pause this
place and its children to give the original place time
to kill this one */
scheme_pause_all_places();
pause_all_child_places();
mzrt_ensure_max_cas(place_obj->parent_need_gc, 1);
scheme_signal_received_at(place_obj->parent_signal_handle);
} else if (mem_use > (1 + place_obj->use_factor) * place_obj->prev_notify_memory_use) {
@@ -2334,7 +2368,7 @@ void scheme_place_check_memory_use()
if (force_gc_for_place_accounting) {
force_gc_for_place_accounting = 0;
scheme_collect_garbage();
scheme_resume_all_places();
resume_all_child_places();
}
}
@@ -2462,7 +2496,7 @@ static void *place_start_proc_after_stack(void *data_arg, void *stack_base) {
REGISTER_SO(place_object);
place_object = place_obj;
place_obj->refcount++;
{
void *signal_handle;
signal_handle = scheme_get_signal_handle();

View File

@@ -3815,6 +3815,8 @@ typedef struct Scheme_Place {
struct GC_Thread_Info *gc_info; /* managed by the GC */
#endif
Scheme_Object *pumper_threads; /* Scheme_Vector of scheme threads */
struct Scheme_Place *prev, *next; /* keeping a list of child places */
} Scheme_Place;
typedef struct Scheme_Place_Object {
@@ -3882,9 +3884,4 @@ void scheme_place_set_memory_use(intptr_t amt);
void scheme_place_check_memory_use();
void scheme_clear_place_ifs_stack();
void scheme_pause_all_places();
void scheme_pause_one_place(Scheme_Place *p);
void scheme_resume_all_places();
void scheme_resume_one_place(Scheme_Place *p);
#endif /* __mzscheme_private__ */

View File

@@ -156,6 +156,7 @@ THREAD_LOCAL_DECL(static int buffer_init_size);
THREAD_LOCAL_DECL(Scheme_Thread *scheme_current_thread = NULL);
THREAD_LOCAL_DECL(Scheme_Thread *scheme_main_thread = NULL);
THREAD_LOCAL_DECL(Scheme_Thread *scheme_first_thread = NULL);
THREAD_LOCAL_DECL(static Scheme_Thread *gc_prep_thread_chain = NULL);
XFORM_NONGCING Scheme_Thread *scheme_get_current_thread() { return scheme_current_thread; }
XFORM_NONGCING intptr_t scheme_get_multiple_count() { return scheme_current_thread->ku.multiple.count; }
@@ -292,10 +293,16 @@ typedef struct {
# define MALLOC_MREF() (Scheme_Custodian_Reference *)scheme_make_late_weak_box(NULL)
# define CUSTODIAN_FAM(x) ((Scheme_Custodian_Weak_Box *)x)->val
# define xCUSTODIAN_FAM(x) SCHEME_BOX_VAL(x)
# define SET_MREF_POSITION(mref, i) (((Scheme_Custodian_Weak_Box *)mref)->hash_key = (i & 0xFFFF))
# define EXTRACT_MREF_START_POSITION(mref, c) (((Scheme_Custodian_Weak_Box *)mref)->hash_key | ((c) & ~0xFFFF))
# define EXTRACT_MREF_POSITION_DELTA(mref, c) 0x10000
#else
# define MALLOC_MREF() MALLOC_ONE_WEAK(Scheme_Custodian_Reference)
# define CUSTODIAN_FAM(x) (*(x))
# define xCUSTODIAN_FAM(x) (*(x))
# define SET_MREF_POSITION(mref, i) /* empty */
# define EXTRACT_MREF_START_POSITION(mref, c) ((c)-1)
# define EXTRACT_MREF_POSITION_DELTA(mref, c) 1
#endif
typedef struct Proc_Global_Rec {
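
The *_MREF_POSITION macros above record the slot index of a custodian reference (its low 16 bits, stashed in the weak box's hash_key under MZ_PRECISE_GC), so remove_managed can probe only the slots whose low bits match instead of scanning the whole boxes array; in the non-precise-GC build they reduce to the old full scan (start at count-1, step by 1). A simplified sketch of that probe, with hypothetical names:

/* Simplified sketch (stand-in names): probe only indices whose low 16 bits
   equal the recorded hint, stepping down by 0x10000, instead of scanning
   every slot.  Mirrors EXTRACT_MREF_START_POSITION / _POSITION_DELTA above. */
#define HINT_MASK 0xFFFF

static int find_hinted_slot(int hint_low_bits, int count,
                            int (*matches)(int i, void *data), void *data) {
  int i = hint_low_bits | (count & ~HINT_MASK);
  while (i >= 0) {
    if ((i < count) && matches(i, data))
      return i;
    i -= (HINT_MASK + 1);
  }
  return -1;  /* not found */
}
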
@@ -891,7 +898,7 @@ static void add_managed_box(Scheme_Custodian *m,
Scheme_Object **box, Scheme_Custodian_Reference *mref,
Scheme_Close_Custodian_Client *f, void *data)
{
int i;
int i, saw = 0;
for (i = m->count; i--; ) {
if (!m->boxes[i]) {
@@ -899,11 +906,16 @@ static void add_managed_box(Scheme_Custodian *m,
m->closers[i] = f;
m->data[i] = data;
m->mrefs[i] = mref;
SET_MREF_POSITION(mref, i);
m->elems++;
adjust_limit_table(m);
return;
} else {
saw++;
if (i + saw == m->elems)
break; /* no empty spaces left */
}
}
@@ -913,6 +925,7 @@ static void add_managed_box(Scheme_Custodian *m,
m->closers[m->count] = f;
m->data[m->count] = data;
m->mrefs[m->count] = mref;
SET_MREF_POSITION(mref, m->count);
m->elems++;
adjust_limit_table(m);
@@ -924,7 +937,7 @@ static void remove_managed(Scheme_Custodian_Reference *mr, Scheme_Object *o,
Scheme_Close_Custodian_Client **old_f, void **old_data)
{
Scheme_Custodian *m;
int i;
int i, delta;
if (!mr)
return;
@@ -932,21 +945,27 @@ static void remove_managed(Scheme_Custodian_Reference *mr, Scheme_Object *o,
if (!m)
return;
for (i = m->count; i--; ) {
if (m->boxes[i] && SAME_OBJ((xCUSTODIAN_FAM(m->boxes[i])), o)) {
xCUSTODIAN_FAM(m->boxes[i]) = 0;
m->boxes[i] = NULL;
CUSTODIAN_FAM(m->mrefs[i]) = 0;
m->mrefs[i] = NULL;
if (old_f)
*old_f = m->closers[i];
if (old_data)
*old_data = m->data[i];
m->data[i] = NULL;
--m->elems;
adjust_limit_table(m);
break;
i = EXTRACT_MREF_START_POSITION(mr, m->count);
delta = EXTRACT_MREF_POSITION_DELTA(mr, m->count);
while (i >= 0) {
if (i < m->count) {
if (m->boxes[i] && SAME_OBJ((xCUSTODIAN_FAM(m->boxes[i])), o)) {
xCUSTODIAN_FAM(m->boxes[i]) = 0;
m->boxes[i] = NULL;
CUSTODIAN_FAM(m->mrefs[i]) = 0;
m->mrefs[i] = NULL;
if (old_f)
*old_f = m->closers[i];
if (old_data)
*old_data = m->data[i];
m->data[i] = NULL;
--m->elems;
adjust_limit_table(m);
break;
}
}
i -= delta;
}
while (m->count && !m->boxes[m->count - 1]) {
@@ -1265,11 +1284,12 @@ Scheme_Thread *scheme_do_close_managed(Scheme_Custodian *m, Scheme_Exit_Closer_F
CUSTODIAN_FAM(m->mrefs[i]) = NULL;
/* Set m->count to i in case a GC happens while
the closer is running. If there's a GC, then
for_each_managed will be called. */
the closer is running. */
m->count = i;
if (is_thread && !the_thread) {
if (!o) {
/* weak link disappeared */
} else if (is_thread && !the_thread) {
/* Thread is already collected, so skip */
} else if (cf) {
cf(o, f, data);
@@ -1357,53 +1377,6 @@ Scheme_Thread *scheme_do_close_managed(Scheme_Custodian *m, Scheme_Exit_Closer_F
return kill_self;
}
typedef void (*Scheme_For_Each_Func)(Scheme_Object *);
static void for_each_managed(Scheme_Type type, Scheme_For_Each_Func cf)
XFORM_SKIP_PROC
/* This function must not allocate. */
{
Scheme_Custodian *m;
int i;
if (SAME_TYPE(type, scheme_thread_type))
type = scheme_thread_hop_type;
/* back to front so children are first: */
m = last_custodian;
while (m) {
for (i = m->count; i--; ) {
if (m->boxes[i]) {
Scheme_Object *o;
o = xCUSTODIAN_FAM(m->boxes[i]);
if (SAME_TYPE(SCHEME_TYPE(o), type)) {
if (SAME_TYPE(type, scheme_thread_hop_type)) {
/* We've added an indirection and made it weak. See mr_hop note above. */
Scheme_Thread *t;
t = (Scheme_Thread *)WEAKIFIED(((Scheme_Thread_Custodian_Hop *)o)->p);
if (!t) {
/* The thread is already collected */
continue;
} else if (SAME_OBJ(t->mref, m->mrefs[i]))
o = (Scheme_Object *)t;
else {
/* The main custodian for this thread is someone else */
continue;
}
}
cf(o);
}
}
}
m = CUSTODIAN_FAM(m->global_prev);
}
}
static void do_close_managed(Scheme_Custodian *m)
/* The trick is that we may need to kill the thread
that is running us. If so, delay it to the very
@@ -1483,30 +1456,6 @@ static Scheme_Object *extract_thread(Scheme_Object *o)
return (Scheme_Object *)WEAKIFIED(((Scheme_Thread_Custodian_Hop *)o)->p);
}
static void pause_place(Scheme_Object *o)
{
#ifdef MZ_USE_PLACES
scheme_pause_one_place((Scheme_Place *)o);
#endif
}
void scheme_pause_all_places()
{
for_each_managed(scheme_place_type, pause_place);
}
static void resume_place(Scheme_Object *o)
{
#ifdef MZ_USE_PLACES
scheme_resume_one_place((Scheme_Place *)o);
#endif
}
void scheme_resume_all_places()
{
for_each_managed(scheme_place_type, resume_place);
}
void scheme_init_custodian_extractors()
{
if (!extractors) {
@@ -2053,6 +2002,9 @@ static Scheme_Thread *make_thread(Scheme_Config *config,
process->prev = NULL;
process->next = NULL;
gc_prep_thread_chain = process;
scheme_current_thread->gc_prep_chain = process;
process->suspend_break = 1; /* until start-up finished */
process->error_buf = NULL;
@@ -2444,7 +2396,6 @@ static void do_swap_thread()
}
#endif
if (!swap_no_setjmp && SETJMP(scheme_current_thread)) {
/* We're back! */
/* See also initial swap in in start_child() */
@@ -2537,6 +2488,10 @@ static void do_swap_thread()
#endif
scheme_current_thread = new_thread;
if (!new_thread->gc_prep_chain) {
new_thread->gc_prep_chain = gc_prep_thread_chain;
gc_prep_thread_chain = new_thread;
}
/* Fixup current pointers in thread sets */
if (!scheme_current_thread->return_marks_to) {
@@ -3240,6 +3195,9 @@ Scheme_Object *scheme_call_as_nested_thread(int argc, Scheme_Object *argv[], voi
scheme_first_thread->prev = np;
scheme_first_thread = np;
np->gc_prep_chain = gc_prep_thread_chain;
gc_prep_thread_chain = np;
np->t_set_parent = p->t_set_parent;
schedule_in_set((Scheme_Object *)np, np->t_set_parent);
@@ -8095,6 +8053,8 @@ static void prepare_thread_for_GC(Scheme_Object *t)
{
Scheme_Thread *p = (Scheme_Thread *)t;
if (!p->running) return;
/* zero unused part of env stack in each thread */
if (!p->nestee) {
@@ -8267,7 +8227,20 @@ static void get_ready_for_GC()
}
#endif
for_each_managed(scheme_thread_type, prepare_thread_for_GC);
/* Prepare each thread that has run: */
if (gc_prep_thread_chain) {
Scheme_Thread *p, *next;
p = gc_prep_thread_chain;
while (p != p->gc_prep_chain) {
prepare_thread_for_GC((Scheme_Object *)p);
next = p->gc_prep_chain;
p->gc_prep_chain = NULL;
p = next;
}
prepare_thread_for_GC((Scheme_Object *)p);
p->gc_prep_chain = NULL;
gc_prep_thread_chain = NULL;
}
#ifdef MZ_PRECISE_GC
scheme_flush_stack_copy_cache();
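
The gc_prep_thread_chain walk above replaces the for_each_managed pass over every custodian: only threads swapped in since the last GC are linked through gc_prep_chain, and done_with_GC (below) reseeds the chain with the current thread pointing at itself, so the chain is terminated by a self-link rather than NULL. A minimal sketch of walking such a chain, with stand-in types:

/* Stand-in types: the last element of the chain links to itself, so the
   walk ends when p == p->gc_prep_chain, and every link is cleared so the
   next GC starts from a fresh chain. */
typedef struct Thr { struct Thr *gc_prep_chain; /* ... */ } Thr;

static void walk_gc_prep_chain(Thr *head, void (*prep)(Thr *)) {
  Thr *p = head, *next;
  if (!p)
    return;
  while (p != p->gc_prep_chain) {
    prep(p);
    next = p->gc_prep_chain;
    p->gc_prep_chain = NULL;
    p = next;
  }
  prep(p);                 /* sentinel: links to itself */
  p->gc_prep_chain = NULL;
}
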
@@ -8318,6 +8291,9 @@ static void done_with_GC()
end_this_gc_real_time = scheme_get_inexact_milliseconds();
scheme_total_gc_time += (end_this_gc_time - start_this_gc_time);
gc_prep_thread_chain = scheme_current_thread;
scheme_current_thread->gc_prep_chain = scheme_current_thread;
run_gc_callbacks(0);
#ifdef MZ_USE_FUTURES