diff --git a/collects/mzlib/port.ss b/collects/mzlib/port.ss index 25ed40a..13a8238 100644 --- a/collects/mzlib/port.ss +++ b/collects/mzlib/port.ss @@ -76,16 +76,29 @@ (cons dest dests)) (let ([s (make-bytes 4096)]) (let loop () - (let ([c (read-bytes-avail! s src)]) - (unless (eof-object? c) - (for-each - (lambda (dest) - (let loop ([start 0]) - (unless (= start c) - (let ([c2 (write-bytes-avail s dest start c)]) - (loop (+ start c2)))))) - (cons dest dests)) - (loop)))))) + (let ([b (peek-byte-or-special src)]) + (if (or (byte? b) (eof-object? b)) + ;; Read-bytes-avail should work fine --- assuming + ;; that no one else reads the byte, first. So we + ;; have a race condition here, and the solution is + ;; to add read-bytes-avail-or-special! to MzScheme + (let ([c (read-bytes-avail! s src)]) + (unless (eof-object? c) + (for-each + (lambda (dest) + (let loop ([start 0]) + (unless (= start c) + (let ([c2 (write-bytes-avail s dest start c)]) + (loop (+ start c2)))))) + (cons dest dests)) + (loop))) + ;; Got a special + (begin + (for-each + (lambda (dest) + (write-special dest b)) + (cons dest dests)) + (loop))))))) (define merge-input (case-lambda @@ -116,7 +129,8 @@ rd))])) ;; Not kill-safe. - ;; Works only when read proc never returns an event. + ;; If the `read' proc returns an event, the event must produce + ;; 0 always (define (make-input-port/read-to-peek name read fast-peek close) (define lock-semaphore (make-semaphore 1)) (define-values (peeked-r peeked-w) (make-pipe)) @@ -141,7 +155,8 @@ (if (char-ready? peeked-r) (read-bytes-avail!* s peeked-r) ;; If nothing is saved from a peeking read, - ;; dispatch to `read', otherwise + ;; dispatch to `read', otherwise return + ;; previously peeked data (cond [(null? special-peeked) (when progress-requested? (make-progress)) @@ -175,7 +190,7 @@ ;; specials. (cond [(and unless-evt (sync/timeout 0 unless-evt)) - 0] + #f] [(null? special-peeked) ;; Empty special queue, so read through the original proc (let ([r (read s)]) @@ -186,6 +201,15 @@ (write-bytes s peeked-w 0 r) ;; Now try again (peek-bytes-avail!* s skip #f peeked-r)] + [(evt? r) + (if unless-evt + ;; Technically, there's a race condition here. + ;; We might choose r (and return 0) even when + ;; unless-evt becomes available first. However, + ;; this race is not detectable only by the inside + ;; of `read'. + (choice-evt r (wrap-evt unless-evt (lambda (x) #f))) + r)] [else (set! special-peeked (cons r null)) (set! special-peeked-tail special-peeked) @@ -194,7 +218,7 @@ [else ;; Non-empty special queue, so try to use it (let* ([pos (file-position peeked-r)] - [avail (- peeked-end pos)] + [avail (max 0 (- peeked-end pos))] [sk (- skip avail)]) (let loop ([sk sk] [l special-peeked]) @@ -206,10 +230,10 @@ [r (read t)]) (cond [(evt? r) - ;; We can't deal with an event, so complain - (error 'make-input-port/read-to-peek - "original read produced an event: ~e" - r)] + (if unless-evt + ;; See note above + (choice-evt r (wrap-evt unless-evt (lambda (x) #f))) + r)] [(eq? r 0) ;; Original read thinks a spin is ok, ;; so we return 0 to skin, too. @@ -298,7 +322,81 @@ (set! progress-requested? #t) (port-progress-evt peeked-r)) commit-it)) - + + ;; Not kill-safe. + (define make-pipe-with-specials + (opt-lambda ([limit (expt 2 64)] [in-name 'pipe] [out-name 'pipe]) + (let-values ([(r w) (make-pipe limit)] + [(more) null] + [(more-last) #f] + [(more-sema) #f] + [(sema-semaphore) (make-semaphore 1)]) + (define (flush-more) + (if (null? more) + (set! more-last #f) + (when (bytes? (car more)) + (write-bytes (car more) w) + (set! more (cdr more)) + (flush-more)))) + (values + (make-input-port/read-to-peek + in-name + (lambda (s) + (let ([v (read-bytes-avail! r)]) + (if (eq? v 0) + (if more-last + ;; Return a special + (let ([a (car more)]) + (set! more (cdr more)) + (flush-more) + (lambda (file line col ppos) + a)) + ;; Nothing available, yet. + (call-with-semaphore + sema-semaphore + (lambda () + (unless more-sema + (set! more-sema (make-semaphore))) + (wrap-evt (semaphore-peek-evt more-sema) + (lambda (x) 0)))))))) + #f + void) + (make-output-port + out-name + always-evt + ;; write + (lambda (str start end buffer? w/break?) + (call-with-semaphore + sema-semaphore + (lambda () + (if more-last + (let ([p (cons (subbytes str start end) null)]) + (set-cdr! more-last p) + (set! more-last p)) + (write-bytes str w start end)) + (when more-sema + (semaphore-post more-sema) + (set! more-sema #f))))) + ;; close + (lambda () + (call-with-semaphore + sema-semaphore + (lambda () + (close-output-port w) + (set! more null) + (set! more-last #f) + (when more-sema + (semaphore-post more-sema))))) + ;; write-special + (lambda (v buffer? w/break?) + (call-with-semaphore + sema-semaphore + (lambda () + (let ([p (cons v null)]) + (if more-last + (set-cdr! more-last p) + (set! more p)) + (set! more-last p)))))))))) (define input-port-append (opt-lambda (close-orig? . ports) @@ -311,7 +409,7 @@ eof (let ([n (read-bytes-avail!* str (car ports))]) (cond - [(eq? n 0) (car ports)] + [(eq? n 0) (wrap-evt (car ports) (lambda (x) 0))] [(eof-object? n) (when close-orig? (close-input-port (car ports)))