futures: limit parallelism via custodians

Closes PR 11682
This commit is contained in:
Matthew Flatt 2011-03-15 14:43:18 -06:00
parent 6a79ebdc97
commit b9c4bbae67
8 changed files with 151 additions and 19 deletions

View File

@ -43,6 +43,10 @@ concurrency. At the same time, operations that seem obviously safe may
have a complex enough implementation internally that they cannot run in
parallel. See also @guidesecref["effective-futures"] in @|Guide|.
A future never runs in parallel if all of the @tech{custodians} that
allow its creating thread to run are shut down. Such futures can
execute through a call to @racket[touch], however.
@deftogether[(
@defproc[(future [thunk (-> any)]) future?]
@defproc[(touch [f future?]) any]

View File

@ -435,3 +435,62 @@ We should also test deep continuations.
(future void)
(future (parameterize ([eval-jit-enabled #f])
(eval #'(lambda () (void)))))))))
;; A future shouldn't use up a background thread if its
;; starting thread's custodian is shut down:
(let ()
(define f #f)
(define c (make-custodian))
(parameterize ([current-custodian c])
(sync (thread (lambda ()
(set! f (future (lambda ()
(let loop () (loop)))))))))
(sleep 0.1)
(custodian-shutdown-all c))
;; If a future is suspended via a custodian, it should still
;; work to touch it:
(let ()
(define f #f)
(define s (make-fsemaphore 0))
(define c (make-custodian))
(parameterize ([current-custodian c])
(sync (thread (lambda ()
(set! f (future (lambda ()
(fsemaphore-wait s)
10)))))))
(sleep 0.1)
(custodian-shutdown-all c)
(fsemaphore-post s)
(check-equal? 10 (touch f)))
;; Start a future in a custodian-suspended future:
(let ()
(define f #f)
(define s (make-fsemaphore 0))
(define c (make-custodian))
(parameterize ([current-custodian c])
(sync (thread (lambda ()
(set! f (future (lambda ()
(fsemaphore-wait s)
(future
(lambda ()
11)))))))))
(sleep 0.1)
(custodian-shutdown-all c)
(fsemaphore-post s)
(check-equal? 11 (touch (touch f))))
;; Don't get stuck on a bunch of futures that
;; have been disabled:
(let ()
(define c (make-custodian))
(define (loop) (loop))
(parameterize ([current-custodian c])
(sync (thread (lambda ()
(for ([i (in-range 100)])
(future loop))))))
(sleep 0.1)
(custodian-shutdown-all c)
(sleep 0.1))

View File

@ -734,8 +734,15 @@ void scheme_future_continue_after_gc()
for (i = 0; i < THREAD_POOL_SIZE; i++) {
if (fs->pool_threads[i]) {
*(fs->pool_threads[i]->need_gc_pointer) = 0;
*(fs->pool_threads[i]->fuel_pointer) = 1;
*(fs->pool_threads[i]->stack_boundary_pointer) -= INITIAL_C_STACK_SIZE;
if (!fs->pool_threads[i]->thread->current_ft
|| scheme_custodian_is_available(fs->pool_threads[i]->thread->current_ft->cust)) {
*(fs->pool_threads[i]->fuel_pointer) = 1;
*(fs->pool_threads[i]->stack_boundary_pointer) -= INITIAL_C_STACK_SIZE;
} else {
/* leave fuel exhausted, which will force the thread into a slow
path when it resumes to suspend the computation */
}
}
}
@ -760,6 +767,12 @@ void scheme_future_gc_pause()
mzrt_mutex_unlock(fs->future_mutex);
}
void scheme_future_check_custodians()
{
scheme_future_block_until_gc();
scheme_future_continue_after_gc();
}
/**********************************************************************/
/* Primitive implementations */
/**********************************************************************/
@ -772,6 +785,7 @@ Scheme_Object *scheme_future(int argc, Scheme_Object *argv[])
future_t *ft;
Scheme_Native_Closure *nc;
Scheme_Native_Closure_Data *ncd;
Scheme_Custodian *c;
Scheme_Object *lambda = argv[0];
double time_of_start;
@ -804,6 +818,14 @@ Scheme_Object *scheme_future(int argc, Scheme_Object *argv[])
ft->id = futureid;
ft->orig_lambda = lambda;
ft->status = PENDING;
if (scheme_current_thread->mref)
c = scheme_custodian_extract_reference(scheme_current_thread->mref);
else {
/* must be in a future thread (if futures can be created in future threads) */
c = scheme_current_thread->current_ft->cust;
}
ft->cust = c;
/* JIT the code if not already JITted */
if (ncd) {
@ -890,13 +912,24 @@ Scheme_Object *scheme_fsemaphore_count(int argc, Scheme_Object **argv)
return scheme_make_integer(sema->ready);
}
static void requeue_future_within_lock(future_t *future, Scheme_Future_State *fs)
{
if (scheme_custodian_is_available(future->cust)) {
future->status = PENDING;
enqueue_future(fs, future);
/* Signal that a future is pending */
mzrt_sema_post(fs->future_pending_sema);
} else {
/* The future's constodian is shut down, so don't try to
run it in a future thread anymore */
future->status = SUSPENDED;
}
}
static void requeue_future(future_t *future, Scheme_Future_State *fs)
{
mzrt_mutex_lock(fs->future_mutex);
future->status = PENDING;
enqueue_future(fs, future);
/* Signal that a future is now pending */
mzrt_sema_post(fs->future_pending_sema);
requeue_future_within_lock(future, fs);
mzrt_mutex_unlock(fs->future_mutex);
}
@ -1254,13 +1287,14 @@ Scheme_Object *touch(int argc, Scheme_Object *argv[])
mzrt_mutex_lock(fs->future_mutex);
if ((((ft->status == PENDING)
&& prefer_to_apply_future_in_runtime())
|| (ft->status == PENDING_OVERSIZE))
|| (ft->status == PENDING_OVERSIZE)
|| (ft->status == SUSPENDED))
&& (!ft->suspended_lw
|| scheme_can_apply_lightweight_continuation(ft->suspended_lw))) {
if (ft->status == PENDING_OVERSIZE) {
scheme_log(scheme_main_logger, SCHEME_LOG_DEBUG, 0,
"future: oversize procedure deferred to runtime thread");
} else {
} else if (ft->status != SUSPENDED) {
dequeue_future(fs, ft);
}
ft->status = RUNNING;
@ -1323,8 +1357,9 @@ Scheme_Object *touch(int argc, Scheme_Object *argv[])
if (ft->suspended_lw
&& scheme_can_apply_lightweight_continuation(ft->suspended_lw)
&& prefer_to_apply_future_in_runtime()) {
if (ft->status != SUSPENDED)
dequeue_future(fs, ft);
ft->status = RUNNING;
dequeue_future(fs, ft);
/* may raise an exception or escape: */
mzrt_mutex_unlock(fs->future_mutex);
future_in_runtime(ft);
@ -1460,6 +1495,7 @@ void *worker_thread_future_loop(void *arg)
mzrt_sema_wait(fs->future_pending_sema);
mzrt_mutex_lock(fs->future_mutex);
start_gc_not_ok(fs);
ft = get_pending_future(fs);
if (ft) {
@ -1746,6 +1782,11 @@ static void future_do_runtimecall(Scheme_Future_Thread_State *fts,
insist_to_suspend = !is_atomic;
prefer_to_suspend = (insist_to_suspend || fs->future_queue_count);
if (!scheme_custodian_is_available(future->cust)) {
insist_to_suspend = 1;
prefer_to_suspend = 1;
}
if (prefer_to_suspend
&& GC_gen0_alloc_page_ptr
&& capture_future_continuation(future, storage)) {
@ -2164,10 +2205,7 @@ static void do_invoke_rtcall(Scheme_Future_State *fs, future_t *future)
mzrt_mutex_lock(fs->future_mutex);
if (future->suspended_lw) {
/* Re-enqueue the future so that some future thread can continue */
future->status = PENDING;
enqueue_future(fs, future);
/* Signal that a future is pending */
mzrt_sema_post(fs->future_pending_sema);
requeue_future_within_lock(future, fs);
} else {
/* Signal the waiting worker thread that it
can continue running machine code */
@ -2257,11 +2295,18 @@ future_t *get_pending_future(Scheme_Future_State *fs)
{
future_t *f;
f = fs->future_queue;
if (f)
dequeue_future(fs, f);
return f;
while (1) {
f = fs->future_queue;
if (f) {
dequeue_future(fs, f);
if (!scheme_custodian_is_available(f->cust)) {
f->status = SUSPENDED;
} else {
return f;
}
} else
return NULL;
}
}
#endif

View File

@ -38,6 +38,7 @@ typedef void (*prim_allocate_values_t)(int, Scheme_Thread *);
#define PENDING_OVERSIZE 4
#define WAITING_FOR_REQUEUE 5
#define WAITING_FOR_FSEMA 6
#define SUSPENDED 7
#define FSRC_OTHER 0
#define FSRC_RATOR 1
@ -56,6 +57,9 @@ typedef struct future_t {
Scheme_Object *orig_lambda;
void *code;
Scheme_Custodian *cust; /* an approximate custodian; don't use a future
thread if this custodian is shut down */
/* Runtime call stuff */
int rt_prim; /* flag to indicate waiting for a prim call */
int want_lw; /* flag to indicate waiting for lw capture */
@ -178,6 +182,7 @@ void scheme_future_block_until_gc();
void scheme_future_continue_after_gc();
void scheme_check_future_work();
void scheme_future_gc_pause();
void scheme_future_check_custodians();
#ifdef UNIT_TEST
//These forwarding decls only need to be here to make

View File

@ -5690,6 +5690,7 @@ static int future_SIZE(void *p, struct NewGC *gc) {
static int future_MARK(void *p, struct NewGC *gc) {
future_t *f = (future_t *)p;
gcMARK2(f->orig_lambda, gc);
gcMARK2(f->cust, gc);
gcMARK2(f->arg_s0, gc);
gcMARK2(f->arg_t0, gc);
gcMARK2(f->arg_S0, gc);
@ -5721,6 +5722,7 @@ static int future_MARK(void *p, struct NewGC *gc) {
static int future_FIXUP(void *p, struct NewGC *gc) {
future_t *f = (future_t *)p;
gcFIXUP2(f->orig_lambda, gc);
gcFIXUP2(f->cust, gc);
gcFIXUP2(f->arg_s0, gc);
gcFIXUP2(f->arg_t0, gc);
gcFIXUP2(f->arg_S0, gc);

View File

@ -2332,6 +2332,7 @@ future {
mark:
future_t *f = (future_t *)p;
gcMARK2(f->orig_lambda, gc);
gcMARK2(f->cust, gc);
gcMARK2(f->arg_s0, gc);
gcMARK2(f->arg_t0, gc);
gcMARK2(f->arg_S0, gc);

View File

@ -589,6 +589,8 @@ extern void scheme_flatten_config(Scheme_Config *c);
extern Scheme_Object *scheme_apply_thread_thunk(Scheme_Object *rator);
Scheme_Custodian* scheme_custodian_extract_reference(Scheme_Custodian_Reference *mr);
/*========================================================================*/
/* hash tables and globals */
/*========================================================================*/

View File

@ -1106,7 +1106,8 @@ static void managed_object_gone(void *o, void *mr)
remove_managed(mr, o, NULL, NULL);
}
int scheme_custodian_is_available(Scheme_Custodian *m)
int scheme_custodian_is_available(Scheme_Custodian *m) XFORM_SKIP_PROC
/* may be called from a future thread */
{
if (m->shut_down)
return 0;
@ -1318,6 +1319,10 @@ Scheme_Thread *scheme_do_close_managed(Scheme_Custodian *m, Scheme_Exit_Closer_F
m = next_m;
}
#ifdef MZ_USE_FUTURES
scheme_future_check_custodians();
#endif
return kill_self;
}
@ -1431,6 +1436,15 @@ static Scheme_Object *custodian_close_all(int argc, Scheme_Object *argv[])
return scheme_void;
}
Scheme_Custodian* scheme_custodian_extract_reference(Scheme_Custodian_Reference *mr)
{
return CUSTODIAN_FAM(mr);
}
int scheme_custodian_is_shut_down(Scheme_Custodian* c)
{
return c->shut_down;
}
static Scheme_Object *extract_thread(Scheme_Object *o)
{