diff --git a/src/mzscheme/src/mzmark.c b/src/mzscheme/src/mzmark.c index af47b7fc7a..afe07e3be9 100644 --- a/src/mzscheme/src/mzmark.c +++ b/src/mzscheme/src/mzmark.c @@ -3347,12 +3347,16 @@ static int place_val_SIZE(void *p) { static int place_val_MARK(void *p) { Scheme_Place *pr = (Scheme_Place *)p; + gcMARK(pr->channel); + return gcBYTES_TO_WORDS(sizeof(Scheme_Place)); } static int place_val_FIXUP(void *p) { Scheme_Place *pr = (Scheme_Place *)p; + gcFIXUP(pr->channel); + return gcBYTES_TO_WORDS(sizeof(Scheme_Place)); } @@ -3361,6 +3365,35 @@ static int place_val_FIXUP(void *p) { #define place_val_IS_CONST_SIZE 1 +static int place_async_channel_val_SIZE(void *p) { + return + gcBYTES_TO_WORDS(sizeof(Scheme_Place_Async_Channel)); +} + +static int place_async_channel_val_MARK(void *p) { + Scheme_Place_Async_Channel *pac = (Scheme_Place_Async_Channel *)p; + int i; + for (i = pac->size; i--; ) + gcMARK(pac->msgs[i]); + + return + gcBYTES_TO_WORDS(sizeof(Scheme_Place_Async_Channel)); +} + +static int place_async_channel_val_FIXUP(void *p) { + Scheme_Place_Async_Channel *pac = (Scheme_Place_Async_Channel *)p; + int i; + for (i = pac->size; i--; ) + gcFIXUP(pac->msgs[i]); + + return + gcBYTES_TO_WORDS(sizeof(Scheme_Place_Async_Channel)); +} + +#define place_async_channel_val_IS_ATOMIC 0 +#define place_async_channel_val_IS_CONST_SIZE 1 + + #endif /* PLACES */ /**********************************************************************/ diff --git a/src/mzscheme/src/mzmarksrc.c b/src/mzscheme/src/mzmarksrc.c index 18311f222f..8a0051a7ea 100644 --- a/src/mzscheme/src/mzmarksrc.c +++ b/src/mzscheme/src/mzmarksrc.c @@ -1357,10 +1357,23 @@ START places; place_val { mark: Scheme_Place *pr = (Scheme_Place *)p; + gcMARK(pr->channel); + size: gcBYTES_TO_WORDS(sizeof(Scheme_Place)); } +place_async_channel_val { + mark: + Scheme_Place_Async_Channel *pac = (Scheme_Place_Async_Channel *)p; + int i; + for (i = pac->size; i--; ) + gcMARK(pac->msgs[i]); + + size: + gcBYTES_TO_WORDS(sizeof(Scheme_Place_Async_Channel)); +} + END places; /**********************************************************************/ diff --git a/src/mzscheme/src/places.c b/src/mzscheme/src/places.c index 84380c04e3..93192b60a9 100644 --- a/src/mzscheme/src/places.c +++ b/src/mzscheme/src/places.c @@ -10,12 +10,17 @@ SHARED_OK mz_proc_thread *scheme_master_proc_thread; THREAD_LOCAL_DECL(mz_proc_thread *proc_thread_self); - Scheme_Object *scheme_place(int argc, Scheme_Object *args[]); static Scheme_Object *scheme_place_wait(int argc, Scheme_Object *args[]); static Scheme_Object *scheme_place_sleep(int argc, Scheme_Object *args[]); static Scheme_Object *scheme_place_p(int argc, Scheme_Object *args[]); static Scheme_Object *scheme_places_deep_copy_in_master(Scheme_Object *so); +static Scheme_Object *scheme_place_send(int argc, Scheme_Object *args[]); +static Scheme_Object *scheme_place_recv(int argc, Scheme_Object *args[]); + +Scheme_Object *scheme_place_async_channel_create(); +void scheme_place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *o); +Scheme_Object *scheme_place_async_recv(Scheme_Place_Async_Channel *ch); # ifdef MZ_PRECISE_GC static void register_traversers(void); @@ -55,10 +60,12 @@ void scheme_init_place(Scheme_Env *env) plenv = scheme_primitive_module(scheme_intern_symbol("#%place"), env); - PLACE_PRIM_W_ARITY("place", scheme_place, 3, 3, plenv); - PLACE_PRIM_W_ARITY("place-sleep", scheme_place_sleep, 1, 1, plenv); - PLACE_PRIM_W_ARITY("place-wait", scheme_place_wait, 1, 1, plenv); - PLACE_PRIM_W_ARITY("place?", scheme_place_p, 1, 1, plenv); + PLACE_PRIM_W_ARITY("place", scheme_place, 1, 3, plenv); + PLACE_PRIM_W_ARITY("place-sleep", scheme_place_sleep, 1, 1, plenv); + PLACE_PRIM_W_ARITY("place-wait", scheme_place_wait, 1, 1, plenv); + PLACE_PRIM_W_ARITY("place?", scheme_place_p, 1, 1, plenv); + PLACE_PRIM_W_ARITY("place-ch-send", scheme_place_send, 1, 2, plenv); + PLACE_PRIM_W_ARITY("place-ch-recv", scheme_place_recv, 1, 1, plenv); scheme_finish_primitive_module(plenv); } @@ -110,12 +117,26 @@ Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) { /* pass critical info to new place */ place_data = MALLOC_ONE(Place_Start_Data); - - place_data->module = args[0]; - place_data->function = args[1]; - place_data->channel = args[2]; place_data->ready = ready; + if (argc == 2 || argc == 3 ) { + Scheme_Object *channel; + place_data->module = args[0]; + place_data->function = args[1]; + place_data->ready = ready; + if (argc == 2) { + channel = scheme_place_async_channel_create(); + } + else { + channel = args[2]; + } + place_data->channel = channel; + place->channel = channel; + } + else { + scheme_wrong_count_m("place", 1, 2, argc, args, 0); + } + collection_paths = scheme_current_library_collection_paths(0, NULL); collection_paths = scheme_places_deep_copy_in_master(collection_paths); place_data->current_library_collection_paths = collection_paths; @@ -460,6 +481,28 @@ Scheme_Object *scheme_places_deep_copy_in_master(Scheme_Object *so) { return so; } +Scheme_Object *scheme_place_send(int argc, Scheme_Object *args[]) { + if (argc == 2) { + Scheme_Object *mso; + mso = scheme_places_deep_copy_in_master(args[1]); + scheme_place_async_send((Scheme_Place_Async_Channel *) args[0], mso); + } + else { + scheme_wrong_count_m("place-ch-send", 1, 2, argc, args, 0); + } + return scheme_true; +} + +Scheme_Object *scheme_place_recv(int argc, Scheme_Object *args[]) { + if (argc == 1) { + return scheme_place_async_recv((Scheme_Place_Async_Channel *) args[0]); + } + else { + scheme_wrong_count_m("place-ch-recv", 1, 2, argc, args, 0); + } + return scheme_true; +} + #ifdef MZ_PRECISE_GC static void* scheme_master_place_handlemsg(int msg_type, void *msg_payload); @@ -499,9 +542,13 @@ void* scheme_master_fast_path(int msg_type, void *msg_payload) { Scheme_Object *o; void *original_gc; +#ifdef MZ_PRECISE_GC original_gc = GC_switch_to_master_gc(); +#endif o = scheme_master_place_handlemsg(msg_type, msg_payload); +#ifdef MZ_PRECISE_GC GC_switch_back_from_master(original_gc); +#endif return o; } @@ -515,8 +562,102 @@ void scheme_spawn_master_place() { scheme_master_proc_thread = (void*) ~0; } + #endif +/*========================================================================*/ +/* places async channels */ +/*========================================================================*/ + +static void* GC_master_malloc(size_t size) { + void *ptr; +#ifdef MZ_PRECISE_GC + void *original_gc; + original_gc = GC_switch_to_master_gc(); +#endif + ptr = GC_malloc(size); +#ifdef MZ_PRECISE_GC + GC_switch_back_from_master(original_gc); +#endif + return ptr; +} + +Scheme_Object *scheme_place_async_channel_create() { + Scheme_Object **msgs; + Scheme_Place_Async_Channel *ch; + + ch = GC_master_malloc(sizeof(Scheme_Place_Async_Channel)); + msgs = GC_master_malloc(sizeof(Scheme_Object*) * 8); + + ch->so.type = scheme_place_async_channel_type; + ch->in = 0; + ch->out = 0; + ch->count = 0; + ch->size = 8; + mzrt_mutex_create(&ch->lock); + ch->msgs = msgs; + ch->wakeup_signal = NULL; + return (Scheme_Object *)ch; +} + +void scheme_place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *o) { + int cnt; + mzrt_mutex_lock(ch->lock); + { + cnt = ch->count; + if (ch->count == ch->size) { /* GROW QUEUE */ + Scheme_Object **new_msgs; + + new_msgs = GC_master_malloc(sizeof(Scheme_Object*) *2); + + if (ch->out < ch->in) { + memcpy(new_msgs, ch->msgs + ch->out, sizeof(Scheme_Object *) * (ch->in - ch->out)); + } + else { + int s1 = (ch->size - ch->out); + memcpy(new_msgs, ch->msgs + ch->out, sizeof(Scheme_Object *) * s1); + memcpy(new_msgs + s1, ch->msgs, sizeof(Scheme_Object *) * ch->in); + } + + ch->msgs = new_msgs; + ch->in = ch->size; + ch->out = 0; + ch->size *= 2; + } + + ch->msgs[ch->in] = o; + ++ch->count; + ch->in = (++ch->in % ch->size); + } + mzrt_mutex_unlock(ch->lock); + + if (!cnt && ch->wakeup_signal) { + /*wake up possibly sleeping receiver */ + scheme_signal_received_at(ch->wakeup_signal); + } +} + +Scheme_Object *scheme_place_async_recv(Scheme_Place_Async_Channel *ch) { + Scheme_Object *msg = NULL; + while(1) { + mzrt_mutex_lock(ch->lock); + { + if (ch->count > 0) { /* GET MSG */ + msg = ch->msgs[ch->out]; + ch->msgs[ch->out] = NULL; + --ch->count; + ch->out = (++ch->out % ch->size); + } + } + mzrt_mutex_unlock(ch->lock); + if(msg) break; + scheme_thread_block(0); + scheme_block_until(NULL, NULL, NULL, 0); + } + return msg; +} + + /*========================================================================*/ /* precise GC traversers */ /*========================================================================*/ @@ -531,6 +672,7 @@ START_XFORM_SKIP; static void register_traversers(void) { GC_REG_TRAV(scheme_place_type, place_val); + GC_REG_TRAV(scheme_place_async_channel_type, place_async_channel_val); } END_XFORM_SKIP; diff --git a/src/mzscheme/src/schpriv.h b/src/mzscheme/src/schpriv.h index bbcc046f43..30e6e51925 100644 --- a/src/mzscheme/src/schpriv.h +++ b/src/mzscheme/src/schpriv.h @@ -3366,8 +3366,27 @@ Scheme_Object *scheme_places_deep_copy(Scheme_Object *so); typedef struct Scheme_Place { Scheme_Object so; void *proc_thread; + Scheme_Object *channel; } Scheme_Place; +typedef struct Scheme_Place_Async_Channel { + Scheme_Object so; + int in; + int out; + int count; + int size; +#if defined(MZ_USE_PLACES) + mzrt_mutex *lock; +#endif + Scheme_Object **msgs; + void *wakeup_signal; +} Scheme_Place_Async_Channel; + +Scheme_Object *scheme_place_async_channel_create(); +void scheme_place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *o); +Scheme_Object *scheme_place_async_recv(Scheme_Place_Async_Channel *ch); + + Scheme_Env *scheme_place_instance_init(); void scheme_place_instance_destroy(); void scheme_kill_green_thread_timer(); diff --git a/src/mzscheme/src/stypes.h b/src/mzscheme/src/stypes.h index dc0fe28e5a..a387e89449 100644 --- a/src/mzscheme/src/stypes.h +++ b/src/mzscheme/src/stypes.h @@ -173,83 +173,85 @@ enum { scheme_future_type, /* 155 */ scheme_flvector_type, /* 156 */ scheme_place_type, /* 157 */ - scheme_engine_type, /* 158 */ - scheme_once_used_type, /* 159 */ + scheme_place_async_channel_type, /* 158 */ + scheme_engine_type, /* 159 */ + scheme_once_used_type, /* 160 */ #ifdef MZTAG_REQUIRED - _scheme_last_normal_type_, /* 160 */ + _scheme_last_normal_type_, /* 161 */ - scheme_rt_weak_array, /* 161 */ + scheme_rt_weak_array, /* 162 */ - scheme_rt_comp_env, /* 162 */ - scheme_rt_constant_binding, /* 163 */ - scheme_rt_resolve_info, /* 164 */ - scheme_rt_optimize_info, /* 165 */ - scheme_rt_compile_info, /* 166 */ - scheme_rt_cont_mark, /* 167 */ - scheme_rt_saved_stack, /* 168 */ - scheme_rt_reply_item, /* 169 */ - scheme_rt_closure_info, /* 170 */ - scheme_rt_overflow, /* 171 */ - scheme_rt_overflow_jmp, /* 172 */ - scheme_rt_meta_cont, /* 173 */ - scheme_rt_dyn_wind_cell, /* 174 */ - scheme_rt_dyn_wind_info, /* 175 */ - scheme_rt_dyn_wind, /* 176 */ - scheme_rt_dup_check, /* 177 */ - scheme_rt_thread_memory, /* 178 */ - scheme_rt_input_file, /* 179 */ - scheme_rt_input_fd, /* 180 */ - scheme_rt_oskit_console_input, /* 181 */ - scheme_rt_tested_input_file, /* 182 */ - scheme_rt_tested_output_file, /* 183 */ - scheme_rt_indexed_string, /* 184 */ - scheme_rt_output_file, /* 185 */ - scheme_rt_load_handler_data, /* 186 */ - scheme_rt_pipe, /* 187 */ - scheme_rt_beos_process, /* 188 */ - scheme_rt_system_child, /* 189 */ - scheme_rt_tcp, /* 190 */ - scheme_rt_write_data, /* 191 */ - scheme_rt_tcp_select_info, /* 192 */ - scheme_rt_namespace_option, /* 193 */ - scheme_rt_param_data, /* 194 */ - scheme_rt_will, /* 195 */ - scheme_rt_struct_proc_info, /* 196 */ - scheme_rt_linker_name, /* 197 */ - scheme_rt_param_map, /* 198 */ - scheme_rt_finalization, /* 199 */ - scheme_rt_finalizations, /* 200 */ - scheme_rt_cpp_object, /* 201 */ - scheme_rt_cpp_array_object, /* 202 */ - scheme_rt_stack_object, /* 203 */ - scheme_rt_preallocated_object, /* 204 */ - scheme_thread_hop_type, /* 205 */ - scheme_rt_srcloc, /* 206 */ - scheme_rt_evt, /* 207 */ - scheme_rt_syncing, /* 208 */ - scheme_rt_comp_prefix, /* 209 */ - scheme_rt_user_input, /* 210 */ - scheme_rt_user_output, /* 211 */ - scheme_rt_compact_port, /* 212 */ - scheme_rt_read_special_dw, /* 213 */ - scheme_rt_regwork, /* 214 */ - scheme_rt_buf_holder, /* 215 */ - scheme_rt_parameterization, /* 216 */ - scheme_rt_print_params, /* 217 */ - scheme_rt_read_params, /* 218 */ - scheme_rt_native_code, /* 219 */ - scheme_rt_native_code_plus_case, /* 220 */ - scheme_rt_jitter_data, /* 221 */ - scheme_rt_module_exports, /* 222 */ - scheme_rt_delay_load_info, /* 223 */ - scheme_rt_marshal_info, /* 224 */ - scheme_rt_unmarshal_info, /* 225 */ - scheme_rt_runstack, /* 226 */ - scheme_rt_sfs_info, /* 227 */ - scheme_rt_validate_clearing, /* 228 */ - scheme_rt_rb_node, /* 229 */ + scheme_rt_comp_env, /* 163 */ + scheme_rt_constant_binding, /* 164 */ + scheme_rt_resolve_info, /* 165 */ + scheme_rt_optimize_info, /* 166 */ + scheme_rt_compile_info, /* 167 */ + scheme_rt_cont_mark, /* 168 */ + scheme_rt_saved_stack, /* 169 */ + scheme_rt_reply_item, /* 170 */ + scheme_rt_closure_info, /* 171 */ + scheme_rt_overflow, /* 172 */ + scheme_rt_overflow_jmp, /* 173 */ + scheme_rt_meta_cont, /* 174 */ + scheme_rt_dyn_wind_cell, /* 175 */ + scheme_rt_dyn_wind_info, /* 176 */ + scheme_rt_dyn_wind, /* 177 */ + scheme_rt_dup_check, /* 178 */ + scheme_rt_thread_memory, /* 179 */ + scheme_rt_input_file, /* 180 */ + scheme_rt_input_fd, /* 181 */ + scheme_rt_oskit_console_input, /* 182 */ + scheme_rt_tested_input_file, /* 183 */ + scheme_rt_tested_output_file, /* 184 */ + scheme_rt_indexed_string, /* 185 */ + scheme_rt_output_file, /* 186 */ + scheme_rt_load_handler_data, /* 187 */ + scheme_rt_pipe, /* 188 */ + scheme_rt_beos_process, /* 189 */ + scheme_rt_system_child, /* 190 */ + scheme_rt_tcp, /* 191 */ + scheme_rt_write_data, /* 192 */ + scheme_rt_tcp_select_info, /* 193 */ + scheme_rt_namespace_option, /* 194 */ + scheme_rt_param_data, /* 195 */ + scheme_rt_will, /* 196 */ + scheme_rt_struct_proc_info, /* 197 */ + scheme_rt_linker_name, /* 198 */ + scheme_rt_param_map, /* 199 */ + scheme_rt_finalization, /* 200 */ + scheme_rt_finalizations, /* 201 */ + scheme_rt_cpp_object, /* 202 */ + scheme_rt_cpp_array_object, /* 203 */ + scheme_rt_stack_object, /* 204 */ + scheme_rt_preallocated_object, /* 205 */ + scheme_thread_hop_type, /* 206 */ + scheme_rt_srcloc, /* 207 */ + scheme_rt_evt, /* 208 */ + scheme_rt_syncing, /* 209 */ + scheme_rt_comp_prefix, /* 210 */ + scheme_rt_user_input, /* 211 */ + scheme_rt_user_output, /* 212 */ + scheme_rt_compact_port, /* 213 */ + scheme_rt_read_special_dw, /* 214 */ + scheme_rt_regwork, /* 215 */ + scheme_rt_buf_holder, /* 216 */ + scheme_rt_parameterization, /* 217 */ + scheme_rt_print_params, /* 218 */ + scheme_rt_read_params, /* 219 */ + scheme_rt_native_code, /* 220 */ + scheme_rt_native_code_plus_case, /* 221 */ + scheme_rt_jitter_data, /* 222 */ + scheme_rt_module_exports, /* 223 */ + scheme_rt_delay_load_info, /* 224 */ + scheme_rt_marshal_info, /* 225 */ + scheme_rt_unmarshal_info, /* 226 */ + scheme_rt_runstack, /* 227 */ + scheme_rt_sfs_info, /* 228 */ + scheme_rt_validate_clearing, /* 229 */ + scheme_rt_rb_node, /* 230 */ #endif + _scheme_last_type_ }; diff --git a/src/mzscheme/src/type.c b/src/mzscheme/src/type.c index 555cb45b87..47715b2dd6 100644 --- a/src/mzscheme/src/type.c +++ b/src/mzscheme/src/type.c @@ -280,6 +280,7 @@ scheme_init_type () set_name(scheme_rt_meta_cont, ""); #endif set_name(scheme_place_type, ""); + set_name(scheme_place_async_channel_type, ""); set_name(scheme_engine_type, ""); }