From bba223a9fe057e64fa24f03ad48a475b290e5562 Mon Sep 17 00:00:00 2001 From: Matthew Flatt Date: Tue, 23 Apr 2013 10:57:24 -0600 Subject: [PATCH] ffi/unsafe: allow a constant result for asynch use of a callback A constant result for foreign-thread use of a callback allows a callback to return without synchronizing with the Racket thread. A constant result is thus useful when a callback's work can simply be skipped if the callback is applied in the "wrong" OS thread. --- collects/scribblings/foreign/types.scrbl | 21 ++++-- collects/tests/racket/foreign-test.c | 45 +++++++++++ collects/tests/racket/foreign-test.rktl | 30 +++++++- src/foreign/foreign.c | 95 ++++++++++++++++-------- src/foreign/foreign.rktc | 95 ++++++++++++++++-------- 5 files changed, 221 insertions(+), 65 deletions(-) diff --git a/collects/scribblings/foreign/types.scrbl b/collects/scribblings/foreign/types.scrbl index 4ae09d3307..93f66515cb 100644 --- a/collects/scribblings/foreign/types.scrbl +++ b/collects/scribblings/foreign/types.scrbl @@ -426,7 +426,7 @@ the later case, the result is the @racket[ctype]).} [output-type ctype?] [#:abi abi (or/c #f 'default 'stdcall 'sysv) #f] [#:atomic? atomic? any/c #f] - [#:async-apply async-apply (or/c #f ((-> any) . -> . any)) #f] + [#:async-apply async-apply (or/c #f ((-> any/c) . -> . any/c) box?) #f] [#:in-original-place? in-original-place? any/c #f] [#:save-errno save-errno (or/c #f 'posix 'windows) #f] [#:wrapper wrapper (or/c #f (procedure? . -> . procedure?)) @@ -597,10 +597,12 @@ For @tech{callbacks} to Racket functions with the generated type: to avoid C-level stack overflow; otherwise, the process may crash or misbehave.} - @item{If an @racket[async-apply] procedure is provided, then a Racket + @item{If a @racket[async-apply] is provided as a procedure or box, then a Racket @tech{callback} procedure with the generated procedure type can be applied in a foreign thread (i.e., an OS-level thread other - than the one used to run Racket). The call in the foreign + than the one used to run Racket). + + If @racket[async-apply] is a procedure, the call in the foreign thread is transferred to the OS-level thread that runs Racket, but the Racket-level thread (in the sense of @racket[thread]) is unspecified; the job of the provided @racket[async-apply] @@ -626,13 +628,22 @@ For @tech{callbacks} to Racket functions with the generated type: synchronizes within an unsuitable Racket-level thread, it can deadlock or otherwise damage the Racket process. + If @racket[async-apply] is a box, then the value contained in + the box is used as the result of the callback when it is called + in a foreign thread; the @racket[async-apply] value is + converted to a foreign value at the time that + @racket[_cprocedure] is called. Using a boxed constant value + for @racket[async-apply] avoids the need to synchronize with + the OS-level thread that runs Racket, but it effectively ignores + the Racket procedure that is wrapped as @tech{callback} when + the @tech{callback} is applied in a foreign thread. Foreign-thread detection to trigger @racket[async-apply] works only when Racket is compiled with OS-level thread support, which is the default for many platforms. If a callback with an @racket[async-apply] is called from foreign code in the same - OS-level thread that runs Racket, then the @racket[async-apply] - wrapper is not used.} + OS-level thread that runs Racket, then @racket[async-apply] + is not used.} ] } diff --git a/collects/tests/racket/foreign-test.c b/collects/tests/racket/foreign-test.c index 3e3191bb1d..03e4024877 100644 --- a/collects/tests/racket/foreign-test.c +++ b/collects/tests/racket/foreign-test.c @@ -1,4 +1,7 @@ #include +#ifdef USE_THREAD_TEST +#include +#endif typedef unsigned char byte; @@ -211,3 +214,45 @@ X union ic7iorl increment_ic7iorl(int which, union ic7iorl v) return v; } + +#ifdef USE_THREAD_TEST +typedef void* (*test_callback_t)(void*); +typedef void (*sleep_callback_t)(); + +void *do_f(void *_data) +{ + test_callback_t f = ((void **)_data)[0]; + void *data = ((void **)_data)[1]; + + data = f(data); + + ((void **)_data)[2] = (void *)1; + + return data; +} + +X void* foreign_thread_callback(test_callback_t f, + void *data, + sleep_callback_t s) +{ + pthread_t th; + void *r, **d; + + d = malloc(3 * sizeof(void*)); + d[0] = f; + d[1] = data; + d[2] = NULL; + + if (pthread_create(&th, NULL, do_f, d)) + return NULL; + + while (!d[2]) { + s(); + } + + if (pthread_join(th, &r)) + return NULL; + + return r; +} +#endif diff --git a/collects/tests/racket/foreign-test.rktl b/collects/tests/racket/foreign-test.rktl index 65d3ee8694..9f9b8de80d 100644 --- a/collects/tests/racket/foreign-test.rktl +++ b/collects/tests/racket/foreign-test.rktl @@ -6,7 +6,10 @@ (require ffi/unsafe ffi/unsafe/cvector ffi/vector - racket/extflonum) + racket/extflonum + racket/place) + +(define test-async? (and (place-enabled?) (not (eq? 'windows (system-type))))) (test #f malloc 0) (test #f malloc 0 _int) @@ -101,7 +104,15 @@ (system-type 'so-suffix))))]) (when (file-exists? o) (delete-file o)) (when (file-exists? so) (delete-file so)) - (parameterize ([current-standard-link-libraries '()]) + (parameterize ([current-standard-link-libraries '()] + [current-extension-compiler-flags + (if test-async? + (append '("-pthread" "-DUSE_THREAD_TEST") (current-extension-compiler-flags)) + (current-extension-compiler-flags))] + [current-extension-linker-flags + (if test-async? + (append '("-pthread") (current-extension-linker-flags)) + (current-extension-linker-flags))]) (compile-extension #t c o '()) (link-extension #t (list o) so)) (lambda () @@ -503,6 +514,21 @@ ;; Check a corner of UTF-16 conversion: (test "\U171D3" cast (cast "\U171D3" _string/utf-16 _pointer) _pointer _string/utf-16) +;; check async: +(when test-async? + (define (check async like) + (define foreign_thread_callback (get-ffi-obj 'foreign_thread_callback test-lib + (_fun (_fun #:async-apply async + _intptr -> _intptr) + _intptr + (_fun -> _void) + -> _intptr))) + (test (like 16) foreign_thread_callback (lambda (v) (add1 v)) 16 sleep)) + (check (lambda (f) (f)) add1) + (check (box 20) (lambda (x) 20))) + +;; ---------------------------------------- + (report-errs) #| --- ignore everything below --- diff --git a/src/foreign/foreign.c b/src/foreign/foreign.c index 19098ac75c..f031939bf5 100644 --- a/src/foreign/foreign.c +++ b/src/foreign/foreign.c @@ -3644,36 +3644,44 @@ void ffi_queue_callback(ffi_cif* cif, void* resultp, void** args, void *userdata temporarily, because a GC may occur concurrent to this function if it's in another thread. */ FFI_Sync_Queue *queue; + void **data = (void **)userdata; - queue = (FFI_Sync_Queue *)((void **)userdata)[1]; - userdata = ((void **)userdata)[0]; + queue = (FFI_Sync_Queue *)(data)[1]; + userdata = (data)[0]; if (queue->orig_thread != mz_proc_thread_self()) { - Queued_Callback *qc; - mzrt_sema *sema; + if (data[2]) { + /* constant result */ + memcpy(resultp, data[2], (intptr_t)data[3]); + return; + } else { + /* queue a callback and wait: */ + Queued_Callback *qc; + mzrt_sema *sema; - mzrt_sema_create(&sema, 0); + mzrt_sema_create(&sema, 0); - qc = (Queued_Callback *)malloc(sizeof(Queued_Callback)); - qc->cif = cif; - qc->resultp = resultp; - qc->args = args; - qc->userdata = userdata; - qc->sema = sema; - qc->called = 0; + qc = (Queued_Callback *)malloc(sizeof(Queued_Callback)); + qc->cif = cif; + qc->resultp = resultp; + qc->args = args; + qc->userdata = userdata; + qc->sema = sema; + qc->called = 0; - mzrt_mutex_lock(queue->lock); - qc->next = queue->callbacks; - queue->callbacks = qc; - mzrt_mutex_unlock(queue->lock); - scheme_signal_received_at(queue->sig_hand); + mzrt_mutex_lock(queue->lock); + qc->next = queue->callbacks; + queue->callbacks = qc; + mzrt_mutex_unlock(queue->lock); + scheme_signal_received_at(queue->sig_hand); - /* wait for the callback to be invoked in the main thread */ - mzrt_sema_wait(sema); + /* wait for the callback to be invoked in the main thread */ + mzrt_sema_wait(sema); - mzrt_sema_destroy(sema); - free(qc); - return; + mzrt_sema_destroy(sema); + free(qc); + return; + } } #endif @@ -3709,10 +3717,12 @@ void free_cl_cif_args(void *ignored, void *p) #ifdef MZ_USE_MZRT void free_cl_cif_queue_args(void *ignored, void *p) { - void *data = ((closure_and_cif*)p)->data; + void *data = ((closure_and_cif*)p)->data, *constant_result; void **q = (void **)data; data = q[0]; + constant_result = q[3]; free(q); + if (constant_result) free(constant_result); #ifdef MZ_PRECISE_GC GC_free_immobile_box((void**)data); #endif @@ -3771,6 +3781,8 @@ static Scheme_Object *foreign_ffi_callback(int argc, Scheme_Object *argv[]) GC_CAN_IGNORE void *callback_data; # ifdef MZ_USE_MZRT int keep_queue = 0; + void *constant_reply; + int constant_reply_size; # endif /* MZ_USE_MZRT */ if (!SCHEME_PROCP(argv[0])) @@ -3784,8 +3796,10 @@ static Scheme_Object *foreign_ffi_callback(int argc, Scheme_Object *argv[]) abi = GET_ABI(MYNAME,3); is_atomic = ((argc > 4) && SCHEME_TRUEP(argv[4])); sync = (is_atomic ? scheme_true : NULL); - if (argc > 5) - (void)scheme_check_proc_arity2(MYNAME, 1, 5, argc, argv, 1); + if ((argc > 5) + && !SCHEME_BOXP(argv[5]) + && !scheme_check_proc_arity2(NULL, 1, 5, argc, argv, 1)) + scheme_wrong_contract(MYNAME, "(or/c #f (procedure-arity-includes/c 0) box?)", 5, argc, argv); if (((argc > 5) && SCHEME_TRUEP(argv[5]))) { # ifdef MZ_USE_MZRT if (!ffi_sync_queue) { @@ -3800,8 +3814,24 @@ static Scheme_Object *foreign_ffi_callback(int argc, Scheme_Object *argv[]) ffi_sync_queue->sig_hand = sig_hand; ffi_sync_queue->callbacks = NULL; } - sync = argv[5]; - if (is_atomic) sync = scheme_box(sync); + if (SCHEME_BOXP(argv[5])) { + /* when called in a foreign thread, return a constant */ + constant_reply_size = ctype_sizeof(otype); + if (!constant_reply_size && SCHEME_VOIDP(SCHEME_BOX_VAL(argv[5]))) { + /* void result */ + constant_reply = scheme_malloc_atomic(1); + } else { + /* non-void result */ + constant_reply = scheme_malloc_atomic(constant_reply_size); + SCHEME2C(MYNAME, otype, constant_reply, 0, SCHEME_BOX_VAL(argv[5]), NULL, NULL, 0); + } + } else { + /* when called in a foreign thread, queue a reply back here */ + sync = argv[5]; + if (is_atomic) sync = scheme_box(sync); + constant_reply = NULL; + constant_reply_size = 0; + } keep_queue = 1; # endif /* MZ_USE_MZRT */ do_callback = ffi_queue_callback; @@ -3844,10 +3874,17 @@ static Scheme_Object *foreign_ffi_callback(int argc, Scheme_Object *argv[]) /* For ffi_queue_callback(), add a level of indirection in `data' to hold the place-specific `ffi_sync_queue'. Use `free_cl_cif_data_args' to clean up this extra level. */ - GC_CAN_IGNORE void **tmp; - tmp = (void **)malloc(sizeof(void*) * 2); + GC_CAN_IGNORE void **tmp, *cr; + if (constant_reply) { + cr = malloc(constant_reply_size); + memcpy(cr, constant_reply, constant_reply_size); + constant_reply = cr; + } + tmp = (void **)malloc(sizeof(void*) * 4); tmp[0] = callback_data; tmp[1] = ffi_sync_queue; + tmp[2] = constant_reply; + tmp[3] = (void *)(intptr_t)constant_reply_size; callback_data = (void *)tmp; } # endif /* MZ_USE_MZRT */ diff --git a/src/foreign/foreign.rktc b/src/foreign/foreign.rktc index a0be4db6c5..eeb0768a50 100755 --- a/src/foreign/foreign.rktc +++ b/src/foreign/foreign.rktc @@ -2850,36 +2850,44 @@ void ffi_queue_callback(ffi_cif* cif, void* resultp, void** args, void *userdata temporarily, because a GC may occur concurrent to this function if it's in another thread. */ FFI_Sync_Queue *queue; + void **data = (void **)userdata; - queue = (FFI_Sync_Queue *)((void **)userdata)[1]; - userdata = ((void **)userdata)[0]; + queue = (FFI_Sync_Queue *)(data)[1]; + userdata = (data)[0]; if (queue->orig_thread != mz_proc_thread_self()) { - Queued_Callback *qc; - mzrt_sema *sema; + if (data[2]) { + /* constant result */ + memcpy(resultp, data[2], (intptr_t)data[3]); + return; + } else { + /* queue a callback and wait: */ + Queued_Callback *qc; + mzrt_sema *sema; - mzrt_sema_create(&sema, 0); + mzrt_sema_create(&sema, 0); - qc = (Queued_Callback *)malloc(sizeof(Queued_Callback)); - qc->cif = cif; - qc->resultp = resultp; - qc->args = args; - qc->userdata = userdata; - qc->sema = sema; - qc->called = 0; + qc = (Queued_Callback *)malloc(sizeof(Queued_Callback)); + qc->cif = cif; + qc->resultp = resultp; + qc->args = args; + qc->userdata = userdata; + qc->sema = sema; + qc->called = 0; - mzrt_mutex_lock(queue->lock); - qc->next = queue->callbacks; - queue->callbacks = qc; - mzrt_mutex_unlock(queue->lock); - scheme_signal_received_at(queue->sig_hand); + mzrt_mutex_lock(queue->lock); + qc->next = queue->callbacks; + queue->callbacks = qc; + mzrt_mutex_unlock(queue->lock); + scheme_signal_received_at(queue->sig_hand); - /* wait for the callback to be invoked in the main thread */ - mzrt_sema_wait(sema); + /* wait for the callback to be invoked in the main thread */ + mzrt_sema_wait(sema); - mzrt_sema_destroy(sema); - free(qc); - return; + mzrt_sema_destroy(sema); + free(qc); + return; + } } #endif @@ -2915,10 +2923,12 @@ void free_cl_cif_args(void *ignored, void *p) #ifdef MZ_USE_MZRT void free_cl_cif_queue_args(void *ignored, void *p) { - void *data = ((closure_and_cif*)p)->data; + void *data = ((closure_and_cif*)p)->data, *constant_result; void **q = (void **)data; data = q[0]; + constant_result = q[3]; free(q); + if (constant_result) free(constant_result); #ifdef MZ_PRECISE_GC GC_free_immobile_box((void**)data); #endif @@ -2975,6 +2985,8 @@ void free_cl_cif_queue_args(void *ignored, void *p) GC_CAN_IGNORE void *callback_data; @@IFDEF{MZ_USE_MZRT}{ int keep_queue = 0; + void *constant_reply; + int constant_reply_size; } if (!SCHEME_PROCP(argv[0])) @@ -2988,8 +3000,10 @@ void free_cl_cif_queue_args(void *ignored, void *p) abi = GET_ABI(MYNAME,3); is_atomic = ((argc > 4) && SCHEME_TRUEP(argv[4])); sync = (is_atomic ? scheme_true : NULL); - if (argc > 5) - (void)scheme_check_proc_arity2(MYNAME, 1, 5, argc, argv, 1); + if ((argc > 5) + && !SCHEME_BOXP(argv[5]) + && !scheme_check_proc_arity2(NULL, 1, 5, argc, argv, 1)) + scheme_wrong_contract(MYNAME, "(or/c #f (procedure-arity-includes/c 0) box?)", 5, argc, argv); if (((argc > 5) && SCHEME_TRUEP(argv[5]))) { @@IFDEF{MZ_USE_MZRT}{ if (!ffi_sync_queue) { @@ -3004,8 +3018,24 @@ void free_cl_cif_queue_args(void *ignored, void *p) ffi_sync_queue->sig_hand = sig_hand; ffi_sync_queue->callbacks = NULL; } - sync = argv[5]; - if (is_atomic) sync = scheme_box(sync); + if (SCHEME_BOXP(argv[5])) { + /* when called in a foreign thread, return a constant */ + constant_reply_size = ctype_sizeof(otype); + if (!constant_reply_size && SCHEME_VOIDP(SCHEME_BOX_VAL(argv[5]))) { + /* void result */ + constant_reply = scheme_malloc_atomic(1); + } else { + /* non-void result */ + constant_reply = scheme_malloc_atomic(constant_reply_size); + SCHEME2C(MYNAME, otype, constant_reply, 0, SCHEME_BOX_VAL(argv[5]), NULL, NULL, 0); + } + } else { + /* when called in a foreign thread, queue a reply back here */ + sync = argv[5]; + if (is_atomic) sync = scheme_box(sync); + constant_reply = NULL; + constant_reply_size = 0; + } keep_queue = 1; } do_callback = ffi_queue_callback; @@ -3044,10 +3074,17 @@ void free_cl_cif_queue_args(void *ignored, void *p) /* For ffi_queue_callback(), add a level of indirection in `data' to hold the place-specific `ffi_sync_queue'. Use `free_cl_cif_data_args' to clean up this extra level. */ - GC_CAN_IGNORE void **tmp; - tmp = (void **)malloc(sizeof(void*) * 2); + GC_CAN_IGNORE void **tmp, *cr; + if (constant_reply) { + cr = malloc(constant_reply_size); + memcpy(cr, constant_reply, constant_reply_size); + constant_reply = cr; + } + tmp = (void **)malloc(sizeof(void*) * 4); tmp[0] = callback_data; tmp[1] = ffi_sync_queue; + tmp[2] = constant_reply; + tmp[3] = (void *)(intptr_t)constant_reply_size; callback_data = (void *)tmp; } }