diff --git a/pkgs/racket-doc/scribblings/reference/port-buffers.scrbl b/pkgs/racket-doc/scribblings/reference/port-buffers.scrbl index f94661ee6d..455c96416e 100644 --- a/pkgs/racket-doc/scribblings/reference/port-buffers.scrbl +++ b/pkgs/racket-doc/scribblings/reference/port-buffers.scrbl @@ -54,7 +54,15 @@ the default port read handler (see @racket[port-read-handler]). output port to be physically written. Only @tech{file-stream ports}, TCP ports, and custom ports (see @secref["customport"]) use buffers; when called on a port without a buffer, @racket[flush-output] -has no effect.} +has no effect. + +If flushing a @tech{file-stream port} or @tech{TCP port} encounters an +error when writing, then all buffered bytes in the port are discarded. +Consequently, a further attempt to flush or close the port will not +fail. + +@history[#:changed "7.4.0.10" @elem{Consistently, discard buffered bytes on error, + including in a TCP output port.}]} @defproc*[([(file-stream-buffer-mode [port port?]) (or/c 'none 'line 'block #f)] [(file-stream-buffer-mode [port port?] [mode (or/c 'none 'line 'block)]) void?])]{ diff --git a/pkgs/racket-test-core/tests/racket/file.rktl b/pkgs/racket-test-core/tests/racket/file.rktl index f8b487580a..e59fb69950 100644 --- a/pkgs/racket-test-core/tests/racket/file.rktl +++ b/pkgs/racket-test-core/tests/racket/file.rktl @@ -1,7 +1,8 @@ (load-relative "loadtest.rktl") (require ffi/file - ffi/unsafe) + ffi/unsafe + compiler/find-exe) (Section 'file) @@ -1912,6 +1913,116 @@ (sc-run #f priv-mod '(read)) (sc-run #f priv-mod '(read write delete)))) +;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Check that when flushing a TCP output port fails, it +;; clears the buffer + +(let-values ([(subproc stdout stdin stderr) (subprocess #f #f #f (find-exe) "-n")]) + + ;; A flush will eventually fail: + (with-handlers ([exn:fail:filesystem? void]) + (let loop () + (write-bytes #"foo" stdin) + (flush-output stdin) + (loop))) + + ;; Next flush should not fail, because the buffer content + ;; should have been discarded by a failed flush: + (test (void) flush-output stdin) + + ;; Closing should not fail: + (test (void) close-output-port stdin) + + (close-input-port stderr) + (close-input-port stdout) + + (subprocess-wait subproc)) + +;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Check that when flushing an OS-level pipe fails, it +;; clears the buffer + +(let-values ([(subproc stdout stdin stderr) (subprocess #f #f #f (find-exe) "-n")]) + + ;; A flush will eventually fail: + (with-handlers ([exn:fail:filesystem? void]) + (let loop () + (write-bytes #"foo" stdin) + (flush-output stdin) + (loop))) + + ;; Next flush should not fail, because the buffer content + ;; should have been discarded by a failed flush: + (test (void) flush-output stdin) + + ;; Closing should not fail: + (test (void) close-output-port stdin) + + (close-input-port stderr) + (close-input-port stdout) + + (subprocess-wait subproc)) + +;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Check that an asynchronous break that interrupts a flush +;; doesn't lose buffered bytes + +(let-values ([(subproc stdout stdin stderr) (subprocess #f #f #f (find-exe) "-e" + (format "~s" + '(begin + (define noise (make-bytes 256 (char->integer #\x))) + ;; Fill up the OS-level output pipe: + (let loop () + (unless (zero? (write-bytes-avail* noise (current-output-port))) + (loop))) + ;; Wait until the other end has read: + (write-bytes-avail #"noise" (current-output-port)) + (close-output-port (current-output-port)) + ;; Drain the OS-level input pipe, suceeding if we + ;; find a "!". + (let loop () + (define b (read-byte (current-input-port))) + (when (eqv? b (char->integer #\!)) + (exit 0)) + (when (eof-object? b) + (exit 1)) + (loop)))))]) + + ;; Fill up the OS-level output pipe: + (let loop () + (unless (zero? (write-bytes-avail* #"?????" stdin)) + (loop))) + + ;; At this point, the other end is still waiting for us to read. + ;; Add something to the Racket-level buffer that we want to make sure + ;; doesn't get lost + (write-bytes #"!" stdin) + + ;; Thread will get stuck trying to flush: + (define t (thread (lambda () + (with-handlers ([exn:break? void]) + (flush-output stdin))))) + + (sync (system-idle-evt)) + (break-thread t) + (thread-wait t) + + ;; Drain output from subprocess, so it can be unblocked: + (let loop () + (unless (eof-object? (read-bytes-avail! (make-bytes 10) stdout)) + (loop))) + + ;; Subprocess should be reading at this point + (flush-output stdin) + + (close-output-port stdin) + (close-input-port stderr) + (close-input-port stdout) + + (subprocess-wait subproc) + + (test 0 subprocess-status subproc)) + ;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Check `in-directory' diff --git a/racket/src/io/network/tcp-port.rkt b/racket/src/io/network/tcp-port.rkt index 5e8179fc58..1b4f1183b4 100644 --- a/racket/src/io/network/tcp-port.rkt +++ b/racket/src/io/network/tcp-port.rkt @@ -63,7 +63,8 @@ [name name] [fd fd] [fd-refcount fd-refcount] - [buffer-mode 'block]))) + [buffer-mode 'block]) + #:plumber #f)) ;; ---------------------------------------- diff --git a/racket/src/io/port/fd-port.rkt b/racket/src/io/port/fd-port.rkt index 9a52c82d27..c9d47d6841 100644 --- a/racket/src/io/port/fd-port.rkt +++ b/racket/src/io/port/fd-port.rkt @@ -166,6 +166,14 @@ (define n (rktio_write_in rktio fd bstr start-pos end-pos)) (cond [(rktio-error? n) + ;; Discard buffer content before reporting the error. This + ;; isn't the obviously right choice, but otherwise a future + ;; flush attempt (including one triggered by trying to + ;; close the port or one triggered by a plumber) will + ;; likely just fail again, which is probably worse than + ;; dropping bytes. + (set! start-pos 0) + (set! end-pos 0) (end-atomic) (send fd-output-port this raise-write-error n)] [(fx= n 0) @@ -218,7 +226,8 @@ (cond [(fx= src-start src-end) ;; Flush request - (and (flush-buffer) 0)] + (or (and (flush-buffer) 0) + (wrap-evt evt (lambda (v) #f)))] [(and (not (eq? buffer-mode 'none)) (not nonbuffer/nonblock?) (fx< end-pos (bytes-length bstr))) @@ -231,7 +240,7 @@ (fast-mode! amt) amt] [(not (flush-buffer)) ; <- can temporarily leave atomic mode - #f] + (wrap-evt evt (lambda (v) #f))] [else (define n (rktio_write_in rktio fd src-bstr src-start src-end)) (cond @@ -251,7 +260,8 @@ (flush-buffer-fully #f) ; can temporarily leave atomic mode (when bstr ; <- in case a concurrent close succeeded (send fd-output-port this on-close) - (plumber-flush-handle-remove! flush-handle) + (when flush-handle + (plumber-flush-handle-remove! flush-handle)) (set! bstr #f) (fd-close fd fd-refcount) (unsafe-custodian-unregister this custodian-reference)))] @@ -323,10 +333,11 @@ (define fd (fd-output-port-fd p)) (define fd-refcount (fd-output-port-fd-refcount p)) (define evt (fd-evt fd RKTIO_POLL_WRITE p)) - (define flush-handle (plumber-add-flush! plumber - (lambda (h) - (atomically - (send fd-output-port p flush-buffer/external))))) + (define flush-handle (and plumber + (plumber-add-flush! plumber + (lambda (h) + (atomically + (send fd-output-port p flush-buffer/external)))))) (define custodian-reference (register-fd-close cust fd fd-refcount flush-handle p)) (set-core-output-port-evt! p evt) (set-fd-output-port-flush-handle! p flush-handle) diff --git a/racket/src/racket/src/network.c b/racket/src/racket/src/network.c index 33768989ee..a2405e9781 100644 --- a/racket/src/racket/src/network.c +++ b/racket/src/racket/src/network.c @@ -468,7 +468,8 @@ tcp_in_buffer_mode(Scheme_Port *p, int mode) static intptr_t tcp_do_write_string(Scheme_Output_Port *port, const char *s, intptr_t offset, intptr_t len, - int rarely_block, int enable_break) + int rarely_block, int enable_break, + int drop_buffer_on_error) { /* We've already checked for buffering before we got here. */ /* If rarely_block is 1, it means only write as much as @@ -492,7 +493,9 @@ static intptr_t tcp_do_write_string(Scheme_Output_Port *port, if (rarely_block) return sent; else - sent += tcp_do_write_string(port, s, offset + sent, len - sent, 0, enable_break); + sent += tcp_do_write_string(port, s, offset + sent, len - sent, + 0, enable_break, + drop_buffer_on_error); } } @@ -524,10 +527,19 @@ static intptr_t tcp_do_write_string(Scheme_Output_Port *port, goto top; } - if (sent == RKTIO_WRITE_ERROR) + if (sent == RKTIO_WRITE_ERROR) { + if (drop_buffer_on_error) { + /* See "Drop unsuccessfully flushed bytes" in "port.c" for a + rationale (although TCP ports are not automatically + registered with a plumber). */ + data->b.out_bufpos = 0; + data->b.out_bufmax = 0; + } + scheme_raise_exn(MZEXN_FAIL_NETWORK, "tcp-write: error writing\n" " system error: %R"); + } return sent; } @@ -548,7 +560,8 @@ static int tcp_flush(Scheme_Output_Port *port, } amt = tcp_do_write_string(port, data->b.out_buffer, data->b.out_bufpos, data->b.out_bufmax - data->b.out_bufpos, - rarely_block, enable_break); + rarely_block, enable_break, + 1); flushed += amt; data->b.out_bufpos += amt; if (rarely_block && (data->b.out_bufpos < data->b.out_bufmax)) @@ -599,7 +612,7 @@ static intptr_t tcp_write_string(Scheme_Output_Port *port, } /* When we get here, the buffer is empty */ - return tcp_do_write_string(port, s, offset, len, rarely_block, enable_break); + return tcp_do_write_string(port, s, offset, len, rarely_block, enable_break, 0); } static void tcp_close_output(Scheme_Output_Port *port) diff --git a/racket/src/racket/src/port.c b/racket/src/racket/src/port.c index 0b471495db..fd193b4c48 100644 --- a/racket/src/racket/src/port.c +++ b/racket/src/racket/src/port.c @@ -5224,6 +5224,16 @@ static void release_flushing_lock(void *_fop) fop->flushing = 0; } +static void consume_buffer_bytes(Scheme_FD *fop, intptr_t wrote) +{ + if (fop->bufcount == wrote) + fop->bufcount = 0; + else { + memmove(fop->buffer + wrote, fop->buffer, fop->bufcount - wrote); + fop->bufcount -= wrote; + } +} + static intptr_t flush_fd(Scheme_Output_Port *op, const char * volatile bufstr, volatile uintptr_t buflen, volatile uintptr_t offset, int immediate_only, int enable_break) @@ -5232,6 +5242,7 @@ static intptr_t flush_fd(Scheme_Output_Port *op, { Scheme_FD * volatile fop = (Scheme_FD *)op->port_data; volatile intptr_t wrote = 0; + volatile int consume_buffer; if (fop->flushing) { if (scheme_force_port_closed) { @@ -5251,15 +5262,12 @@ static intptr_t flush_fd(Scheme_Output_Port *op, if (!bufstr) { bufstr = (char *)fop->buffer; buflen = fop->bufcount; - } + consume_buffer = 1; + } else + consume_buffer = 0; if (buflen) { fop->flushing = 1; - fop->bufcount = 0; - /* If write is interrupted, we drop chars on the floor. - Not ideal, but we'll go with it for now. - Note that write_string_avail supports break-reliable - output through `immediate_only'. */ while (1) { intptr_t len; @@ -5272,6 +5280,8 @@ static intptr_t flush_fd(Scheme_Output_Port *op, if (immediate_only == 2) { fop->flushing = 0; + if (consume_buffer) + consume_buffer_bytes(fop, wrote); return wrote; } @@ -5287,6 +5297,14 @@ static intptr_t flush_fd(Scheme_Output_Port *op, enable_break); END_ESCAPEABLE(); } else if (len == RKTIO_WRITE_ERROR) { + if (consume_buffer) { + /* Drop unsuccessfully flushed bytes. This isn't the + obviously right choice, but otherwise a future flush + attempt (including one triggered by trying to close the + port or one triggered by a plumber) will likely just fail + again, which is probably worse than dropping bytes. */ + consume_buffer_bytes(fop, buflen); + } if (scheme_force_port_closed) { /* Don't signal exn or wait. Just give up. */ return wrote; @@ -5298,6 +5316,8 @@ static intptr_t flush_fd(Scheme_Output_Port *op, return 0; /* doesn't get here */ } } else if ((len + offset == buflen) || immediate_only) { + if (consume_buffer) + consume_buffer_bytes(fop, buflen); fop->flushing = 0; return wrote + len; } else {