From 7089a17ba2931b1be04c9eec68c3bef7967d0ca7 Mon Sep 17 00:00:00 2001 From: Matthew Flatt Date: Fri, 17 Jun 2011 11:14:50 -0600 Subject: [PATCH] fix `make-limited-input-port' limit tracking for committed peeks and make the port thread-safe --- collects/mzlib/port.rkt | 60 +++++++++++++++++++++--------- collects/tests/racket/portlib.rktl | 57 ++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 17 deletions(-) diff --git a/collects/mzlib/port.rkt b/collects/mzlib/port.rkt index fad66f277a..f87d0c9a9d 100644 --- a/collects/mzlib/port.rkt +++ b/collects/mzlib/port.rkt @@ -868,33 +868,59 @@ (define make-limited-input-port (lambda (port limit [close-orig? #t]) - (let ([got 0]) + (let ([got 0] + [lock-semaphore (make-semaphore 1)]) + (define (do-read str) + (let ([count (min (- limit got) (bytes-length str))]) + (if (zero? count) + eof + (let ([n (read-bytes-avail!* str port 0 count)]) + (cond [(eq? n 0) (wrap-evt port (lambda (x) 0))] + [(number? n) (set! got (+ got n)) n] + [(procedure? n) (set! got (add1 got)) n] + [else n]))))) + (define (do-peek str skip progress-evt) + (let ([count (max 0 (min (- limit got skip) (bytes-length str)))]) + (if (zero? count) + eof + (let ([n (peek-bytes-avail!* str skip progress-evt port 0 count)]) + (if (eq? n 0) + (wrap-evt port (lambda (x) 0)) + n))))) + (define (try-again) + (wrap-evt + (semaphore-peek-evt lock-semaphore) + (lambda (x) 0))) (make-input-port (object-name port) (lambda (str) - (let ([count (min (- limit got) (bytes-length str))]) - (if (zero? count) - eof - (let ([n (read-bytes-avail!* str port 0 count)]) - (cond [(eq? n 0) (wrap-evt port (lambda (x) 0))] - [(number? n) (set! got (+ got n)) n] - [(procedure? n) (set! got (add1 got)) n] - [else n]))))) + (call-with-semaphore + lock-semaphore + do-read + try-again + str)) (lambda (str skip progress-evt) - (let ([count (max 0 (min (- limit got skip) (bytes-length str)))]) - (if (zero? count) - eof - (let ([n (peek-bytes-avail!* str skip progress-evt port 0 count)]) - (if (eq? n 0) - (wrap-evt port (lambda (x) 0)) - n))))) + (call-with-semaphore + lock-semaphore + do-peek + try-again + str skip progress-evt)) (lambda () (when close-orig? (close-input-port port))) (and (port-provides-progress-evts? port) (lambda () (port-progress-evt port))) (and (port-provides-progress-evts? port) - (lambda (n evt target-evt) (port-commit-peeked n evt target-evt port))) + (lambda (n evt target-evt) + (let loop () + (if (semaphore-try-wait? lock-semaphore) + (let ([ok? (port-commit-peeked n evt target-evt port)]) + (when ok? (set! got (+ got n))) + (semaphore-post lock-semaphore) + ok?) + (sync (handle-evt evt (lambda (v) #f)) + (handle-evt (semaphore-peek-evt lock-semaphore) + (lambda (v) (loop)))))))) (lambda () (port-next-location port)) (lambda () (port-count-lines! port)) (add1 (file-position port)))))) diff --git a/collects/tests/racket/portlib.rktl b/collects/tests/racket/portlib.rktl index 0c0908759d..3a59e06538 100644 --- a/collects/tests/racket/portlib.rktl +++ b/collects/tests/racket/portlib.rktl @@ -811,6 +811,63 @@ (check-all void) (check-all port-count-lines!)) +;; -------------------------------------------------- +;; Check that commit-based reading counts against the limit: + +(let ([p (make-limited-input-port + (open-input-string "A\nB\nC\nD\n") + 4)]) + (test `((#"A" 2) (#"B" 4) (,eof 4) (,eof 4)) + list + (list (sync (read-bytes-line-evt p)) + (file-position p)) + (list (sync (read-bytes-line-evt p)) + (file-position p)) + (list (sync (read-bytes-line-evt p)) + (file-position p)) + (list (sync (read-bytes-line-evt p)) + (file-position p)))) + +;; -------------------------------------------------- + +;; Check that commit-based reading counts against a port limit: + +(let* ([p (make-limited-input-port + (open-input-string "A\nB\nC\nD\n") + 4)] + [N 6] + [chs (for/list ([i N]) + (let ([ch (make-channel)]) + (thread + (lambda () + (channel-put ch (list (sync (read-bytes-line-evt p)) + (file-position p))))) + ch))] + [r (for/list ([ch chs]) + (channel-get ch))]) + r) + +;; check proper locking for concurrent access: +(let* ([p (make-limited-input-port + (open-input-string "A\nB\nC\nD\n") + 4)] + [N 6] + [chs (for/list ([i N]) + (let ([ch (make-channel)]) + (thread + (lambda () + (when (even? i) (sleep)) + (channel-put ch (list (sync (read-bytes-line-evt p)) + (file-position p))))) + ch))] + [rs (for/list ([ch chs]) + (channel-get ch))]) + (test 2 apply + (for/list ([r rs]) (if (bytes? (car r)) 1 0))) + (test #t values (for/and ([r rs]) + (if (eof-object? (car r)) + (eq? (cadr r) 4) + (memq (cadr r) '(2 4)))))) + ;; -------------------------------------------------- (report-errs)