Added future semaphores (fsemaphores)
This commit is contained in:
parent
a98553f99b
commit
c6642de116
|
@ -242,12 +242,60 @@ We should also test deep continuations.
|
|||
(check-equal? #t (fsemaphore-try-wait? m1))
|
||||
(check-equal? #f (fsemaphore-try-wait? m2)))
|
||||
|
||||
;Test for errors when passing bad arguments
|
||||
(check-exn exn:fail:contract? (λ () (make-fsemaphore -1)))
|
||||
(check-exn exn:fail:contract? (λ () (make-fsemaphore (cons "a" "b"))))
|
||||
(check-exn exn:fail:contract? (λ () (fsemaphore-count (cons "foo" "goo"))))
|
||||
(check-exn exn:fail:contract? (λ () (fsemaphore-post (cons 1 2))))
|
||||
(check-exn exn:fail:contract? (λ () (fsemaphore-wait (cons 1 2))))
|
||||
(check-exn exn:fail:contract? (λ () (fsemaphore-try-wait? (cons 1 2))))
|
||||
|
||||
(check-exn exn:fail:contract? (λ ()
|
||||
(let ([f (future (λ ()
|
||||
(make-fsemaphore (cons "go"
|
||||
"mavs"))))])
|
||||
(sleep 0.5)
|
||||
(touch f))))
|
||||
|
||||
(check-exn exn:fail:contract? (λ ()
|
||||
(let ([f (future (λ ()
|
||||
(make-fsemaphore -1)))])
|
||||
(sleep 0.5)
|
||||
(touch f))))
|
||||
|
||||
(let ([f (future (λ ()
|
||||
(fsemaphore-post 33)))])
|
||||
(sleep 0.5)
|
||||
(check-exn exn:fail? (λ () (touch f))))
|
||||
|
||||
(let ([f (future (λ ()
|
||||
(fsemaphore-count 33)))])
|
||||
(sleep 0.5)
|
||||
(check-exn exn:fail? (λ () (touch f))))
|
||||
|
||||
(let ([f (future (λ ()
|
||||
(fsemaphore-wait 33)))])
|
||||
(sleep 0.5)
|
||||
(check-exn exn:fail? (λ () (touch f))))
|
||||
|
||||
(let ([f (future (λ ()
|
||||
(fsemaphore-try-wait? 33)))])
|
||||
(sleep 0.5)
|
||||
(check-exn exn:fail? (λ () (touch f))))
|
||||
|
||||
;try-wait
|
||||
(let ([m1 (make-fsemaphore 20)]
|
||||
[m2 (make-fsemaphore 0)])
|
||||
(check-equal? #t (fsemaphore-try-wait? m1))
|
||||
(check-equal? #f (fsemaphore-try-wait? m2)))
|
||||
|
||||
(let* ([m1 (make-fsemaphore 20)]
|
||||
[m2 (make-fsemaphore 0)]
|
||||
[f1 (future (λ ()
|
||||
(fsemaphore-try-wait? m2)))]
|
||||
[f2 (future (λ ()
|
||||
(fsemaphore-try-wait? m1)))])
|
||||
(sleep 0.5)
|
||||
(check-equal? #f (touch f1))
|
||||
(check-equal? #t (touch f2)))
|
||||
|
||||
|
@ -265,18 +313,18 @@ We should also test deep continuations.
|
|||
(touch f)
|
||||
(check-equal? 0 (fsemaphore-count m)))
|
||||
|
||||
;The f1 future should never terminate
|
||||
(printf "test n-4~n")
|
||||
(let* ([m (make-fsemaphore 0)]
|
||||
[dummy 5]
|
||||
[f1 (future (λ () (fsemaphore-wait m) (set! dummy 42)))]
|
||||
[f2 (future (λ () 88))])
|
||||
(check-equal? 88 (touch f2))
|
||||
(sleep 3)
|
||||
(sleep 1)
|
||||
(check-equal? 0 (fsemaphore-count m))
|
||||
(check-equal? 5 dummy))
|
||||
(check-equal? 5 dummy)
|
||||
(fsemaphore-post m)
|
||||
(touch f1)
|
||||
(check-equal? 42 dummy))
|
||||
|
||||
(printf "test n-3~n")
|
||||
(let* ([m (make-fsemaphore 0)]
|
||||
[dummy 5]
|
||||
[f1 (future (λ ()
|
||||
|
@ -286,11 +334,10 @@ We should also test deep continuations.
|
|||
[f2 (future (λ ()
|
||||
(fsemaphore-post m)
|
||||
#t))])
|
||||
(sleep 2)
|
||||
(sleep 1)
|
||||
(check-equal? 42 (touch f1))
|
||||
(check-equal? 0 (fsemaphore-count m)))
|
||||
|
||||
(printf "test n-2~n")
|
||||
(let* ([m1 (make-fsemaphore 0)]
|
||||
[m2 (make-fsemaphore 0)]
|
||||
[dummy 8]
|
||||
|
@ -306,7 +353,6 @@ We should also test deep continuations.
|
|||
dummy))])
|
||||
(check-equal? 11 (touch f2)))
|
||||
|
||||
(printf "test n-1~n")
|
||||
(let* ([m (make-fsemaphore 0)]
|
||||
[f1 (future (λ ()
|
||||
(sleep 1)
|
||||
|
@ -320,7 +366,6 @@ We should also test deep continuations.
|
|||
;of a touch. Meanwhile, another future is allocating (requiring
|
||||
;the help of the runtime thread which is also "blocked" waiting
|
||||
;for the semaphore to become ready.
|
||||
(printf "test n~n")
|
||||
(let* ([m (make-fsemaphore 0)]
|
||||
[f1 (future (λ ()
|
||||
(sleep 1) ;Currently a blocking RT call
|
||||
|
@ -334,10 +379,37 @@ We should also test deep continuations.
|
|||
(loop (sub1 index) (cons index l))]))])
|
||||
(fsemaphore-post m)
|
||||
(car retval))))])
|
||||
(sleep 3)
|
||||
(sleep 1)
|
||||
(touch f1)
|
||||
(check-equal? 1 (touch f2)))
|
||||
|
||||
(let* ([m (make-fsemaphore 0)]
|
||||
[f1 (future (λ ()
|
||||
(fsemaphore-wait m)
|
||||
42))]
|
||||
[f2 (future (λ ()
|
||||
(fsemaphore-wait m)
|
||||
99))])
|
||||
;sleep to ensure that both futures will queue up waiting for the fsema
|
||||
(sleep 1)
|
||||
(fsemaphore-post m)
|
||||
(fsemaphore-post m)
|
||||
(check-equal? 42 (touch f1))
|
||||
(check-equal? 99 (touch f2)))
|
||||
|
||||
(let* ([m (make-fsemaphore 0)]
|
||||
[fs (for/list ([i (in-range 0 19)])
|
||||
(future (λ ()
|
||||
(fsemaphore-wait m)
|
||||
i)))])
|
||||
(sleep 1)
|
||||
(for ([i (in-range 0 19)])
|
||||
(fsemaphore-post m))
|
||||
(check-equal? 171 (foldl (λ (f acc)
|
||||
(+ (touch f) acc))
|
||||
0
|
||||
fs)))
|
||||
|
||||
;; Make sure that `future' doesn't mishandle functions
|
||||
;; that aren't be JITted:
|
||||
(check-equal?
|
||||
|
|
|
@ -36,6 +36,7 @@ Scheme_Object *scheme_fsemaphore_p(int argc, Scheme_Object *argv[])
|
|||
return scheme_false;
|
||||
}
|
||||
|
||||
|
||||
#ifdef MZ_PRECISE_GC
|
||||
static void register_traversers(void);
|
||||
#endif
|
||||
|
@ -54,6 +55,11 @@ typedef struct future_t {
|
|||
int no_retval;
|
||||
} future_t;
|
||||
|
||||
typedef struct fsemaphore_t {
|
||||
Scheme_Object so;
|
||||
int ready;
|
||||
} fsemaphore_t;
|
||||
|
||||
Scheme_Object *scheme_future(int argc, Scheme_Object *argv[])
|
||||
{
|
||||
future_t *ft;
|
||||
|
@ -147,6 +153,70 @@ Scheme_Object *scheme_current_future(int argc, Scheme_Object *argv[])
|
|||
return (ft ? (Scheme_Object *)ft : scheme_false);
|
||||
}
|
||||
|
||||
Scheme_Object *scheme_make_fsemaphore(int argc, Scheme_Object *argv[])
|
||||
{
|
||||
fsemaphore_t *fsema;
|
||||
|
||||
if (argc != 1 || !SCHEME_INTP(argv[0]))
|
||||
scheme_wrong_type("make-fsemaphore", "exact integer", 0, argc, &(argv[0]));
|
||||
|
||||
fsema = MALLOC_ONE_TAGGED(fsemaphore_t);
|
||||
fsema->so.type = scheme_fsemaphore_type;
|
||||
fsema->ready = SCHEME_INT_VAL(argv[0]);
|
||||
|
||||
return (Scheme_Object*)fsema;
|
||||
}
|
||||
|
||||
Scheme_Object *scheme_fsemaphore_post(int argc, Scheme_Object *argv[])
|
||||
{
|
||||
fsemaphore_t *fsema;
|
||||
if (argc != 1 || !SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_fsemaphore_type))
|
||||
scheme_wrong_type("fsemaphore-post", "fsemaphore", 0, argc, argv);
|
||||
|
||||
fsema = (fsemaphore_t*)argv[0];
|
||||
fsema->ready++;
|
||||
|
||||
return scheme_void;
|
||||
}
|
||||
|
||||
Scheme_Object *scheme_fsemaphore_wait(int argc, Scheme_Object *argv[])
|
||||
{
|
||||
fsemaphore_t *fsema;
|
||||
if (argc != 1 || !SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_fsemaphore_type))
|
||||
scheme_wrong_type("fsemaphore-wait", "fsemaphore", 0, argc, argv);
|
||||
|
||||
fsema = (fsemaphore_t*)argv[0];
|
||||
/* If 0, raise an error */
|
||||
if (!fsema->ready)
|
||||
scheme_signal_error("fsemaphore-wait: attempted to wait on a semaphore with a 0 count");
|
||||
|
||||
fsema->ready--;
|
||||
return scheme_void;
|
||||
}
|
||||
|
||||
Scheme_Object *scheme_fsemaphore_try_wait(int argc, Scheme_Object *argv[])
|
||||
{
|
||||
fsemaphore_t *fsema;
|
||||
if (argc != 1 || !SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_fsemaphore_type))
|
||||
scheme_wrong_type("fsemaphore-try-wait?", "fsemaphore", 0, argc, argv);
|
||||
|
||||
fsema = (fsemaphore_t*)argv[0];
|
||||
if (fsema->ready)
|
||||
return scheme_true;
|
||||
|
||||
return scheme_false;
|
||||
}
|
||||
|
||||
Scheme_Object *scheme_fsemaphore_count(int argc, Scheme_Object *argv[])
|
||||
{
|
||||
fsemaphore_t *fsema;
|
||||
if (argc != 1 || !SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_fsemaphore_type))
|
||||
scheme_wrong_type("fsemaphore-count", "fsemaphore", 0, argc, argv);
|
||||
|
||||
fsema = (fsemaphore_t*)argv[0];
|
||||
return scheme_make_integer(fsema->ready);
|
||||
}
|
||||
|
||||
# define FUTURE_PRIM_W_ARITY(name, func, a1, a2, env) GLOBAL_PRIM_W_ARITY(name, func, a1, a2, env)
|
||||
|
||||
void scheme_init_futures(Scheme_Env *newenv)
|
||||
|
@ -156,6 +226,12 @@ void scheme_init_futures(Scheme_Env *newenv)
|
|||
FUTURE_PRIM_W_ARITY("processor-count", processor_count, 0, 0, newenv);
|
||||
FUTURE_PRIM_W_ARITY("current-future", scheme_current_future, 0, 0, newenv);
|
||||
FUTURE_PRIM_W_ARITY("touch", touch, 1, 1, newenv);
|
||||
FUTURE_PRIM_W_ARITY("make-fsemaphore", scheme_make_fsemaphore, 1, 1, newenv);
|
||||
FUTURE_PRIM_W_ARITY("fsemaphore?", scheme_fsemaphore_p, 1, 1, newenv);
|
||||
FUTURE_PRIM_W_ARITY("fsemaphore-post", scheme_fsemaphore_post, 1, 1, newenv);
|
||||
FUTURE_PRIM_W_ARITY("fsemaphore-wait", scheme_fsemaphore_wait, 1, 1, newenv);
|
||||
FUTURE_PRIM_W_ARITY("fsemaphore-try-wait?", scheme_fsemaphore_try_wait, 1, 1, newenv);
|
||||
FUTURE_PRIM_W_ARITY("fsemaphore-count", scheme_fsemaphore_count, 1, 1, newenv);
|
||||
|
||||
scheme_finish_primitive_module(newenv);
|
||||
scheme_protect_primitive_provide(newenv, NULL);
|
||||
|
@ -226,6 +302,11 @@ static void future_do_runtimecall(struct Scheme_Future_Thread_State *fts,
|
|||
void *func,
|
||||
int is_atomic);
|
||||
static int capture_future_continuation(future_t *ft, void **storage);
|
||||
static void future_raise_wrong_type_exn(const char *who,
|
||||
const char *expected_type,
|
||||
int what,
|
||||
int argc,
|
||||
Scheme_Object **argv);
|
||||
|
||||
#define THREAD_POOL_SIZE 16
|
||||
#define INITIAL_C_STACK_SIZE 500000
|
||||
|
@ -766,13 +847,28 @@ void fsemaphore_finalize(void *p, void *data)
|
|||
}
|
||||
|
||||
Scheme_Object *scheme_make_fsemaphore_inl(int argc, Scheme_Object *ready)
|
||||
/* Called in runtime thread */
|
||||
{
|
||||
fsemaphore_t *sema;
|
||||
printf("scheme_make_fsemaphore_inl\n");
|
||||
intptr_t v;
|
||||
|
||||
/* Input validation */
|
||||
if (argc != 1 || !SCHEME_INTP(ready))
|
||||
scheme_wrong_type("future->make-fsemaphore", "exact integer", 0, argc, &ready);
|
||||
if (argc == 1) {
|
||||
if (!SCHEME_INTP(ready)) {
|
||||
if (!SCHEME_BIGNUMP(ready) || !SCHEME_BIGPOS(ready))
|
||||
scheme_wrong_type("make-fsemaphore", "non-negative exact integer", 0, argc, &ready);
|
||||
}
|
||||
|
||||
if (!scheme_get_int_val(ready, &v)) {
|
||||
scheme_raise_exn(MZEXN_FAIL,
|
||||
"make-fsemaphore: starting value %s is too large",
|
||||
scheme_make_provided_string(ready, 0, NULL));
|
||||
} else if (v < 0) {
|
||||
scheme_wrong_type("make-fsemaphore", "non-negative exact integer", 0, argc, &ready);
|
||||
}
|
||||
} else {
|
||||
scheme_wrong_type("make-fsemaphore", "non-negative exact integer", 0, argc, &ready);
|
||||
}
|
||||
|
||||
sema = MALLOC_ONE_TAGGED(fsemaphore_t);
|
||||
sema->so.type = scheme_fsemaphore_type;
|
||||
|
@ -792,7 +888,6 @@ Scheme_Object *make_fsemaphore(int argc, Scheme_Object **argv)
|
|||
Scheme_Object *arg;
|
||||
Scheme_Object *semaObj;
|
||||
|
||||
printf("make_fsemaphore\n");
|
||||
arg = argv[0];
|
||||
semaObj = scheme_make_fsemaphore_inl(argc, arg);
|
||||
return semaObj;
|
||||
|
@ -802,63 +897,92 @@ Scheme_Object *scheme_fsemaphore_count(int argc, Scheme_Object **argv)
|
|||
XFORM_SKIP_PROC
|
||||
{
|
||||
fsemaphore_t *sema;
|
||||
sema = (fsemaphore_t*)argv[0];
|
||||
|
||||
if (!SAME_TYPE(SCHEME_TYPE(sema), scheme_fsemaphore_type))
|
||||
scheme_wrong_type("fsemaphore-count", "fsemaphore", 0, argc, argv);
|
||||
|
||||
printf("scheme_fsemaphore_count\n");
|
||||
if (!SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_fsemaphore_type)) {
|
||||
Scheme_Future_Thread_State *fts = scheme_future_thread_state;
|
||||
if (!fts) {
|
||||
scheme_wrong_type("fsemaphore-count", "fsemaphore", 0, argc, argv);
|
||||
}
|
||||
|
||||
/* On a future thread -- ask the runtime to raise an exception for us */
|
||||
future_raise_wrong_type_exn("fsemaphore-count", "fsemaphore", 0, argc, argv);
|
||||
}
|
||||
|
||||
sema = (fsemaphore_t*)argv[0];
|
||||
return scheme_make_integer(sema->ready);
|
||||
}
|
||||
|
||||
static void requeue_future(future_t *future, Scheme_Future_State *fs)
|
||||
{
|
||||
mzrt_mutex_lock(fs->future_mutex);
|
||||
future->status = PENDING;
|
||||
enqueue_future(fs, future);
|
||||
/* Signal that a future is now pending */
|
||||
mzrt_sema_post(fs->future_pending_sema);
|
||||
mzrt_mutex_unlock(fs->future_mutex);
|
||||
}
|
||||
|
||||
static int try_resume_future_from_fsema_wait(fsemaphore_t *sema, Scheme_Future_State *fs)
|
||||
XFORM_SKIP_PROC
|
||||
{
|
||||
future_t *ft;
|
||||
if (!sema->queue_front) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
ft = sema->queue_front;
|
||||
sema->queue_front = ft->next_in_fsema_queue;
|
||||
ft->next_in_fsema_queue = NULL;
|
||||
if (!sema->queue_front) {
|
||||
sema->queue_end = NULL;
|
||||
} else {
|
||||
sema->queue_front->prev_in_fsema_queue = NULL;
|
||||
}
|
||||
|
||||
sema->ready--;
|
||||
|
||||
/* Place the waiting future back on the run queue */
|
||||
requeue_future(ft, fs);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
Scheme_Object *scheme_fsemaphore_post(int argc, Scheme_Object **argv)
|
||||
XFORM_SKIP_PROC
|
||||
{
|
||||
fsemaphore_t *sema;
|
||||
Scheme_Future_State *fs;
|
||||
printf("scheme_fsemaphore_post\n");
|
||||
fflush(stdout);
|
||||
int old_count;
|
||||
|
||||
if (!SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_fsemaphore_type)) {
|
||||
Scheme_Future_Thread_State *fts = scheme_future_thread_state;
|
||||
if (!fts) {
|
||||
scheme_wrong_type("fsemaphore-post", "fsemaphore", 0, argc, argv);
|
||||
}
|
||||
|
||||
/* On a future thread -- ask the runtime to raise an exception for us */
|
||||
future_raise_wrong_type_exn("fsemaphore-post", "fsemaphore", 0, argc, argv);
|
||||
}
|
||||
|
||||
sema = (fsemaphore_t*)argv[0];
|
||||
fs = scheme_future_state;
|
||||
mzrt_mutex_lock(sema->mut);
|
||||
|
||||
/* Check for any futures waiting on the semaphore */
|
||||
if (sema->queue_front) {
|
||||
future_t *ft = sema->queue_front;
|
||||
sema->queue_front = ft->next_in_fsema_queue;
|
||||
ft->next_in_fsema_queue = NULL;
|
||||
if (!sema->queue_front) {
|
||||
sema->queue_end = NULL;
|
||||
} else {
|
||||
sema->queue_front->prev_in_fsema_queue = NULL;
|
||||
}
|
||||
|
||||
/* Place the waiting future back on the run queue */
|
||||
mzrt_mutex_lock(fs->future_mutex);
|
||||
ft->status = PENDING;
|
||||
enqueue_future(fs, ft);
|
||||
/* Signal that a future is now pending */
|
||||
mzrt_sema_post(fs->future_pending_sema);
|
||||
mzrt_mutex_unlock(fs->future_mutex);
|
||||
} else {
|
||||
sema->ready++;
|
||||
|
||||
old_count = sema->ready;
|
||||
sema->ready++;
|
||||
if (!old_count) {
|
||||
try_resume_future_from_fsema_wait(sema, fs);
|
||||
}
|
||||
|
||||
mzrt_mutex_unlock(sema->mut);
|
||||
return scheme_void;
|
||||
}
|
||||
|
||||
static void enqueue_future_for_fsema(Scheme_Object *objFt, Scheme_Object *objSema)
|
||||
static void enqueue_future_for_fsema(future_t *ft, fsemaphore_t *sema)
|
||||
/* This function assumed sema->mut has already been acquired! */
|
||||
{
|
||||
future_t *ft;
|
||||
fsemaphore_t *sema;
|
||||
future_t *front;
|
||||
|
||||
ft = (future_t*)objFt;
|
||||
sema = (fsemaphore_t*)objSema;
|
||||
|
||||
/* Enqueue this future in the semaphore's queue */
|
||||
front = sema->queue_front;
|
||||
if (!front) {
|
||||
|
@ -872,7 +996,6 @@ static void enqueue_future_for_fsema(Scheme_Object *objFt, Scheme_Object *objSem
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
Scheme_Object *scheme_fsemaphore_wait(int argc, Scheme_Object **argv)
|
||||
XFORM_SKIP_PROC
|
||||
{
|
||||
|
@ -881,104 +1004,97 @@ Scheme_Object *scheme_fsemaphore_wait(int argc, Scheme_Object **argv)
|
|||
Scheme_Future_State *fs = scheme_future_state;
|
||||
void *storage[3];
|
||||
|
||||
printf("scheme_fsemaphore_wait ");
|
||||
fflush(stdout);
|
||||
if (!SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_fsemaphore_type)) {
|
||||
Scheme_Future_Thread_State *fts = scheme_future_thread_state;
|
||||
if (!fts) {
|
||||
scheme_wrong_type("fsemaphore-wait", "fsemaphore", 0, argc, argv);
|
||||
}
|
||||
|
||||
/* On a future thread -- ask the runtime to raise an exception for us */
|
||||
future_raise_wrong_type_exn("fsemaphore-wait", "fsemaphore", 0, argc, argv);
|
||||
}
|
||||
|
||||
sema = (fsemaphore_t*)argv[0];
|
||||
mzrt_mutex_lock(sema->mut);
|
||||
if (!sema->ready) {
|
||||
if (!fts) {
|
||||
printf("on runtime thread -- sema not ready.\n");
|
||||
fflush(stdout);
|
||||
/* Then we are on the runtime thread, block and wait for the
|
||||
fsema to be ready while cooperating with the scheduler */
|
||||
mzrt_mutex_unlock(sema->mut);
|
||||
scheme_block_until(fsemaphore_ready, NULL, (Scheme_Object*)sema, 0);
|
||||
mzrt_mutex_lock(sema->mut);
|
||||
} else {
|
||||
printf("on future thread -- sema not ready.\n");
|
||||
fflush(stdout);
|
||||
/* On a future thread, suspend the future (to be
|
||||
resumed whenever the fsema becomes ready */
|
||||
future_t *future = fts->thread->current_ft;
|
||||
if (!future) {
|
||||
/* Exception? */
|
||||
printf("No current future for fsemaphore-wait!\n");
|
||||
return scheme_false;
|
||||
/* Should never be here */
|
||||
scheme_log_abort("fsemaphore-wait: future was NULL for future thread.");
|
||||
abort();
|
||||
}
|
||||
|
||||
/* Setup for LWC capture */
|
||||
mzrt_mutex_unlock(sema->mut);
|
||||
scheme_fill_lwc_end();
|
||||
future->lwc = scheme_current_lwc;
|
||||
future->fts = fts;
|
||||
future->arg_p = scheme_current_thread;
|
||||
future->status = WAITING_FOR_FSEMA;
|
||||
|
||||
/* Try to capture it locally (on this thread) */
|
||||
if (GC_gen0_alloc_page_ptr
|
||||
&& capture_future_continuation(future, storage)) {
|
||||
/* This will set fts->thread->current_ft to NULL */
|
||||
} else {
|
||||
/* Can't capture the continuation locally, so ask the runtime
|
||||
thread to do it (is this the right solution here?) */
|
||||
printf("fsemaphore-wait couldn't capture continuation locally.\n");
|
||||
thread to do it */
|
||||
future->next_waiting_lwc = fs->future_waiting_lwc;
|
||||
fs->future_waiting_lwc = future;
|
||||
future->want_lw = 1;
|
||||
future->status = WAITING_FOR_FSEMA;
|
||||
}
|
||||
|
||||
scheme_signal_received_at(fs->signal_handle);
|
||||
if (fts->thread->current_ft) {
|
||||
/* Wait for the signal that LW continuation was captured
|
||||
by the runtime thread */
|
||||
/* Will get here if relying on runtime thread to capture for us --
|
||||
wait for the signal that LW continuation was captured
|
||||
by the runtime thread. */
|
||||
future->can_continue_sema = fts->worker_can_continue_sema;
|
||||
end_gc_not_ok(fts, fs, MZ_RUNSTACK);
|
||||
mzrt_mutex_unlock(fs->future_mutex);
|
||||
|
||||
mzrt_sema_wait(fts->worker_can_continue_sema);
|
||||
/* Add the future to the semaphore's wait queue */
|
||||
//enqueue_future_for_fsema((Scheme_Object*)future, (Scheme_Object*)sema);
|
||||
//mzrt_mutex_unlock(sema->mut);
|
||||
//mzrt_mutex_lock(fs->future_mutex);
|
||||
//start_gc_not_ok(fs);
|
||||
}
|
||||
mzrt_sema_wait(fts->worker_can_continue_sema);
|
||||
|
||||
/* Check again whether the fsema has become ready -- if so,
|
||||
we can resume immediately */
|
||||
mzrt_mutex_lock(sema->mut);
|
||||
if (!sema->ready) {
|
||||
printf("sema still not ready after cont. capture, suspending.\n");
|
||||
enqueue_future_for_fsema((Scheme_Object*)future, (Scheme_Object*)sema);
|
||||
mzrt_mutex_unlock(sema->mut);
|
||||
mzrt_mutex_lock(fs->future_mutex);
|
||||
start_gc_not_ok(fs);
|
||||
|
||||
start_gc_not_ok(fs);
|
||||
mzrt_mutex_unlock(fs->future_mutex);
|
||||
|
||||
/* Fetch the future instance again, in case the GC has
|
||||
moved the pointer or the future has been requeued. */
|
||||
future = fts->thread->current_ft;
|
||||
if (!future) {
|
||||
/* Future continuation was requeued */
|
||||
scheme_future_longjmp(*scheme_current_thread->error_buf, 1);
|
||||
} else if (future->no_retval) {
|
||||
future->no_retval = 0;
|
||||
scheme_future_longjmp(*scheme_current_thread->error_buf, 1);
|
||||
}
|
||||
|
||||
return scheme_void;
|
||||
} else {
|
||||
|
||||
}
|
||||
|
||||
if (fts->thread->current_ft) {
|
||||
/* Should never get here, cont. capture should remove it */
|
||||
scheme_log_abort("fsemaphore-wait: current_ft was not NULL after continuation capture.");
|
||||
abort();
|
||||
}
|
||||
|
||||
/* Check again to see whether the sema has become ready */
|
||||
mzrt_mutex_lock(sema->mut);
|
||||
if (sema->ready) {
|
||||
/* Then resume the future immediately (requeue) */
|
||||
sema->ready--;
|
||||
requeue_future(future, fs);
|
||||
} else {
|
||||
/* Add the future to the sema's wait queue */
|
||||
enqueue_future_for_fsema(future, sema);
|
||||
}
|
||||
|
||||
mzrt_mutex_unlock(sema->mut);
|
||||
|
||||
/* Jump back to the worker thread future loop (this thread
|
||||
is now free */
|
||||
scheme_future_longjmp(*scheme_current_thread->error_buf, 1);
|
||||
}
|
||||
}
|
||||
|
||||
/* Semaphore is ready -- decrement and continue */
|
||||
if (!fts) {
|
||||
printf("on runtime thread -- sema was ready\n");
|
||||
} else {
|
||||
printf("on future thread -- sema was ready\n");
|
||||
}
|
||||
fflush(stdout);
|
||||
|
||||
sema->ready--;
|
||||
mzrt_mutex_unlock(sema->mut);
|
||||
return scheme_void;
|
||||
|
@ -991,7 +1107,16 @@ Scheme_Object *scheme_fsemaphore_try_wait(int argc, Scheme_Object **argv)
|
|||
future_t *cf;
|
||||
Scheme_Object *ret;
|
||||
|
||||
printf("scheme_fsemaphore_try_wait\n");
|
||||
if (!SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_fsemaphore_type)) {
|
||||
Scheme_Future_Thread_State *fts = scheme_future_thread_state;
|
||||
if (!fts) {
|
||||
scheme_wrong_type("fsemaphore-try-wait?", "fsemaphore", 0, argc, argv);
|
||||
}
|
||||
|
||||
/* On a future thread -- ask the runtime to raise an exception for us */
|
||||
future_raise_wrong_type_exn("fsemaphore-try-wait?", "fsemaphore", 0, argc, argv);
|
||||
}
|
||||
|
||||
sema = (fsemaphore_t*)argv[0];
|
||||
mzrt_mutex_lock(sema->mut);
|
||||
if (!sema->ready) {
|
||||
|
@ -1005,7 +1130,7 @@ Scheme_Object *scheme_fsemaphore_try_wait(int argc, Scheme_Object **argv)
|
|||
}
|
||||
|
||||
static int fsemaphore_ready(Scheme_Object *obj)
|
||||
/* Called in runtime thread by Scheme scheduler */
|
||||
/* Called in runtime thread by Scheme scheduler */
|
||||
{
|
||||
int ret = 0;
|
||||
fsemaphore_t *fsema = (fsemaphore_t*)obj;
|
||||
|
@ -1372,7 +1497,7 @@ void *worker_thread_future_loop(void *arg)
|
|||
From this thread's perspective, this call will never return
|
||||
until all the work to be done in the future has been completed,
|
||||
including runtime calls.
|
||||
If jitcode asks the runrtime thread to do work, then
|
||||
If jitcode asks the runtime thread to do work, then
|
||||
a GC can occur. */
|
||||
LOG("Running JIT code at %p...\n", ft->code);
|
||||
|
||||
|
@ -1667,6 +1792,32 @@ static void future_do_runtimecall(Scheme_Future_Thread_State *fts,
|
|||
/**********************************************************************/
|
||||
/* Functions for primitive invocation */
|
||||
/**********************************************************************/
|
||||
static void future_raise_wrong_type_exn(const char *who, const char *expected_type, int what, int argc, Scheme_Object **argv)
|
||||
XFORM_SKIP_PROC
|
||||
/* Called in future thread */
|
||||
{
|
||||
Scheme_Future_Thread_State *fts = scheme_future_thread_state;
|
||||
future_t *future = fts->thread->current_ft;
|
||||
|
||||
future->prim_protocol = SIG_WRONG_TYPE_EXN;
|
||||
future->arg_str0 = who;
|
||||
future->arg_str1 = expected_type;
|
||||
future->arg_i2 = what;
|
||||
future->arg_i3 = argc;
|
||||
future->arg_S4 = argv;
|
||||
|
||||
future->time_of_request = scheme_get_inexact_milliseconds();
|
||||
future->source_of_request = who;
|
||||
//future->src_type = ??
|
||||
future_do_runtimecall(fts, (void*)scheme_wrong_type, 0);
|
||||
|
||||
future->arg_str0 = NULL;
|
||||
future->arg_str1 = NULL;
|
||||
future->arg_i2 = 0;
|
||||
future->arg_i3 = 0;
|
||||
future->arg_S4 = NULL;
|
||||
}
|
||||
|
||||
void scheme_rtcall_void_void_3args(const char *who, int src_type, prim_void_void_3args_t f)
|
||||
XFORM_SKIP_PROC
|
||||
/* Called in future thread */
|
||||
|
@ -1979,6 +2130,14 @@ static void do_invoke_rtcall(Scheme_Future_State *fs, future_t *future)
|
|||
|
||||
break;
|
||||
}
|
||||
case SIG_WRONG_TYPE_EXN:
|
||||
{
|
||||
scheme_wrong_type(future->arg_str0,
|
||||
future->arg_str1,
|
||||
future->arg_i2,
|
||||
future->arg_i3,
|
||||
future->arg_S4);
|
||||
}
|
||||
# define JIT_TS_LOCALIZE(t, f) GC_CAN_IGNORE t f = future->f
|
||||
# include "jit_ts_runtime_glue.c"
|
||||
default:
|
||||
|
@ -2109,11 +2268,11 @@ static void register_traversers(void)
|
|||
{
|
||||
#ifdef MZ_USE_FUTURES
|
||||
GC_REG_TRAV(scheme_future_type, future);
|
||||
GC_REG_TRAV(scheme_fsemaphore_type, fsemaphore);
|
||||
#else
|
||||
GC_REG_TRAV(scheme_future_type, sequential_future);
|
||||
GC_REG_TRAV(scheme_fsemaphore_type, sequential_fsemaphore);
|
||||
#endif
|
||||
|
||||
GC_REG_TRAV(scheme_fsemaphore_type, fsemaphore);
|
||||
}
|
||||
|
||||
END_XFORM_SKIP;
|
||||
|
|
|
@ -90,6 +90,11 @@ typedef struct future_t {
|
|||
Scheme_Object **arg_S2;
|
||||
int arg_i2;
|
||||
|
||||
const char *arg_str0;
|
||||
const char *arg_str1;
|
||||
int arg_i3;
|
||||
Scheme_Object **arg_S4;
|
||||
|
||||
Scheme_Thread *arg_p;
|
||||
struct Scheme_Current_LWC *lwc;
|
||||
struct Scheme_Future_Thread_State *fts;
|
||||
|
@ -138,6 +143,7 @@ typedef struct fsemaphore_t {
|
|||
#define SIG_ALLOC_MARK_SEGMENT 3
|
||||
#define SIG_ALLOC_VALUES 4
|
||||
#define SIG_MAKE_FSEMAPHORE 5
|
||||
#define SIG_WRONG_TYPE_EXN 200
|
||||
|
||||
# include "jit_ts_protos.h"
|
||||
|
||||
|
@ -181,6 +187,8 @@ extern Scheme_Object *touch(int argc, Scheme_Object **argv);
|
|||
extern Scheme_Object *future_touch(int futureid);
|
||||
#endif
|
||||
|
||||
#else
|
||||
Scheme_Object *scheme_make_fsemaphore(int argc, Scheme_Object *argv[]);
|
||||
#endif /* MZ_USE_FUTURES */
|
||||
|
||||
/* always defined: */
|
||||
|
|
|
@ -2952,11 +2952,9 @@ static Scheme_Object *ts_make_fsemaphore(int argc, Scheme_Object *ready)
|
|||
XFORM_SKIP_PROC
|
||||
{
|
||||
if (scheme_use_rtcall) {
|
||||
printf("ts_make_fsemaphore on main thread\n");
|
||||
return scheme_rtcall_make_fsemaphore("[make_fsemaphore]", FSRC_OTHER, argc, ready);
|
||||
}
|
||||
|
||||
printf("ts_make_fsemaphore on worker thread\n");
|
||||
return scheme_make_fsemaphore_inl(argc, ready);
|
||||
}
|
||||
|
||||
|
@ -2993,7 +2991,7 @@ Scheme_Object *scheme_ts_scheme_force_value_same_mark(Scheme_Object *v)
|
|||
# define mz_direct_only(p) p
|
||||
# define ts_on_demand on_demand
|
||||
# define ts_prepare_retry_alloc prepare_retry_alloc
|
||||
# define ts_make_fsemaphore scheme_make_fsemaphore_inl
|
||||
# define ts_make_fsemaphore scheme_make_fsemaphore
|
||||
# define mz_generate_direct_prim(direct_only, first_arg, reg, prim_indirect) \
|
||||
(mz_direct_only(direct_only), first_arg, mz_finishr_direct_prim(reg, prim_indirect))
|
||||
#endif
|
||||
|
@ -7503,49 +7501,12 @@ static int generate_inlined_unary(mz_jit_state *jitter, Scheme_App2_Rec *app, in
|
|||
|
||||
return 1;
|
||||
} else if (IS_NAMED_PRIM(rator, "future?")) {
|
||||
printf("Inlining future?\n");
|
||||
generate_inlined_type_test(jitter, app, scheme_future_type, scheme_future_type, 1, for_branch, branch_short, need_sync);
|
||||
printf("Done inlining\n");
|
||||
return 1;
|
||||
} else if (IS_NAMED_PRIM(rator, "fsemaphore?")) {
|
||||
GC_CAN_IGNORE jit_insn *refcont, *ref1, *ref2, *ref3;
|
||||
|
||||
printf("Inlining fsemaphore?\n");
|
||||
printf("Starting IP is: %p\n", jit_get_ip().ptr);
|
||||
|
||||
generate_inlined_type_test(jitter, app, scheme_fsemaphore_type, scheme_fsemaphore_type, 1, for_branch, branch_short, need_sync);
|
||||
return 1;
|
||||
|
||||
//My custom implementation
|
||||
mz_runstack_skipped(jitter, 1);
|
||||
generate_non_tail(app->rand, jitter, 0, 1, 0);
|
||||
CHECK_LIMIT();
|
||||
mz_runstack_unskipped(jitter, 1);
|
||||
|
||||
ref1 = jit_bmsi_ul(jit_forward(), JIT_R0, 0x1);
|
||||
jit_ldxi_s(JIT_R1, JIT_R0, &((Scheme_Object*)0x0)->type); //Move type into JIT_R1
|
||||
|
||||
//REMOVE: These are only here to show register mappings for x86 in disas
|
||||
jit_movi_i(JIT_R2, 0x2);
|
||||
jit_movi_i(JIT_V0, 0x3);
|
||||
jit_movi_i(JIT_V1, 0x4);
|
||||
jit_movi_i(JIT_V2, 0x5);
|
||||
|
||||
ref2 = jit_bnei_l(jit_forward(), JIT_R1, scheme_fsemaphore_type); //Do the type test
|
||||
//Jump only if not equal (optimization)
|
||||
jit_movi_p(JIT_R0, scheme_true);
|
||||
refcont = jit_jmpi(jit_forward());
|
||||
|
||||
//ref2 is the 'false' branch
|
||||
mz_patch_branch(ref1);
|
||||
mz_patch_branch(ref2);
|
||||
jit_movi_p(JIT_R0, scheme_false);
|
||||
mz_patch_ucbranch(refcont);
|
||||
|
||||
//Logging
|
||||
printf("Ending IP is: %p\n", jit_get_ip().ptr);
|
||||
printf("Done inlining\n");
|
||||
return 1;
|
||||
} else if (IS_NAMED_PRIM(rator, "fsemaphore-count")) {
|
||||
mz_runstack_skipped(jitter, 1);
|
||||
generate_non_tail(app->rand, jitter, 0, 1, 0);
|
||||
|
@ -7572,24 +7533,7 @@ static int generate_inlined_unary(mz_jit_state *jitter, Scheme_App2_Rec *app, in
|
|||
JIT_UPDATE_THREAD_RSPTR_IF_NEEDED();
|
||||
|
||||
return 1;
|
||||
|
||||
|
||||
/*
|
||||
mz_pushr_p(JIT_R0);
|
||||
mz_rs_sync();
|
||||
|
||||
mz_prepare(2);
|
||||
jit_pusharg_p(JIT_V0);
|
||||
jit_movi_i(JIT_V1, 1);
|
||||
jit_pusharg_i(JIT_V1);
|
||||
mz_finish(scheme_fsemaphore_count);
|
||||
mz_popr_p(JIT_R0);
|
||||
jit_retval(JIT_R0);
|
||||
mz_rs_sync();
|
||||
|
||||
return 1; */
|
||||
} else if (IS_NAMED_PRIM(rator, "make-fsemaphore")) {
|
||||
|
||||
|
||||
mz_runstack_skipped(jitter, 1);
|
||||
generate_non_tail(app->rand, jitter, 0, 1, 0);
|
||||
|
@ -7624,7 +7568,9 @@ static int generate_inlined_unary(mz_jit_state *jitter, Scheme_App2_Rec *app, in
|
|||
jit_pusharg_p(JIT_RUNSTACK);
|
||||
jit_movi_i(JIT_R0, 1);
|
||||
jit_pusharg_i(JIT_R0);
|
||||
mz_finish(scheme_fsemaphore_post);
|
||||
|
||||
GC_CAN_IGNORE jit_insn *refr;
|
||||
(void)mz_finish_lwe(scheme_fsemaphore_post, refr);
|
||||
mz_popr_x();
|
||||
jit_retval(JIT_R0);
|
||||
|
||||
|
|
|
@ -5703,6 +5703,7 @@ static int future_MARK(void *p, struct NewGC *gc) {
|
|||
gcMARK2(f->arg_s2, gc);
|
||||
gcMARK2(f->arg_S2, gc);
|
||||
gcMARK2(f->arg_p, gc);
|
||||
gcMARK2(f->arg_S4, gc);
|
||||
gcMARK2(f->retval_s, gc);
|
||||
gcMARK2(f->retval, gc);
|
||||
gcMARK2(f->multiple_array, gc);
|
||||
|
@ -5733,6 +5734,7 @@ static int future_FIXUP(void *p, struct NewGC *gc) {
|
|||
gcFIXUP2(f->arg_s2, gc);
|
||||
gcFIXUP2(f->arg_S2, gc);
|
||||
gcFIXUP2(f->arg_p, gc);
|
||||
gcFIXUP2(f->arg_S4, gc);
|
||||
gcFIXUP2(f->retval_s, gc);
|
||||
gcFIXUP2(f->retval, gc);
|
||||
gcFIXUP2(f->multiple_array, gc);
|
||||
|
@ -5752,6 +5754,29 @@ static int future_FIXUP(void *p, struct NewGC *gc) {
|
|||
#define future_IS_ATOMIC 0
|
||||
#define future_IS_CONST_SIZE 1
|
||||
|
||||
static int fsemaphore_SIZE(void *p, struct NewGC *gc) {
|
||||
return
|
||||
gcBYTES_TO_WORDS(sizeof(fsemaphore_t));
|
||||
}
|
||||
|
||||
static int fsemaphore_MARK(void *p, struct NewGC *gc) {
|
||||
fsemaphore_t *s = (fsemaphore_t*)p;
|
||||
gcMARK2(s->queue_front, gc);
|
||||
gcMARK2(s->queue_end, gc);
|
||||
return
|
||||
gcBYTES_TO_WORDS(sizeof(fsemaphore_t));
|
||||
}
|
||||
|
||||
static int fsemaphore_FIXUP(void *p, struct NewGC *gc) {
|
||||
fsemaphore_t *s = (fsemaphore_t*)p;
|
||||
gcFIXUP2(s->queue_front, gc);
|
||||
gcFIXUP2(s->queue_end, gc);
|
||||
return
|
||||
gcBYTES_TO_WORDS(sizeof(future_t));
|
||||
}
|
||||
|
||||
#define fsemaphore_IS_ATOMIC 0
|
||||
#define fsemaphore_IS_CONST_SIZE 1
|
||||
|
||||
#else
|
||||
|
||||
|
@ -5783,34 +5808,27 @@ static int sequential_future_FIXUP(void *p, struct NewGC *gc) {
|
|||
#define sequential_future_IS_ATOMIC 0
|
||||
#define sequential_future_IS_CONST_SIZE 1
|
||||
|
||||
|
||||
#endif
|
||||
|
||||
/* Future semaphore */
|
||||
static int fsemaphore_SIZE(void *p, struct NewGC *gc) {
|
||||
static int sequential_fsemaphore_SIZE(void *p, struct NewGC *gc) {
|
||||
return
|
||||
gcBYTES_TO_WORDS(sizeof(fsemaphore_t));
|
||||
}
|
||||
|
||||
static int fsemaphore_MARK(void *p, struct NewGC *gc) {
|
||||
static int sequential_fsemaphore_MARK(void *p, struct NewGC *gc) {
|
||||
fsemaphore_t *s = (fsemaphore_t*)p;
|
||||
gcMARK2(s->queue_front, gc);
|
||||
gcMARK2(s->queue_end, gc);
|
||||
return
|
||||
gcBYTES_TO_WORDS(sizeof(fsemaphore_t));
|
||||
}
|
||||
|
||||
static int fsemaphore_FIXUP(void *p, struct NewGC *gc) {
|
||||
static int sequential_fsemaphore_FIXUP(void *p, struct NewGC *gc) {
|
||||
fsemaphore_t *s = (fsemaphore_t*)p;
|
||||
gcFIXUP2(s->queue_front, gc);
|
||||
gcFIXUP2(s->queue_end, gc);
|
||||
return
|
||||
gcBYTES_TO_WORDS(sizeof(future_t));
|
||||
}
|
||||
|
||||
#define fsemaphore_IS_ATOMIC 0
|
||||
#define fsemaphore_IS_CONST_SIZE 1
|
||||
#define sequential_fsemaphore_IS_ATOMIC 0
|
||||
#define sequential_fsemaphore_IS_CONST_SIZE 1
|
||||
|
||||
#endif
|
||||
|
||||
#endif /* FUTURE */
|
||||
|
||||
|
|
|
@ -2344,6 +2344,7 @@ future {
|
|||
gcMARK2(f->arg_s2, gc);
|
||||
gcMARK2(f->arg_S2, gc);
|
||||
gcMARK2(f->arg_p, gc);
|
||||
gcMARK2(f->arg_S4, gc);
|
||||
gcMARK2(f->retval_s, gc);
|
||||
gcMARK2(f->retval, gc);
|
||||
gcMARK2(f->multiple_array, gc);
|
||||
|
@ -2360,6 +2361,15 @@ future {
|
|||
gcBYTES_TO_WORDS(sizeof(future_t));
|
||||
}
|
||||
|
||||
fsemaphore {
|
||||
mark:
|
||||
fsemaphore_t *s = (fsemaphore_t*)p;
|
||||
gcMARK2(s->queue_front, gc);
|
||||
gcMARK2(s->queue_end, gc);
|
||||
size:
|
||||
gcBYTES_TO_WORDS(sizeof(fsemaphore_t));
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
sequential_future {
|
||||
|
@ -2373,17 +2383,16 @@ sequential_future {
|
|||
gcBYTES_TO_WORDS(sizeof(future_t));
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
fsemaphore {
|
||||
sequential_fsemaphore {
|
||||
mark:
|
||||
fsemaphore_t *s = (fsemaphore_t*)p;
|
||||
gcMARK2(s->queue_front, gc);
|
||||
gcMARK2(s->queue_end, gc);
|
||||
fsemaphore_t *fsema = (fsemaphore_t*)p;
|
||||
size:
|
||||
gcBYTES_TO_WORDS(sizeof(fsemaphore_t));
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
|
||||
END future;
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user