diff --git a/collects/ffi/unsafe.rkt b/collects/ffi/unsafe.rkt index 215eef1180..c5fb5d835f 100644 --- a/collects/ffi/unsafe.rkt +++ b/collects/ffi/unsafe.rkt @@ -432,21 +432,22 @@ ;; optionally applying a wrapper function to modify the result primitive ;; (callouts) or the input procedure (callbacks). (define* (_cprocedure itypes otype - #:abi [abi #f] - #:wrapper [wrapper #f] - #:keep [keep #f] - #:atomic? [atomic? #f] - #:save-errno [errno #f]) - (_cprocedure* itypes otype abi wrapper keep atomic? errno)) + #:abi [abi #f] + #:wrapper [wrapper #f] + #:keep [keep #f] + #:atomic? [atomic? #f] + #:async-apply [async-apply #f] + #:save-errno [errno #f]) + (_cprocedure* itypes otype abi wrapper keep atomic? async-apply errno)) ;; for internal use (define held-callbacks (make-weak-hasheq)) -(define (_cprocedure* itypes otype abi wrapper keep atomic? errno) +(define (_cprocedure* itypes otype abi wrapper keep atomic? async-apply errno) (define-syntax-rule (make-it wrap) (make-ctype _fpointer (lambda (x) (and x - (let ([cb (ffi-callback (wrap x) itypes otype abi atomic?)]) + (let ([cb (ffi-callback (wrap x) itypes otype abi atomic? async-apply)]) (cond [(eq? keep #t) (hash-set! held-callbacks x cb)] [(box? keep) (let ([x (unbox keep)]) @@ -478,7 +479,7 @@ (provide _fun) (define-for-syntax _fun-keywords - `([#:abi ,#'#f] [#:keep ,#'#t] [#:atomic? ,#'#f] [#:save-errno ,#'#f])) + `([#:abi ,#'#f] [#:keep ,#'#t] [#:atomic? ,#'#f] [#:async-apply ,#'#f] [#:save-errno ,#'#f])) (define-syntax (_fun stx) (define (err msg . sub) (apply raise-syntax-error '_fun msg stx sub)) (define xs #f) @@ -626,6 +627,7 @@ #,wrapper #,(kwd-ref '#:keep) #,(kwd-ref '#:atomic?) + #,(kwd-ref '#:async-apply) #,(kwd-ref '#:save-errno)))]) (if (or (caddr output) input-names (ormap caddr inputs) (ormap (lambda (x) (not (car x))) inputs) diff --git a/collects/mz/private/y.rkt b/collects/mz/private/y.rkt new file mode 100644 index 0000000000..1f210949bb --- /dev/null +++ b/collects/mz/private/y.rkt @@ -0,0 +1,3 @@ +#lang racket +(provide y) +(define y 1) diff --git a/collects/scribblings/foreign/types.scrbl b/collects/scribblings/foreign/types.scrbl index 62184963c6..f874252d08 100644 --- a/collects/scribblings/foreign/types.scrbl +++ b/collects/scribblings/foreign/types.scrbl @@ -345,6 +345,7 @@ the later case, the result is the @scheme[ctype]).} [output-type ctype?] [#:abi abi (or/c symbol/c #f) #f] [#:atomic? atomic? any/c #f] + [#:async-apply async-apply (or/c #f ((-> any) . -> . any)) #f] [#:save-errno save-errno (or/c #f 'posix 'windows) #f] [#:wrapper wrapper (or/c #f (procedure? . -> . procedure?)) #f] @@ -381,13 +382,31 @@ If @scheme[atomic?] is true, then when a Racket procedure is given this procedure type and called from foreign code, then the Racket process is put into atomic mode while evaluating the Racket procedure body. In atomic mode, other Racket threads do not run, so the Racket -code must not call any function that potentially synchronizes with -other threads, or else it may deadlock. In addition, the Racket code -must not perform any potentially blocking operation (such as I/O), it -must not raise an uncaught exception, it must not perform any escaping -continuation jumps, and its non-tail recursion must be minimal to -avoid C-level stack overflow; otherwise, the process may crash or -misbehave. +code must not call any function that potentially blocks on +synchronization with other threads, or else it may lead to deadlock. In +addition, the Racket code must not perform any potentially blocking +operation (such as I/O), it must not raise an uncaught exception, it +must not perform any escaping continuation jumps, and its non-tail +recursion must be minimal to avoid C-level stack overflow; otherwise, +the process may crash or misbehave. + +If an @scheme[async-apply] procedure is provided, then a Racket +procedure with the generated procedure type can be applied in a +foreign thread (i.e., an OS-level thread other than the one used to +run Racket). In that case, @scheme[async-apply] is applied to a thunk +that encapsulates the specific callback invocation, and the foreign +thread blocks until the thunk is called and completes; the thunk must +be called exactly once, and the callback invocation must return +normally. The @scheme[async-apply] procedure itself is called in an +unspecified Racket thread and in atomic mode (see @scheme[atomic?] +above); its job is to arrange for the thunk to be called in a suitable +context without blocking in any synchronization. (If the callback is +known to complete quickly, require no synchronization, and work +independent of the Racket thread in which it runs, then +@scheme[async-apply] can apply the thunk directly.) Foreign-thread +detection to trigger @scheme[async-apply] works only when Racket is +compiled with OS-level thread support, which is the default for many +platforms. If @scheme[save-errno] is @scheme['posix], then the value of @as-index{@tt{errno}} is saved (specific to the current thread) @@ -471,7 +490,8 @@ values: @itemize[ ([fun-option (code:line #:abi abi-expr) (code:line #:save-errno save-errno-expr) (code:line #:keep keep-expr) - (code:line #:atomic? atomic?-expr)] + (code:line #:atomic? atomic?-expr) + (code:line #:async-apply async-apply-expr)] [maybe-args code:blank (code:line (id ...) ::) (code:line id ::) diff --git a/src/foreign/foreign.c b/src/foreign/foreign.c index 78c43fc712..617428a7a0 100644 --- a/src/foreign/foreign.c +++ b/src/foreign/foreign.c @@ -1106,7 +1106,7 @@ typedef struct ffi_callback_struct { Scheme_Object* proc; Scheme_Object* itypes; Scheme_Object* otype; - char call_in_scheduler; + Scheme_Object* sync; } ffi_callback_struct; #define SCHEME_FFICALLBACKP(x) (SCHEME_TYPE(x)==ffi_callback_tag) #define MYNAME "ffi-callback?" @@ -1127,6 +1127,7 @@ int ffi_callback_MARK(void *p) { gcMARK(s->proc); gcMARK(s->itypes); gcMARK(s->otype); + gcMARK(s->sync); return gcBYTES_TO_WORDS(sizeof(ffi_callback_struct)); } int ffi_callback_FIXUP(void *p) { @@ -1135,11 +1136,18 @@ int ffi_callback_FIXUP(void *p) { gcFIXUP(s->proc); gcFIXUP(s->itypes); gcFIXUP(s->otype); + gcFIXUP(s->sync); return gcBYTES_TO_WORDS(sizeof(ffi_callback_struct)); } END_XFORM_SKIP; #endif +/* The sync field: + NULL => non-atomic mode, no sync proc + #t => atomic mode, no sync proc + (rcons queue proc) => non-atomic mode, sync proc + (box (rcons queue proc)) => atomic mode, sync proc */ + /*****************************************************************************/ /* Pointer objects */ /* use cpointer (with a NULL tag when creating), #f for NULL */ @@ -2603,12 +2611,13 @@ static Scheme_Object *foreign_ffi_call(int argc, Scheme_Object *argv[]) /*****************************************************************************/ /* Scheme callbacks */ -void ffi_do_callback(ffi_cif* cif, void* resultp, void** args, void *userdata) +typedef void (*ffi_callback_t)(ffi_cif* cif, void* resultp, void** args, void *userdata); + +static ffi_callback_struct *extract_ffi_callback(void *userdata) + XFORM_SKIP_PROC { ffi_callback_struct *data; - Scheme_Object *argv_stack[MAX_QUICK_ARGS]; - int argc = cif->nargs, i; - Scheme_Object **argv, *p, *v; + #ifdef MZ_PRECISE_GC { void *tmp; @@ -2619,11 +2628,24 @@ void ffi_do_callback(ffi_cif* cif, void* resultp, void** args, void *userdata) #else data = (ffi_callback_struct*)userdata; #endif + + return data; +} + +void ffi_do_callback(ffi_cif* cif, void* resultp, void** args, void *userdata) +{ + ffi_callback_struct *data; + Scheme_Object *argv_stack[MAX_QUICK_ARGS]; + int argc = cif->nargs, i; + Scheme_Object **argv, *p, *v; + + data = extract_ffi_callback(userdata); + if (argc <= MAX_QUICK_ARGS) argv = argv_stack; else argv = scheme_malloc(argc * sizeof(Scheme_Object*)); - if (data->call_in_scheduler) + if (data->sync && !SCHEME_RPAIRP(data->sync)) scheme_start_in_scheduler(); for (i=0, p=data->itypes; iproc, argc, argv); SCHEME2C(data->otype, resultp, 0, p, NULL, NULL, 1); - if (data->call_in_scheduler) + if (data->sync && !SCHEME_RPAIRP(data->sync)) scheme_end_in_scheduler(); } +#ifdef MZ_USE_MZRT + +/* When OS-level thread support is avaiable, support callbacks + in foreign threads that are executed on the main Scheme thread. */ + +typedef struct Queued_Callback { + ffi_cif* cif; + void* resultp; + void** args; + void *userdata; + mzrt_sema *sema; + int called; + struct Queued_Callback *next; +} Queued_Callback; + +typedef struct FFI_Sync_Queue { + Queued_Callback *callbacks; /* malloc()ed list */ + mzrt_mutex *lock; + mzrt_thread_id orig_thread; + void *sig_hand; +} FFI_Sync_Queue; + +THREAD_LOCAL_DECL(static struct FFI_Sync_Queue *ffi_sync_queue); + +static Scheme_Object *callback_thunk(void *_qc, int argc, Scheme_Object *argv[]) +{ + Queued_Callback *qc = (Queued_Callback *)_qc; + + if (qc->called) + scheme_raise_exn(MZEXN_FAIL_CONTRACT, + "callback thunk for synchronization has already been called"); + qc->called = 1; + + ffi_do_callback(qc->cif, qc->resultp, qc->args, qc->userdata); + + mzrt_sema_post(qc->sema); + + return scheme_void; +} + +void scheme_check_foreign_work(void) +{ + GC_CAN_IGNORE Queued_Callback *qc; + ffi_callback_struct *data; + Scheme_Object *a[1], *proc; + + if (ffi_sync_queue) { + do { + mzrt_mutex_lock(ffi_sync_queue->lock); + qc = ffi_sync_queue->callbacks; + if (qc) + ffi_sync_queue->callbacks = qc->next; + mzrt_mutex_unlock(ffi_sync_queue->lock); + + if (qc) { + qc->next = NULL; + + data = extract_ffi_callback(qc->userdata); + + proc = scheme_make_closed_prim_w_arity(callback_thunk, (void *)qc, + "callback-thunk", 0, 0); + a[0] = proc; + + proc = data->sync; + if (SCHEME_BOXP(proc)) proc = SCHEME_BOX_VAL(proc); + proc = SCHEME_CDR(proc); + + scheme_start_in_scheduler(); + _scheme_apply(proc, 1, a); + scheme_end_in_scheduler(); + } + + } while (qc); + } +} + +#endif + +void ffi_queue_callback(ffi_cif* cif, void* resultp, void** args, void *userdata) + XFORM_SKIP_PROC +{ + ffi_callback_struct *data; + + data = extract_ffi_callback(userdata); + +#ifdef MZ_USE_MZRT + { + FFI_Sync_Queue *queue; + Scheme_Object *o; + + o = data->sync; + if (SCHEME_BOXP(o)) o = SCHEME_BOX_VAL(o); + + queue = (FFI_Sync_Queue *)SCHEME_CAR(o); + + if (queue->orig_thread != mz_proc_thread_self()) { + Queued_Callback *qc; + mzrt_sema *sema; + + mzrt_sema_create(&sema, 0); + + qc = (Queued_Callback *)malloc(sizeof(Queued_Callback)); + qc->cif = cif; + qc->resultp = resultp; + qc->args = args; + qc->userdata = userdata; + qc->sema = sema; + qc->called = 0; + + mzrt_mutex_lock(queue->lock); + qc->next = queue->callbacks; + queue->callbacks = qc; + mzrt_mutex_unlock(queue->lock); + scheme_signal_received_at(queue->sig_hand); + + /* wait for the callback to be invoked in the main thread */ + mzrt_sema_wait(sema); + + mzrt_sema_destroy(sema); + free(qc); + return; + } + } +#endif + ffi_do_callback(cif, resultp, args, userdata); +} + /* see ffi-callback below */ typedef struct closure_and_cif_struct { ffi_closure closure; @@ -2660,97 +2809,125 @@ void free_cl_cif_args(void *ignored, void *p) scheme_free_code(p); } -/* (ffi-callback scheme-proc in-types out-type [abi]) -> ffi-callback */ +/* (ffi-callback scheme-proc in-types out-type [abi atomic? sync]) -> ffi-callback */ /* the treatment of in-types and out-types is similar to that in ffi-call */ /* the real work is done by ffi_do_callback above */ #define MYNAME "ffi-callback" static Scheme_Object *foreign_ffi_callback(int argc, Scheme_Object *argv[]) { - ffi_callback_struct *data; - Scheme_Object *itypes = argv[1]; - Scheme_Object *otype = argv[2]; - Scheme_Object *p, *base; - ffi_abi abi; - int nargs, i; - /* ffi_closure objects are problematic when used with a moving GC. The - * problem is that memory that is GC-visible can move at any time. The - * solution is to use an immobile-box, which an immobile pointer (in a simple - * malloced block), which points to the ffi_callback_struct that contains the - * relevant Scheme call details. Another minor complexity is that an - * immobile box serves as a reference for the GC, which means that nothing - * will ever get collected: and the solution for this is to stick a weak-box - * in the chain. Users need to be aware of GC issues, and need to keep a - * reference to the callback object to avoid releasing the whole thing -- - * when that reference is lost, the ffi_callback_struct will be GCed, and a - * finalizer will free() the malloced memory. Everything on the malloced - * part is allocated in one block, to make it easy to free. The final layout - * of the various objects is: - * - * <<======malloc======>> : <<===========scheme_malloc===============>> - * : - * ffi_closure <------------------------\ - * | | : | - * | | : | - * | \--> immobile ----> weak | - * | box : box | - * | : | | - * | : | | - * | : \--> ffi_callback_struct - * | : | | - * V : | \-----> Scheme Closure - * cif ---> atypes : | - * : \--------> input/output types - */ - GC_CAN_IGNORE ffi_type *rtype, **atypes; - GC_CAN_IGNORE ffi_cif *cif; - GC_CAN_IGNORE ffi_closure *cl; - GC_CAN_IGNORE closure_and_cif *cl_cif_args; - if (!SCHEME_PROCP(argv[0])) - scheme_wrong_type(MYNAME, "procedure", 0, argc, argv); - nargs = scheme_proper_list_length(itypes); - if (nargs < 0) - scheme_wrong_type(MYNAME, "proper list", 1, argc, argv); - if (NULL == (base = get_ctype_base(otype))) - scheme_wrong_type(MYNAME, "C-type", 2, argc, argv); - rtype = CTYPE_PRIMTYPE(base); - abi = GET_ABI(MYNAME,3); - /* malloc space for everything needed, so a single free gets rid of this */ - cl_cif_args = scheme_malloc_code(sizeof(closure_and_cif) + nargs*sizeof(ffi_cif*)); - cl = &(cl_cif_args->closure); /* cl is the same as cl_cif_args */ - cif = &(cl_cif_args->cif); - atypes = (ffi_type **)(((char*)cl_cif_args) + sizeof(closure_and_cif)); - for (i=0, p=itypes; iso.type = ffi_callback_tag; - data->callback = (cl_cif_args); - data->proc = (argv[0]); - data->itypes = (argv[1]); - data->otype = (argv[2]); - data->call_in_scheduler = (((argc > 4) && SCHEME_TRUEP(argv[4]))); -# ifdef MZ_PRECISE_GC - { - /* put data in immobile, weak box */ - void **tmp; - tmp = GC_malloc_immobile_box(GC_malloc_weak_box(data, NULL, 0)); - cl_cif_args->data = (struct immobile_box*)tmp; - } -# else /* MZ_PRECISE_GC undefined */ - cl_cif_args->data = (void*)data; -# endif /* MZ_PRECISE_GC */ - if (ffi_prep_closure(cl, cif, &ffi_do_callback, (void*)(cl_cif_args->data)) - != FFI_OK) - scheme_signal_error - ("internal error: ffi_prep_closure did not return FFI_OK"); - scheme_register_finalizer(data, free_cl_cif_args, cl_cif_args, NULL, NULL); - return (Scheme_Object*)data; + ffi_callback_struct *data; + Scheme_Object *itypes = argv[1]; + Scheme_Object *otype = argv[2]; + Scheme_Object *sync; + Scheme_Object *p, *base; + ffi_abi abi; + int is_atomic; + int nargs, i; + /* ffi_closure objects are problematic when used with a moving GC. The + * problem is that memory that is GC-visible can move at any time. The + * solution is to use an immobile-box, which an immobile pointer (in a simple + * malloced block), which points to the ffi_callback_struct that contains the + * relevant Scheme call details. Another minor complexity is that an + * immobile box serves as a reference for the GC, which means that nothing + * will ever get collected: and the solution for this is to stick a weak-box + * in the chain. Users need to be aware of GC issues, and need to keep a + * reference to the callback object to avoid releasing the whole thing -- + * when that reference is lost, the ffi_callback_struct will be GCed, and a + * finalizer will free() the malloced memory. Everything on the malloced + * part is allocated in one block, to make it easy to free. The final layout + * of the various objects is: + * + * <<======malloc======>> : <<===========scheme_malloc===============>> + * : + * ffi_closure <------------------------\ + * | | : | + * | | : | + * | \--> immobile ----> weak | + * | box : box | + * | : | | + * | : | | + * | : \--> ffi_callback_struct + * | : | | + * V : | \-----> Scheme Closure + * cif ---> atypes : | + * : \--------> input/output types + */ + GC_CAN_IGNORE ffi_type *rtype, **atypes; + GC_CAN_IGNORE ffi_cif *cif; + GC_CAN_IGNORE ffi_closure *cl; + GC_CAN_IGNORE closure_and_cif *cl_cif_args; + GC_CAN_IGNORE ffi_callback_t do_callback; + if (!SCHEME_PROCP(argv[0])) + scheme_wrong_type(MYNAME, "procedure", 0, argc, argv); + nargs = scheme_proper_list_length(itypes); + if (nargs < 0) + scheme_wrong_type(MYNAME, "proper list", 1, argc, argv); + if (NULL == (base = get_ctype_base(otype))) + scheme_wrong_type(MYNAME, "C-type", 2, argc, argv); + rtype = CTYPE_PRIMTYPE(base); + abi = GET_ABI(MYNAME,3); + is_atomic = ((argc > 4) && SCHEME_TRUEP(argv[4])); + sync = (is_atomic ? scheme_true : NULL); + if (argc > 5) + (void)scheme_check_proc_arity2(MYNAME, 1, 5, argc, argv, 1); + if (((argc > 5) && SCHEME_TRUEP(argv[5]))) { + #ifdef MZ_USE_MZRT + if (!ffi_sync_queue) { + mzrt_thread_id tid; + void *sig_hand; + + ffi_sync_queue = (FFI_Sync_Queue *)malloc(sizeof(FFI_Sync_Queue)); + tid = mz_proc_thread_self(); + ffi_sync_queue->orig_thread = tid; + mzrt_mutex_create(&ffi_sync_queue->lock); + sig_hand = scheme_get_signal_handle(); + ffi_sync_queue->sig_hand = sig_hand; + ffi_sync_queue->callbacks = NULL; + } + sync = scheme_make_raw_pair((Scheme_Object *)ffi_sync_queue, + argv[5]); + if (is_atomic) sync = scheme_box(sync); + #endif + do_callback = ffi_queue_callback; + } else + do_callback = ffi_do_callback; + /* malloc space for everything needed, so a single free gets rid of this */ + cl_cif_args = scheme_malloc_code(sizeof(closure_and_cif) + nargs*sizeof(ffi_cif*)); + cl = &(cl_cif_args->closure); /* cl is the same as cl_cif_args */ + cif = &(cl_cif_args->cif); + atypes = (ffi_type **)(((char*)cl_cif_args) + sizeof(closure_and_cif)); + for (i=0, p=itypes; iso.type = ffi_callback_tag; + data->callback = (cl_cif_args); + data->proc = (argv[0]); + data->itypes = (argv[1]); + data->otype = (argv[2]); + data->sync = (sync); +# ifdef MZ_PRECISE_GC + { + /* put data in immobile, weak box */ + void **tmp; + tmp = GC_malloc_immobile_box(GC_malloc_weak_box(data, NULL, 0)); + cl_cif_args->data = (struct immobile_box*)tmp; + } +# else /* MZ_PRECISE_GC undefined */ + cl_cif_args->data = (void*)data; +# endif /* MZ_PRECISE_GC */ + if (ffi_prep_closure(cl, cif, do_callback, (void*)(cl_cif_args->data)) + != FFI_OK) + scheme_signal_error + ("internal error: ffi_prep_closure did not return FFI_OK"); + scheme_register_finalizer(data, free_cl_cif_args, cl_cif_args, NULL, NULL); + return (Scheme_Object*)data; } #undef MYNAME @@ -2960,7 +3137,7 @@ void scheme_init_foreign(Scheme_Env *env) scheme_add_global("ffi-call", scheme_make_prim_w_arity(foreign_ffi_call, "ffi-call", 3, 5), menv); scheme_add_global("ffi-callback", - scheme_make_prim_w_arity(foreign_ffi_callback, "ffi-callback", 3, 5), menv); + scheme_make_prim_w_arity(foreign_ffi_callback, "ffi-callback", 3, 6), menv); scheme_add_global("saved-errno", scheme_make_prim_w_arity(foreign_saved_errno, "saved-errno", 0, 0), menv); scheme_add_global("lookup-errno", diff --git a/src/foreign/foreign.rktc b/src/foreign/foreign.rktc index 12b772ee3e..108ab16117 100755 --- a/src/foreign/foreign.rktc +++ b/src/foreign/foreign.rktc @@ -936,7 +936,13 @@ ffi_abi sym_to_abi(char *who, Scheme_Object *sym) [proc "Scheme_Object*"] [itypes "Scheme_Object*"] [otype "Scheme_Object*"] - [call_in_scheduler "char"]] + [sync "Scheme_Object*"]] + +/* The sync field: + NULL => non-atomic mode, no sync proc + #t => atomic mode, no sync proc + (rcons queue proc) => non-atomic mode, sync proc + (box (rcons queue proc)) => atomic mode, sync proc */ /*****************************************************************************/ /* Pointer objects */ @@ -1970,12 +1976,13 @@ void free_fficall_data(void *ignored, void *p) /*****************************************************************************/ /* Scheme callbacks */ -void ffi_do_callback(ffi_cif* cif, void* resultp, void** args, void *userdata) +typedef void (*ffi_callback_t)(ffi_cif* cif, void* resultp, void** args, void *userdata); + +static ffi_callback_struct *extract_ffi_callback(void *userdata) + XFORM_SKIP_PROC { ffi_callback_struct *data; - Scheme_Object *argv_stack[MAX_QUICK_ARGS]; - int argc = cif->nargs, i; - Scheme_Object **argv, *p, *v; + #ifdef MZ_PRECISE_GC { void *tmp; @@ -1986,11 +1993,24 @@ void ffi_do_callback(ffi_cif* cif, void* resultp, void** args, void *userdata) #else data = (ffi_callback_struct*)userdata; #endif + + return data; +} + +void ffi_do_callback(ffi_cif* cif, void* resultp, void** args, void *userdata) +{ + ffi_callback_struct *data; + Scheme_Object *argv_stack[MAX_QUICK_ARGS]; + int argc = cif->nargs, i; + Scheme_Object **argv, *p, *v; + + data = extract_ffi_callback(userdata); + if (argc <= MAX_QUICK_ARGS) argv = argv_stack; else argv = scheme_malloc(argc * sizeof(Scheme_Object*)); - if (data->call_in_scheduler) + if (data->sync && !SCHEME_RPAIRP(data->sync)) scheme_start_in_scheduler(); for (i=0, p=data->itypes; iproc, argc, argv); SCHEME2C(data->otype, resultp, 0, p, NULL, NULL, 1); - if (data->call_in_scheduler) + if (data->sync && !SCHEME_RPAIRP(data->sync)) scheme_end_in_scheduler(); } +#ifdef MZ_USE_MZRT + +/* When OS-level thread support is avaiable, support callbacks + in foreign threads that are executed on the main Scheme thread. */ + +typedef struct Queued_Callback { + ffi_cif* cif; + void* resultp; + void** args; + void *userdata; + mzrt_sema *sema; + int called; + struct Queued_Callback *next; +} Queued_Callback; + +typedef struct FFI_Sync_Queue { + Queued_Callback *callbacks; /* malloc()ed list */ + mzrt_mutex *lock; + mzrt_thread_id orig_thread; + void *sig_hand; +} FFI_Sync_Queue; + +THREAD_LOCAL_DECL(static struct FFI_Sync_Queue *ffi_sync_queue); + +static Scheme_Object *callback_thunk(void *_qc, int argc, Scheme_Object *argv[]) +{ + Queued_Callback *qc = (Queued_Callback *)_qc; + + if (qc->called) + scheme_raise_exn(MZEXN_FAIL_CONTRACT, + "callback thunk for synchronization has already been called"); + qc->called = 1; + + ffi_do_callback(qc->cif, qc->resultp, qc->args, qc->userdata); + + mzrt_sema_post(qc->sema); + + return scheme_void; +} + +void scheme_check_foreign_work(void) +{ + GC_CAN_IGNORE Queued_Callback *qc; + ffi_callback_struct *data; + Scheme_Object *a[1], *proc; + + if (ffi_sync_queue) { + do { + mzrt_mutex_lock(ffi_sync_queue->lock); + qc = ffi_sync_queue->callbacks; + if (qc) + ffi_sync_queue->callbacks = qc->next; + mzrt_mutex_unlock(ffi_sync_queue->lock); + + if (qc) { + qc->next = NULL; + + data = extract_ffi_callback(qc->userdata); + + proc = scheme_make_closed_prim_w_arity(callback_thunk, (void *)qc, + "callback-thunk", 0, 0); + a[0] = proc; + + proc = data->sync; + if (SCHEME_BOXP(proc)) proc = SCHEME_BOX_VAL(proc); + proc = SCHEME_CDR(proc); + + scheme_start_in_scheduler(); + _scheme_apply(proc, 1, a); + scheme_end_in_scheduler(); + } + + } while (qc); + } +} + +#endif + +void ffi_queue_callback(ffi_cif* cif, void* resultp, void** args, void *userdata) + XFORM_SKIP_PROC +{ + ffi_callback_struct *data; + + data = extract_ffi_callback(userdata); + +#ifdef MZ_USE_MZRT + { + FFI_Sync_Queue *queue; + Scheme_Object *o; + + o = data->sync; + if (SCHEME_BOXP(o)) o = SCHEME_BOX_VAL(o); + + queue = (FFI_Sync_Queue *)SCHEME_CAR(o); + + if (queue->orig_thread != mz_proc_thread_self()) { + Queued_Callback *qc; + mzrt_sema *sema; + + mzrt_sema_create(&sema, 0); + + qc = (Queued_Callback *)malloc(sizeof(Queued_Callback)); + qc->cif = cif; + qc->resultp = resultp; + qc->args = args; + qc->userdata = userdata; + qc->sema = sema; + qc->called = 0; + + mzrt_mutex_lock(queue->lock); + qc->next = queue->callbacks; + queue->callbacks = qc; + mzrt_mutex_unlock(queue->lock); + scheme_signal_received_at(queue->sig_hand); + + /* wait for the callback to be invoked in the main thread */ + mzrt_sema_wait(sema); + + mzrt_sema_destroy(sema); + free(qc); + return; + } + } +#endif + ffi_do_callback(cif, resultp, args, userdata); +} + /* see ffi-callback below */ typedef struct closure_and_cif_struct { ffi_closure closure; @@ -2027,15 +2174,17 @@ void free_cl_cif_args(void *ignored, void *p) scheme_free_code(p); } -/* (ffi-callback scheme-proc in-types out-type [abi]) -> ffi-callback */ +/* (ffi-callback scheme-proc in-types out-type [abi atomic? sync]) -> ffi-callback */ /* the treatment of in-types and out-types is similar to that in ffi-call */ /* the real work is done by ffi_do_callback above */ -@cdefine[ffi-callback 3 5]{ +@cdefine[ffi-callback 3 6]{ ffi_callback_struct *data; Scheme_Object *itypes = argv[1]; Scheme_Object *otype = argv[2]; + Scheme_Object *sync; Scheme_Object *p, *base; ffi_abi abi; + int is_atomic; int nargs, i; /* ffi_closure objects are problematic when used with a moving GC. The * problem is that memory that is GC-visible can move at any time. The @@ -2070,6 +2219,7 @@ void free_cl_cif_args(void *ignored, void *p) GC_CAN_IGNORE ffi_cif *cif; GC_CAN_IGNORE ffi_closure *cl; GC_CAN_IGNORE closure_and_cif *cl_cif_args; + GC_CAN_IGNORE ffi_callback_t do_callback; if (!SCHEME_PROCP(argv[0])) scheme_wrong_type(MYNAME, "procedure", 0, argc, argv); nargs = scheme_proper_list_length(itypes); @@ -2079,6 +2229,31 @@ void free_cl_cif_args(void *ignored, void *p) scheme_wrong_type(MYNAME, "C-type", 2, argc, argv); rtype = CTYPE_PRIMTYPE(base); abi = GET_ABI(MYNAME,3); + is_atomic = ((argc > 4) && SCHEME_TRUEP(argv[4])); + sync = (is_atomic ? scheme_true : NULL); + if (argc > 5) + (void)scheme_check_proc_arity2(MYNAME, 1, 5, argc, argv, 1); + if (((argc > 5) && SCHEME_TRUEP(argv[5]))) { +#ifdef MZ_USE_MZRT + if (!ffi_sync_queue) { + mzrt_thread_id tid; + void *sig_hand; + + ffi_sync_queue = (FFI_Sync_Queue *)malloc(sizeof(FFI_Sync_Queue)); + tid = mz_proc_thread_self(); + ffi_sync_queue->orig_thread = tid; + mzrt_mutex_create(&ffi_sync_queue->lock); + sig_hand = scheme_get_signal_handle(); + ffi_sync_queue->sig_hand = sig_hand; + ffi_sync_queue->callbacks = NULL; + } + sync = scheme_make_raw_pair((Scheme_Object *)ffi_sync_queue, + argv[5]); + if (is_atomic) sync = scheme_box(sync); +#endif + do_callback = ffi_queue_callback; + } else + do_callback = ffi_do_callback; /* malloc space for everything needed, so a single free gets rid of this */ cl_cif_args = scheme_malloc_code(sizeof(closure_and_cif) + nargs*sizeof(ffi_cif*)); cl = &(cl_cif_args->closure); /* cl is the same as cl_cif_args */ @@ -2095,7 +2270,7 @@ void free_cl_cif_args(void *ignored, void *p) scheme_signal_error("internal error: ffi_prep_cif did not return FFI_OK"); @cmake["data" ffi-callback "cl_cif_args" "argv[0]" "argv[1]" "argv[2]" - "((argc > 4) && SCHEME_TRUEP(argv[4]))"] + "sync"] @@@IFDEF{MZ_PRECISE_GC}{ { /* put data in immobile, weak box */ @@ -2106,7 +2281,7 @@ void free_cl_cif_args(void *ignored, void *p) }{ cl_cif_args->data = (void*)data; } - if (ffi_prep_closure(cl, cif, &ffi_do_callback, (void*)(cl_cif_args->data)) + if (ffi_prep_closure(cl, cif, do_callback, (void*)(cl_cif_args->data)) != FFI_OK) scheme_signal_error ("internal error: ffi_prep_closure did not return FFI_OK"); diff --git a/src/racket/include/schthread.h b/src/racket/include/schthread.h index 872b067452..8d407e2fa9 100644 --- a/src/racket/include/schthread.h +++ b/src/racket/include/schthread.h @@ -290,6 +290,7 @@ typedef struct Thread_Local_Variables { struct Scheme_Hash_Table *place_local_symbol_table_; struct Scheme_Hash_Table *place_local_keyword_table_; struct Scheme_Hash_Table *place_local_parallel_symbol_table_; + struct FFI_Sync_Queue *ffi_sync_queue_; /*KPLAKE1*/ } Thread_Local_Variables; @@ -582,6 +583,7 @@ XFORM_GC_VARIABLE_STACK_THROUGH_THREAD_LOCAL; #define place_local_symbol_table XOA (scheme_get_thread_local_variables()->place_local_symbol_table_) #define place_local_keyword_table XOA (scheme_get_thread_local_variables()->place_local_keyword_table_) #define place_local_parallel_symbol_table XOA (scheme_get_thread_local_variables()->place_local_parallel_symbol_table_) +#define ffi_sync_queue XOA (scheme_get_thread_local_variables()->ffi_sync_queue_) /*KPLAKE2*/ /* **************************************** */ diff --git a/src/racket/src/schpriv.h b/src/racket/src/schpriv.h index c4eb4e113e..104e65b2c4 100644 --- a/src/racket/src/schpriv.h +++ b/src/racket/src/schpriv.h @@ -469,6 +469,10 @@ void scheme_suspend_remembered_threads(void); void scheme_resume_remembered_threads(void); #endif +#ifdef MZ_USE_MZRT +extern void scheme_check_foreign_work(void); +#endif + void scheme_kickoff_green_thread_time_slice_timer(long usec); #ifdef UNIX_PROCESSES diff --git a/src/racket/src/thread.c b/src/racket/src/thread.c index ecd8c95588..7329d58ac3 100644 --- a/src/racket/src/thread.c +++ b/src/racket/src/thread.c @@ -4159,6 +4159,9 @@ void scheme_thread_block(float sleep_time) #ifdef MZ_USE_FUTURES scheme_check_future_work(); #endif +#ifdef MZ_USE_MZRT + scheme_check_foreign_work(); +#endif if (!do_atomic && (sleep_end >= 0.0)) { find_next_thread(&next);