fix interaction of futures with memory limits

The continuation of a future being evaluated concurrently was not
correctly attributed to the future's custodian (as inherited from
from the creating thread).
This commit is contained in:
Matthew Flatt 2016-12-06 11:01:12 -07:00
parent 20842aaf3a
commit 6fe17be82f
3 changed files with 69 additions and 1 deletions

View File

@ -0,0 +1,44 @@
#lang racket
(require racket/future)
;; A test for `make-vector` in futures, especially the interaction
;; with memory limits.
(define (try size)
(define c (make-custodian))
(custodian-limit-memory c (* 4 #x1000000))
(define cb (make-custodian-box c 'running))
(define ok? #f)
(thread-wait
(parameterize ([current-custodian c])
(thread
(lambda ()
(define (go n w)
(let loop ([v w])
(loop (cons #;(if v (lambda (x y) y) cons) (make-vector n) v))))
;; Will executors are here as memory-debugging canaries:
(define f1 (future (let ([w (make-will-executor)])
(lambda () (let ([v w])
(set! w #f)
(go size v))))))
(define f2 (future (lambda () (let loop () (loop)))))
(define (check f)
(with-handlers ([exn:fail:out-of-memory? (lambda (exn)
(set! ok? #t))])
(touch f)))
(sleep 1)
(check f1)))))
(unless (or ok?
(not (custodian-box-value cb)))
(error "didn't get out-of-memory or shutdown as expected"))
(printf "ok at ~a\n" size))
(try #x10000)
(try #x100000)
(try #x1000000)
(try #x10000000)

View File

@ -38,6 +38,8 @@ inline static void BTC_register_new_thread(void *t, void *c)
} }
inline static void BTC_register_thread(void *t, void *c) inline static void BTC_register_thread(void *t, void *c)
/* Might be called in a future thread to change to a custodian that
has a set number */
{ {
NewGC *gc = GC_get_GC(); NewGC *gc = GC_get_GC();
GC_Thread_Info *work; GC_Thread_Info *work;

View File

@ -434,6 +434,8 @@ static void init_cpucount(void);
#ifndef MZ_PRECISE_GC #ifndef MZ_PRECISE_GC
# define GC_set_accounting_custodian(c) /* nothing */ # define GC_set_accounting_custodian(c) /* nothing */
# define GC_register_thread(t, c) /* nothing */
# define GC_register_new_thread(t, c) /* nothing */
#endif #endif
/**********************************************************************/ /**********************************************************************/
@ -674,9 +676,12 @@ static void init_future_thread(Scheme_Future_State *fs, int i)
params.fts = fts; params.fts = fts;
params.fs = fs; params.fs = fs;
/* Make enough of a thread record to deal with multiple values. */ /* Make enough of a thread record to deal with multiple values
and to support GC and memory accounting. */
skeleton = MALLOC_ONE_TAGGED(Scheme_Thread); skeleton = MALLOC_ONE_TAGGED(Scheme_Thread);
skeleton->so.type = scheme_thread_type; skeleton->so.type = scheme_thread_type;
GC_register_new_thread(skeleton, main_custodian);
skeleton->running = MZTHREAD_RUNNING;
fts->thread = skeleton; fts->thread = skeleton;
@ -791,6 +796,8 @@ static void check_future_thread_creation(Scheme_Future_State *fs)
static void start_gc_not_ok(Scheme_Future_State *fs) static void start_gc_not_ok(Scheme_Future_State *fs)
/* must have mutex_lock */ /* must have mutex_lock */
{ {
Scheme_Thread *p;
while (fs->wait_for_gc) { while (fs->wait_for_gc) {
int quit = fs->abort_all_futures; int quit = fs->abort_all_futures;
fs->need_gc_done_post++; fs->need_gc_done_post++;
@ -815,6 +822,10 @@ static void start_gc_not_ok(Scheme_Future_State *fs)
} }
} }
#endif #endif
p = scheme_current_thread;
MZ_RUNSTACK = p->runstack;
MZ_RUNSTACK_START = p->runstack_start;
} }
static void end_gc_not_ok(Scheme_Future_Thread_State *fts, static void end_gc_not_ok(Scheme_Future_Thread_State *fts,
@ -836,6 +847,11 @@ static void end_gc_not_ok(Scheme_Future_Thread_State *fts,
p->cont_mark_stack = MZ_CONT_MARK_STACK; p->cont_mark_stack = MZ_CONT_MARK_STACK;
p->cont_mark_pos = MZ_CONT_MARK_POS; p->cont_mark_pos = MZ_CONT_MARK_POS;
/* To ensure that memory accounting goes through the thread
record, clear these roots: */
MZ_RUNSTACK = NULL;
MZ_RUNSTACK_START = NULL;
/* FIXME: clear scheme_current_thread->ku.multiple.array ? */ /* FIXME: clear scheme_current_thread->ku.multiple.array ? */
--fs->gc_not_ok; --fs->gc_not_ok;
@ -2289,6 +2305,9 @@ void *worker_thread_future_loop(void *arg)
mzrt_sema_post(params->ready_sema); mzrt_sema_post(params->ready_sema);
scheme_current_thread->runstack = MZ_RUNSTACK;
scheme_current_thread->runstack_start = MZ_RUNSTACK_START;
while (1) { while (1) {
mzrt_sema_wait(fs->future_pending_sema); mzrt_sema_wait(fs->future_pending_sema);
mzrt_mutex_lock(fs->future_mutex); mzrt_mutex_lock(fs->future_mutex);
@ -2317,6 +2336,7 @@ void *worker_thread_future_loop(void *arg)
scheme_jit_fill_threadlocal_table(); scheme_jit_fill_threadlocal_table();
fts->thread->current_ft = ft; fts->thread->current_ft = ft;
GC_register_thread(fts->thread, ft->cust);
MZ_RUNSTACK = MZ_RUNSTACK_START + fts->runstack_size; MZ_RUNSTACK = MZ_RUNSTACK_START + fts->runstack_size;
MZ_CONT_MARK_STACK = 0; MZ_CONT_MARK_STACK = 0;
@ -2416,6 +2436,7 @@ void *worker_thread_future_loop(void *arg)
} }
fts->thread->current_ft = NULL; fts->thread->current_ft = NULL;
GC_register_thread(fts->thread, main_custodian);
} }
/* Clear stacks */ /* Clear stacks */
@ -2538,6 +2559,7 @@ static int capture_future_continuation(Scheme_Future_State *fs, future_t *ft, vo
ft->fts->thread->current_ft = NULL; /* tells worker thread that it no longer ft->fts->thread->current_ft = NULL; /* tells worker thread that it no longer
needs to handle the future */ needs to handle the future */
GC_register_thread(ft->fts->thread, main_custodian);
ft->suspended_lw = lw; ft->suspended_lw = lw;
ft->maybe_suspended_lw = 1; ft->maybe_suspended_lw = 1;