racket/port: fix evt constructors to work right with read errors

Propoagate read errors to the `sync` that is applied to an event,
instead of writing the events to stdout.

Related to racket/web-server#76
This commit is contained in:
Matthew Flatt 2019-10-14 11:22:14 -06:00
parent 89403dc8cd
commit c19b944110
3 changed files with 93 additions and 13 deletions

View File

@ -655,7 +655,18 @@ closes @racket[in].}
Returns a @tech{synchronizable event} that is ready when Returns a @tech{synchronizable event} that is ready when
@racket[in] produces an @racket[eof]. If @racket[in] produces a @racket[in] produces an @racket[eof]. If @racket[in] produces a
mid-stream @racket[eof], the @racket[eof] is consumed by the event mid-stream @racket[eof], the @racket[eof] is consumed by the event
only if the event is chosen in a synchronization.} only if the event is chosen in a synchronization.
If attempting to read from @racket[in] raises an exception during a
synchronization attempt, then the exception may be reported during the
synchronization attempt, but it will silently discarded if some another
event in the same synchronization is selected or if some other event
raises an exception first.
@history[#:changed "7.5.0.3" @elem{Changed handling of read errors so
they are propagated to a synchronization attempt,
instead of treated as unhandled errors in a
background thread.}]}
@defproc[(read-bytes-evt [k exact-nonnegative-integer?] [in input-port?]) @defproc[(read-bytes-evt [k exact-nonnegative-integer?] [in input-port?])
@ -681,12 +692,14 @@ concurrently---and each synchronization corresponds to a distinct read
request. request.
The @racket[in] must support progress events, and it must not produce The @racket[in] must support progress events, and it must not produce
a special non-byte value during the read attempt.} a special non-byte value during the read attempt.
Exceptions attempting to read from @racket[in] are handled in the same
way as by @racket[eof-evt].}
@defproc[(read-bytes!-evt [bstr (and/c bytes? (not/c immutable?))] @defproc[(read-bytes!-evt [bstr (and/c bytes? (not/c immutable?))]
[in input-port?] [in input-port?])
[progress-evt (or/c progress-evt? #f)])
evt?]{ evt?]{
Like @racket[read-bytes-evt], except that the read bytes are placed Like @racket[read-bytes-evt], except that the read bytes are placed
@ -703,7 +716,10 @@ might be mutated if the event is not selected by a synchronzation;
nevertheless, multiple synchronization attempts can use the same nevertheless, multiple synchronization attempts can use the same
result from @racket[read-bytes!-evt] as long as there is no result from @racket[read-bytes!-evt] as long as there is no
intervening read on @racket[in] until one of the synchronization intervening read on @racket[in] until one of the synchronization
attempts selects the event.} attempts selects the event.
Exceptions attempting to read from @racket[in] are handled in the same
way as by @racket[eof-evt].}
@defproc[(read-bytes-avail!-evt [bstr (and/c bytes? (not/c immutable?))] [in input-port?]) @defproc[(read-bytes-avail!-evt [bstr (and/c bytes? (not/c immutable?))] [in input-port?])
@ -730,7 +746,7 @@ a byte string.}
@defproc[(read-line-evt [in input-port?] @defproc[(read-line-evt [in input-port?]
[mode (or/c 'linefeed 'return 'return-linefeed 'any 'any-one)]) [mode (or/c 'linefeed 'return 'return-linefeed 'any 'any-one) 'linefeed])
evt?]{ evt?]{
Returns a @tech{synchronizable event} that is ready when a line of Returns a @tech{synchronizable event} that is ready when a line of
@ -741,11 +757,14 @@ separator).
A line is read from the port if and only if the event is chosen in a A line is read from the port if and only if the event is chosen in a
synchronization, and the returned line always represents contiguous synchronization, and the returned line always represents contiguous
bytes in the port's stream.} bytes in the port's stream.
Exceptions attempting to read from @racket[in] are handled in the same
way as by @racket[eof-evt].}
@defproc[(read-bytes-line-evt [in input-port?] @defproc[(read-bytes-line-evt [in input-port?]
[mode (or/c 'linefeed 'return 'return-linefeed 'any 'any-one)]) [mode (or/c 'linefeed 'return 'return-linefeed 'any 'any-one) 'linefeed])
evt?]{ evt?]{
Like @racket[read-line-evt], but returns a byte string instead of a Like @racket[read-line-evt], but returns a byte string instead of a
@ -762,7 +781,7 @@ string.}
[(peek-string!-evt [str (and/c string? (not/c immutable?))] [skip exact-nonnegative-integer?] [(peek-string!-evt [str (and/c string? (not/c immutable?))] [skip exact-nonnegative-integer?]
[progress-evt (or/c progress-evt? #f)] [in input-port?]) evt?])]{ [progress-evt (or/c progress-evt? #f)] [in input-port?]) evt?])]{
Like the @racket[read-...-evt] functions, but for peeking. The Like the @racket[read-bytes-evt], etc., functions, but for peeking. The
@racket[skip] argument indicates the number of bytes to skip, and @racket[skip] argument indicates the number of bytes to skip, and
@racket[progress-evt] indicates an event that effectively cancels the peek @racket[progress-evt] indicates an event that effectively cancels the peek
(so that the event never becomes ready). The @racket[progress-evt] (so that the event never becomes ready). The @racket[progress-evt]
@ -796,7 +815,10 @@ each synchronization corresponds to a distinct match request.
The @racket[in] port must support progress events. If @racket[in] The @racket[in] port must support progress events. If @racket[in]
returns a special non-byte value during the match attempt, it is returns a special non-byte value during the match attempt, it is
treated like @racket[eof].} treated like @racket[eof].
Exceptions attempting to read from @racket[in] are handled in the same
way as by @racket[eof-evt].}
@; ---------------------------------------------------------------------- @; ----------------------------------------------------------------------

View File

@ -683,6 +683,60 @@
(write-char #\b o))) (write-char #\b o)))
(test 1 peek-bytes-avail! (make-bytes 1) 2 #f (make-limited-input-port i 10))) (test 1 peek-bytes-avail! (make-bytes 1) 2 #f (make-limited-input-port i 10)))
;; ----------------------------------------
;; Check that events raise an exception in the right thread
;; when a point goes bad
(let ()
(define (check make-evt)
(define-values (p fail)
(let* ([s (make-semaphore)]
[e (wrap-evt (semaphore-peek-evt s)
(lambda (v) 0))])
(values
(make-input-port
'test
(lambda (bsr)
(if (sync/timeout 0 e)
(raise 'forced-failure)
e))
(lambda (bstr offset evt)
(if (sync/timeout 0 e)
(raise 'forced-failure)
e))
void
(lambda ()
(make-semaphore))
(lambda (n evt1 evt2)
(error "no")))
(lambda ()
(semaphore-post s)))))
(thread (lambda ()
(sync (system-idle-evt))
(fail)))
(err/rt-test (sync (make-evt p))
(lambda (exn) (eq? exn 'forced-failure))))
(check (lambda (p) (eof-evt p)))
(check (lambda (p) (read-bytes-evt 10 p)))
(check (lambda (p) (read-bytes!-evt (make-bytes 10) p)))
(check (lambda (p) (read-bytes-avail!-evt (make-bytes 10) p)))
(check (lambda (p) (read-string-evt 10 p)))
(check (lambda (p) (read-string!-evt (make-string 10) p)))
(check (lambda (p) (read-line-evt p)))
(check (lambda (p) (read-bytes-line-evt p)))
(check (lambda (p) (peek-bytes-evt 10 0 #f p)))
(check (lambda (p) (peek-bytes!-evt (make-bytes 10) 0 #f p)))
(check (lambda (p) (peek-bytes!-evt (make-bytes 10) 0 (port-progress-evt p) p)))
(check (lambda (p) (peek-bytes-avail!-evt (make-bytes 10) 0 #f p)))
(check (lambda (p) (peek-bytes-avail!-evt (make-bytes 10) 0 (port-progress-evt p) p)))
(check (lambda (p) (peek-string-evt 10 0 #f p)))
(check (lambda (p) (peek-string-evt 10 0 (port-progress-evt p) p)))
(check (lambda (p) (peek-string!-evt (make-string 10) 0 #f p)))
(check (lambda (p) (peek-string!-evt (make-string 10) 0 (port-progress-evt p) p))))
;; ---------------------------------------- ;; ----------------------------------------
;; Conversion wrappers ;; Conversion wrappers

View File

@ -1138,17 +1138,21 @@
(nack-guard-evt (nack-guard-evt
(lambda (nack) (lambda (nack)
(define ch (make-channel)) (define ch (make-channel))
(define exn-ch (make-channel))
(define ready (make-semaphore)) (define ready (make-semaphore))
(let ([t (thread (lambda () (let ([t (thread (lambda ()
(parameterize-break #t (parameterize-break #t
(with-handlers ([exn:break? void]) (with-handlers ([(lambda (exn) #t)
(lambda (exn)
(channel-put exn-ch exn))])
(semaphore-post ready) (semaphore-post ready)
(go nack ch #f)))))]) (go nack ch #f)))))])
(thread (lambda () (thread (lambda ()
(sync nack) (sync nack)
(semaphore-wait ready) (semaphore-wait ready)
(break-thread t)))) (break-thread t))))
ch)))))) (choice-evt ch
(wrap-evt exn-ch (lambda (exn) (raise exn))))))))))
(define (read-at-least-bytes!-evt orig-bstr input-port need-more? shrink combo (define (read-at-least-bytes!-evt orig-bstr input-port need-more? shrink combo
peek-offset prog-evt) peek-offset prog-evt)