fix GC interaction for non-Racket thread and #:async-apply callbacks

This commit is contained in:
Matthew Flatt 2010-11-23 09:39:28 -07:00
parent f3c62a0efd
commit 142cdb800f
2 changed files with 155 additions and 91 deletions

View File

@ -1187,10 +1187,10 @@ END_XFORM_SKIP;
#endif #endif
/* The sync field: /* The sync field:
NULL => non-atomic mode, no sync proc NULL => non-atomic mode
#t => atomic mode, no sync proc #t => atomic mode, no sync proc
(rcons queue proc) => non-atomic mode, sync proc proc => non-atomic mode, sync proc
(box (rcons queue proc)) => atomic mode, sync proc */ (box proc) => atomic mode, sync proc */
/*****************************************************************************/ /*****************************************************************************/
/* Pointer objects */ /* Pointer objects */
@ -2709,7 +2709,7 @@ void ffi_do_callback(ffi_cif* cif, void* resultp, void** args, void *userdata)
argv = argv_stack; argv = argv_stack;
else else
argv = scheme_malloc(argc * sizeof(Scheme_Object*)); argv = scheme_malloc(argc * sizeof(Scheme_Object*));
if (data->sync && !SCHEME_RPAIRP(data->sync)) if (data->sync && !SCHEME_PROCP(data->sync))
scheme_start_in_scheduler(); scheme_start_in_scheduler();
for (i=0, p=data->itypes; i<argc; i++, p=SCHEME_CDR(p)) { for (i=0, p=data->itypes; i<argc; i++, p=SCHEME_CDR(p)) {
v = C2SCHEME(SCHEME_CAR(p), args[i], 0, 0, 0); v = C2SCHEME(SCHEME_CAR(p), args[i], 0, 0, 0);
@ -2717,7 +2717,7 @@ void ffi_do_callback(ffi_cif* cif, void* resultp, void** args, void *userdata)
} }
p = _scheme_apply(data->proc, argc, argv); p = _scheme_apply(data->proc, argc, argv);
SCHEME2C(data->otype, resultp, 0, p, NULL, NULL, 1); SCHEME2C(data->otype, resultp, 0, p, NULL, NULL, 1);
if (data->sync && !SCHEME_RPAIRP(data->sync)) if (data->sync && !SCHEME_PROCP(data->sync))
scheme_end_in_scheduler(); scheme_end_in_scheduler();
} }
@ -2786,7 +2786,6 @@ void scheme_check_foreign_work(void)
proc = data->sync; proc = data->sync;
if (SCHEME_BOXP(proc)) proc = SCHEME_BOX_VAL(proc); if (SCHEME_BOXP(proc)) proc = SCHEME_BOX_VAL(proc);
proc = SCHEME_CDR(proc);
scheme_start_in_scheduler(); scheme_start_in_scheduler();
_scheme_apply(proc, 1, a); _scheme_apply(proc, 1, a);
@ -2802,49 +2801,44 @@ void scheme_check_foreign_work(void)
void ffi_queue_callback(ffi_cif* cif, void* resultp, void** args, void *userdata) void ffi_queue_callback(ffi_cif* cif, void* resultp, void** args, void *userdata)
XFORM_SKIP_PROC XFORM_SKIP_PROC
{ {
ffi_callback_struct *data;
data = extract_ffi_callback(userdata);
#ifdef MZ_USE_MZRT #ifdef MZ_USE_MZRT
{ /* This function must not refer to any GCable address, not even
FFI_Sync_Queue *queue; temporarily, because a GC may occur concurrent to this
Scheme_Object *o; function if it's in another thread. */
FFI_Sync_Queue *queue;
o = data->sync; queue = (FFI_Sync_Queue *)((void **)userdata)[1];
if (SCHEME_BOXP(o)) o = SCHEME_BOX_VAL(o); userdata = ((void **)userdata)[0];
queue = (FFI_Sync_Queue *)SCHEME_CAR(o); if (queue->orig_thread != mz_proc_thread_self()) {
Queued_Callback *qc;
mzrt_sema *sema;
if (queue->orig_thread != mz_proc_thread_self()) { mzrt_sema_create(&sema, 0);
Queued_Callback *qc;
mzrt_sema *sema;
mzrt_sema_create(&sema, 0); qc = (Queued_Callback *)malloc(sizeof(Queued_Callback));
qc->cif = cif;
qc->resultp = resultp;
qc->args = args;
qc->userdata = userdata;
qc->sema = sema;
qc->called = 0;
qc = (Queued_Callback *)malloc(sizeof(Queued_Callback)); mzrt_mutex_lock(queue->lock);
qc->cif = cif; qc->next = queue->callbacks;
qc->resultp = resultp; queue->callbacks = qc;
qc->args = args; mzrt_mutex_unlock(queue->lock);
qc->userdata = userdata; scheme_signal_received_at(queue->sig_hand);
qc->sema = sema;
qc->called = 0;
mzrt_mutex_lock(queue->lock); /* wait for the callback to be invoked in the main thread */
qc->next = queue->callbacks; mzrt_sema_wait(sema);
queue->callbacks = qc;
mzrt_mutex_unlock(queue->lock);
scheme_signal_received_at(queue->sig_hand);
/* wait for the callback to be invoked in the main thread */ mzrt_sema_destroy(sema);
mzrt_sema_wait(sema); free(qc);
return;
mzrt_sema_destroy(sema);
free(qc);
return;
}
} }
#endif #endif
ffi_do_callback(cif, resultp, args, userdata); ffi_do_callback(cif, resultp, args, userdata);
} }
@ -2858,6 +2852,7 @@ typedef struct closure_and_cif_struct {
void *data; void *data;
#endif #endif
} closure_and_cif; } closure_and_cif;
/* free the above */ /* free the above */
void free_cl_cif_args(void *ignored, void *p) void free_cl_cif_args(void *ignored, void *p)
{ {
@ -2873,6 +2868,20 @@ void free_cl_cif_args(void *ignored, void *p)
scheme_free_code(p); scheme_free_code(p);
} }
#ifdef MZ_USE_MZRT
void free_cl_cif_queue_args(void *ignored, void *p)
{
void *data = ((closure_and_cif*)p)->data;
void **q = (void **)data;
data = q[0];
free(q);
#ifdef MZ_PRECISE_GC
GC_free_immobile_box((void**)data);
#endif
scheme_free_code(p);
}
#endif
/* (ffi-callback scheme-proc in-types out-type [abi atomic? sync]) -> ffi-callback */ /* (ffi-callback scheme-proc in-types out-type [abi atomic? sync]) -> ffi-callback */
/* the treatment of in-types and out-types is similar to that in ffi-call */ /* the treatment of in-types and out-types is similar to that in ffi-call */
/* the real work is done by ffi_do_callback above */ /* the real work is done by ffi_do_callback above */
@ -2921,6 +2930,11 @@ static Scheme_Object *foreign_ffi_callback(int argc, Scheme_Object *argv[])
GC_CAN_IGNORE ffi_closure *cl; GC_CAN_IGNORE ffi_closure *cl;
GC_CAN_IGNORE closure_and_cif *cl_cif_args; GC_CAN_IGNORE closure_and_cif *cl_cif_args;
GC_CAN_IGNORE ffi_callback_t do_callback; GC_CAN_IGNORE ffi_callback_t do_callback;
GC_CAN_IGNORE void *callback_data;
#ifdef MZ_USE_MZRT
int keep_queue = 0;
#endif
if (!SCHEME_PROCP(argv[0])) if (!SCHEME_PROCP(argv[0]))
scheme_wrong_type(MYNAME, "procedure", 0, argc, argv); scheme_wrong_type(MYNAME, "procedure", 0, argc, argv);
nargs = scheme_proper_list_length(itypes); nargs = scheme_proper_list_length(itypes);
@ -2948,9 +2962,9 @@ static Scheme_Object *foreign_ffi_callback(int argc, Scheme_Object *argv[])
ffi_sync_queue->sig_hand = sig_hand; ffi_sync_queue->sig_hand = sig_hand;
ffi_sync_queue->callbacks = NULL; ffi_sync_queue->callbacks = NULL;
} }
sync = scheme_make_raw_pair((Scheme_Object *)ffi_sync_queue, sync = argv[5];
argv[5]);
if (is_atomic) sync = scheme_box(sync); if (is_atomic) sync = scheme_box(sync);
keep_queue = 1;
#endif #endif
do_callback = ffi_queue_callback; do_callback = ffi_queue_callback;
} else } else
@ -2979,18 +2993,36 @@ static Scheme_Object *foreign_ffi_callback(int argc, Scheme_Object *argv[])
# ifdef MZ_PRECISE_GC # ifdef MZ_PRECISE_GC
{ {
/* put data in immobile, weak box */ /* put data in immobile, weak box */
void **tmp; GC_CAN_IGNORE void **tmp;
tmp = GC_malloc_immobile_box(GC_malloc_weak_box(data, NULL, 0, 1)); tmp = GC_malloc_immobile_box(GC_malloc_weak_box(data, NULL, 0, 1));
cl_cif_args->data = (struct immobile_box*)tmp; callback_data = (struct immobile_box*)tmp;
} }
# else /* MZ_PRECISE_GC undefined */ # else /* MZ_PRECISE_GC undefined */
cl_cif_args->data = (void*)data; callback_data = (void*)data;
# endif /* MZ_PRECISE_GC */ # endif /* MZ_PRECISE_GC */
#ifdef MZ_USE_MZRT
if (keep_queue) {
/* For ffi_queue_callback(), add a level of indirection in
`data' to hold the place-specific `ffi_sync_queue'.
Use `free_cl_cif_data_args' to clean up this extra level. */
GC_CAN_IGNORE void **tmp;
tmp = (void **)malloc(sizeof(void*) * 2);
tmp[0] = callback_data;
tmp[1] = ffi_sync_queue;
callback_data = (void *)tmp;
}
#endif
cl_cif_args->data = callback_data;
if (ffi_prep_closure(cl, cif, do_callback, (void*)(cl_cif_args->data)) if (ffi_prep_closure(cl, cif, do_callback, (void*)(cl_cif_args->data))
!= FFI_OK) != FFI_OK)
scheme_signal_error scheme_signal_error
("internal error: ffi_prep_closure did not return FFI_OK"); ("internal error: ffi_prep_closure did not return FFI_OK");
scheme_register_finalizer(data, free_cl_cif_args, cl_cif_args, NULL, NULL); #ifdef MZ_USE_MZRT
if (keep_queue)
scheme_register_finalizer(data, free_cl_cif_queue_args, cl_cif_args, NULL, NULL);
else
#endif
scheme_register_finalizer(data, free_cl_cif_args, cl_cif_args, NULL, NULL);
return (Scheme_Object*)data; return (Scheme_Object*)data;
} }
#undef MYNAME #undef MYNAME

View File

@ -983,10 +983,10 @@ ffi_abi sym_to_abi(char *who, Scheme_Object *sym)
[sync "Scheme_Object*"]] [sync "Scheme_Object*"]]
/* The sync field: /* The sync field:
NULL => non-atomic mode, no sync proc NULL => non-atomic mode
#t => atomic mode, no sync proc #t => atomic mode, no sync proc
(rcons queue proc) => non-atomic mode, sync proc proc => non-atomic mode, sync proc
(box (rcons queue proc)) => atomic mode, sync proc */ (box proc) => atomic mode, sync proc */
/*****************************************************************************/ /*****************************************************************************/
/* Pointer objects */ /* Pointer objects */
@ -2077,7 +2077,7 @@ void ffi_do_callback(ffi_cif* cif, void* resultp, void** args, void *userdata)
argv = argv_stack; argv = argv_stack;
else else
argv = scheme_malloc(argc * sizeof(Scheme_Object*)); argv = scheme_malloc(argc * sizeof(Scheme_Object*));
if (data->sync && !SCHEME_RPAIRP(data->sync)) if (data->sync && !SCHEME_PROCP(data->sync))
scheme_start_in_scheduler(); scheme_start_in_scheduler();
for (i=0, p=data->itypes; i<argc; i++, p=SCHEME_CDR(p)) { for (i=0, p=data->itypes; i<argc; i++, p=SCHEME_CDR(p)) {
v = C2SCHEME(SCHEME_CAR(p), args[i], 0, 0, 0); v = C2SCHEME(SCHEME_CAR(p), args[i], 0, 0, 0);
@ -2085,7 +2085,7 @@ void ffi_do_callback(ffi_cif* cif, void* resultp, void** args, void *userdata)
} }
p = _scheme_apply(data->proc, argc, argv); p = _scheme_apply(data->proc, argc, argv);
SCHEME2C(data->otype, resultp, 0, p, NULL, NULL, 1); SCHEME2C(data->otype, resultp, 0, p, NULL, NULL, 1);
if (data->sync && !SCHEME_RPAIRP(data->sync)) if (data->sync && !SCHEME_PROCP(data->sync))
scheme_end_in_scheduler(); scheme_end_in_scheduler();
} }
@ -2154,7 +2154,6 @@ void scheme_check_foreign_work(void)
proc = data->sync; proc = data->sync;
if (SCHEME_BOXP(proc)) proc = SCHEME_BOX_VAL(proc); if (SCHEME_BOXP(proc)) proc = SCHEME_BOX_VAL(proc);
proc = SCHEME_CDR(proc);
scheme_start_in_scheduler(); scheme_start_in_scheduler();
_scheme_apply(proc, 1, a); _scheme_apply(proc, 1, a);
@ -2170,49 +2169,44 @@ void scheme_check_foreign_work(void)
void ffi_queue_callback(ffi_cif* cif, void* resultp, void** args, void *userdata) void ffi_queue_callback(ffi_cif* cif, void* resultp, void** args, void *userdata)
XFORM_SKIP_PROC XFORM_SKIP_PROC
{ {
ffi_callback_struct *data;
data = extract_ffi_callback(userdata);
#ifdef MZ_USE_MZRT #ifdef MZ_USE_MZRT
{ /* This function must not refer to any GCable address, not even
FFI_Sync_Queue *queue; temporarily, because a GC may occur concurrent to this
Scheme_Object *o; function if it's in another thread. */
FFI_Sync_Queue *queue;
o = data->sync; queue = (FFI_Sync_Queue *)((void **)userdata)[1];
if (SCHEME_BOXP(o)) o = SCHEME_BOX_VAL(o); userdata = ((void **)userdata)[0];
if (queue->orig_thread != mz_proc_thread_self()) {
Queued_Callback *qc;
mzrt_sema *sema;
queue = (FFI_Sync_Queue *)SCHEME_CAR(o); mzrt_sema_create(&sema, 0);
if (queue->orig_thread != mz_proc_thread_self()) { qc = (Queued_Callback *)malloc(sizeof(Queued_Callback));
Queued_Callback *qc; qc->cif = cif;
mzrt_sema *sema; qc->resultp = resultp;
qc->args = args;
qc->userdata = userdata;
qc->sema = sema;
qc->called = 0;
mzrt_sema_create(&sema, 0); mzrt_mutex_lock(queue->lock);
qc->next = queue->callbacks;
queue->callbacks = qc;
mzrt_mutex_unlock(queue->lock);
scheme_signal_received_at(queue->sig_hand);
qc = (Queued_Callback *)malloc(sizeof(Queued_Callback)); /* wait for the callback to be invoked in the main thread */
qc->cif = cif; mzrt_sema_wait(sema);
qc->resultp = resultp;
qc->args = args;
qc->userdata = userdata;
qc->sema = sema;
qc->called = 0;
mzrt_mutex_lock(queue->lock); mzrt_sema_destroy(sema);
qc->next = queue->callbacks; free(qc);
queue->callbacks = qc; return;
mzrt_mutex_unlock(queue->lock);
scheme_signal_received_at(queue->sig_hand);
/* wait for the callback to be invoked in the main thread */
mzrt_sema_wait(sema);
mzrt_sema_destroy(sema);
free(qc);
return;
}
} }
#endif #endif
ffi_do_callback(cif, resultp, args, userdata); ffi_do_callback(cif, resultp, args, userdata);
} }
@ -2226,6 +2220,7 @@ typedef struct closure_and_cif_struct {
void *data; void *data;
#endif #endif
} closure_and_cif; } closure_and_cif;
/* free the above */ /* free the above */
void free_cl_cif_args(void *ignored, void *p) void free_cl_cif_args(void *ignored, void *p)
{ {
@ -2241,6 +2236,20 @@ void free_cl_cif_args(void *ignored, void *p)
scheme_free_code(p); scheme_free_code(p);
} }
#ifdef MZ_USE_MZRT
void free_cl_cif_queue_args(void *ignored, void *p)
{
void *data = ((closure_and_cif*)p)->data;
void **q = (void **)data;
data = q[0];
free(q);
#ifdef MZ_PRECISE_GC
GC_free_immobile_box((void**)data);
#endif
scheme_free_code(p);
}
#endif
/* (ffi-callback scheme-proc in-types out-type [abi atomic? sync]) -> ffi-callback */ /* (ffi-callback scheme-proc in-types out-type [abi atomic? sync]) -> ffi-callback */
/* the treatment of in-types and out-types is similar to that in ffi-call */ /* the treatment of in-types and out-types is similar to that in ffi-call */
/* the real work is done by ffi_do_callback above */ /* the real work is done by ffi_do_callback above */
@ -2287,6 +2296,11 @@ void free_cl_cif_args(void *ignored, void *p)
GC_CAN_IGNORE ffi_closure *cl; GC_CAN_IGNORE ffi_closure *cl;
GC_CAN_IGNORE closure_and_cif *cl_cif_args; GC_CAN_IGNORE closure_and_cif *cl_cif_args;
GC_CAN_IGNORE ffi_callback_t do_callback; GC_CAN_IGNORE ffi_callback_t do_callback;
GC_CAN_IGNORE void *callback_data;
#ifdef MZ_USE_MZRT
int keep_queue = 0;
#endif
if (!SCHEME_PROCP(argv[0])) if (!SCHEME_PROCP(argv[0]))
scheme_wrong_type(MYNAME, "procedure", 0, argc, argv); scheme_wrong_type(MYNAME, "procedure", 0, argc, argv);
nargs = scheme_proper_list_length(itypes); nargs = scheme_proper_list_length(itypes);
@ -2314,9 +2328,9 @@ void free_cl_cif_args(void *ignored, void *p)
ffi_sync_queue->sig_hand = sig_hand; ffi_sync_queue->sig_hand = sig_hand;
ffi_sync_queue->callbacks = NULL; ffi_sync_queue->callbacks = NULL;
} }
sync = scheme_make_raw_pair((Scheme_Object *)ffi_sync_queue, sync = argv[5];
argv[5]);
if (is_atomic) sync = scheme_box(sync); if (is_atomic) sync = scheme_box(sync);
keep_queue = 1;
#endif #endif
do_callback = ffi_queue_callback; do_callback = ffi_queue_callback;
} else } else
@ -2341,18 +2355,36 @@ void free_cl_cif_args(void *ignored, void *p)
@@@IFDEF{MZ_PRECISE_GC}{ @@@IFDEF{MZ_PRECISE_GC}{
{ {
/* put data in immobile, weak box */ /* put data in immobile, weak box */
void **tmp; GC_CAN_IGNORE void **tmp;
tmp = GC_malloc_immobile_box(GC_malloc_weak_box(data, NULL, 0, 1)); tmp = GC_malloc_immobile_box(GC_malloc_weak_box(data, NULL, 0, 1));
cl_cif_args->data = (struct immobile_box*)tmp; callback_data = (struct immobile_box*)tmp;
} }
}{ }{
cl_cif_args->data = (void*)data; callback_data = (void*)data;
} }
#ifdef MZ_USE_MZRT
if (keep_queue) {
/* For ffi_queue_callback(), add a level of indirection in
`data' to hold the place-specific `ffi_sync_queue'.
Use `free_cl_cif_data_args' to clean up this extra level. */
GC_CAN_IGNORE void **tmp;
tmp = (void **)malloc(sizeof(void*) * 2);
tmp[0] = callback_data;
tmp[1] = ffi_sync_queue;
callback_data = (void *)tmp;
}
#endif
cl_cif_args->data = callback_data;
if (ffi_prep_closure(cl, cif, do_callback, (void*)(cl_cif_args->data)) if (ffi_prep_closure(cl, cif, do_callback, (void*)(cl_cif_args->data))
!= FFI_OK) != FFI_OK)
scheme_signal_error scheme_signal_error
("internal error: ffi_prep_closure did not return FFI_OK"); ("internal error: ffi_prep_closure did not return FFI_OK");
scheme_register_finalizer(data, free_cl_cif_args, cl_cif_args, NULL, NULL); #ifdef MZ_USE_MZRT
if (keep_queue)
scheme_register_finalizer(data, free_cl_cif_queue_args, cl_cif_args, NULL, NULL);
else
#endif
scheme_register_finalizer(data, free_cl_cif_args, cl_cif_args, NULL, NULL);
return (Scheme_Object*)data; return (Scheme_Object*)data;
} }