add async-apply support to FFI

This commit is contained in:
Matthew Flatt 2010-07-11 14:59:40 -06:00
parent 41cfcbe862
commit 768a3721f9
8 changed files with 508 additions and 122 deletions

View File

@ -436,17 +436,18 @@
#:wrapper [wrapper #f] #:wrapper [wrapper #f]
#:keep [keep #f] #:keep [keep #f]
#:atomic? [atomic? #f] #:atomic? [atomic? #f]
#:async-apply [async-apply #f]
#:save-errno [errno #f]) #:save-errno [errno #f])
(_cprocedure* itypes otype abi wrapper keep atomic? errno)) (_cprocedure* itypes otype abi wrapper keep atomic? async-apply errno))
;; for internal use ;; for internal use
(define held-callbacks (make-weak-hasheq)) (define held-callbacks (make-weak-hasheq))
(define (_cprocedure* itypes otype abi wrapper keep atomic? errno) (define (_cprocedure* itypes otype abi wrapper keep atomic? async-apply errno)
(define-syntax-rule (make-it wrap) (define-syntax-rule (make-it wrap)
(make-ctype _fpointer (make-ctype _fpointer
(lambda (x) (lambda (x)
(and x (and x
(let ([cb (ffi-callback (wrap x) itypes otype abi atomic?)]) (let ([cb (ffi-callback (wrap x) itypes otype abi atomic? async-apply)])
(cond [(eq? keep #t) (hash-set! held-callbacks x cb)] (cond [(eq? keep #t) (hash-set! held-callbacks x cb)]
[(box? keep) [(box? keep)
(let ([x (unbox keep)]) (let ([x (unbox keep)])
@ -478,7 +479,7 @@
(provide _fun) (provide _fun)
(define-for-syntax _fun-keywords (define-for-syntax _fun-keywords
`([#:abi ,#'#f] [#:keep ,#'#t] [#:atomic? ,#'#f] [#:save-errno ,#'#f])) `([#:abi ,#'#f] [#:keep ,#'#t] [#:atomic? ,#'#f] [#:async-apply ,#'#f] [#:save-errno ,#'#f]))
(define-syntax (_fun stx) (define-syntax (_fun stx)
(define (err msg . sub) (apply raise-syntax-error '_fun msg stx sub)) (define (err msg . sub) (apply raise-syntax-error '_fun msg stx sub))
(define xs #f) (define xs #f)
@ -626,6 +627,7 @@
#,wrapper #,wrapper
#,(kwd-ref '#:keep) #,(kwd-ref '#:keep)
#,(kwd-ref '#:atomic?) #,(kwd-ref '#:atomic?)
#,(kwd-ref '#:async-apply)
#,(kwd-ref '#:save-errno)))]) #,(kwd-ref '#:save-errno)))])
(if (or (caddr output) input-names (ormap caddr inputs) (if (or (caddr output) input-names (ormap caddr inputs)
(ormap (lambda (x) (not (car x))) inputs) (ormap (lambda (x) (not (car x))) inputs)

View File

@ -0,0 +1,3 @@
#lang racket
(provide y)
(define y 1)

View File

@ -345,6 +345,7 @@ the later case, the result is the @scheme[ctype]).}
[output-type ctype?] [output-type ctype?]
[#:abi abi (or/c symbol/c #f) #f] [#:abi abi (or/c symbol/c #f) #f]
[#:atomic? atomic? any/c #f] [#:atomic? atomic? any/c #f]
[#:async-apply async-apply (or/c #f ((-> any) . -> . any)) #f]
[#:save-errno save-errno (or/c #f 'posix 'windows) #f] [#:save-errno save-errno (or/c #f 'posix 'windows) #f]
[#:wrapper wrapper (or/c #f (procedure? . -> . procedure?)) [#:wrapper wrapper (or/c #f (procedure? . -> . procedure?))
#f] #f]
@ -381,13 +382,31 @@ If @scheme[atomic?] is true, then when a Racket procedure is given
this procedure type and called from foreign code, then the Racket this procedure type and called from foreign code, then the Racket
process is put into atomic mode while evaluating the Racket procedure process is put into atomic mode while evaluating the Racket procedure
body. In atomic mode, other Racket threads do not run, so the Racket body. In atomic mode, other Racket threads do not run, so the Racket
code must not call any function that potentially synchronizes with code must not call any function that potentially blocks on
other threads, or else it may deadlock. In addition, the Racket code synchronization with other threads, or else it may lead to deadlock. In
must not perform any potentially blocking operation (such as I/O), it addition, the Racket code must not perform any potentially blocking
must not raise an uncaught exception, it must not perform any escaping operation (such as I/O), it must not raise an uncaught exception, it
continuation jumps, and its non-tail recursion must be minimal to must not perform any escaping continuation jumps, and its non-tail
avoid C-level stack overflow; otherwise, the process may crash or recursion must be minimal to avoid C-level stack overflow; otherwise,
misbehave. the process may crash or misbehave.
If an @scheme[async-apply] procedure is provided, then a Racket
procedure with the generated procedure type can be applied in a
foreign thread (i.e., an OS-level thread other than the one used to
run Racket). In that case, @scheme[async-apply] is applied to a thunk
that encapsulates the specific callback invocation, and the foreign
thread blocks until the thunk is called and completes; the thunk must
be called exactly once, and the callback invocation must return
normally. The @scheme[async-apply] procedure itself is called in an
unspecified Racket thread and in atomic mode (see @scheme[atomic?]
above); its job is to arrange for the thunk to be called in a suitable
context without blocking in any synchronization. (If the callback is
known to complete quickly, require no synchronization, and work
independent of the Racket thread in which it runs, then
@scheme[async-apply] can apply the thunk directly.) Foreign-thread
detection to trigger @scheme[async-apply] works only when Racket is
compiled with OS-level thread support, which is the default for many
platforms.
If @scheme[save-errno] is @scheme['posix], then the value of If @scheme[save-errno] is @scheme['posix], then the value of
@as-index{@tt{errno}} is saved (specific to the current thread) @as-index{@tt{errno}} is saved (specific to the current thread)
@ -471,7 +490,8 @@ values: @itemize[
([fun-option (code:line #:abi abi-expr) ([fun-option (code:line #:abi abi-expr)
(code:line #:save-errno save-errno-expr) (code:line #:save-errno save-errno-expr)
(code:line #:keep keep-expr) (code:line #:keep keep-expr)
(code:line #:atomic? atomic?-expr)] (code:line #:atomic? atomic?-expr)
(code:line #:async-apply async-apply-expr)]
[maybe-args code:blank [maybe-args code:blank
(code:line (id ...) ::) (code:line (id ...) ::)
(code:line id ::) (code:line id ::)

View File

@ -1106,7 +1106,7 @@ typedef struct ffi_callback_struct {
Scheme_Object* proc; Scheme_Object* proc;
Scheme_Object* itypes; Scheme_Object* itypes;
Scheme_Object* otype; Scheme_Object* otype;
char call_in_scheduler; Scheme_Object* sync;
} ffi_callback_struct; } ffi_callback_struct;
#define SCHEME_FFICALLBACKP(x) (SCHEME_TYPE(x)==ffi_callback_tag) #define SCHEME_FFICALLBACKP(x) (SCHEME_TYPE(x)==ffi_callback_tag)
#define MYNAME "ffi-callback?" #define MYNAME "ffi-callback?"
@ -1127,6 +1127,7 @@ int ffi_callback_MARK(void *p) {
gcMARK(s->proc); gcMARK(s->proc);
gcMARK(s->itypes); gcMARK(s->itypes);
gcMARK(s->otype); gcMARK(s->otype);
gcMARK(s->sync);
return gcBYTES_TO_WORDS(sizeof(ffi_callback_struct)); return gcBYTES_TO_WORDS(sizeof(ffi_callback_struct));
} }
int ffi_callback_FIXUP(void *p) { int ffi_callback_FIXUP(void *p) {
@ -1135,11 +1136,18 @@ int ffi_callback_FIXUP(void *p) {
gcFIXUP(s->proc); gcFIXUP(s->proc);
gcFIXUP(s->itypes); gcFIXUP(s->itypes);
gcFIXUP(s->otype); gcFIXUP(s->otype);
gcFIXUP(s->sync);
return gcBYTES_TO_WORDS(sizeof(ffi_callback_struct)); return gcBYTES_TO_WORDS(sizeof(ffi_callback_struct));
} }
END_XFORM_SKIP; END_XFORM_SKIP;
#endif #endif
/* The sync field:
NULL => non-atomic mode, no sync proc
#t => atomic mode, no sync proc
(rcons queue proc) => non-atomic mode, sync proc
(box (rcons queue proc)) => atomic mode, sync proc */
/*****************************************************************************/ /*****************************************************************************/
/* Pointer objects */ /* Pointer objects */
/* use cpointer (with a NULL tag when creating), #f for NULL */ /* use cpointer (with a NULL tag when creating), #f for NULL */
@ -2603,12 +2611,13 @@ static Scheme_Object *foreign_ffi_call(int argc, Scheme_Object *argv[])
/*****************************************************************************/ /*****************************************************************************/
/* Scheme callbacks */ /* Scheme callbacks */
void ffi_do_callback(ffi_cif* cif, void* resultp, void** args, void *userdata) typedef void (*ffi_callback_t)(ffi_cif* cif, void* resultp, void** args, void *userdata);
static ffi_callback_struct *extract_ffi_callback(void *userdata)
XFORM_SKIP_PROC
{ {
ffi_callback_struct *data; ffi_callback_struct *data;
Scheme_Object *argv_stack[MAX_QUICK_ARGS];
int argc = cif->nargs, i;
Scheme_Object **argv, *p, *v;
#ifdef MZ_PRECISE_GC #ifdef MZ_PRECISE_GC
{ {
void *tmp; void *tmp;
@ -2619,11 +2628,24 @@ void ffi_do_callback(ffi_cif* cif, void* resultp, void** args, void *userdata)
#else #else
data = (ffi_callback_struct*)userdata; data = (ffi_callback_struct*)userdata;
#endif #endif
return data;
}
void ffi_do_callback(ffi_cif* cif, void* resultp, void** args, void *userdata)
{
ffi_callback_struct *data;
Scheme_Object *argv_stack[MAX_QUICK_ARGS];
int argc = cif->nargs, i;
Scheme_Object **argv, *p, *v;
data = extract_ffi_callback(userdata);
if (argc <= MAX_QUICK_ARGS) if (argc <= MAX_QUICK_ARGS)
argv = argv_stack; argv = argv_stack;
else else
argv = scheme_malloc(argc * sizeof(Scheme_Object*)); argv = scheme_malloc(argc * sizeof(Scheme_Object*));
if (data->call_in_scheduler) if (data->sync && !SCHEME_RPAIRP(data->sync))
scheme_start_in_scheduler(); scheme_start_in_scheduler();
for (i=0, p=data->itypes; i<argc; i++, p=SCHEME_CDR(p)) { for (i=0, p=data->itypes; i<argc; i++, p=SCHEME_CDR(p)) {
v = C2SCHEME(SCHEME_CAR(p), args[i], 0, 0); v = C2SCHEME(SCHEME_CAR(p), args[i], 0, 0);
@ -2631,10 +2653,137 @@ void ffi_do_callback(ffi_cif* cif, void* resultp, void** args, void *userdata)
} }
p = _scheme_apply(data->proc, argc, argv); p = _scheme_apply(data->proc, argc, argv);
SCHEME2C(data->otype, resultp, 0, p, NULL, NULL, 1); SCHEME2C(data->otype, resultp, 0, p, NULL, NULL, 1);
if (data->call_in_scheduler) if (data->sync && !SCHEME_RPAIRP(data->sync))
scheme_end_in_scheduler(); scheme_end_in_scheduler();
} }
#ifdef MZ_USE_MZRT
/* When OS-level thread support is avaiable, support callbacks
in foreign threads that are executed on the main Scheme thread. */
typedef struct Queued_Callback {
ffi_cif* cif;
void* resultp;
void** args;
void *userdata;
mzrt_sema *sema;
int called;
struct Queued_Callback *next;
} Queued_Callback;
typedef struct FFI_Sync_Queue {
Queued_Callback *callbacks; /* malloc()ed list */
mzrt_mutex *lock;
mzrt_thread_id orig_thread;
void *sig_hand;
} FFI_Sync_Queue;
THREAD_LOCAL_DECL(static struct FFI_Sync_Queue *ffi_sync_queue);
static Scheme_Object *callback_thunk(void *_qc, int argc, Scheme_Object *argv[])
{
Queued_Callback *qc = (Queued_Callback *)_qc;
if (qc->called)
scheme_raise_exn(MZEXN_FAIL_CONTRACT,
"callback thunk for synchronization has already been called");
qc->called = 1;
ffi_do_callback(qc->cif, qc->resultp, qc->args, qc->userdata);
mzrt_sema_post(qc->sema);
return scheme_void;
}
void scheme_check_foreign_work(void)
{
GC_CAN_IGNORE Queued_Callback *qc;
ffi_callback_struct *data;
Scheme_Object *a[1], *proc;
if (ffi_sync_queue) {
do {
mzrt_mutex_lock(ffi_sync_queue->lock);
qc = ffi_sync_queue->callbacks;
if (qc)
ffi_sync_queue->callbacks = qc->next;
mzrt_mutex_unlock(ffi_sync_queue->lock);
if (qc) {
qc->next = NULL;
data = extract_ffi_callback(qc->userdata);
proc = scheme_make_closed_prim_w_arity(callback_thunk, (void *)qc,
"callback-thunk", 0, 0);
a[0] = proc;
proc = data->sync;
if (SCHEME_BOXP(proc)) proc = SCHEME_BOX_VAL(proc);
proc = SCHEME_CDR(proc);
scheme_start_in_scheduler();
_scheme_apply(proc, 1, a);
scheme_end_in_scheduler();
}
} while (qc);
}
}
#endif
void ffi_queue_callback(ffi_cif* cif, void* resultp, void** args, void *userdata)
XFORM_SKIP_PROC
{
ffi_callback_struct *data;
data = extract_ffi_callback(userdata);
#ifdef MZ_USE_MZRT
{
FFI_Sync_Queue *queue;
Scheme_Object *o;
o = data->sync;
if (SCHEME_BOXP(o)) o = SCHEME_BOX_VAL(o);
queue = (FFI_Sync_Queue *)SCHEME_CAR(o);
if (queue->orig_thread != mz_proc_thread_self()) {
Queued_Callback *qc;
mzrt_sema *sema;
mzrt_sema_create(&sema, 0);
qc = (Queued_Callback *)malloc(sizeof(Queued_Callback));
qc->cif = cif;
qc->resultp = resultp;
qc->args = args;
qc->userdata = userdata;
qc->sema = sema;
qc->called = 0;
mzrt_mutex_lock(queue->lock);
qc->next = queue->callbacks;
queue->callbacks = qc;
mzrt_mutex_unlock(queue->lock);
scheme_signal_received_at(queue->sig_hand);
/* wait for the callback to be invoked in the main thread */
mzrt_sema_wait(sema);
mzrt_sema_destroy(sema);
free(qc);
return;
}
}
#endif
ffi_do_callback(cif, resultp, args, userdata);
}
/* see ffi-callback below */ /* see ffi-callback below */
typedef struct closure_and_cif_struct { typedef struct closure_and_cif_struct {
ffi_closure closure; ffi_closure closure;
@ -2660,7 +2809,7 @@ void free_cl_cif_args(void *ignored, void *p)
scheme_free_code(p); scheme_free_code(p);
} }
/* (ffi-callback scheme-proc in-types out-type [abi]) -> ffi-callback */ /* (ffi-callback scheme-proc in-types out-type [abi atomic? sync]) -> ffi-callback */
/* the treatment of in-types and out-types is similar to that in ffi-call */ /* the treatment of in-types and out-types is similar to that in ffi-call */
/* the real work is done by ffi_do_callback above */ /* the real work is done by ffi_do_callback above */
#define MYNAME "ffi-callback" #define MYNAME "ffi-callback"
@ -2669,8 +2818,10 @@ static Scheme_Object *foreign_ffi_callback(int argc, Scheme_Object *argv[])
ffi_callback_struct *data; ffi_callback_struct *data;
Scheme_Object *itypes = argv[1]; Scheme_Object *itypes = argv[1];
Scheme_Object *otype = argv[2]; Scheme_Object *otype = argv[2];
Scheme_Object *sync;
Scheme_Object *p, *base; Scheme_Object *p, *base;
ffi_abi abi; ffi_abi abi;
int is_atomic;
int nargs, i; int nargs, i;
/* ffi_closure objects are problematic when used with a moving GC. The /* ffi_closure objects are problematic when used with a moving GC. The
* problem is that memory that is GC-visible can move at any time. The * problem is that memory that is GC-visible can move at any time. The
@ -2705,6 +2856,7 @@ static Scheme_Object *foreign_ffi_callback(int argc, Scheme_Object *argv[])
GC_CAN_IGNORE ffi_cif *cif; GC_CAN_IGNORE ffi_cif *cif;
GC_CAN_IGNORE ffi_closure *cl; GC_CAN_IGNORE ffi_closure *cl;
GC_CAN_IGNORE closure_and_cif *cl_cif_args; GC_CAN_IGNORE closure_and_cif *cl_cif_args;
GC_CAN_IGNORE ffi_callback_t do_callback;
if (!SCHEME_PROCP(argv[0])) if (!SCHEME_PROCP(argv[0]))
scheme_wrong_type(MYNAME, "procedure", 0, argc, argv); scheme_wrong_type(MYNAME, "procedure", 0, argc, argv);
nargs = scheme_proper_list_length(itypes); nargs = scheme_proper_list_length(itypes);
@ -2714,6 +2866,31 @@ static Scheme_Object *foreign_ffi_callback(int argc, Scheme_Object *argv[])
scheme_wrong_type(MYNAME, "C-type", 2, argc, argv); scheme_wrong_type(MYNAME, "C-type", 2, argc, argv);
rtype = CTYPE_PRIMTYPE(base); rtype = CTYPE_PRIMTYPE(base);
abi = GET_ABI(MYNAME,3); abi = GET_ABI(MYNAME,3);
is_atomic = ((argc > 4) && SCHEME_TRUEP(argv[4]));
sync = (is_atomic ? scheme_true : NULL);
if (argc > 5)
(void)scheme_check_proc_arity2(MYNAME, 1, 5, argc, argv, 1);
if (((argc > 5) && SCHEME_TRUEP(argv[5]))) {
#ifdef MZ_USE_MZRT
if (!ffi_sync_queue) {
mzrt_thread_id tid;
void *sig_hand;
ffi_sync_queue = (FFI_Sync_Queue *)malloc(sizeof(FFI_Sync_Queue));
tid = mz_proc_thread_self();
ffi_sync_queue->orig_thread = tid;
mzrt_mutex_create(&ffi_sync_queue->lock);
sig_hand = scheme_get_signal_handle();
ffi_sync_queue->sig_hand = sig_hand;
ffi_sync_queue->callbacks = NULL;
}
sync = scheme_make_raw_pair((Scheme_Object *)ffi_sync_queue,
argv[5]);
if (is_atomic) sync = scheme_box(sync);
#endif
do_callback = ffi_queue_callback;
} else
do_callback = ffi_do_callback;
/* malloc space for everything needed, so a single free gets rid of this */ /* malloc space for everything needed, so a single free gets rid of this */
cl_cif_args = scheme_malloc_code(sizeof(closure_and_cif) + nargs*sizeof(ffi_cif*)); cl_cif_args = scheme_malloc_code(sizeof(closure_and_cif) + nargs*sizeof(ffi_cif*));
cl = &(cl_cif_args->closure); /* cl is the same as cl_cif_args */ cl = &(cl_cif_args->closure); /* cl is the same as cl_cif_args */
@ -2734,7 +2911,7 @@ static Scheme_Object *foreign_ffi_callback(int argc, Scheme_Object *argv[])
data->proc = (argv[0]); data->proc = (argv[0]);
data->itypes = (argv[1]); data->itypes = (argv[1]);
data->otype = (argv[2]); data->otype = (argv[2]);
data->call_in_scheduler = (((argc > 4) && SCHEME_TRUEP(argv[4]))); data->sync = (sync);
# ifdef MZ_PRECISE_GC # ifdef MZ_PRECISE_GC
{ {
/* put data in immobile, weak box */ /* put data in immobile, weak box */
@ -2745,7 +2922,7 @@ static Scheme_Object *foreign_ffi_callback(int argc, Scheme_Object *argv[])
# else /* MZ_PRECISE_GC undefined */ # else /* MZ_PRECISE_GC undefined */
cl_cif_args->data = (void*)data; cl_cif_args->data = (void*)data;
# endif /* MZ_PRECISE_GC */ # endif /* MZ_PRECISE_GC */
if (ffi_prep_closure(cl, cif, &ffi_do_callback, (void*)(cl_cif_args->data)) if (ffi_prep_closure(cl, cif, do_callback, (void*)(cl_cif_args->data))
!= FFI_OK) != FFI_OK)
scheme_signal_error scheme_signal_error
("internal error: ffi_prep_closure did not return FFI_OK"); ("internal error: ffi_prep_closure did not return FFI_OK");
@ -2960,7 +3137,7 @@ void scheme_init_foreign(Scheme_Env *env)
scheme_add_global("ffi-call", scheme_add_global("ffi-call",
scheme_make_prim_w_arity(foreign_ffi_call, "ffi-call", 3, 5), menv); scheme_make_prim_w_arity(foreign_ffi_call, "ffi-call", 3, 5), menv);
scheme_add_global("ffi-callback", scheme_add_global("ffi-callback",
scheme_make_prim_w_arity(foreign_ffi_callback, "ffi-callback", 3, 5), menv); scheme_make_prim_w_arity(foreign_ffi_callback, "ffi-callback", 3, 6), menv);
scheme_add_global("saved-errno", scheme_add_global("saved-errno",
scheme_make_prim_w_arity(foreign_saved_errno, "saved-errno", 0, 0), menv); scheme_make_prim_w_arity(foreign_saved_errno, "saved-errno", 0, 0), menv);
scheme_add_global("lookup-errno", scheme_add_global("lookup-errno",

View File

@ -936,7 +936,13 @@ ffi_abi sym_to_abi(char *who, Scheme_Object *sym)
[proc "Scheme_Object*"] [proc "Scheme_Object*"]
[itypes "Scheme_Object*"] [itypes "Scheme_Object*"]
[otype "Scheme_Object*"] [otype "Scheme_Object*"]
[call_in_scheduler "char"]] [sync "Scheme_Object*"]]
/* The sync field:
NULL => non-atomic mode, no sync proc
#t => atomic mode, no sync proc
(rcons queue proc) => non-atomic mode, sync proc
(box (rcons queue proc)) => atomic mode, sync proc */
/*****************************************************************************/ /*****************************************************************************/
/* Pointer objects */ /* Pointer objects */
@ -1970,12 +1976,13 @@ void free_fficall_data(void *ignored, void *p)
/*****************************************************************************/ /*****************************************************************************/
/* Scheme callbacks */ /* Scheme callbacks */
void ffi_do_callback(ffi_cif* cif, void* resultp, void** args, void *userdata) typedef void (*ffi_callback_t)(ffi_cif* cif, void* resultp, void** args, void *userdata);
static ffi_callback_struct *extract_ffi_callback(void *userdata)
XFORM_SKIP_PROC
{ {
ffi_callback_struct *data; ffi_callback_struct *data;
Scheme_Object *argv_stack[MAX_QUICK_ARGS];
int argc = cif->nargs, i;
Scheme_Object **argv, *p, *v;
#ifdef MZ_PRECISE_GC #ifdef MZ_PRECISE_GC
{ {
void *tmp; void *tmp;
@ -1986,11 +1993,24 @@ void ffi_do_callback(ffi_cif* cif, void* resultp, void** args, void *userdata)
#else #else
data = (ffi_callback_struct*)userdata; data = (ffi_callback_struct*)userdata;
#endif #endif
return data;
}
void ffi_do_callback(ffi_cif* cif, void* resultp, void** args, void *userdata)
{
ffi_callback_struct *data;
Scheme_Object *argv_stack[MAX_QUICK_ARGS];
int argc = cif->nargs, i;
Scheme_Object **argv, *p, *v;
data = extract_ffi_callback(userdata);
if (argc <= MAX_QUICK_ARGS) if (argc <= MAX_QUICK_ARGS)
argv = argv_stack; argv = argv_stack;
else else
argv = scheme_malloc(argc * sizeof(Scheme_Object*)); argv = scheme_malloc(argc * sizeof(Scheme_Object*));
if (data->call_in_scheduler) if (data->sync && !SCHEME_RPAIRP(data->sync))
scheme_start_in_scheduler(); scheme_start_in_scheduler();
for (i=0, p=data->itypes; i<argc; i++, p=SCHEME_CDR(p)) { for (i=0, p=data->itypes; i<argc; i++, p=SCHEME_CDR(p)) {
v = C2SCHEME(SCHEME_CAR(p), args[i], 0, 0); v = C2SCHEME(SCHEME_CAR(p), args[i], 0, 0);
@ -1998,10 +2018,137 @@ void ffi_do_callback(ffi_cif* cif, void* resultp, void** args, void *userdata)
} }
p = _scheme_apply(data->proc, argc, argv); p = _scheme_apply(data->proc, argc, argv);
SCHEME2C(data->otype, resultp, 0, p, NULL, NULL, 1); SCHEME2C(data->otype, resultp, 0, p, NULL, NULL, 1);
if (data->call_in_scheduler) if (data->sync && !SCHEME_RPAIRP(data->sync))
scheme_end_in_scheduler(); scheme_end_in_scheduler();
} }
#ifdef MZ_USE_MZRT
/* When OS-level thread support is avaiable, support callbacks
in foreign threads that are executed on the main Scheme thread. */
typedef struct Queued_Callback {
ffi_cif* cif;
void* resultp;
void** args;
void *userdata;
mzrt_sema *sema;
int called;
struct Queued_Callback *next;
} Queued_Callback;
typedef struct FFI_Sync_Queue {
Queued_Callback *callbacks; /* malloc()ed list */
mzrt_mutex *lock;
mzrt_thread_id orig_thread;
void *sig_hand;
} FFI_Sync_Queue;
THREAD_LOCAL_DECL(static struct FFI_Sync_Queue *ffi_sync_queue);
static Scheme_Object *callback_thunk(void *_qc, int argc, Scheme_Object *argv[])
{
Queued_Callback *qc = (Queued_Callback *)_qc;
if (qc->called)
scheme_raise_exn(MZEXN_FAIL_CONTRACT,
"callback thunk for synchronization has already been called");
qc->called = 1;
ffi_do_callback(qc->cif, qc->resultp, qc->args, qc->userdata);
mzrt_sema_post(qc->sema);
return scheme_void;
}
void scheme_check_foreign_work(void)
{
GC_CAN_IGNORE Queued_Callback *qc;
ffi_callback_struct *data;
Scheme_Object *a[1], *proc;
if (ffi_sync_queue) {
do {
mzrt_mutex_lock(ffi_sync_queue->lock);
qc = ffi_sync_queue->callbacks;
if (qc)
ffi_sync_queue->callbacks = qc->next;
mzrt_mutex_unlock(ffi_sync_queue->lock);
if (qc) {
qc->next = NULL;
data = extract_ffi_callback(qc->userdata);
proc = scheme_make_closed_prim_w_arity(callback_thunk, (void *)qc,
"callback-thunk", 0, 0);
a[0] = proc;
proc = data->sync;
if (SCHEME_BOXP(proc)) proc = SCHEME_BOX_VAL(proc);
proc = SCHEME_CDR(proc);
scheme_start_in_scheduler();
_scheme_apply(proc, 1, a);
scheme_end_in_scheduler();
}
} while (qc);
}
}
#endif
void ffi_queue_callback(ffi_cif* cif, void* resultp, void** args, void *userdata)
XFORM_SKIP_PROC
{
ffi_callback_struct *data;
data = extract_ffi_callback(userdata);
#ifdef MZ_USE_MZRT
{
FFI_Sync_Queue *queue;
Scheme_Object *o;
o = data->sync;
if (SCHEME_BOXP(o)) o = SCHEME_BOX_VAL(o);
queue = (FFI_Sync_Queue *)SCHEME_CAR(o);
if (queue->orig_thread != mz_proc_thread_self()) {
Queued_Callback *qc;
mzrt_sema *sema;
mzrt_sema_create(&sema, 0);
qc = (Queued_Callback *)malloc(sizeof(Queued_Callback));
qc->cif = cif;
qc->resultp = resultp;
qc->args = args;
qc->userdata = userdata;
qc->sema = sema;
qc->called = 0;
mzrt_mutex_lock(queue->lock);
qc->next = queue->callbacks;
queue->callbacks = qc;
mzrt_mutex_unlock(queue->lock);
scheme_signal_received_at(queue->sig_hand);
/* wait for the callback to be invoked in the main thread */
mzrt_sema_wait(sema);
mzrt_sema_destroy(sema);
free(qc);
return;
}
}
#endif
ffi_do_callback(cif, resultp, args, userdata);
}
/* see ffi-callback below */ /* see ffi-callback below */
typedef struct closure_and_cif_struct { typedef struct closure_and_cif_struct {
ffi_closure closure; ffi_closure closure;
@ -2027,15 +2174,17 @@ void free_cl_cif_args(void *ignored, void *p)
scheme_free_code(p); scheme_free_code(p);
} }
/* (ffi-callback scheme-proc in-types out-type [abi]) -> ffi-callback */ /* (ffi-callback scheme-proc in-types out-type [abi atomic? sync]) -> ffi-callback */
/* the treatment of in-types and out-types is similar to that in ffi-call */ /* the treatment of in-types and out-types is similar to that in ffi-call */
/* the real work is done by ffi_do_callback above */ /* the real work is done by ffi_do_callback above */
@cdefine[ffi-callback 3 5]{ @cdefine[ffi-callback 3 6]{
ffi_callback_struct *data; ffi_callback_struct *data;
Scheme_Object *itypes = argv[1]; Scheme_Object *itypes = argv[1];
Scheme_Object *otype = argv[2]; Scheme_Object *otype = argv[2];
Scheme_Object *sync;
Scheme_Object *p, *base; Scheme_Object *p, *base;
ffi_abi abi; ffi_abi abi;
int is_atomic;
int nargs, i; int nargs, i;
/* ffi_closure objects are problematic when used with a moving GC. The /* ffi_closure objects are problematic when used with a moving GC. The
* problem is that memory that is GC-visible can move at any time. The * problem is that memory that is GC-visible can move at any time. The
@ -2070,6 +2219,7 @@ void free_cl_cif_args(void *ignored, void *p)
GC_CAN_IGNORE ffi_cif *cif; GC_CAN_IGNORE ffi_cif *cif;
GC_CAN_IGNORE ffi_closure *cl; GC_CAN_IGNORE ffi_closure *cl;
GC_CAN_IGNORE closure_and_cif *cl_cif_args; GC_CAN_IGNORE closure_and_cif *cl_cif_args;
GC_CAN_IGNORE ffi_callback_t do_callback;
if (!SCHEME_PROCP(argv[0])) if (!SCHEME_PROCP(argv[0]))
scheme_wrong_type(MYNAME, "procedure", 0, argc, argv); scheme_wrong_type(MYNAME, "procedure", 0, argc, argv);
nargs = scheme_proper_list_length(itypes); nargs = scheme_proper_list_length(itypes);
@ -2079,6 +2229,31 @@ void free_cl_cif_args(void *ignored, void *p)
scheme_wrong_type(MYNAME, "C-type", 2, argc, argv); scheme_wrong_type(MYNAME, "C-type", 2, argc, argv);
rtype = CTYPE_PRIMTYPE(base); rtype = CTYPE_PRIMTYPE(base);
abi = GET_ABI(MYNAME,3); abi = GET_ABI(MYNAME,3);
is_atomic = ((argc > 4) && SCHEME_TRUEP(argv[4]));
sync = (is_atomic ? scheme_true : NULL);
if (argc > 5)
(void)scheme_check_proc_arity2(MYNAME, 1, 5, argc, argv, 1);
if (((argc > 5) && SCHEME_TRUEP(argv[5]))) {
#ifdef MZ_USE_MZRT
if (!ffi_sync_queue) {
mzrt_thread_id tid;
void *sig_hand;
ffi_sync_queue = (FFI_Sync_Queue *)malloc(sizeof(FFI_Sync_Queue));
tid = mz_proc_thread_self();
ffi_sync_queue->orig_thread = tid;
mzrt_mutex_create(&ffi_sync_queue->lock);
sig_hand = scheme_get_signal_handle();
ffi_sync_queue->sig_hand = sig_hand;
ffi_sync_queue->callbacks = NULL;
}
sync = scheme_make_raw_pair((Scheme_Object *)ffi_sync_queue,
argv[5]);
if (is_atomic) sync = scheme_box(sync);
#endif
do_callback = ffi_queue_callback;
} else
do_callback = ffi_do_callback;
/* malloc space for everything needed, so a single free gets rid of this */ /* malloc space for everything needed, so a single free gets rid of this */
cl_cif_args = scheme_malloc_code(sizeof(closure_and_cif) + nargs*sizeof(ffi_cif*)); cl_cif_args = scheme_malloc_code(sizeof(closure_and_cif) + nargs*sizeof(ffi_cif*));
cl = &(cl_cif_args->closure); /* cl is the same as cl_cif_args */ cl = &(cl_cif_args->closure); /* cl is the same as cl_cif_args */
@ -2095,7 +2270,7 @@ void free_cl_cif_args(void *ignored, void *p)
scheme_signal_error("internal error: ffi_prep_cif did not return FFI_OK"); scheme_signal_error("internal error: ffi_prep_cif did not return FFI_OK");
@cmake["data" ffi-callback @cmake["data" ffi-callback
"cl_cif_args" "argv[0]" "argv[1]" "argv[2]" "cl_cif_args" "argv[0]" "argv[1]" "argv[2]"
"((argc > 4) && SCHEME_TRUEP(argv[4]))"] "sync"]
@@@IFDEF{MZ_PRECISE_GC}{ @@@IFDEF{MZ_PRECISE_GC}{
{ {
/* put data in immobile, weak box */ /* put data in immobile, weak box */
@ -2106,7 +2281,7 @@ void free_cl_cif_args(void *ignored, void *p)
}{ }{
cl_cif_args->data = (void*)data; cl_cif_args->data = (void*)data;
} }
if (ffi_prep_closure(cl, cif, &ffi_do_callback, (void*)(cl_cif_args->data)) if (ffi_prep_closure(cl, cif, do_callback, (void*)(cl_cif_args->data))
!= FFI_OK) != FFI_OK)
scheme_signal_error scheme_signal_error
("internal error: ffi_prep_closure did not return FFI_OK"); ("internal error: ffi_prep_closure did not return FFI_OK");

View File

@ -290,6 +290,7 @@ typedef struct Thread_Local_Variables {
struct Scheme_Hash_Table *place_local_symbol_table_; struct Scheme_Hash_Table *place_local_symbol_table_;
struct Scheme_Hash_Table *place_local_keyword_table_; struct Scheme_Hash_Table *place_local_keyword_table_;
struct Scheme_Hash_Table *place_local_parallel_symbol_table_; struct Scheme_Hash_Table *place_local_parallel_symbol_table_;
struct FFI_Sync_Queue *ffi_sync_queue_;
/*KPLAKE1*/ /*KPLAKE1*/
} Thread_Local_Variables; } Thread_Local_Variables;
@ -582,6 +583,7 @@ XFORM_GC_VARIABLE_STACK_THROUGH_THREAD_LOCAL;
#define place_local_symbol_table XOA (scheme_get_thread_local_variables()->place_local_symbol_table_) #define place_local_symbol_table XOA (scheme_get_thread_local_variables()->place_local_symbol_table_)
#define place_local_keyword_table XOA (scheme_get_thread_local_variables()->place_local_keyword_table_) #define place_local_keyword_table XOA (scheme_get_thread_local_variables()->place_local_keyword_table_)
#define place_local_parallel_symbol_table XOA (scheme_get_thread_local_variables()->place_local_parallel_symbol_table_) #define place_local_parallel_symbol_table XOA (scheme_get_thread_local_variables()->place_local_parallel_symbol_table_)
#define ffi_sync_queue XOA (scheme_get_thread_local_variables()->ffi_sync_queue_)
/*KPLAKE2*/ /*KPLAKE2*/
/* **************************************** */ /* **************************************** */

View File

@ -469,6 +469,10 @@ void scheme_suspend_remembered_threads(void);
void scheme_resume_remembered_threads(void); void scheme_resume_remembered_threads(void);
#endif #endif
#ifdef MZ_USE_MZRT
extern void scheme_check_foreign_work(void);
#endif
void scheme_kickoff_green_thread_time_slice_timer(long usec); void scheme_kickoff_green_thread_time_slice_timer(long usec);
#ifdef UNIX_PROCESSES #ifdef UNIX_PROCESSES

View File

@ -4159,6 +4159,9 @@ void scheme_thread_block(float sleep_time)
#ifdef MZ_USE_FUTURES #ifdef MZ_USE_FUTURES
scheme_check_future_work(); scheme_check_future_work();
#endif #endif
#ifdef MZ_USE_MZRT
scheme_check_foreign_work();
#endif
if (!do_atomic && (sleep_end >= 0.0)) { if (!do_atomic && (sleep_end >= 0.0)) {
find_next_thread(&next); find_next_thread(&next);