more memory accounting fixes for places

Fix memory accounting to detect when messages pile up in a
place channel and when shared values (such as the result of
`make-shared-bytes') pile up. Also fix problems where a GC
or free-page purge needs to be triggered.

The implementation causes a minor API change, which is that
a place channel sent multiple times as a message generates
values that are `equal?' but no longer `eq?'.

Closes PR 12273

[Do not merge to 5.2]
This commit is contained in:
Matthew Flatt 2011-10-11 11:20:21 -06:00
parent 4d00b13ce0
commit 5c0956d7b1
10 changed files with 242 additions and 91 deletions

View File

@ -50,11 +50,17 @@ A @tech{place channel} can be used as a @tech{synchronizable event}
can also receive messages with @racket[place-channel-get], and
messages can be sent with @racket[place-channel-put].
Two @tech{place channels} are @racket[equal?] if they are endpoints
for the same underlying channels while both or neither is a
@tech{place descriptor}. @tech{Place channels} can be @racket[equal?]
without being @racket[eq?] after being sent as messages through a
@tech{place channel}.
Constraints on messages across a place channel---and therefore on the
kinds of data that places share---enable greater parallelism than
@racket[future], even including separate @tech{garbage collection} of
separate places. At the same time, the setup and communication costs
for places can be higher than for futures.
for places can be higher than for @tech{futures}.
For example, the following expression launches two places, echoes a
message to each, and then waits for the places to terminate:

View File

@ -0,0 +1,24 @@
#lang racket
(require racket/place)

;; Check the interaction of custodian memory limits with
;;  * allocating shared arrays, and
;;  * putting messages into a channel with no receiver
;;
;; Each iteration spawns a thread under a 10 MB custodian limit; the
;; thread allocates until the custodian shuts it down, so the huge
;; `in-range' bound is effectively "until killed".

(define (check shared?)
  (for ([i 20])
    (printf "iter ~a\n" i)
    (let ([c (make-custodian)])
      (custodian-limit-memory c (* 1024 1024 10) c)
      (parameterize ([current-custodian c])
        (thread-wait
         (thread
          (lambda ()
            (define-values (a b) (place-channel))
            (for/fold ([v 0]) ([i (in-range 999999999)])
              (if shared?
                  (cons (make-shared-bytes 1024) v)
                  (place-channel-put b (list 1 2 3 4)))))))))))

(check #t)
;; Was `(check #t)' a second time, which never exercised the
;; unsent-message (`place-channel-put') path that this test exists
;; to cover; use #f so both branches of `check' run:
(check #f)

View File

@ -545,9 +545,14 @@ GC2_EXTERN intptr_t GC_is_place();
Otherwise returns 0;
*/
GC2_EXTERN uintptr_t GC_message_allocator_size(void *msg_memory);
GC2_EXTERN intptr_t GC_message_objects_size(void *msg_memory);
/*
Returns the total size of all memory allocated by the message allocator
Returns the total size of all objects allocated by the message allocator
*/
GC2_EXTERN intptr_t GC_message_allocator_size(void *msg_memory);
/*
Returns the total size of all memory pages allocated by the message allocator
*/
GC2_EXTERN void GC_dispose_short_message_allocator(void *msg_memory);
@ -561,6 +566,15 @@ GC2_EXTERN void GC_destroy_orphan_msg_memory(void *msg_memory);
the place channels finalizer is called.
*/
GC2_EXTERN void GC_report_unsent_message_delta(intptr_t amt);
/*
Report message-in-flight size changes to the GC. This functionality is exposed,
rather than built into GC_finish_message_allocator(), GC_adopt_message_allocator(),
GC_dispose_short_message_allocator(), and GC_destroy_orphan_msg_memory(), so that
messages to the master GC can be limited as long as the unsent message tracking
is within a factor of 2 or so.
*/
# ifdef __cplusplus
};

View File

@ -11,8 +11,9 @@ static const int btc_redirect_thread = 511;
static const int btc_redirect_custodian = 510;
static const int btc_redirect_ephemeron = 509;
static const int btc_redirect_cust_box = 508;
static const int btc_redirect_bi_chan = 507;
inline static void account_memory(NewGC *gc, int set, intptr_t amount);
inline static void account_memory(NewGC *gc, int set, intptr_t amount, int to_master);
/*****************************************************************************/
/* thread list */
@ -73,7 +74,7 @@ inline static void mark_threads(NewGC *gc, int owner)
mzrt_mutex_lock(place_obj->lock);
sz = place_obj->memory_use;
mzrt_mutex_unlock(place_obj->lock);
account_memory(gc, owner, gcBYTES_TO_WORDS(sz));
account_memory(gc, owner, gcBYTES_TO_WORDS(sz), 0);
}
#endif
}
@ -138,7 +139,7 @@ inline static int create_blank_owner_set(NewGC *gc)
memcpy(naya, owner_table, old_size*sizeof(OTEntry*));
gc->owner_table = owner_table = naya;
bzero(((char*)owner_table) + (sizeof(OTEntry*) * old_size),
(curr_size - old_size) * sizeof(OTEntry*));
(curr_size - old_size) * sizeof(OTEntry*));
return create_blank_owner_set(gc);
}
@ -201,9 +202,12 @@ inline static int custodian_member_owner_set(NewGC *gc, void *cust, int set)
return 0;
}
inline static void account_memory(NewGC *gc, int set, intptr_t amount)
inline static void account_memory(NewGC *gc, int set, intptr_t amount, int to_master)
{
gc->owner_table[set]->memory_use += amount;
if (to_master)
gc->owner_table[set]->master_memory_use += amount;
else
gc->owner_table[set]->memory_use += amount;
}
inline static void free_owner_set(NewGC *gc, int set)
@ -256,16 +260,23 @@ inline static uintptr_t custodian_usage(NewGC*gc, void *custodian)
owner_table = gc->owner_table;
if (owner_table[i])
retval = owner_table[i]->memory_use;
retval = (owner_table[i]->memory_use + owner_table[i]->master_memory_use);
else
retval = 0;
return gcWORDS_TO_BYTES(retval);
}
inline static void BTC_memory_account_mark(NewGC *gc, mpage *page, void *ptr)
inline static void BTC_memory_account_mark(NewGC *gc, mpage *page, void *ptr, int is_a_master_page)
{
GCDEBUG((DEBUGOUTF, "BTC_memory_account_mark: %p/%p\n", page, ptr));
/* In the case of is_a_master_page, whether this place is charged is
a little random: there's no guarantee that the btc_mark values are
in sync, and there are races among places. Approximations are ok for
accounting, though, as long as the probability of completely wrong
accounting is very low. */
if(page->size_class) {
if(page->size_class > 1) {
/* big page */
@ -273,7 +284,7 @@ inline static void BTC_memory_account_mark(NewGC *gc, mpage *page, void *ptr)
if(info->btc_mark == gc->old_btc_mark) {
info->btc_mark = gc->new_btc_mark;
account_memory(gc, gc->current_mark_owner, gcBYTES_TO_WORDS(page->size));
account_memory(gc, gc->current_mark_owner, gcBYTES_TO_WORDS(page->size), is_a_master_page);
push_ptr(gc, TAG_AS_BIG_PAGE_PTR(ptr));
}
} else {
@ -282,7 +293,7 @@ inline static void BTC_memory_account_mark(NewGC *gc, mpage *page, void *ptr)
if(info->btc_mark == gc->old_btc_mark) {
info->btc_mark = gc->new_btc_mark;
account_memory(gc, gc->current_mark_owner, info->size);
account_memory(gc, gc->current_mark_owner, info->size, is_a_master_page);
ptr = OBJHEAD_TO_OBJPTR(info);
push_ptr(gc, ptr);
}
@ -292,7 +303,7 @@ inline static void BTC_memory_account_mark(NewGC *gc, mpage *page, void *ptr)
if(info->btc_mark == gc->old_btc_mark) {
info->btc_mark = gc->new_btc_mark;
account_memory(gc, gc->current_mark_owner, info->size);
account_memory(gc, gc->current_mark_owner, info->size, 0);
push_ptr(gc, ptr);
}
}
@ -352,6 +363,19 @@ int BTC_cust_box_mark(void *p, struct NewGC *gc)
return gc->mark_table[btc_redirect_cust_box](p, gc);
}
/* BTC mark hook for place bi-directional channels: while memory
   accounting is running, charge the current owner for the unsent
   messages queued in both directions, then defer to the redirected
   mark procedure. */
int BTC_bi_chan_mark(void *p, struct NewGC *gc)
{
  Scheme_Place_Bi_Channel *bc = (Scheme_Place_Bi_Channel *)p;

  if (gc->doing_memory_accounting) {
    /* Races on `mem_size', and likely double counting when the same
       async channels are reachable from paired bi channels --- but
       those approximations are ok for accounting. */
    account_memory(gc, gc->current_mark_owner, bc->sendch->mem_size, 0);
    account_memory(gc, gc->current_mark_owner, bc->recvch->mem_size, 0);
  }

  return gc->mark_table[btc_redirect_bi_chan](p, gc);
}
static void btc_overmem_abort(NewGC *gc)
{
gc->kill_propagation_loop = 1;
@ -377,6 +401,7 @@ inline static void BTC_initialize_mark_table(NewGC *gc) {
gc->mark_table[scheme_custodian_type] = BTC_custodian_mark;
gc->mark_table[gc->ephemeron_tag] = BTC_ephemeron_mark;
gc->mark_table[gc->cust_box_tag] = BTC_cust_box_mark;
gc->mark_table[scheme_place_bi_channel_type] = BTC_bi_chan_mark;
}
inline static int BTC_get_redirect_tag(NewGC *gc, int tag) {
@ -384,6 +409,7 @@ inline static int BTC_get_redirect_tag(NewGC *gc, int tag) {
else if (tag == scheme_custodian_type) { tag = btc_redirect_custodian; }
else if (tag == gc->ephemeron_tag) { tag = btc_redirect_ephemeron; }
else if (tag == gc->cust_box_tag) { tag = btc_redirect_cust_box; }
else if (tag == scheme_place_bi_channel_type) { tag = btc_redirect_bi_chan; }
return tag;
}
@ -404,8 +430,13 @@ static void BTC_do_accounting(NewGC *gc)
/* clear the memory use numbers out */
for(i = 1; i < table_size; i++)
if(owner_table[i])
if(owner_table[i]) {
owner_table[i]->memory_use = 0;
#ifdef MZ_USE_PLACES
if (MASTERGC && MASTERGC->major_places_gc)
owner_table[i]->master_memory_use = 0;
#endif
}
/* start with root: */
while (cur->parent && SCHEME_PTR1_VAL(cur->parent)) {
@ -440,6 +471,7 @@ static void BTC_do_accounting(NewGC *gc)
owner_table = gc->owner_table;
owner_table[powner]->memory_use += owner_table[owner]->memory_use;
owner_table[powner]->master_memory_use += owner_table[owner]->master_memory_use;
}
box = cur->global_prev; cur = box ? SCHEME_PTR1_VAL(box) : NULL;

View File

@ -168,6 +168,11 @@ struct Log_Master_Info {
int ran, full;
intptr_t pre_used, post_used, pre_admin, post_admin;
};
#else
# define premaster_or_master_gc(gc) 1
# define premaster_or_place_gc(gc) 1
# define postmaster_and_master_gc(gc) 0
# define postmaster_and_place_gc(gc) 1
#endif
inline static size_t real_page_size(mpage* page);
@ -388,6 +393,14 @@ static void free_pages(NewGC *gc, void *p, size_t len, int type, int expect_mpro
mmu_free_page(gc->mmu, p, len, type, expect_mprotect, src_block, 1);
}
/* Release cached free pages back to the OS when the allocator holds
   more than 4x the memory of the pages actually in use. The threshold
   is 4 rather than 2 because a block cache (when available) has a fill
   factor of 2, and flushing cannot reduce that portion. */
static void check_excessive_free_pages(NewGC *gc) {
  if (mmu_memory_allocated(gc->mmu) > (gc->used_pages << (LOG_APAGE_SIZE + 2)))
    mmu_flush_freed_pages(gc->mmu);
}
static void free_orphaned_page(NewGC *gc, mpage *tmp) {
/* free_pages decrements gc->used_pages which is incorrect, since this is an orphaned page,
@ -398,6 +411,7 @@ static void free_orphaned_page(NewGC *gc, mpage *tmp) {
&tmp->mmu_src_block,
0); /* don't adjust count, since we're failing to adopt it */
free_mpage(tmp);
check_excessive_free_pages(gc);
}
@ -844,8 +858,20 @@ static inline void* REMOVE_BIG_PAGE_PTR_TAG(void *p) {
void GC_check_master_gc_request() {
#ifdef MZ_USE_PLACES
if (MASTERGC && MASTERGC->major_places_gc == 1) {
GC_gcollect();
NewGC *mgc = MASTERGC;
if (mgc) {
/* check for GC needed due to GC_report_unsent_message_delta(): */
if ((mgc->gen0.current_size + mgc->pending_msg_size) >= mgc->gen0.max_size) {
NewGC *saved_gc;
saved_gc = GC_switch_to_master_gc();
master_collect_initiate(mgc);
GC_switch_back_from_master(saved_gc);
}
if (mgc->major_places_gc == 1) {
GC_gcollect();
}
}
#endif
}
@ -861,14 +887,12 @@ static inline void gc_if_needed_account_alloc_size(NewGC *gc, size_t allocate_si
#ifdef MZ_USE_PLACES
if (postmaster_and_master_gc(gc)) {
master_collect_initiate(gc);
}
else {
#endif
if (!gc->dumping_avoid_collection)
garbage_collect(gc, 0, 0, NULL);
#ifdef MZ_USE_PLACES
}
} else
#endif
{
if (!gc->dumping_avoid_collection)
garbage_collect(gc, 0, 0, NULL);
}
}
gc->gen0.current_size += allocate_size;
}
@ -886,17 +910,13 @@ static void *allocate_big(const size_t request_size_bytes, int type)
#ifdef NEWGC_BTC_ACCOUNT
if(GC_out_of_memory) {
#ifdef MZ_USE_PLACES
if (premaster_or_place_gc(gc)) {
#endif
if (BTC_single_allocation_limit(gc, request_size_bytes)) {
/* We're allowed to fail. Check for allocations that exceed a single-time
limit. See BTC_single_allocation_limit() for more information. */
GC_out_of_memory();
}
#ifdef MZ_USE_PLACES
}
#endif
}
#endif
@ -1439,6 +1459,20 @@ void GC_create_message_allocator() {
gc->dumping_avoid_collection++;
}
/* Atomically adds `amt' (possibly negative) to the master GC's running
   total of in-flight message bytes between places. The master GC uses
   this total to decide when a master collection is needed; see
   GC_check_master_gc_request(). No-op before the master GC exists. */
void GC_report_unsent_message_delta(intptr_t amt)
{
#ifdef MZ_USE_PLACES
  NewGC *mgc = MASTERGC;
  uintptr_t old_size;

  if (!mgc) return;

  /* Snapshot the counter once per attempt so the `expected' and `new'
     CAS arguments come from the same read; re-reading `pending_msg_size'
     for each argument (as before) could pair a stale addend with a
     fresh expected value under contention and lose an update. */
  do {
    old_size = mgc->pending_msg_size;
  } while (!mzrt_cas(&mgc->pending_msg_size, old_size, old_size + amt));
#endif
}
void *GC_finish_message_allocator() {
NewGC *gc = GC_get_GC();
Allocator *a = gc->saved_allocator;
@ -1517,7 +1551,7 @@ void GC_adopt_message_allocator(void *param) {
gc_if_needed_account_alloc_size(gc, 0);
}
uintptr_t GC_message_allocator_size(void *param) {
intptr_t GC_message_objects_size(void *param) {
MsgMemory *msgm = (MsgMemory *) param;
if (!msgm) { return sizeof(param); }
if (msgm->big_pages && msgm->size < 1024) {
@ -1527,6 +1561,13 @@ uintptr_t GC_message_allocator_size(void *param) {
return msgm->size;
}
/* Returns the total page memory held by the message allocator `param':
   the allocated object bytes plus one page to approximate the partially
   filled current allocation page. */
intptr_t GC_message_allocator_size(void *param) {
  MsgMemory *msgm = (MsgMemory *) param;

  if (!msgm)
    return sizeof(param);

  /* approximate extra room in the allocation page by adding one page: */
  return msgm->size + APAGE_SIZE;
}
void GC_dispose_short_message_allocator(void *param) {
NewGC *gc = GC_get_GC();
mpage *tmp;
@ -2906,9 +2947,7 @@ static void promote_marked_gen0_big_page(NewGC *gc, mpage *page) {
/* This is the first mark routine. It's a bit complicated. */
void GC_mark2(const void *const_p, struct NewGC *gc)
{
#ifdef MZ_USE_PLACES
int is_a_master_page = 0;
#endif
mpage *page;
void *p = (void*)const_p;
@ -2919,44 +2958,42 @@ void GC_mark2(const void *const_p, struct NewGC *gc)
if(!(page = pagemap_find_page(gc->page_maps, p))) {
#ifdef MZ_USE_PLACES
if (!MASTERGC || !MASTERGC->major_places_gc || !(page = pagemap_find_page(MASTERGC->page_maps, p)))
if (MASTERGC && MASTERGC->major_places_gc && (page = pagemap_find_page(MASTERGC->page_maps, p))) {
is_a_master_page = 1;
} else
#endif
{
#ifdef POINTER_OWNERSHIP_CHECK
mzrt_rwlock_wrlock(MASTERGCINFO->cangc);
{
int i;
int size = MASTERGCINFO->size;
for (i = 1; i < size; i++) {
if (gc->place_id != i
&& MASTERGCINFO->signal_fds[i] != (void*) REAPED_SLOT_AVAILABLE
&& MASTERGCINFO->signal_fds[i] != (void*) CREATED_BUT_NOT_REGISTERED
&& MASTERGCINFO->signal_fds[i] != (void*) SIGNALED_BUT_NOT_REGISTERED) {
if((page = pagemap_find_page(MASTERGCINFO->places_gcs[i]->page_maps, p))) {
printf("%p is owned by place %i not the current place %i\n", p, i, gc->place_id);
asm("int3");
#ifdef POINTER_OWNERSHIP_CHECK
mzrt_rwlock_wrlock(MASTERGCINFO->cangc);
{
int i;
int size = MASTERGCINFO->size;
for (i = 1; i < size; i++) {
if (gc->place_id != i
&& MASTERGCINFO->signal_fds[i] != (void*) REAPED_SLOT_AVAILABLE
&& MASTERGCINFO->signal_fds[i] != (void*) CREATED_BUT_NOT_REGISTERED
&& MASTERGCINFO->signal_fds[i] != (void*) SIGNALED_BUT_NOT_REGISTERED) {
if((page = pagemap_find_page(MASTERGCINFO->places_gcs[i]->page_maps, p))) {
printf("%p is owned by place %i not the current place %i\n", p, i, gc->place_id);
asm("int3");
}
}
}
}
mzrt_rwlock_unlock(MASTERGCINFO->cangc);
#endif
GCDEBUG((DEBUGOUTF,"Not marking %p (no page)\n",p));
return;
}
mzrt_rwlock_unlock(MASTERGCINFO->cangc);
#endif
GCDEBUG((DEBUGOUTF,"Not marking %p (no page)\n",p));
return;
}
#ifdef MZ_USE_PLACES
else {
is_a_master_page = 1;
}
#endif
}
#ifdef NEWGC_BTC_ACCOUNT
/* toss this over to the BTC mark routine if we're doing accounting */
if(gc->doing_memory_accounting) {
#ifdef NEWGC_BTC_ACCOUNT
BTC_memory_account_mark(gc, page, p); return;
#endif
BTC_memory_account_mark(gc, page, p, is_a_master_page);
return;
}
#endif
/* MED OR BIG PAGE */
if(page->size_class) {
@ -2972,14 +3009,8 @@ void GC_mark2(const void *const_p, struct NewGC *gc)
page->size_class = 3;
/* if this is in the nursery, we want to move it out of the nursery */
#ifdef MZ_USE_PLACES
if(!page->generation && !is_a_master_page)
#else
if(!page->generation)
#endif
{
promote_marked_gen0_big_page(gc, page);
}
page->marked_on = 1;
record_backtrace(page, BIG_PAGE_TO_OBJECT(page));
@ -4416,9 +4447,7 @@ static void garbage_collect(NewGC *gc, int force_full, int switching_master, Log
mark_roots(gc);
mark_immobiles(gc);
TIME_STEP("rooted");
#ifdef MZ_USE_PLACES
if (premaster_or_place_gc(gc))
#endif
GC_mark_variable_stack(GC_variable_stack, 0, get_stack_base(gc), NULL);
#ifdef MZ_USE_PLACES
if (postmaster_and_master_gc(gc))
@ -4471,9 +4500,7 @@ static void garbage_collect(NewGC *gc, int force_full, int switching_master, Log
TIME_STEP("finalized2");
if(gc->gc_full)
#ifdef MZ_USE_PLACES
if (premaster_or_place_gc(gc) || switching_master)
#endif
do_heap_compact(gc);
TIME_STEP("compacted");
@ -4487,9 +4514,7 @@ static void garbage_collect(NewGC *gc, int force_full, int switching_master, Log
repair_finalizer_structs(gc);
repair_roots(gc);
repair_immobiles(gc);
#ifdef MZ_USE_PLACES
if (premaster_or_place_gc(gc))
#endif
GC_fixup_variable_stack(GC_variable_stack, 0, get_stack_base(gc), NULL);
TIME_STEP("reparied roots");
repair_heap(gc);
@ -4505,7 +4530,7 @@ static void garbage_collect(NewGC *gc, int force_full, int switching_master, Log
reset_nursery(gc);
TIME_STEP("reset nursurey");
#ifdef NEWGC_BTC_ACCOUNT
if (gc->gc_full)
if (gc->gc_full && postmaster_and_place_gc(gc))
BTC_do_accounting(gc);
#endif
TIME_STEP("accounted");
@ -4537,10 +4562,7 @@ static void garbage_collect(NewGC *gc, int force_full, int switching_master, Log
gc->no_further_modifications = 0;
/* If we have too many idle pages, flush: */
if (mmu_memory_allocated(gc->mmu) > ((gc->used_pages << (LOG_APAGE_SIZE + 1)))) {
mmu_flush_freed_pages(gc->mmu);
}
check_excessive_free_pages(gc);
/* update some statistics */
if(gc->gc_full) gc->num_major_collects++; else gc->num_minor_collects++;

View File

@ -86,6 +86,7 @@ typedef struct OTEntry {
void *originator;
void **members;
uintptr_t memory_use;
uintptr_t master_memory_use;
uintptr_t single_time_limit;
uintptr_t super_required;
char limit_set;
@ -163,6 +164,7 @@ typedef struct NewGC {
uintptr_t actual_pages_size;
void (*unsafe_allocation_abort)(struct NewGC *);
uintptr_t memory_in_use; /* the amount of memory in use */
uintptr_t pending_msg_size; /* set in master, only */
/* blame the child thread infos */
GC_Thread_Info *thread_infos;

View File

@ -618,6 +618,12 @@ int is_equal (Scheme_Object *obj1, Scheme_Object *obj2, Equal_Info *eql)
obj1 = SCHEME_PTR_VAL(obj1);
obj2 = SCHEME_PTR_VAL(obj2);
goto top;
} else if (t1 == scheme_place_bi_channel_type) {
Scheme_Place_Bi_Channel *bc1, *bc2;
bc1 = (Scheme_Place_Bi_Channel *)obj1;
bc2 = (Scheme_Place_Bi_Channel *)obj2;
return (SAME_OBJ(bc1->recvch, bc2->recvch)
&& SAME_OBJ(bc1->sendch, bc2->sendch));
} else if (!eql->for_chaperone && ((t1 == scheme_chaperone_type)
|| (t1 == scheme_proc_chaperone_type))) {
/* both chaperones */

View File

@ -1377,6 +1377,14 @@ static uintptr_t equal_hash_key(Scheme_Object *o, uintptr_t k, Hash_Info *hi)
o = SCHEME_PTR_VAL(o);
}
break;
case scheme_place_bi_channel_type:
{
k += 7;
/* a bi channel has sendch and recvch, but
sends are the same iff recvs are the same: */
o = (Scheme_Object *)((Scheme_Place_Bi_Channel *)o)->sendch;
}
break;
default:
{
Scheme_Primary_Hash_Proc h1 = scheme_type_hash1s[t];
@ -1752,6 +1760,11 @@ static uintptr_t equal_hash_key2(Scheme_Object *o, Hash_Info *hi)
/* Needed for interning */
o = SCHEME_PTR_VAL(o);
goto top;
case scheme_place_bi_channel_type:
/* a bi channel has sendch and recvch, but
sends are the same iff recvs are the same: */
o = (Scheme_Object *)((Scheme_Place_Bi_Channel *)o)->sendch;
goto top;
default:
{
Scheme_Secondary_Hash_Proc h2 = scheme_type_hash2s[t];

View File

@ -1120,13 +1120,6 @@ static Scheme_Object *trivial_copy(Scheme_Object *so)
case scheme_null_type:
case scheme_void_type:
return so;
case scheme_place_type:
scheme_hash_key(((Scheme_Place *) so)->channel);
return ((Scheme_Place *) so)->channel;
break;
case scheme_place_bi_channel_type: /* allocated in the master and can be passed along as is */
scheme_hash_key(so);
return so;
case scheme_byte_string_type:
case scheme_flvector_type:
case scheme_fxvector_type:
@ -1150,6 +1143,19 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h
new_so = so;
switch (SCHEME_TYPE(so)) {
case scheme_place_type:
so = ((Scheme_Place *) so)->channel;
new_so = so;
case scheme_place_bi_channel_type: /* ^^^ fall through ^^* */
if (copy_mode) {
Scheme_Place_Bi_Channel *ch;
ch = MALLOC_ONE_TAGGED(Scheme_Place_Bi_Channel);
ch->so.type = scheme_place_bi_channel_type;
ch->sendch = ((Scheme_Place_Bi_Channel *)so)->sendch;
ch->recvch = ((Scheme_Place_Bi_Channel *)so)->recvch;
new_so = (Scheme_Object *)ch;
}
break;
case scheme_char_type:
if (copy_mode)
new_so = scheme_make_char(SCHEME_CHAR_VAL(so));
@ -2182,12 +2188,7 @@ static void *place_start_proc_after_stack(void *data_arg, void *stack_base) {
a[0] = places_deep_uncopy(place_data->module);
a[1] = places_deep_uncopy(place_data->function);
a[1] = scheme_intern_exact_symbol(SCHEME_SYM_VAL(a[1]), SCHEME_SYM_LEN(a[1]));
if (!SAME_TYPE(SCHEME_TYPE(place_data->channel), scheme_place_bi_channel_type)) {
channel = places_deep_uncopy(place_data->channel);
}
else {
channel = place_data->channel;
}
channel = places_deep_uncopy(place_data->channel);
place_obj = place_data->place_obj;
REGISTER_SO(place_object);
place_object = place_obj;
@ -2344,7 +2345,7 @@ Scheme_Object *scheme_places_deserialize(Scheme_Object *so, void *msg_memory) {
if (new_so) return new_so;
/* small messages are deemed to be < 1k, this could be tuned in either direction */
if (GC_message_allocator_size(msg_memory) < 1024) {
if (GC_message_objects_size(msg_memory) < 1024) {
new_so = do_places_deep_copy(so, mzPDC_UNCOPY, 1);
GC_dispose_short_message_allocator(msg_memory);
}
@ -2443,11 +2444,28 @@ static void* GC_master_malloc_tagged(size_t size) {
return ptr;
}
/* Report this channel's unsent-message size to the master GC, but only
   when it has drifted far enough from the last reported value: either
   the queue shrank to less than half of what was reported, or it more
   than doubled AND grew by more than two allocation pages. This
   hysteresis keeps reports (which go to the shared master GC) cheap
   while staying within roughly a factor of 2 of the true size, which
   is accurate enough for accounting. */
static void maybe_report_message_size(Scheme_Place_Async_Channel *ch)
{
#ifdef MZ_PRECISE_GC
  if ((ch->reported_size > (2 * ch->mem_size))
      || (((ch->reported_size * 2) < ch->mem_size)
          && ((ch->mem_size - ch->reported_size) > (1 << (LOG_APAGE_SIZE + 1))))) {
    /* delta is negative when messages have been received or the
       channel is being finalized (mem_size already lowered) */
    intptr_t delta = ch->mem_size - ch->reported_size;
    ch->reported_size = ch->mem_size;
    GC_report_unsent_message_delta(delta);
  }
#endif
}
static void async_channel_finalize(void *p, void* data) {
Scheme_Place_Async_Channel *ch;
int i;
Scheme_Hash_Table *ht = NULL;
ch = (Scheme_Place_Async_Channel*)p;
ch->mem_size = 0;
maybe_report_message_size(ch);
mzrt_mutex_destroy(ch->lock);
for (i = 0; i < ch->size ; i++) {
ht = NULL;
@ -2543,7 +2561,7 @@ Scheme_Place_Bi_Channel *place_bi_channel_create() {
Scheme_Place_Async_Channel *tmp;
Scheme_Place_Bi_Channel *ch;
ch = GC_master_malloc_tagged(sizeof(Scheme_Place_Bi_Channel));
ch = MALLOC_ONE_TAGGED(Scheme_Place_Bi_Channel);
ch->so.type = scheme_place_bi_channel_type;
tmp = place_async_channel_create();
@ -2556,7 +2574,7 @@ Scheme_Place_Bi_Channel *place_bi_channel_create() {
Scheme_Place_Bi_Channel *place_bi_peer_channel_create(Scheme_Place_Bi_Channel *orig) {
Scheme_Place_Bi_Channel *ch;
ch = GC_master_malloc_tagged(sizeof(Scheme_Place_Bi_Channel));
ch = MALLOC_ONE_TAGGED(Scheme_Place_Bi_Channel);
ch->so.type = scheme_place_bi_channel_type;
ch->sendch = orig->recvch;
@ -2595,6 +2613,7 @@ static Scheme_Object *GC_master_make_vector(int size) {
static void place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *uo) {
void *msg_memory = NULL;
Scheme_Object *o;
intptr_t sz;
int cnt;
o = scheme_places_serialize(uo, &msg_memory);
@ -2634,6 +2653,11 @@ static void place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *uo)
ch->msg_memory[ch->in] = msg_memory;
++ch->count;
ch->in = ((ch->in + 1) % ch->size);
sz = GC_message_allocator_size(msg_memory);
ch->mem_size += sz;
maybe_report_message_size(ch);
}
if (!cnt && ch->wakeup_signal) {
@ -2802,6 +2826,7 @@ static void register_place_object_with_channel(Scheme_Place_Async_Channel *ch, S
static Scheme_Object *scheme_place_async_try_receive(Scheme_Place_Async_Channel *ch) {
Scheme_Object *msg = NULL;
void *msg_memory = NULL;
intptr_t sz;
mzrt_mutex_lock(ch->lock);
{
@ -2815,10 +2840,15 @@ static Scheme_Object *scheme_place_async_try_receive(Scheme_Place_Async_Channel
--ch->count;
ch->out = ((ch->out + 1) % ch->size);
sz = GC_message_allocator_size(msg_memory);
ch->mem_size -= sz;
maybe_report_message_size(ch);
}
}
mzrt_mutex_unlock(ch->lock);
if (msg) {
return scheme_places_deserialize(msg, msg_memory);
}

View File

@ -3673,6 +3673,8 @@ typedef struct Scheme_Place_Async_Channel {
#endif
Scheme_Object **msgs;
void **msg_memory;
intptr_t mem_size;
intptr_t reported_size; /* size reported to master GC; avoid reporting too often */
void *wakeup_signal;
} Scheme_Place_Async_Channel;