From c11527494ed3a08d1ea6caf8e7ec6661f2505ef7 Mon Sep 17 00:00:00 2001 From: Matthew Flatt Date: Sun, 24 Jun 2012 07:22:13 -0600 Subject: [PATCH] racket/port: fix problems with `read-bytes!-evt' See PR 12860; some of problem related to the PR were "fixed" by adjusting the guarantees that are specified in the documentation. Another problem was that non-consecutive bytes could be returned. --- collects/mzlib/port.rkt | 29 ++++++++++------ collects/scribblings/reference/port-lib.scrbl | 32 ++++++++++-------- collects/tests/racket/portlib.rktl | 33 ++++++++++++++++++- 3 files changed, 69 insertions(+), 25 deletions(-) diff --git a/collects/mzlib/port.rkt b/collects/mzlib/port.rkt index 98dbebb03b..e331742109 100644 --- a/collects/mzlib/port.rkt +++ b/collects/mzlib/port.rkt @@ -959,21 +959,30 @@ ;; go is the main reading function, either called directly for ;; a poll, or called in a thread for a non-poll read (define (go nack ch poll?) - (let try-again ([pos 0][bstr orig-bstr]) - (let* ([progress-evt (or prog-evt (port-progress-evt input-port))] - [v ((if poll? peek-bytes-avail!* peek-bytes-avail!) - bstr (+ pos (or peek-offset 0)) progress-evt input-port pos)]) + (let try-again ([pos 0] [bstr orig-bstr] [progress-evt #f]) + (let* ([progress-evt + ;; if no progress event is given, get one to ensure that + ;; consecutive bytes are read and can be committed: + (or progress-evt prog-evt (port-progress-evt input-port))] + [v (and + ;; to implement weak support for reusing the buffer in `read-bytes!-evt', + ;; need to check nack after getting progress-evt: + (not (sync/timeout 0 nack)) + ;; try to get bytes: + ((if poll? peek-bytes-avail!* peek-bytes-avail!) + bstr (+ pos (or peek-offset 0)) progress-evt input-port pos))]) (cond ;; the first two cases below are shortcuts, and not ;; strictly necessary - [(sync/timeout 0 nack) (void)] + [(sync/timeout 0 nack) + (void)] [(sync/timeout 0 progress-evt) (cond [poll? #f] [prog-evt (void)] - [else (try-again pos bstr)])] + [else (try-again 0 bstr #f)])] [(and poll? (equal? v 0)) #f] [(and (number? v) (need-more? bstr (+ pos v))) - => (lambda (bstr) (try-again (+ v pos) bstr))] + => (lambda (bstr) (try-again (+ v pos) bstr progress-evt))] [else (let* ([v2 (cond [(number? v) (shrink bstr (+ v pos))] [(positive? pos) pos] @@ -999,7 +1008,7 @@ (let ([result (combo bstr eof)]) (if poll? result (channel-put ch result)))] [poll? #f] - [else (try-again 0 orig-bstr)]))])))) + [else (try-again 0 orig-bstr #f)]))])))) (if (zero? (bytes-length orig-bstr)) (wrap-evt always-evt (lambda (x) 0)) (poll-or-spawn go))) @@ -1025,8 +1034,8 @@ (lambda (bstr v) v) peek-offset prog-evt)) -(define (read-bytes!-evt bstr input-port) - (-read-bytes!-evt bstr input-port #f #f)) +(define (read-bytes!-evt bstr input-port [progress-evt #f]) + (-read-bytes!-evt bstr input-port #f progress-evt)) (define (peek-bytes!-evt bstr peek-offset prog-evt input-port) (-read-bytes!-evt bstr input-port peek-offset prog-evt)) diff --git a/collects/scribblings/reference/port-lib.scrbl b/collects/scribblings/reference/port-lib.scrbl index 74c4f9ca9a..bbe84972cb 100644 --- a/collects/scribblings/reference/port-lib.scrbl +++ b/collects/scribblings/reference/port-lib.scrbl @@ -626,7 +626,8 @@ a special non-byte value during the read attempt.} @defproc[(read-bytes!-evt [bstr (and/c bytes? (not/c immutable?))] - [in input-port?]) + [in input-port?] + [progress-evt (or/c progress-evt? #f)]) evt?]{ Like @racket[read-bytes-evt], except that the read bytes are placed @@ -635,12 +636,15 @@ into @racket[bstr], and the number of bytes to read corresponds to @racket[eof] or the number of read bytes. The @racket[bstr] may be mutated any time after the first -synchronization attempt on the event. If the event is not synchronized -multiple times concurrently, @racket[bstr-bytes] is never mutated by -the event after it is chosen in a synchronization (no matter how many -synchronization attempts preceded the choice). Thus, the event may be -sensibly used multiple times until a successful choice, but should not -be used in multiple concurrent synchronizations.} +synchronization attempt on the event and until either the event is +selected, a non-@racket[#f] @racket[progress-evt] is ready, or the +current @tech{custodian} (at the time of synchronization) is shut +down. Note that there is no time bound otherwise on when @racket[bstr] +might be mutated if the event is not selected by a synchronzation; +nevertheless, multiple synchronization attempts can use the same +result from @racket[read-bytes!-evt] as long as there is no +intervening read on @racket[in] until one of the synchronization +attempts selects the event.} @defproc[(read-bytes-avail!-evt [bstr (and/c bytes? (not/c immutable?))] [in input-port?]) @@ -689,20 +693,20 @@ Like @racket[read-line-evt], but returns a byte string instead of a string.} @defproc*[([(peek-bytes-evt [k exact-nonnegative-integer?] [skip exact-nonnegative-integer?] - [progress (or/c evt? #f)] [in input-port?]) evt?] + [progress-evt (or/c progress-evt? #f)] [in input-port?]) evt?] [(peek-bytes!-evt [bstr (and/c bytes? (not/c immutable?))] [skip exact-nonnegative-integer?] - [progress (or/c evt? #f)] [in input-port?]) evt?] + [progress-evt (or/c progress-evt? #f)] [in input-port?]) evt?] [(peek-bytes-avail!-evt [bstr (and/c bytes? (not/c immutable?))] [skip exact-nonnegative-integer?] - [progress (or/c evt? #f)] [in input-port?]) evt?] + [progress-evt (or/c progress-evt? #f)] [in input-port?]) evt?] [(peek-string-evt [k exact-nonnegative-integer?] [skip exact-nonnegative-integer?] - [progress (or/c evt? #f)] [in input-port?]) evt?] + [progress-evt (or/c progress-evt? #f)] [in input-port?]) evt?] [(peek-string!-evt [str (and/c string? (not/c immutable?))] [skip exact-nonnegative-integer?] - [progress (or/c evt? #f)] [in input-port?]) evt?])]{ + [progress-evt (or/c progress-evt? #f)] [in input-port?]) evt?])]{ Like the @racket[read-...-evt] functions, but for peeking. The @racket[skip] argument indicates the number of bytes to skip, and -@racket[progress] indicates an event that effectively cancels the peek -(so that the event never becomes ready). The @racket[progress] +@racket[progress-evt] indicates an event that effectively cancels the peek +(so that the event never becomes ready). The @racket[progress-evt] argument can be @racket[#f], in which case the event is never canceled.} diff --git a/collects/tests/racket/portlib.rktl b/collects/tests/racket/portlib.rktl index 8a35eba5e7..c5078319ab 100644 --- a/collects/tests/racket/portlib.rktl +++ b/collects/tests/racket/portlib.rktl @@ -893,6 +893,28 @@ (flush-output out) (test "hello world" read in)) +;; -------------------------------------------------- +;; check that `read-bytes-evt' gets + +(let () + (define-values (i o) (make-pipe)) + (define res #f) + + (define t + (thread + (lambda () + (set! res (sync (read-bytes-evt 2 i)))))) + + (write-bytes #"1" o) + (sleep 0.1) + (write-bytes #"2" o) + (test #"1" read-bytes 1 i) + (sleep) + (write-bytes #"34" o) + + (sync t) + (test #"23" values res)) + ;; -------------------------------------------------- ;; check that string and byte-string evts can be reused @@ -944,7 +966,16 @@ (tcp-close listener)) (let ([integer->byte (lambda (s) (bitwise-and s #xFF))]) - (check-can-reuse read-bytes-evt read-bytes write-bytes integer->byte list->bytes bytes?)) + #; + (check-can-reuse read-bytes-evt read-bytes write-bytes integer->byte list->bytes bytes?) + ;; the following should work because we use the same evt only after + ;; success or to start at the same point in the input stream: + (check-can-reuse (lambda (n in) + (define bstr (make-bytes n)) + (wrap-evt (read-bytes!-evt bstr in) + (lambda (v) (if (eof-object? v) v bstr)))) + read-bytes write-bytes integer->byte list->bytes bytes?)) + #; (check-can-reuse read-string-evt read-string write-string integer->char list->string string?)) ;; --------------------------------------------------