fsemaphore cleanup

- abstract over JIT inlining of fsemaphore operations
 - fix problems with non-parallel fsemaphores
 - adjust tests so they don't assume too much concurrency
 - clarify fsemaphore vs. semaphore in the docs
This commit is contained in:
Matthew Flatt 2011-02-18 09:51:04 -07:00
parent 43da90a6bc
commit 6041833ac5
9 changed files with 81 additions and 164 deletions

View File

@ -94,21 +94,28 @@ parallel. See also @guidesecref["effective-futures"] in @|Guide|.
@defproc[(make-fsemaphore [init exact-nonnegative-integer?]) fsemaphore?]{
Creates and returns a new semaphore with the counter initially set to
@racket[init].
Creates and returns a new @deftech{future semaphore} with the
counter initially set to @racket[init].
A future semaphore is similar to a plain @tech{semaphore}, but
future-semaphore operations can be performed safely in parallel (to synchronize
parallel computations). In contrast, operations on plain @tech{semaphores}
are not safe to perform in parallel, and they therefore prevent
a computation from continuing in parallel.
}
@defproc[(fsemaphore? [v any/c]) boolean?]{
Returns @racket[#t] if @racket[v] is an fsemaphore value, @racket[#f]
otherwise.
Returns @racket[#t] if @racket[v] is an @tech{future semaphore}
value, @racket[#f] otherwise.
}
@defproc[(fsemaphore-post [fsema fsemaphore?]) void?]{
Increments the semaphore's internal counter and returns @|void-const|.
Increments the @tech{future semaphore}'s internal counter and
returns @|void-const|.
}

View File

@ -347,6 +347,7 @@ We should also test deep continuations.
(fsemaphore-post m)
#t))])
(sleep 1)
(check-equal? #t (touch f2))
(check-equal? 42 (touch f1))
(check-equal? 0 (fsemaphore-count m)))
@ -360,6 +361,7 @@ We should also test deep continuations.
#t))]
[f2 (future (λ ()
(fsemaphore-post m2)
(touch f1)
(fsemaphore-wait m1)
(set! dummy (add1 dummy))
dummy))])
@ -392,7 +394,7 @@ We should also test deep continuations.
(fsemaphore-post m)
(car retval))))])
(sleep 1)
(touch f1)
(thread (lambda () (touch f1)))
(check-equal? 1 (touch f2)))
(let* ([m (make-fsemaphore 0)]
@ -408,7 +410,7 @@ We should also test deep continuations.
(fsemaphore-post m)
(check-equal? 42 (touch f1))
(check-equal? 99 (touch f2)))
(let* ([m (make-fsemaphore 0)]
[fs (for/list ([i (in-range 0 19)])
(future (λ ()

View File

@ -57,7 +57,7 @@ typedef struct future_t {
typedef struct fsemaphore_t {
Scheme_Object so;
int ready;
Scheme_Object *sema;
} fsemaphore_t;
Scheme_Object *scheme_future(int argc, Scheme_Object *argv[])
@ -155,14 +155,16 @@ Scheme_Object *scheme_current_future(int argc, Scheme_Object *argv[])
Scheme_Object *scheme_make_fsemaphore(int argc, Scheme_Object *argv[])
{
intptr_t v;
fsemaphore_t *fsema;
if (argc != 1 || !SCHEME_INTP(argv[0]))
scheme_wrong_type("make-fsemaphore", "exact integer", 0, argc, &(argv[0]));
Scheme_Object *sema;
v = scheme_get_semaphore_init("make-fsemaphore", argc, argv);
fsema = MALLOC_ONE_TAGGED(fsemaphore_t);
fsema->so.type = scheme_fsemaphore_type;
fsema->ready = SCHEME_INT_VAL(argv[0]);
sema = scheme_make_sema(v);
fsema->sema = sema;
return (Scheme_Object*)fsema;
}
@ -174,7 +176,7 @@ Scheme_Object *scheme_fsemaphore_post(int argc, Scheme_Object *argv[])
scheme_wrong_type("fsemaphore-post", "fsemaphore", 0, argc, argv);
fsema = (fsemaphore_t*)argv[0];
fsema->ready++;
scheme_post_sema(fsema->sema);
return scheme_void;
}
@ -186,11 +188,8 @@ Scheme_Object *scheme_fsemaphore_wait(int argc, Scheme_Object *argv[])
scheme_wrong_type("fsemaphore-wait", "fsemaphore", 0, argc, argv);
fsema = (fsemaphore_t*)argv[0];
/* If 0, raise an error */
if (!fsema->ready)
scheme_signal_error("fsemaphore-wait: attempted to wait on a semaphore with a 0 count");
scheme_wait_sema(fsema->sema, 0);
fsema->ready--;
return scheme_void;
}
@ -201,7 +200,7 @@ Scheme_Object *scheme_fsemaphore_try_wait(int argc, Scheme_Object *argv[])
scheme_wrong_type("fsemaphore-try-wait?", "fsemaphore", 0, argc, argv);
fsema = (fsemaphore_t*)argv[0];
if (fsema->ready)
if (scheme_wait_sema(fsema->sema, 1))
return scheme_true;
return scheme_false;
@ -214,7 +213,7 @@ Scheme_Object *scheme_fsemaphore_count(int argc, Scheme_Object *argv[])
scheme_wrong_type("fsemaphore-count", "fsemaphore", 0, argc, argv);
fsema = (fsemaphore_t*)argv[0];
return scheme_make_integer(fsema->ready);
return scheme_make_integer(((Scheme_Sema *)fsema->sema)->value);
}
# define FUTURE_PRIM_W_ARITY(name, func, a1, a2, env) GLOBAL_PRIM_W_ARITY(name, func, a1, a2, env)
@ -846,29 +845,13 @@ void fsemaphore_finalize(void *p, void *data)
mzrt_mutex_destroy(sema->mut);
}
Scheme_Object *scheme_make_fsemaphore_inl(int argc, Scheme_Object *ready)
Scheme_Object *scheme_make_fsemaphore_inl(Scheme_Object *ready)
/* Called in runtime thread */
{
fsemaphore_t *sema;
intptr_t v;
/* Input validation */
if (argc == 1) {
if (!SCHEME_INTP(ready)) {
if (!SCHEME_BIGNUMP(ready) || !SCHEME_BIGPOS(ready))
scheme_wrong_type("make-fsemaphore", "non-negative exact integer", 0, argc, &ready);
}
if (!scheme_get_int_val(ready, &v)) {
scheme_raise_exn(MZEXN_FAIL,
"make-fsemaphore: starting value %s is too large",
scheme_make_provided_string(ready, 0, NULL));
} else if (v < 0) {
scheme_wrong_type("make-fsemaphore", "non-negative exact integer", 0, argc, &ready);
}
} else {
scheme_wrong_type("make-fsemaphore", "non-negative exact integer", 0, argc, &ready);
}
v = scheme_get_semaphore_init("make-fsemaphore", 1, &ready);
sema = MALLOC_ONE_TAGGED(fsemaphore_t);
sema->so.type = scheme_fsemaphore_type;
@ -885,12 +868,7 @@ Scheme_Object *scheme_make_fsemaphore_inl(int argc, Scheme_Object *ready)
Scheme_Object *make_fsemaphore(int argc, Scheme_Object **argv)
/* Called in runtime thread (atomic/synchronized) */
{
Scheme_Object *arg;
Scheme_Object *semaObj;
arg = argv[0];
semaObj = scheme_make_fsemaphore_inl(argc, arg);
return semaObj;
return scheme_make_fsemaphore_inl(argv[0]);
}
Scheme_Object *scheme_fsemaphore_count(int argc, Scheme_Object **argv)
@ -1851,7 +1829,7 @@ void scheme_rtcall_void_void_3args(const char *who, int src_type, prim_void_void
future->arg_S0 = NULL;
}
Scheme_Object *scheme_rtcall_make_fsemaphore(const char *who, int src_type, int argc, Scheme_Object *ready)
Scheme_Object *scheme_rtcall_make_fsemaphore(const char *who, int src_type, Scheme_Object *ready)
XFORM_SKIP_PROC
/* Called in future thread */
{
@ -1860,7 +1838,6 @@ Scheme_Object *scheme_rtcall_make_fsemaphore(const char *who, int src_type, int
future_t *future = fts->thread->current_ft;
future->prim_protocol = SIG_MAKE_FSEMAPHORE;
future->arg_i0 = argc;
future->arg_s1 = ready;
future->time_of_request = scheme_get_inexact_milliseconds();
future->source_of_request = who;
@ -2131,14 +2108,14 @@ static void do_invoke_rtcall(Scheme_Future_State *fs, future_t *future)
scheme_new_mark_segment(p_seg);
break;
}
case SIG_MAKE_FSEMAPHORE:
{
Scheme_Object *ret;
ret = scheme_make_fsemaphore_inl(future->arg_i0, future->arg_s1);
future->retval_s = ret;
future->arg_s0 = NULL;
break;
}
case SIG_MAKE_FSEMAPHORE:
{
Scheme_Object *s = future->arg_s1;
future->arg_s0 = NULL;
s = scheme_make_fsemaphore_inl(s);
future->retval_s = s;
break;
}
case SIG_ALLOC_VALUES:
{
prim_allocate_values_t func = (prim_allocate_values_t)future->prim_func;

View File

@ -142,7 +142,7 @@ typedef struct fsemaphore_t {
#define SIG_ALLOC 2
#define SIG_ALLOC_MARK_SEGMENT 3
#define SIG_ALLOC_VALUES 4
#define SIG_MAKE_FSEMAPHORE 5
#define SIG_MAKE_FSEMAPHORE 5
#define SIG_WRONG_TYPE_EXN 200
# include "jit_ts_protos.h"
@ -164,8 +164,7 @@ extern uintptr_t scheme_rtcall_alloc(const char *who, int src_type);
extern void scheme_rtcall_new_mark_segment(Scheme_Thread *p);
extern void scheme_rtcall_allocate_values(const char *who, int src_type, int count, Scheme_Thread *t,
prim_allocate_values_t f);
extern Scheme_Object *scheme_rtcall_make_fsemaphore(const char *who, int src_type,
int argc, Scheme_Object *ready);
extern Scheme_Object *scheme_rtcall_make_fsemaphore(const char *who, int src_type, Scheme_Object *ready);
#else
#define IS_WORKER_THREAD 0
@ -199,8 +198,7 @@ Scheme_Object *scheme_current_future(int argc, Scheme_Object *argv[]);
Scheme_Object *scheme_fsemaphore_p(int argc, Scheme_Object *argv[]);
Scheme_Object *scheme_fsemaphore_count(int argc, Scheme_Object *argv[]);
//Scheme_Object *scheme_make_fsemaphore(int argc, Scheme_Object *argv[]);
Scheme_Object *scheme_make_fsemaphore_inl(int argc, Scheme_Object *ready);
Scheme_Object *scheme_make_fsemaphore_inl(Scheme_Object *ready);
Scheme_Object *scheme_fsemaphore_wait(int argc, Scheme_Object *argv[]);
Scheme_Object *scheme_fsemaphore_post(int argc, Scheme_Object *argv[]);
Scheme_Object *scheme_fsemaphore_try_wait(int argc, Scheme_Object *argv[]);

View File

@ -2948,14 +2948,14 @@ static void ts_on_demand(void) XFORM_SKIP_PROC
on_demand();
}
static Scheme_Object *ts_make_fsemaphore(int argc, Scheme_Object *ready)
static Scheme_Object *ts_make_fsemaphore(int argc, Scheme_Object **argv)
XFORM_SKIP_PROC
{
if (scheme_use_rtcall) {
return scheme_rtcall_make_fsemaphore("[make_fsemaphore]", FSRC_OTHER, argc, ready);
return scheme_rtcall_make_fsemaphore("[make_fsemaphore]", FSRC_OTHER, argv[0]);
}
return scheme_make_fsemaphore_inl(argc, ready);
return scheme_make_fsemaphore_inl(argv[0]);
}
#ifdef MZ_PRECISE_GC
@ -7504,10 +7504,17 @@ static int generate_inlined_unary(mz_jit_state *jitter, Scheme_App2_Rec *app, in
generate_inlined_type_test(jitter, app, scheme_future_type, scheme_future_type, 1, for_branch, branch_short, need_sync);
return 1;
} else if (IS_NAMED_PRIM(rator, "fsemaphore?")) {
GC_CAN_IGNORE jit_insn *refcont, *ref1, *ref2, *ref3;
generate_inlined_type_test(jitter, app, scheme_fsemaphore_type, scheme_fsemaphore_type, 1, for_branch, branch_short, need_sync);
return 1;
} else if (IS_NAMED_PRIM(rator, "fsemaphore-count")) {
} else if (IS_NAMED_PRIM(rator, "fsemaphore-count")
|| IS_NAMED_PRIM(rator, "make-fsemaphore")
|| IS_NAMED_PRIM(rator, "fsemaphore-post")
|| IS_NAMED_PRIM(rator, "fsemaphore-wait")
|| IS_NAMED_PRIM(rator, "fsemaphore-try-wait?")) {
/* Inline calls to future functions that specially support
running in the future thread: */
GC_CAN_IGNORE jit_insn *refr;
mz_runstack_skipped(jitter, 1);
generate_non_tail(app->rand, jitter, 0, 1, 0);
CHECK_LIMIT();
@ -7520,112 +7527,22 @@ static int generate_inlined_unary(mz_jit_state *jitter, Scheme_App2_Rec *app, in
mz_pushr_p(JIT_R0);
mz_rs_sync();
JIT_UPDATE_THREAD_RSPTR_IF_NEEDED();
mz_prepare(2);
jit_pusharg_p(JIT_RUNSTACK); /* Same as push (JIT_V1) */
jit_movi_i(JIT_R0, 1);
jit_pusharg_i(JIT_R0);
mz_finish(scheme_fsemaphore_count);
mz_popr_x();
jit_retval(JIT_R0);
mz_rs_sync();
JIT_UPDATE_THREAD_RSPTR_IF_NEEDED();
return 1;
} else if (IS_NAMED_PRIM(rator, "make-fsemaphore")) {
mz_runstack_skipped(jitter, 1);
generate_non_tail(app->rand, jitter, 0, 1, 0);
CHECK_LIMIT();
mz_runstack_unskipped(jitter, 1);
mz_rs_sync();
JIT_UPDATE_THREAD_RSPTR_IF_NEEDED();
mz_prepare(2);
jit_pusharg_p(JIT_R0);
jit_movi_i(JIT_R0, 1);
jit_pusharg_i(JIT_R0);
mz_finish(ts_make_fsemaphore);
jit_retval(JIT_R0);
return 1;
} else if (IS_NAMED_PRIM(rator, "fsemaphore-post")) {
GC_CAN_IGNORE jit_insn *refr;
mz_runstack_skipped(jitter, 1);
generate_non_tail(app->rand, jitter, 0, 1, 0);
CHECK_LIMIT();
mz_runstack_unskipped(jitter, 1);
mz_rs_sync();
JIT_UPDATE_THREAD_RSPTR_IF_NEEDED();
mz_pushr_p(JIT_R0);
mz_rs_sync();
JIT_UPDATE_THREAD_RSPTR_IF_NEEDED();
mz_prepare(2);
jit_pusharg_p(JIT_RUNSTACK);
jit_movi_i(JIT_R0, 1);
jit_pusharg_i(JIT_R0);
(void)mz_finish_lwe(scheme_fsemaphore_post, refr);
mz_popr_x();
if (IS_NAMED_PRIM(rator, "make-fsemaphore"))
(void)mz_finish_lwe(ts_make_fsemaphore, refr);
else
(void)mz_finish_lwe(((Scheme_Primitive_Proc *)rator)->prim_val, refr);
jit_retval(JIT_R0);
mz_rs_sync();
JIT_UPDATE_THREAD_RSPTR_IF_NEEDED();
return 1;
} else if (IS_NAMED_PRIM(rator, "fsemaphore-wait")) {
GC_CAN_IGNORE jit_insn *refr;
mz_runstack_skipped(jitter, 1);
generate_non_tail(app->rand, jitter, 0, 1, 0);
CHECK_LIMIT();
mz_runstack_unskipped(jitter, 1);
mz_popr_x(); /* remove arg */
mz_rs_sync();
JIT_UPDATE_THREAD_RSPTR_IF_NEEDED();
mz_pushr_p(JIT_R0);
mz_rs_sync();
JIT_UPDATE_THREAD_RSPTR_IF_NEEDED();
mz_prepare(2);
jit_pusharg_p(JIT_RUNSTACK);
jit_movi_i(JIT_R0, 1);
jit_pusharg_i(JIT_R0);
(void)mz_finish_lwe(scheme_fsemaphore_wait, refr);
mz_popr_x();
jit_retval(JIT_R0);
mz_rs_sync();
JIT_UPDATE_THREAD_RSPTR_IF_NEEDED();
return 1;
} else if (IS_NAMED_PRIM(rator, "fsemaphore-try-wait?")) {
mz_runstack_skipped(jitter, 1);
generate_non_tail(app->rand, jitter, 0, 1, 0);
CHECK_LIMIT();
mz_runstack_unskipped(jitter, 1);
mz_rs_sync();
JIT_UPDATE_THREAD_RSPTR_IF_NEEDED();
mz_pushr_p(JIT_R0);
mz_rs_sync();
JIT_UPDATE_THREAD_RSPTR_IF_NEEDED();
mz_prepare(2);
jit_pusharg_p(JIT_RUNSTACK);
jit_movi_i(JIT_R0, 1);
jit_pusharg_i(JIT_R0);
mz_finish(scheme_fsemaphore_try_wait);
mz_popr_x();
jit_retval(JIT_R0);
mz_rs_sync();
JIT_UPDATE_THREAD_RSPTR_IF_NEEDED();
return 1;
}
}

View File

@ -5817,16 +5817,20 @@ static int sequential_fsemaphore_SIZE(void *p, struct NewGC *gc) {
}
static int sequential_fsemaphore_MARK(void *p, struct NewGC *gc) {
fsemaphore_t *s = (fsemaphore_t*)p;
gcMARK2(s->sema, gc);
return
gcBYTES_TO_WORDS(sizeof(fsemaphore_t));
}
static int sequential_fsemaphore_FIXUP(void *p, struct NewGC *gc) {
fsemaphore_t *s = (fsemaphore_t*)p;
gcFIXUP2(s->sema, gc);
return
gcBYTES_TO_WORDS(sizeof(fsemaphore_t));
}
#define sequential_fsemaphore_IS_ATOMIC 1
#define sequential_fsemaphore_IS_ATOMIC 0
#define sequential_fsemaphore_IS_CONST_SIZE 1

View File

@ -2385,6 +2385,8 @@ sequential_future {
sequential_fsemaphore {
mark:
fsemaphore_t *s = (fsemaphore_t*)p;
gcMARK2(s->sema, gc);
size:
gcBYTES_TO_WORDS(sizeof(fsemaphore_t));
}

View File

@ -1604,6 +1604,8 @@ void scheme_post_syncing_nacks(Syncing *syncing);
int scheme_try_channel_get(Scheme_Object *ch);
int scheme_try_channel_put(Scheme_Object *ch, Scheme_Object *v);
intptr_t scheme_get_semaphore_init(const char *who, int n, Scheme_Object **p);
/*========================================================================*/
/* numbers */
/*========================================================================*/

View File

@ -223,25 +223,33 @@ Scheme_Object *scheme_make_sema(intptr_t v)
return (Scheme_Object *)sema;
}
static Scheme_Object *make_sema(int n, Scheme_Object **p)
intptr_t scheme_get_semaphore_init(const char *who, int n, Scheme_Object **p)
{
intptr_t v;
if (n) {
if (!SCHEME_INTP(p[0])) {
if (!SCHEME_BIGNUMP(p[0]) || !SCHEME_BIGPOS(p[0]))
scheme_wrong_type("make-semaphore", "non-negative exact integer", 0, n, p);
scheme_wrong_type(who, "non-negative exact integer", 0, n, p);
}
if (!scheme_get_int_val(p[0], &v)) {
scheme_raise_exn(MZEXN_FAIL,
"make-semaphore: starting value %s is too large",
"%s: starting value %s is too large",
who,
scheme_make_provided_string(p[0], 0, NULL));
} else if (v < 0)
scheme_wrong_type("make-semaphore", "non-negative exact integer", 0, n, p);
scheme_wrong_type(who, "non-negative exact integer", 0, n, p);
} else
v = 0;
return v;
}
static Scheme_Object *make_sema(int n, Scheme_Object **p)
{
intptr_t v;
v = scheme_get_semaphore_init("make-semaphore", n, p);
return scheme_make_sema(v);
}