fix `make-limited-input-port' limit tracking for committed peeks
and make the port thread-safe
This commit is contained in:
parent
b1e47eba45
commit
7089a17ba2
|
@ -868,33 +868,59 @@
|
|||
|
||||
(define make-limited-input-port
|
||||
(lambda (port limit [close-orig? #t])
|
||||
(let ([got 0])
|
||||
(let ([got 0]
|
||||
[lock-semaphore (make-semaphore 1)])
|
||||
(define (do-read str)
|
||||
(let ([count (min (- limit got) (bytes-length str))])
|
||||
(if (zero? count)
|
||||
eof
|
||||
(let ([n (read-bytes-avail!* str port 0 count)])
|
||||
(cond [(eq? n 0) (wrap-evt port (lambda (x) 0))]
|
||||
[(number? n) (set! got (+ got n)) n]
|
||||
[(procedure? n) (set! got (add1 got)) n]
|
||||
[else n])))))
|
||||
(define (do-peek str skip progress-evt)
|
||||
(let ([count (max 0 (min (- limit got skip) (bytes-length str)))])
|
||||
(if (zero? count)
|
||||
eof
|
||||
(let ([n (peek-bytes-avail!* str skip progress-evt port 0 count)])
|
||||
(if (eq? n 0)
|
||||
(wrap-evt port (lambda (x) 0))
|
||||
n)))))
|
||||
(define (try-again)
|
||||
(wrap-evt
|
||||
(semaphore-peek-evt lock-semaphore)
|
||||
(lambda (x) 0)))
|
||||
(make-input-port
|
||||
(object-name port)
|
||||
(lambda (str)
|
||||
(let ([count (min (- limit got) (bytes-length str))])
|
||||
(if (zero? count)
|
||||
eof
|
||||
(let ([n (read-bytes-avail!* str port 0 count)])
|
||||
(cond [(eq? n 0) (wrap-evt port (lambda (x) 0))]
|
||||
[(number? n) (set! got (+ got n)) n]
|
||||
[(procedure? n) (set! got (add1 got)) n]
|
||||
[else n])))))
|
||||
(call-with-semaphore
|
||||
lock-semaphore
|
||||
do-read
|
||||
try-again
|
||||
str))
|
||||
(lambda (str skip progress-evt)
|
||||
(let ([count (max 0 (min (- limit got skip) (bytes-length str)))])
|
||||
(if (zero? count)
|
||||
eof
|
||||
(let ([n (peek-bytes-avail!* str skip progress-evt port 0 count)])
|
||||
(if (eq? n 0)
|
||||
(wrap-evt port (lambda (x) 0))
|
||||
n)))))
|
||||
(call-with-semaphore
|
||||
lock-semaphore
|
||||
do-peek
|
||||
try-again
|
||||
str skip progress-evt))
|
||||
(lambda ()
|
||||
(when close-orig?
|
||||
(close-input-port port)))
|
||||
(and (port-provides-progress-evts? port)
|
||||
(lambda () (port-progress-evt port)))
|
||||
(and (port-provides-progress-evts? port)
|
||||
(lambda (n evt target-evt) (port-commit-peeked n evt target-evt port)))
|
||||
(lambda (n evt target-evt)
|
||||
(let loop ()
|
||||
(if (semaphore-try-wait? lock-semaphore)
|
||||
(let ([ok? (port-commit-peeked n evt target-evt port)])
|
||||
(when ok? (set! got (+ got n)))
|
||||
(semaphore-post lock-semaphore)
|
||||
ok?)
|
||||
(sync (handle-evt evt (lambda (v) #f))
|
||||
(handle-evt (semaphore-peek-evt lock-semaphore)
|
||||
(lambda (v) (loop))))))))
|
||||
(lambda () (port-next-location port))
|
||||
(lambda () (port-count-lines! port))
|
||||
(add1 (file-position port))))))
|
||||
|
|
|
@ -811,6 +811,63 @@
|
|||
(check-all void)
|
||||
(check-all port-count-lines!))
|
||||
|
||||
;; --------------------------------------------------
|
||||
;; Check that commit-based reading counts against the limit:
|
||||
|
||||
(let ([p (make-limited-input-port
|
||||
(open-input-string "A\nB\nC\nD\n")
|
||||
4)])
|
||||
(test `((#"A" 2) (#"B" 4) (,eof 4) (,eof 4))
|
||||
list
|
||||
(list (sync (read-bytes-line-evt p))
|
||||
(file-position p))
|
||||
(list (sync (read-bytes-line-evt p))
|
||||
(file-position p))
|
||||
(list (sync (read-bytes-line-evt p))
|
||||
(file-position p))
|
||||
(list (sync (read-bytes-line-evt p))
|
||||
(file-position p))))
|
||||
|
||||
;; --------------------------------------------------
|
||||
|
||||
;; Check that commit-based reading counts against a port limit:
|
||||
|
||||
(let* ([p (make-limited-input-port
|
||||
(open-input-string "A\nB\nC\nD\n")
|
||||
4)]
|
||||
[N 6]
|
||||
[chs (for/list ([i N])
|
||||
(let ([ch (make-channel)])
|
||||
(thread
|
||||
(lambda ()
|
||||
(channel-put ch (list (sync (read-bytes-line-evt p))
|
||||
(file-position p)))))
|
||||
ch))]
|
||||
[r (for/list ([ch chs])
|
||||
(channel-get ch))])
|
||||
r)
|
||||
|
||||
;; check proper locking for concurrent access:
|
||||
(let* ([p (make-limited-input-port
|
||||
(open-input-string "A\nB\nC\nD\n")
|
||||
4)]
|
||||
[N 6]
|
||||
[chs (for/list ([i N])
|
||||
(let ([ch (make-channel)])
|
||||
(thread
|
||||
(lambda ()
|
||||
(when (even? i) (sleep))
|
||||
(channel-put ch (list (sync (read-bytes-line-evt p))
|
||||
(file-position p)))))
|
||||
ch))]
|
||||
[rs (for/list ([ch chs])
|
||||
(channel-get ch))])
|
||||
(test 2 apply + (for/list ([r rs]) (if (bytes? (car r)) 1 0)))
|
||||
(test #t values (for/and ([r rs])
|
||||
(if (eof-object? (car r))
|
||||
(eq? (cadr r) 4)
|
||||
(memq (cadr r) '(2 4))))))
|
||||
|
||||
;; --------------------------------------------------
|
||||
|
||||
(report-errs)
|
||||
|
|
Loading…
Reference in New Issue
Block a user