places fixes: Windows leaks, custodian force-exit, stdio interaction

This commit is contained in:
Matthew Flatt 2011-07-01 14:13:14 -06:00
parent 27d7ad1e87
commit e46a470f8d
7 changed files with 207 additions and 102 deletions

View File

@ -0,0 +1,24 @@
#lang racket
(unless (getenv "IN_SUBPLACE")
(putenv "IN_SUBPLACE" "yes")
;; Place that closes stdin shouldn't close stdin in the original
;; place:
(place-wait (place ch
(close-input-port (current-input-port))
(close-output-port (current-output-port))
(close-output-port (current-error-port))))
(unless (and (equal? #f (port-closed? (current-input-port)))
(equal? #f (port-closed? (current-output-port)))
(equal? #f (port-closed? (current-error-port))))
(error "sub-place port-close test failed"))
;; Closing only stdin should lead to closed stdin in
;; a sub-place:
(close-input-port (current-input-port))
(define p (place ch
(place-channel-put ch (port-closed? (current-input-port)))))
(unless (equal? (place-channel-get p) #t)
(error "closed-stdin test failed")))

View File

@ -515,21 +515,24 @@ static void force_more_closed(Scheme_Object *o, Scheme_Close_Custodian_Client *f
if (!f || SCHEME_THREADP(o))
return;
/* don't close stdin, stdout, or stderr file descriptors: */
if (SAME_OBJ(scheme_orig_stdin_port, o)
|| SAME_OBJ(scheme_orig_stderr_port, o)
|| SAME_OBJ(scheme_orig_stdout_port, o))
return;
f(o, data);
}
void scheme_place_instance_destroy() {
/* shutdown custodian */
static void force_more_closed_after(Scheme_Object *o, Scheme_Close_Custodian_Client *f, void *data)
{
scheme_run_atexit_closers(o, f, data);
force_more_closed(o, f, data);
}
void scheme_place_instance_destroy(int force)
{
/* run atexit handlers to flush file ports, and also
force file-stream ports closed */
scheme_add_atexit_closer(force_more_closed);
scheme_run_atexit_closers();
if (force)
scheme_run_atexit_closers_on_all(force_more_closed);
else
scheme_run_atexit_closers_on_all(force_more_closed_after);
scheme_release_file_descriptor();
scheme_end_futures_per_place();

View File

@ -719,6 +719,11 @@ void scheme_end_futures_per_place()
free_fevent(&fs->runtime_fevents);
mzrt_mutex_destroy(fs->future_mutex);
mzrt_sema_destroy(fs->future_pending_sema);
mzrt_sema_destroy(fs->gc_ok_c);
mzrt_sema_destroy(fs->gc_done_c);
free(fs->pool_threads);
free(fs);

View File

@ -172,7 +172,6 @@ typedef struct Scheme_Place_Object {
char die;
char pbreak;
char ref;
mz_jmp_buf *exit_buf;
void *signal_handle;
void *parent_signal_handle; /* set to NULL when the place terminates */
intptr_t result; /* initialized to 1, reset when parent_signal_handle becomes NULL */
@ -1540,14 +1539,16 @@ static void *place_start_proc(void *data_arg) {
return rc;
}
void scheme_place_check_for_interruption() {
void scheme_place_check_for_interruption()
{
Scheme_Place_Object *place_obj;
char local_die;
char local_break;
place_obj = place_object;
if (place_obj) {
{
if (!place_obj)
return;
mzrt_mutex_lock(place_obj->lock);
local_die = place_obj->die;
@ -1555,15 +1556,12 @@ void scheme_place_check_for_interruption() {
place_obj->pbreak = 0;
mzrt_mutex_unlock(place_obj->lock);
}
if (local_die) {
scheme_longjmp(*place_obj->exit_buf, 1);
}
if (local_die)
scheme_kill_thread(scheme_main_thread);
if (local_break)
scheme_break_thread(NULL);
}
}
static void place_release_place_object() {
int ref = 0;
@ -1604,7 +1602,7 @@ static Scheme_Object *def_place_exit_handler_proc(int argc, Scheme_Object *argv[
place_set_result(argv[0]);
/*printf("Leavin place: proc thread id%u\n", ptid);*/
scheme_place_instance_destroy();
scheme_place_instance_destroy(0);
place_release_place_object();
mz_proc_thread_exit(NULL);
@ -1681,9 +1679,6 @@ static void *place_start_proc_after_stack(void *data_arg, void *stack_base) {
mz_jmp_buf new_error_buf;
Scheme_Object * volatile rc = scheme_false;
place_obj->exit_buf = &new_error_buf;
p = scheme_get_current_thread();
saved_error_buf = p->error_buf;
p->error_buf = &new_error_buf;
@ -1708,7 +1703,7 @@ static void *place_start_proc_after_stack(void *data_arg, void *stack_base) {
scheme_log(NULL, SCHEME_LOG_DEBUG, 0, "place %d: exiting", scheme_current_place_id);
/*printf("Leavin place: proc thread id%u\n", ptid);*/
scheme_place_instance_destroy();
scheme_place_instance_destroy(place_obj->die);
place_release_place_object();

View File

@ -132,8 +132,6 @@ typedef struct Win_FD_Input_Thread {
HANDLE thread;
} Win_FD_Input_Thread;
static HANDLE refcount_sema;
typedef struct Win_FD_Output_Thread {
/* This is malloced for use in a Win32 thread */
HANDLE fd;
@ -203,17 +201,28 @@ typedef struct Scheme_Subprocess {
/******************** refcounts ********************/
#ifdef WINDOWS_FILE_HANDLES
#if defined(WINDOWS_FILE_HANDLES) || defined(MZ_USE_PLACES)
# define MZ_LOCK_REFCOUNTS
static mzrt_mutex *refcount_mutex;
#endif
static int *malloc_refcount()
static int *malloc_refcount(int val, int free_on_zero)
{
if (!refcount_sema)
refcount_sema = CreateSemaphore(NULL, 1, 1, NULL);
int *rc;
return (int *)malloc(sizeof(int));
#ifdef MZ_LOCK_REFCOUNTS
if (!refcount_mutex)
mzrt_mutex_create(&refcount_mutex);
#endif
rc = (int *)malloc(2 * sizeof(int));
*rc = val;
rc[1] = free_on_zero;
return rc;
}
static int dec_refcount(int *refcount)
static int adj_refcount(int *refcount, int amt)
XFORM_SKIP_PROC
{
int rc;
@ -221,33 +230,26 @@ static int dec_refcount(int *refcount)
if (!refcount)
return 0;
WaitForSingleObject(refcount_sema, INFINITE);
*refcount -= 1;
#ifdef MZ_LOCK_REFCOUNTS
mzrt_mutex_lock(refcount_mutex);
#endif
if (amt > 0) {
/* don't increment up from 0 */
if (*refcount)
*refcount += amt;
} else
*refcount += amt;
rc = *refcount;
ReleaseSemaphore(refcount_sema, 1, NULL);
#ifdef MZ_LOCK_REFCOUNTS
mzrt_mutex_unlock(refcount_mutex);
#endif
if (!rc) free(refcount);
if (!rc && refcount[1])
free(refcount);
return rc;
}
#else
static int *malloc_refcount()
{
return (int *)scheme_malloc_atomic(sizeof(int));
}
static int dec_refcount(int *refcount)
{
if (!refcount)
return 0;
*refcount -= 1;
return *refcount;
}
#endif
/******************** file-descriptor I/O ********************/
/* Windows/Mac I/O is piggy-backed on Unix file-descriptor I/O. Making
@ -258,6 +260,8 @@ static int dec_refcount(int *refcount)
#ifdef MZ_FDS
static int *stdin_refcount, *stdout_refcount, *stderr_refcount;
# define MZPORT_FD_BUFFSIZE 4096
# define MZPORT_FD_DIRECT_THRESHOLD MZPORT_FD_BUFFSIZE
@ -406,7 +410,8 @@ THREAD_LOCAL_DECL(void *scheme_break_semaphore;)
#ifdef MZ_FDS
static Scheme_Object *make_fd_input_port(int fd, Scheme_Object *name, int regfile, int textmode, int *refcount, int internal);
static Scheme_Object *make_fd_output_port(int fd, Scheme_Object *name, int regfile, int textmode, int read_too, int flush_mode);
static Scheme_Object *make_fd_output_port(int fd, Scheme_Object *name, int regfile, int textmode, int read_too, int flush_mode,
int *refcount);
#endif
#ifdef USE_OSKIT_CONSOLE
static Scheme_Object *make_oskit_console_input_port();
@ -587,6 +592,14 @@ void scheme_init_port_places(void)
All writing by the main thread will get flushed on exit
(but not, of course, if the thread is shutdown via a
custodian). */
if (!stdin_refcount) {
/* Referece counts are needed for stdio and places; start
at 1 in main place, but then cancel initial count */
stdin_refcount = malloc_refcount(1, 0);
stdout_refcount = malloc_refcount(1, 0);
stderr_refcount = malloc_refcount(1, 0);
}
#endif
REGISTER_SO(read_string_byte_buffer);
@ -602,9 +615,10 @@ void scheme_init_port_places(void)
#else
# ifdef MZ_FDS
# ifdef WINDOWS_FILE_HANDLES
: make_fd_input_port((int)GetStdHandle(STD_INPUT_HANDLE), scheme_intern_symbol("stdin"), 0, 0, NULL, 0)
: make_fd_input_port((int)GetStdHandle(STD_INPUT_HANDLE), scheme_intern_symbol("stdin"), 0, 0,
stdin_refcount, 0)
# else
: make_fd_input_port(0, scheme_intern_symbol("stdin"), 0, 0, NULL, 0)
: make_fd_input_port(0, scheme_intern_symbol("stdin"), 0, 0, stdin_refcount, 0)
# endif
# else
: scheme_make_named_file_input_port(stdin, scheme_intern_symbol("stdin"))
@ -618,9 +632,10 @@ void scheme_init_port_places(void)
# ifdef WINDOWS_FILE_HANDLES
: make_fd_output_port((int)GetStdHandle(STD_OUTPUT_HANDLE),
scheme_intern_symbol("stdout"), 0, 0, 0,
-1)
-1, stdout_refcount)
# else
: make_fd_output_port(1, scheme_intern_symbol("stdout"), 0, 0, 0, -1)
: make_fd_output_port(1, scheme_intern_symbol("stdout"), 0, 0, 0, -1,
stdout_refcount)
# endif
#else
: scheme_make_file_output_port(stdout)
@ -633,16 +648,24 @@ void scheme_init_port_places(void)
# ifdef WINDOWS_FILE_HANDLES
: make_fd_output_port((int)GetStdHandle(STD_ERROR_HANDLE),
scheme_intern_symbol("stderr"), 0, 0, 0,
MZ_FLUSH_ALWAYS)
MZ_FLUSH_ALWAYS, stderr_refcount)
# else
: make_fd_output_port(2, scheme_intern_symbol("stderr"), 0, 0, 0,
MZ_FLUSH_ALWAYS)
MZ_FLUSH_ALWAYS, stderr_refcount)
# endif
#else
: scheme_make_file_output_port(stderr)
#endif
);
#ifdef MZ_FDS
if (!scheme_current_place_id) {
adj_refcount(stdin_refcount, -1);
adj_refcount(stdout_refcount, -1);
adj_refcount(stderr_refcount, -1);
}
#endif
#if defined(FILES_HAVE_FDS)
# ifndef USE_OSKIT_CONSOLE
/* Set up a pipe for signalling external events: */
@ -4195,7 +4218,8 @@ scheme_do_open_output_file(char *name, int offset, int argc, Scheme_Object *argv
} while ((ok == -1) && (errno == EINTR));
regfile = S_ISREG(buf.st_mode);
return make_fd_output_port(fd, scheme_make_path(filename), regfile, 0, and_read, -1);
return make_fd_output_port(fd, scheme_make_path(filename), regfile, 0, and_read,
-1, NULL);
#else
# ifdef WINDOWS_FILE_HANDLES
if (!existsok)
@ -4280,7 +4304,8 @@ scheme_do_open_output_file(char *name, int offset, int argc, Scheme_Object *argv
SetEndOfFile(fd);
}
return make_fd_output_port((int)fd, scheme_make_path(filename), regfile, mode[1] == 't', and_read, -1);
return make_fd_output_port((int)fd, scheme_make_path(filename), regfile, mode[1] == 't', and_read,
-1, NULL);
# else
if (scheme_directory_exists(filename)) {
if (!existsok)
@ -5476,7 +5501,7 @@ fd_close_input(Scheme_Input_Port *port)
} /* otherwise, thread is responsible for clean-up */
} else {
int rc;
rc = dec_refcount(fip->refcount);
rc = adj_refcount(fip->refcount, -1);
if (!rc) {
CloseHandle((HANDLE)fip->fd);
}
@ -5484,7 +5509,7 @@ fd_close_input(Scheme_Input_Port *port)
#else
{
int rc;
rc = dec_refcount(fip->refcount);
rc = adj_refcount(fip->refcount, -1);
if (!rc) {
int cr;
do {
@ -5495,6 +5520,12 @@ fd_close_input(Scheme_Input_Port *port)
#endif
}
static void
fd_init_close_input(Scheme_Input_Port *port)
{
/* never actually opened! */
}
static void
fd_need_wakeup(Scheme_Input_Port *port, void *fds)
{
@ -5559,6 +5590,7 @@ make_fd_input_port(int fd, Scheme_Object *name, int regfile, int win_textmode, i
Scheme_Input_Port *ip;
Scheme_FD *fip;
unsigned char *bfr;
int start_closed = 0;
fip = MALLOC_ONE_RT(Scheme_FD);
#ifdef MZTAG_REQUIRED
@ -5579,7 +5611,13 @@ make_fd_input_port(int fd, Scheme_Object *name, int regfile, int win_textmode, i
fip->textmode = win_textmode;
#endif
if (refcount) {
fip->refcount = refcount;
if (!adj_refcount(refcount, 1)) {
/* fd is already closed! */
start_closed = 1;
}
}
fip->flush = MZ_FLUSH_NEVER;
@ -5591,7 +5629,9 @@ make_fd_input_port(int fd, Scheme_Object *name, int regfile, int win_textmode, i
scheme_progress_evt_via_get,
scheme_peeked_read_via_get,
fd_byte_ready,
fd_close_input,
(start_closed
? fd_init_close_input
: fd_close_input),
fd_need_wakeup,
!internal);
ip->p.buffer_mode_fun = fd_input_buffer_mode;
@ -5599,7 +5639,7 @@ make_fd_input_port(int fd, Scheme_Object *name, int regfile, int win_textmode, i
ip->pending_eof = 1; /* means that pending EOFs should be tracked */
#ifdef WINDOWS_FILE_HANDLES
if (!regfile) {
if (!regfile && !start_closed) {
/* To get non-blocking I/O for anything that can block, we create
a separate reader thread.
@ -5643,6 +5683,9 @@ make_fd_input_port(int fd, Scheme_Object *name, int regfile, int win_textmode, i
}
#endif
if (start_closed)
scheme_close_input_port((Scheme_Object *)ip);
return (Scheme_Object *)ip;
}
@ -5713,7 +5756,7 @@ static void WindowsFDICleanup(Win_FD_Input_Thread *th)
CloseHandle(th->ready_sema);
CloseHandle(th->you_clean_up_sema);
rc = dec_refcount(th->refcount);
rc = adj_refcount(th->refcount, -1);
if (!rc) CloseHandle(th->fd);
free(th->buffer);
@ -6736,7 +6779,7 @@ fd_close_output(Scheme_Output_Port *port)
fop->oth = NULL;
} else {
int rc;
rc = dec_refcount(fop->refcount);
rc = adj_refcount(fop->refcount, -1);
if (!rc) {
CloseHandle((HANDLE)fop->fd);
}
@ -6744,7 +6787,7 @@ fd_close_output(Scheme_Output_Port *port)
#else
{
int rc;
rc = dec_refcount(fop->refcount);
rc = adj_refcount(fop->refcount, -1);
if (!rc) {
int cr;
@ -6756,6 +6799,12 @@ fd_close_output(Scheme_Output_Port *port)
#endif
}
static void
fd_init_close_output(Scheme_Output_Port *port)
{
/* never actually opened */
}
static int fd_output_buffer_mode(Scheme_Port *p, int mode)
{
Scheme_FD *fd;
@ -6777,11 +6826,12 @@ static int fd_output_buffer_mode(Scheme_Port *p, int mode)
static Scheme_Object *
make_fd_output_port(int fd, Scheme_Object *name, int regfile, int win_textmode, int and_read,
int flush_mode)
int flush_mode, int *refcount)
{
Scheme_FD *fop;
unsigned char *bfr;
Scheme_Object *the_port;
int start_closed = 0;
fop = MALLOC_ONE_RT(Scheme_FD);
#ifdef MZTAG_REQUIRED
@ -6814,24 +6864,36 @@ make_fd_output_port(int fd, Scheme_Object *name, int regfile, int win_textmode,
fop->flush = MZ_FLUSH_NEVER;
}
if (refcount) {
fop->refcount = refcount;
if (!adj_refcount(refcount, 1)) {
/* fd is already closed! */
start_closed = 1;
}
}
the_port = (Scheme_Object *)scheme_make_output_port(fd_output_port_type,
fop,
name,
scheme_write_evt_via_write,
fd_write_string,
(Scheme_Out_Ready_Fun)fd_write_ready,
fd_close_output,
(start_closed
? fd_init_close_output
: fd_close_output),
(Scheme_Need_Wakeup_Output_Fun)fd_write_need_wakeup,
NULL,
NULL,
1);
((Scheme_Port *)the_port)->buffer_mode_fun = fd_output_buffer_mode;
if (start_closed)
scheme_close_output_port(the_port);
if (and_read) {
int *rc;
Scheme_Object *a[2];
rc = malloc_refcount();
*rc = 2;
rc = malloc_refcount(1, 1);
fop->refcount = rc;
a[1] = the_port;
a[0] = make_fd_input_port(fd, name, regfile, win_textmode, rc, 0);
@ -6845,11 +6907,10 @@ static void flush_if_output_fds(Scheme_Object *o, Scheme_Close_Custodian_Client
if (SCHEME_OUTPUT_PORTP(o)) {
Scheme_Output_Port *op;
op = scheme_output_port_record(o);
if (SAME_OBJ(op->sub_type, fd_output_port_type)) {
if (SAME_OBJ(op->sub_type, fd_output_port_type))
scheme_flush_output(o);
}
}
}
#ifdef WINDOWS_FILE_HANDLES
@ -6926,7 +6987,7 @@ static void WindowsFDOCleanup(Win_FD_Output_Thread *oth)
CloseHandle(oth->work_sema);
CloseHandle(oth->you_clean_up_sema);
rc = dec_refcount(oth->refcount);
rc = adj_refcount(oth->refcount, -1);
if (!rc) CloseHandle(oth->fd);
if (oth->buffer)
@ -6942,7 +7003,7 @@ Scheme_Object *
scheme_make_fd_output_port(int fd, Scheme_Object *name, int regfile, int textmode, int read_too)
{
#ifdef MZ_FDS
return make_fd_output_port(fd, name, regfile, textmode, read_too, -1);
return make_fd_output_port(fd, name, regfile, textmode, read_too, -1, NULL);
#else
return NULL;
#endif
@ -8347,7 +8408,7 @@ static Scheme_Object *subprocess(int c, Scheme_Object *args[])
/*--------------------------------------*/
in = (in ? in : make_fd_input_port(from_subprocess[0], scheme_intern_symbol("subprocess-stdout"), 0, 0, NULL, 0));
out = (out ? out : make_fd_output_port(to_subprocess[1], scheme_intern_symbol("subprocess-stdin"), 0, 0, 0, -1));
out = (out ? out : make_fd_output_port(to_subprocess[1], scheme_intern_symbol("subprocess-stdin"), 0, 0, 0, -1, NULL));
if (stderr_is_stdout)
err = scheme_false;
else
@ -9189,6 +9250,9 @@ void scheme_kill_green_thread_timer()
close(put_external_event_fd);
# endif
#endif
#ifdef WIN32_FD_HANDLES
CloseHandle((HANDLE)scheme_break_semaphore);
#endif
}
#ifdef OS_X

View File

@ -564,7 +564,8 @@ typedef struct Scheme_Custodian_Box {
Scheme_Thread *scheme_do_close_managed(Scheme_Custodian *m, Scheme_Exit_Closer_Func f);
Scheme_Custodian *scheme_get_current_custodian(void);
void scheme_run_atexit_closers(void);
void scheme_run_atexit_closers_on_all(Scheme_Exit_Closer_Func alt);
void scheme_run_atexit_closers(Scheme_Object *o, Scheme_Close_Custodian_Client *f, void *data);
typedef struct Scheme_Security_Guard {
Scheme_Object so;
@ -3618,7 +3619,7 @@ typedef struct Scheme_Place {
} Scheme_Place;
Scheme_Env *scheme_place_instance_init();
void scheme_place_instance_destroy();
void scheme_place_instance_destroy(int force);
void scheme_kill_green_thread_timer();
void scheme_place_check_for_interruption();
void scheme_check_place_port_ok();

View File

@ -1694,7 +1694,7 @@ static void shrink_cust_box_array(void)
# define clean_cust_box_list() /* empty */
#endif
static void run_closers(Scheme_Object *o, Scheme_Close_Custodian_Client *f, void *data)
void scheme_run_atexit_closers(Scheme_Object *o, Scheme_Close_Custodian_Client *f, void *data)
{
Scheme_Object *l;
@ -1707,7 +1707,7 @@ static void run_closers(Scheme_Object *o, Scheme_Close_Custodian_Client *f, void
}
}
void scheme_run_atexit_closers(void)
void scheme_run_atexit_closers_on_all(Scheme_Exit_Closer_Func alt)
{
mz_jmp_buf newbuf, *savebuf;
@ -1720,11 +1720,16 @@ void scheme_run_atexit_closers(void)
savebuf = scheme_current_thread->error_buf;
scheme_current_thread->error_buf = &newbuf;
if (!scheme_setjmp(newbuf)) {
scheme_do_close_managed(NULL, run_closers);
scheme_do_close_managed(NULL, alt ? alt : scheme_run_atexit_closers);
}
scheme_current_thread->error_buf = savebuf;
}
void do_run_atexit_closers_on_all()
{
scheme_run_atexit_closers_on_all(NULL);
}
void scheme_set_atexit(Scheme_At_Exit_Proc p)
{
replacement_at_exit = p;
@ -1734,12 +1739,12 @@ void scheme_add_atexit_closer(Scheme_Exit_Closer_Func f)
{
if (!cust_closers) {
if (replacement_at_exit) {
replacement_at_exit(scheme_run_atexit_closers);
replacement_at_exit(do_run_atexit_closers_on_all);
} else {
#ifdef USE_ON_EXIT_FOR_ATEXIT
on_exit(scheme_run_atexit_closers, NULL);
on_exit(do_run_atexit_closers_on_all, NULL);
#else
atexit(scheme_run_atexit_closers);
atexit(do_run_atexit_closers_on_all);
#endif
}
@ -3715,12 +3720,8 @@ static void raise_break(Scheme_Thread *p)
p->block_needs_wakeup = block_needs_wakeup;
}
static void exit_or_escape(Scheme_Thread *p)
static void escape_to_kill(Scheme_Thread *p)
{
/* Maybe this killed thread is nested: */
if (p->nester) {
if (p->running & MZTHREAD_KILLED)
p->running -= MZTHREAD_KILLED;
p->cjs.jumping_to_continuation = (Scheme_Object *)p;
p->cjs.alt_full_continuation = NULL;
p->cjs.is_kill = 1;
@ -3728,8 +3729,20 @@ static void exit_or_escape(Scheme_Thread *p)
scheme_longjmp(*p->error_buf, 1);
}
static void exit_or_escape(Scheme_Thread *p)
{
/* Maybe this killed thread is nested: */
if (p->nester) {
if (p->running & MZTHREAD_KILLED)
p->running -= MZTHREAD_KILLED;
escape_to_kill(p);
}
if (SAME_OBJ(p, scheme_main_thread)) {
/* Hard exit: */
if (scheme_current_place_id)
escape_to_kill(p);
if (scheme_exit)
scheme_exit(0);