fix GC problem with in-flight place messages
While a place message is received by a thread but not yet deserialized, if the message contains references to objects in the shared space, and if a "master" GC happens (which crosses all places), make sure that the references in the still-serialized message are traversed.
This commit is contained in:
parent
2773737c9c
commit
a9078196b7
27
pkgs/racket-test/tests/racket/place-chmsg-gc-acct.rkt
Normal file
27
pkgs/racket-test/tests/racket/place-chmsg-gc-acct.rkt
Normal file
|
@ -0,0 +1,27 @@
|
|||
#lang racket/base
|
||||
(require racket/place)
|
||||
|
||||
(define (go)
|
||||
(place
|
||||
pch
|
||||
|
||||
;; Trigger memory accounting in every place:
|
||||
#;
|
||||
(custodian-limit-memory
|
||||
(current-custodian)
|
||||
(* 1024 1024 1024))
|
||||
|
||||
(dynamic-require 'tests/racket/place-chmsg-gc #f)
|
||||
|
||||
0))
|
||||
|
||||
(module+ main
|
||||
(define r
|
||||
(map place-wait
|
||||
(for/list ([i 8])
|
||||
(go))))
|
||||
(unless (andmap zero? r)
|
||||
(error "some place failed")))
|
||||
|
||||
|
||||
|
|
@ -1261,6 +1261,7 @@ typedef struct Scheme_Thread {
|
|||
#ifdef MZ_PRECISE_GC
|
||||
struct GC_Thread_Info *gc_info; /* managed by the GC */
|
||||
void *place_channel_msg_in_flight;
|
||||
void *place_channel_msg_chain_in_flight;
|
||||
#endif
|
||||
|
||||
} Scheme_Thread;
|
||||
|
|
|
@ -1890,6 +1890,16 @@ static int thread_val_MARK(void *p, struct NewGC *gc) {
|
|||
gcMARK2(pr->mbox_first, gc);
|
||||
gcMARK2(pr->mbox_last, gc);
|
||||
gcMARK2(pr->mbox_sema, gc);
|
||||
|
||||
/* Follow msg_chain for an in-flight message like in place_async_channel_val */
|
||||
{
|
||||
Scheme_Object *cpr = pr->place_channel_msg_chain_in_flight;
|
||||
while (cpr) {
|
||||
gcMARK2(SCHEME_CAR(cpr), gc);
|
||||
cpr = SCHEME_CDR(cpr);
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
gcBYTES_TO_WORDS(sizeof(Scheme_Thread));
|
||||
}
|
||||
|
@ -2008,6 +2018,16 @@ static int thread_val_FIXUP(void *p, struct NewGC *gc) {
|
|||
gcFIXUP2(pr->mbox_first, gc);
|
||||
gcFIXUP2(pr->mbox_last, gc);
|
||||
gcFIXUP2(pr->mbox_sema, gc);
|
||||
|
||||
/* Follow msg_chain for an in-flight message like in place_async_channel_val */
|
||||
{
|
||||
Scheme_Object *cpr = pr->place_channel_msg_chain_in_flight;
|
||||
while (cpr) {
|
||||
gcFIXUP2(SCHEME_CAR(cpr), gc);
|
||||
cpr = SCHEME_CDR(cpr);
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
gcBYTES_TO_WORDS(sizeof(Scheme_Thread));
|
||||
}
|
||||
|
|
|
@ -797,6 +797,16 @@ thread_val {
|
|||
gcMARK2(pr->mbox_first, gc);
|
||||
gcMARK2(pr->mbox_last, gc);
|
||||
gcMARK2(pr->mbox_sema, gc);
|
||||
|
||||
/* Follow msg_chain for an in-flight message like in place_async_channel_val */
|
||||
{
|
||||
Scheme_Object *cpr = pr->place_channel_msg_chain_in_flight;
|
||||
while (cpr) {
|
||||
gcMARK2(SCHEME_CAR(cpr), gc);
|
||||
cpr = SCHEME_CDR(cpr);
|
||||
}
|
||||
}
|
||||
|
||||
size:
|
||||
gcBYTES_TO_WORDS(sizeof(Scheme_Thread));
|
||||
}
|
||||
|
|
|
@ -1288,10 +1288,10 @@ static Scheme_Object *trivial_copy(Scheme_Object *so, Scheme_Object **master_cha
|
|||
if (SHARED_ALLOCATEDP(so)) {
|
||||
scheme_hash_key(so);
|
||||
if (master_chain) {
|
||||
/* Keep track of all the master-allocated objects that are
|
||||
in a message, so that the corresponding objects can be
|
||||
marked during a master GC, in case on happens before the
|
||||
message is received. */
|
||||
/* Keep track of all the objects that are in a message that
|
||||
refer to master-allocated objects, so that the
|
||||
corresponding objects can be marked during a master GC,
|
||||
in case one happens before the message is received. */
|
||||
Scheme_Object *mc;
|
||||
mc = scheme_make_raw_pair(so, *master_chain);
|
||||
*master_chain = mc;
|
||||
|
@ -3504,12 +3504,14 @@ static void lock_and_register_place_object_with_channel(Scheme_Place_Async_Chann
|
|||
}
|
||||
}
|
||||
|
||||
static Scheme_Object *place_async_try_receive_raw(Scheme_Place_Async_Channel *ch, void **msg_memory_ptr,
|
||||
static Scheme_Object *place_async_try_receive_raw(Scheme_Place_Async_Channel *ch,
|
||||
void **msg_memory_ptr,
|
||||
void **msg_chain_ptr,
|
||||
int *_no_writers)
|
||||
/* The result must not be retained past extraction from `*msg_memory_ptr'! */
|
||||
{
|
||||
Scheme_Object *msg = NULL;
|
||||
void *msg_memory = NULL;
|
||||
void *msg_memory = NULL, *msg_chain = NULL;
|
||||
intptr_t sz;
|
||||
|
||||
lock_and_register_place_object_with_channel(ch, (Scheme_Object *) place_object);
|
||||
|
@ -3517,11 +3519,14 @@ static Scheme_Object *place_async_try_receive_raw(Scheme_Place_Async_Channel *ch
|
|||
if (ch->count > 0) { /* GET MSG */
|
||||
msg = ch->msgs[ch->out];
|
||||
msg_memory = ch->msg_memory[ch->out];
|
||||
msg_chain = ch->msg_chains[ch->out];
|
||||
|
||||
ch->msgs[ch->out] = NULL;
|
||||
ch->msg_memory[ch->out] = NULL;
|
||||
ch->msg_chains[ch->out] = NULL;
|
||||
|
||||
/* No GCs from here until msg_chain is registered */
|
||||
|
||||
--ch->count;
|
||||
ch->out = ((ch->out + 1) % ch->size);
|
||||
|
||||
|
@ -3535,13 +3540,9 @@ static Scheme_Object *place_async_try_receive_raw(Scheme_Place_Async_Channel *ch
|
|||
*_no_writers = 1;
|
||||
mzrt_mutex_unlock(ch->lock);
|
||||
|
||||
if (msg) {
|
||||
intptr_t msg_size;
|
||||
msg_size = GC_message_allocator_size(msg_memory);
|
||||
log_place_event("id %d: get message of %" PRIdPTR " bytes", "get", 1, msg_size);
|
||||
}
|
||||
|
||||
*msg_memory_ptr = msg_memory;
|
||||
*msg_chain_ptr = msg_chain;
|
||||
|
||||
return msg;
|
||||
}
|
||||
|
||||
|
@ -3550,19 +3551,33 @@ static void cleanup_msg_memmory(void *thread) {
|
|||
if (p->place_channel_msg_in_flight) {
|
||||
GC_destroy_orphan_msg_memory(p->place_channel_msg_in_flight);
|
||||
p->place_channel_msg_in_flight = NULL;
|
||||
p->place_channel_msg_chain_in_flight = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static void log_received_msg(Scheme_Object *msg, void *msg_memory)
|
||||
{
|
||||
if (msg) {
|
||||
intptr_t msg_size;
|
||||
msg_size = GC_message_allocator_size(msg_memory);
|
||||
log_place_event("id %d: get message of %" PRIdPTR " bytes", "get", 1, msg_size);
|
||||
}
|
||||
}
|
||||
|
||||
static Scheme_Object *place_async_try_receive(Scheme_Place_Async_Channel *ch, int *_no_writers) {
|
||||
Scheme_Object *msg = NULL;
|
||||
Scheme_Thread *p = scheme_current_thread;
|
||||
GC_CAN_IGNORE void *msg_memory;
|
||||
GC_CAN_IGNORE void *msg_memory, *msg_chain;
|
||||
BEGIN_ESCAPEABLE(cleanup_msg_memmory, p);
|
||||
msg = place_async_try_receive_raw(ch, &msg_memory, _no_writers);
|
||||
msg = place_async_try_receive_raw(ch, &msg_memory, &msg_chain, _no_writers);
|
||||
/* no GCs until msg_chain is registered */
|
||||
if (msg) {
|
||||
p->place_channel_msg_in_flight = msg_memory;
|
||||
p->place_channel_msg_chain_in_flight = msg_chain;
|
||||
log_received_msg(msg, msg_memory);
|
||||
msg = scheme_places_deserialize(msg, msg_memory);
|
||||
p->place_channel_msg_in_flight = NULL;
|
||||
p->place_channel_msg_chain_in_flight = NULL;
|
||||
}
|
||||
END_ESCAPEABLE();
|
||||
return msg;
|
||||
|
@ -3589,6 +3604,7 @@ static Scheme_Object *place_channel_finish_ready(void *d, int argc, struct Schem
|
|||
BEGIN_ESCAPEABLE(cleanup_msg_memmory, p);
|
||||
msg = scheme_places_deserialize(msg, p->place_channel_msg_in_flight);
|
||||
p->place_channel_msg_in_flight = NULL;
|
||||
p->place_channel_msg_chain_in_flight = NULL;
|
||||
END_ESCAPEABLE();
|
||||
|
||||
return msg;
|
||||
|
@ -3598,7 +3614,7 @@ static int place_channel_ready(Scheme_Object *so, Scheme_Schedule_Info *sinfo) {
|
|||
Scheme_Place_Bi_Channel *ch;
|
||||
Scheme_Object *msg = NULL;
|
||||
Scheme_Object *wrapper;
|
||||
void *msg_memory = NULL;
|
||||
GC_CAN_IGNORE void *msg_memory = NULL, *msg_chain = NULL;
|
||||
int no_writers = 0;
|
||||
|
||||
if (SAME_TYPE(SCHEME_TYPE(so), scheme_place_type)) {
|
||||
|
@ -3609,18 +3625,23 @@ static int place_channel_ready(Scheme_Object *so, Scheme_Schedule_Info *sinfo) {
|
|||
}
|
||||
|
||||
msg = place_async_try_receive_raw((Scheme_Place_Async_Channel *) ch->link->recvch,
|
||||
&msg_memory, &no_writers);
|
||||
&msg_memory, &msg_chain, &no_writers);
|
||||
/* no GCs until msg_chain is registered */
|
||||
if (msg != NULL) {
|
||||
Scheme_Object **msg_holder;
|
||||
Scheme_Thread *p = ((Syncing *)(sinfo->current_syncing))->thread;
|
||||
|
||||
p->place_channel_msg_in_flight = msg_memory;
|
||||
p->place_channel_msg_chain_in_flight = msg_chain;
|
||||
|
||||
log_received_msg(msg, msg_memory);
|
||||
|
||||
/* Hold `msg' in atomic memory, because we're not allowed to hold onto
|
||||
it beyond release of msg_memory, and `wrapper' and the result
|
||||
flow into the evt system in general. */
|
||||
msg_holder = (Scheme_Object **)scheme_malloc_atomic(sizeof(Scheme_Object*));
|
||||
*msg_holder = msg;
|
||||
|
||||
p->place_channel_msg_in_flight = msg_memory;
|
||||
wrapper = scheme_make_closed_prim(place_channel_finish_ready, msg_holder);
|
||||
scheme_set_sync_target(sinfo, scheme_void, wrapper, NULL, 0, 0, NULL);
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user