diff --git a/src/racket/src/env.c b/src/racket/src/env.c index a5c2e48059..785c06850f 100644 --- a/src/racket/src/env.c +++ b/src/racket/src/env.c @@ -509,10 +509,28 @@ Scheme_Env *scheme_place_instance_init(void *stack_base) { return env; } +static void force_more_closed(Scheme_Object *o, Scheme_Close_Custodian_Client *f, void *data) +{ + /* no need to shut down threads: */ + if (!f || SCHEME_THREADP(o)) + return; + + /* don't close stdin, stdout, or stderr file descriptors: */ + if (SAME_OBJ(scheme_orig_stdin_port, o) + || SAME_OBJ(scheme_orig_stderr_port, o) + || SAME_OBJ(scheme_orig_stdout_port, o)) + return; + + f(o, data); +} + void scheme_place_instance_destroy() { /* shutdown custodian */ - /* run atexit handlers to flush file ports */ + /* run atexit handlers to flush file ports, and also + force file-stream ports closed */ + scheme_add_atexit_closer(force_more_closed); scheme_run_atexit_closers(); + scheme_release_file_descriptor(); scheme_end_futures_per_place(); #if defined(MZ_USE_PLACES) diff --git a/src/racket/src/future.c b/src/racket/src/future.c index 6fb9ba1548..cdbb8bed6c 100644 --- a/src/racket/src/future.c +++ b/src/racket/src/future.c @@ -721,6 +721,8 @@ void scheme_end_futures_per_place() free(fs->pool_threads); free(fs); + + scheme_future_state = NULL; } } diff --git a/src/racket/src/mzrt.c b/src/racket/src/mzrt.c index 0b4d804b18..21fcf61de9 100644 --- a/src/racket/src/mzrt.c +++ b/src/racket/src/mzrt.c @@ -182,19 +182,19 @@ mzrt_thread_id mz_proc_thread_id(mz_proc_thread* thread) { } mz_proc_thread* mzrt_proc_first_thread_init() { - /* initialize mz_proc_thread struct for first thread myself that wasn't created with mz_proc_thread_create, - * so it can communicate with other mz_proc_thread_created threads via pt_mboxes */ + /* initialize mz_proc_thread struct for first thread that wasn't created with mz_proc_thread_create */ mz_proc_thread *thread = (mz_proc_thread*)malloc(sizeof(mz_proc_thread)); - thread->mbox = pt_mbox_create(); thread->threadid = mz_proc_thread_self(); proc_thread_self = thread; thread->refcount = 1; return thread; } -mz_proc_thread* mz_proc_thread_create_w_stacksize(mz_proc_thread_start start_proc, void* data, intptr_t stacksize) { +mz_proc_thread* mz_proc_thread_create_w_stacksize(mz_proc_thread_start start_proc, void* data, intptr_t stacksize) +{ mz_proc_thread *thread = (mz_proc_thread*)malloc(sizeof(mz_proc_thread)); mzrt_thread_stub_data *stub_data; + int ok; # ifndef WIN32 pthread_attr_t *attr; @@ -212,20 +212,26 @@ mz_proc_thread* mz_proc_thread_create_w_stacksize(mz_proc_thread_start start_pro stub_data = (mzrt_thread_stub_data*)malloc(sizeof(mzrt_thread_stub_data)); - thread->mbox = pt_mbox_create(); stub_data->start_proc = start_proc; stub_data->data = data; stub_data->thread = thread; # ifdef WIN32 thread->threadid = (HANDLE)_beginthreadex(NULL, stacksize, mzrt_win_thread_stub, stub_data, 0, NULL); + ok = (thread->threadid != -1L); # else # ifdef NEED_GC_THREAD_OPS - GC_pthread_create(&thread->threadid, attr, mzrt_thread_stub, stub_data); + ok = !GC_pthread_create(&thread->threadid, attr, mzrt_thread_stub, stub_data); # else - pthread_create(&thread->threadid, attr, mzrt_thread_stub, stub_data); + ok = !pthread_create(&thread->threadid, attr, mzrt_thread_stub, stub_data); # endif # endif + if (!ok) { + free(thread); + free(stub_data); + return NULL; + } + return thread; } @@ -766,60 +772,6 @@ int mzrt_sema_destroy(mzrt_sema *s) #endif -/****************** PROCESS THREAD MAIL BOX *******************************/ - -pt_mbox *pt_mbox_create() { - pt_mbox *mbox = (pt_mbox *)malloc(sizeof(pt_mbox)); - mbox->count = 0; - mbox->in = 0; - mbox->out = 0; - mzrt_mutex_create(&mbox->mutex); - mzrt_cond_create(&mbox->nonempty); - mzrt_cond_create(&mbox->nonfull); - return mbox; -} - -void pt_mbox_send(pt_mbox *mbox, int type, void *payload, pt_mbox *origin) { - mzrt_mutex_lock(mbox->mutex); - while ( mbox->count == 5 ) { - mzrt_cond_wait(mbox->nonfull, mbox->mutex); - } - mbox->queue[mbox->in].type = type; - mbox->queue[mbox->in].payload = payload; - mbox->queue[mbox->in].origin = origin; - mbox->in = (mbox->in + 1) % 5; - mbox->count++; - mzrt_cond_signal(mbox->nonempty); - mzrt_mutex_unlock(mbox->mutex); -} - -void pt_mbox_recv(pt_mbox *mbox, int *type, void **payload, pt_mbox **origin){ - mzrt_mutex_lock(mbox->mutex); - while ( mbox->count == 0 ) { - mzrt_cond_wait(mbox->nonempty, mbox->mutex); - } - *type = mbox->queue[mbox->out].type; - *payload = mbox->queue[mbox->out].payload; - *origin = mbox->queue[mbox->out].origin; - mbox->out = (mbox->out + 1) % 5; - mbox->count--; - mzrt_cond_signal(mbox->nonfull); - mzrt_mutex_unlock(mbox->mutex); -} - -void pt_mbox_send_recv(pt_mbox *mbox, int type, void *payload, pt_mbox *origin, int *return_type, void **return_payload) { - pt_mbox *return_origin; - pt_mbox_send(mbox, type, payload, origin); - pt_mbox_recv(origin, return_type, return_payload, &return_origin); -} - -void pt_mbox_destroy(pt_mbox *mbox) { - mzrt_mutex_destroy(mbox->mutex); - mzrt_cond_destroy(mbox->nonempty); - mzrt_cond_destroy(mbox->nonfull); - free(mbox); -} - /************************************************************************/ /************************************************************************/ /************************************************************************/ diff --git a/src/racket/src/mzrt.h b/src/racket/src/mzrt.h index 51648c0567..bf84517a45 100644 --- a/src/racket/src/mzrt.h +++ b/src/racket/src/mzrt.h @@ -31,7 +31,6 @@ typedef pthread_t mzrt_thread_id; typedef struct mz_proc_thread { mzrt_thread_id threadid; int refcount; - struct pt_mbox *mbox; } mz_proc_thread; @@ -84,28 +83,7 @@ int mzrt_sema_post(mzrt_sema *sema); int mzrt_sema_wait(mzrt_sema *sema); int mzrt_sema_destroy(mzrt_sema *sema); -/****************** PROCESS THREAD MAIL BOX *******************************/ -typedef struct pt_mbox_msg { - int type; - void *payload; - struct pt_mbox *origin; -} pt_mbox_msg; - -typedef struct pt_mbox { - struct pt_mbox_msg queue[5]; - int count; - int in; - int out; - mzrt_mutex *mutex; - mzrt_cond *nonempty; - mzrt_cond *nonfull; -} pt_mbox; - -pt_mbox *pt_mbox_create(); -void pt_mbox_send(pt_mbox *mbox, int type, void *payload, pt_mbox *origin); -void pt_mbox_recv(pt_mbox *mbox, int *type, void **payload, pt_mbox **origin); -void pt_mbox_send_recv(pt_mbox *mbox, int type, void *payload, pt_mbox *origin, int *return_type, void **return_payload); -void pt_mbox_destroy(pt_mbox *mbox); +/****************** Compare and Swap *******************************/ static MZ_INLINE int mzrt_cas(volatile size_t *addr, size_t old, size_t new_val) { #if defined(__GNUC__) && !defined(__INTEL_COMPILER) && __GNUC__ <= 4 && __GNUC_MINOR__ < 1 diff --git a/src/racket/src/place.c b/src/racket/src/place.c index 975b8e64ed..05ed1cae72 100644 --- a/src/racket/src/place.c +++ b/src/racket/src/place.c @@ -238,6 +238,11 @@ Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) { /* create new place */ proc_thread = mz_proc_thread_create(place_start_proc, place_data); + if (!proc_thread) { + mzrt_sema_destroy(ready); + scheme_signal_error("place: place creation failed"); + } + /* wait until the place has started and grabbed the value from `place_data'; it's important that a GC doesn't happen here until the other place is far enough. */ @@ -1717,12 +1722,14 @@ static void *place_start_proc_after_stack(void *data_arg, void *stack_base) { p->error_buf = &new_error_buf; if (!scheme_setjmp(new_error_buf)) { Scheme_Object *dynamic_require; + + scheme_check_place_port_ok(); + dynamic_require = scheme_builtin_value("dynamic-require"); place_main = scheme_apply(dynamic_require, 2, a); a[0] = channel; scheme_apply(place_main, 1, a); - } - else { + } else { rc = 1; } p->error_buf = saved_error_buf; diff --git a/src/racket/src/port.c b/src/racket/src/port.c index 6d652f5028..0b3c73e674 100644 --- a/src/racket/src/port.c +++ b/src/racket/src/port.c @@ -653,6 +653,13 @@ void scheme_init_port_places(void) put_external_event_fd = fds[1]; fcntl(external_event_fd, F_SETFL, MZ_NONBLOCKING); fcntl(put_external_event_fd, F_SETFL, MZ_NONBLOCKING); + } else { + if (!scheme_current_place_id) { + scheme_log_abort("creation of scheduler pipe failed"); + abort(); + } else { + /* place will call scheme_check_place_port_ok() to discover failure */ + } } } # endif @@ -687,6 +694,19 @@ void scheme_set_stdio_makers(Scheme_Stdio_Maker_Proc in, scheme_make_stderr = err; } +#ifdef MZ_USE_PLACES +void scheme_check_place_port_ok() +{ +# if defined(FILES_HAVE_FDS) +# ifndef USE_OSKIT_CONSOLE + if (!external_event_fd) { + scheme_signal_error("place: scheduler pipe failed"); + } +# endif +# endif +} +#endif + /*========================================================================*/ /* fd arrays */ /*========================================================================*/ @@ -9162,6 +9182,13 @@ void scheme_kill_green_thread_timer() #elif defined(USE_WIN32_THREAD_TIMER) scheme_stop_itimer_thread(); #endif + +#if defined(FILES_HAVE_FDS) +# ifndef USE_OSKIT_CONSOLE + close(external_event_fd); + close(put_external_event_fd); +# endif +#endif } #ifdef OS_X diff --git a/src/racket/src/schpriv.h b/src/racket/src/schpriv.h index b31ab27437..d92541db43 100644 --- a/src/racket/src/schpriv.h +++ b/src/racket/src/schpriv.h @@ -3621,5 +3621,6 @@ Scheme_Env *scheme_place_instance_init(); void scheme_place_instance_destroy(); void scheme_kill_green_thread_timer(); void scheme_place_check_for_interruption(); +void scheme_check_place_port_ok(); #endif /* __mzscheme_private__ */