diff --git a/collects/scribblings/reference/futures.scrbl b/collects/scribblings/reference/futures.scrbl index f0a2a8e149..90d94ebbf3 100644 --- a/collects/scribblings/reference/futures.scrbl +++ b/collects/scribblings/reference/futures.scrbl @@ -43,6 +43,10 @@ concurrency. At the same time, operations that seem obviously safe may have a complex enough implementation internally that they cannot run in parallel. See also @guidesecref["effective-futures"] in @|Guide|. +A future never runs in parallel if all of the @tech{custodians} that +allow its creating thread to run are shut down. Such futures can +execute through a call to @racket[touch], however. + @deftogether[( @defproc[(future [thunk (-> any)]) future?] @defproc[(touch [f future?]) any] diff --git a/collects/tests/future/future.rkt b/collects/tests/future/future.rkt index 5fb94ad10a..7b9fb86cad 100644 --- a/collects/tests/future/future.rkt +++ b/collects/tests/future/future.rkt @@ -435,3 +435,62 @@ We should also test deep continuations. (future void) (future (parameterize ([eval-jit-enabled #f]) (eval #'(lambda () (void))))))))) + +;; A future shouldn't use up a background thread if its +;; starting thread's custodian is shut down: +(let () + (define f #f) + (define c (make-custodian)) + (parameterize ([current-custodian c]) + (sync (thread (lambda () + (set! f (future (lambda () + (let loop () (loop))))))))) + (sleep 0.1) + (custodian-shutdown-all c)) + +;; If a future is suspended via a custodian, it should still +;; work to touch it: +(let () + (define f #f) + (define s (make-fsemaphore 0)) + (define c (make-custodian)) + (parameterize ([current-custodian c]) + (sync (thread (lambda () + (set! f (future (lambda () + (fsemaphore-wait s) + 10))))))) + (sleep 0.1) + (custodian-shutdown-all c) + (fsemaphore-post s) + (check-equal? 10 (touch f))) + + +;; Start a future in a custodian-suspended future: +(let () + (define f #f) + (define s (make-fsemaphore 0)) + (define c (make-custodian)) + (parameterize ([current-custodian c]) + (sync (thread (lambda () + (set! f (future (lambda () + (fsemaphore-wait s) + (future + (lambda () + 11))))))))) + (sleep 0.1) + (custodian-shutdown-all c) + (fsemaphore-post s) + (check-equal? 11 (touch (touch f)))) + +;; Don't get stuck on a bunch of futures that +;; have been disabled: +(let () + (define c (make-custodian)) + (define (loop) (loop)) + (parameterize ([current-custodian c]) + (sync (thread (lambda () + (for ([i (in-range 100)]) + (future loop)))))) + (sleep 0.1) + (custodian-shutdown-all c) + (sleep 0.1)) diff --git a/src/racket/src/future.c b/src/racket/src/future.c index f98b45e88d..4f4f5048b5 100644 --- a/src/racket/src/future.c +++ b/src/racket/src/future.c @@ -734,8 +734,15 @@ void scheme_future_continue_after_gc() for (i = 0; i < THREAD_POOL_SIZE; i++) { if (fs->pool_threads[i]) { *(fs->pool_threads[i]->need_gc_pointer) = 0; - *(fs->pool_threads[i]->fuel_pointer) = 1; - *(fs->pool_threads[i]->stack_boundary_pointer) -= INITIAL_C_STACK_SIZE; + + if (!fs->pool_threads[i]->thread->current_ft + || scheme_custodian_is_available(fs->pool_threads[i]->thread->current_ft->cust)) { + *(fs->pool_threads[i]->fuel_pointer) = 1; + *(fs->pool_threads[i]->stack_boundary_pointer) -= INITIAL_C_STACK_SIZE; + } else { + /* leave fuel exhausted, which will force the thread into a slow + path when it resumes to suspend the computation */ + } } } @@ -760,6 +767,12 @@ void scheme_future_gc_pause() mzrt_mutex_unlock(fs->future_mutex); } +void scheme_future_check_custodians() +{ + scheme_future_block_until_gc(); + scheme_future_continue_after_gc(); +} + /**********************************************************************/ /* Primitive implementations */ /**********************************************************************/ @@ -772,6 +785,7 @@ Scheme_Object *scheme_future(int argc, Scheme_Object *argv[]) future_t *ft; Scheme_Native_Closure *nc; Scheme_Native_Closure_Data *ncd; + Scheme_Custodian *c; Scheme_Object *lambda = argv[0]; double time_of_start; @@ -804,6 +818,14 @@ Scheme_Object *scheme_future(int argc, Scheme_Object *argv[]) ft->id = futureid; ft->orig_lambda = lambda; ft->status = PENDING; + + if (scheme_current_thread->mref) + c = scheme_custodian_extract_reference(scheme_current_thread->mref); + else { + /* must be in a future thread (if futures can be created in future threads) */ + c = scheme_current_thread->current_ft->cust; + } + ft->cust = c; /* JIT the code if not already JITted */ if (ncd) { @@ -890,13 +912,24 @@ Scheme_Object *scheme_fsemaphore_count(int argc, Scheme_Object **argv) return scheme_make_integer(sema->ready); } +static void requeue_future_within_lock(future_t *future, Scheme_Future_State *fs) +{ + if (scheme_custodian_is_available(future->cust)) { + future->status = PENDING; + enqueue_future(fs, future); + /* Signal that a future is pending */ + mzrt_sema_post(fs->future_pending_sema); + } else { + /* The future's constodian is shut down, so don't try to + run it in a future thread anymore */ + future->status = SUSPENDED; + } +} + static void requeue_future(future_t *future, Scheme_Future_State *fs) { mzrt_mutex_lock(fs->future_mutex); - future->status = PENDING; - enqueue_future(fs, future); - /* Signal that a future is now pending */ - mzrt_sema_post(fs->future_pending_sema); + requeue_future_within_lock(future, fs); mzrt_mutex_unlock(fs->future_mutex); } @@ -1254,13 +1287,14 @@ Scheme_Object *touch(int argc, Scheme_Object *argv[]) mzrt_mutex_lock(fs->future_mutex); if ((((ft->status == PENDING) && prefer_to_apply_future_in_runtime()) - || (ft->status == PENDING_OVERSIZE)) + || (ft->status == PENDING_OVERSIZE) + || (ft->status == SUSPENDED)) && (!ft->suspended_lw || scheme_can_apply_lightweight_continuation(ft->suspended_lw))) { if (ft->status == PENDING_OVERSIZE) { scheme_log(scheme_main_logger, SCHEME_LOG_DEBUG, 0, "future: oversize procedure deferred to runtime thread"); - } else { + } else if (ft->status != SUSPENDED) { dequeue_future(fs, ft); } ft->status = RUNNING; @@ -1323,8 +1357,9 @@ Scheme_Object *touch(int argc, Scheme_Object *argv[]) if (ft->suspended_lw && scheme_can_apply_lightweight_continuation(ft->suspended_lw) && prefer_to_apply_future_in_runtime()) { + if (ft->status != SUSPENDED) + dequeue_future(fs, ft); ft->status = RUNNING; - dequeue_future(fs, ft); /* may raise an exception or escape: */ mzrt_mutex_unlock(fs->future_mutex); future_in_runtime(ft); @@ -1460,6 +1495,7 @@ void *worker_thread_future_loop(void *arg) mzrt_sema_wait(fs->future_pending_sema); mzrt_mutex_lock(fs->future_mutex); start_gc_not_ok(fs); + ft = get_pending_future(fs); if (ft) { @@ -1746,6 +1782,11 @@ static void future_do_runtimecall(Scheme_Future_Thread_State *fts, insist_to_suspend = !is_atomic; prefer_to_suspend = (insist_to_suspend || fs->future_queue_count); + if (!scheme_custodian_is_available(future->cust)) { + insist_to_suspend = 1; + prefer_to_suspend = 1; + } + if (prefer_to_suspend && GC_gen0_alloc_page_ptr && capture_future_continuation(future, storage)) { @@ -2164,10 +2205,7 @@ static void do_invoke_rtcall(Scheme_Future_State *fs, future_t *future) mzrt_mutex_lock(fs->future_mutex); if (future->suspended_lw) { /* Re-enqueue the future so that some future thread can continue */ - future->status = PENDING; - enqueue_future(fs, future); - /* Signal that a future is pending */ - mzrt_sema_post(fs->future_pending_sema); + requeue_future_within_lock(future, fs); } else { /* Signal the waiting worker thread that it can continue running machine code */ @@ -2257,11 +2295,18 @@ future_t *get_pending_future(Scheme_Future_State *fs) { future_t *f; - f = fs->future_queue; - if (f) - dequeue_future(fs, f); - - return f; + while (1) { + f = fs->future_queue; + if (f) { + dequeue_future(fs, f); + if (!scheme_custodian_is_available(f->cust)) { + f->status = SUSPENDED; + } else { + return f; + } + } else + return NULL; + } } #endif diff --git a/src/racket/src/future.h b/src/racket/src/future.h index abeee7e2a7..61a50d6a8c 100644 --- a/src/racket/src/future.h +++ b/src/racket/src/future.h @@ -38,6 +38,7 @@ typedef void (*prim_allocate_values_t)(int, Scheme_Thread *); #define PENDING_OVERSIZE 4 #define WAITING_FOR_REQUEUE 5 #define WAITING_FOR_FSEMA 6 +#define SUSPENDED 7 #define FSRC_OTHER 0 #define FSRC_RATOR 1 @@ -56,6 +57,9 @@ typedef struct future_t { Scheme_Object *orig_lambda; void *code; + Scheme_Custodian *cust; /* an approximate custodian; don't use a future + thread if this custodian is shut down */ + /* Runtime call stuff */ int rt_prim; /* flag to indicate waiting for a prim call */ int want_lw; /* flag to indicate waiting for lw capture */ @@ -178,6 +182,7 @@ void scheme_future_block_until_gc(); void scheme_future_continue_after_gc(); void scheme_check_future_work(); void scheme_future_gc_pause(); +void scheme_future_check_custodians(); #ifdef UNIT_TEST //These forwarding decls only need to be here to make diff --git a/src/racket/src/mzmark.c b/src/racket/src/mzmark.c index 01e6f2e361..a5b544413c 100644 --- a/src/racket/src/mzmark.c +++ b/src/racket/src/mzmark.c @@ -5690,6 +5690,7 @@ static int future_SIZE(void *p, struct NewGC *gc) { static int future_MARK(void *p, struct NewGC *gc) { future_t *f = (future_t *)p; gcMARK2(f->orig_lambda, gc); + gcMARK2(f->cust, gc); gcMARK2(f->arg_s0, gc); gcMARK2(f->arg_t0, gc); gcMARK2(f->arg_S0, gc); @@ -5721,6 +5722,7 @@ static int future_MARK(void *p, struct NewGC *gc) { static int future_FIXUP(void *p, struct NewGC *gc) { future_t *f = (future_t *)p; gcFIXUP2(f->orig_lambda, gc); + gcFIXUP2(f->cust, gc); gcFIXUP2(f->arg_s0, gc); gcFIXUP2(f->arg_t0, gc); gcFIXUP2(f->arg_S0, gc); diff --git a/src/racket/src/mzmarksrc.c b/src/racket/src/mzmarksrc.c index 16cdbc1091..e0708ad259 100644 --- a/src/racket/src/mzmarksrc.c +++ b/src/racket/src/mzmarksrc.c @@ -2332,6 +2332,7 @@ future { mark: future_t *f = (future_t *)p; gcMARK2(f->orig_lambda, gc); + gcMARK2(f->cust, gc); gcMARK2(f->arg_s0, gc); gcMARK2(f->arg_t0, gc); gcMARK2(f->arg_S0, gc); diff --git a/src/racket/src/schpriv.h b/src/racket/src/schpriv.h index 700786a933..dea51b9784 100644 --- a/src/racket/src/schpriv.h +++ b/src/racket/src/schpriv.h @@ -589,6 +589,8 @@ extern void scheme_flatten_config(Scheme_Config *c); extern Scheme_Object *scheme_apply_thread_thunk(Scheme_Object *rator); +Scheme_Custodian* scheme_custodian_extract_reference(Scheme_Custodian_Reference *mr); + /*========================================================================*/ /* hash tables and globals */ /*========================================================================*/ diff --git a/src/racket/src/thread.c b/src/racket/src/thread.c index 3530d5f73d..28bb1c7148 100644 --- a/src/racket/src/thread.c +++ b/src/racket/src/thread.c @@ -1106,7 +1106,8 @@ static void managed_object_gone(void *o, void *mr) remove_managed(mr, o, NULL, NULL); } -int scheme_custodian_is_available(Scheme_Custodian *m) +int scheme_custodian_is_available(Scheme_Custodian *m) XFORM_SKIP_PROC +/* may be called from a future thread */ { if (m->shut_down) return 0; @@ -1318,6 +1319,10 @@ Scheme_Thread *scheme_do_close_managed(Scheme_Custodian *m, Scheme_Exit_Closer_F m = next_m; } +#ifdef MZ_USE_FUTURES + scheme_future_check_custodians(); +#endif + return kill_self; } @@ -1431,6 +1436,15 @@ static Scheme_Object *custodian_close_all(int argc, Scheme_Object *argv[]) return scheme_void; } +Scheme_Custodian* scheme_custodian_extract_reference(Scheme_Custodian_Reference *mr) +{ + return CUSTODIAN_FAM(mr); +} + +int scheme_custodian_is_shut_down(Scheme_Custodian* c) +{ + return c->shut_down; +} static Scheme_Object *extract_thread(Scheme_Object *o) {