ffi/unsafe: allow a constant result for asynch use of a callback

A constant result for foreign-thread use of a callback allows a
callback to return without synchronizing with the Racket thread.
A constant result is thus useful when a callback's work can simply
be skipped if the callback is applied in the "wrong" OS thread.
This commit is contained in:
Matthew Flatt 2013-04-23 10:57:24 -06:00
parent 07fd3676d0
commit bba223a9fe
5 changed files with 221 additions and 65 deletions

View File

@ -426,7 +426,7 @@ the later case, the result is the @racket[ctype]).}
[output-type ctype?] [output-type ctype?]
[#:abi abi (or/c #f 'default 'stdcall 'sysv) #f] [#:abi abi (or/c #f 'default 'stdcall 'sysv) #f]
[#:atomic? atomic? any/c #f] [#:atomic? atomic? any/c #f]
[#:async-apply async-apply (or/c #f ((-> any) . -> . any)) #f] [#:async-apply async-apply (or/c #f ((-> any/c) . -> . any/c) box?) #f]
[#:in-original-place? in-original-place? any/c #f] [#:in-original-place? in-original-place? any/c #f]
[#:save-errno save-errno (or/c #f 'posix 'windows) #f] [#:save-errno save-errno (or/c #f 'posix 'windows) #f]
[#:wrapper wrapper (or/c #f (procedure? . -> . procedure?)) [#:wrapper wrapper (or/c #f (procedure? . -> . procedure?))
@ -597,10 +597,12 @@ For @tech{callbacks} to Racket functions with the generated type:
to avoid C-level stack overflow; otherwise, the process may to avoid C-level stack overflow; otherwise, the process may
crash or misbehave.} crash or misbehave.}
@item{If an @racket[async-apply] procedure is provided, then a Racket @item{If a @racket[async-apply] is provided as a procedure or box, then a Racket
@tech{callback} procedure with the generated procedure type can @tech{callback} procedure with the generated procedure type can
be applied in a foreign thread (i.e., an OS-level thread other be applied in a foreign thread (i.e., an OS-level thread other
than the one used to run Racket). The call in the foreign than the one used to run Racket).
If @racket[async-apply] is a procedure, the call in the foreign
thread is transferred to the OS-level thread that runs Racket, thread is transferred to the OS-level thread that runs Racket,
but the Racket-level thread (in the sense of @racket[thread]) but the Racket-level thread (in the sense of @racket[thread])
is unspecified; the job of the provided @racket[async-apply] is unspecified; the job of the provided @racket[async-apply]
@ -626,13 +628,22 @@ For @tech{callbacks} to Racket functions with the generated type:
synchronizes within an unsuitable Racket-level thread, it can synchronizes within an unsuitable Racket-level thread, it can
deadlock or otherwise damage the Racket process. deadlock or otherwise damage the Racket process.
If @racket[async-apply] is a box, then the value contained in
the box is used as the result of the callback when it is called
in a foreign thread; the @racket[async-apply] value is
converted to a foreign value at the time that
@racket[_cprocedure] is called. Using a boxed constant value
for @racket[async-apply] avoids the need to synchronize with
the OS-level thread that runs Racket, but it effectively ignores
the Racket procedure that is wrapped as @tech{callback} when
the @tech{callback} is applied in a foreign thread.
Foreign-thread detection to trigger @racket[async-apply] works Foreign-thread detection to trigger @racket[async-apply] works
only when Racket is compiled with OS-level thread support, only when Racket is compiled with OS-level thread support,
which is the default for many platforms. If a callback with an which is the default for many platforms. If a callback with an
@racket[async-apply] is called from foreign code in the same @racket[async-apply] is called from foreign code in the same
OS-level thread that runs Racket, then the @racket[async-apply] OS-level thread that runs Racket, then @racket[async-apply]
wrapper is not used.} is not used.}
] ]
} }

View File

@ -1,4 +1,7 @@
#include <stdlib.h> #include <stdlib.h>
#ifdef USE_THREAD_TEST
#include <pthread.h>
#endif
typedef unsigned char byte; typedef unsigned char byte;
@ -211,3 +214,45 @@ X union ic7iorl increment_ic7iorl(int which, union ic7iorl v)
return v; return v;
} }
#ifdef USE_THREAD_TEST
typedef void* (*test_callback_t)(void*);
typedef void (*sleep_callback_t)();
void *do_f(void *_data)
{
test_callback_t f = ((void **)_data)[0];
void *data = ((void **)_data)[1];
data = f(data);
((void **)_data)[2] = (void *)1;
return data;
}
X void* foreign_thread_callback(test_callback_t f,
void *data,
sleep_callback_t s)
{
pthread_t th;
void *r, **d;
d = malloc(3 * sizeof(void*));
d[0] = f;
d[1] = data;
d[2] = NULL;
if (pthread_create(&th, NULL, do_f, d))
return NULL;
while (!d[2]) {
s();
}
if (pthread_join(th, &r))
return NULL;
return r;
}
#endif

View File

@ -6,7 +6,10 @@
(require ffi/unsafe (require ffi/unsafe
ffi/unsafe/cvector ffi/unsafe/cvector
ffi/vector ffi/vector
racket/extflonum) racket/extflonum
racket/place)
(define test-async? (and (place-enabled?) (not (eq? 'windows (system-type)))))
(test #f malloc 0) (test #f malloc 0)
(test #f malloc 0 _int) (test #f malloc 0 _int)
@ -101,7 +104,15 @@
(system-type 'so-suffix))))]) (system-type 'so-suffix))))])
(when (file-exists? o) (delete-file o)) (when (file-exists? o) (delete-file o))
(when (file-exists? so) (delete-file so)) (when (file-exists? so) (delete-file so))
(parameterize ([current-standard-link-libraries '()]) (parameterize ([current-standard-link-libraries '()]
[current-extension-compiler-flags
(if test-async?
(append '("-pthread" "-DUSE_THREAD_TEST") (current-extension-compiler-flags))
(current-extension-compiler-flags))]
[current-extension-linker-flags
(if test-async?
(append '("-pthread") (current-extension-linker-flags))
(current-extension-linker-flags))])
(compile-extension #t c o '()) (compile-extension #t c o '())
(link-extension #t (list o) so)) (link-extension #t (list o) so))
(lambda () (lambda ()
@ -503,6 +514,21 @@
;; Check a corner of UTF-16 conversion: ;; Check a corner of UTF-16 conversion:
(test "\U171D3" cast (cast "\U171D3" _string/utf-16 _pointer) _pointer _string/utf-16) (test "\U171D3" cast (cast "\U171D3" _string/utf-16 _pointer) _pointer _string/utf-16)
;; check async:
(when test-async?
(define (check async like)
(define foreign_thread_callback (get-ffi-obj 'foreign_thread_callback test-lib
(_fun (_fun #:async-apply async
_intptr -> _intptr)
_intptr
(_fun -> _void)
-> _intptr)))
(test (like 16) foreign_thread_callback (lambda (v) (add1 v)) 16 sleep))
(check (lambda (f) (f)) add1)
(check (box 20) (lambda (x) 20)))
;; ----------------------------------------
(report-errs) (report-errs)
#| --- ignore everything below --- #| --- ignore everything below ---

View File

@ -3644,36 +3644,44 @@ void ffi_queue_callback(ffi_cif* cif, void* resultp, void** args, void *userdata
temporarily, because a GC may occur concurrent to this temporarily, because a GC may occur concurrent to this
function if it's in another thread. */ function if it's in another thread. */
FFI_Sync_Queue *queue; FFI_Sync_Queue *queue;
void **data = (void **)userdata;
queue = (FFI_Sync_Queue *)((void **)userdata)[1]; queue = (FFI_Sync_Queue *)(data)[1];
userdata = ((void **)userdata)[0]; userdata = (data)[0];
if (queue->orig_thread != mz_proc_thread_self()) { if (queue->orig_thread != mz_proc_thread_self()) {
Queued_Callback *qc; if (data[2]) {
mzrt_sema *sema; /* constant result */
memcpy(resultp, data[2], (intptr_t)data[3]);
return;
} else {
/* queue a callback and wait: */
Queued_Callback *qc;
mzrt_sema *sema;
mzrt_sema_create(&sema, 0); mzrt_sema_create(&sema, 0);
qc = (Queued_Callback *)malloc(sizeof(Queued_Callback)); qc = (Queued_Callback *)malloc(sizeof(Queued_Callback));
qc->cif = cif; qc->cif = cif;
qc->resultp = resultp; qc->resultp = resultp;
qc->args = args; qc->args = args;
qc->userdata = userdata; qc->userdata = userdata;
qc->sema = sema; qc->sema = sema;
qc->called = 0; qc->called = 0;
mzrt_mutex_lock(queue->lock); mzrt_mutex_lock(queue->lock);
qc->next = queue->callbacks; qc->next = queue->callbacks;
queue->callbacks = qc; queue->callbacks = qc;
mzrt_mutex_unlock(queue->lock); mzrt_mutex_unlock(queue->lock);
scheme_signal_received_at(queue->sig_hand); scheme_signal_received_at(queue->sig_hand);
/* wait for the callback to be invoked in the main thread */ /* wait for the callback to be invoked in the main thread */
mzrt_sema_wait(sema); mzrt_sema_wait(sema);
mzrt_sema_destroy(sema); mzrt_sema_destroy(sema);
free(qc); free(qc);
return; return;
}
} }
#endif #endif
@ -3709,10 +3717,12 @@ void free_cl_cif_args(void *ignored, void *p)
#ifdef MZ_USE_MZRT #ifdef MZ_USE_MZRT
void free_cl_cif_queue_args(void *ignored, void *p) void free_cl_cif_queue_args(void *ignored, void *p)
{ {
void *data = ((closure_and_cif*)p)->data; void *data = ((closure_and_cif*)p)->data, *constant_result;
void **q = (void **)data; void **q = (void **)data;
data = q[0]; data = q[0];
constant_result = q[3];
free(q); free(q);
if (constant_result) free(constant_result);
#ifdef MZ_PRECISE_GC #ifdef MZ_PRECISE_GC
GC_free_immobile_box((void**)data); GC_free_immobile_box((void**)data);
#endif #endif
@ -3771,6 +3781,8 @@ static Scheme_Object *foreign_ffi_callback(int argc, Scheme_Object *argv[])
GC_CAN_IGNORE void *callback_data; GC_CAN_IGNORE void *callback_data;
# ifdef MZ_USE_MZRT # ifdef MZ_USE_MZRT
int keep_queue = 0; int keep_queue = 0;
void *constant_reply;
int constant_reply_size;
# endif /* MZ_USE_MZRT */ # endif /* MZ_USE_MZRT */
if (!SCHEME_PROCP(argv[0])) if (!SCHEME_PROCP(argv[0]))
@ -3784,8 +3796,10 @@ static Scheme_Object *foreign_ffi_callback(int argc, Scheme_Object *argv[])
abi = GET_ABI(MYNAME,3); abi = GET_ABI(MYNAME,3);
is_atomic = ((argc > 4) && SCHEME_TRUEP(argv[4])); is_atomic = ((argc > 4) && SCHEME_TRUEP(argv[4]));
sync = (is_atomic ? scheme_true : NULL); sync = (is_atomic ? scheme_true : NULL);
if (argc > 5) if ((argc > 5)
(void)scheme_check_proc_arity2(MYNAME, 1, 5, argc, argv, 1); && !SCHEME_BOXP(argv[5])
&& !scheme_check_proc_arity2(NULL, 1, 5, argc, argv, 1))
scheme_wrong_contract(MYNAME, "(or/c #f (procedure-arity-includes/c 0) box?)", 5, argc, argv);
if (((argc > 5) && SCHEME_TRUEP(argv[5]))) { if (((argc > 5) && SCHEME_TRUEP(argv[5]))) {
# ifdef MZ_USE_MZRT # ifdef MZ_USE_MZRT
if (!ffi_sync_queue) { if (!ffi_sync_queue) {
@ -3800,8 +3814,24 @@ static Scheme_Object *foreign_ffi_callback(int argc, Scheme_Object *argv[])
ffi_sync_queue->sig_hand = sig_hand; ffi_sync_queue->sig_hand = sig_hand;
ffi_sync_queue->callbacks = NULL; ffi_sync_queue->callbacks = NULL;
} }
sync = argv[5]; if (SCHEME_BOXP(argv[5])) {
if (is_atomic) sync = scheme_box(sync); /* when called in a foreign thread, return a constant */
constant_reply_size = ctype_sizeof(otype);
if (!constant_reply_size && SCHEME_VOIDP(SCHEME_BOX_VAL(argv[5]))) {
/* void result */
constant_reply = scheme_malloc_atomic(1);
} else {
/* non-void result */
constant_reply = scheme_malloc_atomic(constant_reply_size);
SCHEME2C(MYNAME, otype, constant_reply, 0, SCHEME_BOX_VAL(argv[5]), NULL, NULL, 0);
}
} else {
/* when called in a foreign thread, queue a reply back here */
sync = argv[5];
if (is_atomic) sync = scheme_box(sync);
constant_reply = NULL;
constant_reply_size = 0;
}
keep_queue = 1; keep_queue = 1;
# endif /* MZ_USE_MZRT */ # endif /* MZ_USE_MZRT */
do_callback = ffi_queue_callback; do_callback = ffi_queue_callback;
@ -3844,10 +3874,17 @@ static Scheme_Object *foreign_ffi_callback(int argc, Scheme_Object *argv[])
/* For ffi_queue_callback(), add a level of indirection in `data' to /* For ffi_queue_callback(), add a level of indirection in `data' to
hold the place-specific `ffi_sync_queue'. Use hold the place-specific `ffi_sync_queue'. Use
`free_cl_cif_data_args' to clean up this extra level. */ `free_cl_cif_data_args' to clean up this extra level. */
GC_CAN_IGNORE void **tmp; GC_CAN_IGNORE void **tmp, *cr;
tmp = (void **)malloc(sizeof(void*) * 2); if (constant_reply) {
cr = malloc(constant_reply_size);
memcpy(cr, constant_reply, constant_reply_size);
constant_reply = cr;
}
tmp = (void **)malloc(sizeof(void*) * 4);
tmp[0] = callback_data; tmp[0] = callback_data;
tmp[1] = ffi_sync_queue; tmp[1] = ffi_sync_queue;
tmp[2] = constant_reply;
tmp[3] = (void *)(intptr_t)constant_reply_size;
callback_data = (void *)tmp; callback_data = (void *)tmp;
} }
# endif /* MZ_USE_MZRT */ # endif /* MZ_USE_MZRT */

View File

@ -2850,36 +2850,44 @@ void ffi_queue_callback(ffi_cif* cif, void* resultp, void** args, void *userdata
temporarily, because a GC may occur concurrent to this temporarily, because a GC may occur concurrent to this
function if it's in another thread. */ function if it's in another thread. */
FFI_Sync_Queue *queue; FFI_Sync_Queue *queue;
void **data = (void **)userdata;
queue = (FFI_Sync_Queue *)((void **)userdata)[1]; queue = (FFI_Sync_Queue *)(data)[1];
userdata = ((void **)userdata)[0]; userdata = (data)[0];
if (queue->orig_thread != mz_proc_thread_self()) { if (queue->orig_thread != mz_proc_thread_self()) {
Queued_Callback *qc; if (data[2]) {
mzrt_sema *sema; /* constant result */
memcpy(resultp, data[2], (intptr_t)data[3]);
return;
} else {
/* queue a callback and wait: */
Queued_Callback *qc;
mzrt_sema *sema;
mzrt_sema_create(&sema, 0); mzrt_sema_create(&sema, 0);
qc = (Queued_Callback *)malloc(sizeof(Queued_Callback)); qc = (Queued_Callback *)malloc(sizeof(Queued_Callback));
qc->cif = cif; qc->cif = cif;
qc->resultp = resultp; qc->resultp = resultp;
qc->args = args; qc->args = args;
qc->userdata = userdata; qc->userdata = userdata;
qc->sema = sema; qc->sema = sema;
qc->called = 0; qc->called = 0;
mzrt_mutex_lock(queue->lock); mzrt_mutex_lock(queue->lock);
qc->next = queue->callbacks; qc->next = queue->callbacks;
queue->callbacks = qc; queue->callbacks = qc;
mzrt_mutex_unlock(queue->lock); mzrt_mutex_unlock(queue->lock);
scheme_signal_received_at(queue->sig_hand); scheme_signal_received_at(queue->sig_hand);
/* wait for the callback to be invoked in the main thread */ /* wait for the callback to be invoked in the main thread */
mzrt_sema_wait(sema); mzrt_sema_wait(sema);
mzrt_sema_destroy(sema); mzrt_sema_destroy(sema);
free(qc); free(qc);
return; return;
}
} }
#endif #endif
@ -2915,10 +2923,12 @@ void free_cl_cif_args(void *ignored, void *p)
#ifdef MZ_USE_MZRT #ifdef MZ_USE_MZRT
void free_cl_cif_queue_args(void *ignored, void *p) void free_cl_cif_queue_args(void *ignored, void *p)
{ {
void *data = ((closure_and_cif*)p)->data; void *data = ((closure_and_cif*)p)->data, *constant_result;
void **q = (void **)data; void **q = (void **)data;
data = q[0]; data = q[0];
constant_result = q[3];
free(q); free(q);
if (constant_result) free(constant_result);
#ifdef MZ_PRECISE_GC #ifdef MZ_PRECISE_GC
GC_free_immobile_box((void**)data); GC_free_immobile_box((void**)data);
#endif #endif
@ -2975,6 +2985,8 @@ void free_cl_cif_queue_args(void *ignored, void *p)
GC_CAN_IGNORE void *callback_data; GC_CAN_IGNORE void *callback_data;
@@IFDEF{MZ_USE_MZRT}{ @@IFDEF{MZ_USE_MZRT}{
int keep_queue = 0; int keep_queue = 0;
void *constant_reply;
int constant_reply_size;
} }
if (!SCHEME_PROCP(argv[0])) if (!SCHEME_PROCP(argv[0]))
@ -2988,8 +3000,10 @@ void free_cl_cif_queue_args(void *ignored, void *p)
abi = GET_ABI(MYNAME,3); abi = GET_ABI(MYNAME,3);
is_atomic = ((argc > 4) && SCHEME_TRUEP(argv[4])); is_atomic = ((argc > 4) && SCHEME_TRUEP(argv[4]));
sync = (is_atomic ? scheme_true : NULL); sync = (is_atomic ? scheme_true : NULL);
if (argc > 5) if ((argc > 5)
(void)scheme_check_proc_arity2(MYNAME, 1, 5, argc, argv, 1); && !SCHEME_BOXP(argv[5])
&& !scheme_check_proc_arity2(NULL, 1, 5, argc, argv, 1))
scheme_wrong_contract(MYNAME, "(or/c #f (procedure-arity-includes/c 0) box?)", 5, argc, argv);
if (((argc > 5) && SCHEME_TRUEP(argv[5]))) { if (((argc > 5) && SCHEME_TRUEP(argv[5]))) {
@@IFDEF{MZ_USE_MZRT}{ @@IFDEF{MZ_USE_MZRT}{
if (!ffi_sync_queue) { if (!ffi_sync_queue) {
@ -3004,8 +3018,24 @@ void free_cl_cif_queue_args(void *ignored, void *p)
ffi_sync_queue->sig_hand = sig_hand; ffi_sync_queue->sig_hand = sig_hand;
ffi_sync_queue->callbacks = NULL; ffi_sync_queue->callbacks = NULL;
} }
sync = argv[5]; if (SCHEME_BOXP(argv[5])) {
if (is_atomic) sync = scheme_box(sync); /* when called in a foreign thread, return a constant */
constant_reply_size = ctype_sizeof(otype);
if (!constant_reply_size && SCHEME_VOIDP(SCHEME_BOX_VAL(argv[5]))) {
/* void result */
constant_reply = scheme_malloc_atomic(1);
} else {
/* non-void result */
constant_reply = scheme_malloc_atomic(constant_reply_size);
SCHEME2C(MYNAME, otype, constant_reply, 0, SCHEME_BOX_VAL(argv[5]), NULL, NULL, 0);
}
} else {
/* when called in a foreign thread, queue a reply back here */
sync = argv[5];
if (is_atomic) sync = scheme_box(sync);
constant_reply = NULL;
constant_reply_size = 0;
}
keep_queue = 1; keep_queue = 1;
} }
do_callback = ffi_queue_callback; do_callback = ffi_queue_callback;
@ -3044,10 +3074,17 @@ void free_cl_cif_queue_args(void *ignored, void *p)
/* For ffi_queue_callback(), add a level of indirection in `data' to /* For ffi_queue_callback(), add a level of indirection in `data' to
hold the place-specific `ffi_sync_queue'. Use hold the place-specific `ffi_sync_queue'. Use
`free_cl_cif_data_args' to clean up this extra level. */ `free_cl_cif_data_args' to clean up this extra level. */
GC_CAN_IGNORE void **tmp; GC_CAN_IGNORE void **tmp, *cr;
tmp = (void **)malloc(sizeof(void*) * 2); if (constant_reply) {
cr = malloc(constant_reply_size);
memcpy(cr, constant_reply, constant_reply_size);
constant_reply = cr;
}
tmp = (void **)malloc(sizeof(void*) * 4);
tmp[0] = callback_data; tmp[0] = callback_data;
tmp[1] = ffi_sync_queue; tmp[1] = ffi_sync_queue;
tmp[2] = constant_reply;
tmp[3] = (void *)(intptr_t)constant_reply_size;
callback_data = (void *)tmp; callback_data = (void *)tmp;
} }
} }