improve port progress-evt guarantees; fix for `read-bytes-evt' et al.

A progress evt from a close input port must be initially ready,
and the primitive `peek-bytes-avail!' checks a progress evt
before checking whether the port is closed.

These changes resolve a race in `read-bytes-evt' and related evt
constructors.
This commit is contained in:
Matthew Flatt 2012-08-04 10:14:55 -06:00
parent f28e8a02d0
commit 7c9e6d7193
6 changed files with 55 additions and 16 deletions

View File

@ -545,6 +545,10 @@
((list-ref c 0))
(send-result #t))))))
commits))))
(define (close-it)
(close)
;; to ensure that progress evts are ready:
(close-input-port peeked-r))
(make-input-port
name
;; Read
@ -559,7 +563,7 @@
(peek-it s skip unless-evt)
(fast-peek s skip fast-peek-k))))
peek-it)
close
close-it
(lambda ()
(set! progress-requested? #t)
(port-progress-evt peeked-r))
@ -1006,7 +1010,9 @@
eof)
(let ([n (peek-bytes-avail!* str skip progress-evt port 0 count)])
(if (eq? n 0)
(wrap-evt port (lambda (x) 0))
(if (and progress-evt (sync/timeout 0 progress-evt))
#f
(wrap-evt port (lambda (x) 0)))
n)))))
(define (try-again)
(wrap-evt
@ -1110,7 +1116,8 @@
;; go is the main reading function, either called directly for
;; a poll, or called in a thread for a non-poll read
(define (go nack ch poll?)
;; FIXME - what if the input port is closed?
;; Beware that the input port might become closed at any time.
;; For the most part, progress evts should take care of that.
(let try-again ([pos 0] [bstr orig-bstr] [progress-evt #f])
(let* ([progress-evt
;; if no progress event is given, get one to ensure that
@ -1119,7 +1126,7 @@
[v (and
;; to implement weak support for reusing the buffer in `read-bytes!-evt',
;; need to check nack after getting progress-evt:
(not (sync/timeout 0 nack))
(not (sync/timeout 0 nack))
;; try to get bytes:
((if poll? peek-bytes-avail!* peek-bytes-avail!)
bstr (+ pos (or peek-offset 0)) progress-evt input-port pos))])

View File

@ -164,13 +164,15 @@ The arguments implement the port as follows:
]
The results and conventions for @racket[peek] are
mostly the same as for @racket[read-in]. The main difference is in
the handling of the progress event, if it is not @racket[#f]. If
the given progress event becomes ready, the
@racket[peek] must abort any skip attempts and not peek
any values. In particular, @racket[peek] must not peek
any values if the progress event is initially ready.
The results and conventions for @racket[peek] are mostly the same
as for @racket[read-in]. The main difference is in the handling of
the progress event, if it is not @racket[#f]. If the given
progress event becomes ready, the @racket[peek] must abort any
skip attempts and not peek any values. In particular,
@racket[peek] must not peek any values if the progress event is
initially ready. If the port has been closed, the progress event
should be ready, in which case @racket[peek] should complete
(instead of failing because the port is closed).
Unlike @racket[read-in], @racket[peek] should produce
@racket[#f] (or an event whose value is @racket[#f]) if no bytes
@ -219,7 +221,8 @@ The arguments implement the port as follows:
@item{@racket[get-progress-evt] --- either @racket[#f] (the
default), or a procedure that takes no arguments and returns an
event. The event must become ready only after data is next read
from the port or the port is closed. After the event becomes
from the port or the port is closed. If the port is already closed,
the event must be ready. After the event becomes
ready, it must remain so. See the description of @racket[read-in]
for information about the allowed results of this function when
@racket[read-in] returns a pipe input port. See also

View File

@ -250,7 +250,9 @@ end-of-file, at least one byte (or special) past the skipped bytes, or
until a non-@racket[#f] @racket[progress] becomes ready. Furthermore,
if @racket[progress] is ready before bytes are peeked, no bytes are
peeked or skipped, and @racket[progress] may cut short the skipping
process if it becomes available during the peek attempt.
process if it becomes available during the peek attempt. Furthermore,
@racket[progress] is checked even before determining whether the port
is still open.
The result of @racket[peek-bytes-avail!] is @racket[0] only in the
case that @racket[progress] becomes ready before bytes are peeked.}
@ -331,9 +333,9 @@ like @racket[peek-bytes-avail!].}
(current-input-port)])
progress-evt?]{
Returns a @tech{synchronizable event} that becomes ready after any subsequent read from
@racket[in], or after @racket[in] is closed. After the event becomes
ready, it remains ready.}
Returns a @tech{synchronizable event} that becomes ready after any
subsequent read from @racket[in] or after @racket[in] is
closed. After the event becomes ready, it remains ready.}
@defproc[(port-provides-progress-evts? [in input-port?]) boolean]{

View File

@ -73,6 +73,13 @@
(test #t byte-ready? r)
(test #f char-ready? r))
;; Progress evts for a closed port should start out ready:
(let ()
(define-values (i o) (make-pipe))
(close-input-port i)
(test #t evt? (sync/timeout 0 (port-progress-evt i)))
(test 0 peek-bytes-avail! (make-bytes 10) 0 (port-progress-evt i) i))
;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Based on the Racket manual...

View File

@ -312,6 +312,14 @@
#f
void))
(let ([p (make-input-port/read-to-peek
'example
(lambda (bytes) (bytes-set! bytes 0 0) 1)
#f
void)])
(close-input-port p)
(test #t evt? (sync/timeout 0 (port-progress-evt p))))
(let ([p (make-list-port #\h #\e #\l #\l #\o)])
(test (char->integer #\h) peek-byte p)
(test (char->integer #\e) peek-byte p 1)

View File

@ -2153,6 +2153,13 @@ intptr_t scheme_get_byte_string_unless(const char *who,
if (ip->input_lock)
scheme_wait_input_allowed(ip, only_avail);
/* check progress evt before checking for closed: */
if (unless_evt
&& SAME_TYPE(SCHEME_TYPE(unless_evt), scheme_progress_evt_type)
&& SCHEME_SEMAP(SCHEME_PTR2_VAL(unless_evt))
&& scheme_try_plain_sema(SCHEME_PTR2_VAL(unless_evt)))
return 0;
CHECK_PORT_CLOSED(who, "input", port, ip->closed);
if (only_avail == -1) {
@ -2904,6 +2911,11 @@ Scheme_Object *scheme_progress_evt_via_get(Scheme_Input_Port *port)
sema = scheme_make_sema(0);
if (port->closed) {
scheme_post_sema_all(sema);
return sema;
}
port->progress_evt = sema;
port->slow = 1;