rktio: better behavior for opening write and of fifo

When opening the write end of a fifo that doesn't have a reader
already, the old implementation could allow writing bytes that are
discarded. This new implementation uses a blocking `open` in a
`pthread`, and that way the write routines know whether the stream is
ready for writing or not.

The difference is visible in the Racket API in a two places:
`subprocess` needs to wait until a fifo writer is connected before
attempting to dup the corresponding file descriotor, and more
generally a use of `unsafe-port->file-descriptor` needs to wait. The
former blocking operation is now build into `subprocess` (and
documented), but the burden is place on callers of
`unsafe-port->file-descriptor` to wait is necessary.

The new `port-waiting-peer?` predicate exposes the waiting state,
while `sync` is sufficient to wait for a peer.

Closes #2784
This commit is contained in:
Matthew Flatt 2019-08-20 21:38:56 +02:00
parent 264ec72790
commit 2d0f10f473
25 changed files with 652 additions and 60 deletions

View File

@ -12,7 +12,7 @@
(define collection 'multi) (define collection 'multi)
(define version "7.4.0.4") (define version "7.4.0.5")
(define deps `("racket-lib" (define deps `("racket-lib"
["racket" #:version ,version])) ["racket" #:version ,version]))

View File

@ -63,7 +63,17 @@ previously created with @racket[unsafe-file-descriptor->semaphore] or
)]{ )]{
Returns a file descriptor (which is a @tt{HANDLE} value on Windows) of Returns a file descriptor (which is a @tt{HANDLE} value on Windows) of
a socket for @racket[port] if it has one, @racket[#f] otherwise.} a socket for @racket[port] if it has one, @racket[#f] otherwise.
On Unix and Mac OS, the result of
@racket[unsafe-port->file-descriptor] can be @racket[#f] if it
corresponds to a port that is waiting for its peer as reported by
@racket[port-waiting-peer?], such as the write end of a fifo where no
reader is connected. Wait until such is ready by using @racket[sync]).
@history[#:changed "7.4.0.5" @elem{Accommodate a fifo write
end blocked on a reader by
returning @racket[#f].}]}
@deftogether[( @deftogether[(

View File

@ -187,7 +187,10 @@ The file specified by @racket[path] need not be a regular file. It
might be a device that is connected through the filesystem, such as might be a device that is connected through the filesystem, such as
@filepath{aux} on Windows or @filepath{/dev/null} on Unix. The output @filepath{aux} on Windows or @filepath{/dev/null} on Unix. The output
port is block-buffered by default, unless the file corresponds to a port is block-buffered by default, unless the file corresponds to a
terminal, in which case it is line-buffered by default. terminal, in which case it is line-buffered by default. On Unix and
Mac OS, if the file is a fifo, then the port will block for writing
until a reader for the fifo is available; see also
@racket[port-waiting-peer?].
The port produced by @racket[open-output-file] should be explicitly The port produced by @racket[open-output-file] should be explicitly
closed, either though @racket[close-output-port] or indirectly via closed, either though @racket[close-output-port] or indirectly via
@ -212,7 +215,10 @@ then @exnraise[exn:fail:filesystem:errno].
@history[#:changed "6.9.0.6" @elem{On Unix and Mac OS, make @racket['truncate/replace] @history[#:changed "6.9.0.6" @elem{On Unix and Mac OS, make @racket['truncate/replace]
replace on a permission error. On Windows, make replace on a permission error. On Windows, make
@racket['replace] always replace instead truncating @racket['replace] always replace instead truncating
like @racket['truncate/replace].}]} like @racket['truncate/replace].}
#:changed "7.4.0.5" @elem{Changed handling of a fifo on Unix and Mac OS to
make the port block for output until the fifo has a
reader.}]}
@defproc[(open-input-output-file [path path-string?] @defproc[(open-input-output-file [path path-string?]
[#:mode mode-flag (or/c 'binary 'text) 'binary] [#:mode mode-flag (or/c 'binary 'text) 'binary]

View File

@ -63,6 +63,24 @@ interactive terminal, @racket[#f] otherwise.
to any value, instead of resticting to any value, instead of resticting
the domain to ports}]} the domain to ports}]}
@defproc[(port-waiting-peer? [port port?]) boolean?]{
Returns @racket[#t] if @racket[port] is not ready for reading or
writing because it is waiting for a peer process to complete a stream
construction, @racket[#f] otherwise.
On Unix and Mac OS, opening a fifo for output creates a peer-waiting
port if no reader for the same fifo is already opened. In that case,
the output port is not ready for writing until a reader is opened;
that is, write opertaions will block. Use @racket[sync] if necessary
to wait until writing will not block---that is, until the read end of
the fifo is opened.
@history[#:added "7.4.0.5"]}
@defthing[eof eof-object?]{A value (distinct from all other values) @defthing[eof eof-object?]{A value (distinct from all other values)
that represents an end-of-file.} that represents an end-of-file.}

View File

@ -71,6 +71,10 @@ returned by @racket[subprocess]. The @racket[stderr] argument can be
that is supplied as standard output is also used for standard error. that is supplied as standard output is also used for standard error.
For each port or @racket['stdout] that is provided, no For each port or @racket['stdout] that is provided, no
pipe is created and the corresponding returned value is @racket[#f]. pipe is created and the corresponding returned value is @racket[#f].
If @racket[stdout] or @racket[stderr] is a port for which
@racket[port-waiting-peer?] returns true, then @racket[subprocess]
waits for the port to become ready for writing before proceeding with
the subprocess creation.
If @racket[group] is @racket['new], then the new process is created as If @racket[group] is @racket['new], then the new process is created as
a new OS-level process group. In that case, @racket[subprocess-kill] a new OS-level process group. In that case, @racket[subprocess-kill]
@ -132,7 +136,9 @@ A subprocess can be used as a @tech{synchronizable event} (see @secref["sync"]).
A subprocess value is @tech{ready for synchronization} when A subprocess value is @tech{ready for synchronization} when
@racket[subprocess-wait] would not block; @resultItself{subprocess value}. @racket[subprocess-wait] would not block; @resultItself{subprocess value}.
@history[#:changed "6.11.0.1" @elem{Added the @racket[group] argument.}]} @history[#:changed "6.11.0.1" @elem{Added the @racket[group] argument.}
#:changed "7.4.0.5" @elem{Added waiting for a fifo without a reader
as @racket[stdout] and/or @racket[stderr].}]}
@defproc[(subprocess-wait [subproc subprocess?]) void?]{ @defproc[(subprocess-wait [subproc subprocess?]) void?]{

View File

@ -1724,6 +1724,52 @@
(for ([f '("tmp1" "tmp2" "tmp3")] #:when (file-exists? f)) (delete-file f)) (for ([f '("tmp1" "tmp2" "tmp3")] #:when (file-exists? f)) (delete-file f))
(current-directory original-dir) (current-directory original-dir)
(unless (eq? 'windows (system-type))
(define fifo (build-path work-dir "ff"))
(system* (find-executable-path "mkfifo") fifo)
(define i1 (open-input-file fifo))
(define o1 (open-output-file fifo #:exists 'update))
(write-bytes #"abc" o1)
(flush-output o1)
(test #"abc" read-bytes 3 i1)
(close-input-port i1)
(close-output-port o1)
(define (check-output-blocking do-write-abc)
;; Make sure an output fifo blocks until there's a reader
(define t1
(thread
(lambda ()
(define o2 (open-output-file fifo #:exists 'update))
(test #t port-waiting-peer? o2)
(do-write-abc o2)
(close-output-port o2))))
(define t2
(thread
(lambda ()
(sync (system-idle-evt))
(define i2 (open-input-file fifo))
(test #"abc" read-bytes 3 i2)
(close-input-port i2))))
(sync t1)
(sync t2))
(check-output-blocking (lambda (o2) (write-bytes #"abc" o2)))
(check-output-blocking (lambda (o2)
(parameterize ([current-output-port o2])
(system* (find-executable-path "echo")
"-n"
"abc"))))
(delete-file fifo))
(test #f port-waiting-peer? (current-input-port))
(test #f port-waiting-peer? (current-output-port))
(test #f port-waiting-peer? (open-input-bytes #""))
(err/rt-test (port-waiting-peer? 10))
(delete-directory work-dir) (delete-directory work-dir)
;; Network - - - - - - - - - - - - - - - - - - - - - - ;; Network - - - - - - - - - - - - - - - - - - - - - -

View File

@ -622,6 +622,7 @@
[port-provides-progress-evts? (known-procedure 2)] [port-provides-progress-evts? (known-procedure 2)]
[port-read-handler (known-procedure 6)] [port-read-handler (known-procedure 6)]
[port-try-file-lock? (known-procedure 4)] [port-try-file-lock? (known-procedure 4)]
[port-waiting-peer? (known-procedure 2)]
[port-write-handler (known-procedure 6)] [port-write-handler (known-procedure 6)]
[port-writes-atomic? (known-procedure 2)] [port-writes-atomic? (known-procedure 2)]
[port-writes-special? (known-procedure 2)] [port-writes-special? (known-procedure 2)]

View File

@ -11,7 +11,7 @@
rktio-errstep rktio-errstep
racket-error? racket-error?
rktio-place-init!) rktio-place-init!)
;; More `provide`s added by macros below ;; More `provide`s are added by macros below
(define rktio-table (define rktio-table
(or (primitive-table '#%rktio) (or (primitive-table '#%rktio)

View File

@ -1,6 +1,7 @@
#lang racket/base #lang racket/base
(require racket/fixnum (require racket/fixnum
"../common/class.rkt" "../common/class.rkt"
"../common/check.rkt"
"../host/rktio.rkt" "../host/rktio.rkt"
"../host/error.rkt" "../host/error.rkt"
"../host/thread.rkt" "../host/thread.rkt"
@ -26,6 +27,7 @@
open-output-fd open-output-fd
finish-fd-output-port finish-fd-output-port
terminal-port? terminal-port?
port-waiting-peer?
fd-port-fd fd-port-fd
prop:fd-place-message-opener) prop:fd-place-message-opener)
@ -348,6 +350,19 @@
(fd-output-port-fd cp)] (fd-output-port-fd cp)]
[else #f])) [else #f]))
(define/who (port-waiting-peer? p)
(define cp (->core-output-port p #:default #f))
(cond
[cp
(cond
[(fd-output-port? cp)
(define fd (fd-port-fd cp))
(rktio_fd_is_pending_open rktio fd)]
[else #f])]
[(input-port? p) #f]
[else
(raise-argument-error who "port?" p)]))
;; ---------------------------------------- ;; ----------------------------------------
;; in atomic mode ;; in atomic mode

View File

@ -16,7 +16,8 @@
"file-port.rkt" "file-port.rkt"
"file-stream.rkt" "file-stream.rkt"
(only-in "fd-port.rkt" (only-in "fd-port.rkt"
terminal-port?) terminal-port?
port-waiting-peer?)
"file-identity.rkt" "file-identity.rkt"
"file-lock.rkt" "file-lock.rkt"
"bytes-port.rkt" "bytes-port.rkt"
@ -112,6 +113,7 @@
file-stream-port? file-stream-port?
terminal-port? terminal-port?
port-waiting-peer?
open-input-bytes open-input-bytes
open-output-bytes open-output-bytes

View File

@ -116,6 +116,14 @@
(define command-bstr (->host (->path command) who '(execute))) (define command-bstr (->host (->path command) who '(execute)))
;; If `stdout` or `stderr` is a fifo with no read end open, wait for it:
(define (maybe-wait fd)
(when (and fd (rktio_fd_is_pending_open rktio (fd-port-fd fd)))
(sync fd)))
(maybe-wait stdout)
(unless (eq? stderr 'stdout)
(maybe-wait stderr))
(start-atomic) (start-atomic)
(poll-subprocess-finalizations) (poll-subprocess-finalizations)
(check-current-custodian who) (check-current-custodian who)

View File

@ -42,6 +42,7 @@
(define (unsafe-port->file-descriptor p) (define (unsafe-port->file-descriptor p)
(define fd (fd-port-fd p)) (define fd (fd-port-fd p))
(and fd (and fd
(not (rktio_fd_is_pending_open rktio fd))
(rktio_fd_system_fd rktio fd))) (rktio_fd_system_fd rktio fd)))
(define (unsafe-port->socket p) (define (unsafe-port->socket p)

View File

@ -3296,6 +3296,30 @@ scheme_file_stream_port_p (int argc, Scheme_Object *argv[])
return scheme_false; return scheme_false;
} }
Scheme_Object *scheme_port_waiting_peer_p(int argc, Scheme_Object *argv[])
{
Scheme_Object *p = argv[0];
if (SCHEME_OUTPUT_PORTP(p)) {
Scheme_Output_Port *op;
op = scheme_output_port_record(p);
if (SAME_OBJ(op->sub_type, fd_output_port_type)) {
rktio_fd_t *rfd = ((Scheme_FD *)op->port_data)->fd;
if (rktio_fd_is_pending_open(scheme_rktio, rfd))
return scheme_true;
}
} else if (SCHEME_INPUT_PORTP(p)) {
/* ok */
} else {
scheme_wrong_contract("port-waiting-peer?", "port?", 0, argc, argv);
ESCAPED_BEFORE_HERE;
}
return scheme_false;
}
int scheme_get_port_file_descriptor(Scheme_Object *p, intptr_t *_fd) int scheme_get_port_file_descriptor(Scheme_Object *p, intptr_t *_fd)
{ {
intptr_t fd = 0; intptr_t fd = 0;
@ -3325,8 +3349,11 @@ int scheme_get_port_file_descriptor(Scheme_Object *p, intptr_t *_fd)
fd = MSC_IZE (fileno)((FILE *)((Scheme_Output_File *)op->port_data)->f); fd = MSC_IZE (fileno)((FILE *)((Scheme_Output_File *)op->port_data)->f);
fd_ok = 1; fd_ok = 1;
} else if (SAME_OBJ(op->sub_type, fd_output_port_type)) { } else if (SAME_OBJ(op->sub_type, fd_output_port_type)) {
fd = rktio_fd_system_fd(scheme_rktio, ((Scheme_FD *)op->port_data)->fd); rktio_fd_t *rfd = ((Scheme_FD *)op->port_data)->fd;
fd_ok = 1; if (!rktio_fd_is_pending_open(scheme_rktio, rfd)) {
fd = rktio_fd_system_fd(scheme_rktio, rfd);
fd_ok = 1;
}
} }
} }
} }
@ -6234,6 +6261,17 @@ static Scheme_Object *subprocess(int c, Scheme_Object *args[])
if (!stdin_fd || !stdout_fd || !stderr_fd) if (!stdin_fd || !stdout_fd || !stderr_fd)
scheme_custodian_check_available(NULL, name, "file-stream"); scheme_custodian_check_available(NULL, name, "file-stream");
/* In case `stdout_fd` or `stderr_fd` is a fifo with no read end
open, wait for it. */
if (stdout_fd && rktio_fd_is_pending_open(scheme_rktio, stdout_fd)) {
a[0] = args[0];
scheme_sync(1, a);
}
if (stderr_fd && rktio_fd_is_pending_open(scheme_rktio, stderr_fd)) {
a[0] = args[2];
scheme_sync(1, a);
}
/*--------------------------------------*/ /*--------------------------------------*/
/* Create subprocess */ /* Create subprocess */
/*--------------------------------------*/ /*--------------------------------------*/

View File

@ -8,8 +8,9 @@
static Scheme_Object *input_port_p (int, Scheme_Object *[]); static Scheme_Object *input_port_p (int, Scheme_Object *[]);
static Scheme_Object *output_port_p (int, Scheme_Object *[]); static Scheme_Object *output_port_p (int, Scheme_Object *[]);
static Scheme_Object *port_closed_p (int, Scheme_Object *[]); static Scheme_Object *port_closed_p (int, Scheme_Object *[]);
static Scheme_Object *current_input_port (int, Scheme_Object *[]);
static Scheme_Object *string_port_p(int, Scheme_Object *[]); static Scheme_Object *string_port_p(int, Scheme_Object *[]);
static Scheme_Object *waiting_peer_port_p(int, Scheme_Object *[]);
static Scheme_Object *current_input_port (int, Scheme_Object *[]);
static Scheme_Object *current_output_port (int, Scheme_Object *[]); static Scheme_Object *current_output_port (int, Scheme_Object *[]);
static Scheme_Object *current_error_port (int, Scheme_Object *[]); static Scheme_Object *current_error_port (int, Scheme_Object *[]);
static Scheme_Object *make_input_port (int, Scheme_Object *[]); static Scheme_Object *make_input_port (int, Scheme_Object *[]);
@ -221,6 +222,7 @@ scheme_init_port_fun(Scheme_Startup_Env *env)
ADD_FOLDING_PRIM("file-stream-port?", scheme_file_stream_port_p, 1, 1, 1, env); ADD_FOLDING_PRIM("file-stream-port?", scheme_file_stream_port_p, 1, 1, 1, env);
ADD_FOLDING_PRIM("string-port?", string_port_p, 1, 1, 1, env); ADD_FOLDING_PRIM("string-port?", string_port_p, 1, 1, 1, env);
ADD_FOLDING_PRIM("terminal-port?", scheme_terminal_port_p, 1, 1, 1, env); ADD_FOLDING_PRIM("terminal-port?", scheme_terminal_port_p, 1, 1, 1, env);
ADD_FOLDING_PRIM("port-waiting-peer?", scheme_port_waiting_peer_p, 1, 1, 1, env);
ADD_NONCM_PRIM("port-closed?", port_closed_p, 1, 1, env); ADD_NONCM_PRIM("port-closed?", port_closed_p, 1, 1, env);
ADD_NONCM_PRIM("open-input-file", open_input_file, 1, 3, env); ADD_NONCM_PRIM("open-input-file", open_input_file, 1, 3, env);

View File

@ -14,7 +14,7 @@
#define USE_COMPILED_STARTUP 1 #define USE_COMPILED_STARTUP 1
#define EXPECTED_PRIM_COUNT 1454 #define EXPECTED_PRIM_COUNT 1455
#ifdef MZSCHEME_SOMETHING_OMITTED #ifdef MZSCHEME_SOMETHING_OMITTED
# undef USE_COMPILED_STARTUP # undef USE_COMPILED_STARTUP

View File

@ -3606,6 +3606,7 @@ void scheme_flush_orig_outputs(void);
void scheme_flush_if_output_fds(Scheme_Object *o); void scheme_flush_if_output_fds(Scheme_Object *o);
Scheme_Object *scheme_file_stream_port_p(int, Scheme_Object *[]); Scheme_Object *scheme_file_stream_port_p(int, Scheme_Object *[]);
Scheme_Object *scheme_terminal_port_p(int, Scheme_Object *[]); Scheme_Object *scheme_terminal_port_p(int, Scheme_Object *[]);
Scheme_Object *scheme_port_waiting_peer_p(int, Scheme_Object *[]);
Scheme_Object *scheme_do_open_input_file(char *name, int offset, int argc, Scheme_Object *argv[], Scheme_Object *scheme_do_open_input_file(char *name, int offset, int argc, Scheme_Object *argv[],
int internal, int for_module); int internal, int for_module);
Scheme_Object *scheme_do_open_output_file(char *name, int offset, int argc, Scheme_Object *argv[], int and_read, Scheme_Object *scheme_do_open_output_file(char *name, int offset, int argc, Scheme_Object *argv[], int and_read,

View File

@ -16,7 +16,7 @@
#define MZSCHEME_VERSION_X 7 #define MZSCHEME_VERSION_X 7
#define MZSCHEME_VERSION_Y 4 #define MZSCHEME_VERSION_Y 4
#define MZSCHEME_VERSION_Z 0 #define MZSCHEME_VERSION_Z 0
#define MZSCHEME_VERSION_W 4 #define MZSCHEME_VERSION_W 5
/* A level of indirection makes `#` work as needed: */ /* A level of indirection makes `#` work as needed: */
#define AS_a_STR_HELPER(x) #x #define AS_a_STR_HELPER(x) #x

View File

@ -12,6 +12,7 @@ rktio_fd_is_socket
rktio_fd_is_udp rktio_fd_is_udp
rktio_fd_is_terminal rktio_fd_is_terminal
rktio_fd_is_text_converted rktio_fd_is_text_converted
rktio_fd_is_pending_open
rktio_fd_modes rktio_fd_modes
rktio_open rktio_open
rktio_close rktio_close

View File

@ -193,7 +193,8 @@ RKTIO_EXTERN rktio_fd_t *rktio_system_fd(rktio_t *rktio, intptr_t system_fd, int
use `RKTIO_OPEN_INIT`. */ use `RKTIO_OPEN_INIT`. */
RKTIO_EXTERN_NOERR intptr_t rktio_fd_system_fd(rktio_t *rktio, rktio_fd_t *rfd); RKTIO_EXTERN_NOERR intptr_t rktio_fd_system_fd(rktio_t *rktio, rktio_fd_t *rfd);
/* Extracts a native file descriptor or socket. */ /* Extracts a native file descriptor or socket. A file descriptor must
not be in pending-open mode as reported by `rktio_fd_is_pending_open`. */
RKTIO_EXTERN rktio_bool_t rktio_fd_is_regular_file(rktio_t *rktio, rktio_fd_t *rfd); RKTIO_EXTERN rktio_bool_t rktio_fd_is_regular_file(rktio_t *rktio, rktio_fd_t *rfd);
RKTIO_EXTERN rktio_bool_t rktio_fd_is_directory(rktio_t *rktio, rktio_fd_t *rfd); RKTIO_EXTERN rktio_bool_t rktio_fd_is_directory(rktio_t *rktio, rktio_fd_t *rfd);
@ -203,8 +204,14 @@ RKTIO_EXTERN rktio_bool_t rktio_fd_is_terminal(rktio_t *rktio, rktio_fd_t *rfd);
/* The functions mostly report values of recorded mode flags. */ /* The functions mostly report values of recorded mode flags. */
RKTIO_EXTERN rktio_bool_t rktio_fd_is_text_converted(rktio_t *rktio, rktio_fd_t *rfd); RKTIO_EXTERN rktio_bool_t rktio_fd_is_text_converted(rktio_t *rktio, rktio_fd_t *rfd);
/* Reports whether RKTIO_OPEN_TEXT was use and has an effect. The /* Reports whether `RKTIO_OPEN_TEXT` was use and has an effect. The
RKTIO_OPEN_TEXT flag has an effect only on Windows. */ `RKTIO_OPEN_TEXT` flag has an effect only on Windows. */
RKTIO_EXTERN rktio_bool_t rktio_fd_is_pending_open(rktio_t *rktio, rktio_fd_t *rfd);
/* Reports whether `rfd` will block on writing because it corresponds
to the write end of a fifo that has no open reader. In that case,
`rktio_fd_system_fd` cannot report a file descriptor and `rktio_ltps_add`
will error with `RKTIO_ERROR_UNSUPPORTED`. */
RKTIO_EXTERN_NOERR int rktio_fd_modes(rktio_t *rktio, rktio_fd_t *rfd); RKTIO_EXTERN_NOERR int rktio_fd_modes(rktio_t *rktio, rktio_fd_t *rfd);
/* Returns all of the recorded mode flags, including those provided to /* Returns all of the recorded mode flags, including those provided to
@ -216,7 +223,9 @@ RKTIO_EXTERN rktio_fd_t *rktio_open(rktio_t *rktio, rktio_const_string_t src, in
in read mode, and can report `RKTIO_ERROR_IS_A_DIRECTORY`, in read mode, and can report `RKTIO_ERROR_IS_A_DIRECTORY`,
`RKTIO_ERROR_EXISTS`, or `RKTIO_ERROR_ACCESS_DENIED` in place of a `RKTIO_ERROR_EXISTS`, or `RKTIO_ERROR_ACCESS_DENIED` in place of a
system error in write mode. On Windows, can report system error in write mode. On Windows, can report
`RKTIO_ERROR_UNSUPPORTED_TEXT_MODE`. */ `RKTIO_ERROR_UNSUPPORTED_TEXT_MODE`. If `modes` has `RKTIO_OPEN_WRITE`
without `RKTIO_OPEN_READ`, then the result may be a file descriptor
in pending-open mode until the read end is opened. */
RKTIO_EXTERN rktio_ok_t rktio_close(rktio_t *rktio, rktio_fd_t *fd); RKTIO_EXTERN rktio_ok_t rktio_close(rktio_t *rktio, rktio_fd_t *fd);
/* Can report `RKTIO_ERROR_EXISTS` in place of system error, /* Can report `RKTIO_ERROR_EXISTS` in place of system error,
@ -582,7 +591,8 @@ RKTIO_EXTERN rktio_process_result_t *rktio_process(rktio_t *rktio,
rktio_const_string_t current_directory, rktio_const_string_t current_directory,
rktio_envvars_t *envvars, rktio_envvars_t *envvars,
int flags); int flags);
/* `flags` flags: */ /* The output file descriptors `stdin_fd` must not be a pending-open
descriptor. The `flags` are: */
#define RKTIO_PROCESS_NEW_GROUP (1<<0) #define RKTIO_PROCESS_NEW_GROUP (1<<0)
#define RKTIO_PROCESS_STDOUT_AS_STDERR (1<<1) #define RKTIO_PROCESS_STDOUT_AS_STDERR (1<<1)
#define RKTIO_PROCESS_WINDOWS_EXACT_CMDLINE (1<<2) #define RKTIO_PROCESS_WINDOWS_EXACT_CMDLINE (1<<2)

View File

@ -12,6 +12,7 @@ Sforeign_symbol("rktio_fd_is_socket", (void *)rktio_fd_is_socket);
Sforeign_symbol("rktio_fd_is_udp", (void *)rktio_fd_is_udp); Sforeign_symbol("rktio_fd_is_udp", (void *)rktio_fd_is_udp);
Sforeign_symbol("rktio_fd_is_terminal", (void *)rktio_fd_is_terminal); Sforeign_symbol("rktio_fd_is_terminal", (void *)rktio_fd_is_terminal);
Sforeign_symbol("rktio_fd_is_text_converted", (void *)rktio_fd_is_text_converted); Sforeign_symbol("rktio_fd_is_text_converted", (void *)rktio_fd_is_text_converted);
Sforeign_symbol("rktio_fd_is_pending_open", (void *)rktio_fd_is_pending_open);
Sforeign_symbol("rktio_fd_modes", (void *)rktio_fd_modes); Sforeign_symbol("rktio_fd_modes", (void *)rktio_fd_modes);
Sforeign_symbol("rktio_open", (void *)rktio_open); Sforeign_symbol("rktio_open", (void *)rktio_open);
Sforeign_symbol("rktio_close", (void *)rktio_close); Sforeign_symbol("rktio_close", (void *)rktio_close);

View File

@ -255,6 +255,11 @@
rktio_bool_t rktio_bool_t
rktio_fd_is_text_converted rktio_fd_is_text_converted
(((ref rktio_t) rktio) ((ref rktio_fd_t) rfd))) (((ref rktio_t) rktio) ((ref rktio_fd_t) rfd)))
(define-function
()
rktio_bool_t
rktio_fd_is_pending_open
(((ref rktio_t) rktio) ((ref rktio_fd_t) rfd)))
(define-function (define-function
() ()
int int

View File

@ -29,6 +29,9 @@ struct rktio_fd_t {
int bufcount; int bufcount;
char buffer[1]; char buffer[1];
# endif # endif
# ifdef RKTIO_USE_PENDING_OPEN
struct open_in_thread_t *pending;
# endif
#endif #endif
#ifdef RKTIO_SYSTEM_WINDOWS #ifdef RKTIO_SYSTEM_WINDOWS
@ -201,9 +204,46 @@ rktio_fd_t *rktio_system_fd(rktio_t *rktio, intptr_t system_fd, int modes)
return rfd; return rfd;
} }
int rktio_fd_is_pending_open(rktio_t *rktio, rktio_fd_t *rfd)
{
#ifdef RKTIO_USE_PENDING_OPEN
if (rfd->pending)
rktio_pending_open_poll(rktio, rfd, rfd->pending);
return !!rfd->pending;
#else
return 0;
#endif
}
#ifdef RKTIO_USE_PENDING_OPEN
rktio_fd_t *rktio_pending_system_fd(rktio_t *rktio, struct open_in_thread_t *oit, int modes)
{
rktio_fd_t *rfd;
rfd = calloc(1, sizeof(rktio_fd_t));
rfd->modes = (modes - (modes & RKTIO_OPEN_INIT));
rfd->pending = oit;
return rfd;
}
void rktio_update_system_fd(rktio_t *rktio, rktio_fd_t *rfd, int fd, int modes)
/* Convert an open-pending rfd to a normal rfd. The given `modes` adds
to (and completes) the modes provided to `rktio_pending_system_fd`. */
{
rfd->fd = fd;
rfd->modes |= modes;
rfd->pending = NULL;
}
#endif
intptr_t rktio_internal_fd_system_fd(rktio_fd_t *rfd) intptr_t rktio_internal_fd_system_fd(rktio_fd_t *rfd)
{ {
#ifdef RKTIO_SYSTEM_UNIX #ifdef RKTIO_SYSTEM_UNIX
# ifdef RKTIO_USE_PENDING_OPEN
if (rfd->pending) return -1;
# endif
return rfd->fd; return rfd->fd;
#endif #endif
#ifdef RKTIO_SYSTEM_WINDOWS #ifdef RKTIO_SYSTEM_WINDOWS
@ -272,6 +312,14 @@ int rktio_fd_is_udp(rktio_t *rktio, rktio_fd_t *rfd)
return ((rfd->modes & RKTIO_OPEN_UDP) ? 1 : 0); return ((rfd->modes & RKTIO_OPEN_UDP) ? 1 : 0);
} }
int rktio_fd_is_open_pending(rktio_t *rktio, rktio_fd_t *rfd)
{
#ifdef RKTIO_USE_PENDING_OPEN
if (rfd->pending) return 1;
#endif
return 0;
}
int rktio_fd_modes(rktio_t *rktio, rktio_fd_t *rfd) int rktio_fd_modes(rktio_t *rktio, rktio_fd_t *rfd)
{ {
return rfd->modes; return rfd->modes;
@ -298,6 +346,9 @@ int rktio_system_fd_is_terminal(rktio_t *rktio, intptr_t fd)
int rktio_fd_is_terminal(rktio_t *rktio, rktio_fd_t *rfd) int rktio_fd_is_terminal(rktio_t *rktio, rktio_fd_t *rfd)
{ {
#ifdef RKTIO_USE_PENDING_OPEN
if (rfd->pending) return 0;
#endif
return rktio_system_fd_is_terminal(rktio, (intptr_t)rfd->fd); return rktio_system_fd_is_terminal(rktio, (intptr_t)rfd->fd);
} }
@ -315,6 +366,13 @@ rktio_fd_t *rktio_dup(rktio_t *rktio, rktio_fd_t *rfd)
#ifdef RKTIO_SYSTEM_UNIX #ifdef RKTIO_SYSTEM_UNIX
intptr_t nfd; intptr_t nfd;
# ifdef RKTIO_USE_PENDING_OPEN
if (rfd->pending) {
rktio_pending_open_retain(rktio, rfd->pending);
return rktio_pending_system_fd(rktio, rfd->pending, rfd->modes);
}
# endif
do { do {
nfd = dup(rfd->fd); nfd = dup(rfd->fd);
} while (nfd == -1 && errno == EINTR); } while (nfd == -1 && errno == EINTR);
@ -375,16 +433,22 @@ void rktio_reliably_close(intptr_t s)
#endif #endif
static rktio_ok_t do_close(rktio_t *rktio /* maybe NULL */, rktio_fd_t *rfd, int set_error) static rktio_ok_t do_close(rktio_t *rktio /* maybe NULL */, rktio_fd_t *rfd, int set_error)
/* The `rktio` argument can be NULL for a detached file descriptor */
{ {
int ok; int ok;
#ifdef RKTIO_SYSTEM_UNIX #ifdef RKTIO_SYSTEM_UNIX
int cr; int cr;
cr = rktio_reliably_close_err(rfd->fd); # ifdef RKTIO_USE_PENDING_OPEN
if (rfd->pending)
cr = rktio_pending_open_release(rktio, rfd->pending);
else
# endif
cr = rktio_reliably_close_err(rfd->fd);
# ifdef RKTIO_USE_FCNTL_AND_FORK_FOR_FILE_LOCKS # ifdef RKTIO_USE_FCNTL_AND_FORK_FOR_FILE_LOCKS
if (rktio && !(rfd->modes & RKTIO_OPEN_SOCKET)) if (rktio && !(rfd->modes & RKTIO_OPEN_SOCKET) && (!cr || !set_error))
rktio_release_lockf(rktio, rfd->fd); rktio_release_lockf(rktio, rfd->fd);
# endif # endif
@ -446,12 +510,24 @@ void rktio_forget(rktio_t *rktio, rktio_fd_t *rfd)
rktio_fd_transfer_t *rktio_fd_detach(rktio_t *rktio, rktio_fd_t *rfd) rktio_fd_transfer_t *rktio_fd_detach(rktio_t *rktio, rktio_fd_t *rfd)
{ {
#ifdef RKTIO_USE_PENDING_OPEN
if (rfd->pending)
rktio_pending_open_detach(rktio, rfd->pending);
#endif
return (rktio_fd_transfer_t *)rfd; return (rktio_fd_transfer_t *)rfd;
} }
rktio_fd_t *rktio_fd_attach(rktio_t *rktio, rktio_fd_transfer_t *rfdt) rktio_fd_t *rktio_fd_attach(rktio_t *rktio, rktio_fd_transfer_t *rfdt)
{ {
return (rktio_fd_t *)rfdt; rktio_fd_t *rfd = (rktio_fd_t *)rfdt;
#ifdef RKTIO_USE_PENDING_OPEN
if (rfd->pending)
rktio_pending_open_attach(rktio, rfd->pending);
#endif
return rfd;
} }
void rktio_fd_close_transfer(rktio_fd_transfer_t *rfdt) void rktio_fd_close_transfer(rktio_fd_transfer_t *rfdt)
@ -592,6 +668,19 @@ int poll_write_ready_or_flushed(rktio_t *rktio, rktio_fd_t *rfd, int check_flush
return RKTIO_POLL_READY; return RKTIO_POLL_READY;
else { else {
int sr; int sr;
# ifdef RKTIO_USE_PENDING_OPEN
if (rfd->pending) {
int errval = rktio_pending_open_poll(rktio, rfd, rfd->pending);
if (errval) {
errno = errval;
get_posix_error();
return RKTIO_POLL_ERROR;
} else if (rfd->pending)
return RKTIO_POLL_NOT_READY;
}
# endif
# ifdef HAVE_POLL_SYSCALL # ifdef HAVE_POLL_SYSCALL
struct pollfd pfd[1]; struct pollfd pfd[1];
pfd[0].fd = rfd->fd; pfd[0].fd = rfd->fd;
@ -616,7 +705,7 @@ int poll_write_ready_or_flushed(rktio_t *rktio, rktio_fd_t *rfd, int check_flush
/* Mac OS X 10.8 and 10.9: select() seems to claim that a pipe /* Mac OS X 10.8 and 10.9: select() seems to claim that a pipe
is always ready for output. To work around that problem, is always ready for output. To work around that problem,
kqueue() support might be used for pipes, but that has different kqueue() support might be used for pipes, but that has different
problems. The poll() code above should be used, instead. */ problems. The pol() code above should be used, instead. */
sr = select(rfd->fd + 1, NULL, RKTIO_FDS(writefds), RKTIO_FDS(exnfds), &time); sr = select(rfd->fd + 1, NULL, RKTIO_FDS(writefds), RKTIO_FDS(exnfds), &time);
} while ((sr == -1) && (errno == EINTR)); } while ((sr == -1) && (errno == EINTR));
# endif # endif
@ -693,6 +782,13 @@ void rktio_poll_add(rktio_t *rktio, rktio_fd_t *rfd, rktio_poll_set_t *fds, int
#ifdef RKTIO_SYSTEM_UNIX #ifdef RKTIO_SYSTEM_UNIX
rktio_poll_set_t *fds2; rktio_poll_set_t *fds2;
# ifdef RKTIO_USE_PENDING_OPEN
if (rfd->pending) {
rktio_poll_add_pending_open(rktio, rfd, rfd->pending, fds);
return;
}
# endif
if (modes & RKTIO_POLL_READ) { if (modes & RKTIO_POLL_READ) {
RKTIO_FD_SET(rfd->fd, fds); RKTIO_FD_SET(rfd->fd, fds);
} }
@ -1233,6 +1329,18 @@ intptr_t rktio_write(rktio_t *rktio, rktio_fd_t *rfd, const char *buffer, intptr
if (rfd->modes & RKTIO_OPEN_SOCKET) if (rfd->modes & RKTIO_OPEN_SOCKET)
return rktio_socket_write(rktio, rfd, buffer, len); return rktio_socket_write(rktio, rfd, buffer, len);
# ifdef RKTIO_USE_PENDING_OPEN
if (rfd->pending) {
int errval = rktio_pending_open_poll(rktio, rfd, rfd->pending);
if (errval) {
errno = errval;
get_posix_error();
return RKTIO_WRITE_ERROR;
} else if (rfd->pending)
return 0;
}
# endif
flags = fcntl(rfd->fd, F_GETFL, 0); flags = fcntl(rfd->fd, F_GETFL, 0);
if (!(flags & RKTIO_NONBLOCKING)) if (!(flags & RKTIO_NONBLOCKING))
fcntl(rfd->fd, F_SETFL, flags | RKTIO_NONBLOCKING); fcntl(rfd->fd, F_SETFL, flags | RKTIO_NONBLOCKING);

View File

@ -11,6 +11,18 @@
#ifdef RKTIO_SYSTEM_WINDOWS #ifdef RKTIO_SYSTEM_WINDOWS
# include <windows.h> # include <windows.h>
#endif #endif
#ifdef RKTIO_USE_PENDING_OPEN
#include <string.h>
#endif
#ifdef RKTIO_SYSTEM_UNIX
static rktio_fd_t *finish_unix_fd_creation(rktio_t *rktio, int fd, int modes, rktio_fd_t *existing_rfd);
#endif
#ifdef RKTIO_USE_PENDING_OPEN
static rktio_fd_t *open_via_thread(rktio_t *rktio, const char *filename, int modes, int flags);
static int do_pending_open_release(rktio_t *rktio, struct open_in_thread_t *data, int close_fd);
#endif
/*========================================================================*/ /*========================================================================*/
/* Opening a file */ /* Opening a file */
@ -106,8 +118,6 @@ static rktio_fd_t *open_write(rktio_t *rktio, const char *filename, int modes)
#ifdef RKTIO_SYSTEM_UNIX #ifdef RKTIO_SYSTEM_UNIX
int fd; int fd;
int flags; int flags;
struct stat buf;
int cr;
flags = (((modes & RKTIO_OPEN_READ) ? O_RDWR : O_WRONLY) flags = (((modes & RKTIO_OPEN_READ) ? O_RDWR : O_WRONLY)
| ((modes & RKTIO_OPEN_MUST_EXIST ? 0 : O_CREAT))); | ((modes & RKTIO_OPEN_MUST_EXIST ? 0 : O_CREAT)));
@ -124,48 +134,20 @@ static rktio_fd_t *open_write(rktio_t *rktio, const char *filename, int modes)
} while ((fd == -1) && (errno == EINTR)); } while ((fd == -1) && (errno == EINTR));
if (errno == ENXIO) { if (errno == ENXIO) {
/* FIFO with no reader? Try opening in RW mode: */ /* FIFO with no reader? */
#ifdef RKTIO_USE_PENDING_OPEN
return open_via_thread(rktio, filename, modes, flags | RKTIO_BINARY);
#else
/* Try opening in RW mode: */
flags -= O_WRONLY; flags -= O_WRONLY;
flags |= O_RDWR; flags |= O_RDWR;
do { do {
fd = open(filename, flags | RKTIO_NONBLOCKING | RKTIO_BINARY, 0666); fd = open(filename, flags | RKTIO_NONBLOCKING | RKTIO_BINARY, 0666);
} while ((fd == -1) && (errno == EINTR)); } while ((fd == -1) && (errno == EINTR));
#endif
} }
if (fd == -1) { return finish_unix_fd_creation(rktio, fd, modes, NULL);
if (errno == EISDIR) {
set_racket_error(RKTIO_ERROR_IS_A_DIRECTORY);
return NULL;
} else if (errno == EEXIST) {
set_racket_error(RKTIO_ERROR_EXISTS);
return NULL;
} else if (errno == EACCES) {
set_racket_error(RKTIO_ERROR_ACCESS_DENIED);
return NULL;
}
if (fd == -1) {
get_posix_error();
return NULL;
}
}
do {
cr = fstat(fd, &buf);
} while ((cr == -1) && (errno == EINTR));
if (cr) {
get_posix_error();
do {
cr = close(fd);
} while ((cr == -1) && (errno == EINTR));
return NULL;
}
return rktio_system_fd(rktio, fd, (modes
| (S_ISREG(buf.st_mode)
? RKTIO_OPEN_REGFILE
: RKTIO_OPEN_NOT_REGFILE)));
#endif #endif
#ifdef RKTIO_SYSTEM_WINDOWS #ifdef RKTIO_SYSTEM_WINDOWS
HANDLE fd; HANDLE fd;
@ -236,6 +218,54 @@ static rktio_fd_t *open_write(rktio_t *rktio, const char *filename, int modes)
#endif #endif
} }
#ifdef RKTIO_SYSTEM_UNIX
static rktio_fd_t *finish_unix_fd_creation(rktio_t *rktio, int fd, int modes, rktio_fd_t *existing_rfd)
{
struct stat buf;
int cr;
if (fd == -1) {
if (errno == EISDIR) {
set_racket_error(RKTIO_ERROR_IS_A_DIRECTORY);
return NULL;
} else if (errno == EEXIST) {
set_racket_error(RKTIO_ERROR_EXISTS);
return NULL;
} else if (errno == EACCES) {
set_racket_error(RKTIO_ERROR_ACCESS_DENIED);
return NULL;
}
if (fd == -1) {
get_posix_error();
return NULL;
}
}
do {
cr = fstat(fd, &buf);
} while ((cr == -1) && (errno == EINTR));
if (cr) {
get_posix_error();
do {
cr = close(fd);
} while ((cr == -1) && (errno == EINTR));
return NULL;
}
modes |= (S_ISREG(buf.st_mode)
? RKTIO_OPEN_REGFILE
: RKTIO_OPEN_NOT_REGFILE);
if (existing_rfd) {
rktio_update_system_fd(rktio, existing_rfd, fd, modes);
return existing_rfd;
} else
return rktio_system_fd(rktio, fd, modes);
}
#endif
rktio_fd_t *rktio_open(rktio_t *rktio, const char *filename, int modes) rktio_fd_t *rktio_open(rktio_t *rktio, const char *filename, int modes)
{ {
if (modes & RKTIO_OPEN_WRITE) if (modes & RKTIO_OPEN_WRITE)
@ -373,3 +403,260 @@ rktio_ok_t rktio_set_file_size(rktio_t *rktio, rktio_fd_t *rfd, rktio_filesize_t
} }
#endif #endif
} }
/*========================================================================*/
/* Thread for blocking open */
/*========================================================================*/
/* When opening a fifo for writing, then there's no way to open in
non-blocking mode and then poll for whether reader is ready.
Instead, we have to open in a separate thread. When the open
succeeds, post to a waiting rktio_t's signal handle.
To allow a blocked-on-opening file descriptor to be transferred
across rktio_t domains, the record that represents the extra thread
must be sharable among multiple OS threads, and we must in general
keep an array of handles.
If the blocked-omn-opening file descriptor is closed, then we have
to be able to cancel the thread. This isn't so bad, since we want
to support canceling only while the `open` system call is
blocked. */
#ifdef RKTIO_USE_PENDING_OPEN
/* An instance of `open_in_thread_t` can be shared by multiple threads
(i.e., multiple `rktio_t` instances) */
typedef struct open_in_thread_t {
pthread_mutex_t lock;
int ready;
pthread_cond_t ready_cond; /* wait until helper thread is ready (including cleanup) */
char *filename;
int flags;
int done;
int fd;
int errval;
int refcount;
pthread_t th;
int num_handles;
rktio_signal_handle_t **handles;
} open_in_thread_t;
static void free_open_in_thread(open_in_thread_t *data)
{
pthread_detach(data->th);
if (data->handles)
free(data->handles);
free(data->filename);
free(data);
}
static void cleanup_open_thread(void *_data)
{
int i, refcount;
open_in_thread_t *data = (open_in_thread_t *)_data;
pthread_mutex_lock(&data->lock);
for (i = 0; i < data->num_handles; i++)
if (data->handles[i])
rktio_signal_received_at(data->handles[i]);
refcount = data->refcount;
data->done = 1;
pthread_mutex_unlock(&data->lock);
if (!refcount) {
if (data->fd != -1)
rktio_reliably_close(data->fd);
free_open_in_thread(data);
}
}
static void *do_open_in_thread(void *_data)
{
open_in_thread_t *data = (open_in_thread_t *)_data;
int fd;
int old_type;
/* To be on the safe side, disable cancelation except
just around the call to `open` */
pthread_setcanceltype(PTHREAD_CANCEL_DISABLE, &old_type);
pthread_cleanup_push(cleanup_open_thread, data);
pthread_mutex_lock(&data->lock);
data->ready = 1;
pthread_cond_signal(&data->ready_cond);
pthread_mutex_unlock(&data->lock);
data->fd = -1;
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
/* Cancelation only possible during `open`, a which point it's ok
and sufficient to run `cleanup_open_thread`; we're assuming that
either `open` returns with a file descriptor or the thread is
canceled before `open` returns, but not both (otherwise there
could be a space leak) */
do {
fd = open(data->filename, data->flags, 0666);
} while ((fd == -1) && (errno == EINTR));
pthread_setcanceltype(PTHREAD_CANCEL_DISABLE, NULL);
data->fd = fd;
if (fd == -1)
data->errval = errno;
/* Runs `cleanup_open_thread` while popping it: */
pthread_cleanup_pop(1);
/* In general, a function tha changes the cancelation type should
restore it before returning */
pthread_setcanceltype(old_type, NULL);
return NULL;
}
static rktio_fd_t *open_via_thread(rktio_t *rktio, const char *filename, int modes, int flags)
{
open_in_thread_t *data;
data = calloc(1, sizeof(open_in_thread_t));
data->refcount = 1;
data->filename = strdup(filename);
data->flags = flags;
pthread_mutex_init(&data->lock, NULL);
pthread_cond_init(&data->ready_cond, NULL);
data->num_handles = 1;
data->handles = malloc(sizeof(rktio_signal_handle_t*));
data->handles[0] = rktio_get_signal_handle(rktio);
(void)pthread_create(&data->th, NULL, do_open_in_thread, data);
pthread_mutex_lock(&data->lock);
if (!data->ready)
pthread_cond_wait(&data->ready_cond, &data->lock);
pthread_mutex_unlock(&data->lock);
return rktio_pending_system_fd(rktio, data, modes);
}
int rktio_pending_open_poll(rktio_t *rktio, rktio_fd_t *existing_rfd, struct open_in_thread_t *data)
/* non-zero result is an errno value */
{
int done;
pthread_mutex_lock(&data->lock);
done = data->done;
pthread_mutex_unlock(&data->lock);
if (done) {
if (data->fd == -1)
return data->errval;
else {
int fd = data->fd;
(void)do_pending_open_release(rktio, data, 0);
if (!finish_unix_fd_creation(rktio, fd, 0, existing_rfd)) {
/* Posix error must be saved in `rktio` */
return rktio->errid;
}
return 0;
}
} else
return 0;
}
void rktio_poll_add_pending_open(rktio_t *rktio, rktio_fd_t *rfd, struct open_in_thread_t *data, rktio_poll_set_t *fds)
{
int done;
pthread_mutex_lock(&data->lock);
done = data->done;
pthread_mutex_unlock(&data->lock);
if (done)
rktio_poll_set_add_nosleep(rktio, fds);
}
void rktio_pending_open_attach(rktio_t *rktio, struct open_in_thread_t *data)
{
int i;
rktio_signal_handle_t *h = rktio_get_signal_handle(rktio);
pthread_mutex_lock(&data->lock);
if (!data->done) {
for (i = 0; i < data->num_handles; i++)
if (!data->handles[i]) {
data->handles[i] = h;
break;
}
if (i >= data->num_handles) {
rktio_signal_handle_t **old = data->handles;
int n = (2 * data->num_handles);
data->handles = calloc(n, sizeof(rktio_signal_handle_t*));
memcpy(data->handles, old, data->num_handles * sizeof(rktio_signal_handle_t*));
data->handles[data->num_handles] = h;
data->num_handles = n;
}
}
pthread_mutex_unlock(&data->lock);
}
static void do_detach(rktio_t *rktio, struct open_in_thread_t *data)
{
int i;
rktio_signal_handle_t *h = rktio_get_signal_handle(rktio);
for (i = 0; i < data->num_handles; i++)
if (data->handles[i] == h)
data->handles[i] = NULL;
}
void rktio_pending_open_detach(rktio_t *rktio, struct open_in_thread_t *data)
{
pthread_mutex_lock(&data->lock);
do_detach(rktio, data);
pthread_mutex_unlock(&data->lock);
}
void rktio_pending_open_retain(rktio_t *rktio, struct open_in_thread_t *data)
{
pthread_mutex_lock(&data->lock);
data->refcount++;
pthread_mutex_unlock(&data->lock);
}
int do_pending_open_release(rktio_t *rktio, struct open_in_thread_t *data, int close_fd)
/* The rktio argument can be NULL for a detached use */
{
int bye;
pthread_mutex_lock(&data->lock);
--data->refcount;
bye = (data->done && !data->refcount);
if (!bye) {
do_detach(rktio, data);
if (!data->refcount)
pthread_cancel(data->th);
}
pthread_mutex_unlock(&data->lock);
if (bye) {
if (close_fd && (data->fd != -1))
rktio_reliably_close(data->fd);
free_open_in_thread(data);
}
return 0;
}
int rktio_pending_open_release(rktio_t *rktio, struct open_in_thread_t *data)
{
return do_pending_open_release(rktio, data, 1);
}
#endif

View File

@ -208,7 +208,16 @@ rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t
# endif # endif
rktio_ltps_handle_pair_t *v; rktio_ltps_handle_pair_t *v;
rktio_ltps_handle_t *s; rktio_ltps_handle_t *s;
intptr_t fd = rktio_fd_system_fd(rktio, rfd); intptr_t fd;
#ifdef RKTIO_USE_PENDING_OPEN
if (rktio_fd_is_pending_open(rktio, rfd)) {
set_racket_error(RKTIO_ERROR_UNSUPPORTED);
return NULL;
}
#endif
fd = rktio_fd_system_fd(rktio, rfd);
# ifdef HAVE_KQUEUE_SYSCALL # ifdef HAVE_KQUEUE_SYSCALL
if (!rktio_fd_is_socket(rktio, rfd)) { if (!rktio_fd_is_socket(rktio, rfd)) {

View File

@ -339,6 +339,23 @@ void rktio_close_fds_after_fork(int len, int skip1, int skip2, int skip3);
int rktio_system_fd_is_terminal(rktio_t *rktio, intptr_t fd); int rktio_system_fd_is_terminal(rktio_t *rktio, intptr_t fd);
#ifdef RKTIO_USE_PTHREADS
# define RKTIO_USE_PENDING_OPEN
#endif
#ifdef RKTIO_USE_PENDING_OPEN
struct open_in_thread_t;
rktio_fd_t *rktio_pending_system_fd(rktio_t *rktio, struct open_in_thread_t *oit, int modes);
void rktio_update_system_fd(rktio_t *rktio, rktio_fd_t *rfd, int fd, int modes);
int rktio_fd_is_pending_open(rktio_t *rktio, rktio_fd_t *rfd);
int rktio_pending_open_poll(rktio_t *rktio, rktio_fd_t *rfd, struct open_in_thread_t *oit);
void rktio_poll_add_pending_open(rktio_t *rktio, rktio_fd_t *rfd, struct open_in_thread_t *oit, rktio_poll_set_t *fds);
void rktio_pending_open_detach(rktio_t *rktio, struct open_in_thread_t *oit);
void rktio_pending_open_attach(rktio_t *rktio, struct open_in_thread_t *oit);
void rktio_pending_open_retain(rktio_t *rktio, struct open_in_thread_t *oit);
int rktio_pending_open_release(rktio_t *rktio, struct open_in_thread_t *oit);
#endif
void *rktio_envvars_to_block(rktio_t *rktio, rktio_envvars_t *envvars); void *rktio_envvars_to_block(rktio_t *rktio, rktio_envvars_t *envvars);
void rktio_stop_fs_change(rktio_t *rktio); void rktio_stop_fs_change(rktio_t *rktio);