Places: 1-copy messages

This commit is contained in:
Kevin Tew 2010-08-30 11:33:47 -06:00
parent d43cbab863
commit 869373cf0d
2 changed files with 156 additions and 21 deletions

View File

@ -1,4 +1,3 @@
#include "schpriv.h"
/* READ ONLY SHARABLE GLOBALS */
@ -18,7 +17,6 @@ static Scheme_Object *scheme_place(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_place_wait(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_place_sleep(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_place_p(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_places_deep_copy_in_master(Scheme_Object *so);
static Scheme_Object *scheme_place_send(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_place_recv(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_place_channel_p(int argc, Scheme_Object *args[]);
@ -31,8 +29,11 @@ static Scheme_Place_Bi_Channel *scheme_place_bi_peer_channel_create(Scheme_Place
static int scheme_place_channel_ready(Scheme_Object *so);
static void scheme_place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *o);
static Scheme_Object *scheme_place_async_recv(Scheme_Place_Async_Channel *ch);
static void scheme_place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *o, void *msg_memory);
static Scheme_Object *scheme_place_async_recv(Scheme_Place_Async_Channel *ch, void **msg_memory);
static Scheme_Object *scheme_places_deep_copy_to_master(Scheme_Object *so);
/* Scheme_Object *scheme_places_deep_copy(Scheme_Object *so); */
#if defined(MZ_USE_PLACES) && defined(MZ_PRECISE_GC)
static Scheme_Object *scheme_places_deep_copy_worker(Scheme_Object *so, Scheme_Hash_Table *ht);
@ -166,9 +167,9 @@ Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) {
if (argc == 2) {
Scheme_Object *so;
so = scheme_places_deep_copy_in_master(args[0]);
so = scheme_places_deep_copy_to_master(args[0]);
place_data->module = so;
so = scheme_places_deep_copy_in_master(args[1]);
so = scheme_places_deep_copy_to_master(args[1]);
place_data->function = so;
place_data->ready = ready;
@ -186,7 +187,7 @@ Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) {
}
collection_paths = scheme_current_library_collection_paths(0, NULL);
collection_paths = scheme_places_deep_copy_in_master(collection_paths);
collection_paths = scheme_places_deep_copy_to_master(collection_paths);
place_data->current_library_collection_paths = collection_paths;
/* create new place */
@ -478,7 +479,9 @@ void scheme_done_with_process_id(int pid, int is_group)
static void got_sigchld() XFORM_SKIP_PROC
{
write(2, "SIGCHLD handler called (some thread has SIGCHLD unblocked)\n", 59);
if(-1 == write(2, "SIGCHLD handler called (some thread has SIGCHLD unblocked)\n", 59)) {
}
}
void scheme_places_block_child_signal() XFORM_SKIP_PROC
@ -879,6 +882,9 @@ Scheme_Object *scheme_places_deep_copy_worker(Scheme_Object *so, Scheme_Hash_Tab
}
#endif
#if 0
/* unused code, may be useful when/if we revive shared symbol and prefab key tables */
Scheme_Struct_Type *scheme_make_prefab_struct_type_in_master(Scheme_Object *base,
Scheme_Object *parent,
int num_fields,
@ -915,6 +921,7 @@ Scheme_Struct_Type *scheme_make_prefab_struct_type_in_master(Scheme_Object *base
return stype;
}
#endif
static void *place_start_proc(void *data_arg) {
void *stack_base;
@ -1002,7 +1009,7 @@ static void *place_start_proc_after_stack(void *data_arg, void *stack_base) {
Scheme_Hash_Table *force_hash(Scheme_Object *so);
# endif
Scheme_Object *scheme_places_deep_copy_in_master(Scheme_Object *so) {
Scheme_Object *scheme_places_deep_copy_to_master(Scheme_Object *so) {
#if defined(MZ_USE_PLACES) && defined(MZ_PRECISE_GC)
Scheme_Object *o;
void *original_gc;
@ -1010,21 +1017,127 @@ Scheme_Object *scheme_places_deep_copy_in_master(Scheme_Object *so) {
ht = force_hash(so);
# ifdef MZ_PRECISE_GC
original_gc = GC_switch_to_master_gc();
scheme_start_atomic();
# endif
o = scheme_places_deep_copy_worker(so, ht);
# ifdef MZ_PRECISE_GC
scheme_end_atomic_no_swap();
GC_switch_back_from_master(original_gc);
# endif
return o;
#else
return so;
#endif
}
Scheme_Object *scheme_places_deserialize_worker(Scheme_Object *so)
{
Scheme_Object *new_so = so;
if (SCHEME_INTP(so)) {
return so;
}
switch (so->type) {
case scheme_true_type:
case scheme_false_type:
case scheme_null_type:
/* place_bi_channels are allocated in the master and can be passed along as is */
case scheme_place_bi_channel_type:
case scheme_char_type:
case scheme_rational_type:
case scheme_float_type:
case scheme_double_type:
case scheme_complex_type:
case scheme_char_string_type:
case scheme_byte_string_type:
case scheme_unix_path_type:
case scheme_flvector_type:
new_so = so;
break;
case scheme_symbol_type:
scheme_log_abort("scheme_symbol_type: shouldn't be seen during deserialization step");
break;
case scheme_serialized_symbol_type:
new_so = scheme_intern_exact_symbol(SCHEME_BYTE_STR_VAL(so), SCHEME_BYTE_STRLEN_VAL(so));
break;
case scheme_pair_type:
{
Scheme_Object *tmp;
tmp = scheme_places_deserialize_worker(SCHEME_CAR(so));
SCHEME_CAR(so) = tmp;
tmp = scheme_places_deserialize_worker(SCHEME_CDR(so));
SCHEME_CDR(so) = tmp;
new_so = so;
}
break;
case scheme_vector_type:
{
long i;
long size = SCHEME_VEC_SIZE(so);
for (i = 0; i <size ; i++) {
Scheme_Object *tmp;
tmp = scheme_places_deserialize_worker(SCHEME_VEC_ELS(so)[i]);
SCHEME_VEC_ELS(so)[i] = tmp;
}
new_so = so;
}
break;
case scheme_structure_type:
scheme_log_abort("scheme_structure_type: shouldn't be seen during deserialization step");
break;
case scheme_serialized_structure_type:
{
Scheme_Serialized_Structure *st = (Scheme_Serialized_Structure*)so;
Scheme_Struct_Type *stype;
Scheme_Structure *nst;
long size;
int i = 0;
size = st->num_slots;
stype = scheme_lookup_prefab_type(SCHEME_CDR(st->prefab_key), size);
nst = (Scheme_Structure*) scheme_make_blank_prefab_struct_instance(stype);
for (i = 0; i <size ; i++) {
Scheme_Object *tmp;
tmp = scheme_places_deserialize_worker((Scheme_Object*) st->slots[i]);
nst->slots[i] = tmp;
}
new_so = (Scheme_Object*)nst;
}
break;
case scheme_resolved_module_path_type:
default:
scheme_log_abort("cannot deserialize object");
abort();
break;
}
return new_so;
}
Scheme_Object *scheme_places_serialize(Scheme_Object *so, void **msg_memory) {
#if defined(MZ_USE_PLACES) && defined(MZ_PRECISE_GC)
Scheme_Object *new_so;
Scheme_Object *tmp;
GC_create_message_allocator();
new_so = scheme_places_deep_copy(so);
tmp = GC_finish_message_allocator();
(*msg_memory) = tmp;
return new_so;
#else
return so;
#endif
}
Scheme_Object *scheme_places_deserialize(Scheme_Object *so, void *msg_memory) {
#if defined(MZ_USE_PLACES) && defined(MZ_PRECISE_GC)
Scheme_Object *new_so;
new_so = scheme_places_deserialize_worker(so);
GC_adopt_message_allocator(msg_memory);
return new_so;
#else
return so;
#endif
}
Scheme_Object *scheme_place_send(int argc, Scheme_Object *args[]) {
if (argc == 2) {
Scheme_Object *mso;
@ -1035,8 +1148,11 @@ Scheme_Object *scheme_place_send(int argc, Scheme_Object *args[]) {
else {
ch = (Scheme_Place_Bi_Channel *)args[0];
}
mso = scheme_places_deep_copy_in_master(args[1]);
scheme_place_async_send((Scheme_Place_Async_Channel *) ch->sendch, mso);
{
void *msg_memory;
mso = scheme_places_serialize(args[1], &msg_memory);
scheme_place_async_send((Scheme_Place_Async_Channel *) ch->sendch, mso, msg_memory);
}
}
else {
scheme_wrong_count_m("place-channel-send", 1, 2, argc, args, 0);
@ -1054,8 +1170,11 @@ Scheme_Object *scheme_place_recv(int argc, Scheme_Object *args[]) {
else {
ch = (Scheme_Place_Bi_Channel *) args[0];
}
mso = scheme_place_async_recv((Scheme_Place_Async_Channel *) ch->recvch);
return scheme_places_deep_copy(mso);
{
void *msg_memory;
mso = scheme_place_async_recv((Scheme_Place_Async_Channel *) ch->recvch, &msg_memory);
return scheme_places_deserialize(mso, msg_memory);
}
}
else {
scheme_wrong_count_m("place-channel-recv", 1, 2, argc, args, 0);
@ -1127,7 +1246,7 @@ void force_hash_worker(Scheme_Object *so, Scheme_Hash_Table *ht)
long i;
long size = SCHEME_VEC_SIZE(so);
for (i = 0; i <size ; i++) {
scheme_places_deep_copy_worker(SCHEME_VEC_ELS(so)[i], ht);
force_hash_worker(SCHEME_VEC_ELS(so)[i], ht);
}
}
break;
@ -1201,9 +1320,11 @@ static void* GC_master_malloc_tagged(size_t size) {
Scheme_Place_Async_Channel *scheme_place_async_channel_create() {
Scheme_Object **msgs;
Scheme_Place_Async_Channel *ch;
void **msg_memory;
ch = GC_master_malloc_tagged(sizeof(Scheme_Place_Async_Channel));
msgs = GC_master_malloc(sizeof(Scheme_Object*) * 8);
msg_memory = GC_master_malloc(sizeof(void*) * 8);
ch->so.type = scheme_place_async_channel_type;
ch->in = 0;
@ -1212,6 +1333,7 @@ Scheme_Place_Async_Channel *scheme_place_async_channel_create() {
ch->size = 8;
mzrt_mutex_create(&ch->lock);
ch->msgs = msgs;
ch->msg_memory = msg_memory;
ch->wakeup_signal = NULL;
return ch;
}
@ -1261,32 +1383,40 @@ static Scheme_Object *scheme_place_channel_p(int argc, Scheme_Object *args[])
return SAME_TYPE(SCHEME_TYPE(args[0]), scheme_place_bi_channel_type) ? scheme_true : scheme_false;
}
void scheme_place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *o) {
static void scheme_place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *o, void *msg_memory) {
int cnt;
mzrt_mutex_lock(ch->lock);
{
cnt = ch->count;
if (ch->count == ch->size) { /* GROW QUEUE */
Scheme_Object **new_msgs;
void **new_msg_memory;
new_msgs = GC_master_malloc(sizeof(Scheme_Object*) *2);
new_msgs = GC_master_malloc(sizeof(Scheme_Object*) * ch->size * 2);
new_msg_memory = GC_master_malloc(sizeof(void*) * ch->size * 2);
if (ch->out < ch->in) {
memcpy(new_msgs, ch->msgs + ch->out, sizeof(Scheme_Object *) * (ch->in - ch->out));
memcpy(new_msg_memory, ch->msg_memory + ch->out, sizeof(void*) * (ch->in - ch->out));
}
else {
int s1 = (ch->size - ch->out);
memcpy(new_msgs, ch->msgs + ch->out, sizeof(Scheme_Object *) * s1);
memcpy(new_msgs + s1, ch->msgs, sizeof(Scheme_Object *) * ch->in);
memcpy(new_msg_memory, ch->msg_memory + ch->out, sizeof(void*) * s1);
memcpy(new_msg_memory + s1, ch->msg_memory, sizeof(void*) * ch->in);
}
ch->msgs = new_msgs;
ch->msg_memory = new_msg_memory;
ch->in = ch->size;
ch->out = 0;
ch->size *= 2;
}
ch->msgs[ch->in] = o;
ch->msg_memory[ch->in] = msg_memory;
++ch->count;
ch->in = (++ch->in % ch->size);
}
@ -1320,14 +1450,18 @@ static int scheme_place_channel_ready(Scheme_Object *so) {
return scheme_place_async_ch_ready((Scheme_Place_Async_Channel *) ch->recvch);
}
Scheme_Object *scheme_place_async_recv(Scheme_Place_Async_Channel *ch) {
static Scheme_Object *scheme_place_async_recv(Scheme_Place_Async_Channel *ch, void **msg_memory) {
Scheme_Object *msg = NULL;
while(1) {
mzrt_mutex_lock(ch->lock);
{
if (ch->count > 0) { /* GET MSG */
msg = ch->msgs[ch->out];
*msg_memory = ch->msg_memory[ch->out];
ch->msgs[ch->out] = NULL;
ch->msg_memory[ch->out] = NULL;
--ch->count;
ch->out = (++ch->out % ch->size);
}

View File

@ -3540,6 +3540,7 @@ typedef struct Scheme_Place_Async_Channel {
mzrt_mutex *lock;
#endif
Scheme_Object **msgs;
void **msg_memory;
void *wakeup_signal;
} Scheme_Place_Async_Channel;