consistently discard buffered bytes on failed flush

In a file-stream output port or TCP output port, when flushing
encounters an error, consistently discard bytes in the buffer. This
isn't the obviously right choice, but otherwise a future flush attempt
(including one triggered by trying to close the port or one triggered
by a plumber) will likely just fail again, which is probably worse
than dropping bytes.

Also, fix related problems/inconsistencies.

Overall changes:

 * For traditional Racket, discard bytes in a TCP port when flushing
   fails.

 * For Racket CS, discard bytes in file-stream and TCP output ports
   when flushing fails.

 * For traditional Racket, when a file-stream port flush is
   interrupted by an asynchronous break, *don't* discard buffered
   bytes.

 * For Racket CS, don't register TCP ports with the current plumber.
This commit is contained in:
Matthew Flatt 2019-10-03 10:20:18 -06:00
parent db322a49ee
commit 20e0252664
6 changed files with 185 additions and 21 deletions

View File

@ -54,7 +54,15 @@ the default port read handler (see @racket[port-read-handler]).
output port to be physically written. Only @tech{file-stream ports},
TCP ports, and custom ports (see @secref["customport"]) use
buffers; when called on a port without a buffer, @racket[flush-output]
has no effect.}
has no effect.
If flushing a @tech{file-stream port} or @tech{TCP port} encounters an
error when writing, then all buffered bytes in the port are discarded.
Consequently, a further attempt to flush or close the port will not
fail.
@history[#:changed "7.4.0.10" @elem{Consistently, discard buffered bytes on error,
including in a TCP output port.}]}
@defproc*[([(file-stream-buffer-mode [port port?]) (or/c 'none 'line 'block #f)]
[(file-stream-buffer-mode [port port?] [mode (or/c 'none 'line 'block)]) void?])]{

View File

@ -1,7 +1,8 @@
(load-relative "loadtest.rktl")
(require ffi/file
ffi/unsafe)
ffi/unsafe
compiler/find-exe)
(Section 'file)
@ -1912,6 +1913,116 @@
(sc-run #f priv-mod '(read))
(sc-run #f priv-mod '(read write delete))))
;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Check that when flushing a TCP output port fails, it
;; clears the buffer
(let-values ([(subproc stdout stdin stderr) (subprocess #f #f #f (find-exe) "-n")])
;; A flush will eventually fail:
(with-handlers ([exn:fail:filesystem? void])
(let loop ()
(write-bytes #"foo" stdin)
(flush-output stdin)
(loop)))
;; Next flush should not fail, because the buffer content
;; should have been discarded by a failed flush:
(test (void) flush-output stdin)
;; Closing should not fail:
(test (void) close-output-port stdin)
(close-input-port stderr)
(close-input-port stdout)
(subprocess-wait subproc))
;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Check that when flushing an OS-level pipe fails, it
;; clears the buffer
(let-values ([(subproc stdout stdin stderr) (subprocess #f #f #f (find-exe) "-n")])
;; A flush will eventually fail:
(with-handlers ([exn:fail:filesystem? void])
(let loop ()
(write-bytes #"foo" stdin)
(flush-output stdin)
(loop)))
;; Next flush should not fail, because the buffer content
;; should have been discarded by a failed flush:
(test (void) flush-output stdin)
;; Closing should not fail:
(test (void) close-output-port stdin)
(close-input-port stderr)
(close-input-port stdout)
(subprocess-wait subproc))
;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Check that an asynchronous break that interrupts a flush
;; doesn't lose buffered bytes
(let-values ([(subproc stdout stdin stderr) (subprocess #f #f #f (find-exe) "-e"
(format "~s"
'(begin
(define noise (make-bytes 256 (char->integer #\x)))
;; Fill up the OS-level output pipe:
(let loop ()
(unless (zero? (write-bytes-avail* noise (current-output-port)))
(loop)))
;; Wait until the other end has read:
(write-bytes-avail #"noise" (current-output-port))
(close-output-port (current-output-port))
;; Drain the OS-level input pipe, suceeding if we
;; find a "!".
(let loop ()
(define b (read-byte (current-input-port)))
(when (eqv? b (char->integer #\!))
(exit 0))
(when (eof-object? b)
(exit 1))
(loop)))))])
;; Fill up the OS-level output pipe:
(let loop ()
(unless (zero? (write-bytes-avail* #"?????" stdin))
(loop)))
;; At this point, the other end is still waiting for us to read.
;; Add something to the Racket-level buffer that we want to make sure
;; doesn't get lost
(write-bytes #"!" stdin)
;; Thread will get stuck trying to flush:
(define t (thread (lambda ()
(with-handlers ([exn:break? void])
(flush-output stdin)))))
(sync (system-idle-evt))
(break-thread t)
(thread-wait t)
;; Drain output from subprocess, so it can be unblocked:
(let loop ()
(unless (eof-object? (read-bytes-avail! (make-bytes 10) stdout))
(loop)))
;; Subprocess should be reading at this point
(flush-output stdin)
(close-output-port stdin)
(close-input-port stderr)
(close-input-port stdout)
(subprocess-wait subproc)
(test 0 subprocess-status subproc))
;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Check `in-directory'

View File

@ -63,7 +63,8 @@
[name name]
[fd fd]
[fd-refcount fd-refcount]
[buffer-mode 'block])))
[buffer-mode 'block])
#:plumber #f))
;; ----------------------------------------

View File

@ -166,6 +166,14 @@
(define n (rktio_write_in rktio fd bstr start-pos end-pos))
(cond
[(rktio-error? n)
;; Discard buffer content before reporting the error. This
;; isn't the obviously right choice, but otherwise a future
;; flush attempt (including one triggered by trying to
;; close the port or one triggered by a plumber) will
;; likely just fail again, which is probably worse than
;; dropping bytes.
(set! start-pos 0)
(set! end-pos 0)
(end-atomic)
(send fd-output-port this raise-write-error n)]
[(fx= n 0)
@ -218,7 +226,8 @@
(cond
[(fx= src-start src-end)
;; Flush request
(and (flush-buffer) 0)]
(or (and (flush-buffer) 0)
(wrap-evt evt (lambda (v) #f)))]
[(and (not (eq? buffer-mode 'none))
(not nonbuffer/nonblock?)
(fx< end-pos (bytes-length bstr)))
@ -231,7 +240,7 @@
(fast-mode! amt)
amt]
[(not (flush-buffer)) ; <- can temporarily leave atomic mode
#f]
(wrap-evt evt (lambda (v) #f))]
[else
(define n (rktio_write_in rktio fd src-bstr src-start src-end))
(cond
@ -251,7 +260,8 @@
(flush-buffer-fully #f) ; can temporarily leave atomic mode
(when bstr ; <- in case a concurrent close succeeded
(send fd-output-port this on-close)
(plumber-flush-handle-remove! flush-handle)
(when flush-handle
(plumber-flush-handle-remove! flush-handle))
(set! bstr #f)
(fd-close fd fd-refcount)
(unsafe-custodian-unregister this custodian-reference)))]
@ -323,10 +333,11 @@
(define fd (fd-output-port-fd p))
(define fd-refcount (fd-output-port-fd-refcount p))
(define evt (fd-evt fd RKTIO_POLL_WRITE p))
(define flush-handle (plumber-add-flush! plumber
(define flush-handle (and plumber
(plumber-add-flush! plumber
(lambda (h)
(atomically
(send fd-output-port p flush-buffer/external)))))
(send fd-output-port p flush-buffer/external))))))
(define custodian-reference (register-fd-close cust fd fd-refcount flush-handle p))
(set-core-output-port-evt! p evt)
(set-fd-output-port-flush-handle! p flush-handle)

View File

@ -468,7 +468,8 @@ tcp_in_buffer_mode(Scheme_Port *p, int mode)
static intptr_t tcp_do_write_string(Scheme_Output_Port *port,
const char *s, intptr_t offset, intptr_t len,
int rarely_block, int enable_break)
int rarely_block, int enable_break,
int drop_buffer_on_error)
{
/* We've already checked for buffering before we got here. */
/* If rarely_block is 1, it means only write as much as
@ -492,7 +493,9 @@ static intptr_t tcp_do_write_string(Scheme_Output_Port *port,
if (rarely_block)
return sent;
else
sent += tcp_do_write_string(port, s, offset + sent, len - sent, 0, enable_break);
sent += tcp_do_write_string(port, s, offset + sent, len - sent,
0, enable_break,
drop_buffer_on_error);
}
}
@ -524,10 +527,19 @@ static intptr_t tcp_do_write_string(Scheme_Output_Port *port,
goto top;
}
if (sent == RKTIO_WRITE_ERROR)
if (sent == RKTIO_WRITE_ERROR) {
if (drop_buffer_on_error) {
/* See "Drop unsuccessfully flushed bytes" in "port.c" for a
rationale (although TCP ports are not automatically
registered with a plumber). */
data->b.out_bufpos = 0;
data->b.out_bufmax = 0;
}
scheme_raise_exn(MZEXN_FAIL_NETWORK,
"tcp-write: error writing\n"
" system error: %R");
}
return sent;
}
@ -548,7 +560,8 @@ static int tcp_flush(Scheme_Output_Port *port,
}
amt = tcp_do_write_string(port, data->b.out_buffer, data->b.out_bufpos,
data->b.out_bufmax - data->b.out_bufpos,
rarely_block, enable_break);
rarely_block, enable_break,
1);
flushed += amt;
data->b.out_bufpos += amt;
if (rarely_block && (data->b.out_bufpos < data->b.out_bufmax))
@ -599,7 +612,7 @@ static intptr_t tcp_write_string(Scheme_Output_Port *port,
}
/* When we get here, the buffer is empty */
return tcp_do_write_string(port, s, offset, len, rarely_block, enable_break);
return tcp_do_write_string(port, s, offset, len, rarely_block, enable_break, 0);
}
static void tcp_close_output(Scheme_Output_Port *port)

View File

@ -5224,6 +5224,16 @@ static void release_flushing_lock(void *_fop)
fop->flushing = 0;
}
static void consume_buffer_bytes(Scheme_FD *fop, intptr_t wrote)
{
if (fop->bufcount == wrote)
fop->bufcount = 0;
else {
memmove(fop->buffer + wrote, fop->buffer, fop->bufcount - wrote);
fop->bufcount -= wrote;
}
}
static intptr_t flush_fd(Scheme_Output_Port *op,
const char * volatile bufstr, volatile uintptr_t buflen, volatile uintptr_t offset,
int immediate_only, int enable_break)
@ -5232,6 +5242,7 @@ static intptr_t flush_fd(Scheme_Output_Port *op,
{
Scheme_FD * volatile fop = (Scheme_FD *)op->port_data;
volatile intptr_t wrote = 0;
volatile int consume_buffer;
if (fop->flushing) {
if (scheme_force_port_closed) {
@ -5251,15 +5262,12 @@ static intptr_t flush_fd(Scheme_Output_Port *op,
if (!bufstr) {
bufstr = (char *)fop->buffer;
buflen = fop->bufcount;
}
consume_buffer = 1;
} else
consume_buffer = 0;
if (buflen) {
fop->flushing = 1;
fop->bufcount = 0;
/* If write is interrupted, we drop chars on the floor.
Not ideal, but we'll go with it for now.
Note that write_string_avail supports break-reliable
output through `immediate_only'. */
while (1) {
intptr_t len;
@ -5272,6 +5280,8 @@ static intptr_t flush_fd(Scheme_Output_Port *op,
if (immediate_only == 2) {
fop->flushing = 0;
if (consume_buffer)
consume_buffer_bytes(fop, wrote);
return wrote;
}
@ -5287,6 +5297,14 @@ static intptr_t flush_fd(Scheme_Output_Port *op,
enable_break);
END_ESCAPEABLE();
} else if (len == RKTIO_WRITE_ERROR) {
if (consume_buffer) {
/* Drop unsuccessfully flushed bytes. This isn't the
obviously right choice, but otherwise a future flush
attempt (including one triggered by trying to close the
port or one triggered by a plumber) will likely just fail
again, which is probably worse than dropping bytes. */
consume_buffer_bytes(fop, buflen);
}
if (scheme_force_port_closed) {
/* Don't signal exn or wait. Just give up. */
return wrote;
@ -5298,6 +5316,8 @@ static intptr_t flush_fd(Scheme_Output_Port *op,
return 0; /* doesn't get here */
}
} else if ((len + offset == buflen) || immediate_only) {
if (consume_buffer)
consume_buffer_bytes(fop, buflen);
fop->flushing = 0;
return wrote + len;
} else {