diff --git a/pkgs/racket-test/tests/racket/place-chmsg-gc-acct.rkt b/pkgs/racket-test/tests/racket/place-chmsg-gc-acct.rkt new file mode 100644 index 0000000000..1bcb74dce5 --- /dev/null +++ b/pkgs/racket-test/tests/racket/place-chmsg-gc-acct.rkt @@ -0,0 +1,27 @@ +#lang racket/base +(require racket/place) + +(define (go) + (place + pch + + ;; Trigger memory accounting in every place: + #; + (custodian-limit-memory + (current-custodian) + (* 1024 1024 1024)) + + (dynamic-require 'tests/racket/place-chmsg-gc #f) + + 0)) + +(module+ main + (define r + (map place-wait + (for/list ([i 8]) + (go)))) + (unless (andmap zero? r) + (error "some place failed"))) + + + diff --git a/racket/src/racket/include/scheme.h b/racket/src/racket/include/scheme.h index 71f7620a31..e2f3222f93 100644 --- a/racket/src/racket/include/scheme.h +++ b/racket/src/racket/include/scheme.h @@ -1261,6 +1261,7 @@ typedef struct Scheme_Thread { #ifdef MZ_PRECISE_GC struct GC_Thread_Info *gc_info; /* managed by the GC */ void *place_channel_msg_in_flight; + void *place_channel_msg_chain_in_flight; #endif } Scheme_Thread; diff --git a/racket/src/racket/src/mzmark_type.inc b/racket/src/racket/src/mzmark_type.inc index 1e35671a42..605e38e480 100644 --- a/racket/src/racket/src/mzmark_type.inc +++ b/racket/src/racket/src/mzmark_type.inc @@ -1890,6 +1890,16 @@ static int thread_val_MARK(void *p, struct NewGC *gc) { gcMARK2(pr->mbox_first, gc); gcMARK2(pr->mbox_last, gc); gcMARK2(pr->mbox_sema, gc); + + /* Follow msg_chain for an in-flight message like in place_async_channel_val */ + { + Scheme_Object *cpr = pr->place_channel_msg_chain_in_flight; + while (cpr) { + gcMARK2(SCHEME_CAR(cpr), gc); + cpr = SCHEME_CDR(cpr); + } + } + return gcBYTES_TO_WORDS(sizeof(Scheme_Thread)); } @@ -2008,6 +2018,16 @@ static int thread_val_FIXUP(void *p, struct NewGC *gc) { gcFIXUP2(pr->mbox_first, gc); gcFIXUP2(pr->mbox_last, gc); gcFIXUP2(pr->mbox_sema, gc); + + /* Follow msg_chain for an in-flight message like in place_async_channel_val */ + { + Scheme_Object *cpr = pr->place_channel_msg_chain_in_flight; + while (cpr) { + gcFIXUP2(SCHEME_CAR(cpr), gc); + cpr = SCHEME_CDR(cpr); + } + } + return gcBYTES_TO_WORDS(sizeof(Scheme_Thread)); } diff --git a/racket/src/racket/src/mzmarksrc.c b/racket/src/racket/src/mzmarksrc.c index 3a05bbc505..7e56ae0bbe 100644 --- a/racket/src/racket/src/mzmarksrc.c +++ b/racket/src/racket/src/mzmarksrc.c @@ -797,6 +797,16 @@ thread_val { gcMARK2(pr->mbox_first, gc); gcMARK2(pr->mbox_last, gc); gcMARK2(pr->mbox_sema, gc); + + /* Follow msg_chain for an in-flight message like in place_async_channel_val */ + { + Scheme_Object *cpr = pr->place_channel_msg_chain_in_flight; + while (cpr) { + gcMARK2(SCHEME_CAR(cpr), gc); + cpr = SCHEME_CDR(cpr); + } + } + size: gcBYTES_TO_WORDS(sizeof(Scheme_Thread)); } diff --git a/racket/src/racket/src/place.c b/racket/src/racket/src/place.c index cb7c64eac3..c17c84f42c 100644 --- a/racket/src/racket/src/place.c +++ b/racket/src/racket/src/place.c @@ -1288,10 +1288,10 @@ static Scheme_Object *trivial_copy(Scheme_Object *so, Scheme_Object **master_cha if (SHARED_ALLOCATEDP(so)) { scheme_hash_key(so); if (master_chain) { - /* Keep track of all the master-allocated objects that are - in a message, so that the corresponding objects can be - marked during a master GC, in case on happens before the - message is received. */ + /* Keep track of all the objects that are in a message that + refer to master-allocated objects, so that the + corresponding objects can be marked during a master GC, + in case one happens before the message is received. */ Scheme_Object *mc; mc = scheme_make_raw_pair(so, *master_chain); *master_chain = mc; @@ -3504,12 +3504,14 @@ static void lock_and_register_place_object_with_channel(Scheme_Place_Async_Chann } } -static Scheme_Object *place_async_try_receive_raw(Scheme_Place_Async_Channel *ch, void **msg_memory_ptr, +static Scheme_Object *place_async_try_receive_raw(Scheme_Place_Async_Channel *ch, + void **msg_memory_ptr, + void **msg_chain_ptr, int *_no_writers) /* The result must not be retained past extraction from `*msg_memory_ptr'! */ { Scheme_Object *msg = NULL; - void *msg_memory = NULL; + void *msg_memory = NULL, *msg_chain = NULL; intptr_t sz; lock_and_register_place_object_with_channel(ch, (Scheme_Object *) place_object); @@ -3517,11 +3519,14 @@ static Scheme_Object *place_async_try_receive_raw(Scheme_Place_Async_Channel *ch if (ch->count > 0) { /* GET MSG */ msg = ch->msgs[ch->out]; msg_memory = ch->msg_memory[ch->out]; + msg_chain = ch->msg_chains[ch->out]; ch->msgs[ch->out] = NULL; ch->msg_memory[ch->out] = NULL; ch->msg_chains[ch->out] = NULL; + /* No GCs from here until msg_chain is registered */ + --ch->count; ch->out = ((ch->out + 1) % ch->size); @@ -3535,13 +3540,9 @@ static Scheme_Object *place_async_try_receive_raw(Scheme_Place_Async_Channel *ch *_no_writers = 1; mzrt_mutex_unlock(ch->lock); - if (msg) { - intptr_t msg_size; - msg_size = GC_message_allocator_size(msg_memory); - log_place_event("id %d: get message of %" PRIdPTR " bytes", "get", 1, msg_size); - } - *msg_memory_ptr = msg_memory; + *msg_chain_ptr = msg_chain; + return msg; } @@ -3550,19 +3551,33 @@ static void cleanup_msg_memmory(void *thread) { if (p->place_channel_msg_in_flight) { GC_destroy_orphan_msg_memory(p->place_channel_msg_in_flight); p->place_channel_msg_in_flight = NULL; + p->place_channel_msg_chain_in_flight = NULL; + } +} + +static void log_received_msg(Scheme_Object *msg, void *msg_memory) +{ + if (msg) { + intptr_t msg_size; + msg_size = GC_message_allocator_size(msg_memory); + log_place_event("id %d: get message of %" PRIdPTR " bytes", "get", 1, msg_size); } } static Scheme_Object *place_async_try_receive(Scheme_Place_Async_Channel *ch, int *_no_writers) { Scheme_Object *msg = NULL; Scheme_Thread *p = scheme_current_thread; - GC_CAN_IGNORE void *msg_memory; + GC_CAN_IGNORE void *msg_memory, *msg_chain; BEGIN_ESCAPEABLE(cleanup_msg_memmory, p); - msg = place_async_try_receive_raw(ch, &msg_memory, _no_writers); + msg = place_async_try_receive_raw(ch, &msg_memory, &msg_chain, _no_writers); + /* no GCs until msg_chain is registered */ if (msg) { p->place_channel_msg_in_flight = msg_memory; + p->place_channel_msg_chain_in_flight = msg_chain; + log_received_msg(msg, msg_memory); msg = scheme_places_deserialize(msg, msg_memory); p->place_channel_msg_in_flight = NULL; + p->place_channel_msg_chain_in_flight = NULL; } END_ESCAPEABLE(); return msg; @@ -3589,6 +3604,7 @@ static Scheme_Object *place_channel_finish_ready(void *d, int argc, struct Schem BEGIN_ESCAPEABLE(cleanup_msg_memmory, p); msg = scheme_places_deserialize(msg, p->place_channel_msg_in_flight); p->place_channel_msg_in_flight = NULL; + p->place_channel_msg_chain_in_flight = NULL; END_ESCAPEABLE(); return msg; @@ -3598,7 +3614,7 @@ static int place_channel_ready(Scheme_Object *so, Scheme_Schedule_Info *sinfo) { Scheme_Place_Bi_Channel *ch; Scheme_Object *msg = NULL; Scheme_Object *wrapper; - void *msg_memory = NULL; + GC_CAN_IGNORE void *msg_memory = NULL, *msg_chain = NULL; int no_writers = 0; if (SAME_TYPE(SCHEME_TYPE(so), scheme_place_type)) { @@ -3609,18 +3625,23 @@ static int place_channel_ready(Scheme_Object *so, Scheme_Schedule_Info *sinfo) { } msg = place_async_try_receive_raw((Scheme_Place_Async_Channel *) ch->link->recvch, - &msg_memory, &no_writers); + &msg_memory, &msg_chain, &no_writers); + /* no GCs until msg_chain is registered */ if (msg != NULL) { Scheme_Object **msg_holder; Scheme_Thread *p = ((Syncing *)(sinfo->current_syncing))->thread; + p->place_channel_msg_in_flight = msg_memory; + p->place_channel_msg_chain_in_flight = msg_chain; + + log_received_msg(msg, msg_memory); + /* Hold `msg' in atomic memory, because we're not allowed to hold onto it beyond release of msg_memory, and `wrapper' and the result flow into the evt system in general. */ msg_holder = (Scheme_Object **)scheme_malloc_atomic(sizeof(Scheme_Object*)); *msg_holder = msg; - p->place_channel_msg_in_flight = msg_memory; wrapper = scheme_make_closed_prim(place_channel_finish_ready, msg_holder); scheme_set_sync_target(sinfo, scheme_void, wrapper, NULL, 0, 0, NULL);