Added semaphores for futures

This commit is contained in:
James Swaine 2011-02-08 17:55:12 -06:00
parent 2935170eff
commit 0a73a9d5a1
11 changed files with 809 additions and 13 deletions

View File

@ -1,8 +1,14 @@
#lang racket/base
(require '#%futures)
(provide future?
future
touch
processor-count
current-future)
(provide future?
future
touch
processor-count
current-future
fsemaphore?
make-fsemaphore
fsemaphore-count
fsemaphore-post
fsemaphore-wait
fsemaphore-try-wait?)

View File

@ -87,6 +87,39 @@ in parallel. See also @guidesecref["effective-futures"] in @|Guide|.
or cores) that are available on the current machine.
}
@defproc[(make-fsemaphore [init exact-nonnegative-integer?]) fsemaphore?]{
Creates and returns a new semaphore with the counter initially
set to @racket[init].
}
@defproc[(fsemaphore? [v any/c]) boolean?]{
Returns @racket[#t] if @racket[v] is an fsemaphore value,
@racket[#f] otherwise.
}
@defproc[(fsemaphore-post [fsema fsemaphore?]) void?]{
Increments the semaphore's internal counter and returns @|void-const|.
}
@defproc[(fsemaphore-wait [fsema fsemaphore?]) void?]{
Blocks until the internal counter for @racket[fsema] is non-zero.
When the counter is non-zero, it is decremented and @racket[fsemaphore-wait]
returns @|void-const|.
}
@defproc[(fsemaphore-try-wait? [fsema fsemaphore?]) boolean?]{
Like @racket[fsemaphore-wait], but @racket[fsemaphore-try-wait?]
never blocks execution. If @racket[fsema]'s internal
counter is zero, @racket[fsemaphore-try-wait?] returns
@racket[#f] immediately without decrementing the counter.
If @racket[fsema]'s counter is positive, it
is decremented and @racket[#t] is returned.
}
@defproc[(fsemaphore-count [fsema fsemaphore?]) exact-nonnegative-integer?]{
Returns @racket[fsema]'s current internal counter value.
}
@; ----------------------------------------------------------------------

View File

@ -202,3 +202,138 @@ We should also test deep continuations.
(future (lambda ()
(and (eq? (touch f) f)
(current-future)))))))
;Future semaphore tests
(let* ([m1 (make-fsemaphore 1)]
[m2 (make-fsemaphore 0)]
[x 2]
[lst '()]
[rack-sema (make-semaphore 1)]
[f (future (λ ()
(fsemaphore? m2)))])
(check-equal? #t (fsemaphore? m1))
(check-equal? #t (fsemaphore? m2))
(check-equal? #f (fsemaphore? x))
(check-equal? #f (fsemaphore? lst))
(check-equal? #f (fsemaphore? rack-sema))
(check-equal? #t (touch f)))
(let ([m (make-fsemaphore 1)])
(fsemaphore-wait m)
(check-equal? 0 (fsemaphore-count m)))
(let ([m (make-fsemaphore 0)])
(fsemaphore-post m)
(fsemaphore-wait m)
(check-equal? 0 (fsemaphore-count m)))
(let ([m (make-fsemaphore 37)])
(check-equal? 37 (fsemaphore-count m)))
(let ([m (make-fsemaphore 37)])
(fsemaphore-wait m)
(fsemaphore-wait m)
(fsemaphore-post m)
(fsemaphore-wait m)
(check-equal? 35 (fsemaphore-count m)))
(let ([m1 (make-fsemaphore 38)]
[m2 (make-fsemaphore 0)])
(check-equal? #t (fsemaphore-try-wait? m1))
(check-equal? #f (fsemaphore-try-wait? m2)))
(let* ([m1 (make-fsemaphore 20)]
[m2 (make-fsemaphore 0)]
[f1 (future (λ ()
(fsemaphore-try-wait? m2)))]
[f2 (future (λ ()
(fsemaphore-try-wait? m1)))])
(check-equal? #f (touch f1))
(check-equal? #t (touch f2)))
;Test fsemaphore wait on a future thread
;(here the future thread should be able to capture the cont. locally)
(let* ([m (make-fsemaphore 0)]
[f (future (λ ()
(let ([l (cons 1 2)])
(for ([i (in-range 0 10000)])
(set! l (cons i l)))
(fsemaphore-wait m)
l)))])
(sleep 3)
(fsemaphore-post m)
(touch f)
(check-equal? 0 (fsemaphore-count m)))
;The f1 future should never terminate
(printf "test n-4~n")
(let* ([m (make-fsemaphore 0)]
[dummy 5]
[f1 (future (λ () (fsemaphore-wait m) (set! dummy 42)))]
[f2 (future (λ () 88))])
(check-equal? 88 (touch f2))
(sleep 3)
(check-equal? 0 (fsemaphore-count m))
(check-equal? 5 dummy))
(printf "test n-3~n")
(let* ([m (make-fsemaphore 0)]
[dummy 5]
[f1 (future (λ ()
(fsemaphore-wait m)
(set! dummy 42)
dummy))]
[f2 (future (λ ()
(fsemaphore-post m)
#t))])
(sleep 2)
(check-equal? 42 (touch f1))
(check-equal? 0 (fsemaphore-count m)))
(printf "test n-2~n")
(let* ([m1 (make-fsemaphore 0)]
[m2 (make-fsemaphore 0)]
[dummy 8]
[f1 (future (λ ()
(fsemaphore-wait m2)
(set! dummy 10)
(fsemaphore-post m1)
#t))]
[f2 (future (λ ()
(fsemaphore-post m2)
(fsemaphore-wait m1)
(set! dummy (add1 dummy))
dummy))])
(check-equal? 11 (touch f2)))
(printf "test n-1~n")
(let* ([m (make-fsemaphore 0)]
[f1 (future (λ ()
(sleep 1)
(fsemaphore-wait m)
5))])
(fsemaphore-post m)
(check-equal? 5 (touch f1)))
;Test fsemaphore ops after blocking runtime call
;Here one future will invoke fsemaphore-wait within the context
;of a touch. Meanwhile, another future is allocating (requiring
;the help of the runtime thread which is also "blocked" waiting
;for the semaphore to become ready.
(printf "test n~n")
(let* ([m (make-fsemaphore 0)]
[f1 (future (λ ()
(sleep 1) ;Currently a blocking RT call
(fsemaphore-wait m)))]
[f2 (future (λ ()
(let* ([lst '()]
[retval (let loop ([index 10000] [l lst])
(cond
[(zero? index) l]
[else
(loop (sub1 index) (cons index l))]))])
(fsemaphore-post m)
(car retval))))])
(sleep 3)
(touch f1)
(check-equal? 1 (touch f2)))

View File

@ -28,6 +28,14 @@ static Scheme_Object *future_p(int argc, Scheme_Object *argv[])
return scheme_false;
}
Scheme_Object *scheme_fsemaphore_p(int argc, Scheme_Object *argv[])
{
if (SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_fsemaphore_type))
return scheme_true;
else
return scheme_false;
}
#ifdef MZ_PRECISE_GC
static void register_traversers(void);
#endif
@ -125,6 +133,8 @@ static Scheme_Object *touch(int argc, Scheme_Object *argv[])
return NULL;
}
static Scheme_Object *processor_count(int argc, Scheme_Object *argv[])
{
return scheme_make_integer(1);
@ -206,10 +216,16 @@ void scheme_init_futures_per_place()
#define LOG_RTCALL_INT_POBJ_OBJ_OBJ(a,b,c) LOG3("(%d, %p, %p)", a, b, c)
#define LOG_RTCALL_ENV_ENV_VOID(a,b) LOG2("(%p, %p)", a, b)
static Scheme_Object *make_fsemaphore(int argc, Scheme_Object *argv[]);
static Scheme_Object *touch(int argc, Scheme_Object *argv[]);
static Scheme_Object *processor_count(int argc, Scheme_Object *argv[]);
static void futures_init(void);
static void init_future_thread(struct Scheme_Future_State *fs, int i);
static void requeue_future(struct future_t *future, struct Scheme_Future_State *fs);
static void future_do_runtimecall(struct Scheme_Future_Thread_State *fts,
void *func,
int is_atomic);
static int capture_future_continuation(future_t *ft, void **storage);
#define THREAD_POOL_SIZE 16
#define INITIAL_C_STACK_SIZE 500000
@ -278,6 +294,7 @@ static void receive_special_result(future_t *f, Scheme_Object *retval, int clear
static void send_special_result(future_t *f, Scheme_Object *retval);
static Scheme_Object *_apply_future_lw(future_t *ft);
static Scheme_Object *apply_future_lw(future_t *ft);
static int fsemaphore_ready(Scheme_Object *obj);
READ_ONLY static int cpucount;
static void init_cpucount(void);
@ -361,6 +378,55 @@ void scheme_init_futures(Scheme_Env *newenv)
SCHEME_PRIM_PROC_FLAGS(p) |= SCHEME_PRIM_IS_NARY_INLINED;
scheme_add_global_constant("current-future", p, newenv);
p = scheme_make_immed_prim(
scheme_fsemaphore_p,
"fsemaphore?",
1,
1);
SCHEME_PRIM_PROC_FLAGS(p) |= SCHEME_PRIM_IS_UNARY_INLINED;
scheme_add_global_constant("fsemaphore?", p, newenv);
p = scheme_make_immed_prim(
make_fsemaphore,
"make-fsemaphore",
1,
1);
SCHEME_PRIM_PROC_FLAGS(p) |= SCHEME_PRIM_IS_UNARY_INLINED;
scheme_add_global_constant("make-fsemaphore", p, newenv);
p = scheme_make_immed_prim(
scheme_fsemaphore_count,
"fsemaphore-count",
1,
1);
SCHEME_PRIM_PROC_FLAGS(p) |= SCHEME_PRIM_IS_UNARY_INLINED;
scheme_add_global_constant("fsemaphore-count", p, newenv);
p = scheme_make_immed_prim(
scheme_fsemaphore_wait,
"fsemaphore-wait",
1,
1);
SCHEME_PRIM_PROC_FLAGS(p) |= SCHEME_PRIM_IS_UNARY_INLINED;
scheme_add_global_constant("fsemaphore-wait", p, newenv);
p = scheme_make_immed_prim(
scheme_fsemaphore_post,
"fsemaphore-post",
1,
1);
SCHEME_PRIM_PROC_FLAGS(p) |= SCHEME_PRIM_IS_UNARY_INLINED;
scheme_add_global_constant("fsemaphore-post", p, newenv);
p = scheme_make_immed_prim(
scheme_fsemaphore_try_wait,
"fsemaphore-try-wait?",
1,
1);
SCHEME_PRIM_PROC_FLAGS(p) |= SCHEME_PRIM_IS_UNARY_INLINED;
scheme_add_global_constant("fsemaphore-try-wait?", p, newenv);
scheme_finish_primitive_module(newenv);
scheme_protect_primitive_provide(newenv, NULL);
}
@ -686,6 +752,264 @@ Scheme_Object *scheme_future(int argc, Scheme_Object *argv[])
return (Scheme_Object*)ft;
}
void fsemaphore_finalize(void *p, void *data)
{
fsemaphore_t *sema;
sema = (fsemaphore_t*)p;
mzrt_mutex_destroy(sema->mut);
}
Scheme_Object *scheme_make_fsemaphore_inl(int argc, Scheme_Object *ready)
{
fsemaphore_t *sema;
printf("scheme_make_fsemaphore_inl\n");
/* Input validation */
if (argc != 1 || !SCHEME_INTP(ready))
scheme_wrong_type("future->make-fsemaphore", "exact integer", 0, argc, &ready);
sema = MALLOC_ONE_TAGGED(fsemaphore_t);
sema->so.type = scheme_fsemaphore_type;
mzrt_mutex_create(&sema->mut);
sema->ready = SCHEME_INT_VAL(ready);
scheme_register_finalizer((void*)sema, fsemaphore_finalize, NULL, NULL, NULL);
return (Scheme_Object*)sema;
}
Scheme_Object *make_fsemaphore(int argc, Scheme_Object **argv)
/* Called in runtime thread (atomic/synchronized) */
{
Scheme_Object *arg;
Scheme_Object *semaObj;
printf("make_fsemaphore\n");
arg = argv[0];
semaObj = scheme_make_fsemaphore_inl(argc, arg);
return semaObj;
}
Scheme_Object *scheme_fsemaphore_count(int argc, Scheme_Object **argv)
XFORM_SKIP_PROC
{
fsemaphore_t *sema;
sema = (fsemaphore_t*)argv[0];
if (!SAME_TYPE(SCHEME_TYPE(sema), scheme_fsemaphore_type))
scheme_wrong_type("fsemaphore-count", "fsemaphore", 0, argc, argv);
printf("scheme_fsemaphore_count\n");
return scheme_make_integer(sema->ready);
}
Scheme_Object *scheme_fsemaphore_post(int argc, Scheme_Object **argv)
XFORM_SKIP_PROC
{
fsemaphore_t *sema;
Scheme_Future_State *fs;
printf("scheme_fsemaphore_post\n");
fflush(stdout);
sema = (fsemaphore_t*)argv[0];
fs = scheme_future_state;
mzrt_mutex_lock(sema->mut);
/* Check for any futures waiting on the semaphore */
if (sema->queue_front) {
future_t *ft = sema->queue_front;
sema->queue_front = ft->next_in_fsema_queue;
ft->next_in_fsema_queue = NULL;
if (!sema->queue_front) {
sema->queue_end = NULL;
} else {
sema->queue_front->prev_in_fsema_queue = NULL;
}
/* Place the waiting future back on the run queue */
mzrt_mutex_lock(fs->future_mutex);
ft->status = PENDING;
enqueue_future(fs, ft);
/* Signal that a future is now pending */
mzrt_sema_post(fs->future_pending_sema);
mzrt_mutex_unlock(fs->future_mutex);
} else {
sema->ready++;
}
mzrt_mutex_unlock(sema->mut);
return scheme_void;
}
static void enqueue_future_for_fsema(Scheme_Object *objFt, Scheme_Object *objSema)
/* This function assumed sema->mut has already been acquired! */
{
future_t *ft;
fsemaphore_t *sema;
future_t *front;
ft = (future_t*)objFt;
sema = (fsemaphore_t*)objSema;
/* Enqueue this future in the semaphore's queue */
front = sema->queue_front;
if (!front) {
sema->queue_front = ft;
sema->queue_end = ft;
} else {
future_t *end = sema->queue_end;
end->next_in_fsema_queue = ft;
ft->prev_in_fsema_queue = end;
sema->queue_end = ft;
}
}
Scheme_Object *scheme_fsemaphore_wait(int argc, Scheme_Object **argv)
XFORM_SKIP_PROC
{
fsemaphore_t *sema;
Scheme_Future_Thread_State *fts = scheme_future_thread_state;
Scheme_Future_State *fs = scheme_future_state;
void *storage[3];
printf("scheme_fsemaphore_wait ");
fflush(stdout);
sema = (fsemaphore_t*)argv[0];
mzrt_mutex_lock(sema->mut);
if (!sema->ready) {
if (!fts) {
printf("on runtime thread -- sema not ready.\n");
fflush(stdout);
/* Then we are on the runtime thread, block and wait for the
fsema to be ready while cooperating with the scheduler */
mzrt_mutex_unlock(sema->mut);
scheme_block_until(fsemaphore_ready, NULL, (Scheme_Object*)sema, 0);
mzrt_mutex_lock(sema->mut);
} else {
printf("on future thread -- sema not ready.\n");
fflush(stdout);
/* On a future thread, suspend the future (to be
resumed whenever the fsema becomes ready */
future_t *future = fts->thread->current_ft;
if (!future) {
/* Exception? */
printf("No current future for fsemaphore-wait!\n");
return scheme_false;
}
mzrt_mutex_unlock(sema->mut);
scheme_fill_lwc_end();
future->lwc = scheme_current_lwc;
future->fts = fts;
future->arg_p = scheme_current_thread;
if (GC_gen0_alloc_page_ptr
&& capture_future_continuation(future, storage)) {
/* This will set fts->thread->current_ft to NULL */
} else {
/* Can't capture the continuation locally, so ask the runtime
thread to do it (is this the right solution here?) */
printf("fsemaphore-wait couldn't capture continuation locally.\n");
future->next_waiting_lwc = fs->future_waiting_lwc;
fs->future_waiting_lwc = future;
future->want_lw = 1;
future->status = WAITING_FOR_FSEMA;
}
scheme_signal_received_at(fs->signal_handle);
if (fts->thread->current_ft) {
/* Wait for the signal that LW continuation was captured
by the runtime thread */
future->can_continue_sema = fts->worker_can_continue_sema;
end_gc_not_ok(fts, fs, MZ_RUNSTACK);
mzrt_mutex_unlock(fs->future_mutex);
mzrt_sema_wait(fts->worker_can_continue_sema);
/* Add the future to the semaphore's wait queue */
//enqueue_future_for_fsema((Scheme_Object*)future, (Scheme_Object*)sema);
//mzrt_mutex_unlock(sema->mut);
//mzrt_mutex_lock(fs->future_mutex);
//start_gc_not_ok(fs);
}
/* Check again whether the fsema has become ready -- if so,
we can resume immediately */
mzrt_mutex_lock(sema->mut);
if (!sema->ready) {
printf("sema still not ready after cont. capture, suspending.\n");
enqueue_future_for_fsema((Scheme_Object*)future, (Scheme_Object*)sema);
mzrt_mutex_unlock(sema->mut);
mzrt_mutex_lock(fs->future_mutex);
start_gc_not_ok(fs);
mzrt_mutex_unlock(fs->future_mutex);
/* Fetch the future instance again, in case the GC has
moved the pointer or the future has been requeued. */
future = fts->thread->current_ft;
if (!future) {
/* Future continuation was requeued */
scheme_future_longjmp(*scheme_current_thread->error_buf, 1);
} else if (future->no_retval) {
future->no_retval = 0;
scheme_future_longjmp(*scheme_current_thread->error_buf, 1);
}
return scheme_void;
} else {
}
}
}
/* Semaphore is ready -- decrement and continue */
if (!fts) {
printf("on runtime thread -- sema was ready\n");
} else {
printf("on future thread -- sema was ready\n");
}
fflush(stdout);
sema->ready--;
mzrt_mutex_unlock(sema->mut);
return scheme_void;
}
Scheme_Object *scheme_fsemaphore_try_wait(int argc, Scheme_Object **argv)
XFORM_SKIP_PROC
{
fsemaphore_t *sema;
future_t *cf;
Scheme_Object *ret;
printf("scheme_fsemaphore_try_wait\n");
sema = (fsemaphore_t*)argv[0];
mzrt_mutex_lock(sema->mut);
if (!sema->ready) {
ret = scheme_false;
} else {
ret = scheme_true;
}
mzrt_mutex_unlock(sema->mut);
return ret;
}
static int fsemaphore_ready(Scheme_Object *obj)
/* Called in runtime thread by Scheme scheduler */
{
int ret = 0;
fsemaphore_t *fsema = (fsemaphore_t*)obj;
mzrt_mutex_lock(fsema->mut);
ret = fsema->ready;
mzrt_mutex_unlock(fsema->mut);
return ret;
}
int future_ready(Scheme_Object *obj)
/* Called in runtime thread by Scheme scheduler */
{
@ -850,7 +1174,7 @@ Scheme_Object *touch(int argc, Scheme_Object *argv[])
invoke_rtcall(fs, ft);
LOG0("done.\n");
}
else if (ft->maybe_suspended_lw)
else if (ft->maybe_suspended_lw && ft->status != WAITING_FOR_FSEMA)
{
ft->maybe_suspended_lw = 0;
if (ft->suspended_lw
@ -1133,7 +1457,7 @@ static Scheme_Object *apply_future_lw(future_t *ft)
static int capture_future_continuation(future_t *ft, void **storage)
XFORM_SKIP_PROC
/* This function explicitly coorperates with the GC by storing the
/* This function explicitly cooperates with the GC by storing the
pointers it needs to save across a collection in `storage', so
it can be used in a future thread. If future-thread-local
allocation fails, the result is 0. */
@ -1150,7 +1474,9 @@ static int capture_future_continuation(future_t *ft, void **storage)
ft->suspended_lw = lw;
ft->maybe_suspended_lw = 1;
ft->status = WAITING_FOR_REQUEUE;
if (ft->status != WAITING_FOR_FSEMA)
ft->status = WAITING_FOR_REQUEUE;
ft->want_lw = 0;
ft->fts->thread->current_ft = NULL; /* tells worker thread that it no longer
needs to handle the future */
@ -1346,6 +1672,33 @@ void scheme_rtcall_void_void_3args(const char *who, int src_type, prim_void_void
future->arg_S0 = NULL;
}
Scheme_Object *scheme_rtcall_make_fsemaphore(const char *who, int src_type, int argc, Scheme_Object *ready)
XFORM_SKIP_PROC
/* Called in future thread */
{
Scheme_Object *retval;
Scheme_Future_Thread_State *fts = scheme_future_thread_state;
future_t *future = fts->thread->current_ft;
future->prim_protocol = SIG_MAKE_FSEMAPHORE;
future->arg_i0 = argc;
future->arg_s1 = ready;
future->time_of_request = scheme_get_inexact_milliseconds();
future->source_of_request = who;
future->source_type = src_type;
future_do_runtimecall(fts, (void*)scheme_make_fsemaphore_inl, 1);
#ifdef MZ_PRECISE_GC
retval = future->retval_s;
future->retval_s = NULL;
#else
retval = (Scheme_Object*)future->retval_p;
future->retval_p = NULL;
#endif
return retval;
}
void scheme_rtcall_allocate_values(const char *who, int src_type, int count, Scheme_Thread *t,
prim_allocate_values_t f)
XFORM_SKIP_PROC
@ -1592,6 +1945,14 @@ static void do_invoke_rtcall(Scheme_Future_State *fs, future_t *future)
scheme_new_mark_segment(p_seg);
break;
}
case SIG_MAKE_FSEMAPHORE:
{
Scheme_Object *ret;
ret = scheme_make_fsemaphore_inl(future->arg_i0, future->arg_s1);
future->retval_s = ret;
future->arg_s0 = NULL;
break;
}
case SIG_ALLOC_VALUES:
{
prim_allocate_values_t func = (prim_allocate_values_t)future->prim_func;
@ -1736,6 +2097,8 @@ static void register_traversers(void)
#else
GC_REG_TRAV(scheme_future_type, sequential_future);
#endif
GC_REG_TRAV(scheme_fsemaphore_type, fsemaphore);
}
END_XFORM_SKIP;

View File

@ -37,6 +37,7 @@ typedef void (*prim_allocate_values_t)(int, Scheme_Thread *);
#define FINISHED 3
#define PENDING_OVERSIZE 4
#define WAITING_FOR_REQUEUE 5
#define WAITING_FOR_FSEMA 6
#define FSRC_OTHER 0
#define FSRC_RATOR 1
@ -114,8 +115,21 @@ typedef struct future_t {
struct future_t *next_waiting_atomic;
struct future_t *next_waiting_lwc;
struct future_t *prev_in_fsema_queue;
struct future_t *next_in_fsema_queue;
} future_t;
typedef struct fsemaphore_t {
Scheme_Object so;
int ready;
mzrt_mutex *mut;
future_t *queue_front;
future_t *queue_end;
} fsemaphore_t;
/* Primitive instrumentation stuff */
/* Signature flags for primitive invocations */
@ -123,6 +137,7 @@ typedef struct future_t {
#define SIG_ALLOC 2
#define SIG_ALLOC_MARK_SEGMENT 3
#define SIG_ALLOC_VALUES 4
#define SIG_MAKE_FSEMAPHORE 5
# include "jit_ts_protos.h"
@ -171,5 +186,13 @@ extern Scheme_Object *future_touch(int futureid);
/* always defined: */
Scheme_Object *scheme_future(int argc, Scheme_Object *argv[]);
Scheme_Object *scheme_current_future(int argc, Scheme_Object *argv[]);
Scheme_Object *scheme_fsemaphore_p(int argc, Scheme_Object *argv[]);
Scheme_Object *scheme_fsemaphore_count(int argc, Scheme_Object *argv[]);
//Scheme_Object *scheme_make_fsemaphore(int argc, Scheme_Object *argv[]);
Scheme_Object *scheme_make_fsemaphore_inl(int argc, Scheme_Object *ready);
Scheme_Object *scheme_fsemaphore_wait(int argc, Scheme_Object *argv[]);
Scheme_Object *scheme_fsemaphore_post(int argc, Scheme_Object *argv[]);
Scheme_Object *scheme_fsemaphore_try_wait(int argc, Scheme_Object *argv[]);
#endif

View File

@ -2948,6 +2948,18 @@ static void ts_on_demand(void) XFORM_SKIP_PROC
on_demand();
}
static Scheme_Object *ts_make_fsemaphore(int argc, Scheme_Object *ready)
XFORM_SKIP_PROC
{
if (scheme_use_rtcall) {
printf("ts_make_fsemaphore on main thread\n");
return scheme_rtcall_make_fsemaphore("[make_fsemaphore]", FSRC_OTHER, argc, ready);
}
printf("ts_make_fsemaphore on worker thread\n");
return scheme_make_fsemaphore_inl(argc, ready);
}
#ifdef MZ_PRECISE_GC
static void *ts_prepare_retry_alloc(void *p, void *p2) XFORM_SKIP_PROC
{
@ -2980,7 +2992,8 @@ Scheme_Object *scheme_ts_scheme_force_value_same_mark(Scheme_Object *v)
# define mz_finishr_direct_prim(reg, proc) mz_finishr(reg)
# define mz_direct_only(p) p
# define ts_on_demand on_demand
# define ts_prepare_retry_alloc prepare_retry_alloc
# define ts_prepare_retry_alloc prepare_retry_alloc
# define ts_make_fsemaphore scheme_make_fsemaphore_inl
# define mz_generate_direct_prim(direct_only, first_arg, reg, prim_indirect) \
(mz_direct_only(direct_only), first_arg, mz_finishr_direct_prim(reg, prim_indirect))
#endif
@ -7488,6 +7501,186 @@ static int generate_inlined_unary(mz_jit_state *jitter, Scheme_App2_Rec *app, in
jit_retval(JIT_R0);
#endif
return 1;
} else if (IS_NAMED_PRIM(rator, "future?")) {
printf("Inlining future?\n");
generate_inlined_type_test(jitter, app, scheme_future_type, scheme_future_type, 1, for_branch, branch_short, need_sync);
printf("Done inlining\n");
return 1;
} else if (IS_NAMED_PRIM(rator, "fsemaphore?")) {
GC_CAN_IGNORE jit_insn *refcont, *ref1, *ref2, *ref3;
printf("Inlining fsemaphore?\n");
printf("Starting IP is: %p\n", jit_get_ip().ptr);
generate_inlined_type_test(jitter, app, scheme_fsemaphore_type, scheme_fsemaphore_type, 1, for_branch, branch_short, need_sync);
return 1;
//My custom implementation
mz_runstack_skipped(jitter, 1);
generate_non_tail(app->rand, jitter, 0, 1, 0);
CHECK_LIMIT();
mz_runstack_unskipped(jitter, 1);
ref1 = jit_bmsi_ul(jit_forward(), JIT_R0, 0x1);
jit_ldxi_s(JIT_R1, JIT_R0, &((Scheme_Object*)0x0)->type); //Move type into JIT_R1
//REMOVE: These are only here to show register mappings for x86 in disas
jit_movi_i(JIT_R2, 0x2);
jit_movi_i(JIT_V0, 0x3);
jit_movi_i(JIT_V1, 0x4);
jit_movi_i(JIT_V2, 0x5);
ref2 = jit_bnei_l(jit_forward(), JIT_R1, scheme_fsemaphore_type); //Do the type test
//Jump only if not equal (optimization)
jit_movi_p(JIT_R0, scheme_true);
refcont = jit_jmpi(jit_forward());
//ref2 is the 'false' branch
mz_patch_branch(ref1);
mz_patch_branch(ref2);
jit_movi_p(JIT_R0, scheme_false);
mz_patch_ucbranch(refcont);
//Logging
printf("Ending IP is: %p\n", jit_get_ip().ptr);
printf("Done inlining\n");
return 1;
} else if (IS_NAMED_PRIM(rator, "fsemaphore-count")) {
mz_runstack_skipped(jitter, 1);
generate_non_tail(app->rand, jitter, 0, 1, 0);
CHECK_LIMIT();
mz_runstack_unskipped(jitter, 1);
mz_rs_sync();
JIT_UPDATE_THREAD_RSPTR_IF_NEEDED();
/* Push the arg onto the runstack */
mz_pushr_p(JIT_R0);
mz_rs_sync();
JIT_UPDATE_THREAD_RSPTR_IF_NEEDED();
mz_prepare(2);
jit_pusharg_p(JIT_RUNSTACK); /* Same as push (JIT_V1) */
jit_movi_i(JIT_R0, 1);
jit_pusharg_i(JIT_R0);
mz_finish(scheme_fsemaphore_count);
mz_popr_x();
jit_retval(JIT_R0);
mz_rs_sync();
JIT_UPDATE_THREAD_RSPTR_IF_NEEDED();
return 1;
/*
mz_pushr_p(JIT_R0);
mz_rs_sync();
mz_prepare(2);
jit_pusharg_p(JIT_V0);
jit_movi_i(JIT_V1, 1);
jit_pusharg_i(JIT_V1);
mz_finish(scheme_fsemaphore_count);
mz_popr_p(JIT_R0);
jit_retval(JIT_R0);
mz_rs_sync();
return 1; */
} else if (IS_NAMED_PRIM(rator, "make-fsemaphore")) {
mz_runstack_skipped(jitter, 1);
generate_non_tail(app->rand, jitter, 0, 1, 0);
CHECK_LIMIT();
mz_runstack_unskipped(jitter, 1);
mz_rs_sync();
JIT_UPDATE_THREAD_RSPTR_IF_NEEDED();
mz_prepare(2);
jit_pusharg_p(JIT_R0);
jit_movi_i(JIT_R0, 1);
jit_pusharg_i(JIT_R0);
mz_finish(ts_make_fsemaphore);
jit_retval(JIT_R0);
return 1;
} else if (IS_NAMED_PRIM(rator, "fsemaphore-post")) {
mz_runstack_skipped(jitter, 1);
generate_non_tail(app->rand, jitter, 0, 1, 0);
CHECK_LIMIT();
mz_runstack_unskipped(jitter, 1);
mz_rs_sync();
JIT_UPDATE_THREAD_RSPTR_IF_NEEDED();
mz_pushr_p(JIT_R0);
mz_rs_sync();
JIT_UPDATE_THREAD_RSPTR_IF_NEEDED();
mz_prepare(2);
jit_pusharg_p(JIT_RUNSTACK);
jit_movi_i(JIT_R0, 1);
jit_pusharg_i(JIT_R0);
mz_finish(scheme_fsemaphore_post);
mz_popr_x();
jit_retval(JIT_R0);
mz_rs_sync();
JIT_UPDATE_THREAD_RSPTR_IF_NEEDED();
return 1;
} else if (IS_NAMED_PRIM(rator, "fsemaphore-wait")) {
mz_runstack_skipped(jitter, 1);
generate_non_tail(app->rand, jitter, 0, 1, 0);
CHECK_LIMIT();
mz_runstack_unskipped(jitter, 1);
mz_rs_sync();
JIT_UPDATE_THREAD_RSPTR_IF_NEEDED();
mz_pushr_p(JIT_R0);
mz_rs_sync();
JIT_UPDATE_THREAD_RSPTR_IF_NEEDED();
mz_prepare(2);
jit_pusharg_p(JIT_RUNSTACK);
jit_movi_i(JIT_R0, 1);
jit_pusharg_i(JIT_R0);
GC_CAN_IGNORE jit_insn *refr;
(void)mz_finish_lwe(scheme_fsemaphore_wait, refr);
//mz_finish(scheme_fsemaphore_wait);
mz_popr_x();
jit_retval(JIT_R0);
mz_rs_sync();
JIT_UPDATE_THREAD_RSPTR_IF_NEEDED();
return 1;
} else if (IS_NAMED_PRIM(rator, "fsemaphore-try-wait?")) {
mz_runstack_skipped(jitter, 1);
generate_non_tail(app->rand, jitter, 0, 1, 0);
CHECK_LIMIT();
mz_runstack_unskipped(jitter, 1);
mz_rs_sync();
JIT_UPDATE_THREAD_RSPTR_IF_NEEDED();
mz_pushr_p(JIT_R0);
mz_rs_sync();
JIT_UPDATE_THREAD_RSPTR_IF_NEEDED();
mz_prepare(2);
jit_pusharg_p(JIT_RUNSTACK);
jit_movi_i(JIT_R0, 1);
jit_pusharg_i(JIT_R0);
mz_finish(scheme_fsemaphore_try_wait);
mz_popr_x();
jit_retval(JIT_R0);
mz_rs_sync();
JIT_UPDATE_THREAD_RSPTR_IF_NEEDED();
return 1;
}
}

View File

@ -1,6 +1,3 @@
#define SIG_siS_s 5
typedef Scheme_Object* (*prim_siS_s)(Scheme_Object*, int, Scheme_Object**);
Scheme_Object* scheme_rtcall_siS_s(const char *who, int src_type, prim_siS_s f, Scheme_Object* g139, int g140, Scheme_Object** g141);
#define SIG_iSs_s 6
typedef Scheme_Object* (*prim_iSs_s)(int, Scheme_Object**, Scheme_Object*);
Scheme_Object* scheme_rtcall_iSs_s(const char *who, int src_type, prim_iSs_s f, int g142, Scheme_Object** g143, Scheme_Object* g144);
@ -67,3 +64,6 @@ Scheme_Object* scheme_rtcall_si_s(const char *who, int src_type, prim_si_s f, Sc
#define SIG_sis_v 27
typedef void (*prim_sis_v)(Scheme_Object*, int, Scheme_Object*);
void scheme_rtcall_sis_v(const char *who, int src_type, prim_sis_v f, Scheme_Object* g180, int g181, Scheme_Object* g182);
#define SIG_siS_s 28
typedef Scheme_Object* (*prim_siS_s)(Scheme_Object*, int, Scheme_Object**);
Scheme_Object* scheme_rtcall_siS_s(const char *who, int src_type, prim_siS_s f, Scheme_Object* g139, int g140, Scheme_Object** g141);

View File

@ -5713,6 +5713,8 @@ static int future_MARK(void *p, struct NewGC *gc) {
gcMARK2(f->next_waiting_atomic, gc);
gcMARK2(f->next_waiting_lwc, gc);
gcMARK2(f->suspended_lw, gc);
gcMARK2(f->prev_in_fsema_queue, gc);
gcMARK2(f->next_in_fsema_queue, gc);
return
gcBYTES_TO_WORDS(sizeof(future_t));
}
@ -5741,6 +5743,8 @@ static int future_FIXUP(void *p, struct NewGC *gc) {
gcFIXUP2(f->next_waiting_atomic, gc);
gcFIXUP2(f->next_waiting_lwc, gc);
gcFIXUP2(f->suspended_lw, gc);
gcFIXUP2(f->prev_in_fsema_queue, gc);
gcFIXUP2(f->next_in_fsema_queue, gc);
return
gcBYTES_TO_WORDS(sizeof(future_t));
}
@ -5782,6 +5786,32 @@ static int sequential_future_FIXUP(void *p, struct NewGC *gc) {
#endif
/* Future semaphore */
static int fsemaphore_SIZE(void *p, struct NewGC *gc) {
return
gcBYTES_TO_WORDS(sizeof(fsemaphore_t));
}
static int fsemaphore_MARK(void *p, struct NewGC *gc) {
fsemaphore_t *s = (fsemaphore_t*)p;
gcMARK2(s->queue_front, gc);
gcMARK2(s->queue_end, gc);
return
gcBYTES_TO_WORDS(sizeof(fsemaphore_t));
}
static int fsemaphore_FIXUP(void *p, struct NewGC *gc) {
fsemaphore_t *s = (fsemaphore_t*)p;
gcFIXUP2(s->queue_front, gc);
gcFIXUP2(s->queue_end, gc);
return
gcBYTES_TO_WORDS(sizeof(future_t));
}
#define fsemaphore_IS_ATOMIC 0
#define fsemaphore_IS_CONST_SIZE 1
#endif /* FUTURE */
/**********************************************************************/

View File

@ -2354,6 +2354,8 @@ future {
gcMARK2(f->next_waiting_atomic, gc);
gcMARK2(f->next_waiting_lwc, gc);
gcMARK2(f->suspended_lw, gc);
gcMARK2(f->prev_in_fsema_queue, gc);
gcMARK2(f->next_in_fsema_queue, gc);
size:
gcBYTES_TO_WORDS(sizeof(future_t));
}
@ -2373,6 +2375,16 @@ sequential_future {
#endif
fsemaphore {
mark:
fsemaphore_t *s = (fsemaphore_t*)p;
gcMARK2(s->queue_front, gc);
gcMARK2(s->queue_end, gc);
size:
gcBYTES_TO_WORDS(sizeof(fsemaphore_t));
}
END future;
/**********************************************************************/

View File

@ -16,7 +16,7 @@
#define EXPECTED_PRIM_COUNT 1021
#define EXPECTED_UNSAFE_COUNT 76
#define EXPECTED_FLFXNUM_COUNT 68
#define EXPECTED_FUTURES_COUNT 5
#define EXPECTED_FUTURES_COUNT 11
#ifdef MZSCHEME_SOMETHING_OMITTED
# undef USE_COMPILED_STARTUP

View File

@ -260,6 +260,7 @@ enum {
scheme_rt_lightweight_cont, /* 236 */
#endif
scheme_fsemaphore_type, /* 238 */
_scheme_last_type_
};