Places Async Channels

svn: r17637
This commit is contained in:
Kevin Tew 2010-01-13 19:16:54 +00:00
parent c8b8775805
commit 08fa44801c
6 changed files with 291 additions and 81 deletions

View File

@ -3347,12 +3347,16 @@ static int place_val_SIZE(void *p) {
static int place_val_MARK(void *p) { static int place_val_MARK(void *p) {
Scheme_Place *pr = (Scheme_Place *)p; Scheme_Place *pr = (Scheme_Place *)p;
gcMARK(pr->channel);
return return
gcBYTES_TO_WORDS(sizeof(Scheme_Place)); gcBYTES_TO_WORDS(sizeof(Scheme_Place));
} }
static int place_val_FIXUP(void *p) { static int place_val_FIXUP(void *p) {
Scheme_Place *pr = (Scheme_Place *)p; Scheme_Place *pr = (Scheme_Place *)p;
gcFIXUP(pr->channel);
return return
gcBYTES_TO_WORDS(sizeof(Scheme_Place)); gcBYTES_TO_WORDS(sizeof(Scheme_Place));
} }
@ -3361,6 +3365,35 @@ static int place_val_FIXUP(void *p) {
#define place_val_IS_CONST_SIZE 1 #define place_val_IS_CONST_SIZE 1
static int place_async_channel_val_SIZE(void *p) {
return
gcBYTES_TO_WORDS(sizeof(Scheme_Place_Async_Channel));
}
static int place_async_channel_val_MARK(void *p) {
Scheme_Place_Async_Channel *pac = (Scheme_Place_Async_Channel *)p;
int i;
for (i = pac->size; i--; )
gcMARK(pac->msgs[i]);
return
gcBYTES_TO_WORDS(sizeof(Scheme_Place_Async_Channel));
}
static int place_async_channel_val_FIXUP(void *p) {
Scheme_Place_Async_Channel *pac = (Scheme_Place_Async_Channel *)p;
int i;
for (i = pac->size; i--; )
gcFIXUP(pac->msgs[i]);
return
gcBYTES_TO_WORDS(sizeof(Scheme_Place_Async_Channel));
}
#define place_async_channel_val_IS_ATOMIC 0
#define place_async_channel_val_IS_CONST_SIZE 1
#endif /* PLACES */ #endif /* PLACES */
/**********************************************************************/ /**********************************************************************/

View File

@ -1357,10 +1357,23 @@ START places;
place_val { place_val {
mark: mark:
Scheme_Place *pr = (Scheme_Place *)p; Scheme_Place *pr = (Scheme_Place *)p;
gcMARK(pr->channel);
size: size:
gcBYTES_TO_WORDS(sizeof(Scheme_Place)); gcBYTES_TO_WORDS(sizeof(Scheme_Place));
} }
place_async_channel_val {
mark:
Scheme_Place_Async_Channel *pac = (Scheme_Place_Async_Channel *)p;
int i;
for (i = pac->size; i--; )
gcMARK(pac->msgs[i]);
size:
gcBYTES_TO_WORDS(sizeof(Scheme_Place_Async_Channel));
}
END places; END places;
/**********************************************************************/ /**********************************************************************/

View File

@ -10,12 +10,17 @@
SHARED_OK mz_proc_thread *scheme_master_proc_thread; SHARED_OK mz_proc_thread *scheme_master_proc_thread;
THREAD_LOCAL_DECL(mz_proc_thread *proc_thread_self); THREAD_LOCAL_DECL(mz_proc_thread *proc_thread_self);
Scheme_Object *scheme_place(int argc, Scheme_Object *args[]); Scheme_Object *scheme_place(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_place_wait(int argc, Scheme_Object *args[]); static Scheme_Object *scheme_place_wait(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_place_sleep(int argc, Scheme_Object *args[]); static Scheme_Object *scheme_place_sleep(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_place_p(int argc, Scheme_Object *args[]); static Scheme_Object *scheme_place_p(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_places_deep_copy_in_master(Scheme_Object *so); static Scheme_Object *scheme_places_deep_copy_in_master(Scheme_Object *so);
static Scheme_Object *scheme_place_send(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_place_recv(int argc, Scheme_Object *args[]);
Scheme_Object *scheme_place_async_channel_create();
void scheme_place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *o);
Scheme_Object *scheme_place_async_recv(Scheme_Place_Async_Channel *ch);
# ifdef MZ_PRECISE_GC # ifdef MZ_PRECISE_GC
static void register_traversers(void); static void register_traversers(void);
@ -55,10 +60,12 @@ void scheme_init_place(Scheme_Env *env)
plenv = scheme_primitive_module(scheme_intern_symbol("#%place"), env); plenv = scheme_primitive_module(scheme_intern_symbol("#%place"), env);
PLACE_PRIM_W_ARITY("place", scheme_place, 3, 3, plenv); PLACE_PRIM_W_ARITY("place", scheme_place, 1, 3, plenv);
PLACE_PRIM_W_ARITY("place-sleep", scheme_place_sleep, 1, 1, plenv); PLACE_PRIM_W_ARITY("place-sleep", scheme_place_sleep, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place-wait", scheme_place_wait, 1, 1, plenv); PLACE_PRIM_W_ARITY("place-wait", scheme_place_wait, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place?", scheme_place_p, 1, 1, plenv); PLACE_PRIM_W_ARITY("place?", scheme_place_p, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place-ch-send", scheme_place_send, 1, 2, plenv);
PLACE_PRIM_W_ARITY("place-ch-recv", scheme_place_recv, 1, 1, plenv);
scheme_finish_primitive_module(plenv); scheme_finish_primitive_module(plenv);
} }
@ -110,12 +117,26 @@ Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) {
/* pass critical info to new place */ /* pass critical info to new place */
place_data = MALLOC_ONE(Place_Start_Data); place_data = MALLOC_ONE(Place_Start_Data);
place_data->module = args[0];
place_data->function = args[1];
place_data->channel = args[2];
place_data->ready = ready; place_data->ready = ready;
if (argc == 2 || argc == 3 ) {
Scheme_Object *channel;
place_data->module = args[0];
place_data->function = args[1];
place_data->ready = ready;
if (argc == 2) {
channel = scheme_place_async_channel_create();
}
else {
channel = args[2];
}
place_data->channel = channel;
place->channel = channel;
}
else {
scheme_wrong_count_m("place", 1, 2, argc, args, 0);
}
collection_paths = scheme_current_library_collection_paths(0, NULL); collection_paths = scheme_current_library_collection_paths(0, NULL);
collection_paths = scheme_places_deep_copy_in_master(collection_paths); collection_paths = scheme_places_deep_copy_in_master(collection_paths);
place_data->current_library_collection_paths = collection_paths; place_data->current_library_collection_paths = collection_paths;
@ -460,6 +481,28 @@ Scheme_Object *scheme_places_deep_copy_in_master(Scheme_Object *so) {
return so; return so;
} }
Scheme_Object *scheme_place_send(int argc, Scheme_Object *args[]) {
if (argc == 2) {
Scheme_Object *mso;
mso = scheme_places_deep_copy_in_master(args[1]);
scheme_place_async_send((Scheme_Place_Async_Channel *) args[0], mso);
}
else {
scheme_wrong_count_m("place-ch-send", 1, 2, argc, args, 0);
}
return scheme_true;
}
Scheme_Object *scheme_place_recv(int argc, Scheme_Object *args[]) {
if (argc == 1) {
return scheme_place_async_recv((Scheme_Place_Async_Channel *) args[0]);
}
else {
scheme_wrong_count_m("place-ch-recv", 1, 2, argc, args, 0);
}
return scheme_true;
}
#ifdef MZ_PRECISE_GC #ifdef MZ_PRECISE_GC
static void* scheme_master_place_handlemsg(int msg_type, void *msg_payload); static void* scheme_master_place_handlemsg(int msg_type, void *msg_payload);
@ -499,9 +542,13 @@ void* scheme_master_fast_path(int msg_type, void *msg_payload) {
Scheme_Object *o; Scheme_Object *o;
void *original_gc; void *original_gc;
#ifdef MZ_PRECISE_GC
original_gc = GC_switch_to_master_gc(); original_gc = GC_switch_to_master_gc();
#endif
o = scheme_master_place_handlemsg(msg_type, msg_payload); o = scheme_master_place_handlemsg(msg_type, msg_payload);
#ifdef MZ_PRECISE_GC
GC_switch_back_from_master(original_gc); GC_switch_back_from_master(original_gc);
#endif
return o; return o;
} }
@ -515,8 +562,102 @@ void scheme_spawn_master_place() {
scheme_master_proc_thread = (void*) ~0; scheme_master_proc_thread = (void*) ~0;
} }
#endif #endif
/*========================================================================*/
/* places async channels */
/*========================================================================*/
static void* GC_master_malloc(size_t size) {
void *ptr;
#ifdef MZ_PRECISE_GC
void *original_gc;
original_gc = GC_switch_to_master_gc();
#endif
ptr = GC_malloc(size);
#ifdef MZ_PRECISE_GC
GC_switch_back_from_master(original_gc);
#endif
return ptr;
}
Scheme_Object *scheme_place_async_channel_create() {
Scheme_Object **msgs;
Scheme_Place_Async_Channel *ch;
ch = GC_master_malloc(sizeof(Scheme_Place_Async_Channel));
msgs = GC_master_malloc(sizeof(Scheme_Object*) * 8);
ch->so.type = scheme_place_async_channel_type;
ch->in = 0;
ch->out = 0;
ch->count = 0;
ch->size = 8;
mzrt_mutex_create(&ch->lock);
ch->msgs = msgs;
ch->wakeup_signal = NULL;
return (Scheme_Object *)ch;
}
void scheme_place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *o) {
int cnt;
mzrt_mutex_lock(ch->lock);
{
cnt = ch->count;
if (ch->count == ch->size) { /* GROW QUEUE */
Scheme_Object **new_msgs;
new_msgs = GC_master_malloc(sizeof(Scheme_Object*) *2);
if (ch->out < ch->in) {
memcpy(new_msgs, ch->msgs + ch->out, sizeof(Scheme_Object *) * (ch->in - ch->out));
}
else {
int s1 = (ch->size - ch->out);
memcpy(new_msgs, ch->msgs + ch->out, sizeof(Scheme_Object *) * s1);
memcpy(new_msgs + s1, ch->msgs, sizeof(Scheme_Object *) * ch->in);
}
ch->msgs = new_msgs;
ch->in = ch->size;
ch->out = 0;
ch->size *= 2;
}
ch->msgs[ch->in] = o;
++ch->count;
ch->in = (++ch->in % ch->size);
}
mzrt_mutex_unlock(ch->lock);
if (!cnt && ch->wakeup_signal) {
/*wake up possibly sleeping receiver */
scheme_signal_received_at(ch->wakeup_signal);
}
}
Scheme_Object *scheme_place_async_recv(Scheme_Place_Async_Channel *ch) {
Scheme_Object *msg = NULL;
while(1) {
mzrt_mutex_lock(ch->lock);
{
if (ch->count > 0) { /* GET MSG */
msg = ch->msgs[ch->out];
ch->msgs[ch->out] = NULL;
--ch->count;
ch->out = (++ch->out % ch->size);
}
}
mzrt_mutex_unlock(ch->lock);
if(msg) break;
scheme_thread_block(0);
scheme_block_until(NULL, NULL, NULL, 0);
}
return msg;
}
/*========================================================================*/ /*========================================================================*/
/* precise GC traversers */ /* precise GC traversers */
/*========================================================================*/ /*========================================================================*/
@ -531,6 +672,7 @@ START_XFORM_SKIP;
static void register_traversers(void) static void register_traversers(void)
{ {
GC_REG_TRAV(scheme_place_type, place_val); GC_REG_TRAV(scheme_place_type, place_val);
GC_REG_TRAV(scheme_place_async_channel_type, place_async_channel_val);
} }
END_XFORM_SKIP; END_XFORM_SKIP;

View File

@ -3366,8 +3366,27 @@ Scheme_Object *scheme_places_deep_copy(Scheme_Object *so);
typedef struct Scheme_Place { typedef struct Scheme_Place {
Scheme_Object so; Scheme_Object so;
void *proc_thread; void *proc_thread;
Scheme_Object *channel;
} Scheme_Place; } Scheme_Place;
typedef struct Scheme_Place_Async_Channel {
Scheme_Object so;
int in;
int out;
int count;
int size;
#if defined(MZ_USE_PLACES)
mzrt_mutex *lock;
#endif
Scheme_Object **msgs;
void *wakeup_signal;
} Scheme_Place_Async_Channel;
Scheme_Object *scheme_place_async_channel_create();
void scheme_place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *o);
Scheme_Object *scheme_place_async_recv(Scheme_Place_Async_Channel *ch);
Scheme_Env *scheme_place_instance_init(); Scheme_Env *scheme_place_instance_init();
void scheme_place_instance_destroy(); void scheme_place_instance_destroy();
void scheme_kill_green_thread_timer(); void scheme_kill_green_thread_timer();

View File

@ -173,83 +173,85 @@ enum {
scheme_future_type, /* 155 */ scheme_future_type, /* 155 */
scheme_flvector_type, /* 156 */ scheme_flvector_type, /* 156 */
scheme_place_type, /* 157 */ scheme_place_type, /* 157 */
scheme_engine_type, /* 158 */ scheme_place_async_channel_type, /* 158 */
scheme_once_used_type, /* 159 */ scheme_engine_type, /* 159 */
scheme_once_used_type, /* 160 */
#ifdef MZTAG_REQUIRED #ifdef MZTAG_REQUIRED
_scheme_last_normal_type_, /* 160 */ _scheme_last_normal_type_, /* 161 */
scheme_rt_weak_array, /* 161 */ scheme_rt_weak_array, /* 162 */
scheme_rt_comp_env, /* 162 */ scheme_rt_comp_env, /* 163 */
scheme_rt_constant_binding, /* 163 */ scheme_rt_constant_binding, /* 164 */
scheme_rt_resolve_info, /* 164 */ scheme_rt_resolve_info, /* 165 */
scheme_rt_optimize_info, /* 165 */ scheme_rt_optimize_info, /* 166 */
scheme_rt_compile_info, /* 166 */ scheme_rt_compile_info, /* 167 */
scheme_rt_cont_mark, /* 167 */ scheme_rt_cont_mark, /* 168 */
scheme_rt_saved_stack, /* 168 */ scheme_rt_saved_stack, /* 169 */
scheme_rt_reply_item, /* 169 */ scheme_rt_reply_item, /* 170 */
scheme_rt_closure_info, /* 170 */ scheme_rt_closure_info, /* 171 */
scheme_rt_overflow, /* 171 */ scheme_rt_overflow, /* 172 */
scheme_rt_overflow_jmp, /* 172 */ scheme_rt_overflow_jmp, /* 173 */
scheme_rt_meta_cont, /* 173 */ scheme_rt_meta_cont, /* 174 */
scheme_rt_dyn_wind_cell, /* 174 */ scheme_rt_dyn_wind_cell, /* 175 */
scheme_rt_dyn_wind_info, /* 175 */ scheme_rt_dyn_wind_info, /* 176 */
scheme_rt_dyn_wind, /* 176 */ scheme_rt_dyn_wind, /* 177 */
scheme_rt_dup_check, /* 177 */ scheme_rt_dup_check, /* 178 */
scheme_rt_thread_memory, /* 178 */ scheme_rt_thread_memory, /* 179 */
scheme_rt_input_file, /* 179 */ scheme_rt_input_file, /* 180 */
scheme_rt_input_fd, /* 180 */ scheme_rt_input_fd, /* 181 */
scheme_rt_oskit_console_input, /* 181 */ scheme_rt_oskit_console_input, /* 182 */
scheme_rt_tested_input_file, /* 182 */ scheme_rt_tested_input_file, /* 183 */
scheme_rt_tested_output_file, /* 183 */ scheme_rt_tested_output_file, /* 184 */
scheme_rt_indexed_string, /* 184 */ scheme_rt_indexed_string, /* 185 */
scheme_rt_output_file, /* 185 */ scheme_rt_output_file, /* 186 */
scheme_rt_load_handler_data, /* 186 */ scheme_rt_load_handler_data, /* 187 */
scheme_rt_pipe, /* 187 */ scheme_rt_pipe, /* 188 */
scheme_rt_beos_process, /* 188 */ scheme_rt_beos_process, /* 189 */
scheme_rt_system_child, /* 189 */ scheme_rt_system_child, /* 190 */
scheme_rt_tcp, /* 190 */ scheme_rt_tcp, /* 191 */
scheme_rt_write_data, /* 191 */ scheme_rt_write_data, /* 192 */
scheme_rt_tcp_select_info, /* 192 */ scheme_rt_tcp_select_info, /* 193 */
scheme_rt_namespace_option, /* 193 */ scheme_rt_namespace_option, /* 194 */
scheme_rt_param_data, /* 194 */ scheme_rt_param_data, /* 195 */
scheme_rt_will, /* 195 */ scheme_rt_will, /* 196 */
scheme_rt_struct_proc_info, /* 196 */ scheme_rt_struct_proc_info, /* 197 */
scheme_rt_linker_name, /* 197 */ scheme_rt_linker_name, /* 198 */
scheme_rt_param_map, /* 198 */ scheme_rt_param_map, /* 199 */
scheme_rt_finalization, /* 199 */ scheme_rt_finalization, /* 200 */
scheme_rt_finalizations, /* 200 */ scheme_rt_finalizations, /* 201 */
scheme_rt_cpp_object, /* 201 */ scheme_rt_cpp_object, /* 202 */
scheme_rt_cpp_array_object, /* 202 */ scheme_rt_cpp_array_object, /* 203 */
scheme_rt_stack_object, /* 203 */ scheme_rt_stack_object, /* 204 */
scheme_rt_preallocated_object, /* 204 */ scheme_rt_preallocated_object, /* 205 */
scheme_thread_hop_type, /* 205 */ scheme_thread_hop_type, /* 206 */
scheme_rt_srcloc, /* 206 */ scheme_rt_srcloc, /* 207 */
scheme_rt_evt, /* 207 */ scheme_rt_evt, /* 208 */
scheme_rt_syncing, /* 208 */ scheme_rt_syncing, /* 209 */
scheme_rt_comp_prefix, /* 209 */ scheme_rt_comp_prefix, /* 210 */
scheme_rt_user_input, /* 210 */ scheme_rt_user_input, /* 211 */
scheme_rt_user_output, /* 211 */ scheme_rt_user_output, /* 212 */
scheme_rt_compact_port, /* 212 */ scheme_rt_compact_port, /* 213 */
scheme_rt_read_special_dw, /* 213 */ scheme_rt_read_special_dw, /* 214 */
scheme_rt_regwork, /* 214 */ scheme_rt_regwork, /* 215 */
scheme_rt_buf_holder, /* 215 */ scheme_rt_buf_holder, /* 216 */
scheme_rt_parameterization, /* 216 */ scheme_rt_parameterization, /* 217 */
scheme_rt_print_params, /* 217 */ scheme_rt_print_params, /* 218 */
scheme_rt_read_params, /* 218 */ scheme_rt_read_params, /* 219 */
scheme_rt_native_code, /* 219 */ scheme_rt_native_code, /* 220 */
scheme_rt_native_code_plus_case, /* 220 */ scheme_rt_native_code_plus_case, /* 221 */
scheme_rt_jitter_data, /* 221 */ scheme_rt_jitter_data, /* 222 */
scheme_rt_module_exports, /* 222 */ scheme_rt_module_exports, /* 223 */
scheme_rt_delay_load_info, /* 223 */ scheme_rt_delay_load_info, /* 224 */
scheme_rt_marshal_info, /* 224 */ scheme_rt_marshal_info, /* 225 */
scheme_rt_unmarshal_info, /* 225 */ scheme_rt_unmarshal_info, /* 226 */
scheme_rt_runstack, /* 226 */ scheme_rt_runstack, /* 227 */
scheme_rt_sfs_info, /* 227 */ scheme_rt_sfs_info, /* 228 */
scheme_rt_validate_clearing, /* 228 */ scheme_rt_validate_clearing, /* 229 */
scheme_rt_rb_node, /* 229 */ scheme_rt_rb_node, /* 230 */
#endif #endif
_scheme_last_type_ _scheme_last_type_
}; };

View File

@ -280,6 +280,7 @@ scheme_init_type ()
set_name(scheme_rt_meta_cont, "<meta-continuation>"); set_name(scheme_rt_meta_cont, "<meta-continuation>");
#endif #endif
set_name(scheme_place_type, "<place>"); set_name(scheme_place_type, "<place>");
set_name(scheme_place_async_channel_type, "<place_async_channel>");
set_name(scheme_engine_type, "<engine>"); set_name(scheme_engine_type, "<engine>");
} }