place channels & threads: improve GC

Allow a thread to be GCed when it is blocked on a place
channel for reading and the place channel's write end
is inaccessible.

GC is limited to threads that do not participate in cycles
of such threads, where the otherwise unreachable threads
are blocked on place channels that are reachable only from
that same set of threads. In other words, the GC finds the
greatest fixed point (as measured by the threads to retain)
instead of the least fixed point. The greatest fixed point
isn't really what you want, but finding the least fixed
point seems to require significant extra GC machinery
across places.

This improvement was intended to solve the same problem as
commit 7b0608c, but that case seems to run into the limitation
on cycles.
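
To make the intended behavior concrete, here is a minimal sketch
(hypothetical, not part of this commit; `wb`, `in`, and `out` are
invented names) of the case that can now be collected:

    #lang racket
    (require racket/place)
    ;; The thread captures only `in`; `out` (the write end) becomes
    ;; unreachable as soon as the `let-values` returns, so the blocked,
    ;; weakly-held thread should become collectable.
    (define wb
      (let-values ([(in out) (place-channel)])
        (make-weak-box (thread (lambda () (place-channel-get in))))))
    (for ([i 3])
      (sync (system-idle-evt)) ; let the thread block
      (collect-garbage))       ; repeated GCs so the write end's finalizer can run
    (weak-box-value wb)        ; expected to be #f with this change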
Matthew Flatt 2013-03-24 18:53:24 -06:00
parent b34fac32c0
commit 4d76aa2040
12 changed files with 253 additions and 99 deletions

View File

@ -95,6 +95,16 @@ racket
(place-channel-get pch))))
]
Place channels are subject to @tech{garbage collection}, like other
Racket values, and a @tech{thread} that is blocked reading from a
@tech{place channel} can be garbage collected if the @tech{place
channel}'s writing end becomes unreachable. @elemtag['(caveat
"place-channel-gc")]{However}, unlike normal @tech{channel} blocking,
if otherwise unreachable threads are mutually blocked on place
channels that are reachable only from the same threads, the threads
and place channels are all considered reachable, instead of
unreachable.
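For illustration, a hypothetical sketch of the cycle described above
(not part of the diff; `t1`, `t2`, and the channel names are invented):
each thread is blocked on a channel whose only writer is the other
thread, so the whole group stays reachable:

    #lang racket
    (require racket/place)
    ;; t1 waits on a1, whose write end (a2) is held only by t2;
    ;; t2 waits on b1, whose write end (b2) is held only by t1.
    (define wbs
      (let-values ([(a1 a2) (place-channel)]
                   [(b1 b2) (place-channel)])
        (define t1 (thread (lambda ()
                             (place-channel-get a1)
                             (place-channel-put b2 'done))))
        (define t2 (thread (lambda ()
                             (place-channel-get b1)
                             (place-channel-put a2 'done))))
        (list (make-weak-box t1) (make-weak-box t2))))
    (for ([i 3])
      (sync (system-idle-evt))
      (collect-garbage))
    (map weak-box-value wbs) ; expected: both threads are still retained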
@defproc[(place-enabled?) boolean?]{

View File

@ -13,10 +13,12 @@ through @racket[thread-resume].
A thread that has not terminated can be garbage collected (see
@secref["gc-model"]) if it is unreachable and suspended or if it is
unreachable and blocked on only unreachable events through
@racket[semaphore-wait], @racket[semaphore-wait/enable-break],
unreachable and blocked on only unreachable events through functions
such as @racket[semaphore-wait], @racket[semaphore-wait/enable-break],
@racket[channel-put], @racket[channel-get], @racket[sync],
@racket[sync/enable-break], or @racket[thread-wait].
@racket[sync/enable-break], or @racket[thread-wait]. Beware, however,
of a limitation on @tech{place-channel} blocking; see the
@elemref['(caveat "place-channel-gc")]{caveat} in @secref["places"].
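As a point of comparison for the rule above, a minimal hypothetical
sketch (not part of the diff) of a thread that can be collected because
it blocks on an event that is itself unreachable:

    #lang racket
    ;; The semaphore is captured only by the thread, and the thread is
    ;; held only weakly, so both should be collectable once the thread
    ;; blocks in `semaphore-wait`.
    (define wb
      (let ([s (make-semaphore 0)])
        (make-weak-box (thread (lambda () (semaphore-wait s))))))
    (sync (system-idle-evt)) ; give the thread a chance to block
    (collect-garbage)
    (collect-garbage)
    (weak-box-value wb)      ; expected to be #f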
@margin-note{In GRacket, a handler thread for an eventspace is blocked on
an internal semaphore when its event queue is empty. Thus, the handler

View File

@ -414,6 +414,33 @@
(test-long (lambda (x) (intern-num-sym (modulo x 1000))) "Listof symbols")
(test-long (lambda (x) #s(clown "Binky" "pie")) "Listof prefabs")
(test-long (lambda (x) (read (open-input-string "#0=(#0# . #0#)"))) "Listof cycles")
;; check that a thread blocked on a place channel
;; can be GCed if the other end of the channel is
;; unreachable --- where a place's channels should
;; all count as "unreachable" when the place ends
(displayln "checking place-channel and thread GC interaction")
(let ([N 40])
  (define weaks (make-weak-hash))
  (for ([i (in-range N)])
    (define s (make-semaphore))
    (hash-set!
     weaks
     (thread (lambda ()
               (define-values (i o) (place-channel))
               (define p (place ch (place-channel-get ch)))
               (place-channel-put p o)
               (place-wait p)
               (semaphore-post s)
               (sync i)))
     #t)
    (sync s))
  (for ([i 3])
    (sync (system-idle-evt))
    (collect-garbage))
  (unless ((hash-count weaks) . < . (/ N 2))
    (error "thread-gc test failed")))
)
;(report-errs)

View File

@ -370,8 +370,8 @@ int BTC_bi_chan_mark(void *p, struct NewGC *gc)
/* Race conditions here on `mem_size', and likely double counting
when the same async channels are accessible from paired bi
channels --- but those approximations are ok for accounting. */
account_memory(gc, gc->current_mark_owner, bc->sendch->mem_size, 0);
account_memory(gc, gc->current_mark_owner, bc->recvch->mem_size, 0);
account_memory(gc, gc->current_mark_owner, bc->link->sendch->mem_size, 0);
account_memory(gc, gc->current_mark_owner, bc->link->recvch->mem_size, 0);
}
return gc->mark_table[btc_redirect_bi_chan](p, gc);
}

View File

@ -341,6 +341,7 @@ typedef struct Thread_Local_Variables {
struct Evt **place_evts_;
struct Scheme_Place_Object *place_object_;
struct Scheme_Place *all_child_places_;
struct Scheme_Place_Bi_Channel_Link *place_channel_links_;
struct Scheme_Object **reusable_ifs_stack_;
struct Scheme_Object *empty_self_shift_cache_;
struct Scheme_Bucket_Table *scheme_module_code_cache_;
@ -720,6 +721,7 @@ XFORM_GC_VARIABLE_STACK_THROUGH_THREAD_LOCAL;
#define place_evts XOA (scheme_get_thread_local_variables()->place_evts_)
#define place_object XOA (scheme_get_thread_local_variables()->place_object_)
#define all_child_places XOA (scheme_get_thread_local_variables()->all_child_places_)
#define place_channel_links XOA (scheme_get_thread_local_variables()->place_channel_links_)
#define reusable_ifs_stack XOA (scheme_get_thread_local_variables()->reusable_ifs_stack_)
#define empty_self_shift_cache XOA (scheme_get_thread_local_variables()->empty_self_shift_cache_)
#define scheme_module_code_cache XOA (scheme_get_thread_local_variables()->scheme_module_code_cache_)

View File

@ -702,8 +702,8 @@ int is_equal (Scheme_Object *obj1, Scheme_Object *obj2, Equal_Info *eql)
Scheme_Place_Bi_Channel *bc1, *bc2;
bc1 = (Scheme_Place_Bi_Channel *)obj1;
bc2 = (Scheme_Place_Bi_Channel *)obj2;
return (SAME_OBJ(bc1->recvch, bc2->recvch)
&& SAME_OBJ(bc1->sendch, bc2->sendch));
return (SAME_OBJ(bc1->link->recvch, bc2->link->recvch)
&& SAME_OBJ(bc1->link->sendch, bc2->link->sendch));
} else if (!eql->for_chaperone && ((t1 == scheme_chaperone_type)
|| (t1 == scheme_proc_chaperone_type))) {
/* both chaperones */

View File

@ -558,6 +558,10 @@ static Scheme_Env *place_instance_init(void *stack_base, int initial_main_os_thr
printf("done @ %" PRIdPTR "\n#endif\n", scheme_get_process_milliseconds());
#endif
#if defined(MZ_USE_PLACES)
REGISTER_SO(place_channel_links);
#endif
return env;
}
@ -607,6 +611,7 @@ void scheme_place_instance_destroy(int force)
scheme_end_futures_per_place();
#if defined(MZ_USE_PLACES)
scheme_kill_green_thread_timer();
scheme_free_place_bi_channels();
#endif
#if defined(MZ_PRECISE_GC) && defined(MZ_USE_PLACES)
GC_destruct_child_gc();

View File

@ -1468,7 +1468,7 @@ static uintptr_t equal_hash_key(Scheme_Object *o, uintptr_t k, Hash_Info *hi)
k += 7;
/* a bi channel has sendch and recvch, but
sends are the same iff recvs are the same: */
o = (Scheme_Object *)((Scheme_Place_Bi_Channel *)o)->sendch;
o = (Scheme_Object *)((Scheme_Place_Bi_Channel *)o)->link->sendch;
}
break;
default:
@ -1890,7 +1890,7 @@ static uintptr_t equal_hash_key2(Scheme_Object *o, Hash_Info *hi)
case scheme_place_bi_channel_type:
/* a bi channel has sendch and recvch, but
sends are the same iff recvs are the same: */
o = (Scheme_Object *)((Scheme_Place_Bi_Channel *)o)->sendch;
o = (Scheme_Object *)((Scheme_Place_Bi_Channel *)o)->link->sendch;
goto top;
default:
{

View File

@ -7,8 +7,7 @@ static int place_bi_channel_val_SIZE(void *p, struct NewGC *gc) {
static int place_bi_channel_val_MARK(void *p, struct NewGC *gc) {
Scheme_Place_Bi_Channel *pbc = (Scheme_Place_Bi_Channel *)p;
gcMARK2(pbc->sendch, gc);
gcMARK2(pbc->recvch, gc);
gcMARK2(pbc->link, gc);
return
gcBYTES_TO_WORDS(sizeof(Scheme_Place_Bi_Channel));
@ -16,8 +15,7 @@ static int place_bi_channel_val_MARK(void *p, struct NewGC *gc) {
static int place_bi_channel_val_FIXUP(void *p, struct NewGC *gc) {
Scheme_Place_Bi_Channel *pbc = (Scheme_Place_Bi_Channel *)p;
gcFIXUP2(pbc->sendch, gc);
gcFIXUP2(pbc->recvch, gc);
gcFIXUP2(pbc->link, gc);
return
gcBYTES_TO_WORDS(sizeof(Scheme_Place_Bi_Channel));

View File

@ -1515,8 +1515,7 @@ START place;
place_bi_channel_val {
mark:
Scheme_Place_Bi_Channel *pbc = (Scheme_Place_Bi_Channel *)p;
gcMARK2(pbc->sendch, gc);
gcMARK2(pbc->recvch, gc);
gcMARK2(pbc->link, gc);
size:
gcBYTES_TO_WORDS(sizeof(Scheme_Place_Bi_Channel));

View File

@ -64,6 +64,7 @@ static void cust_kill_place(Scheme_Object *pl, void *notused);
static void resume_one_place_with_lock(Scheme_Place_Object *place_obj);
static Scheme_Place_Async_Channel *place_async_channel_create();
static Scheme_Place_Bi_Channel *place_bi_channel_malloc();
static Scheme_Place_Bi_Channel *place_bi_channel_create();
static Scheme_Place_Bi_Channel *place_bi_peer_channel_create(Scheme_Place_Bi_Channel *orig);
static int place_channel_ready(Scheme_Object *so, Scheme_Schedule_Info *sinfo);
@ -75,6 +76,9 @@ static int place_dead_ready(Scheme_Object *o, Scheme_Schedule_Info *sinfo);
static void* GC_master_malloc_tagged(size_t size);
static void destroy_place_object_locks(Scheme_Place_Object *place_obj);
static void bi_channel_refcount(Scheme_Place_Bi_Channel *ch, int delta);
static void bi_channel_set_finalizer(Scheme_Place_Bi_Channel *ch);
#if defined(MZ_USE_PLACES) && defined(MZ_PRECISE_GC)
static Scheme_Object *places_deep_copy_worker(Scheme_Object *so, Scheme_Hash_Table **ht,
int mode, int gcable, int can_raise_exn,
@ -83,10 +87,13 @@ static Scheme_Object *places_deep_copy_worker(Scheme_Object *so, Scheme_Hash_Tab
# define mzPDC_CHECK 0
# define mzPDC_COPY 1
# define mzPDC_UNCOPY 2
# define mzPDC_DESER 3
# define mzPDC_CLEAN 4
# define mzPDC_DIRECT_UNCOPY 3
# define mzPDC_DESER 4
# define mzPDC_CLEAN 5
#endif
static void places_prepare_direct(Scheme_Object *so);
# ifdef MZ_PRECISE_GC
static void register_traversers(void);
# endif
@ -342,10 +349,8 @@ Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) {
NULL);
}
so = places_deep_copy_to_master(args[0]);
place_data->module = so;
so = places_deep_copy_to_master(args[1]);
place_data->function = so;
place_data->module = args[0];
place_data->function = args[1];
place_data->ready = ready;
/* create channel */
@ -359,11 +364,9 @@ Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) {
}
collection_paths = scheme_current_library_collection_paths(0, NULL);
collection_paths = places_deep_copy_to_master(collection_paths);
place_data->current_library_collection_paths = collection_paths;
collection_paths = scheme_compiled_file_roots(0, NULL);
collection_paths = places_deep_copy_to_master(collection_paths);
place_data->compiled_roots = collection_paths;
cust = scheme_get_current_custodian();
@ -449,6 +452,12 @@ Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) {
}
}
places_prepare_direct(place_data->current_library_collection_paths);
places_prepare_direct(place_data->compiled_roots);
places_prepare_direct(place_data->channel);
places_prepare_direct(place_data->module);
places_prepare_direct(place_data->function);
/* create new place */
proc_thread = mz_proc_thread_create(place_start_proc, place_data);
@ -1179,8 +1188,12 @@ static Scheme_Object *do_places_deep_copy(Scheme_Object *so, int mode, int gcabl
#endif
}
Scheme_Object *places_deep_uncopy(Scheme_Object *so) {
return do_places_deep_copy(so, mzPDC_UNCOPY, 1, NULL, NULL);
static void places_prepare_direct(Scheme_Object *so) {
(void)do_places_deep_copy(so, mzPDC_CHECK, 1, NULL, NULL);
}
static Scheme_Object *places_deep_direct_uncopy(Scheme_Object *so) {
return do_places_deep_copy(so, mzPDC_DIRECT_UNCOPY, 1, NULL, NULL);
}
static void bad_place_message(Scheme_Object *so) {
@ -1269,7 +1282,7 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h
Scheme_Object **master_chain,
Scheme_Object **invalid_object) {
Scheme_Object *new_so;
int copy_mode = ((mode == mzPDC_COPY) || (mode == mzPDC_UNCOPY));
int copy_mode = ((mode == mzPDC_COPY) || (mode == mzPDC_UNCOPY) || (mode == mzPDC_DIRECT_UNCOPY));
new_so = trivial_copy(so, master_chain);
if (new_so) return new_so;
@ -1283,17 +1296,26 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h
case scheme_place_bi_channel_type: /* ^^^ fall through ^^^ */
if (copy_mode) {
Scheme_Place_Bi_Channel *ch;
ch = MALLOC_ONE_TAGGED(Scheme_Place_Bi_Channel);
ch->so.type = scheme_place_bi_channel_type;
ch->sendch = ((Scheme_Place_Bi_Channel *)so)->sendch;
ch->recvch = ((Scheme_Place_Bi_Channel *)so)->recvch;
ch = place_bi_channel_malloc();
ch->link->sendch = ((Scheme_Place_Bi_Channel *)so)->link->sendch;
ch->link->recvch = ((Scheme_Place_Bi_Channel *)so)->link->recvch;
if ((mode == mzPDC_COPY) || (mode == mzPDC_DIRECT_UNCOPY))
bi_channel_refcount(ch, 1);
if ((mode == mzPDC_UNCOPY) || (mode == mzPDC_DIRECT_UNCOPY))
bi_channel_set_finalizer(ch);
if (master_chain) {
/* See setting of master_chain in trivial_copy(): */
new_so = scheme_make_raw_pair((Scheme_Object *)ch->sendch, *master_chain);
new_so = scheme_make_raw_pair((Scheme_Object *)ch->recvch, new_so);
new_so = scheme_make_raw_pair((Scheme_Object *)ch->link->sendch, *master_chain);
new_so = scheme_make_raw_pair((Scheme_Object *)ch->link->recvch, new_so);
*master_chain = new_so;
}
new_so = (Scheme_Object *)ch;
} else if (mode == mzPDC_CLEAN) {
bi_channel_refcount((Scheme_Place_Bi_Channel *)so, -1);
} else if (mode == mzPDC_DESER) {
bi_channel_set_finalizer((Scheme_Place_Bi_Channel *)so);
}
break;
case scheme_char_type:
@ -1368,13 +1390,28 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h
MZ_OPT_HASH_KEY(&((Scheme_Symbol*)new_so)->iso) = 0x2;
}
new_so->type = scheme_serialized_symbol_type;
} else if (mode == mzPDC_DIRECT_UNCOPY) {
char *str, buf[64];
intptr_t len;
len = SCHEME_SYM_LEN(so);
if (len < 64)
str = buf;
else
str = (char *)scheme_malloc_atomic(len);
memcpy(str, SCHEME_SYM_VAL(so), len);
if (SCHEME_SYM_UNINTERNEDP(so))
new_so = scheme_make_exact_symbol(str, len);
else if (SCHEME_SYM_PARALLELP(so))
new_so = scheme_intern_exact_parallel_symbol(str, len);
else
new_so = scheme_intern_exact_symbol(str, len);
} else if (mode != mzPDC_CHECK) {
scheme_log_abort("encountered symbol in bad mode");
abort();
}
break;
case scheme_serialized_symbol_type:
if ((mode == mzPDC_UNCOPY) || (mode == mzPDC_DESER))
if ((mode == mzPDC_UNCOPY) || (mode == mzPDC_DESER)) {
if (SCHEME_SYM_UNINTERNEDP(so)) {
new_so = scheme_make_exact_symbol(SCHEME_BYTE_STR_VAL(so), SCHEME_BYTE_STRLEN_VAL(so));
}
@ -1384,7 +1421,7 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h
else {
new_so = scheme_intern_exact_symbol(SCHEME_BYTE_STR_VAL(so), SCHEME_BYTE_STRLEN_VAL(so));
}
else if (mode != mzPDC_CLEAN) {
} else if (mode != mzPDC_CLEAN) {
scheme_log_abort("encountered serialized symbol in bad mode");
abort();
}
@ -1546,7 +1583,7 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h
break;
case scheme_serialized_tcp_fd_type:
{
if ((mode == mzPDC_UNCOPY) || (mode == mzPDC_DESER)) {
if ((mode == mzPDC_UNCOPY) || (mode == mzPDC_DIRECT_UNCOPY) || (mode == mzPDC_DESER)) {
Scheme_Object *in;
Scheme_Object *out;
Scheme_Object *name;
@ -1576,7 +1613,7 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h
break;
case scheme_serialized_file_fd_type:
{
if ((mode == mzPDC_UNCOPY) || (mode == mzPDC_DESER)) {
if ((mode == mzPDC_UNCOPY) || (mode == mzPDC_DIRECT_UNCOPY) || (mode == mzPDC_DESER)) {
Scheme_Serialized_File_FD *ffd;
Scheme_Object *name;
int fd;
@ -1780,7 +1817,9 @@ static Scheme_Object *places_deep_copy_worker(Scheme_Object *so, Scheme_Hash_Tab
Scheme_Object *fd_accumulators = NULL;
intptr_t delayed_errno = 0;
int set_mode = ((mode == mzPDC_COPY) || (mode == mzPDC_UNCOPY) || (mode == mzPDC_DESER));
int set_mode = ((mode == mzPDC_COPY)
|| (mode == mzPDC_UNCOPY) || (mode == mzPDC_DIRECT_UNCOPY)
|| (mode == mzPDC_DESER));
/* lifted variables for xform*/
Scheme_Object *pair;
@ -1872,7 +1911,7 @@ DEEP_DO:
/* --------- pair ----------- */
case scheme_pair_type:
/* handle cycles: */
if ((mode == mzPDC_COPY) || (mode == mzPDC_UNCOPY)) {
if ((mode == mzPDC_COPY) || (mode == mzPDC_UNCOPY) || (mode == mzPDC_DIRECT_UNCOPY)) {
pair = scheme_make_pair(scheme_false, scheme_false);
SCHEME_PAIR_COPY_FLAGS(pair, so);
} else
@ -1907,7 +1946,7 @@ DEEP_DO_FIN_PAIR_L:
case scheme_vector_type:
size = SCHEME_VEC_SIZE(so);
if ((mode == mzPDC_COPY) || (mode == mzPDC_UNCOPY))
if ((mode == mzPDC_COPY) || (mode == mzPDC_UNCOPY) || (mode == mzPDC_DIRECT_UNCOPY))
vec = scheme_make_vector(size, 0);
else
vec = so;
@ -2104,7 +2143,6 @@ DEEP_SST2_L:
break;
case scheme_hash_table_type:
case scheme_hash_tree_type:
/* if ((mode == mzPDC_COPY) || (mode == mzPDC_UNCOPY)) { */
if (set_mode) {
if (scheme_true == scheme_hash_eq_p(1, &so)) {
nht = scheme_make_immutable_hasheq(0, NULL);
@ -2568,16 +2606,16 @@ static void *place_start_proc_after_stack(void *data_arg, void *stack_base) {
/* scheme_make_thread behaves differently if the above global vars are not null */
scheme_place_instance_init(stack_base, place_data->parent_gc, mem_limit);
a[0] = places_deep_uncopy(place_data->current_library_collection_paths);
a[0] = places_deep_direct_uncopy(place_data->current_library_collection_paths);
scheme_current_library_collection_paths(1, a);
a[0] = places_deep_uncopy(place_data->compiled_roots);
a[0] = places_deep_direct_uncopy(place_data->compiled_roots);
scheme_compiled_file_roots(1, a);
scheme_seal_parameters();
a[0] = places_deep_uncopy(place_data->module);
a[1] = places_deep_uncopy(place_data->function);
a[0] = places_deep_direct_uncopy(place_data->module);
a[1] = places_deep_direct_uncopy(place_data->function);
a[1] = scheme_intern_exact_symbol(SCHEME_SYM_VAL(a[1]), SCHEME_SYM_LEN(a[1]));
channel = places_deep_uncopy(place_data->channel);
channel = places_deep_direct_uncopy(place_data->channel);
place_obj = place_data->place_obj;
REGISTER_SO(place_object);
place_object = place_obj;
@ -2623,7 +2661,6 @@ static void *place_start_proc_after_stack(void *data_arg, void *stack_base) {
GC_allow_master_gc_check();
# endif
/* at this point, don't refer to place_data or its content
anymore, because it's allocated in the other place */
@ -2667,29 +2704,6 @@ static void *place_start_proc_after_stack(void *data_arg, void *stack_base) {
return NULL;
}
Scheme_Object *places_deep_copy_to_master(Scheme_Object *so) {
Scheme_Hash_Table *ht = NULL;
#if defined(MZ_USE_PLACES) && defined(MZ_PRECISE_GC)
Scheme_Object *o;
void *original_gc;
/* forces hash codes: */
(void)places_deep_copy_worker(so, &ht, mzPDC_CHECK, 1, 1, NULL, NULL);
ht = NULL;
original_gc = GC_switch_to_master_gc();
scheme_start_atomic();
o = places_deep_copy_worker(so, &ht, mzPDC_COPY, 1, 0, NULL, NULL);
scheme_end_atomic_no_swap();
GC_switch_back_from_master(original_gc);
return o;
#else
return places_deep_copy_worker(so, &ht, mzPDC_COPY, 1, 1, NULL, NULL);
#endif
}
static Scheme_Object *places_serialize(Scheme_Object *so, void **msg_memory, Scheme_Object **master_chain,
Scheme_Object **invalid_object) {
#if defined(MZ_USE_PLACES) && defined(MZ_PRECISE_GC)
@ -2754,7 +2768,7 @@ Scheme_Object *place_send(int argc, Scheme_Object *args[])
ch = NULL;
scheme_wrong_contract("place-channel-put", "place-channel?", 0, argc, args);
}
place_async_send((Scheme_Place_Async_Channel *) ch->sendch, args[1]);
place_async_send((Scheme_Place_Async_Channel *) ch->link->sendch, args[1]);
return scheme_void;
}
@ -2770,7 +2784,7 @@ Scheme_Object *place_receive(int argc, Scheme_Object *args[]) {
ch = NULL;
scheme_wrong_contract("place-channel-get", "place-channel?", 0, argc, args);
}
return place_async_receive((Scheme_Place_Async_Channel *) ch->recvch);
return place_async_receive((Scheme_Place_Async_Channel *)ch->link->recvch);
}
static Scheme_Object* place_allowed_p(int argc, Scheme_Object *args[])
@ -2941,28 +2955,102 @@ Scheme_Place_Async_Channel *place_async_channel_create() {
return ch;
}
static void async_channel_refcount(Scheme_Place_Async_Channel *ch, int for_send, int delta)
{
  mzrt_mutex_lock(ch->lock);
  if (for_send)
    ch->wr_ref += delta;
  else
    ch->rd_ref += delta;
  if ((ch->wr_ref < 0) || (ch->rd_ref < 0)) {
    scheme_log_abort("internal error: bad reference count on async channel");
    abort();
  }
  mzrt_mutex_unlock(ch->lock);
}

static void bi_channel_refcount(Scheme_Place_Bi_Channel *ch, int delta)
{
  async_channel_refcount(ch->link->sendch, 1, delta);
  async_channel_refcount(ch->link->recvch, 0, delta);
}

static void bi_channel_refcount_down(void *_ch, void *data)
{
  Scheme_Place_Bi_Channel *ch = (Scheme_Place_Bi_Channel *)_ch;

  if (ch->link->prev)
    ch->link->prev->next = ch->link->next;
  else
    place_channel_links = ch->link->next;
  if (ch->link->next)
    ch->link->next->prev = ch->link->prev;

  bi_channel_refcount(ch, -1);
}

void scheme_free_place_bi_channels()
{
  Scheme_Place_Bi_Channel_Link *link;
  for (link = place_channel_links; link; link = link->next) {
    async_channel_refcount(link->sendch, 1, -1);
    async_channel_refcount(link->recvch, 0, -1);
  }
}

static void bi_channel_set_finalizer(Scheme_Place_Bi_Channel *ch)
{
  ch->link->next = place_channel_links;
  if (place_channel_links)
    place_channel_links->prev = ch->link;
  place_channel_links = ch->link;

  scheme_add_finalizer(ch, bi_channel_refcount_down, NULL);
}

Scheme_Place_Bi_Channel *place_bi_channel_malloc() {
  Scheme_Place_Bi_Channel *ch;
  Scheme_Place_Bi_Channel_Link *link;
  ch = MALLOC_ONE_TAGGED(Scheme_Place_Bi_Channel);
  ch->so.type = scheme_place_bi_channel_type;
  link = (Scheme_Place_Bi_Channel_Link*)scheme_malloc(sizeof(Scheme_Place_Bi_Channel_Link));
  ch->link = link;
  return ch;
}

Scheme_Place_Bi_Channel *place_bi_channel_create() {
  Scheme_Place_Async_Channel *tmp;
  Scheme_Place_Bi_Channel *ch;
  ch = MALLOC_ONE_TAGGED(Scheme_Place_Bi_Channel);
  ch->so.type = scheme_place_bi_channel_type;
  ch = place_bi_channel_malloc();
  tmp = place_async_channel_create();
  ch->sendch = tmp;
  ch->link->sendch = tmp;
  tmp = place_async_channel_create();
  ch->recvch = tmp;
  ch->link->recvch = tmp;
  bi_channel_refcount(ch, 1);
  bi_channel_set_finalizer(ch);
  return ch;
}

Scheme_Place_Bi_Channel *place_bi_peer_channel_create(Scheme_Place_Bi_Channel *orig) {
  Scheme_Place_Bi_Channel *ch;
  ch = MALLOC_ONE_TAGGED(Scheme_Place_Bi_Channel);
  ch->so.type = scheme_place_bi_channel_type;
  ch = place_bi_channel_malloc();
  ch->link->sendch = orig->link->recvch;
  ch->link->recvch = orig->link->sendch;
  bi_channel_refcount(ch, 1);
  bi_channel_set_finalizer(ch);
  ch->sendch = orig->recvch;
  ch->recvch = orig->sendch;
  return ch;
}
@ -3232,7 +3320,8 @@ static void register_place_object_with_channel(Scheme_Place_Async_Channel *ch, S
}
}
static Scheme_Object *scheme_place_async_try_receive_raw(Scheme_Place_Async_Channel *ch, void **msg_memory_ptr)
static Scheme_Object *scheme_place_async_try_receive_raw(Scheme_Place_Async_Channel *ch, void **msg_memory_ptr,
int *_no_writers)
/* The result must not be retained past extraction from `*msg_memory_ptr'! */
{
Scheme_Object *msg = NULL;
@ -3259,6 +3348,8 @@ static Scheme_Object *scheme_place_async_try_receive_raw(Scheme_Place_Async_Chan
maybe_report_message_size(ch);
}
}
if (!msg && !ch->wr_ref && _no_writers)
*_no_writers = 1;
mzrt_mutex_unlock(ch->lock);
*msg_memory_ptr = msg_memory;
@ -3273,12 +3364,12 @@ static void cleanup_msg_memmory(void *thread) {
}
}
static Scheme_Object *scheme_place_async_try_receive(Scheme_Place_Async_Channel *ch) {
static Scheme_Object *scheme_place_async_try_receive(Scheme_Place_Async_Channel *ch, int *_no_writers) {
Scheme_Object *msg = NULL;
Scheme_Thread *p = scheme_current_thread;
GC_CAN_IGNORE void *msg_memory;
BEGIN_ESCAPEABLE(cleanup_msg_memmory, p);
msg = scheme_place_async_try_receive_raw(ch, &msg_memory);
msg = scheme_place_async_try_receive_raw(ch, &msg_memory, _no_writers);
if (msg) {
p->place_channel_msg_in_flight = msg_memory;
msg = scheme_places_deserialize(msg, msg_memory);
@ -3294,6 +3385,7 @@ static int scheme_place_async_ch_ready(Scheme_Place_Async_Channel *ch) {
{
register_place_object_with_channel(ch, (Scheme_Object *) place_object);
if (ch->count > 0) ready = 1;
if (!ch->wr_ref) ready = 1;
}
mzrt_mutex_unlock(ch->lock);
return ready;
@ -3319,6 +3411,8 @@ static int place_channel_ready(Scheme_Object *so, Scheme_Schedule_Info *sinfo) {
Scheme_Object *msg = NULL;
Scheme_Object *wrapper;
void *msg_memory = NULL;
int no_writers = 0;
if (SAME_TYPE(SCHEME_TYPE(so), scheme_place_type)) {
ch = (Scheme_Place_Bi_Channel *) ((Scheme_Place *) so)->channel;
}
@ -3326,7 +3420,8 @@ static int place_channel_ready(Scheme_Object *so, Scheme_Schedule_Info *sinfo) {
ch = (Scheme_Place_Bi_Channel *)so;
}
msg = scheme_place_async_try_receive_raw((Scheme_Place_Async_Channel *) ch->recvch, &msg_memory);
msg = scheme_place_async_try_receive_raw((Scheme_Place_Async_Channel *) ch->link->recvch,
&msg_memory, &no_writers);
if (msg != NULL) {
Scheme_Object **msg_holder;
Scheme_Thread *p = ((Syncing *)(sinfo->current_syncing))->thread;
@ -3343,25 +3438,32 @@ static int place_channel_ready(Scheme_Object *so, Scheme_Schedule_Info *sinfo) {
return 1;
}
if (no_writers)
return 1; /* wake up to discover that we can give up */
return 0;
}
static Scheme_Object *place_async_receive(Scheme_Place_Async_Channel *ch) {
Scheme_Object *msg = NULL;
while(1) {
msg = scheme_place_async_try_receive(ch);
if(msg) break;
int no_writers = 0;
while (1) {
msg = scheme_place_async_try_receive(ch, &no_writers);
if (msg)
break;
else {
/*
mzrt_mutex_lock(ch->lock);
register_place_object_with_channel(ch, (Scheme_Object *) place_object);
mzrt_mutex_unlock(ch->lock);
*/
if (no_writers) {
/* No writers are left for this channel, so suspend the thread */
scheme_wait_sema(scheme_make_sema(0), 0);
}
scheme_thread_block(0);
scheme_block_until((Scheme_Ready_Fun) scheme_place_async_ch_ready, NULL, (Scheme_Object *) ch, 0);
}
}
return msg;
}

View File

@ -4027,11 +4027,12 @@ void scheme_ended_child();
typedef struct Scheme_Place_Async_Channel {
  Scheme_Object so;
  int in;
  int out;
  int count;
  int size;
  int delta;
  intptr_t in;
  intptr_t out;
  intptr_t count;
  intptr_t size;
  intptr_t delta;
  intptr_t wr_ref, rd_ref; /* ref counts on readers and writers */
#if defined(MZ_USE_PLACES)
  mzrt_mutex *lock;
#endif
@ -4043,12 +4044,20 @@ typedef struct Scheme_Place_Async_Channel {
  void *wakeup_signal;
} Scheme_Place_Async_Channel;
typedef struct Scheme_Place_Bi_Channel {
  Scheme_Object so;
typedef struct Scheme_Place_Bi_Channel_Link {
  /* all pointers; allocated as an array */
  Scheme_Place_Async_Channel *sendch;
  Scheme_Place_Async_Channel *recvch;
  struct Scheme_Place_Bi_Channel_Link *prev, *next;
} Scheme_Place_Bi_Channel_Link;
typedef struct Scheme_Place_Bi_Channel {
  Scheme_Object so;
  Scheme_Place_Bi_Channel_Link *link;
} Scheme_Place_Bi_Channel;
void scheme_free_place_bi_channels();
typedef struct Scheme_Place {
  Scheme_Object so;
  struct Scheme_Place_Object *place_obj;