racket/port: fix problems with `read-bytes!-evt'
See PR 12860; some of problem related to the PR were "fixed" by adjusting the guarantees that are specified in the documentation. Another problem was that non-consecutive bytes could be returned.
This commit is contained in:
parent
6173b7eb05
commit
c11527494e
|
@ -959,21 +959,30 @@
|
|||
;; go is the main reading function, either called directly for
|
||||
;; a poll, or called in a thread for a non-poll read
|
||||
(define (go nack ch poll?)
|
||||
(let try-again ([pos 0][bstr orig-bstr])
|
||||
(let* ([progress-evt (or prog-evt (port-progress-evt input-port))]
|
||||
[v ((if poll? peek-bytes-avail!* peek-bytes-avail!)
|
||||
bstr (+ pos (or peek-offset 0)) progress-evt input-port pos)])
|
||||
(let try-again ([pos 0] [bstr orig-bstr] [progress-evt #f])
|
||||
(let* ([progress-evt
|
||||
;; if no progress event is given, get one to ensure that
|
||||
;; consecutive bytes are read and can be committed:
|
||||
(or progress-evt prog-evt (port-progress-evt input-port))]
|
||||
[v (and
|
||||
;; to implement weak support for reusing the buffer in `read-bytes!-evt',
|
||||
;; need to check nack after getting progress-evt:
|
||||
(not (sync/timeout 0 nack))
|
||||
;; try to get bytes:
|
||||
((if poll? peek-bytes-avail!* peek-bytes-avail!)
|
||||
bstr (+ pos (or peek-offset 0)) progress-evt input-port pos))])
|
||||
(cond
|
||||
;; the first two cases below are shortcuts, and not
|
||||
;; strictly necessary
|
||||
[(sync/timeout 0 nack) (void)]
|
||||
[(sync/timeout 0 nack)
|
||||
(void)]
|
||||
[(sync/timeout 0 progress-evt)
|
||||
(cond [poll? #f]
|
||||
[prog-evt (void)]
|
||||
[else (try-again pos bstr)])]
|
||||
[else (try-again 0 bstr #f)])]
|
||||
[(and poll? (equal? v 0)) #f]
|
||||
[(and (number? v) (need-more? bstr (+ pos v)))
|
||||
=> (lambda (bstr) (try-again (+ v pos) bstr))]
|
||||
=> (lambda (bstr) (try-again (+ v pos) bstr progress-evt))]
|
||||
[else
|
||||
(let* ([v2 (cond [(number? v) (shrink bstr (+ v pos))]
|
||||
[(positive? pos) pos]
|
||||
|
@ -999,7 +1008,7 @@
|
|||
(let ([result (combo bstr eof)])
|
||||
(if poll? result (channel-put ch result)))]
|
||||
[poll? #f]
|
||||
[else (try-again 0 orig-bstr)]))]))))
|
||||
[else (try-again 0 orig-bstr #f)]))]))))
|
||||
(if (zero? (bytes-length orig-bstr))
|
||||
(wrap-evt always-evt (lambda (x) 0))
|
||||
(poll-or-spawn go)))
|
||||
|
@ -1025,8 +1034,8 @@
|
|||
(lambda (bstr v) v)
|
||||
peek-offset prog-evt))
|
||||
|
||||
(define (read-bytes!-evt bstr input-port)
|
||||
(-read-bytes!-evt bstr input-port #f #f))
|
||||
(define (read-bytes!-evt bstr input-port [progress-evt #f])
|
||||
(-read-bytes!-evt bstr input-port #f progress-evt))
|
||||
|
||||
(define (peek-bytes!-evt bstr peek-offset prog-evt input-port)
|
||||
(-read-bytes!-evt bstr input-port peek-offset prog-evt))
|
||||
|
|
|
@ -626,7 +626,8 @@ a special non-byte value during the read attempt.}
|
|||
|
||||
|
||||
@defproc[(read-bytes!-evt [bstr (and/c bytes? (not/c immutable?))]
|
||||
[in input-port?])
|
||||
[in input-port?]
|
||||
[progress-evt (or/c progress-evt? #f)])
|
||||
evt?]{
|
||||
|
||||
Like @racket[read-bytes-evt], except that the read bytes are placed
|
||||
|
@ -635,12 +636,15 @@ into @racket[bstr], and the number of bytes to read corresponds to
|
|||
@racket[eof] or the number of read bytes.
|
||||
|
||||
The @racket[bstr] may be mutated any time after the first
|
||||
synchronization attempt on the event. If the event is not synchronized
|
||||
multiple times concurrently, @racket[bstr-bytes] is never mutated by
|
||||
the event after it is chosen in a synchronization (no matter how many
|
||||
synchronization attempts preceded the choice). Thus, the event may be
|
||||
sensibly used multiple times until a successful choice, but should not
|
||||
be used in multiple concurrent synchronizations.}
|
||||
synchronization attempt on the event and until either the event is
|
||||
selected, a non-@racket[#f] @racket[progress-evt] is ready, or the
|
||||
current @tech{custodian} (at the time of synchronization) is shut
|
||||
down. Note that there is no time bound otherwise on when @racket[bstr]
|
||||
might be mutated if the event is not selected by a synchronzation;
|
||||
nevertheless, multiple synchronization attempts can use the same
|
||||
result from @racket[read-bytes!-evt] as long as there is no
|
||||
intervening read on @racket[in] until one of the synchronization
|
||||
attempts selects the event.}
|
||||
|
||||
|
||||
@defproc[(read-bytes-avail!-evt [bstr (and/c bytes? (not/c immutable?))] [in input-port?])
|
||||
|
@ -689,20 +693,20 @@ Like @racket[read-line-evt], but returns a byte string instead of a
|
|||
string.}
|
||||
|
||||
@defproc*[([(peek-bytes-evt [k exact-nonnegative-integer?] [skip exact-nonnegative-integer?]
|
||||
[progress (or/c evt? #f)] [in input-port?]) evt?]
|
||||
[progress-evt (or/c progress-evt? #f)] [in input-port?]) evt?]
|
||||
[(peek-bytes!-evt [bstr (and/c bytes? (not/c immutable?))] [skip exact-nonnegative-integer?]
|
||||
[progress (or/c evt? #f)] [in input-port?]) evt?]
|
||||
[progress-evt (or/c progress-evt? #f)] [in input-port?]) evt?]
|
||||
[(peek-bytes-avail!-evt [bstr (and/c bytes? (not/c immutable?))] [skip exact-nonnegative-integer?]
|
||||
[progress (or/c evt? #f)] [in input-port?]) evt?]
|
||||
[progress-evt (or/c progress-evt? #f)] [in input-port?]) evt?]
|
||||
[(peek-string-evt [k exact-nonnegative-integer?] [skip exact-nonnegative-integer?]
|
||||
[progress (or/c evt? #f)] [in input-port?]) evt?]
|
||||
[progress-evt (or/c progress-evt? #f)] [in input-port?]) evt?]
|
||||
[(peek-string!-evt [str (and/c string? (not/c immutable?))] [skip exact-nonnegative-integer?]
|
||||
[progress (or/c evt? #f)] [in input-port?]) evt?])]{
|
||||
[progress-evt (or/c progress-evt? #f)] [in input-port?]) evt?])]{
|
||||
|
||||
Like the @racket[read-...-evt] functions, but for peeking. The
|
||||
@racket[skip] argument indicates the number of bytes to skip, and
|
||||
@racket[progress] indicates an event that effectively cancels the peek
|
||||
(so that the event never becomes ready). The @racket[progress]
|
||||
@racket[progress-evt] indicates an event that effectively cancels the peek
|
||||
(so that the event never becomes ready). The @racket[progress-evt]
|
||||
argument can be @racket[#f], in which case the event is never
|
||||
canceled.}
|
||||
|
||||
|
|
|
@ -893,6 +893,28 @@
|
|||
(flush-output out)
|
||||
(test "hello world" read in))
|
||||
|
||||
;; --------------------------------------------------
|
||||
;; check that `read-bytes-evt' gets
|
||||
|
||||
(let ()
|
||||
(define-values (i o) (make-pipe))
|
||||
(define res #f)
|
||||
|
||||
(define t
|
||||
(thread
|
||||
(lambda ()
|
||||
(set! res (sync (read-bytes-evt 2 i))))))
|
||||
|
||||
(write-bytes #"1" o)
|
||||
(sleep 0.1)
|
||||
(write-bytes #"2" o)
|
||||
(test #"1" read-bytes 1 i)
|
||||
(sleep)
|
||||
(write-bytes #"34" o)
|
||||
|
||||
(sync t)
|
||||
(test #"23" values res))
|
||||
|
||||
;; --------------------------------------------------
|
||||
;; check that string and byte-string evts can be reused
|
||||
|
||||
|
@ -944,7 +966,16 @@
|
|||
(tcp-close listener))
|
||||
|
||||
(let ([integer->byte (lambda (s) (bitwise-and s #xFF))])
|
||||
(check-can-reuse read-bytes-evt read-bytes write-bytes integer->byte list->bytes bytes?))
|
||||
#;
|
||||
(check-can-reuse read-bytes-evt read-bytes write-bytes integer->byte list->bytes bytes?)
|
||||
;; the following should work because we use the same evt only after
|
||||
;; success or to start at the same point in the input stream:
|
||||
(check-can-reuse (lambda (n in)
|
||||
(define bstr (make-bytes n))
|
||||
(wrap-evt (read-bytes!-evt bstr in)
|
||||
(lambda (v) (if (eof-object? v) v bstr))))
|
||||
read-bytes write-bytes integer->byte list->bytes bytes?))
|
||||
#;
|
||||
(check-can-reuse read-string-evt read-string write-string integer->char list->string string?))
|
||||
|
||||
;; --------------------------------------------------
|
||||
|
|
Loading…
Reference in New Issue
Block a user