subprocess: support adding to an existing subprocess group

This commit is contained in:
Matthew Flatt 2017-10-10 10:44:23 -06:00
parent 9f5e9c8db3
commit 1cc55f30fe
6 changed files with 115 additions and 40 deletions

View File

@ -6,6 +6,7 @@
@defproc*[([(subprocess [stdout (or/c (and/c output-port? file-stream-port?) #f)]
[stdin (or/c (and/c input-port? file-stream-port?) #f)]
[stderr (or/c (and/c output-port? file-stream-port?) #f 'stdout)]
[group (or/c #f 'new subprocess) (and (subprocess-group-enabled) 'new)]
[command path-string?]
[arg (or/c path? string-no-nuls? bytes-no-nuls?)] ...)
(values subprocess?
@ -15,6 +16,7 @@
[(subprocess [stdout (or/c (and/c output-port? file-stream-port?) #f)]
[stdin (or/c (and/c input-port? file-stream-port?) #f)]
[stderr (or/c (and/c output-port? file-stream-port?) #f)]
[group (or/c #f 'new subprocess) (and (subprocess-group-enabled) 'new)]
[command path-string?]
[exact 'exact]
[arg string?])
@ -70,6 +72,20 @@ that is supplied as standard output is also used for standard error.
For each port or @racket['stdout] that is provided, no
pipe is created and the corresponding returned value is @racket[#f].
If @racket[group] is @racket['new], then the new process is created as
a new OS-level process group. In that case, @racket[subprocess-kill]
attempts to terminate all processes within the group, which may
include additional processes created by the subprocess.
@margin-note*{Beware that creating a group may interfere with the job
control in an interactive shell, since job control is based on process
groups.} See @racket[subprocess-kill] for details. If @racket[group]
is a subprocess, then that subprocess must have been created with
@racket['new], and the new subprocess will be added to the group;
adding to the group will succeed only on Unix and Mac OS, and only in
the same cases that @racket[subprocess-kill] would have an effect
(i.e., the subprocess is not known to have terminated), otherwise it
will fail silently.
The @racket[subprocess] procedure returns four values:
@itemize[
@ -107,13 +123,6 @@ the current custodian (see @secref["custodians"]). The
process or the creation of operating system pipes for process
communication.
If the @racket[subprocess-group-enabled] parameter's value is true,
then the new process is created as a new OS-level process group. In
that case, @racket[subprocess-kill] attempts to terminate all
processes within the group, which may include additional processes
created by the subprocess. See @racket[subprocess-kill] for details,
and see @racket[subprocess-group-enabled] for additional caveats.
The @racket[current-subprocess-custodian-mode] parameter determines
whether the subprocess itself is registered with the current
@tech{custodian} so that a custodian shutdown calls
@ -121,7 +130,9 @@ whether the subprocess itself is registered with the current
A subprocess can be used as a @tech{synchronizable event} (see @secref["sync"]).
A subprocess value is @tech{ready for synchronization} when
@racket[subprocess-wait] would not block; @resultItself{subprocess value}.}
@racket[subprocess-wait] would not block; @resultItself{subprocess value}.
@history[#:changed "6.11.0.1" @elem{Added the @racket[group] argument.}]}
@defproc[(subprocess-wait [subproc subprocess?]) void?]{
@ -229,10 +240,8 @@ not all of them.}
@defboolparam[subprocess-group-enabled on?]{
A @tech{parameter} that determines whether a subprocess is created as
a new process group. See @racket[subprocess-kill] for more information.
Beware that creating a group may interfere with the job control in an
interactive shell, since job control is based on process groups.}
a new process group by default. See @racket[subprocess] and
@racket[subprocess-kill] for more information.}
@defproc[(shell-execute [verb (or/c string? #f)]

View File

@ -451,8 +451,20 @@
(parameterize ([current-input-port (open-input-string "")])
(system (format "kill ~a" sub-pid)))))))])
(try #t)
(try #f)))
(try #f))
(let ()
(define-values (p1 o1 i1 e1) (subprocess (current-output-port) (current-input-port) (current-error-port) 'new "/bin/cat"))
(define-values (p2 o2 i2 e2) (subprocess (current-output-port) (current-input-port) (current-error-port) p1 "/bin/cat"))
(test 'running subprocess-status p1)
(test 'running subprocess-status p2)
(subprocess-kill p1 #t)
(test p1 sync p1)
(test p2 sync p2)
(test (subprocess-status p1) subprocess-status p2)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Check status result

View File

@ -60,6 +60,7 @@ typedef struct Scheme_Subprocess {
Scheme_Object so;
rktio_process_t *proc;
Scheme_Custodian_Reference *mref;
int is_group_rep;
} Scheme_Subprocess;
/******************** refcounts ********************/
@ -270,7 +271,7 @@ ROSYM static Scheme_Object *must_truncate_symbol;
ROSYM Scheme_Object *scheme_none_symbol, *scheme_line_symbol, *scheme_block_symbol;
ROSYM static Scheme_Object *exact_symbol;
ROSYM static Scheme_Object *exact_symbol, *new_symbol;
#define READ_STRING_BYTE_BUFFER_SIZE 1024
THREAD_LOCAL_DECL(static char *read_string_byte_buffer);
@ -327,8 +328,10 @@ scheme_init_port (Scheme_Env *env)
scheme_block_symbol = scheme_intern_symbol("block");
REGISTER_SO(exact_symbol);
REGISTER_SO(new_symbol);
exact_symbol = scheme_intern_symbol("exact");
new_symbol = scheme_intern_symbol("new");
REGISTER_SO(fd_input_port_type);
REGISTER_SO(fd_output_port_type);
@ -5778,6 +5781,8 @@ static Scheme_Object *redirect_get_or_peek_bytes_k(void)
/* subprocess */
/*========================================================================*/
#define SCHEME_SUBPROCESSP(o) SAME_TYPE(SCHEME_TYPE(o), scheme_subprocess_type)
static void close_subprocess_handle(void *so, void *ignored)
{
Scheme_Subprocess *sp = (Scheme_Subprocess *)so;
@ -5877,7 +5882,7 @@ static Scheme_Object *do_subprocess_kill(Scheme_Object *_sp, Scheme_Object *kill
if (!ok) {
if (can_error)
scheme_raise_exn(MZEXN_FAIL,
"subprocess-kill: operation failed\n"
"Subprocess-kill: operation failed\n"
" system error: %R");
}
@ -5957,7 +5962,7 @@ static Scheme_Object *subprocess(int c, Scheme_Object *args[])
Scheme_Object *errport;
Scheme_Object *a[4];
Scheme_Subprocess *subproc;
Scheme_Object *cust_mode, *current_dir;
Scheme_Object *cust_mode, *current_dir, *group;
int flags = 0;
rktio_fd_t *stdout_fd = NULL;
rktio_fd_t *stdin_fd = NULL;
@ -5966,6 +5971,7 @@ static Scheme_Object *subprocess(int c, Scheme_Object *args[])
rktio_envvars_t *envvars;
rktio_process_result_t *result;
Scheme_Config *config;
int command_arg_i;
int argc;
char **argv, *command;
@ -6030,18 +6036,34 @@ static Scheme_Object *subprocess(int c, Scheme_Object *args[])
scheme_wrong_contract(name, "(or/c (and/c file-stream-port? output-port?) #f 'stdout)", 2, c, args);
}
if (!SCHEME_PATH_STRINGP(args[3]))
scheme_wrong_contract(name, "path-string?", 3, c, args);
if ((c > 4)
&& (SCHEME_FALSEP(args[3])
|| SAME_OBJ(args[3], new_symbol)
|| SCHEME_SUBPROCESSP(args[3]))) {
/* optional group specification provided */
command_arg_i = 4;
group = args[3];
} else {
command_arg_i = 3;
group = scheme_false;
}
if (!SCHEME_PATH_STRINGP(args[command_arg_i]))
scheme_wrong_contract(name,
(((command_arg_i == 3) && (c > 4))
? "(or/c path-string? #f 'new subprocess?)"
: "path-string?"),
command_arg_i, c, args);
/*--------------------------------------*/
/* Sort out arguments */
/*--------------------------------------*/
argc = c - 3;
argc = c - command_arg_i;
argv = MALLOC_N(char *, argc);
{
char *ef;
ef = scheme_expand_string_filename(args[3],
ef = scheme_expand_string_filename(args[command_arg_i],
(char *)name,
NULL,
SCHEME_GUARD_FILE_EXECUTE);
@ -6056,13 +6078,13 @@ static Scheme_Object *subprocess(int c, Scheme_Object *args[])
argv[0] = np;
}
if ((c == 6) && SAME_OBJ(args[4], exact_symbol)) {
if ((c == (command_arg_i + 3)) && SAME_OBJ(args[command_arg_i+1], exact_symbol)) {
argv[2] = NULL;
if (!SCHEME_CHAR_STRINGP(args[5]) || scheme_any_string_has_null(args[5]))
scheme_wrong_contract(name, CHAR_STRING_W_NO_NULLS, 5, c, args);
if (!SCHEME_CHAR_STRINGP(args[command_arg_i+2]) || scheme_any_string_has_null(args[command_arg_i+2]))
scheme_wrong_contract(name, CHAR_STRING_W_NO_NULLS, command_arg_i+2, c, args);
{
Scheme_Object *bs;
bs = scheme_char_string_to_byte_string(args[5]);
bs = scheme_char_string_to_byte_string(args[command_arg_i+2]);
argv[1] = SCHEME_BYTE_STR_VAL(bs);
}
@ -6071,11 +6093,11 @@ static Scheme_Object *subprocess(int c, Scheme_Object *args[])
else
scheme_contract_error(name,
"exact command line not supported on this platform",
"exact command", 1, args[5],
"exact command", 1, args[command_arg_i + 2],
NULL);
} else {
int i;
for (i = 4; i < c; i++) {
for (i = command_arg_i + 1; i < c; i++) {
if (((!SCHEME_CHAR_STRINGP(args[i]) && !SCHEME_BYTE_STRINGP(args[i]))
|| scheme_any_string_has_null(args[i]))
&& !SCHEME_PATHP(args[i]))
@ -6087,13 +6109,22 @@ static Scheme_Object *subprocess(int c, Scheme_Object *args[])
bs = args[i];
if (SCHEME_CHAR_STRINGP(args[i]))
bs = scheme_char_string_to_byte_string_locale(bs);
argv[i - 3] = SCHEME_BYTE_STR_VAL(bs);
argv[i - command_arg_i] = SCHEME_BYTE_STR_VAL(bs);
}
}
}
command = argv[0];
if (SCHEME_SUBPROCESSP(group)) {
if (!((Scheme_Subprocess *)group)->is_group_rep) {
scheme_contract_error(name, "subprocess does not represent a new group",
"subprocess", 1, group,
NULL);
return NULL;
}
}
if (!stdin_fd || !stdout_fd || !stderr_fd)
scheme_custodian_check_available(NULL, name, "file-stream");
@ -6103,8 +6134,12 @@ static Scheme_Object *subprocess(int c, Scheme_Object *args[])
config = scheme_current_config();
cust_mode = scheme_get_param(config, MZCONFIG_SUBPROC_GROUP_ENABLED);
if (SCHEME_TRUEP(cust_mode))
if (SCHEME_FALSEP(group)) {
group = scheme_get_param(config, MZCONFIG_SUBPROC_GROUP_ENABLED);
if (SCHEME_TRUEP(group))
group = new_symbol;
}
if (SAME_OBJ(group, new_symbol))
flags |= RKTIO_PROCESS_NEW_GROUP;
cust_mode = scheme_get_param(config, MZCONFIG_SUBPROC_CUSTODIAN_MODE);
@ -6122,6 +6157,7 @@ static Scheme_Object *subprocess(int c, Scheme_Object *args[])
result = rktio_process(scheme_rktio,
command, argc, (rktio_const_string_t *)argv,
stdout_fd, stdin_fd, stderr_fd,
(SCHEME_SUBPROCESSP(group) ? ((Scheme_Subprocess *)group)->proc : NULL),
SCHEME_PATH_VAL(current_dir), envvars,
flags);
@ -6160,6 +6196,7 @@ static Scheme_Object *subprocess(int c, Scheme_Object *args[])
subproc = MALLOC_ONE_TAGGED(Scheme_Subprocess);
subproc->so.type = scheme_subprocess_type;
subproc->proc = result->process;
subproc->is_group_rep = SAME_OBJ(group, new_symbol);
scheme_add_finalizer(subproc, close_subprocess_handle, NULL);
if (SCHEME_TRUEP(cust_mode)) {

View File

@ -13,7 +13,7 @@
consistently.)
*/
#define MZSCHEME_VERSION "6.11.0.2"
#define MZSCHEME_VERSION "6.11.0.1"
#define MZSCHEME_VERSION_X 6
#define MZSCHEME_VERSION_Y 11

View File

@ -546,8 +546,12 @@ typedef struct rktio_process_result_t {
RKTIO_EXTERN rktio_process_result_t *rktio_process(rktio_t *rktio,
rktio_const_string_t command, int argc, rktio_const_string_t *argv,
rktio_fd_t *stdout_fd, rktio_fd_t *stdin_fd, rktio_fd_t *stderr_fd,
rktio_const_string_t current_directory, rktio_envvars_t *envvars,
RKTIO_NULLABLE rktio_fd_t *stdout_fd,
RKTIO_NULLABLE rktio_fd_t *stdin_fd,
RKTIO_NULLABLE rktio_fd_t *stderr_fd,
RKTIO_NULLABLE rktio_process_t *group_proc,
rktio_const_string_t current_directory,
rktio_envvars_t *envvars,
int flags);
/* `flags` flags: */
#define RKTIO_PROCESS_NEW_GROUP (1<<0)
@ -572,7 +576,8 @@ RKTIO_EXTERN void rktio_process_forget(rktio_t *rktio, rktio_process_t *sp);
/* Deallocates a process record, whether or not the process has
stopped. */
RKTIO_EXTERN rktio_ok_t rktio_poll_process_done(rktio_t *rktio, rktio_process_t *sp);
RKTIO_EXTERN_ERR(RKTIO_PROCESS_ERROR)
rktio_tri_t rktio_poll_process_done(rktio_t *rktio, rktio_process_t *sp);
/* Check whether a process has completed: */
#define RKTIO_PROCESS_ERROR (-2)
#define RKTIO_PROCESS_DONE 1

View File

@ -1199,6 +1199,7 @@ int rktio_process_allowed_flags(rktio_t *rktio)
rktio_process_result_t *rktio_process(rktio_t *rktio,
const char *command, int argc, rktio_const_string_t *argv,
rktio_fd_t *stdout_fd, rktio_fd_t *stdin_fd, rktio_fd_t *stderr_fd,
rktio_process_t *group_proc,
const char *current_directory, rktio_envvars_t *envvars,
int flags)
{
@ -1345,18 +1346,24 @@ rktio_process_result_t *rktio_process(rktio_t *rktio,
pid = fork();
#endif
if (pid > 0) {
/* This is the original process, which needs to manage the
newly created child process. */
if (new_process_group)
if (new_process_group || group_proc) {
/* there's a race condition between this use and the exec(),
and there's a race condition between the other setpgid() in
the child processand sending signals from the parent
process; so, we set in both, and at least one will
succeed; we could perform better error checking, since
EACCES is the only expected error */
setpgid(pid, pid);
int pgid = pid;
if (group_proc)
pgid = group_proc->pid;
setpgid(pid, pgid); /* note: silent failure */
}
#if defined(CENTRALIZED_SIGCHILD)
{
@ -1374,9 +1381,14 @@ rktio_process_result_t *rktio_process(rktio_t *rktio,
#endif
} else if (!pid) {
/* This is the new child process */
if (new_process_group)
if (new_process_group || group_proc) {
/* see also setpgid above */
setpgid(getpid(), getpid()); /* setpgid(0, 0) would work on some platforms */
int actual_pid = getpid();
int pgid = actual_pid;
if (group_proc)
pgid = group_proc->pid;
setpgid(actual_pid, pgid); /* note: silent failure */
}
} else {
get_posix_error();
}
@ -1547,7 +1559,7 @@ rktio_process_result_t *rktio_process(rktio_t *rktio,
subproc->handle = (void *)sc;
#endif
subproc->pid = pid;
subproc->is_group = new_process_group;
subproc->is_group = (new_process_group || group_proc);
result->process = subproc;