From 2d0f10f4736d79bc2d2fb4a0505892c6a285a561 Mon Sep 17 00:00:00 2001 From: Matthew Flatt Date: Tue, 20 Aug 2019 21:38:56 +0200 Subject: [PATCH] rktio: better behavior for opening write and of fifo When opening the write end of a fifo that doesn't have a reader already, the old implementation could allow writing bytes that are discarded. This new implementation uses a blocking `open` in a `pthread`, and that way the write routines know whether the stream is ready for writing or not. The difference is visible in the Racket API in a two places: `subprocess` needs to wait until a fifo writer is connected before attempting to dup the corresponding file descriotor, and more generally a use of `unsafe-port->file-descriptor` needs to wait. The former blocking operation is now build into `subprocess` (and documented), but the burden is place on callers of `unsafe-port->file-descriptor` to wait is necessary. The new `port-waiting-peer?` predicate exposes the waiting state, while `sync` is sufficient to wait for a peer. Closes #2784 --- pkgs/base/info.rkt | 2 +- .../racket-doc/scribblings/foreign/port.scrbl | 12 +- .../scribblings/reference/file-ports.scrbl | 10 +- .../scribblings/reference/port-procs.scrbl | 18 + .../scribblings/reference/subprocess.scrbl | 8 +- pkgs/racket-test-core/tests/racket/file.rktl | 46 +++ racket/src/cs/primitive/kernel.ss | 1 + racket/src/io/host/rktio.rkt | 2 +- racket/src/io/port/fd-port.rkt | 15 + racket/src/io/port/main.rkt | 4 +- racket/src/io/subprocess/main.rkt | 8 + racket/src/io/unsafe/port.rkt | 1 + racket/src/racket/src/port.c | 42 +- racket/src/racket/src/portfun.c | 4 +- racket/src/racket/src/schminc.h | 2 +- racket/src/racket/src/schpriv.h | 1 + racket/src/racket/src/schvers.h | 2 +- racket/src/rktio/rktio.def | 1 + racket/src/rktio/rktio.h | 20 +- racket/src/rktio/rktio.inc | 1 + racket/src/rktio/rktio.rktl | 5 + racket/src/rktio/rktio_fd.c | 118 +++++- racket/src/rktio/rktio_file.c | 361 ++++++++++++++++-- racket/src/rktio/rktio_ltps.c | 11 +- racket/src/rktio/rktio_private.h | 17 + 25 files changed, 652 insertions(+), 60 deletions(-) diff --git a/pkgs/base/info.rkt b/pkgs/base/info.rkt index 6c8a938bda..5c7c3d8fa7 100644 --- a/pkgs/base/info.rkt +++ b/pkgs/base/info.rkt @@ -12,7 +12,7 @@ (define collection 'multi) -(define version "7.4.0.4") +(define version "7.4.0.5") (define deps `("racket-lib" ["racket" #:version ,version])) diff --git a/pkgs/racket-doc/scribblings/foreign/port.scrbl b/pkgs/racket-doc/scribblings/foreign/port.scrbl index 2a78b5218e..320c3ea388 100644 --- a/pkgs/racket-doc/scribblings/foreign/port.scrbl +++ b/pkgs/racket-doc/scribblings/foreign/port.scrbl @@ -63,7 +63,17 @@ previously created with @racket[unsafe-file-descriptor->semaphore] or )]{ Returns a file descriptor (which is a @tt{HANDLE} value on Windows) of -a socket for @racket[port] if it has one, @racket[#f] otherwise.} +a socket for @racket[port] if it has one, @racket[#f] otherwise. + +On Unix and Mac OS, the result of +@racket[unsafe-port->file-descriptor] can be @racket[#f] if it +corresponds to a port that is waiting for its peer as reported by +@racket[port-waiting-peer?], such as the write end of a fifo where no +reader is connected. Wait until such is ready by using @racket[sync]). + +@history[#:changed "7.4.0.5" @elem{Accommodate a fifo write + end blocked on a reader by + returning @racket[#f].}]} @deftogether[( diff --git a/pkgs/racket-doc/scribblings/reference/file-ports.scrbl b/pkgs/racket-doc/scribblings/reference/file-ports.scrbl index 39cbe0a871..7b97e2db07 100644 --- a/pkgs/racket-doc/scribblings/reference/file-ports.scrbl +++ b/pkgs/racket-doc/scribblings/reference/file-ports.scrbl @@ -187,7 +187,10 @@ The file specified by @racket[path] need not be a regular file. It might be a device that is connected through the filesystem, such as @filepath{aux} on Windows or @filepath{/dev/null} on Unix. The output port is block-buffered by default, unless the file corresponds to a -terminal, in which case it is line-buffered by default. +terminal, in which case it is line-buffered by default. On Unix and +Mac OS, if the file is a fifo, then the port will block for writing +until a reader for the fifo is available; see also +@racket[port-waiting-peer?]. The port produced by @racket[open-output-file] should be explicitly closed, either though @racket[close-output-port] or indirectly via @@ -212,7 +215,10 @@ then @exnraise[exn:fail:filesystem:errno]. @history[#:changed "6.9.0.6" @elem{On Unix and Mac OS, make @racket['truncate/replace] replace on a permission error. On Windows, make @racket['replace] always replace instead truncating - like @racket['truncate/replace].}]} + like @racket['truncate/replace].} + #:changed "7.4.0.5" @elem{Changed handling of a fifo on Unix and Mac OS to + make the port block for output until the fifo has a + reader.}]} @defproc[(open-input-output-file [path path-string?] [#:mode mode-flag (or/c 'binary 'text) 'binary] diff --git a/pkgs/racket-doc/scribblings/reference/port-procs.scrbl b/pkgs/racket-doc/scribblings/reference/port-procs.scrbl index d0f16a8c21..650720f096 100644 --- a/pkgs/racket-doc/scribblings/reference/port-procs.scrbl +++ b/pkgs/racket-doc/scribblings/reference/port-procs.scrbl @@ -63,6 +63,24 @@ interactive terminal, @racket[#f] otherwise. to any value, instead of resticting the domain to ports}]} + +@defproc[(port-waiting-peer? [port port?]) boolean?]{ + +Returns @racket[#t] if @racket[port] is not ready for reading or +writing because it is waiting for a peer process to complete a stream +construction, @racket[#f] otherwise. + +On Unix and Mac OS, opening a fifo for output creates a peer-waiting +port if no reader for the same fifo is already opened. In that case, +the output port is not ready for writing until a reader is opened; +that is, write opertaions will block. Use @racket[sync] if necessary +to wait until writing will not block---that is, until the read end of +the fifo is opened. + +@history[#:added "7.4.0.5"]} + + + @defthing[eof eof-object?]{A value (distinct from all other values) that represents an end-of-file.} diff --git a/pkgs/racket-doc/scribblings/reference/subprocess.scrbl b/pkgs/racket-doc/scribblings/reference/subprocess.scrbl index 999b33e3ba..07676a225e 100644 --- a/pkgs/racket-doc/scribblings/reference/subprocess.scrbl +++ b/pkgs/racket-doc/scribblings/reference/subprocess.scrbl @@ -71,6 +71,10 @@ returned by @racket[subprocess]. The @racket[stderr] argument can be that is supplied as standard output is also used for standard error. For each port or @racket['stdout] that is provided, no pipe is created and the corresponding returned value is @racket[#f]. +If @racket[stdout] or @racket[stderr] is a port for which +@racket[port-waiting-peer?] returns true, then @racket[subprocess] +waits for the port to become ready for writing before proceeding with +the subprocess creation. If @racket[group] is @racket['new], then the new process is created as a new OS-level process group. In that case, @racket[subprocess-kill] @@ -132,7 +136,9 @@ A subprocess can be used as a @tech{synchronizable event} (see @secref["sync"]). A subprocess value is @tech{ready for synchronization} when @racket[subprocess-wait] would not block; @resultItself{subprocess value}. -@history[#:changed "6.11.0.1" @elem{Added the @racket[group] argument.}]} +@history[#:changed "6.11.0.1" @elem{Added the @racket[group] argument.} + #:changed "7.4.0.5" @elem{Added waiting for a fifo without a reader + as @racket[stdout] and/or @racket[stderr].}]} @defproc[(subprocess-wait [subproc subprocess?]) void?]{ diff --git a/pkgs/racket-test-core/tests/racket/file.rktl b/pkgs/racket-test-core/tests/racket/file.rktl index 61a66f7c63..b595c93728 100644 --- a/pkgs/racket-test-core/tests/racket/file.rktl +++ b/pkgs/racket-test-core/tests/racket/file.rktl @@ -1724,6 +1724,52 @@ (for ([f '("tmp1" "tmp2" "tmp3")] #:when (file-exists? f)) (delete-file f)) (current-directory original-dir) + +(unless (eq? 'windows (system-type)) + (define fifo (build-path work-dir "ff")) + (system* (find-executable-path "mkfifo") fifo) + + (define i1 (open-input-file fifo)) + (define o1 (open-output-file fifo #:exists 'update)) + (write-bytes #"abc" o1) + (flush-output o1) + (test #"abc" read-bytes 3 i1) + (close-input-port i1) + (close-output-port o1) + + (define (check-output-blocking do-write-abc) + ;; Make sure an output fifo blocks until there's a reader + (define t1 + (thread + (lambda () + (define o2 (open-output-file fifo #:exists 'update)) + (test #t port-waiting-peer? o2) + (do-write-abc o2) + (close-output-port o2)))) + (define t2 + (thread + (lambda () + (sync (system-idle-evt)) + (define i2 (open-input-file fifo)) + (test #"abc" read-bytes 3 i2) + (close-input-port i2)))) + (sync t1) + (sync t2)) + + (check-output-blocking (lambda (o2) (write-bytes #"abc" o2))) + (check-output-blocking (lambda (o2) + (parameterize ([current-output-port o2]) + (system* (find-executable-path "echo") + "-n" + "abc")))) + + (delete-file fifo)) + +(test #f port-waiting-peer? (current-input-port)) +(test #f port-waiting-peer? (current-output-port)) +(test #f port-waiting-peer? (open-input-bytes #"")) +(err/rt-test (port-waiting-peer? 10)) + (delete-directory work-dir) ;; Network - - - - - - - - - - - - - - - - - - - - - - diff --git a/racket/src/cs/primitive/kernel.ss b/racket/src/cs/primitive/kernel.ss index 9be97a9c05..eda6ee5712 100644 --- a/racket/src/cs/primitive/kernel.ss +++ b/racket/src/cs/primitive/kernel.ss @@ -622,6 +622,7 @@ [port-provides-progress-evts? (known-procedure 2)] [port-read-handler (known-procedure 6)] [port-try-file-lock? (known-procedure 4)] + [port-waiting-peer? (known-procedure 2)] [port-write-handler (known-procedure 6)] [port-writes-atomic? (known-procedure 2)] [port-writes-special? (known-procedure 2)] diff --git a/racket/src/io/host/rktio.rkt b/racket/src/io/host/rktio.rkt index 04144bcbc0..f94f02fc07 100644 --- a/racket/src/io/host/rktio.rkt +++ b/racket/src/io/host/rktio.rkt @@ -11,7 +11,7 @@ rktio-errstep racket-error? rktio-place-init!) -;; More `provide`s added by macros below +;; More `provide`s are added by macros below (define rktio-table (or (primitive-table '#%rktio) diff --git a/racket/src/io/port/fd-port.rkt b/racket/src/io/port/fd-port.rkt index 7d9d8e29f1..9a52c82d27 100644 --- a/racket/src/io/port/fd-port.rkt +++ b/racket/src/io/port/fd-port.rkt @@ -1,6 +1,7 @@ #lang racket/base (require racket/fixnum "../common/class.rkt" + "../common/check.rkt" "../host/rktio.rkt" "../host/error.rkt" "../host/thread.rkt" @@ -26,6 +27,7 @@ open-output-fd finish-fd-output-port terminal-port? + port-waiting-peer? fd-port-fd prop:fd-place-message-opener) @@ -348,6 +350,19 @@ (fd-output-port-fd cp)] [else #f])) +(define/who (port-waiting-peer? p) + (define cp (->core-output-port p #:default #f)) + (cond + [cp + (cond + [(fd-output-port? cp) + (define fd (fd-port-fd cp)) + (rktio_fd_is_pending_open rktio fd)] + [else #f])] + [(input-port? p) #f] + [else + (raise-argument-error who "port?" p)])) + ;; ---------------------------------------- ;; in atomic mode diff --git a/racket/src/io/port/main.rkt b/racket/src/io/port/main.rkt index a3f9ef36cd..ddb83e1078 100644 --- a/racket/src/io/port/main.rkt +++ b/racket/src/io/port/main.rkt @@ -16,7 +16,8 @@ "file-port.rkt" "file-stream.rkt" (only-in "fd-port.rkt" - terminal-port?) + terminal-port? + port-waiting-peer?) "file-identity.rkt" "file-lock.rkt" "bytes-port.rkt" @@ -112,6 +113,7 @@ file-stream-port? terminal-port? + port-waiting-peer? open-input-bytes open-output-bytes diff --git a/racket/src/io/subprocess/main.rkt b/racket/src/io/subprocess/main.rkt index 9b7bef6eb8..7e96713659 100644 --- a/racket/src/io/subprocess/main.rkt +++ b/racket/src/io/subprocess/main.rkt @@ -116,6 +116,14 @@ (define command-bstr (->host (->path command) who '(execute))) + ;; If `stdout` or `stderr` is a fifo with no read end open, wait for it: + (define (maybe-wait fd) + (when (and fd (rktio_fd_is_pending_open rktio (fd-port-fd fd))) + (sync fd))) + (maybe-wait stdout) + (unless (eq? stderr 'stdout) + (maybe-wait stderr)) + (start-atomic) (poll-subprocess-finalizations) (check-current-custodian who) diff --git a/racket/src/io/unsafe/port.rkt b/racket/src/io/unsafe/port.rkt index de566e4b0e..26252684e7 100644 --- a/racket/src/io/unsafe/port.rkt +++ b/racket/src/io/unsafe/port.rkt @@ -42,6 +42,7 @@ (define (unsafe-port->file-descriptor p) (define fd (fd-port-fd p)) (and fd + (not (rktio_fd_is_pending_open rktio fd)) (rktio_fd_system_fd rktio fd))) (define (unsafe-port->socket p) diff --git a/racket/src/racket/src/port.c b/racket/src/racket/src/port.c index 1cf1692619..0b471495db 100644 --- a/racket/src/racket/src/port.c +++ b/racket/src/racket/src/port.c @@ -3296,6 +3296,30 @@ scheme_file_stream_port_p (int argc, Scheme_Object *argv[]) return scheme_false; } +Scheme_Object *scheme_port_waiting_peer_p(int argc, Scheme_Object *argv[]) +{ + Scheme_Object *p = argv[0]; + + if (SCHEME_OUTPUT_PORTP(p)) { + Scheme_Output_Port *op; + + op = scheme_output_port_record(p); + + if (SAME_OBJ(op->sub_type, fd_output_port_type)) { + rktio_fd_t *rfd = ((Scheme_FD *)op->port_data)->fd; + if (rktio_fd_is_pending_open(scheme_rktio, rfd)) + return scheme_true; + } + } else if (SCHEME_INPUT_PORTP(p)) { + /* ok */ + } else { + scheme_wrong_contract("port-waiting-peer?", "port?", 0, argc, argv); + ESCAPED_BEFORE_HERE; + } + + return scheme_false; +} + int scheme_get_port_file_descriptor(Scheme_Object *p, intptr_t *_fd) { intptr_t fd = 0; @@ -3325,8 +3349,11 @@ int scheme_get_port_file_descriptor(Scheme_Object *p, intptr_t *_fd) fd = MSC_IZE (fileno)((FILE *)((Scheme_Output_File *)op->port_data)->f); fd_ok = 1; } else if (SAME_OBJ(op->sub_type, fd_output_port_type)) { - fd = rktio_fd_system_fd(scheme_rktio, ((Scheme_FD *)op->port_data)->fd); - fd_ok = 1; + rktio_fd_t *rfd = ((Scheme_FD *)op->port_data)->fd; + if (!rktio_fd_is_pending_open(scheme_rktio, rfd)) { + fd = rktio_fd_system_fd(scheme_rktio, rfd); + fd_ok = 1; + } } } } @@ -6234,6 +6261,17 @@ static Scheme_Object *subprocess(int c, Scheme_Object *args[]) if (!stdin_fd || !stdout_fd || !stderr_fd) scheme_custodian_check_available(NULL, name, "file-stream"); + /* In case `stdout_fd` or `stderr_fd` is a fifo with no read end + open, wait for it. */ + if (stdout_fd && rktio_fd_is_pending_open(scheme_rktio, stdout_fd)) { + a[0] = args[0]; + scheme_sync(1, a); + } + if (stderr_fd && rktio_fd_is_pending_open(scheme_rktio, stderr_fd)) { + a[0] = args[2]; + scheme_sync(1, a); + } + /*--------------------------------------*/ /* Create subprocess */ /*--------------------------------------*/ diff --git a/racket/src/racket/src/portfun.c b/racket/src/racket/src/portfun.c index 599f9fcca5..c585127325 100644 --- a/racket/src/racket/src/portfun.c +++ b/racket/src/racket/src/portfun.c @@ -8,8 +8,9 @@ static Scheme_Object *input_port_p (int, Scheme_Object *[]); static Scheme_Object *output_port_p (int, Scheme_Object *[]); static Scheme_Object *port_closed_p (int, Scheme_Object *[]); -static Scheme_Object *current_input_port (int, Scheme_Object *[]); static Scheme_Object *string_port_p(int, Scheme_Object *[]); +static Scheme_Object *waiting_peer_port_p(int, Scheme_Object *[]); +static Scheme_Object *current_input_port (int, Scheme_Object *[]); static Scheme_Object *current_output_port (int, Scheme_Object *[]); static Scheme_Object *current_error_port (int, Scheme_Object *[]); static Scheme_Object *make_input_port (int, Scheme_Object *[]); @@ -221,6 +222,7 @@ scheme_init_port_fun(Scheme_Startup_Env *env) ADD_FOLDING_PRIM("file-stream-port?", scheme_file_stream_port_p, 1, 1, 1, env); ADD_FOLDING_PRIM("string-port?", string_port_p, 1, 1, 1, env); ADD_FOLDING_PRIM("terminal-port?", scheme_terminal_port_p, 1, 1, 1, env); + ADD_FOLDING_PRIM("port-waiting-peer?", scheme_port_waiting_peer_p, 1, 1, 1, env); ADD_NONCM_PRIM("port-closed?", port_closed_p, 1, 1, env); ADD_NONCM_PRIM("open-input-file", open_input_file, 1, 3, env); diff --git a/racket/src/racket/src/schminc.h b/racket/src/racket/src/schminc.h index 32430b8508..e13c0274c4 100644 --- a/racket/src/racket/src/schminc.h +++ b/racket/src/racket/src/schminc.h @@ -14,7 +14,7 @@ #define USE_COMPILED_STARTUP 1 -#define EXPECTED_PRIM_COUNT 1454 +#define EXPECTED_PRIM_COUNT 1455 #ifdef MZSCHEME_SOMETHING_OMITTED # undef USE_COMPILED_STARTUP diff --git a/racket/src/racket/src/schpriv.h b/racket/src/racket/src/schpriv.h index dc5f20cb5a..8f370f10fa 100644 --- a/racket/src/racket/src/schpriv.h +++ b/racket/src/racket/src/schpriv.h @@ -3606,6 +3606,7 @@ void scheme_flush_orig_outputs(void); void scheme_flush_if_output_fds(Scheme_Object *o); Scheme_Object *scheme_file_stream_port_p(int, Scheme_Object *[]); Scheme_Object *scheme_terminal_port_p(int, Scheme_Object *[]); +Scheme_Object *scheme_port_waiting_peer_p(int, Scheme_Object *[]); Scheme_Object *scheme_do_open_input_file(char *name, int offset, int argc, Scheme_Object *argv[], int internal, int for_module); Scheme_Object *scheme_do_open_output_file(char *name, int offset, int argc, Scheme_Object *argv[], int and_read, diff --git a/racket/src/racket/src/schvers.h b/racket/src/racket/src/schvers.h index c99ff63c17..749907848c 100644 --- a/racket/src/racket/src/schvers.h +++ b/racket/src/racket/src/schvers.h @@ -16,7 +16,7 @@ #define MZSCHEME_VERSION_X 7 #define MZSCHEME_VERSION_Y 4 #define MZSCHEME_VERSION_Z 0 -#define MZSCHEME_VERSION_W 4 +#define MZSCHEME_VERSION_W 5 /* A level of indirection makes `#` work as needed: */ #define AS_a_STR_HELPER(x) #x diff --git a/racket/src/rktio/rktio.def b/racket/src/rktio/rktio.def index 5c0e965399..eeabe8f3fd 100644 --- a/racket/src/rktio/rktio.def +++ b/racket/src/rktio/rktio.def @@ -12,6 +12,7 @@ rktio_fd_is_socket rktio_fd_is_udp rktio_fd_is_terminal rktio_fd_is_text_converted +rktio_fd_is_pending_open rktio_fd_modes rktio_open rktio_close diff --git a/racket/src/rktio/rktio.h b/racket/src/rktio/rktio.h index 846f2e8c97..13b3cd8cd1 100644 --- a/racket/src/rktio/rktio.h +++ b/racket/src/rktio/rktio.h @@ -193,7 +193,8 @@ RKTIO_EXTERN rktio_fd_t *rktio_system_fd(rktio_t *rktio, intptr_t system_fd, int use `RKTIO_OPEN_INIT`. */ RKTIO_EXTERN_NOERR intptr_t rktio_fd_system_fd(rktio_t *rktio, rktio_fd_t *rfd); -/* Extracts a native file descriptor or socket. */ +/* Extracts a native file descriptor or socket. A file descriptor must + not be in pending-open mode as reported by `rktio_fd_is_pending_open`. */ RKTIO_EXTERN rktio_bool_t rktio_fd_is_regular_file(rktio_t *rktio, rktio_fd_t *rfd); RKTIO_EXTERN rktio_bool_t rktio_fd_is_directory(rktio_t *rktio, rktio_fd_t *rfd); @@ -203,8 +204,14 @@ RKTIO_EXTERN rktio_bool_t rktio_fd_is_terminal(rktio_t *rktio, rktio_fd_t *rfd); /* The functions mostly report values of recorded mode flags. */ RKTIO_EXTERN rktio_bool_t rktio_fd_is_text_converted(rktio_t *rktio, rktio_fd_t *rfd); -/* Reports whether RKTIO_OPEN_TEXT was use and has an effect. The - RKTIO_OPEN_TEXT flag has an effect only on Windows. */ +/* Reports whether `RKTIO_OPEN_TEXT` was use and has an effect. The + `RKTIO_OPEN_TEXT` flag has an effect only on Windows. */ + +RKTIO_EXTERN rktio_bool_t rktio_fd_is_pending_open(rktio_t *rktio, rktio_fd_t *rfd); +/* Reports whether `rfd` will block on writing because it corresponds + to the write end of a fifo that has no open reader. In that case, + `rktio_fd_system_fd` cannot report a file descriptor and `rktio_ltps_add` + will error with `RKTIO_ERROR_UNSUPPORTED`. */ RKTIO_EXTERN_NOERR int rktio_fd_modes(rktio_t *rktio, rktio_fd_t *rfd); /* Returns all of the recorded mode flags, including those provided to @@ -216,7 +223,9 @@ RKTIO_EXTERN rktio_fd_t *rktio_open(rktio_t *rktio, rktio_const_string_t src, in in read mode, and can report `RKTIO_ERROR_IS_A_DIRECTORY`, `RKTIO_ERROR_EXISTS`, or `RKTIO_ERROR_ACCESS_DENIED` in place of a system error in write mode. On Windows, can report - `RKTIO_ERROR_UNSUPPORTED_TEXT_MODE`. */ + `RKTIO_ERROR_UNSUPPORTED_TEXT_MODE`. If `modes` has `RKTIO_OPEN_WRITE` + without `RKTIO_OPEN_READ`, then the result may be a file descriptor + in pending-open mode until the read end is opened. */ RKTIO_EXTERN rktio_ok_t rktio_close(rktio_t *rktio, rktio_fd_t *fd); /* Can report `RKTIO_ERROR_EXISTS` in place of system error, @@ -582,7 +591,8 @@ RKTIO_EXTERN rktio_process_result_t *rktio_process(rktio_t *rktio, rktio_const_string_t current_directory, rktio_envvars_t *envvars, int flags); -/* `flags` flags: */ +/* The output file descriptors `stdin_fd` must not be a pending-open + descriptor. The `flags` are: */ #define RKTIO_PROCESS_NEW_GROUP (1<<0) #define RKTIO_PROCESS_STDOUT_AS_STDERR (1<<1) #define RKTIO_PROCESS_WINDOWS_EXACT_CMDLINE (1<<2) diff --git a/racket/src/rktio/rktio.inc b/racket/src/rktio/rktio.inc index 12814a5918..6f864935fd 100644 --- a/racket/src/rktio/rktio.inc +++ b/racket/src/rktio/rktio.inc @@ -12,6 +12,7 @@ Sforeign_symbol("rktio_fd_is_socket", (void *)rktio_fd_is_socket); Sforeign_symbol("rktio_fd_is_udp", (void *)rktio_fd_is_udp); Sforeign_symbol("rktio_fd_is_terminal", (void *)rktio_fd_is_terminal); Sforeign_symbol("rktio_fd_is_text_converted", (void *)rktio_fd_is_text_converted); +Sforeign_symbol("rktio_fd_is_pending_open", (void *)rktio_fd_is_pending_open); Sforeign_symbol("rktio_fd_modes", (void *)rktio_fd_modes); Sforeign_symbol("rktio_open", (void *)rktio_open); Sforeign_symbol("rktio_close", (void *)rktio_close); diff --git a/racket/src/rktio/rktio.rktl b/racket/src/rktio/rktio.rktl index a27178576a..6f958a538f 100644 --- a/racket/src/rktio/rktio.rktl +++ b/racket/src/rktio/rktio.rktl @@ -255,6 +255,11 @@ rktio_bool_t rktio_fd_is_text_converted (((ref rktio_t) rktio) ((ref rktio_fd_t) rfd))) +(define-function + () + rktio_bool_t + rktio_fd_is_pending_open + (((ref rktio_t) rktio) ((ref rktio_fd_t) rfd))) (define-function () int diff --git a/racket/src/rktio/rktio_fd.c b/racket/src/rktio/rktio_fd.c index 87be7f5f95..18d4061008 100644 --- a/racket/src/rktio/rktio_fd.c +++ b/racket/src/rktio/rktio_fd.c @@ -29,6 +29,9 @@ struct rktio_fd_t { int bufcount; char buffer[1]; # endif +# ifdef RKTIO_USE_PENDING_OPEN + struct open_in_thread_t *pending; +# endif #endif #ifdef RKTIO_SYSTEM_WINDOWS @@ -201,9 +204,46 @@ rktio_fd_t *rktio_system_fd(rktio_t *rktio, intptr_t system_fd, int modes) return rfd; } +int rktio_fd_is_pending_open(rktio_t *rktio, rktio_fd_t *rfd) +{ +#ifdef RKTIO_USE_PENDING_OPEN + if (rfd->pending) + rktio_pending_open_poll(rktio, rfd, rfd->pending); + return !!rfd->pending; +#else + return 0; +#endif +} + +#ifdef RKTIO_USE_PENDING_OPEN +rktio_fd_t *rktio_pending_system_fd(rktio_t *rktio, struct open_in_thread_t *oit, int modes) +{ + rktio_fd_t *rfd; + + rfd = calloc(1, sizeof(rktio_fd_t)); + rfd->modes = (modes - (modes & RKTIO_OPEN_INIT)); + rfd->pending = oit; + + return rfd; +} + +void rktio_update_system_fd(rktio_t *rktio, rktio_fd_t *rfd, int fd, int modes) +/* Convert an open-pending rfd to a normal rfd. The given `modes` adds + to (and completes) the modes provided to `rktio_pending_system_fd`. */ +{ + rfd->fd = fd; + rfd->modes |= modes; + rfd->pending = NULL; +} +#endif + + intptr_t rktio_internal_fd_system_fd(rktio_fd_t *rfd) { #ifdef RKTIO_SYSTEM_UNIX +# ifdef RKTIO_USE_PENDING_OPEN + if (rfd->pending) return -1; +# endif return rfd->fd; #endif #ifdef RKTIO_SYSTEM_WINDOWS @@ -272,6 +312,14 @@ int rktio_fd_is_udp(rktio_t *rktio, rktio_fd_t *rfd) return ((rfd->modes & RKTIO_OPEN_UDP) ? 1 : 0); } +int rktio_fd_is_open_pending(rktio_t *rktio, rktio_fd_t *rfd) +{ +#ifdef RKTIO_USE_PENDING_OPEN + if (rfd->pending) return 1; +#endif + return 0; +} + int rktio_fd_modes(rktio_t *rktio, rktio_fd_t *rfd) { return rfd->modes; @@ -298,6 +346,9 @@ int rktio_system_fd_is_terminal(rktio_t *rktio, intptr_t fd) int rktio_fd_is_terminal(rktio_t *rktio, rktio_fd_t *rfd) { +#ifdef RKTIO_USE_PENDING_OPEN + if (rfd->pending) return 0; +#endif return rktio_system_fd_is_terminal(rktio, (intptr_t)rfd->fd); } @@ -315,6 +366,13 @@ rktio_fd_t *rktio_dup(rktio_t *rktio, rktio_fd_t *rfd) #ifdef RKTIO_SYSTEM_UNIX intptr_t nfd; +# ifdef RKTIO_USE_PENDING_OPEN + if (rfd->pending) { + rktio_pending_open_retain(rktio, rfd->pending); + return rktio_pending_system_fd(rktio, rfd->pending, rfd->modes); + } +# endif + do { nfd = dup(rfd->fd); } while (nfd == -1 && errno == EINTR); @@ -375,16 +433,22 @@ void rktio_reliably_close(intptr_t s) #endif static rktio_ok_t do_close(rktio_t *rktio /* maybe NULL */, rktio_fd_t *rfd, int set_error) +/* The `rktio` argument can be NULL for a detached file descriptor */ { int ok; #ifdef RKTIO_SYSTEM_UNIX int cr; - cr = rktio_reliably_close_err(rfd->fd); +# ifdef RKTIO_USE_PENDING_OPEN + if (rfd->pending) + cr = rktio_pending_open_release(rktio, rfd->pending); + else +# endif + cr = rktio_reliably_close_err(rfd->fd); # ifdef RKTIO_USE_FCNTL_AND_FORK_FOR_FILE_LOCKS - if (rktio && !(rfd->modes & RKTIO_OPEN_SOCKET)) + if (rktio && !(rfd->modes & RKTIO_OPEN_SOCKET) && (!cr || !set_error)) rktio_release_lockf(rktio, rfd->fd); # endif @@ -446,12 +510,24 @@ void rktio_forget(rktio_t *rktio, rktio_fd_t *rfd) rktio_fd_transfer_t *rktio_fd_detach(rktio_t *rktio, rktio_fd_t *rfd) { +#ifdef RKTIO_USE_PENDING_OPEN + if (rfd->pending) + rktio_pending_open_detach(rktio, rfd->pending); +#endif + return (rktio_fd_transfer_t *)rfd; } rktio_fd_t *rktio_fd_attach(rktio_t *rktio, rktio_fd_transfer_t *rfdt) { - return (rktio_fd_t *)rfdt; + rktio_fd_t *rfd = (rktio_fd_t *)rfdt; + +#ifdef RKTIO_USE_PENDING_OPEN + if (rfd->pending) + rktio_pending_open_attach(rktio, rfd->pending); +#endif + + return rfd; } void rktio_fd_close_transfer(rktio_fd_transfer_t *rfdt) @@ -592,6 +668,19 @@ int poll_write_ready_or_flushed(rktio_t *rktio, rktio_fd_t *rfd, int check_flush return RKTIO_POLL_READY; else { int sr; + +# ifdef RKTIO_USE_PENDING_OPEN + if (rfd->pending) { + int errval = rktio_pending_open_poll(rktio, rfd, rfd->pending); + if (errval) { + errno = errval; + get_posix_error(); + return RKTIO_POLL_ERROR; + } else if (rfd->pending) + return RKTIO_POLL_NOT_READY; + } +# endif + # ifdef HAVE_POLL_SYSCALL struct pollfd pfd[1]; pfd[0].fd = rfd->fd; @@ -616,7 +705,7 @@ int poll_write_ready_or_flushed(rktio_t *rktio, rktio_fd_t *rfd, int check_flush /* Mac OS X 10.8 and 10.9: select() seems to claim that a pipe is always ready for output. To work around that problem, kqueue() support might be used for pipes, but that has different - problems. The poll() code above should be used, instead. */ + problems. The pol() code above should be used, instead. */ sr = select(rfd->fd + 1, NULL, RKTIO_FDS(writefds), RKTIO_FDS(exnfds), &time); } while ((sr == -1) && (errno == EINTR)); # endif @@ -693,6 +782,13 @@ void rktio_poll_add(rktio_t *rktio, rktio_fd_t *rfd, rktio_poll_set_t *fds, int #ifdef RKTIO_SYSTEM_UNIX rktio_poll_set_t *fds2; +# ifdef RKTIO_USE_PENDING_OPEN + if (rfd->pending) { + rktio_poll_add_pending_open(rktio, rfd, rfd->pending, fds); + return; + } +# endif + if (modes & RKTIO_POLL_READ) { RKTIO_FD_SET(rfd->fd, fds); } @@ -1232,7 +1328,19 @@ intptr_t rktio_write(rktio_t *rktio, rktio_fd_t *rfd, const char *buffer, intptr if (rfd->modes & RKTIO_OPEN_SOCKET) return rktio_socket_write(rktio, rfd, buffer, len); - + +# ifdef RKTIO_USE_PENDING_OPEN + if (rfd->pending) { + int errval = rktio_pending_open_poll(rktio, rfd, rfd->pending); + if (errval) { + errno = errval; + get_posix_error(); + return RKTIO_WRITE_ERROR; + } else if (rfd->pending) + return 0; + } +# endif + flags = fcntl(rfd->fd, F_GETFL, 0); if (!(flags & RKTIO_NONBLOCKING)) fcntl(rfd->fd, F_SETFL, flags | RKTIO_NONBLOCKING); diff --git a/racket/src/rktio/rktio_file.c b/racket/src/rktio/rktio_file.c index b7bd01dff4..0e80b2c547 100644 --- a/racket/src/rktio/rktio_file.c +++ b/racket/src/rktio/rktio_file.c @@ -11,6 +11,18 @@ #ifdef RKTIO_SYSTEM_WINDOWS # include #endif +#ifdef RKTIO_USE_PENDING_OPEN +#include +#endif + +#ifdef RKTIO_SYSTEM_UNIX +static rktio_fd_t *finish_unix_fd_creation(rktio_t *rktio, int fd, int modes, rktio_fd_t *existing_rfd); +#endif + +#ifdef RKTIO_USE_PENDING_OPEN +static rktio_fd_t *open_via_thread(rktio_t *rktio, const char *filename, int modes, int flags); +static int do_pending_open_release(rktio_t *rktio, struct open_in_thread_t *data, int close_fd); +#endif /*========================================================================*/ /* Opening a file */ @@ -106,8 +118,6 @@ static rktio_fd_t *open_write(rktio_t *rktio, const char *filename, int modes) #ifdef RKTIO_SYSTEM_UNIX int fd; int flags; - struct stat buf; - int cr; flags = (((modes & RKTIO_OPEN_READ) ? O_RDWR : O_WRONLY) | ((modes & RKTIO_OPEN_MUST_EXIST ? 0 : O_CREAT))); @@ -124,48 +134,20 @@ static rktio_fd_t *open_write(rktio_t *rktio, const char *filename, int modes) } while ((fd == -1) && (errno == EINTR)); if (errno == ENXIO) { - /* FIFO with no reader? Try opening in RW mode: */ + /* FIFO with no reader? */ +#ifdef RKTIO_USE_PENDING_OPEN + return open_via_thread(rktio, filename, modes, flags | RKTIO_BINARY); +#else + /* Try opening in RW mode: */ flags -= O_WRONLY; flags |= O_RDWR; do { fd = open(filename, flags | RKTIO_NONBLOCKING | RKTIO_BINARY, 0666); } while ((fd == -1) && (errno == EINTR)); +#endif } - if (fd == -1) { - if (errno == EISDIR) { - set_racket_error(RKTIO_ERROR_IS_A_DIRECTORY); - return NULL; - } else if (errno == EEXIST) { - set_racket_error(RKTIO_ERROR_EXISTS); - return NULL; - } else if (errno == EACCES) { - set_racket_error(RKTIO_ERROR_ACCESS_DENIED); - return NULL; - } - - if (fd == -1) { - get_posix_error(); - return NULL; - } - } - - do { - cr = fstat(fd, &buf); - } while ((cr == -1) && (errno == EINTR)); - - if (cr) { - get_posix_error(); - do { - cr = close(fd); - } while ((cr == -1) && (errno == EINTR)); - return NULL; - } - - return rktio_system_fd(rktio, fd, (modes - | (S_ISREG(buf.st_mode) - ? RKTIO_OPEN_REGFILE - : RKTIO_OPEN_NOT_REGFILE))); + return finish_unix_fd_creation(rktio, fd, modes, NULL); #endif #ifdef RKTIO_SYSTEM_WINDOWS HANDLE fd; @@ -236,6 +218,54 @@ static rktio_fd_t *open_write(rktio_t *rktio, const char *filename, int modes) #endif } +#ifdef RKTIO_SYSTEM_UNIX +static rktio_fd_t *finish_unix_fd_creation(rktio_t *rktio, int fd, int modes, rktio_fd_t *existing_rfd) +{ + struct stat buf; + int cr; + + if (fd == -1) { + if (errno == EISDIR) { + set_racket_error(RKTIO_ERROR_IS_A_DIRECTORY); + return NULL; + } else if (errno == EEXIST) { + set_racket_error(RKTIO_ERROR_EXISTS); + return NULL; + } else if (errno == EACCES) { + set_racket_error(RKTIO_ERROR_ACCESS_DENIED); + return NULL; + } + + if (fd == -1) { + get_posix_error(); + return NULL; + } + } + + do { + cr = fstat(fd, &buf); + } while ((cr == -1) && (errno == EINTR)); + + if (cr) { + get_posix_error(); + do { + cr = close(fd); + } while ((cr == -1) && (errno == EINTR)); + return NULL; + } + + modes |= (S_ISREG(buf.st_mode) + ? RKTIO_OPEN_REGFILE + : RKTIO_OPEN_NOT_REGFILE); + + if (existing_rfd) { + rktio_update_system_fd(rktio, existing_rfd, fd, modes); + return existing_rfd; + } else + return rktio_system_fd(rktio, fd, modes); +} +#endif + rktio_fd_t *rktio_open(rktio_t *rktio, const char *filename, int modes) { if (modes & RKTIO_OPEN_WRITE) @@ -373,3 +403,260 @@ rktio_ok_t rktio_set_file_size(rktio_t *rktio, rktio_fd_t *rfd, rktio_filesize_t } #endif } + +/*========================================================================*/ +/* Thread for blocking open */ +/*========================================================================*/ + +/* When opening a fifo for writing, then there's no way to open in + non-blocking mode and then poll for whether reader is ready. + Instead, we have to open in a separate thread. When the open + succeeds, post to a waiting rktio_t's signal handle. + + To allow a blocked-on-opening file descriptor to be transferred + across rktio_t domains, the record that represents the extra thread + must be sharable among multiple OS threads, and we must in general + keep an array of handles. + + If the blocked-omn-opening file descriptor is closed, then we have + to be able to cancel the thread. This isn't so bad, since we want + to support canceling only while the `open` system call is + blocked. */ + +#ifdef RKTIO_USE_PENDING_OPEN + +/* An instance of `open_in_thread_t` can be shared by multiple threads + (i.e., multiple `rktio_t` instances) */ +typedef struct open_in_thread_t { + pthread_mutex_t lock; + int ready; + pthread_cond_t ready_cond; /* wait until helper thread is ready (including cleanup) */ + char *filename; + int flags; + int done; + int fd; + int errval; + int refcount; + pthread_t th; + int num_handles; + rktio_signal_handle_t **handles; +} open_in_thread_t; + + +static void free_open_in_thread(open_in_thread_t *data) +{ + pthread_detach(data->th); + if (data->handles) + free(data->handles); + free(data->filename); + free(data); +} + +static void cleanup_open_thread(void *_data) +{ + int i, refcount; + open_in_thread_t *data = (open_in_thread_t *)_data; + + pthread_mutex_lock(&data->lock); + for (i = 0; i < data->num_handles; i++) + if (data->handles[i]) + rktio_signal_received_at(data->handles[i]); + refcount = data->refcount; + data->done = 1; + pthread_mutex_unlock(&data->lock); + + if (!refcount) { + if (data->fd != -1) + rktio_reliably_close(data->fd); + free_open_in_thread(data); + } +} + +static void *do_open_in_thread(void *_data) +{ + open_in_thread_t *data = (open_in_thread_t *)_data; + int fd; + int old_type; + + /* To be on the safe side, disable cancelation except + just around the call to `open` */ + pthread_setcanceltype(PTHREAD_CANCEL_DISABLE, &old_type); + pthread_cleanup_push(cleanup_open_thread, data); + + pthread_mutex_lock(&data->lock); + data->ready = 1; + pthread_cond_signal(&data->ready_cond); + pthread_mutex_unlock(&data->lock); + + data->fd = -1; + + pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); + + /* Cancelation only possible during `open`, a which point it's ok + and sufficient to run `cleanup_open_thread`; we're assuming that + either `open` returns with a file descriptor or the thread is + canceled before `open` returns, but not both (otherwise there + could be a space leak) */ + do { + fd = open(data->filename, data->flags, 0666); + } while ((fd == -1) && (errno == EINTR)); + + pthread_setcanceltype(PTHREAD_CANCEL_DISABLE, NULL); + + data->fd = fd; + if (fd == -1) + data->errval = errno; + + /* Runs `cleanup_open_thread` while popping it: */ + pthread_cleanup_pop(1); + + /* In general, a function tha changes the cancelation type should + restore it before returning */ + pthread_setcanceltype(old_type, NULL); + + return NULL; +} + +static rktio_fd_t *open_via_thread(rktio_t *rktio, const char *filename, int modes, int flags) +{ + open_in_thread_t *data; + + data = calloc(1, sizeof(open_in_thread_t)); + + data->refcount = 1; + + data->filename = strdup(filename); + data->flags = flags; + pthread_mutex_init(&data->lock, NULL); + pthread_cond_init(&data->ready_cond, NULL); + + data->num_handles = 1; + data->handles = malloc(sizeof(rktio_signal_handle_t*)); + data->handles[0] = rktio_get_signal_handle(rktio); + + (void)pthread_create(&data->th, NULL, do_open_in_thread, data); + + pthread_mutex_lock(&data->lock); + if (!data->ready) + pthread_cond_wait(&data->ready_cond, &data->lock); + pthread_mutex_unlock(&data->lock); + + return rktio_pending_system_fd(rktio, data, modes); +} + +int rktio_pending_open_poll(rktio_t *rktio, rktio_fd_t *existing_rfd, struct open_in_thread_t *data) +/* non-zero result is an errno value */ +{ + int done; + + pthread_mutex_lock(&data->lock); + done = data->done; + pthread_mutex_unlock(&data->lock); + + if (done) { + if (data->fd == -1) + return data->errval; + else { + int fd = data->fd; + (void)do_pending_open_release(rktio, data, 0); + if (!finish_unix_fd_creation(rktio, fd, 0, existing_rfd)) { + /* Posix error must be saved in `rktio` */ + return rktio->errid; + } + return 0; + } + } else + return 0; +} + +void rktio_poll_add_pending_open(rktio_t *rktio, rktio_fd_t *rfd, struct open_in_thread_t *data, rktio_poll_set_t *fds) +{ + int done; + + pthread_mutex_lock(&data->lock); + done = data->done; + pthread_mutex_unlock(&data->lock); + + if (done) + rktio_poll_set_add_nosleep(rktio, fds); +} + +void rktio_pending_open_attach(rktio_t *rktio, struct open_in_thread_t *data) +{ + int i; + rktio_signal_handle_t *h = rktio_get_signal_handle(rktio); + + pthread_mutex_lock(&data->lock); + if (!data->done) { + for (i = 0; i < data->num_handles; i++) + if (!data->handles[i]) { + data->handles[i] = h; + break; + } + if (i >= data->num_handles) { + rktio_signal_handle_t **old = data->handles; + int n = (2 * data->num_handles); + data->handles = calloc(n, sizeof(rktio_signal_handle_t*)); + memcpy(data->handles, old, data->num_handles * sizeof(rktio_signal_handle_t*)); + data->handles[data->num_handles] = h; + data->num_handles = n; + } + } + pthread_mutex_unlock(&data->lock); +} + +static void do_detach(rktio_t *rktio, struct open_in_thread_t *data) +{ + int i; + rktio_signal_handle_t *h = rktio_get_signal_handle(rktio); + + for (i = 0; i < data->num_handles; i++) + if (data->handles[i] == h) + data->handles[i] = NULL; +} + +void rktio_pending_open_detach(rktio_t *rktio, struct open_in_thread_t *data) +{ + pthread_mutex_lock(&data->lock); + do_detach(rktio, data); + pthread_mutex_unlock(&data->lock); +} + + +void rktio_pending_open_retain(rktio_t *rktio, struct open_in_thread_t *data) +{ + pthread_mutex_lock(&data->lock); + data->refcount++; + pthread_mutex_unlock(&data->lock); +} + +int do_pending_open_release(rktio_t *rktio, struct open_in_thread_t *data, int close_fd) +/* The rktio argument can be NULL for a detached use */ +{ + int bye; + + pthread_mutex_lock(&data->lock); + --data->refcount; + bye = (data->done && !data->refcount); + if (!bye) { + do_detach(rktio, data); + if (!data->refcount) + pthread_cancel(data->th); + } + pthread_mutex_unlock(&data->lock); + + if (bye) { + if (close_fd && (data->fd != -1)) + rktio_reliably_close(data->fd); + free_open_in_thread(data); + } + + return 0; +} + +int rktio_pending_open_release(rktio_t *rktio, struct open_in_thread_t *data) +{ + return do_pending_open_release(rktio, data, 1); +} + +#endif diff --git a/racket/src/rktio/rktio_ltps.c b/racket/src/rktio/rktio_ltps.c index 7f123331b7..6ae2d4b50c 100644 --- a/racket/src/rktio/rktio_ltps.c +++ b/racket/src/rktio/rktio_ltps.c @@ -208,7 +208,16 @@ rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t # endif rktio_ltps_handle_pair_t *v; rktio_ltps_handle_t *s; - intptr_t fd = rktio_fd_system_fd(rktio, rfd); + intptr_t fd; + +#ifdef RKTIO_USE_PENDING_OPEN + if (rktio_fd_is_pending_open(rktio, rfd)) { + set_racket_error(RKTIO_ERROR_UNSUPPORTED); + return NULL; + } +#endif + + fd = rktio_fd_system_fd(rktio, rfd); # ifdef HAVE_KQUEUE_SYSCALL if (!rktio_fd_is_socket(rktio, rfd)) { diff --git a/racket/src/rktio/rktio_private.h b/racket/src/rktio/rktio_private.h index b5f1360b1e..25d3ffd7f8 100644 --- a/racket/src/rktio/rktio_private.h +++ b/racket/src/rktio/rktio_private.h @@ -339,6 +339,23 @@ void rktio_close_fds_after_fork(int len, int skip1, int skip2, int skip3); int rktio_system_fd_is_terminal(rktio_t *rktio, intptr_t fd); +#ifdef RKTIO_USE_PTHREADS +# define RKTIO_USE_PENDING_OPEN +#endif + +#ifdef RKTIO_USE_PENDING_OPEN +struct open_in_thread_t; +rktio_fd_t *rktio_pending_system_fd(rktio_t *rktio, struct open_in_thread_t *oit, int modes); +void rktio_update_system_fd(rktio_t *rktio, rktio_fd_t *rfd, int fd, int modes); +int rktio_fd_is_pending_open(rktio_t *rktio, rktio_fd_t *rfd); +int rktio_pending_open_poll(rktio_t *rktio, rktio_fd_t *rfd, struct open_in_thread_t *oit); +void rktio_poll_add_pending_open(rktio_t *rktio, rktio_fd_t *rfd, struct open_in_thread_t *oit, rktio_poll_set_t *fds); +void rktio_pending_open_detach(rktio_t *rktio, struct open_in_thread_t *oit); +void rktio_pending_open_attach(rktio_t *rktio, struct open_in_thread_t *oit); +void rktio_pending_open_retain(rktio_t *rktio, struct open_in_thread_t *oit); +int rktio_pending_open_release(rktio_t *rktio, struct open_in_thread_t *oit); +#endif + void *rktio_envvars_to_block(rktio_t *rktio, rktio_envvars_t *envvars); void rktio_stop_fs_change(rktio_t *rktio);