futures: more bug fixes, make `touch' safe

- bug fixes are related to allocation and runstack space
 - a `touch' within a future can complete before the `touch'ing
   future is itself `touch'ed
 - also make `length' safe (and JIT-inlined)
This commit is contained in:
Matthew Flatt 2011-04-09 17:10:14 -06:00
parent 52598af1c7
commit 857003378a
17 changed files with 426 additions and 100 deletions

View File

@ -205,13 +205,18 @@ The @racket[action] field is a symbol:
@racket[touch]ed, because the evaluation may depend on the
current continuation.}
@item{@racket['touch] (never in process 0): like @racket['sync] or
@racket['block], but for a @racket[touch] operation within a
future thunk.}
@item{@racket['result] or @racket['abort]: waiting or handling for
@racket['sync] or @racket['block] ended with a value or an
error, respectively.}
@racket['sync], @racket['block], or @racket['touch] ended with
a value or an error, respectively.}
@item{@racket['suspend] (never in process 0): a process blocked by
@racket['sync] or @racket['block] abandoned evaluation of a
future; some other process may pick up the future later.}
@racket['sync], @racket['block], or @racket['touch] abandoned
evaluation of a future; some other process may pick up the
future later.}
@item{@racket['touch-pause] and @racket['touch-resume] (in process 0,
only): waiting in @racket[touch] for a future whose thunk is
@ -226,13 +231,13 @@ The @racket[action] field is a symbol:
Assuming no @racket['missing] events, then @racket['start-work] or
@racket['start-0-work] is always paired with @racket['end-work],
@racket['sync] and @racket['block] are always paired with
@racket['result], @racket['abort], or @racket['suspend], and
@racket['sync], @racket['block], and @racket['touch] are always paired
with @racket['result], @racket['abort], or @racket['suspend], and
@racket['touch-pause] is always paired with @racket['touch-resume].
In process 0, some event pairs can be nested within other event pairs:
@racket['sync] or @racket['block] with @racket['result] or
@racket['abort], and @racket['touch-pause] with
@racket['sync], @racket['block], or @racket['touch] with
@racket['result] or @racket['abort], and @racket['touch-pause] with
@racket['touch-resume].}
@; ----------------------------------------------------------------------

View File

@ -152,6 +152,7 @@
(un #f 'list? 0)
(un #f 'list? '(1 2 . 3))
(un-exact #t 'list? '(1 2 3))
(un-exact 3 'length '(1 2 3))
(un #f 'boolean? 0)
(un #t 'boolean? #t)
(un #t 'boolean? #f)

View File

@ -630,7 +630,7 @@ int GC_is_allocated(void *p)
# else
# define PREFIX_WSIZE 1
# endif
#else /* GC_ALIGHT_FOUR or byte aligned */
#else /* GC_ALIGN_FOUR or byte aligned */
# define PREFIX_WSIZE 0
#endif
#define PREFIX_SIZE (PREFIX_WSIZE * WORD_SIZE)
@ -1046,14 +1046,20 @@ uintptr_t GC_make_jit_nursery_page(int count, uintptr_t *sz) {
if (!new_mpage->size) {
/* To avoid roundoff problems, the JIT needs the
result to be not a multiple of THREAD_LOCAL_PAGE_SIZE,
so add a prefix if alignment didn't force one. */
it it needs any pointer position produced by
allocation not to land at the end of the allocated
region. */
int bad;
#if defined(GC_ALIGN_SIXTEEN)
new_mpage->size = 16;
bad = !(new_mpage->size & (16 - 1));
#elif defined(GC_ALIGN_EIGHT)
new_mpage->size = 8;
bad = !(new_mpage->size & (8 - 1));
#else
new_mpage->size = WORD_SIZE;
bad = 1;
#endif
if (bad) {
fprintf(stderr, "invalid alignment configuration: %ld\n", new_mpage->size);
}
}
if (sz)
*sz = size - new_mpage->size;
@ -1241,6 +1247,9 @@ void *GC_malloc_pair(void *car, void *cdr)
cdr = gc->park[1];
gc->park[0] = NULL;
gc->park[1] = NULL;
/* Future-local allocation can fail: */
if (!pair) return NULL;
}
else {
objhead *info = (objhead *) PTR(GC_gen0_alloc_page_ptr);

View File

@ -173,6 +173,9 @@ void *GC_malloc_weak_box(void *p, void **secondary, int soffset, int is_late)
w = (GC_Weak_Box *)GC_malloc_one_tagged(sizeof(GC_Weak_Box));
/* Future-local allocation may fail: */
if (!w) return NULL;
p = gc->park[0];
secondary = (void **)gc->park[1];
gc->park[0] = NULL;

View File

@ -8449,6 +8449,7 @@ struct Scheme_Lightweight_Continuation {
void *stack_slice;
Scheme_Object **runstack_slice;
Scheme_Cont_Mark *cont_mark_stack_slice;
void *stored1, *stored2;
};
void scheme_init_thread_lwc(void) XFORM_SKIP_PROC
@ -8538,7 +8539,7 @@ Scheme_Lightweight_Continuation *scheme_capture_lightweight_continuation(Scheme_
for (i = 0; i < len; i++) {
if (((uintptr_t)runstack_slice[i] >= (uintptr_t)lwc->runstack_end)
&& ((uintptr_t)runstack_slice[i] <= (uintptr_t)lwc->runstack_start))
runstack_slice[i] = 0;
runstack_slice[i] = NULL;
}
len = lwc->cont_mark_stack_end - lwc->cont_mark_stack_start;
@ -8586,7 +8587,7 @@ static void *apply_lwc_k()
p->ku.k.p1 = NULL;
p->ku.k.p2 = NULL;
return scheme_apply_lightweight_continuation(lw, result, p->ku.k.i1);
return scheme_apply_lightweight_continuation(lw, result, p->ku.k.i1, p->ku.k.i2);
}
int scheme_can_apply_lightweight_continuation(Scheme_Lightweight_Continuation *lw)
@ -8612,7 +8613,8 @@ int scheme_can_apply_lightweight_continuation(Scheme_Lightweight_Continuation *l
Scheme_Object *scheme_apply_lightweight_continuation(Scheme_Lightweight_Continuation *lw,
Scheme_Object *result,
int result_is_rs_argv)
int result_is_rs_argv,
intptr_t min_stacksize)
XFORM_SKIP_PROC
{
intptr_t len, cm_len, cm_pos_delta, cm_delta, i, cm;
@ -8621,12 +8623,19 @@ Scheme_Object *scheme_apply_lightweight_continuation(Scheme_Lightweight_Continua
len = lw->saved_lwc->runstack_start - lw->saved_lwc->runstack_end;
if (!scheme_check_runstack(len)) {
if (!scheme_check_runstack(len)
/* besides making sure that the save slice fits, we need to
make sure that any advance check on available from the old thread
still applies in the new thread */
|| ((MZ_RUNSTACK - MZ_RUNSTACK_START) < min_stacksize)) {
/* This will not happen when restoring a future-thread-captured
continuation in a future thread. */
scheme_current_thread->ku.k.p1 = lw;
scheme_current_thread->ku.k.p2 = result;
scheme_current_thread->ku.k.i1 = result_is_rs_argv;
scheme_current_thread->ku.k.i2 = min_stacksize;
if (len < min_stacksize)
len = min_stacksize;
return (Scheme_Object *)scheme_enlarge_runstack(len, apply_lwc_k);
}

View File

@ -300,7 +300,8 @@ static void init_future_thread(struct Scheme_Future_State *fs, int i);
static void requeue_future(struct future_t *future, struct Scheme_Future_State *fs);
static void future_do_runtimecall(struct Scheme_Future_Thread_State *fts,
void *func,
int is_atomic);
int is_atomic,
int can_suspend);
static int capture_future_continuation(future_t *ft, void **storage);
static void future_raise_wrong_type_exn(const char *who,
const char *expected_type,
@ -309,7 +310,7 @@ static void future_raise_wrong_type_exn(const char *who,
Scheme_Object **argv);
#define INITIAL_C_STACK_SIZE 500000
#define FUTURE_RUNSTACK_SIZE 10000
#define FUTURE_RUNSTACK_SIZE 2000
#define FEVENT_BUFFER_SIZE 512
@ -322,6 +323,7 @@ enum {
FEVENT_RTCALL_ATOMIC,
FEVENT_HANDLE_RTCALL_ATOMIC,
FEVENT_RTCALL,
FEVENT_RTCALL_TOUCH,
FEVENT_HANDLE_RTCALL,
FEVENT_RTCALL_RESULT,
FEVENT_HANDLE_RTCALL_RESULT,
@ -336,14 +338,14 @@ enum {
static const char * const fevent_strs[] = { "create", "complete",
"start-work", "start-0-work", "end-work",
"sync", "sync", "block", "block",
"sync", "sync", "block", "touch", "block",
"result", "result", "abort", "abort",
"suspend",
"touch-pause", "touch-resume", "missing" };
static const char * const fevent_long_strs[] = { "created", "completed",
"started work", "started (process 0, only)", "ended work",
"synchronizing with process 0", "synchronizing",
"BLOCKING on process 0", "HANDLING",
"BLOCKING on process 0", "touching future", "HANDLING",
"result from process 0", "result determined",
"abort from process 0", "abort determined",
"suspended",
@ -372,6 +374,7 @@ typedef struct Scheme_Future_State {
future_t *future_queue_end;
future_t *future_waiting_atomic;
future_t *future_waiting_lwc;
future_t *future_waiting_touch;
int next_futureid;
mzrt_mutex *future_mutex; /* BEWARE: don't allocate while holding this lock */
@ -461,7 +464,7 @@ typedef struct future_thread_params_t {
Scheme_Object ***scheme_current_runstack_ptr;
Scheme_Object ***scheme_current_runstack_start_ptr;
Scheme_Thread **current_thread_ptr;
void *jit_future_storage_ptr;
void **jit_future_storage_ptr;
Scheme_Current_LWC *lwc;
} future_thread_params_t;
@ -497,14 +500,9 @@ void scheme_init_futures(Scheme_Env *newenv)
0),
newenv);
scheme_add_global_constant(
"touch",
scheme_make_prim_w_arity(
touch,
"touch",
1,
1),
newenv);
p = scheme_make_prim_w_arity(touch, "touch", 1, 1);
SCHEME_PRIM_PROC_FLAGS(p) |= SCHEME_PRIM_IS_UNARY_INLINED;
scheme_add_global_constant("touch", p, newenv);
p = scheme_make_immed_prim(
scheme_current_future,
@ -600,6 +598,7 @@ void futures_init(void)
REGISTER_SO(fs->future_queue_end);
REGISTER_SO(fs->future_waiting_atomic);
REGISTER_SO(fs->future_waiting_lwc);
REGISTER_SO(fs->future_waiting_touch);
REGISTER_SO(fs->fevent_syms);
REGISTER_SO(fs->fevent_prefab);
REGISTER_SO(jit_future_storage);
@ -1045,7 +1044,7 @@ static void flush_future_logs(Scheme_Future_State *fs)
b = &fts->fevents1;
if (b->count) {
t = b->a[i].timestamp;
t = b->a[b->i].timestamp;
if (!min_set || (t < min_t)) {
min_t = t;
min_b = b;
@ -1354,6 +1353,14 @@ static void enqueue_future_for_fsema(future_t *ft, fsemaphore_t *sema)
}
}
static fsemaphore_t *block_until_sema_ready(fsemaphore_t *sema)
{
/* This little function cooperates with the GC, unlike the
function that calls it. */
scheme_block_until(fsemaphore_ready, NULL, (Scheme_Object*)sema, 0);
return sema;
}
Scheme_Object *scheme_fsemaphore_wait(int argc, Scheme_Object **argv)
XFORM_SKIP_PROC
{
@ -1381,22 +1388,19 @@ Scheme_Object *scheme_fsemaphore_wait(int argc, Scheme_Object **argv)
sema->ready);
#endif
jit_future_storage[0] = (void*)sema;
mzrt_mutex_lock(sema->mut);
if (!sema->ready) {
if (!fts) {
/* Then we are on the runtime thread, block and wait for the
fsema to be ready while cooperating with the scheduler */
mzrt_mutex_unlock(sema->mut);
scheme_block_until(fsemaphore_ready, NULL, (Scheme_Object*)sema, 0);
/* Fetch the sema pointer again, in case it was moved during a GC */
sema = (fsemaphore_t*)jit_future_storage[0];
sema = block_until_sema_ready(sema);
mzrt_mutex_lock(sema->mut);
} else {
/* On a future thread, suspend the future (to be
resumed whenever the fsema becomes ready */
future_t *future = fts->thread->current_ft;
jit_future_storage[0] = (void*)sema;
jit_future_storage[1] = (void*)future;
if (!future) {
/* Should never be here */
@ -1556,7 +1560,69 @@ static void dequeue_future(Scheme_Future_State *fs, future_t *ft)
--fs->future_queue_count;
}
static void future_in_runtime(future_t * volatile ft, int what)
static void complete_rtcall(Scheme_Future_State *fs, future_t *future)
{
if (future->suspended_lw) {
/* Re-enqueue the future so that some future thread can continue */
requeue_future_within_lock(future, fs);
} else {
/* Signal the waiting worker thread that it
can continue running machine code */
future->want_lw = 0;
if (future->can_continue_sema) {
mzrt_sema_post(future->can_continue_sema);
future->can_continue_sema = NULL;
}
}
}
static void direct_future_to_future_touch(Scheme_Future_State *fs, future_t *ft, future_t *t_ft)
{
Scheme_Object *retval = ft->retval;
receive_special_result(ft, retval, 0);
t_ft->retval_s = retval;
send_special_result(t_ft, retval);
t_ft->arg_S1 = NULL;
complete_rtcall(fs, t_ft);
}
static future_t *get_future_for_touch(future_t *ft)
XFORM_SKIP_PROC
/* called in any thread with lock held */
{
if ((ft->status == WAITING_FOR_PRIM) && (ft->prim_func == touch)) {
/* try to enqueue it... */
Scheme_Object **a = ft->arg_S1;
if (ft->suspended_lw)
a = scheme_adjust_runstack_argument(ft->suspended_lw, a);
return (future_t *)a[0];
} else
return NULL;
}
static void trigger_added_touches(Scheme_Future_State *fs, future_t *ft)
XFORM_SKIP_PROC
/* lock held; called from both future and runtime threads */
{
if (ft->touching) {
Scheme_Object *touching = ft->touching;
while (!SCHEME_NULLP(touching)) {
Scheme_Object *wb = SCHEME_CAR(touching);
future_t *t_ft = (future_t *)SCHEME_WEAK_BOX_VAL(wb);
if (t_ft && (get_future_for_touch(t_ft) == ft)) {
direct_future_to_future_touch(fs, ft, t_ft);
}
touching = SCHEME_CDR(touching);
}
}
}
static void future_in_runtime(Scheme_Future_State *fs, future_t * volatile ft, int what)
{
mz_jmp_buf newbuf, * volatile savebuf;
Scheme_Thread *p = scheme_current_thread;
@ -1587,7 +1653,12 @@ static void future_in_runtime(future_t * volatile ft, int what)
p->current_ft = old_ft;
ft->retval = retval;
mzrt_mutex_lock(fs->future_mutex);
ft->status = FINISHED;
trigger_added_touches(fs, ft);
mzrt_mutex_unlock(fs->future_mutex);
record_fevent(FEVENT_COMPLETE, ft->id);
record_fevent(FEVENT_END_WORK, ft->id);
@ -1611,7 +1682,7 @@ static int prefer_to_apply_future_in_runtime()
return 1;
}
Scheme_Object *touch(int argc, Scheme_Object *argv[])
Scheme_Object *general_touch(int argc, Scheme_Object *argv[])
/* Called in runtime thread */
{
Scheme_Future_State *fs = scheme_future_state;
@ -1644,7 +1715,7 @@ Scheme_Object *touch(int argc, Scheme_Object *argv[])
ft->status = RUNNING;
mzrt_mutex_unlock(fs->future_mutex);
future_in_runtime(ft, what);
future_in_runtime(fs, ft, what);
retval = ft->retval;
@ -1707,7 +1778,7 @@ Scheme_Object *touch(int argc, Scheme_Object *argv[])
ft->status = RUNNING;
/* may raise an exception or escape: */
mzrt_mutex_unlock(fs->future_mutex);
future_in_runtime(ft, FEVENT_START_WORK);
future_in_runtime(fs, ft, FEVENT_START_WORK);
} else {
/* Someone needs to handle the future. We're banking on some
future thread eventually picking up the future, which is
@ -1735,6 +1806,62 @@ Scheme_Object *touch(int argc, Scheme_Object *argv[])
return retval;
}
Scheme_Object *touch(int argc, Scheme_Object *argv[])
XFORM_SKIP_PROC
/* can be called in future thread */
{
Scheme_Future_Thread_State *fts = scheme_future_thread_state;
if (!fts) {
return general_touch(argc, argv);
} else {
if (SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_future_type)) {
Scheme_Future_State *fs = scheme_future_state;
future_t *ft = (future_t *)argv[0];
int status;
mzrt_mutex_lock(fs->future_mutex);
status = ft->status;
mzrt_mutex_unlock(fs->future_mutex);
if (status == FINISHED) {
Scheme_Object *retval = ft->retval;
receive_special_result(ft, retval, 0);
return retval;
} else {
#ifdef MZ_PRECISE_GC
/* Try adding current future to ft's chain of touching futures */
Scheme_Object *wb, *pr;
future_t *current_ft = scheme_current_thread->current_ft;
wb = GC_malloc_weak_box(current_ft, NULL, 0, 0);
if (wb) {
pr = GC_malloc_pair(wb, scheme_null);
if (pr) {
mzrt_mutex_lock(fs->future_mutex);
if (ft->status != FINISHED) {
if (ft->touching)
SCHEME_CDR(pr) = ft->touching;
ft->touching = pr;
current_ft->in_touch_queue = 1;
mzrt_mutex_unlock(fs->future_mutex);
} else {
/* `ft' switched to FINISHED while we were trying add,
so carry on with its result */
Scheme_Object *retval = ft->retval;
mzrt_mutex_unlock(fs->future_mutex);
receive_special_result(ft, retval, 0);
return retval;
}
}
}
#endif
}
}
return scheme_rtcall_iS_s("touch", FSRC_PRIM, touch, argc, argv);
}
}
#if defined(linux)
# include <unistd.h>
#elif defined(OS_X)
@ -1930,6 +2057,7 @@ void *worker_thread_future_loop(void *arg)
/* Update the status */
ft->status = FINISHED;
trigger_added_touches(fs, ft);
record_fevent(FEVENT_COMPLETE, fid);
}
@ -1971,7 +2099,8 @@ static Scheme_Object *_apply_future_lw(future_t *ft)
result_is_rs_argv = 0;
}
v = scheme_apply_lightweight_continuation(lw, v, result_is_rs_argv);
v = scheme_apply_lightweight_continuation(lw, v, result_is_rs_argv,
FUTURE_RUNSTACK_SIZE);
if (SAME_OBJ(v, SCHEME_TAIL_CALL_WAITING)) {
v = scheme_ts_scheme_force_value_same_mark(v);
@ -2049,9 +2178,9 @@ void scheme_check_future_work()
/* Check for work that future threads need from the runtime thread
and that can be done in any Scheme thread (e.g., get a new page
for allocation). */
future_t *ft;
future_t *ft, *other_ft;
Scheme_Future_State *fs = scheme_future_state;
int more = 1;
int more;
if (!fs) return;
@ -2059,6 +2188,7 @@ void scheme_check_future_work()
check_future_thread_creation(fs);
more = 1;
while (more) {
/* Try to get a future waiting on a atomic operation */
mzrt_mutex_lock(fs->future_mutex);
@ -2081,6 +2211,52 @@ void scheme_check_future_work()
invoke_rtcall(fs, ft, 1);
}
more = 1;
while (more) {
/* Try to get a future that's waiting to touch another future: */
mzrt_mutex_lock(fs->future_mutex);
ft = fs->future_waiting_touch;
if (ft) {
fs->future_waiting_touch = ft->next_waiting_touch;
ft->next_waiting_touch = NULL;
other_ft = get_future_for_touch(ft);
more = 1;
} else {
other_ft = NULL;
more = 0;
}
mzrt_mutex_unlock(fs->future_mutex);
if (other_ft) {
/* Chain to `ft' from `other_ft': */
Scheme_Object *wb, *pr;
int was_done;
wb = scheme_make_weak_box((Scheme_Object *)ft);
pr = scheme_make_pair(wb, scheme_null);
mzrt_mutex_lock(fs->future_mutex);
if (other_ft->status == FINISHED) {
/* Completed while we tried to allocated a chain link. */
ft->status = HANDLING_PRIM;
ft->want_lw = 0;
was_done = 1;
} else {
/* enqueue */
if (other_ft->touching)
SCHEME_CDR(pr) = other_ft->touching;
other_ft->touching = pr;
was_done = 0;
}
mzrt_mutex_unlock(fs->future_mutex);
if (was_done) {
/* other_ft is done: */
direct_future_to_future_touch(fs, other_ft, ft);
}
}
}
while (1) {
/* Try to get a future waiting to be suspended */
mzrt_mutex_lock(fs->future_mutex);
@ -2113,7 +2289,8 @@ void scheme_check_future_work()
static void future_do_runtimecall(Scheme_Future_Thread_State *fts,
void *func,
int is_atomic)
int is_atomic,
int can_suspend)
XFORM_SKIP_PROC
/* Called in future thread */
{
@ -2150,6 +2327,11 @@ static void future_do_runtimecall(Scheme_Future_Thread_State *fts,
prefer_to_suspend = 1;
}
if (!can_suspend) {
insist_to_suspend = 0;
prefer_to_suspend = 0;
}
if (prefer_to_suspend
&& GC_gen0_alloc_page_ptr
&& capture_future_continuation(future, storage)) {
@ -2160,7 +2342,11 @@ static void future_do_runtimecall(Scheme_Future_Thread_State *fts,
mzrt_mutex_lock(fs->future_mutex);
record_fevent(is_atomic ? FEVENT_RTCALL_ATOMIC : FEVENT_RTCALL, fid);
if (func == touch) {
record_fevent(FEVENT_RTCALL_TOUCH, fid);
} else {
record_fevent(is_atomic ? FEVENT_RTCALL_ATOMIC : FEVENT_RTCALL, fid);
}
/* Set up the arguments for the runtime call
to be picked up by the main rt thread */
@ -2181,6 +2367,17 @@ static void future_do_runtimecall(Scheme_Future_Thread_State *fts,
}
}
if (func == touch) {
if (!future->in_touch_queue) {
/* Ask the runtime thread to put this future on the queue
of the future being touched: */
future->next_waiting_touch = fs->future_waiting_touch;
fs->future_waiting_touch = future;
} else {
future->in_touch_queue = 0; /* done with back-door argument */
}
}
scheme_signal_received_at(fs->signal_handle);
if (fts->thread->current_ft) {
@ -2243,7 +2440,7 @@ static void future_raise_wrong_type_exn(const char *who, const char *expected_ty
future->time_of_request = get_future_timestamp();
future->source_of_request = who;
future_do_runtimecall(fts, (void*)scheme_wrong_type, 0);
future_do_runtimecall(fts, (void*)scheme_wrong_type, 0, 1);
/* Fetch the future again, in case moved by a GC */
future = fts->thread->current_ft;
@ -2274,7 +2471,7 @@ Scheme_Object **scheme_rtcall_on_demand(const char *who, int src_type, prim_on_d
future->source_of_request = who;
future->source_type = src_type;
future_do_runtimecall(fts, (void*)f, 1);
future_do_runtimecall(fts, (void*)f, 1, 1);
/* Fetch the future again, in case moved by a GC */
future = fts->thread->current_ft;
@ -2308,7 +2505,7 @@ Scheme_Object *scheme_rtcall_make_fsemaphore(const char *who, int src_type, Sche
else
is_atomic = 0;
future_do_runtimecall(fts, (void*)scheme_make_fsemaphore_inl, is_atomic);
future_do_runtimecall(fts, (void*)scheme_make_fsemaphore_inl, is_atomic, 1);
/* Fetch the future again, in case moved by a GC */
future = fts->thread->current_ft;
@ -2339,7 +2536,7 @@ Scheme_Object *scheme_rtcall_make_future(const char *who, int src_type, Scheme_O
future->source_of_request = who;
future->source_type = src_type;
future_do_runtimecall(fts, (void*)scheme_future, is_atomic);
future_do_runtimecall(fts, (void*)scheme_future, is_atomic, 1);
/* Fetch the future again, in case moved by a GC */
future = fts->thread->current_ft;
@ -2367,7 +2564,7 @@ void scheme_rtcall_allocate_values(const char *who, int src_type, int count, Sch
future->source_of_request = who;
future->source_type = src_type;
future_do_runtimecall(fts, (void*)f, 1);
future_do_runtimecall(fts, (void*)f, 1, 1);
/* Fetch the future again, in case moved by a GC */
future = fts->thread->current_ft;
@ -2392,7 +2589,7 @@ uintptr_t scheme_rtcall_alloc(const char *who, int src_type)
if (fts->gen0_start) {
intptr_t cur;
cur = GC_gen0_alloc_page_ptr;
if (cur < (fts->gen0_start + (fts->gen0_size - 1) * align)) {
if (cur < (GC_gen0_alloc_page_end - align)) {
if (cur & (align - 1)) {
/* round up to next page boundary */
cur &= ~(align - 1);
@ -2416,7 +2613,7 @@ uintptr_t scheme_rtcall_alloc(const char *who, int src_type)
future->prim_protocol = SIG_ALLOC;
future->arg_i0 = fts->gen0_size;
future_do_runtimecall(fts, (void*)GC_make_jit_nursery_page, 1);
future_do_runtimecall(fts, (void*)GC_make_jit_nursery_page, 1, 0);
future = fts->thread->current_ft;
retval = future->alloc_retval;
@ -2452,7 +2649,7 @@ void scheme_rtcall_new_mark_segment(Scheme_Thread *p)
future->prim_protocol = SIG_ALLOC_MARK_SEGMENT;
future->arg_s0 = (Scheme_Object *)p;
future_do_runtimecall(fts, (void*)scheme_new_mark_segment, 1);
future_do_runtimecall(fts, (void*)scheme_new_mark_segment, 1, 0);
}
static int push_marks(future_t *f, Scheme_Cont_Frame_Data *d)
@ -2589,7 +2786,8 @@ static void do_invoke_rtcall(Scheme_Future_State *fs, future_t *future, int is_a
case SIG_ALLOC:
{
uintptr_t ret, sz;
ret = GC_make_jit_nursery_page(future->arg_i0, &sz);
int amt = future->arg_i0;
ret = GC_make_jit_nursery_page(amt, &sz);
future->alloc_retval = ret;
future->alloc_sz_retval = sz;
future->alloc_retval_counter = scheme_did_gc_count;
@ -2650,19 +2848,10 @@ static void do_invoke_rtcall(Scheme_Future_State *fs, future_t *future, int is_a
if (need_pop)
pop_marks(&mark_d);
record_fevent(FEVENT_HANDLE_RTCALL_RESULT, future->id);
mzrt_mutex_lock(fs->future_mutex);
if (future->suspended_lw) {
/* Re-enqueue the future so that some future thread can continue */
requeue_future_within_lock(future, fs);
} else {
/* Signal the waiting worker thread that it
can continue running machine code */
future->want_lw = 0;
if (future->can_continue_sema) {
mzrt_sema_post(future->can_continue_sema);
future->can_continue_sema = NULL;
}
}
complete_rtcall(fs, future);
mzrt_mutex_unlock(fs->future_mutex);
}
@ -2730,7 +2919,6 @@ static void invoke_rtcall(Scheme_Future_State * volatile fs, future_t * volatile
(void)scheme_top_level_do(do_invoke_rtcall_k, 1);
}
record_fevent(FEVENT_HANDLE_RTCALL_RESULT, future->id);
}
p->error_buf = savebuf;
}
@ -2760,7 +2948,7 @@ future_t *enqueue_future(Scheme_Future_State *fs, future_t *ft)
future_t *get_pending_future(Scheme_Future_State *fs)
XFORM_SKIP_PROC
/* Called in future thread */
/* Called in future thread with lock held */
{
future_t *f;

View File

@ -50,8 +50,13 @@ typedef struct future_t {
int id;
int thread_short_id;
/* The status field is the main locking mechanism. It
should only be read and written when holding a lock
(and all associated fields for a status should be
set at the same time). */
int status;
int work_completed;
mzrt_sema *can_continue_sema;
Scheme_Object *orig_lambda;
@ -62,6 +67,7 @@ typedef struct future_t {
/* Runtime call stuff */
int want_lw; /* flag to indicate waiting for lw capture */
int in_touch_queue; /* flag to indicate waiting for lw capture */
int rt_prim_is_atomic;
double time_of_request;
const char *source_of_request;
@ -120,9 +126,12 @@ typedef struct future_t {
struct future_t *next_waiting_atomic;
struct future_t *next_waiting_lwc;
struct future_t *next_waiting_touch;
struct future_t *prev_in_fsema_queue;
struct future_t *next_in_fsema_queue;
Scheme_Object *touching; /* a list of weak pointers to futures touching this one */
} future_t;
typedef struct fsemaphore_t {

View File

@ -98,7 +98,7 @@
@string-append{ future->arg_@|(string t)|@|(number->string i)| = @|a|;})
"\n")
@(if (equal? arg-types '("Scheme_Object*")) @string-append{send_special_result(future, @(car arg-names));} "")
future_do_runtimecall(fts, (void*)f, 0);
future_do_runtimecall(fts, (void*)f, 0, 1);
future = fts->thread->current_ft;
@(if (string=? result-type "void") "" @string-append{retval = @|fretval|;})
@(if (string=? result-type "void") "" @string-append{@|fretval| = 0;})

View File

@ -237,6 +237,7 @@ struct scheme_jit_common_record {
void *app_values_slow_code, *app_values_multi_slow_code, *app_values_tail_slow_code;
void *values_code;
void *list_p_code, *list_p_branch_code;
void *list_length_code;
void *finish_tail_call_code, *finish_tail_call_fixup_code;
void *module_run_start_code, *module_exprun_start_code, *module_start_start_code;
void *box_flonum_from_stack_code;

View File

@ -53,6 +53,7 @@ define_ts_iS_s(scheme_checked_caar, FSRC_MARKS)
define_ts_iS_s(scheme_checked_cadr, FSRC_MARKS)
define_ts_iS_s(scheme_checked_cdar, FSRC_MARKS)
define_ts_iS_s(scheme_checked_cddr, FSRC_MARKS)
define_ts_s_s(scheme_checked_length, FSRC_MARKS)
define_ts_iS_s(scheme_checked_mcar, FSRC_MARKS)
define_ts_iS_s(scheme_checked_mcdr, FSRC_MARKS)
define_ts_iS_s(scheme_checked_set_mcar, FSRC_MARKS)

View File

@ -17,7 +17,7 @@
future->arg_i1 = g52;
future->arg_S2 = g53;
future_do_runtimecall(fts, (void*)f, 0);
future_do_runtimecall(fts, (void*)f, 0, 1);
future = fts->thread->current_ft;
retval = future->retval_s;
future->retval_s = 0;
@ -43,7 +43,7 @@
future->arg_S1 = g55;
future->arg_s2 = g56;
future_do_runtimecall(fts, (void*)f, 0);
future_do_runtimecall(fts, (void*)f, 0, 1);
future = fts->thread->current_ft;
retval = future->retval_s;
future->retval_s = 0;
@ -67,7 +67,7 @@
future->source_type = src_type;
future->arg_s0 = g57;
send_special_result(future, g57);
future_do_runtimecall(fts, (void*)f, 0);
future_do_runtimecall(fts, (void*)f, 0, 1);
future = fts->thread->current_ft;
retval = future->retval_s;
future->retval_s = 0;
@ -91,7 +91,7 @@
future->source_type = src_type;
future->arg_n0 = g58;
future_do_runtimecall(fts, (void*)f, 0);
future_do_runtimecall(fts, (void*)f, 0, 1);
future = fts->thread->current_ft;
retval = future->retval_s;
future->retval_s = 0;
@ -115,7 +115,7 @@
future->source_type = src_type;
future_do_runtimecall(fts, (void*)f, 0);
future_do_runtimecall(fts, (void*)f, 0, 1);
future = fts->thread->current_ft;
retval = future->retval_s;
future->retval_s = 0;
@ -140,7 +140,7 @@
future->arg_s0 = g59;
future->arg_s1 = g60;
future_do_runtimecall(fts, (void*)f, 0);
future_do_runtimecall(fts, (void*)f, 0, 1);
future = fts->thread->current_ft;
retval = future->retval_s;
future->retval_s = 0;
@ -165,7 +165,7 @@
future->arg_t0 = g61;
future->arg_t1 = g62;
future_do_runtimecall(fts, (void*)f, 0);
future_do_runtimecall(fts, (void*)f, 0, 1);
future = fts->thread->current_ft;
retval = future->retval_s;
future->retval_s = 0;
@ -190,7 +190,7 @@
future->arg_s0 = g63;
future->arg_s1 = g64;
future_do_runtimecall(fts, (void*)f, 0);
future_do_runtimecall(fts, (void*)f, 0, 1);
future = fts->thread->current_ft;
retval = future->retval_m;
future->retval_m = 0;
@ -215,7 +215,7 @@
future->arg_S0 = g65;
future->arg_l1 = g66;
future_do_runtimecall(fts, (void*)f, 0);
future_do_runtimecall(fts, (void*)f, 0, 1);
future = fts->thread->current_ft;
retval = future->retval_s;
future->retval_s = 0;
@ -239,7 +239,7 @@
future->source_type = src_type;
future->arg_l0 = g67;
future_do_runtimecall(fts, (void*)f, 0);
future_do_runtimecall(fts, (void*)f, 0, 1);
future = fts->thread->current_ft;
retval = future->retval_s;
future->retval_s = 0;
@ -265,7 +265,7 @@
future->arg_s1 = g69;
future->arg_i2 = g70;
future_do_runtimecall(fts, (void*)f, 0);
future_do_runtimecall(fts, (void*)f, 0, 1);
future = fts->thread->current_ft;
@ -291,7 +291,7 @@
future->arg_i1 = g72;
future->arg_S2 = g73;
future_do_runtimecall(fts, (void*)f, 0);
future_do_runtimecall(fts, (void*)f, 0, 1);
future = fts->thread->current_ft;
@ -316,7 +316,7 @@
future->arg_s0 = g74;
future->arg_s1 = g75;
future_do_runtimecall(fts, (void*)f, 0);
future_do_runtimecall(fts, (void*)f, 0, 1);
future = fts->thread->current_ft;
@ -340,7 +340,7 @@
future->source_type = src_type;
future->arg_b0 = g76;
future_do_runtimecall(fts, (void*)f, 0);
future_do_runtimecall(fts, (void*)f, 0, 1);
future = fts->thread->current_ft;
@ -365,7 +365,7 @@
future->arg_s0 = g77;
future->arg_l1 = g78;
future_do_runtimecall(fts, (void*)f, 0);
future_do_runtimecall(fts, (void*)f, 0, 1);
future = fts->thread->current_ft;
retval = future->retval_s;
future->retval_s = 0;
@ -390,7 +390,7 @@
future->arg_i0 = g79;
future->arg_S1 = g80;
future_do_runtimecall(fts, (void*)f, 0);
future_do_runtimecall(fts, (void*)f, 0, 1);
future = fts->thread->current_ft;
retval = future->retval_s;
future->retval_s = 0;
@ -414,7 +414,7 @@
future->source_type = src_type;
future->arg_S0 = g81;
future_do_runtimecall(fts, (void*)f, 0);
future_do_runtimecall(fts, (void*)f, 0, 1);
future = fts->thread->current_ft;
retval = future->retval_s;
future->retval_s = 0;
@ -438,7 +438,7 @@
future->source_type = src_type;
future->arg_s0 = g82;
send_special_result(future, g82);
future_do_runtimecall(fts, (void*)f, 0);
future_do_runtimecall(fts, (void*)f, 0, 1);
future = fts->thread->current_ft;
@ -464,7 +464,7 @@
future->arg_S1 = g84;
future->arg_i2 = g85;
future_do_runtimecall(fts, (void*)f, 0);
future_do_runtimecall(fts, (void*)f, 0, 1);
future = fts->thread->current_ft;
retval = future->retval_s;
future->retval_s = 0;
@ -490,7 +490,7 @@
future->arg_i1 = g87;
future->arg_S2 = g88;
future_do_runtimecall(fts, (void*)f, 0);
future_do_runtimecall(fts, (void*)f, 0, 1);
future = fts->thread->current_ft;
@ -514,7 +514,7 @@
future->source_type = src_type;
future->arg_z0 = g89;
future_do_runtimecall(fts, (void*)f, 0);
future_do_runtimecall(fts, (void*)f, 0, 1);
future = fts->thread->current_ft;
retval = future->retval_p;
future->retval_p = 0;
@ -539,7 +539,7 @@
future->arg_s0 = g90;
future->arg_i1 = g91;
future_do_runtimecall(fts, (void*)f, 0);
future_do_runtimecall(fts, (void*)f, 0, 1);
future = fts->thread->current_ft;
retval = future->retval_s;
future->retval_s = 0;
@ -565,7 +565,7 @@
future->arg_i1 = g93;
future->arg_s2 = g94;
future_do_runtimecall(fts, (void*)f, 0);
future_do_runtimecall(fts, (void*)f, 0, 1);
future = fts->thread->current_ft;

View File

@ -1943,6 +1943,79 @@ static int common7(mz_jit_state *jitter, void *_data)
return 1;
}
static int common8(mz_jit_state *jitter, void *_data)
{
/* list_length_code */
/* argument is in R0 */
{
void *code;
GC_CAN_IGNORE jit_insn *refloop, *ref1, *ref2, *ref3, *ref4, *ref5;
code = jit_get_ip().ptr;
sjc.list_length_code = code;
mz_prolog(JIT_R2);
__START_SHORT_JUMPS__(1);
/* Save original argument: */
jit_movr_p(JIT_V1, JIT_R0);
/* Note: there's no fuel check in this loop, just like there isn't in
scheme_list_length(). Maybe there should be. */
/* R0 has argument, R1 has counter */
jit_movi_l(JIT_R1, 0);
refloop = _jit.x.pc;
ref2 = jit_beqi_p(jit_forward(), JIT_R0, scheme_null);
ref3 = jit_bmsi_l(jit_forward(), JIT_R0, 0x1);
jit_ldxi_s(JIT_R2, JIT_R0, &((Scheme_Object *)0x0)->type);
ref4 = jit_bnei_i(jit_forward(), JIT_R2, scheme_pair_type);
CHECK_LIMIT();
jit_ldxi_s(JIT_R2, JIT_R0, &MZ_OPT_HASH_KEY(&((Scheme_Stx *)0x0)->iso));
ref5 = jit_bmsi_ul(jit_forward(), JIT_R2, PAIR_IS_NON_LIST);
jit_ldxi_p(JIT_R0, JIT_R0, (intptr_t)&SCHEME_CDR(0x0));
jit_addi_l(JIT_R1, JIT_R1, 1);
(void)jit_jmpi(refloop);
CHECK_LIMIT();
/* Return result: */
mz_patch_branch(ref2);
__END_SHORT_JUMPS__(1);
jit_lshi_l(JIT_R0, JIT_R1, 1);
jit_ori_l(JIT_R0, JIT_R0, 1);
ref1 = _jit.x.pc;
mz_epilog(JIT_R2);
__START_SHORT_JUMPS__(1);
mz_patch_branch(ref3);
mz_patch_branch(ref4);
mz_patch_branch(ref5);
__END_SHORT_JUMPS__(1);
JIT_UPDATE_THREAD_RSPTR();
jit_prepare(1);
jit_pusharg_p(JIT_V1);
(void)mz_finish_lwe(ts_scheme_checked_length, ref2);
CHECK_LIMIT();
jit_retval(JIT_R0);
__START_SHORT_JUMPS__(1);
(void)jit_jmpi(ref1);
__END_SHORT_JUMPS__(1);
scheme_jit_register_sub_func(jitter, code, scheme_false);
}
return 1;
}
int scheme_do_generate_common(mz_jit_state *jitter, void *_data)
{
if (!common0(jitter, _data)) return 0;
@ -1954,6 +2027,7 @@ int scheme_do_generate_common(mz_jit_state *jitter, void *_data)
if (!common5(jitter, _data)) return 0;
if (!common6(jitter, _data)) return 0;
if (!common7(jitter, _data)) return 0;
if (!common8(jitter, _data)) return 0;
return 1;
}

View File

@ -699,6 +699,18 @@ int scheme_generate_inlined_unary(mz_jit_state *jitter, Scheme_App2_Rec *app, in
}
CHECK_LIMIT();
return 1;
} else if (IS_NAMED_PRIM(rator, "length")) {
mz_runstack_skipped(jitter, 1);
scheme_generate_non_tail(app->rand, jitter, 0, 1, 0);
CHECK_LIMIT();
mz_runstack_unskipped(jitter, 1);
mz_rs_sync();
(void)jit_calli(sjc.list_length_code);
return 1;
} else if (IS_NAMED_PRIM(rator, "vector-length")
|| IS_NAMED_PRIM(rator, "fxvector-length")
@ -1131,7 +1143,8 @@ int scheme_generate_inlined_unary(mz_jit_state *jitter, Scheme_App2_Rec *app, in
generate_inlined_type_test(jitter, app, scheme_fsemaphore_type, scheme_fsemaphore_type, 1, for_branch, branch_short, need_sync);
return 1;
} else if (IS_NAMED_PRIM(rator, "future")
| IS_NAMED_PRIM(rator, "fsemaphore-count")
|| IS_NAMED_PRIM(rator, "touch")
|| IS_NAMED_PRIM(rator, "fsemaphore-count")
|| IS_NAMED_PRIM(rator, "make-fsemaphore")
|| IS_NAMED_PRIM(rator, "fsemaphore-post")
|| IS_NAMED_PRIM(rator, "fsemaphore-wait")

View File

@ -248,11 +248,11 @@ scheme_init_list (Scheme_Env *env)
"immutable?",
1, 1, 1),
env);
scheme_add_global_constant ("length",
scheme_make_immed_prim(length_prim,
"length",
1, 1),
env);
p = scheme_make_immed_prim(length_prim, "length", 1, 1);
SCHEME_PRIM_PROC_FLAGS(p) |= SCHEME_PRIM_IS_UNARY_INLINED;
scheme_add_global_constant("length", p, env);
scheme_add_global_constant ("append",
scheme_make_immed_prim(append_prim,
"append",
@ -1187,6 +1187,11 @@ length_prim (int argc, Scheme_Object *argv[])
return scheme_make_integer(l);
}
Scheme_Object *scheme_checked_length(Scheme_Object *v)
{
return length_prim(1, &v);
}
Scheme_Object *
scheme_append(Scheme_Object *l1, Scheme_Object *l2)
{

View File

@ -5712,9 +5712,11 @@ static int future_MARK(void *p, struct NewGC *gc) {
gcMARK2(f->next, gc);
gcMARK2(f->next_waiting_atomic, gc);
gcMARK2(f->next_waiting_lwc, gc);
gcMARK2(f->next_waiting_touch, gc);
gcMARK2(f->suspended_lw, gc);
gcMARK2(f->prev_in_fsema_queue, gc);
gcMARK2(f->next_in_fsema_queue, gc);
gcMARK2(f->touching, gc);
return
gcBYTES_TO_WORDS(sizeof(future_t));
}
@ -5744,9 +5746,11 @@ static int future_FIXUP(void *p, struct NewGC *gc) {
gcFIXUP2(f->next, gc);
gcFIXUP2(f->next_waiting_atomic, gc);
gcFIXUP2(f->next_waiting_lwc, gc);
gcFIXUP2(f->next_waiting_touch, gc);
gcFIXUP2(f->suspended_lw, gc);
gcFIXUP2(f->prev_in_fsema_queue, gc);
gcFIXUP2(f->next_in_fsema_queue, gc);
gcFIXUP2(f->touching, gc);
return
gcBYTES_TO_WORDS(sizeof(future_t));
}

View File

@ -2354,9 +2354,11 @@ future {
gcMARK2(f->next, gc);
gcMARK2(f->next_waiting_atomic, gc);
gcMARK2(f->next_waiting_lwc, gc);
gcMARK2(f->next_waiting_touch, gc);
gcMARK2(f->suspended_lw, gc);
gcMARK2(f->prev_in_fsema_queue, gc);
gcMARK2(f->next_in_fsema_queue, gc);
gcMARK2(f->touching, gc);
size:
gcBYTES_TO_WORDS(sizeof(future_t));
}

View File

@ -2310,7 +2310,8 @@ Scheme_Lightweight_Continuation *scheme_capture_lightweight_continuation(Scheme_
void **storage);
Scheme_Object *scheme_apply_lightweight_continuation(Scheme_Lightweight_Continuation *captured,
Scheme_Object *result,
int result_is_rs_argv);
int result_is_rs_argv,
intptr_t min_stacksize);
Scheme_Object **scheme_adjust_runstack_argument(Scheme_Lightweight_Continuation *captured,
Scheme_Object **arg);
@ -3517,6 +3518,7 @@ Scheme_Object *scheme_checked_caar(int argc, Scheme_Object **argv);
Scheme_Object *scheme_checked_cadr(int argc, Scheme_Object **argv);
Scheme_Object *scheme_checked_cdar(int argc, Scheme_Object **argv);
Scheme_Object *scheme_checked_cddr(int argc, Scheme_Object **argv);
Scheme_Object *scheme_checked_length(Scheme_Object *v);
Scheme_Object *scheme_checked_mcar(int argc, Scheme_Object **argv);
Scheme_Object *scheme_checked_mcdr(int argc, Scheme_Object **argv);
Scheme_Object *scheme_checked_set_mcar (int argc, Scheme_Object *argv[]);