diff --git a/collects/mzlib/port.ss b/collects/mzlib/port.ss index aef398c..1a928c3 100644 --- a/collects/mzlib/port.ss +++ b/collects/mzlib/port.ss @@ -141,303 +141,312 @@ ;; Not kill-safe. ;; If the `read' proc returns an event, the event must produce ;; 0 always - (define (make-input-port/read-to-peek name read fast-peek close) - (define lock-semaphore (make-semaphore 1)) - (define commit-semaphore (make-semaphore 1)) - (define-values (peeked-r peeked-w) (make-pipe)) - (define special-peeked null) - (define special-peeked-tail #f) - (define progress-requested? #f) - (define use-manager? #f) - (define manager-th #f) - (define manager-ch (make-channel)) - (define resume-ch (make-channel)) - (define buf (make-bytes 4096)) - (define (try-again) - (wrap-evt - (semaphore-peek-evt lock-semaphore) - (lambda (x) 0))) - (define (suspend-manager) - (channel-put manager-ch 'suspend)) - (define (resume-manager) - (channel-put resume-ch 'resume)) - (define (with-manager-lock thunk) - (thread-resume manager-th (current-thread)) - (dynamic-wind suspend-manager thunk resume-manager)) - (define (make-progress) - (write-byte 0 peeked-w) - (read-byte peeked-r)) - (define (read-it-with-lock s) - (if use-manager? - (with-manager-lock (lambda () (do-read-it s))) - (do-read-it s))) - (define (read-it s) - (call-with-semaphore - lock-semaphore - read-it-with-lock - try-again - s)) - (define (do-read-it s) - (if (char-ready? peeked-r) - (read-bytes-avail!* s peeked-r) - ;; If nothing is saved from a peeking read, - ;; dispatch to `read', otherwise return - ;; previously peeked data - (cond - [(null? special-peeked) - (when progress-requested? (make-progress)) - (read s)] - [else (if (bytes? (car special-peeked)) - (let ([b (car special-peeked)]) - (write-bytes b peeked-w) - (set! special-peeked (cdr special-peeked)) - (when (null? special-peeked) - (set! special-peeked-tail #f)) - (read-bytes-avail!* s peeked-r)) - (begin0 - (car special-peeked) - (make-progress) - (set! special-peeked (cdr special-peeked)) - (when (null? special-peeked) - (set! special-peeked-tail #f))))]))) - (define (peek-it-with-lock s skip unless-evt) - (if use-manager? - (with-manager-lock (lambda () (do-peek-it s skip unless-evt))) - (do-peek-it s skip unless-evt))) - (define (peek-it s skip unless-evt) - (let ([v (peek-bytes-avail!* s skip unless-evt peeked-r)]) - (if (eq? v 0) - (call-with-semaphore - lock-semaphore - peek-it-with-lock - try-again - s skip unless-evt) - v))) - (define (do-peek-it s skip unless-evt) - (let ([v (peek-bytes-avail!* s skip unless-evt peeked-r)]) - (if (eq? v 0) - ;; The peek may have failed because peeked-r is empty, - ;; because unless-evt is ready, or because the skip is - ;; far. Handle nicely the common case where there are no - ;; specials. + (define make-input-port/read-to-peek + (opt-lambda (name read fast-peek close + [location-proc #f] + [count-lines!-proc void] + [init-position 1] + [buffer-mode-proc #f]) + (define lock-semaphore (make-semaphore 1)) + (define commit-semaphore (make-semaphore 1)) + (define-values (peeked-r peeked-w) (make-pipe)) + (define special-peeked null) + (define special-peeked-tail #f) + (define progress-requested? #f) + (define use-manager? #f) + (define manager-th #f) + (define manager-ch (make-channel)) + (define resume-ch (make-channel)) + (define buf (make-bytes 4096)) + (define (try-again) + (wrap-evt + (semaphore-peek-evt lock-semaphore) + (lambda (x) 0))) + (define (suspend-manager) + (channel-put manager-ch 'suspend)) + (define (resume-manager) + (channel-put resume-ch 'resume)) + (define (with-manager-lock thunk) + (thread-resume manager-th (current-thread)) + (dynamic-wind suspend-manager thunk resume-manager)) + (define (make-progress) + (write-byte 0 peeked-w) + (read-byte peeked-r)) + (define (read-it-with-lock s) + (if use-manager? + (with-manager-lock (lambda () (do-read-it s))) + (do-read-it s))) + (define (read-it s) + (call-with-semaphore + lock-semaphore + read-it-with-lock + try-again + s)) + (define (do-read-it s) + (if (char-ready? peeked-r) + (read-bytes-avail!* s peeked-r) + ;; If nothing is saved from a peeking read, + ;; dispatch to `read', otherwise return + ;; previously peeked data (cond - [(and unless-evt (sync/timeout 0 unless-evt)) - #f] - [(null? special-peeked) - ;; Empty special queue, so read through the original proc. - ;; We only only need - ;; (- (+ skip (bytes-length s)) (pipe-content-length peeked-w)) - ;; bytes, but read more (up to size of buf) to help move - ;; things along. - (let* ([r (read buf)]) - (cond - [(number? r) - ;; The nice case --- reading gave us more bytes - (write-bytes buf peeked-w 0 r) - ;; Now try again - (peek-bytes-avail!* s skip #f peeked-r)] - [(evt? r) - (if unless-evt - ;; Technically, there's a race condition here. - ;; We might choose r (and return 0) even when - ;; unless-evt becomes available first. However, - ;; this race is detectable only by the inside - ;; of `read'. - (choice-evt r (wrap-evt unless-evt (lambda (x) #f))) - r)] - [else - (set! special-peeked (cons r null)) - (set! special-peeked-tail special-peeked) - ;; Now try again - (do-peek-it s skip unless-evt)]))] - [else - ;; Non-empty special queue, so try to use it - (let* ([avail (pipe-content-length peeked-r)] - [sk (- skip avail)]) - (let loop ([sk sk] - [l special-peeked]) - (cond - [(null? l) - ;; Not enough even in the special queue. - ;; Read once and add it. - (let* ([t (make-bytes (min 4096 (+ sk (bytes-length s))))] - [r (read t)]) - (cond - [(evt? r) - (if unless-evt - ;; See note above - (choice-evt r (wrap-evt unless-evt (lambda (x) #f))) - r)] - [(eq? r 0) - ;; Original read thinks a spin is ok, - ;; so we return 0 to skin, too. - 0] - [else (let ([v (if (number? r) - (subbytes t 0 r) - r)]) - (let ([pr (cons v null)]) - (set-cdr! special-peeked-tail pr) - (set! special-peeked-tail pr)) - ;; Got something; now try again - (do-peek-it s skip unless-evt))]))] - [(eof-object? (car l)) - ;; No peeking past an EOF - eof] - [(procedure? (car l)) - (if (zero? sk) - (car l) - (loop (sub1 sk) (cdr l)))] - [(bytes? (car l)) - (let ([len (bytes-length (car l))]) - (if (sk . < . len) - (let ([n (min (bytes-length s) - (- len sk))]) - (bytes-copy! s 0 (car l) sk (+ sk n)) - n) - (loop (- sk len) (cdr l))))])))]) - v))) - (define (commit-it-with-lock amt unless-evt done-evt) - (if use-manager? - (with-manager-lock (lambda () (do-commit-it amt unless-evt done-evt))) - (do-commit-it amt unless-evt done-evt))) - (define (commit-it amt unless-evt done-evt) - (call-with-semaphore - lock-semaphore - commit-it-with-lock - #f - amt unless-evt done-evt)) - (define (do-commit-it amt unless-evt done-evt) - (if (sync/timeout 0 unless-evt) - #f - (let* ([avail (pipe-content-length peeked-r)] - [p-commit (min avail amt)]) - (let loop ([amt (- amt p-commit)] - [l special-peeked]) + [(null? special-peeked) + (when progress-requested? (make-progress)) + (read s)] + [else (if (bytes? (car special-peeked)) + (let ([b (car special-peeked)]) + (write-bytes b peeked-w) + (set! special-peeked (cdr special-peeked)) + (when (null? special-peeked) + (set! special-peeked-tail #f)) + (read-bytes-avail!* s peeked-r)) + (begin0 + (car special-peeked) + (make-progress) + (set! special-peeked (cdr special-peeked)) + (when (null? special-peeked) + (set! special-peeked-tail #f))))]))) + (define (peek-it-with-lock s skip unless-evt) + (if use-manager? + (with-manager-lock (lambda () (do-peek-it s skip unless-evt))) + (do-peek-it s skip unless-evt))) + (define (peek-it s skip unless-evt) + (let ([v (peek-bytes-avail!* s skip unless-evt peeked-r)]) + (if (eq? v 0) + (call-with-semaphore + lock-semaphore + peek-it-with-lock + try-again + s skip unless-evt) + v))) + (define (do-peek-it s skip unless-evt) + (let ([v (peek-bytes-avail!* s skip unless-evt peeked-r)]) + (if (eq? v 0) + ;; The peek may have failed because peeked-r is empty, + ;; because unless-evt is ready, or because the skip is + ;; far. Handle nicely the common case where there are no + ;; specials. (cond - [(amt . <= . 0) - ;; Enough has been peeked. Do commit... - (actual-commit p-commit l unless-evt done-evt)] - [(null? l) - ;; Requested commit was larger than previous peeks + [(and unless-evt (sync/timeout 0 unless-evt)) #f] - [(bytes? (car l)) - (let ([bl (bytes-length (car l))]) - (if (bl . > . amt) - ;; Split the string - (let ([next (cons - (subbytes (car l) amt) - (cdr l))]) - (set-car! l (subbytes (car l) 0 amt)) - (set-cdr! l next) - (when (eq? l special-peeked-tail) - (set! special-peeked-tail next)) - (loop 0 (cdr l))) - ;; Consume this string... - (loop (- amt bl) (cdr l))))] + [(null? special-peeked) + ;; Empty special queue, so read through the original proc. + ;; We only only need + ;; (- (+ skip (bytes-length s)) (pipe-content-length peeked-w)) + ;; bytes, but read more (up to size of buf) to help move + ;; things along. + (let* ([r (read buf)]) + (cond + [(number? r) + ;; The nice case --- reading gave us more bytes + (write-bytes buf peeked-w 0 r) + ;; Now try again + (peek-bytes-avail!* s skip #f peeked-r)] + [(evt? r) + (if unless-evt + ;; Technically, there's a race condition here. + ;; We might choose r (and return 0) even when + ;; unless-evt becomes available first. However, + ;; this race is detectable only by the inside + ;; of `read'. + (choice-evt r (wrap-evt unless-evt (lambda (x) #f))) + r)] + [else + (set! special-peeked (cons r null)) + (set! special-peeked-tail special-peeked) + ;; Now try again + (do-peek-it s skip unless-evt)]))] [else - (loop (sub1 amt) (cdr l))]))))) - (define (actual-commit p-commit l unless-evt done-evt) - ;; The `finish' proc finally, actually, will commit... - (define (finish) - (unless (zero? p-commit) - (peek-byte peeked-r (sub1 p-commit)) - (port-commit-peeked p-commit unless-evt always-evt peeked-r)) - (set! special-peeked l) - (when (null? special-peeked) - (set! special-peeked-tail #f)) - (when (and progress-requested? (zero? p-commit)) - (make-progress)) - #t) - ;; If we can sync done-evt immediately, then finish. - (if (sync/timeout 0 (wrap-evt done-evt (lambda (x) #t))) - (finish) - ;; We need to wait, so we'll have to release the lock. - ;; Send the work to a manager thread. - (let ([result-ch (make-channel)] - [w/manager? use-manager?]) - (if w/manager? - ;; Resume manager if it was running: - (resume-manager) - ;; Start manager if it wasn't running: - (begin - (set! manager-th (thread manage-commits)) - (set! use-manager? #t) - (thread-resume manager-th (current-thread)))) - ;; Sets use-manager? if the manager wasn't already running: - (channel-put manager-ch (list finish unless-evt done-evt result-ch)) - ;; Release locks: - (semaphore-post lock-semaphore) - (begin0 - ;; Wait for manager to complete commit: - (sync result-ch) - ;; Grab locks again, so they're released - ;; properly on exit: - (semaphore-wait lock-semaphore) - (when w/manager? - (suspend-manager)))))) - (define (manage-commits) - (let loop ([commits null]) - (apply - sync - (handle-evt manager-ch - (lambda (c) - (case c - [(suspend) - (channel-get resume-ch) - (loop commits)] - [else - ;; adding a commit - (loop (cons c commits))]))) - (map (lambda (c) - (define (send-result v) - ;; Create a new thread to send the result asynchronously: - (thread-resume - (thread (lambda () - (channel-put (list-ref c 3) v))) - (current-thread)) - (when (null? (cdr commits)) - (set! use-manager? #f)) - (loop (remq c commits))) - ;; Choose between done and unless: - (if (sync/timeout 0 (list-ref c 1)) - (handle-evt always-evt - (lambda (x) - (send-result #f))) - (choice-evt - (handle-evt (list-ref c 1) - (lambda (x) - ;; unless ready, which means that the commit must fail - (send-result #f))) - (handle-evt (list-ref c 2) - (lambda (x) - ;; done-evt ready, which means that the commit - ;; must succeed. - ;; If we get here, then commits are not - ;; suspended, so we implicitly have the - ;; lock. - ((list-ref c 0)) - (send-result #t)))))) - commits)))) - (make-input-port - name - ;; Read - read-it - ;; Peek - (if fast-peek - (let ([fast-peek-k (lambda (s skip) - (peek-it s skip #f))]) - (lambda (s skip unless-evt) - (if (or unless-evt - (char-ready? peeked-r) - (pair? special-peeked)) - (peek-it s skip unless-evt) - (fast-peek s skip fast-peek-k)))) - peek-it) - close - (lambda () - (set! progress-requested? #t) - (port-progress-evt peeked-r)) - commit-it)) + ;; Non-empty special queue, so try to use it + (let* ([avail (pipe-content-length peeked-r)] + [sk (- skip avail)]) + (let loop ([sk sk] + [l special-peeked]) + (cond + [(null? l) + ;; Not enough even in the special queue. + ;; Read once and add it. + (let* ([t (make-bytes (min 4096 (+ sk (bytes-length s))))] + [r (read t)]) + (cond + [(evt? r) + (if unless-evt + ;; See note above + (choice-evt r (wrap-evt unless-evt (lambda (x) #f))) + r)] + [(eq? r 0) + ;; Original read thinks a spin is ok, + ;; so we return 0 to skin, too. + 0] + [else (let ([v (if (number? r) + (subbytes t 0 r) + r)]) + (let ([pr (cons v null)]) + (set-cdr! special-peeked-tail pr) + (set! special-peeked-tail pr)) + ;; Got something; now try again + (do-peek-it s skip unless-evt))]))] + [(eof-object? (car l)) + ;; No peeking past an EOF + eof] + [(procedure? (car l)) + (if (zero? sk) + (car l) + (loop (sub1 sk) (cdr l)))] + [(bytes? (car l)) + (let ([len (bytes-length (car l))]) + (if (sk . < . len) + (let ([n (min (bytes-length s) + (- len sk))]) + (bytes-copy! s 0 (car l) sk (+ sk n)) + n) + (loop (- sk len) (cdr l))))])))]) + v))) + (define (commit-it-with-lock amt unless-evt done-evt) + (if use-manager? + (with-manager-lock (lambda () (do-commit-it amt unless-evt done-evt))) + (do-commit-it amt unless-evt done-evt))) + (define (commit-it amt unless-evt done-evt) + (call-with-semaphore + lock-semaphore + commit-it-with-lock + #f + amt unless-evt done-evt)) + (define (do-commit-it amt unless-evt done-evt) + (if (sync/timeout 0 unless-evt) + #f + (let* ([avail (pipe-content-length peeked-r)] + [p-commit (min avail amt)]) + (let loop ([amt (- amt p-commit)] + [l special-peeked]) + (cond + [(amt . <= . 0) + ;; Enough has been peeked. Do commit... + (actual-commit p-commit l unless-evt done-evt)] + [(null? l) + ;; Requested commit was larger than previous peeks + #f] + [(bytes? (car l)) + (let ([bl (bytes-length (car l))]) + (if (bl . > . amt) + ;; Split the string + (let ([next (cons + (subbytes (car l) amt) + (cdr l))]) + (set-car! l (subbytes (car l) 0 amt)) + (set-cdr! l next) + (when (eq? l special-peeked-tail) + (set! special-peeked-tail next)) + (loop 0 (cdr l))) + ;; Consume this string... + (loop (- amt bl) (cdr l))))] + [else + (loop (sub1 amt) (cdr l))]))))) + (define (actual-commit p-commit l unless-evt done-evt) + ;; The `finish' proc finally, actually, will commit... + (define (finish) + (unless (zero? p-commit) + (peek-byte peeked-r (sub1 p-commit)) + (port-commit-peeked p-commit unless-evt always-evt peeked-r)) + (set! special-peeked l) + (when (null? special-peeked) + (set! special-peeked-tail #f)) + (when (and progress-requested? (zero? p-commit)) + (make-progress)) + #t) + ;; If we can sync done-evt immediately, then finish. + (if (sync/timeout 0 (wrap-evt done-evt (lambda (x) #t))) + (finish) + ;; We need to wait, so we'll have to release the lock. + ;; Send the work to a manager thread. + (let ([result-ch (make-channel)] + [w/manager? use-manager?]) + (if w/manager? + ;; Resume manager if it was running: + (resume-manager) + ;; Start manager if it wasn't running: + (begin + (set! manager-th (thread manage-commits)) + (set! use-manager? #t) + (thread-resume manager-th (current-thread)))) + ;; Sets use-manager? if the manager wasn't already running: + (channel-put manager-ch (list finish unless-evt done-evt result-ch)) + ;; Release locks: + (semaphore-post lock-semaphore) + (begin0 + ;; Wait for manager to complete commit: + (sync result-ch) + ;; Grab locks again, so they're released + ;; properly on exit: + (semaphore-wait lock-semaphore) + (when w/manager? + (suspend-manager)))))) + (define (manage-commits) + (let loop ([commits null]) + (apply + sync + (handle-evt manager-ch + (lambda (c) + (case c + [(suspend) + (channel-get resume-ch) + (loop commits)] + [else + ;; adding a commit + (loop (cons c commits))]))) + (map (lambda (c) + (define (send-result v) + ;; Create a new thread to send the result asynchronously: + (thread-resume + (thread (lambda () + (channel-put (list-ref c 3) v))) + (current-thread)) + (when (null? (cdr commits)) + (set! use-manager? #f)) + (loop (remq c commits))) + ;; Choose between done and unless: + (if (sync/timeout 0 (list-ref c 1)) + (handle-evt always-evt + (lambda (x) + (send-result #f))) + (choice-evt + (handle-evt (list-ref c 1) + (lambda (x) + ;; unless ready, which means that the commit must fail + (send-result #f))) + (handle-evt (list-ref c 2) + (lambda (x) + ;; done-evt ready, which means that the commit + ;; must succeed. + ;; If we get here, then commits are not + ;; suspended, so we implicitly have the + ;; lock. + ((list-ref c 0)) + (send-result #t)))))) + commits)))) + (make-input-port + name + ;; Read + read-it + ;; Peek + (if fast-peek + (let ([fast-peek-k (lambda (s skip) + (peek-it s skip #f))]) + (lambda (s skip unless-evt) + (if (or unless-evt + (char-ready? peeked-r) + (pair? special-peeked)) + (peek-it s skip unless-evt) + (fast-peek s skip fast-peek-k)))) + peek-it) + close + (lambda () + (set! progress-requested? #t) + (port-progress-evt peeked-r)) + commit-it + location-proc + count-lines!-proc + init-position + buffer-mode-proc))) (define peeking-input-port (opt-lambda (orig-in [name (object-name orig-in)] [delta 0]) @@ -1173,7 +1182,9 @@ [buf-start 0] [buf-end 0] [buf-eof? #f] - [buf-eof-result #f]) + [buf-eof-result #f] + [buffer-mode (or (file-stream-buffer-mode port) + 'none)]) ;; Main reader entry: (define (read-it s) (cond @@ -1210,8 +1221,11 @@ (set! buf-end (- buf-end buf-start)) (set! buf-start 0)) (let* ([amt (bytes-length s)] - [c (read-bytes-avail!* buf port buf-end (min (bytes-length buf) - (+ buf-end amt)))]) + [c (read-bytes-avail!* buf port buf-end + (if (eq? buffer-mode 'block) + (bytes-length buf) + (min (bytes-length buf) + (+ buf-end amt))))]) (cond [(or (eof-object? c) (procedure? c)) @@ -1271,23 +1285,25 @@ (lambda () (when close? (close-input-port port)) - (bytes-close-converter c)))))) + (bytes-close-converter c)) + #f void 1 + (case-lambda + [() buffer-mode] + [(mode) (set! buffer-mode mode)]))))) ;; -------------------------------------------------- (define reencode-output-port - (opt-lambda (port encoding [error-bytes #f] [close? #f] [name (object-name port)] - [buffer-mode (if (and (output-port? port) - (file-stream-port? port)) - (file-stream-buffer-mode port) - 'block)]) + (opt-lambda (port encoding [error-bytes #f] [close? #f] [name (object-name port)]) (let ([c (bytes-open-converter "UTF-8" encoding)] [ready-bytes (make-bytes 1024)] [ready-start 0] [ready-end 0] [out-bytes (make-bytes 1024)] [out-start 0] - [out-end 0]) + [out-end 0] + [buffer-mode (or (file-stream-buffer-mode port) + 'block)]) ;; The main writing entry point: (define (write-it s start end no-buffer&block? enable-break?) @@ -1497,4 +1513,17 @@ (when close? (close-output-port port)) (bytes-close-converter c)) - write-special-it))))) + write-special-it + #f #f + #f void + 1 + (case-lambda + [() buffer-mode] + [(mode) (let ([old buffer-mode]) + (set! buffer-mode mode) + (when (or (and (eq? old 'block) + (memq mode '(none line))) + (and (eq? old 'line) + (memq mode '(none)))) + ;; Flush output + (write-it #"" 0 0 #f #f)))]))))))