place fd clean-ups

and better handling of a place-creation failure due to pieces
 at the OS layer
This commit is contained in:
Matthew Flatt 2011-07-01 08:21:50 -06:00
parent faf74c789f
commit 0f42552a0e
7 changed files with 72 additions and 87 deletions

View File

@ -509,10 +509,28 @@ Scheme_Env *scheme_place_instance_init(void *stack_base) {
return env;
}
static void force_more_closed(Scheme_Object *o, Scheme_Close_Custodian_Client *f, void *data)
{
/* no need to shut down threads: */
if (!f || SCHEME_THREADP(o))
return;
/* don't close stdin, stdout, or stderr file descriptors: */
if (SAME_OBJ(scheme_orig_stdin_port, o)
|| SAME_OBJ(scheme_orig_stderr_port, o)
|| SAME_OBJ(scheme_orig_stdout_port, o))
return;
f(o, data);
}
void scheme_place_instance_destroy() {
/* shutdown custodian */
/* run atexit handlers to flush file ports */
/* run atexit handlers to flush file ports, and also
force file-stream ports closed */
scheme_add_atexit_closer(force_more_closed);
scheme_run_atexit_closers();
scheme_release_file_descriptor();
scheme_end_futures_per_place();
#if defined(MZ_USE_PLACES)

View File

@ -721,6 +721,8 @@ void scheme_end_futures_per_place()
free(fs->pool_threads);
free(fs);
scheme_future_state = NULL;
}
}

View File

@ -182,19 +182,19 @@ mzrt_thread_id mz_proc_thread_id(mz_proc_thread* thread) {
}
mz_proc_thread* mzrt_proc_first_thread_init() {
/* initialize mz_proc_thread struct for first thread myself that wasn't created with mz_proc_thread_create,
* so it can communicate with other mz_proc_thread_created threads via pt_mboxes */
/* initialize mz_proc_thread struct for first thread that wasn't created with mz_proc_thread_create */
mz_proc_thread *thread = (mz_proc_thread*)malloc(sizeof(mz_proc_thread));
thread->mbox = pt_mbox_create();
thread->threadid = mz_proc_thread_self();
proc_thread_self = thread;
thread->refcount = 1;
return thread;
}
mz_proc_thread* mz_proc_thread_create_w_stacksize(mz_proc_thread_start start_proc, void* data, intptr_t stacksize) {
mz_proc_thread* mz_proc_thread_create_w_stacksize(mz_proc_thread_start start_proc, void* data, intptr_t stacksize)
{
mz_proc_thread *thread = (mz_proc_thread*)malloc(sizeof(mz_proc_thread));
mzrt_thread_stub_data *stub_data;
int ok;
# ifndef WIN32
pthread_attr_t *attr;
@ -212,20 +212,26 @@ mz_proc_thread* mz_proc_thread_create_w_stacksize(mz_proc_thread_start start_pro
stub_data = (mzrt_thread_stub_data*)malloc(sizeof(mzrt_thread_stub_data));
thread->mbox = pt_mbox_create();
stub_data->start_proc = start_proc;
stub_data->data = data;
stub_data->thread = thread;
# ifdef WIN32
thread->threadid = (HANDLE)_beginthreadex(NULL, stacksize, mzrt_win_thread_stub, stub_data, 0, NULL);
ok = (thread->threadid != -1L);
# else
# ifdef NEED_GC_THREAD_OPS
GC_pthread_create(&thread->threadid, attr, mzrt_thread_stub, stub_data);
ok = !GC_pthread_create(&thread->threadid, attr, mzrt_thread_stub, stub_data);
# else
pthread_create(&thread->threadid, attr, mzrt_thread_stub, stub_data);
ok = !pthread_create(&thread->threadid, attr, mzrt_thread_stub, stub_data);
# endif
# endif
if (!ok) {
free(thread);
free(stub_data);
return NULL;
}
return thread;
}
@ -766,60 +772,6 @@ int mzrt_sema_destroy(mzrt_sema *s)
#endif
/****************** PROCESS THREAD MAIL BOX *******************************/
pt_mbox *pt_mbox_create() {
pt_mbox *mbox = (pt_mbox *)malloc(sizeof(pt_mbox));
mbox->count = 0;
mbox->in = 0;
mbox->out = 0;
mzrt_mutex_create(&mbox->mutex);
mzrt_cond_create(&mbox->nonempty);
mzrt_cond_create(&mbox->nonfull);
return mbox;
}
void pt_mbox_send(pt_mbox *mbox, int type, void *payload, pt_mbox *origin) {
mzrt_mutex_lock(mbox->mutex);
while ( mbox->count == 5 ) {
mzrt_cond_wait(mbox->nonfull, mbox->mutex);
}
mbox->queue[mbox->in].type = type;
mbox->queue[mbox->in].payload = payload;
mbox->queue[mbox->in].origin = origin;
mbox->in = (mbox->in + 1) % 5;
mbox->count++;
mzrt_cond_signal(mbox->nonempty);
mzrt_mutex_unlock(mbox->mutex);
}
void pt_mbox_recv(pt_mbox *mbox, int *type, void **payload, pt_mbox **origin){
mzrt_mutex_lock(mbox->mutex);
while ( mbox->count == 0 ) {
mzrt_cond_wait(mbox->nonempty, mbox->mutex);
}
*type = mbox->queue[mbox->out].type;
*payload = mbox->queue[mbox->out].payload;
*origin = mbox->queue[mbox->out].origin;
mbox->out = (mbox->out + 1) % 5;
mbox->count--;
mzrt_cond_signal(mbox->nonfull);
mzrt_mutex_unlock(mbox->mutex);
}
void pt_mbox_send_recv(pt_mbox *mbox, int type, void *payload, pt_mbox *origin, int *return_type, void **return_payload) {
pt_mbox *return_origin;
pt_mbox_send(mbox, type, payload, origin);
pt_mbox_recv(origin, return_type, return_payload, &return_origin);
}
void pt_mbox_destroy(pt_mbox *mbox) {
mzrt_mutex_destroy(mbox->mutex);
mzrt_cond_destroy(mbox->nonempty);
mzrt_cond_destroy(mbox->nonfull);
free(mbox);
}
/************************************************************************/
/************************************************************************/
/************************************************************************/

View File

@ -31,7 +31,6 @@ typedef pthread_t mzrt_thread_id;
typedef struct mz_proc_thread {
mzrt_thread_id threadid;
int refcount;
struct pt_mbox *mbox;
} mz_proc_thread;
@ -84,28 +83,7 @@ int mzrt_sema_post(mzrt_sema *sema);
int mzrt_sema_wait(mzrt_sema *sema);
int mzrt_sema_destroy(mzrt_sema *sema);
/****************** PROCESS THREAD MAIL BOX *******************************/
typedef struct pt_mbox_msg {
int type;
void *payload;
struct pt_mbox *origin;
} pt_mbox_msg;
typedef struct pt_mbox {
struct pt_mbox_msg queue[5];
int count;
int in;
int out;
mzrt_mutex *mutex;
mzrt_cond *nonempty;
mzrt_cond *nonfull;
} pt_mbox;
pt_mbox *pt_mbox_create();
void pt_mbox_send(pt_mbox *mbox, int type, void *payload, pt_mbox *origin);
void pt_mbox_recv(pt_mbox *mbox, int *type, void **payload, pt_mbox **origin);
void pt_mbox_send_recv(pt_mbox *mbox, int type, void *payload, pt_mbox *origin, int *return_type, void **return_payload);
void pt_mbox_destroy(pt_mbox *mbox);
/****************** Compare and Swap *******************************/
static MZ_INLINE int mzrt_cas(volatile size_t *addr, size_t old, size_t new_val) {
#if defined(__GNUC__) && !defined(__INTEL_COMPILER) && __GNUC__ <= 4 && __GNUC_MINOR__ < 1

View File

@ -238,6 +238,11 @@ Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) {
/* create new place */
proc_thread = mz_proc_thread_create(place_start_proc, place_data);
if (!proc_thread) {
mzrt_sema_destroy(ready);
scheme_signal_error("place: place creation failed");
}
/* wait until the place has started and grabbed the value
from `place_data'; it's important that a GC doesn't happen
here until the other place is far enough. */
@ -1717,12 +1722,14 @@ static void *place_start_proc_after_stack(void *data_arg, void *stack_base) {
p->error_buf = &new_error_buf;
if (!scheme_setjmp(new_error_buf)) {
Scheme_Object *dynamic_require;
scheme_check_place_port_ok();
dynamic_require = scheme_builtin_value("dynamic-require");
place_main = scheme_apply(dynamic_require, 2, a);
a[0] = channel;
scheme_apply(place_main, 1, a);
}
else {
} else {
rc = 1;
}
p->error_buf = saved_error_buf;

View File

@ -653,6 +653,13 @@ void scheme_init_port_places(void)
put_external_event_fd = fds[1];
fcntl(external_event_fd, F_SETFL, MZ_NONBLOCKING);
fcntl(put_external_event_fd, F_SETFL, MZ_NONBLOCKING);
} else {
if (!scheme_current_place_id) {
scheme_log_abort("creation of scheduler pipe failed");
abort();
} else {
/* place will call scheme_check_place_port_ok() to discover failure */
}
}
}
# endif
@ -687,6 +694,19 @@ void scheme_set_stdio_makers(Scheme_Stdio_Maker_Proc in,
scheme_make_stderr = err;
}
#ifdef MZ_USE_PLACES
void scheme_check_place_port_ok()
{
# if defined(FILES_HAVE_FDS)
# ifndef USE_OSKIT_CONSOLE
if (!external_event_fd) {
scheme_signal_error("place: scheduler pipe failed");
}
# endif
# endif
}
#endif
/*========================================================================*/
/* fd arrays */
/*========================================================================*/
@ -9162,6 +9182,13 @@ void scheme_kill_green_thread_timer()
#elif defined(USE_WIN32_THREAD_TIMER)
scheme_stop_itimer_thread();
#endif
#if defined(FILES_HAVE_FDS)
# ifndef USE_OSKIT_CONSOLE
close(external_event_fd);
close(put_external_event_fd);
# endif
#endif
}
#ifdef OS_X

View File

@ -3621,5 +3621,6 @@ Scheme_Env *scheme_place_instance_init();
void scheme_place_instance_destroy();
void scheme_kill_green_thread_timer();
void scheme_place_check_for_interruption();
void scheme_check_place_port_ok();
#endif /* __mzscheme_private__ */