diff --git a/collects/scribblings/reference/places.scrbl b/collects/scribblings/reference/places.scrbl index 87980decf6..ef748c08ec 100644 --- a/collects/scribblings/reference/places.scrbl +++ b/collects/scribblings/reference/places.scrbl @@ -50,11 +50,17 @@ A @tech{place channel} can be used as a @tech{synchronizable event} can also receive messages with @racket[place-channel-get], and messages can be sent with @racket[place-channel-put]. +Two @tech{place channels} are @racket[equal?] if they are endpoints +for the same underlying channels while both or neither is a +@tech{place descriptor}. @tech{Place channels} can be @racket[equal?] +without being @racket[eq?] after being sent messages through a +@tech{place channel}. + Constraints on messages across a place channel---and therefore on the kinds of data that places share---enable greater parallelism than @racket[future], even including separate @tech{garbage collection} of separate places. At the same time, the setup and communication costs -for places can be higher than for futures. +for places can be higher than for @tech{futures}. For example, the following expression launches two places, echoes a message to each, and then waits for the places to terminate: diff --git a/collects/tests/racket/place-channel-limits.rkt b/collects/tests/racket/place-channel-limits.rkt new file mode 100644 index 0000000000..7bbe38e714 --- /dev/null +++ b/collects/tests/racket/place-channel-limits.rkt @@ -0,0 +1,24 @@ +#lang racket +(require racket/place) + +;; Check the interaction of custodian memory limits with +;; * allocating shared arrays, and +;; * putting messages into an channel with no receiver + +(define (check shared?) + (for ([i 20]) + (printf "iter ~a\n" i) + (let ([c (make-custodian)]) + (custodian-limit-memory c (* 1024 1024 10) c) + (parameterize ([current-custodian c]) + (thread-wait + (thread + (lambda () + (define-values (a b) (place-channel)) + (for/fold ([v 0]) ([i (in-range 999999999)]) + (if shared? + (cons (make-shared-bytes 1024) v) + (place-channel-put b (list 1 2 3 4))))))))))) + +(check #t) +(check #t) diff --git a/src/racket/gc2/gc2.h b/src/racket/gc2/gc2.h index f345ed52e6..a289965c8e 100644 --- a/src/racket/gc2/gc2.h +++ b/src/racket/gc2/gc2.h @@ -545,9 +545,14 @@ GC2_EXTERN intptr_t GC_is_place(); Otherwise returns 0; */ -GC2_EXTERN uintptr_t GC_message_allocator_size(void *msg_memory); +GC2_EXTERN intptr_t GC_message_objects_size(void *msg_memory); /* - Returns the total size of all memory allocated by the message allocator + Returns the total size of all objects allocated by the message allocator + */ + +GC2_EXTERN intptr_t GC_message_allocator_size(void *msg_memory); +/* + Returns the total size of all memory pages allocated by the message allocator */ GC2_EXTERN void GC_dispose_short_message_allocator(void *msg_memory); @@ -561,6 +566,15 @@ GC2_EXTERN void GC_destroy_orphan_msg_memory(void *msg_memory); the place channels finalizer is called. */ +GC2_EXTERN void GC_report_unsent_message_delta(intptr_t amt); +/* + Report message-in-flight size changes to the GC. This functionality is exposed, + rather than built into GC_finish_message_allocator(), GC_adpot_message_allocator(), + GC_dispose_short_message_allocator(), and GC_destroy_orphan_msg_memory(), so that + meesages to the master GC can be limited as long as the unsent message tracking + is within a factor of 2 or so. + */ + # ifdef __cplusplus }; diff --git a/src/racket/gc2/mem_account.c b/src/racket/gc2/mem_account.c index 565484c77a..b1daa2ea75 100644 --- a/src/racket/gc2/mem_account.c +++ b/src/racket/gc2/mem_account.c @@ -11,8 +11,9 @@ static const int btc_redirect_thread = 511; static const int btc_redirect_custodian = 510; static const int btc_redirect_ephemeron = 509; static const int btc_redirect_cust_box = 508; +static const int btc_redirect_bi_chan = 507; -inline static void account_memory(NewGC *gc, int set, intptr_t amount); +inline static void account_memory(NewGC *gc, int set, intptr_t amount, int to_master); /*****************************************************************************/ /* thread list */ @@ -73,7 +74,7 @@ inline static void mark_threads(NewGC *gc, int owner) mzrt_mutex_lock(place_obj->lock); sz = place_obj->memory_use; mzrt_mutex_unlock(place_obj->lock); - account_memory(gc, owner, gcBYTES_TO_WORDS(sz)); + account_memory(gc, owner, gcBYTES_TO_WORDS(sz), 0); } #endif } @@ -138,7 +139,7 @@ inline static int create_blank_owner_set(NewGC *gc) memcpy(naya, owner_table, old_size*sizeof(OTEntry*)); gc->owner_table = owner_table = naya; bzero(((char*)owner_table) + (sizeof(OTEntry*) * old_size), - (curr_size - old_size) * sizeof(OTEntry*)); + (curr_size - old_size) * sizeof(OTEntry*)); return create_blank_owner_set(gc); } @@ -201,9 +202,12 @@ inline static int custodian_member_owner_set(NewGC *gc, void *cust, int set) return 0; } -inline static void account_memory(NewGC *gc, int set, intptr_t amount) +inline static void account_memory(NewGC *gc, int set, intptr_t amount, int to_master) { - gc->owner_table[set]->memory_use += amount; + if (to_master) + gc->owner_table[set]->master_memory_use += amount; + else + gc->owner_table[set]->memory_use += amount; } inline static void free_owner_set(NewGC *gc, int set) @@ -256,16 +260,23 @@ inline static uintptr_t custodian_usage(NewGC*gc, void *custodian) owner_table = gc->owner_table; if (owner_table[i]) - retval = owner_table[i]->memory_use; + retval = (owner_table[i]->memory_use + owner_table[i]->master_memory_use); else retval = 0; return gcWORDS_TO_BYTES(retval); } -inline static void BTC_memory_account_mark(NewGC *gc, mpage *page, void *ptr) +inline static void BTC_memory_account_mark(NewGC *gc, mpage *page, void *ptr, int is_a_master_page) { GCDEBUG((DEBUGOUTF, "BTC_memory_account_mark: %p/%p\n", page, ptr)); + + /* In the case of is_a_master_page, whether this place is charged is + a little random: there's no guarantee that the btc_mark values are + in sync, and there are races among places. Approximations are ok for + accounting, though, as long as the probably for completely wrong + accounting is very low. */ + if(page->size_class) { if(page->size_class > 1) { /* big page */ @@ -273,7 +284,7 @@ inline static void BTC_memory_account_mark(NewGC *gc, mpage *page, void *ptr) if(info->btc_mark == gc->old_btc_mark) { info->btc_mark = gc->new_btc_mark; - account_memory(gc, gc->current_mark_owner, gcBYTES_TO_WORDS(page->size)); + account_memory(gc, gc->current_mark_owner, gcBYTES_TO_WORDS(page->size), is_a_master_page); push_ptr(gc, TAG_AS_BIG_PAGE_PTR(ptr)); } } else { @@ -282,7 +293,7 @@ inline static void BTC_memory_account_mark(NewGC *gc, mpage *page, void *ptr) if(info->btc_mark == gc->old_btc_mark) { info->btc_mark = gc->new_btc_mark; - account_memory(gc, gc->current_mark_owner, info->size); + account_memory(gc, gc->current_mark_owner, info->size, is_a_master_page); ptr = OBJHEAD_TO_OBJPTR(info); push_ptr(gc, ptr); } @@ -292,7 +303,7 @@ inline static void BTC_memory_account_mark(NewGC *gc, mpage *page, void *ptr) if(info->btc_mark == gc->old_btc_mark) { info->btc_mark = gc->new_btc_mark; - account_memory(gc, gc->current_mark_owner, info->size); + account_memory(gc, gc->current_mark_owner, info->size, 0); push_ptr(gc, ptr); } } @@ -352,6 +363,19 @@ int BTC_cust_box_mark(void *p, struct NewGC *gc) return gc->mark_table[btc_redirect_cust_box](p, gc); } +int BTC_bi_chan_mark(void *p, struct NewGC *gc) +{ + if (gc->doing_memory_accounting) { + Scheme_Place_Bi_Channel *bc = (Scheme_Place_Bi_Channel *)p; + /* Race conditions here on `mem_size', and likely double counting + when the same async channels are accessible from paired bi + channels --- but those approximations are ok for accounting. */ + account_memory(gc, gc->current_mark_owner, bc->sendch->mem_size, 0); + account_memory(gc, gc->current_mark_owner, bc->recvch->mem_size, 0); + } + return gc->mark_table[btc_redirect_bi_chan](p, gc); +} + static void btc_overmem_abort(NewGC *gc) { gc->kill_propagation_loop = 1; @@ -377,6 +401,7 @@ inline static void BTC_initialize_mark_table(NewGC *gc) { gc->mark_table[scheme_custodian_type] = BTC_custodian_mark; gc->mark_table[gc->ephemeron_tag] = BTC_ephemeron_mark; gc->mark_table[gc->cust_box_tag] = BTC_cust_box_mark; + gc->mark_table[scheme_place_bi_channel_type] = BTC_bi_chan_mark; } inline static int BTC_get_redirect_tag(NewGC *gc, int tag) { @@ -384,6 +409,7 @@ inline static int BTC_get_redirect_tag(NewGC *gc, int tag) { else if (tag == scheme_custodian_type) { tag = btc_redirect_custodian; } else if (tag == gc->ephemeron_tag) { tag = btc_redirect_ephemeron; } else if (tag == gc->cust_box_tag) { tag = btc_redirect_cust_box; } + else if (tag == scheme_place_bi_channel_type) { tag = btc_redirect_bi_chan; } return tag; } @@ -404,8 +430,13 @@ static void BTC_do_accounting(NewGC *gc) /* clear the memory use numbers out */ for(i = 1; i < table_size; i++) - if(owner_table[i]) + if(owner_table[i]) { owner_table[i]->memory_use = 0; +#ifdef MZ_USE_PLACES + if (MASTERGC && MASTERGC->major_places_gc) + owner_table[i]->master_memory_use = 0; +#endif + } /* start with root: */ while (cur->parent && SCHEME_PTR1_VAL(cur->parent)) { @@ -440,6 +471,7 @@ static void BTC_do_accounting(NewGC *gc) owner_table = gc->owner_table; owner_table[powner]->memory_use += owner_table[owner]->memory_use; + owner_table[powner]->master_memory_use += owner_table[owner]->master_memory_use; } box = cur->global_prev; cur = box ? SCHEME_PTR1_VAL(box) : NULL; diff --git a/src/racket/gc2/newgc.c b/src/racket/gc2/newgc.c index 265d6236c5..8d6b3792a4 100644 --- a/src/racket/gc2/newgc.c +++ b/src/racket/gc2/newgc.c @@ -168,6 +168,11 @@ struct Log_Master_Info { int ran, full; intptr_t pre_used, post_used, pre_admin, post_admin; }; +#else +# define premaster_or_master_gc(gc) 1 +# define premaster_or_place_gc(gc) 1 +# define postmaster_and_master_gc(gc) 0 +# define postmaster_and_place_gc(gc) 1 #endif inline static size_t real_page_size(mpage* page); @@ -388,6 +393,14 @@ static void free_pages(NewGC *gc, void *p, size_t len, int type, int expect_mpro mmu_free_page(gc->mmu, p, len, type, expect_mprotect, src_block, 1); } +static void check_excessive_free_pages(NewGC *gc) { + /* If we have too many idle pages --- 4 times used pages --- then flush. + We choose 4 instead of 2 for "excessive" because a block cache (when + available) has a fill factor of 2, and flushing will not reduce that. */ + if (mmu_memory_allocated(gc->mmu) > ((gc->used_pages << (LOG_APAGE_SIZE + 2)))) { + mmu_flush_freed_pages(gc->mmu); + } +} static void free_orphaned_page(NewGC *gc, mpage *tmp) { /* free_pages decrements gc->used_pages which is incorrect, since this is an orphaned page, @@ -398,6 +411,7 @@ static void free_orphaned_page(NewGC *gc, mpage *tmp) { &tmp->mmu_src_block, 0); /* don't adjust count, since we're failing to adopt it */ free_mpage(tmp); + check_excessive_free_pages(gc); } @@ -844,8 +858,20 @@ static inline void* REMOVE_BIG_PAGE_PTR_TAG(void *p) { void GC_check_master_gc_request() { #ifdef MZ_USE_PLACES - if (MASTERGC && MASTERGC->major_places_gc == 1) { - GC_gcollect(); + NewGC *mgc = MASTERGC; + + if (mgc) { + /* check for GC needed due to GC_report_unsent_message_delta(): */ + if ((mgc->gen0.current_size + mgc->pending_msg_size) >= mgc->gen0.max_size) { + NewGC *saved_gc; + saved_gc = GC_switch_to_master_gc(); + master_collect_initiate(mgc); + GC_switch_back_from_master(saved_gc); + } + + if (mgc->major_places_gc == 1) { + GC_gcollect(); + } } #endif } @@ -861,14 +887,12 @@ static inline void gc_if_needed_account_alloc_size(NewGC *gc, size_t allocate_si #ifdef MZ_USE_PLACES if (postmaster_and_master_gc(gc)) { master_collect_initiate(gc); - } - else { -#endif - if (!gc->dumping_avoid_collection) - garbage_collect(gc, 0, 0, NULL); -#ifdef MZ_USE_PLACES - } + } else #endif + { + if (!gc->dumping_avoid_collection) + garbage_collect(gc, 0, 0, NULL); + } } gc->gen0.current_size += allocate_size; } @@ -886,17 +910,13 @@ static void *allocate_big(const size_t request_size_bytes, int type) #ifdef NEWGC_BTC_ACCOUNT if(GC_out_of_memory) { -#ifdef MZ_USE_PLACES if (premaster_or_place_gc(gc)) { -#endif if (BTC_single_allocation_limit(gc, request_size_bytes)) { /* We're allowed to fail. Check for allocations that exceed a single-time limit. See BTC_single_allocation_limit() for more information. */ GC_out_of_memory(); } -#ifdef MZ_USE_PLACES } -#endif } #endif @@ -1439,6 +1459,20 @@ void GC_create_message_allocator() { gc->dumping_avoid_collection++; } +void GC_report_unsent_message_delta(intptr_t amt) +{ +#ifdef MZ_USE_PLACES + NewGC *mgc = MASTERGC; + + if (!mgc) return; + + while (!mzrt_cas(&mgc->pending_msg_size, + mgc->pending_msg_size, + mgc->pending_msg_size + amt)) { + } +#endif +} + void *GC_finish_message_allocator() { NewGC *gc = GC_get_GC(); Allocator *a = gc->saved_allocator; @@ -1517,7 +1551,7 @@ void GC_adopt_message_allocator(void *param) { gc_if_needed_account_alloc_size(gc, 0); } -uintptr_t GC_message_allocator_size(void *param) { +intptr_t GC_message_objects_size(void *param) { MsgMemory *msgm = (MsgMemory *) param; if (!msgm) { return sizeof(param); } if (msgm->big_pages && msgm->size < 1024) { @@ -1527,6 +1561,13 @@ uintptr_t GC_message_allocator_size(void *param) { return msgm->size; } +intptr_t GC_message_allocator_size(void *param) { + MsgMemory *msgm = (MsgMemory *) param; + if (!msgm) { return sizeof(param); } + /* approximate extra size in allocation page by just adding one: */ + return msgm->size + APAGE_SIZE; +} + void GC_dispose_short_message_allocator(void *param) { NewGC *gc = GC_get_GC(); mpage *tmp; @@ -2906,9 +2947,7 @@ static void promote_marked_gen0_big_page(NewGC *gc, mpage *page) { /* This is the first mark routine. It's a bit complicated. */ void GC_mark2(const void *const_p, struct NewGC *gc) { -#ifdef MZ_USE_PLACES int is_a_master_page = 0; -#endif mpage *page; void *p = (void*)const_p; @@ -2919,44 +2958,42 @@ void GC_mark2(const void *const_p, struct NewGC *gc) if(!(page = pagemap_find_page(gc->page_maps, p))) { #ifdef MZ_USE_PLACES - if (!MASTERGC || !MASTERGC->major_places_gc || !(page = pagemap_find_page(MASTERGC->page_maps, p))) + if (MASTERGC && MASTERGC->major_places_gc && (page = pagemap_find_page(MASTERGC->page_maps, p))) { + is_a_master_page = 1; + } else #endif - { -#ifdef POINTER_OWNERSHIP_CHECK - mzrt_rwlock_wrlock(MASTERGCINFO->cangc); { - int i; - int size = MASTERGCINFO->size; - for (i = 1; i < size; i++) { - if (gc->place_id != i - && MASTERGCINFO->signal_fds[i] != (void*) REAPED_SLOT_AVAILABLE - && MASTERGCINFO->signal_fds[i] != (void*) CREATED_BUT_NOT_REGISTERED - && MASTERGCINFO->signal_fds[i] != (void*) SIGNALED_BUT_NOT_REGISTERED) { - if((page = pagemap_find_page(MASTERGCINFO->places_gcs[i]->page_maps, p))) { - printf("%p is owned by place %i not the current place %i\n", p, i, gc->place_id); - asm("int3"); +#ifdef POINTER_OWNERSHIP_CHECK + mzrt_rwlock_wrlock(MASTERGCINFO->cangc); + { + int i; + int size = MASTERGCINFO->size; + for (i = 1; i < size; i++) { + if (gc->place_id != i + && MASTERGCINFO->signal_fds[i] != (void*) REAPED_SLOT_AVAILABLE + && MASTERGCINFO->signal_fds[i] != (void*) CREATED_BUT_NOT_REGISTERED + && MASTERGCINFO->signal_fds[i] != (void*) SIGNALED_BUT_NOT_REGISTERED) { + if((page = pagemap_find_page(MASTERGCINFO->places_gcs[i]->page_maps, p))) { + printf("%p is owned by place %i not the current place %i\n", p, i, gc->place_id); + asm("int3"); + } } } } + mzrt_rwlock_unlock(MASTERGCINFO->cangc); +#endif + GCDEBUG((DEBUGOUTF,"Not marking %p (no page)\n",p)); + return; } - mzrt_rwlock_unlock(MASTERGCINFO->cangc); -#endif - GCDEBUG((DEBUGOUTF,"Not marking %p (no page)\n",p)); - return; - } -#ifdef MZ_USE_PLACES - else { - is_a_master_page = 1; - } -#endif } +#ifdef NEWGC_BTC_ACCOUNT /* toss this over to the BTC mark routine if we're doing accounting */ if(gc->doing_memory_accounting) { -#ifdef NEWGC_BTC_ACCOUNT - BTC_memory_account_mark(gc, page, p); return; -#endif + BTC_memory_account_mark(gc, page, p, is_a_master_page); + return; } +#endif /* MED OR BIG PAGE */ if(page->size_class) { @@ -2972,14 +3009,8 @@ void GC_mark2(const void *const_p, struct NewGC *gc) page->size_class = 3; /* if this is in the nursery, we want to move it out of the nursery */ -#ifdef MZ_USE_PLACES if(!page->generation && !is_a_master_page) -#else - if(!page->generation) -#endif - { promote_marked_gen0_big_page(gc, page); - } page->marked_on = 1; record_backtrace(page, BIG_PAGE_TO_OBJECT(page)); @@ -4416,9 +4447,7 @@ static void garbage_collect(NewGC *gc, int force_full, int switching_master, Log mark_roots(gc); mark_immobiles(gc); TIME_STEP("rooted"); -#ifdef MZ_USE_PLACES if (premaster_or_place_gc(gc)) -#endif GC_mark_variable_stack(GC_variable_stack, 0, get_stack_base(gc), NULL); #ifdef MZ_USE_PLACES if (postmaster_and_master_gc(gc)) @@ -4471,9 +4500,7 @@ static void garbage_collect(NewGC *gc, int force_full, int switching_master, Log TIME_STEP("finalized2"); if(gc->gc_full) -#ifdef MZ_USE_PLACES if (premaster_or_place_gc(gc) || switching_master) -#endif do_heap_compact(gc); TIME_STEP("compacted"); @@ -4487,9 +4514,7 @@ static void garbage_collect(NewGC *gc, int force_full, int switching_master, Log repair_finalizer_structs(gc); repair_roots(gc); repair_immobiles(gc); -#ifdef MZ_USE_PLACES if (premaster_or_place_gc(gc)) -#endif GC_fixup_variable_stack(GC_variable_stack, 0, get_stack_base(gc), NULL); TIME_STEP("reparied roots"); repair_heap(gc); @@ -4505,7 +4530,7 @@ static void garbage_collect(NewGC *gc, int force_full, int switching_master, Log reset_nursery(gc); TIME_STEP("reset nursurey"); #ifdef NEWGC_BTC_ACCOUNT - if (gc->gc_full) + if (gc->gc_full && postmaster_and_place_gc(gc)) BTC_do_accounting(gc); #endif TIME_STEP("accounted"); @@ -4537,10 +4562,7 @@ static void garbage_collect(NewGC *gc, int force_full, int switching_master, Log gc->no_further_modifications = 0; - /* If we have too many idle pages, flush: */ - if (mmu_memory_allocated(gc->mmu) > ((gc->used_pages << (LOG_APAGE_SIZE + 1)))) { - mmu_flush_freed_pages(gc->mmu); - } + check_excessive_free_pages(gc); /* update some statistics */ if(gc->gc_full) gc->num_major_collects++; else gc->num_minor_collects++; diff --git a/src/racket/gc2/newgc.h b/src/racket/gc2/newgc.h index 33f8839449..04a47f45d0 100644 --- a/src/racket/gc2/newgc.h +++ b/src/racket/gc2/newgc.h @@ -86,6 +86,7 @@ typedef struct OTEntry { void *originator; void **members; uintptr_t memory_use; + uintptr_t master_memory_use; uintptr_t single_time_limit; uintptr_t super_required; char limit_set; @@ -163,6 +164,7 @@ typedef struct NewGC { uintptr_t actual_pages_size; void (*unsafe_allocation_abort)(struct NewGC *); uintptr_t memory_in_use; /* the amount of memory in use */ + uintptr_t pending_msg_size; /* set in master, only */ /* blame the child thread infos */ GC_Thread_Info *thread_infos; diff --git a/src/racket/src/bool.c b/src/racket/src/bool.c index a53c303860..04eed5b246 100644 --- a/src/racket/src/bool.c +++ b/src/racket/src/bool.c @@ -618,6 +618,12 @@ int is_equal (Scheme_Object *obj1, Scheme_Object *obj2, Equal_Info *eql) obj1 = SCHEME_PTR_VAL(obj1); obj2 = SCHEME_PTR_VAL(obj2); goto top; + } else if (t1 == scheme_place_bi_channel_type) { + Scheme_Place_Bi_Channel *bc1, *bc2; + bc1 = (Scheme_Place_Bi_Channel *)obj1; + bc2 = (Scheme_Place_Bi_Channel *)obj2; + return (SAME_OBJ(bc1->recvch, bc2->recvch) + && SAME_OBJ(bc1->sendch, bc2->sendch)); } else if (!eql->for_chaperone && ((t1 == scheme_chaperone_type) || (t1 == scheme_proc_chaperone_type))) { /* both chaperones */ diff --git a/src/racket/src/hash.c b/src/racket/src/hash.c index 666711a78a..8e1b62907a 100644 --- a/src/racket/src/hash.c +++ b/src/racket/src/hash.c @@ -1377,6 +1377,14 @@ static uintptr_t equal_hash_key(Scheme_Object *o, uintptr_t k, Hash_Info *hi) o = SCHEME_PTR_VAL(o); } break; + case scheme_place_bi_channel_type: + { + k += 7; + /* a bi channel has sendch and recvch, but + sends are the same iff recvs are the same: */ + o = (Scheme_Object *)((Scheme_Place_Bi_Channel *)o)->sendch; + } + break; default: { Scheme_Primary_Hash_Proc h1 = scheme_type_hash1s[t]; @@ -1752,6 +1760,11 @@ static uintptr_t equal_hash_key2(Scheme_Object *o, Hash_Info *hi) /* Needed for interning */ o = SCHEME_PTR_VAL(o); goto top; + case scheme_place_bi_channel_type: + /* a bi channel has sendch and recvch, but + sends are the same iff recvs are the same: */ + o = (Scheme_Object *)((Scheme_Place_Bi_Channel *)o)->sendch; + goto top; default: { Scheme_Secondary_Hash_Proc h2 = scheme_type_hash2s[t]; diff --git a/src/racket/src/place.c b/src/racket/src/place.c index de3f85c5bf..c84b197bcb 100644 --- a/src/racket/src/place.c +++ b/src/racket/src/place.c @@ -1120,13 +1120,6 @@ static Scheme_Object *trivial_copy(Scheme_Object *so) case scheme_null_type: case scheme_void_type: return so; - case scheme_place_type: - scheme_hash_key(((Scheme_Place *) so)->channel); - return ((Scheme_Place *) so)->channel; - break; - case scheme_place_bi_channel_type: /* allocated in the master and can be passed along as is */ - scheme_hash_key(so); - return so; case scheme_byte_string_type: case scheme_flvector_type: case scheme_fxvector_type: @@ -1150,6 +1143,19 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h new_so = so; switch (SCHEME_TYPE(so)) { + case scheme_place_type: + so = ((Scheme_Place *) so)->channel; + new_so = so; + case scheme_place_bi_channel_type: /* ^^^ fall through ^^* */ + if (copy_mode) { + Scheme_Place_Bi_Channel *ch; + ch = MALLOC_ONE_TAGGED(Scheme_Place_Bi_Channel); + ch->so.type = scheme_place_bi_channel_type; + ch->sendch = ((Scheme_Place_Bi_Channel *)so)->sendch; + ch->recvch = ((Scheme_Place_Bi_Channel *)so)->recvch; + new_so = (Scheme_Object *)ch; + } + break; case scheme_char_type: if (copy_mode) new_so = scheme_make_char(SCHEME_CHAR_VAL(so)); @@ -2182,12 +2188,7 @@ static void *place_start_proc_after_stack(void *data_arg, void *stack_base) { a[0] = places_deep_uncopy(place_data->module); a[1] = places_deep_uncopy(place_data->function); a[1] = scheme_intern_exact_symbol(SCHEME_SYM_VAL(a[1]), SCHEME_SYM_LEN(a[1])); - if (!SAME_TYPE(SCHEME_TYPE(place_data->channel), scheme_place_bi_channel_type)) { - channel = places_deep_uncopy(place_data->channel); - } - else { - channel = place_data->channel; - } + channel = places_deep_uncopy(place_data->channel); place_obj = place_data->place_obj; REGISTER_SO(place_object); place_object = place_obj; @@ -2344,7 +2345,7 @@ Scheme_Object *scheme_places_deserialize(Scheme_Object *so, void *msg_memory) { if (new_so) return new_so; /* small messages are deemed to be < 1k, this could be tuned in either direction */ - if (GC_message_allocator_size(msg_memory) < 1024) { + if (GC_message_objects_size(msg_memory) < 1024) { new_so = do_places_deep_copy(so, mzPDC_UNCOPY, 1); GC_dispose_short_message_allocator(msg_memory); } @@ -2443,11 +2444,28 @@ static void* GC_master_malloc_tagged(size_t size) { return ptr; } +static void maybe_report_message_size(Scheme_Place_Async_Channel *ch) +{ +#ifdef MZ_PRECISE_GC + if ((ch->reported_size > (2 * ch->mem_size)) + || (((ch->reported_size * 2) < ch->mem_size) + && ((ch->mem_size - ch->reported_size) > (1 << (LOG_APAGE_SIZE + 1))))) { + intptr_t delta = ch->mem_size - ch->reported_size; + ch->reported_size = ch->mem_size; + GC_report_unsent_message_delta(delta); + } +#endif +} + static void async_channel_finalize(void *p, void* data) { Scheme_Place_Async_Channel *ch; int i; Scheme_Hash_Table *ht = NULL; ch = (Scheme_Place_Async_Channel*)p; + + ch->mem_size = 0; + maybe_report_message_size(ch); + mzrt_mutex_destroy(ch->lock); for (i = 0; i < ch->size ; i++) { ht = NULL; @@ -2543,7 +2561,7 @@ Scheme_Place_Bi_Channel *place_bi_channel_create() { Scheme_Place_Async_Channel *tmp; Scheme_Place_Bi_Channel *ch; - ch = GC_master_malloc_tagged(sizeof(Scheme_Place_Bi_Channel)); + ch = MALLOC_ONE_TAGGED(Scheme_Place_Bi_Channel); ch->so.type = scheme_place_bi_channel_type; tmp = place_async_channel_create(); @@ -2556,7 +2574,7 @@ Scheme_Place_Bi_Channel *place_bi_channel_create() { Scheme_Place_Bi_Channel *place_bi_peer_channel_create(Scheme_Place_Bi_Channel *orig) { Scheme_Place_Bi_Channel *ch; - ch = GC_master_malloc_tagged(sizeof(Scheme_Place_Bi_Channel)); + ch = MALLOC_ONE_TAGGED(Scheme_Place_Bi_Channel); ch->so.type = scheme_place_bi_channel_type; ch->sendch = orig->recvch; @@ -2595,6 +2613,7 @@ static Scheme_Object *GC_master_make_vector(int size) { static void place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *uo) { void *msg_memory = NULL; Scheme_Object *o; + intptr_t sz; int cnt; o = scheme_places_serialize(uo, &msg_memory); @@ -2634,6 +2653,11 @@ static void place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *uo) ch->msg_memory[ch->in] = msg_memory; ++ch->count; ch->in = ((ch->in + 1) % ch->size); + + sz = GC_message_allocator_size(msg_memory); + ch->mem_size += sz; + + maybe_report_message_size(ch); } if (!cnt && ch->wakeup_signal) { @@ -2802,6 +2826,7 @@ static void register_place_object_with_channel(Scheme_Place_Async_Channel *ch, S static Scheme_Object *scheme_place_async_try_receive(Scheme_Place_Async_Channel *ch) { Scheme_Object *msg = NULL; void *msg_memory = NULL; + intptr_t sz; mzrt_mutex_lock(ch->lock); { @@ -2815,10 +2840,15 @@ static Scheme_Object *scheme_place_async_try_receive(Scheme_Place_Async_Channel --ch->count; ch->out = ((ch->out + 1) % ch->size); + + sz = GC_message_allocator_size(msg_memory); + ch->mem_size -= sz; + + maybe_report_message_size(ch); } } mzrt_mutex_unlock(ch->lock); - + if (msg) { return scheme_places_deserialize(msg, msg_memory); } diff --git a/src/racket/src/schpriv.h b/src/racket/src/schpriv.h index 6779462a09..0304a9c41b 100644 --- a/src/racket/src/schpriv.h +++ b/src/racket/src/schpriv.h @@ -3673,6 +3673,8 @@ typedef struct Scheme_Place_Async_Channel { #endif Scheme_Object **msgs; void **msg_memory; + intptr_t mem_size; + intptr_t reported_size; /* size reported to master GC; avoid reporting too often */ void *wakeup_signal; } Scheme_Place_Async_Channel;