diff --git a/collects/scribblings/reference/places.scrbl b/collects/scribblings/reference/places.scrbl index c983d26b2e..648321d460 100644 --- a/collects/scribblings/reference/places.scrbl +++ b/collects/scribblings/reference/places.scrbl @@ -95,6 +95,16 @@ racket (place-channel-get pch)))) ] +Place channels are subject to @tech{garbage collection}, like other +Racket values, and a @tech{thread} that is blocked reading from a +@tech{place channel} can be garbage collected if @tech{place +channel}'s writing end becomes unreachable. @elemtag['(caveat +"place-channel-gc")]{However}, unlike normal @tech{channel} blocking, +if otherwise unreachable threads are mutually blocked on place +channels that are reachable only from the same threads, the threads +and place channels are all considered reachable, instead of +unreachable. + @defproc[(place-enabled?) boolean?]{ diff --git a/collects/scribblings/reference/threads.scrbl b/collects/scribblings/reference/threads.scrbl index ce9f1bb5c0..946509f419 100644 --- a/collects/scribblings/reference/threads.scrbl +++ b/collects/scribblings/reference/threads.scrbl @@ -13,10 +13,12 @@ through @racket[thread-resume]. A thread that has not terminated can be garbage collected (see @secref["gc-model"]) if it is unreachable and suspended or if it is -unreachable and blocked on only unreachable events through -@racket[semaphore-wait], @racket[semaphore-wait/enable-break], +unreachable and blocked on only unreachable events through functions +such as @racket[semaphore-wait], @racket[semaphore-wait/enable-break], @racket[channel-put], @racket[channel-get], @racket[sync], -@racket[sync/enable-break], or @racket[thread-wait]. +@racket[sync/enable-break], or @racket[thread-wait]. Beware, however, +of a limitation on @tech{place-channel} blocking; see the +@elemref['(caveat "place-channel-gc")]{caveat} in @secref["places"]. @margin-note{In GRacket, a handler thread for an eventspace is blocked on an internal semaphore when its event queue is empty. Thus, the handler diff --git a/collects/tests/racket/place-channel.rkt b/collects/tests/racket/place-channel.rkt index 24995b2fdb..1db9dfaea0 100644 --- a/collects/tests/racket/place-channel.rkt +++ b/collects/tests/racket/place-channel.rkt @@ -414,6 +414,33 @@ (test-long (lambda (x) (intern-num-sym (modulo x 1000))) "Listof symbols") (test-long (lambda (x) #s(clown "Binky" "pie")) "Listof prefabs") (test-long (lambda (x) (read (open-input-string "#0=(#0# . #0#)"))) "Listof cycles") + + ;; check that a thread blocked on a place channel + ;; can be GCed if the other end of the channel is + ;; unreachable --- where a place's channels should + ;; all count as "unreachable" when the place ends + (displayln "checking place-channel and thread GC interaction") + (let ([N 40]) + (define weaks (make-weak-hash)) + (for ([i (in-range N)]) + (define s (make-semaphore)) + (hash-set! + weaks + (thread (lambda () + (define-values (i o) (place-channel)) + (define p (place ch (place-channel-get ch))) + (place-channel-put p o) + (place-wait p) + (semaphore-post s) + (sync i))) + #t) + (sync s)) + (for ([i 3]) + (sync (system-idle-evt)) + (collect-garbage)) + (unless ((hash-count weaks) . < . (/ N 2)) + (error "thread-gc test failed"))) + ) ;(report-errs) diff --git a/src/racket/gc2/mem_account.c b/src/racket/gc2/mem_account.c index 7d1aab7d65..df7b34c6f1 100644 --- a/src/racket/gc2/mem_account.c +++ b/src/racket/gc2/mem_account.c @@ -370,8 +370,8 @@ int BTC_bi_chan_mark(void *p, struct NewGC *gc) /* Race conditions here on `mem_size', and likely double counting when the same async channels are accessible from paired bi channels --- but those approximations are ok for accounting. */ - account_memory(gc, gc->current_mark_owner, bc->sendch->mem_size, 0); - account_memory(gc, gc->current_mark_owner, bc->recvch->mem_size, 0); + account_memory(gc, gc->current_mark_owner, bc->link->sendch->mem_size, 0); + account_memory(gc, gc->current_mark_owner, bc->link->recvch->mem_size, 0); } return gc->mark_table[btc_redirect_bi_chan](p, gc); } diff --git a/src/racket/include/schthread.h b/src/racket/include/schthread.h index 22a43bab1d..fa14ec7fd6 100644 --- a/src/racket/include/schthread.h +++ b/src/racket/include/schthread.h @@ -341,6 +341,7 @@ typedef struct Thread_Local_Variables { struct Evt **place_evts_; struct Scheme_Place_Object *place_object_; struct Scheme_Place *all_child_places_; + struct Scheme_Place_Bi_Channel_Link *place_channel_links_; struct Scheme_Object **reusable_ifs_stack_; struct Scheme_Object *empty_self_shift_cache_; struct Scheme_Bucket_Table *scheme_module_code_cache_; @@ -720,6 +721,7 @@ XFORM_GC_VARIABLE_STACK_THROUGH_THREAD_LOCAL; #define place_evts XOA (scheme_get_thread_local_variables()->place_evts_) #define place_object XOA (scheme_get_thread_local_variables()->place_object_) #define all_child_places XOA (scheme_get_thread_local_variables()->all_child_places_) +#define place_channel_links XOA (scheme_get_thread_local_variables()->place_channel_links_) #define reusable_ifs_stack XOA (scheme_get_thread_local_variables()->reusable_ifs_stack_) #define empty_self_shift_cache XOA (scheme_get_thread_local_variables()->empty_self_shift_cache_) #define scheme_module_code_cache XOA (scheme_get_thread_local_variables()->scheme_module_code_cache_) diff --git a/src/racket/src/bool.c b/src/racket/src/bool.c index 2820636f73..02163713da 100644 --- a/src/racket/src/bool.c +++ b/src/racket/src/bool.c @@ -702,8 +702,8 @@ int is_equal (Scheme_Object *obj1, Scheme_Object *obj2, Equal_Info *eql) Scheme_Place_Bi_Channel *bc1, *bc2; bc1 = (Scheme_Place_Bi_Channel *)obj1; bc2 = (Scheme_Place_Bi_Channel *)obj2; - return (SAME_OBJ(bc1->recvch, bc2->recvch) - && SAME_OBJ(bc1->sendch, bc2->sendch)); + return (SAME_OBJ(bc1->link->recvch, bc2->link->recvch) + && SAME_OBJ(bc1->link->sendch, bc2->link->sendch)); } else if (!eql->for_chaperone && ((t1 == scheme_chaperone_type) || (t1 == scheme_proc_chaperone_type))) { /* both chaperones */ diff --git a/src/racket/src/env.c b/src/racket/src/env.c index 50cb84de58..2bd620ed66 100644 --- a/src/racket/src/env.c +++ b/src/racket/src/env.c @@ -558,6 +558,10 @@ static Scheme_Env *place_instance_init(void *stack_base, int initial_main_os_thr printf("done @ %" PRIdPTR "\n#endif\n", scheme_get_process_milliseconds()); #endif +#if defined(MZ_USE_PLACES) + REGISTER_SO(place_channel_links); +#endif + return env; } @@ -607,6 +611,7 @@ void scheme_place_instance_destroy(int force) scheme_end_futures_per_place(); #if defined(MZ_USE_PLACES) scheme_kill_green_thread_timer(); + scheme_free_place_bi_channels(); #endif #if defined(MZ_PRECISE_GC) && defined(MZ_USE_PLACES) GC_destruct_child_gc(); diff --git a/src/racket/src/hash.c b/src/racket/src/hash.c index d861671dbc..280fb86e78 100644 --- a/src/racket/src/hash.c +++ b/src/racket/src/hash.c @@ -1468,7 +1468,7 @@ static uintptr_t equal_hash_key(Scheme_Object *o, uintptr_t k, Hash_Info *hi) k += 7; /* a bi channel has sendch and recvch, but sends are the same iff recvs are the same: */ - o = (Scheme_Object *)((Scheme_Place_Bi_Channel *)o)->sendch; + o = (Scheme_Object *)((Scheme_Place_Bi_Channel *)o)->link->sendch; } break; default: @@ -1890,7 +1890,7 @@ static uintptr_t equal_hash_key2(Scheme_Object *o, Hash_Info *hi) case scheme_place_bi_channel_type: /* a bi channel has sendch and recvch, but sends are the same iff recvs are the same: */ - o = (Scheme_Object *)((Scheme_Place_Bi_Channel *)o)->sendch; + o = (Scheme_Object *)((Scheme_Place_Bi_Channel *)o)->link->sendch; goto top; default: { diff --git a/src/racket/src/mzmark_place.inc b/src/racket/src/mzmark_place.inc index 5c0f8015ab..93c0e2a70d 100644 --- a/src/racket/src/mzmark_place.inc +++ b/src/racket/src/mzmark_place.inc @@ -7,8 +7,7 @@ static int place_bi_channel_val_SIZE(void *p, struct NewGC *gc) { static int place_bi_channel_val_MARK(void *p, struct NewGC *gc) { Scheme_Place_Bi_Channel *pbc = (Scheme_Place_Bi_Channel *)p; - gcMARK2(pbc->sendch, gc); - gcMARK2(pbc->recvch, gc); + gcMARK2(pbc->link, gc); return gcBYTES_TO_WORDS(sizeof(Scheme_Place_Bi_Channel)); @@ -16,8 +15,7 @@ static int place_bi_channel_val_MARK(void *p, struct NewGC *gc) { static int place_bi_channel_val_FIXUP(void *p, struct NewGC *gc) { Scheme_Place_Bi_Channel *pbc = (Scheme_Place_Bi_Channel *)p; - gcFIXUP2(pbc->sendch, gc); - gcFIXUP2(pbc->recvch, gc); + gcFIXUP2(pbc->link, gc); return gcBYTES_TO_WORDS(sizeof(Scheme_Place_Bi_Channel)); diff --git a/src/racket/src/mzmarksrc.c b/src/racket/src/mzmarksrc.c index 47ea086c68..20652a0d96 100644 --- a/src/racket/src/mzmarksrc.c +++ b/src/racket/src/mzmarksrc.c @@ -1515,8 +1515,7 @@ START place; place_bi_channel_val { mark: Scheme_Place_Bi_Channel *pbc = (Scheme_Place_Bi_Channel *)p; - gcMARK2(pbc->sendch, gc); - gcMARK2(pbc->recvch, gc); + gcMARK2(pbc->link, gc); size: gcBYTES_TO_WORDS(sizeof(Scheme_Place_Bi_Channel)); diff --git a/src/racket/src/place.c b/src/racket/src/place.c index db6dacb354..76bd845f90 100644 --- a/src/racket/src/place.c +++ b/src/racket/src/place.c @@ -64,6 +64,7 @@ static void cust_kill_place(Scheme_Object *pl, void *notused); static void resume_one_place_with_lock(Scheme_Place_Object *place_obj); static Scheme_Place_Async_Channel *place_async_channel_create(); +static Scheme_Place_Bi_Channel *place_bi_channel_malloc(); static Scheme_Place_Bi_Channel *place_bi_channel_create(); static Scheme_Place_Bi_Channel *place_bi_peer_channel_create(Scheme_Place_Bi_Channel *orig); static int place_channel_ready(Scheme_Object *so, Scheme_Schedule_Info *sinfo); @@ -75,6 +76,9 @@ static int place_dead_ready(Scheme_Object *o, Scheme_Schedule_Info *sinfo); static void* GC_master_malloc_tagged(size_t size); static void destroy_place_object_locks(Scheme_Place_Object *place_obj); +static void bi_channel_refcount(Scheme_Place_Bi_Channel *ch, int delta); +static void bi_channel_set_finalizer(Scheme_Place_Bi_Channel *ch); + #if defined(MZ_USE_PLACES) && defined(MZ_PRECISE_GC) static Scheme_Object *places_deep_copy_worker(Scheme_Object *so, Scheme_Hash_Table **ht, int mode, int gcable, int can_raise_exn, @@ -83,10 +87,13 @@ static Scheme_Object *places_deep_copy_worker(Scheme_Object *so, Scheme_Hash_Tab # define mzPDC_CHECK 0 # define mzPDC_COPY 1 # define mzPDC_UNCOPY 2 -# define mzPDC_DESER 3 -# define mzPDC_CLEAN 4 +# define mzPDC_DIRECT_UNCOPY 3 +# define mzPDC_DESER 4 +# define mzPDC_CLEAN 5 #endif +static void places_prepare_direct(Scheme_Object *so); + # ifdef MZ_PRECISE_GC static void register_traversers(void); # endif @@ -342,10 +349,8 @@ Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) { NULL); } - so = places_deep_copy_to_master(args[0]); - place_data->module = so; - so = places_deep_copy_to_master(args[1]); - place_data->function = so; + place_data->module = args[0]; + place_data->function = args[1]; place_data->ready = ready; /* create channel */ @@ -359,11 +364,9 @@ Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) { } collection_paths = scheme_current_library_collection_paths(0, NULL); - collection_paths = places_deep_copy_to_master(collection_paths); place_data->current_library_collection_paths = collection_paths; collection_paths = scheme_compiled_file_roots(0, NULL); - collection_paths = places_deep_copy_to_master(collection_paths); place_data->compiled_roots = collection_paths; cust = scheme_get_current_custodian(); @@ -449,6 +452,12 @@ Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) { } } + places_prepare_direct(place_data->current_library_collection_paths); + places_prepare_direct(place_data->compiled_roots); + places_prepare_direct(place_data->channel); + places_prepare_direct(place_data->module); + places_prepare_direct(place_data->function); + /* create new place */ proc_thread = mz_proc_thread_create(place_start_proc, place_data); @@ -1179,8 +1188,12 @@ static Scheme_Object *do_places_deep_copy(Scheme_Object *so, int mode, int gcabl #endif } -Scheme_Object *places_deep_uncopy(Scheme_Object *so) { - return do_places_deep_copy(so, mzPDC_UNCOPY, 1, NULL, NULL); +static void places_prepare_direct(Scheme_Object *so) { + (void)do_places_deep_copy(so, mzPDC_CHECK, 1, NULL, NULL); +} + +static Scheme_Object *places_deep_direct_uncopy(Scheme_Object *so) { + return do_places_deep_copy(so, mzPDC_DIRECT_UNCOPY, 1, NULL, NULL); } static void bad_place_message(Scheme_Object *so) { @@ -1269,7 +1282,7 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h Scheme_Object **master_chain, Scheme_Object **invalid_object) { Scheme_Object *new_so; - int copy_mode = ((mode == mzPDC_COPY) || (mode == mzPDC_UNCOPY)); + int copy_mode = ((mode == mzPDC_COPY) || (mode == mzPDC_UNCOPY) || (mode == mzPDC_DIRECT_UNCOPY)); new_so = trivial_copy(so, master_chain); if (new_so) return new_so; @@ -1283,17 +1296,26 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h case scheme_place_bi_channel_type: /* ^^^ fall through ^^* */ if (copy_mode) { Scheme_Place_Bi_Channel *ch; - ch = MALLOC_ONE_TAGGED(Scheme_Place_Bi_Channel); - ch->so.type = scheme_place_bi_channel_type; - ch->sendch = ((Scheme_Place_Bi_Channel *)so)->sendch; - ch->recvch = ((Scheme_Place_Bi_Channel *)so)->recvch; + ch = place_bi_channel_malloc(); + ch->link->sendch = ((Scheme_Place_Bi_Channel *)so)->link->sendch; + ch->link->recvch = ((Scheme_Place_Bi_Channel *)so)->link->recvch; + + if ((mode == mzPDC_COPY) || (mode == mzPDC_DIRECT_UNCOPY)) + bi_channel_refcount(ch, 1); + if ((mode == mzPDC_UNCOPY) || (mode == mzPDC_DIRECT_UNCOPY)) + bi_channel_set_finalizer(ch); + if (master_chain) { /* See setting of master_chain in trivial_copy(): */ - new_so = scheme_make_raw_pair((Scheme_Object *)ch->sendch, *master_chain); - new_so = scheme_make_raw_pair((Scheme_Object *)ch->recvch, new_so); + new_so = scheme_make_raw_pair((Scheme_Object *)ch->link->sendch, *master_chain); + new_so = scheme_make_raw_pair((Scheme_Object *)ch->link->recvch, new_so); *master_chain = new_so; } new_so = (Scheme_Object *)ch; + } else if (mode == mzPDC_CLEAN) { + bi_channel_refcount((Scheme_Place_Bi_Channel *)so, -1); + } else if (mode == mzPDC_DESER) { + bi_channel_set_finalizer((Scheme_Place_Bi_Channel *)so); } break; case scheme_char_type: @@ -1368,13 +1390,28 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h MZ_OPT_HASH_KEY(&((Scheme_Symbol*)new_so)->iso) = 0x2; } new_so->type = scheme_serialized_symbol_type; + } else if (mode == mzPDC_DIRECT_UNCOPY) { + char *str, buf[64]; + intptr_t len; + len = SCHEME_SYM_LEN(so); + if (len < 64) + str = buf; + else + str = (char *)scheme_malloc_atomic(len); + memcpy(str, SCHEME_SYM_VAL(so), len); + if (SCHEME_SYM_UNINTERNEDP(so)) + new_so = scheme_make_exact_symbol(str, len); + else if (SCHEME_SYM_PARALLELP(so)) + new_so = scheme_intern_exact_parallel_symbol(str, len); + else + new_so = scheme_intern_exact_symbol(str, len); } else if (mode != mzPDC_CHECK) { scheme_log_abort("encountered symbol in bad mode"); abort(); } break; case scheme_serialized_symbol_type: - if ((mode == mzPDC_UNCOPY) || (mode == mzPDC_DESER)) + if ((mode == mzPDC_UNCOPY) || (mode == mzPDC_DESER)) { if (SCHEME_SYM_UNINTERNEDP(so)) { new_so = scheme_make_exact_symbol(SCHEME_BYTE_STR_VAL(so), SCHEME_BYTE_STRLEN_VAL(so)); } @@ -1384,7 +1421,7 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h else { new_so = scheme_intern_exact_symbol(SCHEME_BYTE_STR_VAL(so), SCHEME_BYTE_STRLEN_VAL(so)); } - else if (mode != mzPDC_CLEAN) { + } else if (mode != mzPDC_CLEAN) { scheme_log_abort("encountered serialized symbol in bad mode"); abort(); } @@ -1546,7 +1583,7 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h break; case scheme_serialized_tcp_fd_type: { - if ((mode == mzPDC_UNCOPY) || (mode == mzPDC_DESER)) { + if ((mode == mzPDC_UNCOPY) || (mode == mzPDC_DIRECT_UNCOPY) || (mode == mzPDC_DESER)) { Scheme_Object *in; Scheme_Object *out; Scheme_Object *name; @@ -1576,7 +1613,7 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h break; case scheme_serialized_file_fd_type: { - if ((mode == mzPDC_UNCOPY) || (mode == mzPDC_DESER)) { + if ((mode == mzPDC_UNCOPY) || (mode == mzPDC_DIRECT_UNCOPY) || (mode == mzPDC_DESER)) { Scheme_Serialized_File_FD *ffd; Scheme_Object *name; int fd; @@ -1780,7 +1817,9 @@ static Scheme_Object *places_deep_copy_worker(Scheme_Object *so, Scheme_Hash_Tab Scheme_Object *fd_accumulators = NULL; intptr_t delayed_errno = 0; - int set_mode = ((mode == mzPDC_COPY) || (mode == mzPDC_UNCOPY) || (mode == mzPDC_DESER)); + int set_mode = ((mode == mzPDC_COPY) + || (mode == mzPDC_UNCOPY) || (mode == mzPDC_DIRECT_UNCOPY) + || (mode == mzPDC_DESER)); /* lifted variables for xform*/ Scheme_Object *pair; @@ -1872,7 +1911,7 @@ DEEP_DO: /* --------- pair ----------- */ case scheme_pair_type: /* handle cycles: */ - if ((mode == mzPDC_COPY) || (mode == mzPDC_UNCOPY)) { + if ((mode == mzPDC_COPY) || (mode == mzPDC_UNCOPY) || (mode == mzPDC_DIRECT_UNCOPY)) { pair = scheme_make_pair(scheme_false, scheme_false); SCHEME_PAIR_COPY_FLAGS(pair, so); } else @@ -1907,7 +1946,7 @@ DEEP_DO_FIN_PAIR_L: case scheme_vector_type: size = SCHEME_VEC_SIZE(so); - if ((mode == mzPDC_COPY) || (mode == mzPDC_UNCOPY)) + if ((mode == mzPDC_COPY) || (mode == mzPDC_UNCOPY) || (mode == mzPDC_DIRECT_UNCOPY)) vec = scheme_make_vector(size, 0); else vec = so; @@ -2104,7 +2143,6 @@ DEEP_SST2_L: break; case scheme_hash_table_type: case scheme_hash_tree_type: - /* if ((mode == mzPDC_COPY) || (mode == mzPDC_UNCOPY)) { */ if (set_mode) { if (scheme_true == scheme_hash_eq_p(1, &so)) { nht = scheme_make_immutable_hasheq(0, NULL); @@ -2568,16 +2606,16 @@ static void *place_start_proc_after_stack(void *data_arg, void *stack_base) { /* scheme_make_thread behaves differently if the above global vars are not null */ scheme_place_instance_init(stack_base, place_data->parent_gc, mem_limit); - a[0] = places_deep_uncopy(place_data->current_library_collection_paths); + a[0] = places_deep_direct_uncopy(place_data->current_library_collection_paths); scheme_current_library_collection_paths(1, a); - a[0] = places_deep_uncopy(place_data->compiled_roots); + a[0] = places_deep_direct_uncopy(place_data->compiled_roots); scheme_compiled_file_roots(1, a); scheme_seal_parameters(); - a[0] = places_deep_uncopy(place_data->module); - a[1] = places_deep_uncopy(place_data->function); + a[0] = places_deep_direct_uncopy(place_data->module); + a[1] = places_deep_direct_uncopy(place_data->function); a[1] = scheme_intern_exact_symbol(SCHEME_SYM_VAL(a[1]), SCHEME_SYM_LEN(a[1])); - channel = places_deep_uncopy(place_data->channel); + channel = places_deep_direct_uncopy(place_data->channel); place_obj = place_data->place_obj; REGISTER_SO(place_object); place_object = place_obj; @@ -2623,7 +2661,6 @@ static void *place_start_proc_after_stack(void *data_arg, void *stack_base) { GC_allow_master_gc_check(); # endif - /* at point, don't refer to place_data or its content anymore, because it's allocated in the other place */ @@ -2667,29 +2704,6 @@ static void *place_start_proc_after_stack(void *data_arg, void *stack_base) { return NULL; } -Scheme_Object *places_deep_copy_to_master(Scheme_Object *so) { - Scheme_Hash_Table *ht = NULL; -#if defined(MZ_USE_PLACES) && defined(MZ_PRECISE_GC) - Scheme_Object *o; - void *original_gc; - - /* forces hash codes: */ - (void)places_deep_copy_worker(so, &ht, mzPDC_CHECK, 1, 1, NULL, NULL); - ht = NULL; - - original_gc = GC_switch_to_master_gc(); - scheme_start_atomic(); - - o = places_deep_copy_worker(so, &ht, mzPDC_COPY, 1, 0, NULL, NULL); - - scheme_end_atomic_no_swap(); - GC_switch_back_from_master(original_gc); - return o; -#else - return places_deep_copy_worker(so, &ht, mzPDC_COPY, 1, 1, NULL, NULL); -#endif -} - static Scheme_Object *places_serialize(Scheme_Object *so, void **msg_memory, Scheme_Object **master_chain, Scheme_Object **invalid_object) { #if defined(MZ_USE_PLACES) && defined(MZ_PRECISE_GC) @@ -2754,7 +2768,7 @@ Scheme_Object *place_send(int argc, Scheme_Object *args[]) ch = NULL; scheme_wrong_contract("place-channel-put", "place-channel?", 0, argc, args); } - place_async_send((Scheme_Place_Async_Channel *) ch->sendch, args[1]); + place_async_send((Scheme_Place_Async_Channel *) ch->link->sendch, args[1]); return scheme_void; } @@ -2770,7 +2784,7 @@ Scheme_Object *place_receive(int argc, Scheme_Object *args[]) { ch = NULL; scheme_wrong_contract("place-channel-get", "place-channel?", 0, argc, args); } - return place_async_receive((Scheme_Place_Async_Channel *) ch->recvch); + return place_async_receive((Scheme_Place_Async_Channel *)ch->link->recvch); } static Scheme_Object* place_allowed_p(int argc, Scheme_Object *args[]) @@ -2941,28 +2955,102 @@ Scheme_Place_Async_Channel *place_async_channel_create() { return ch; } +static void async_channel_refcount(Scheme_Place_Async_Channel *ch, int for_send, int delta) +{ + mzrt_mutex_lock(ch->lock); + if (for_send) + ch->wr_ref += delta; + else + ch->rd_ref += delta; + if ((ch->wr_ref < 0) || (ch->rd_ref < 0)) { + scheme_log_abort("internal error: bad reference count on async channel"); + abort(); + } + mzrt_mutex_unlock(ch->lock); +} + +static void bi_channel_refcount(Scheme_Place_Bi_Channel *ch, int delta) +{ + async_channel_refcount(ch->link->sendch, 1, delta); + async_channel_refcount(ch->link->recvch, 0, delta); +} + +static void bi_channel_refcount_down(void *_ch, void *data) +{ + Scheme_Place_Bi_Channel *ch = (Scheme_Place_Bi_Channel *)_ch; + + if (ch->link->prev) + ch->link->prev->next = ch->link->next; + else + place_channel_links = ch->link->next; + if (ch->link->next) + ch->link->next->prev = ch->link->prev; + + bi_channel_refcount(ch, -1); +} + +void scheme_free_place_bi_channels() +{ + Scheme_Place_Bi_Channel_Link *link; + + for (link = place_channel_links; link; link = link->next) { + async_channel_refcount(link->sendch, 1, -1); + async_channel_refcount(link->recvch, 0, -1); + } +} + +static void bi_channel_set_finalizer(Scheme_Place_Bi_Channel *ch) +{ + ch->link->next = place_channel_links; + if (place_channel_links) + place_channel_links->prev = ch->link; + place_channel_links = ch->link; + + scheme_add_finalizer(ch, bi_channel_refcount_down, NULL); +} + + +Scheme_Place_Bi_Channel *place_bi_channel_malloc() { + Scheme_Place_Bi_Channel *ch; + Scheme_Place_Bi_Channel_Link *link; + + ch = MALLOC_ONE_TAGGED(Scheme_Place_Bi_Channel); + ch->so.type = scheme_place_bi_channel_type; + + link = (Scheme_Place_Bi_Channel_Link*)scheme_malloc(sizeof(Scheme_Place_Bi_Channel_Link)); + ch->link = link; + + return ch; +} + Scheme_Place_Bi_Channel *place_bi_channel_create() { Scheme_Place_Async_Channel *tmp; Scheme_Place_Bi_Channel *ch; - ch = MALLOC_ONE_TAGGED(Scheme_Place_Bi_Channel); - ch->so.type = scheme_place_bi_channel_type; + ch = place_bi_channel_malloc(); tmp = place_async_channel_create(); - ch->sendch = tmp; + ch->link->sendch = tmp; tmp = place_async_channel_create(); - ch->recvch = tmp; + ch->link->recvch = tmp; + + bi_channel_refcount(ch, 1); + bi_channel_set_finalizer(ch); + return ch; } Scheme_Place_Bi_Channel *place_bi_peer_channel_create(Scheme_Place_Bi_Channel *orig) { Scheme_Place_Bi_Channel *ch; - ch = MALLOC_ONE_TAGGED(Scheme_Place_Bi_Channel); - ch->so.type = scheme_place_bi_channel_type; + ch = place_bi_channel_malloc(); + + ch->link->sendch = orig->link->recvch; + ch->link->recvch = orig->link->sendch; + + bi_channel_refcount(ch, 1); + bi_channel_set_finalizer(ch); - ch->sendch = orig->recvch; - ch->recvch = orig->sendch; return ch; } @@ -3232,7 +3320,8 @@ static void register_place_object_with_channel(Scheme_Place_Async_Channel *ch, S } } -static Scheme_Object *scheme_place_async_try_receive_raw(Scheme_Place_Async_Channel *ch, void **msg_memory_ptr) +static Scheme_Object *scheme_place_async_try_receive_raw(Scheme_Place_Async_Channel *ch, void **msg_memory_ptr, + int *_no_writers) /* The result must not be retained past extraction from `*msg_memory_ptr'! */ { Scheme_Object *msg = NULL; @@ -3259,6 +3348,8 @@ static Scheme_Object *scheme_place_async_try_receive_raw(Scheme_Place_Async_Chan maybe_report_message_size(ch); } } + if (!msg && !ch->wr_ref && _no_writers) + *_no_writers = 1; mzrt_mutex_unlock(ch->lock); *msg_memory_ptr = msg_memory; @@ -3273,12 +3364,12 @@ static void cleanup_msg_memmory(void *thread) { } } -static Scheme_Object *scheme_place_async_try_receive(Scheme_Place_Async_Channel *ch) { +static Scheme_Object *scheme_place_async_try_receive(Scheme_Place_Async_Channel *ch, int *_no_writers) { Scheme_Object *msg = NULL; Scheme_Thread *p = scheme_current_thread; GC_CAN_IGNORE void *msg_memory; BEGIN_ESCAPEABLE(cleanup_msg_memmory, p); - msg = scheme_place_async_try_receive_raw(ch, &msg_memory); + msg = scheme_place_async_try_receive_raw(ch, &msg_memory, _no_writers); if (msg) { p->place_channel_msg_in_flight = msg_memory; msg = scheme_places_deserialize(msg, msg_memory); @@ -3294,6 +3385,7 @@ static int scheme_place_async_ch_ready(Scheme_Place_Async_Channel *ch) { { register_place_object_with_channel(ch, (Scheme_Object *) place_object); if (ch->count > 0) ready = 1; + if (!ch->wr_ref) ready = 1; } mzrt_mutex_unlock(ch->lock); return ready; @@ -3319,6 +3411,8 @@ static int place_channel_ready(Scheme_Object *so, Scheme_Schedule_Info *sinfo) { Scheme_Object *msg = NULL; Scheme_Object *wrapper; void *msg_memory = NULL; + int no_writers = 0; + if (SAME_TYPE(SCHEME_TYPE(so), scheme_place_type)) { ch = (Scheme_Place_Bi_Channel *) ((Scheme_Place *) so)->channel; } @@ -3326,7 +3420,8 @@ static int place_channel_ready(Scheme_Object *so, Scheme_Schedule_Info *sinfo) { ch = (Scheme_Place_Bi_Channel *)so; } - msg = scheme_place_async_try_receive_raw((Scheme_Place_Async_Channel *) ch->recvch, &msg_memory); + msg = scheme_place_async_try_receive_raw((Scheme_Place_Async_Channel *) ch->link->recvch, + &msg_memory, &no_writers); if (msg != NULL) { Scheme_Object **msg_holder; Scheme_Thread *p = ((Syncing *)(sinfo->current_syncing))->thread; @@ -3343,25 +3438,32 @@ static int place_channel_ready(Scheme_Object *so, Scheme_Schedule_Info *sinfo) { return 1; } + + if (no_writers) + return 1; /* wake up to discover that we can give up */ + return 0; } static Scheme_Object *place_async_receive(Scheme_Place_Async_Channel *ch) { Scheme_Object *msg = NULL; - while(1) { - msg = scheme_place_async_try_receive(ch); - if(msg) break; + int no_writers = 0; + + while (1) { + msg = scheme_place_async_try_receive(ch, &no_writers); + if (msg) + break; else { - /* - mzrt_mutex_lock(ch->lock); - register_place_object_with_channel(ch, (Scheme_Object *) place_object); - mzrt_mutex_unlock(ch->lock); - */ - + if (no_writers) { + /* No writers are left for this channel, so suspend the thread */ + scheme_wait_sema(scheme_make_sema(0), 0); + } + scheme_thread_block(0); scheme_block_until((Scheme_Ready_Fun) scheme_place_async_ch_ready, NULL, (Scheme_Object *) ch, 0); } } + return msg; } diff --git a/src/racket/src/schpriv.h b/src/racket/src/schpriv.h index 28dae06770..0c9527af1a 100644 --- a/src/racket/src/schpriv.h +++ b/src/racket/src/schpriv.h @@ -4027,11 +4027,12 @@ void scheme_ended_child(); typedef struct Scheme_Place_Async_Channel { Scheme_Object so; - int in; - int out; - int count; - int size; - int delta; + intptr_t in; + intptr_t out; + intptr_t count; + intptr_t size; + intptr_t delta; + intptr_t wr_ref, rd_ref; /* ref counts on readers and writers */ #if defined(MZ_USE_PLACES) mzrt_mutex *lock; #endif @@ -4043,12 +4044,20 @@ typedef struct Scheme_Place_Async_Channel { void *wakeup_signal; } Scheme_Place_Async_Channel; -typedef struct Scheme_Place_Bi_Channel { - Scheme_Object so; +typedef struct Scheme_Place_Bi_Channel_Link { + /* all pointers; allocated as an array */ Scheme_Place_Async_Channel *sendch; Scheme_Place_Async_Channel *recvch; + struct Scheme_Place_Bi_Channel_Link *prev, *next; +} Scheme_Place_Bi_Channel_Link; + +typedef struct Scheme_Place_Bi_Channel { + Scheme_Object so; + Scheme_Place_Bi_Channel_Link *link; } Scheme_Place_Bi_Channel; +void scheme_free_place_bi_channels(); + typedef struct Scheme_Place { Scheme_Object so; struct Scheme_Place_Object *place_obj;