original commit: da0210c7bbdd78c09b11695e40b528d36882c6b8
This commit is contained in:
Matthew Flatt 2005-05-06 15:26:41 +00:00
parent be205971e3
commit 6d327e00cb

View File

@ -141,303 +141,312 @@
;; Not kill-safe. ;; Not kill-safe.
;; If the `read' proc returns an event, the event must produce ;; If the `read' proc returns an event, the event must produce
;; 0 always ;; 0 always
(define (make-input-port/read-to-peek name read fast-peek close) (define make-input-port/read-to-peek
(define lock-semaphore (make-semaphore 1)) (opt-lambda (name read fast-peek close
(define commit-semaphore (make-semaphore 1)) [location-proc #f]
(define-values (peeked-r peeked-w) (make-pipe)) [count-lines!-proc void]
(define special-peeked null) [init-position 1]
(define special-peeked-tail #f) [buffer-mode-proc #f])
(define progress-requested? #f) (define lock-semaphore (make-semaphore 1))
(define use-manager? #f) (define commit-semaphore (make-semaphore 1))
(define manager-th #f) (define-values (peeked-r peeked-w) (make-pipe))
(define manager-ch (make-channel)) (define special-peeked null)
(define resume-ch (make-channel)) (define special-peeked-tail #f)
(define buf (make-bytes 4096)) (define progress-requested? #f)
(define (try-again) (define use-manager? #f)
(wrap-evt (define manager-th #f)
(semaphore-peek-evt lock-semaphore) (define manager-ch (make-channel))
(lambda (x) 0))) (define resume-ch (make-channel))
(define (suspend-manager) (define buf (make-bytes 4096))
(channel-put manager-ch 'suspend)) (define (try-again)
(define (resume-manager) (wrap-evt
(channel-put resume-ch 'resume)) (semaphore-peek-evt lock-semaphore)
(define (with-manager-lock thunk) (lambda (x) 0)))
(thread-resume manager-th (current-thread)) (define (suspend-manager)
(dynamic-wind suspend-manager thunk resume-manager)) (channel-put manager-ch 'suspend))
(define (make-progress) (define (resume-manager)
(write-byte 0 peeked-w) (channel-put resume-ch 'resume))
(read-byte peeked-r)) (define (with-manager-lock thunk)
(define (read-it-with-lock s) (thread-resume manager-th (current-thread))
(if use-manager? (dynamic-wind suspend-manager thunk resume-manager))
(with-manager-lock (lambda () (do-read-it s))) (define (make-progress)
(do-read-it s))) (write-byte 0 peeked-w)
(define (read-it s) (read-byte peeked-r))
(call-with-semaphore (define (read-it-with-lock s)
lock-semaphore (if use-manager?
read-it-with-lock (with-manager-lock (lambda () (do-read-it s)))
try-again (do-read-it s)))
s)) (define (read-it s)
(define (do-read-it s) (call-with-semaphore
(if (char-ready? peeked-r) lock-semaphore
(read-bytes-avail!* s peeked-r) read-it-with-lock
;; If nothing is saved from a peeking read, try-again
;; dispatch to `read', otherwise return s))
;; previously peeked data (define (do-read-it s)
(cond (if (char-ready? peeked-r)
[(null? special-peeked) (read-bytes-avail!* s peeked-r)
(when progress-requested? (make-progress)) ;; If nothing is saved from a peeking read,
(read s)] ;; dispatch to `read', otherwise return
[else (if (bytes? (car special-peeked)) ;; previously peeked data
(let ([b (car special-peeked)])
(write-bytes b peeked-w)
(set! special-peeked (cdr special-peeked))
(when (null? special-peeked)
(set! special-peeked-tail #f))
(read-bytes-avail!* s peeked-r))
(begin0
(car special-peeked)
(make-progress)
(set! special-peeked (cdr special-peeked))
(when (null? special-peeked)
(set! special-peeked-tail #f))))])))
(define (peek-it-with-lock s skip unless-evt)
(if use-manager?
(with-manager-lock (lambda () (do-peek-it s skip unless-evt)))
(do-peek-it s skip unless-evt)))
(define (peek-it s skip unless-evt)
(let ([v (peek-bytes-avail!* s skip unless-evt peeked-r)])
(if (eq? v 0)
(call-with-semaphore
lock-semaphore
peek-it-with-lock
try-again
s skip unless-evt)
v)))
(define (do-peek-it s skip unless-evt)
(let ([v (peek-bytes-avail!* s skip unless-evt peeked-r)])
(if (eq? v 0)
;; The peek may have failed because peeked-r is empty,
;; because unless-evt is ready, or because the skip is
;; far. Handle nicely the common case where there are no
;; specials.
(cond (cond
[(and unless-evt (sync/timeout 0 unless-evt)) [(null? special-peeked)
#f] (when progress-requested? (make-progress))
[(null? special-peeked) (read s)]
;; Empty special queue, so read through the original proc. [else (if (bytes? (car special-peeked))
;; We only only need (let ([b (car special-peeked)])
;; (- (+ skip (bytes-length s)) (pipe-content-length peeked-w)) (write-bytes b peeked-w)
;; bytes, but read more (up to size of buf) to help move (set! special-peeked (cdr special-peeked))
;; things along. (when (null? special-peeked)
(let* ([r (read buf)]) (set! special-peeked-tail #f))
(cond (read-bytes-avail!* s peeked-r))
[(number? r) (begin0
;; The nice case --- reading gave us more bytes (car special-peeked)
(write-bytes buf peeked-w 0 r) (make-progress)
;; Now try again (set! special-peeked (cdr special-peeked))
(peek-bytes-avail!* s skip #f peeked-r)] (when (null? special-peeked)
[(evt? r) (set! special-peeked-tail #f))))])))
(if unless-evt (define (peek-it-with-lock s skip unless-evt)
;; Technically, there's a race condition here. (if use-manager?
;; We might choose r (and return 0) even when (with-manager-lock (lambda () (do-peek-it s skip unless-evt)))
;; unless-evt becomes available first. However, (do-peek-it s skip unless-evt)))
;; this race is detectable only by the inside (define (peek-it s skip unless-evt)
;; of `read'. (let ([v (peek-bytes-avail!* s skip unless-evt peeked-r)])
(choice-evt r (wrap-evt unless-evt (lambda (x) #f))) (if (eq? v 0)
r)] (call-with-semaphore
[else lock-semaphore
(set! special-peeked (cons r null)) peek-it-with-lock
(set! special-peeked-tail special-peeked) try-again
;; Now try again s skip unless-evt)
(do-peek-it s skip unless-evt)]))] v)))
[else (define (do-peek-it s skip unless-evt)
;; Non-empty special queue, so try to use it (let ([v (peek-bytes-avail!* s skip unless-evt peeked-r)])
(let* ([avail (pipe-content-length peeked-r)] (if (eq? v 0)
[sk (- skip avail)]) ;; The peek may have failed because peeked-r is empty,
(let loop ([sk sk] ;; because unless-evt is ready, or because the skip is
[l special-peeked]) ;; far. Handle nicely the common case where there are no
(cond ;; specials.
[(null? l)
;; Not enough even in the special queue.
;; Read once and add it.
(let* ([t (make-bytes (min 4096 (+ sk (bytes-length s))))]
[r (read t)])
(cond
[(evt? r)
(if unless-evt
;; See note above
(choice-evt r (wrap-evt unless-evt (lambda (x) #f)))
r)]
[(eq? r 0)
;; Original read thinks a spin is ok,
;; so we return 0 to skin, too.
0]
[else (let ([v (if (number? r)
(subbytes t 0 r)
r)])
(let ([pr (cons v null)])
(set-cdr! special-peeked-tail pr)
(set! special-peeked-tail pr))
;; Got something; now try again
(do-peek-it s skip unless-evt))]))]
[(eof-object? (car l))
;; No peeking past an EOF
eof]
[(procedure? (car l))
(if (zero? sk)
(car l)
(loop (sub1 sk) (cdr l)))]
[(bytes? (car l))
(let ([len (bytes-length (car l))])
(if (sk . < . len)
(let ([n (min (bytes-length s)
(- len sk))])
(bytes-copy! s 0 (car l) sk (+ sk n))
n)
(loop (- sk len) (cdr l))))])))])
v)))
(define (commit-it-with-lock amt unless-evt done-evt)
(if use-manager?
(with-manager-lock (lambda () (do-commit-it amt unless-evt done-evt)))
(do-commit-it amt unless-evt done-evt)))
(define (commit-it amt unless-evt done-evt)
(call-with-semaphore
lock-semaphore
commit-it-with-lock
#f
amt unless-evt done-evt))
(define (do-commit-it amt unless-evt done-evt)
(if (sync/timeout 0 unless-evt)
#f
(let* ([avail (pipe-content-length peeked-r)]
[p-commit (min avail amt)])
(let loop ([amt (- amt p-commit)]
[l special-peeked])
(cond (cond
[(amt . <= . 0) [(and unless-evt (sync/timeout 0 unless-evt))
;; Enough has been peeked. Do commit...
(actual-commit p-commit l unless-evt done-evt)]
[(null? l)
;; Requested commit was larger than previous peeks
#f] #f]
[(bytes? (car l)) [(null? special-peeked)
(let ([bl (bytes-length (car l))]) ;; Empty special queue, so read through the original proc.
(if (bl . > . amt) ;; We only only need
;; Split the string ;; (- (+ skip (bytes-length s)) (pipe-content-length peeked-w))
(let ([next (cons ;; bytes, but read more (up to size of buf) to help move
(subbytes (car l) amt) ;; things along.
(cdr l))]) (let* ([r (read buf)])
(set-car! l (subbytes (car l) 0 amt)) (cond
(set-cdr! l next) [(number? r)
(when (eq? l special-peeked-tail) ;; The nice case --- reading gave us more bytes
(set! special-peeked-tail next)) (write-bytes buf peeked-w 0 r)
(loop 0 (cdr l))) ;; Now try again
;; Consume this string... (peek-bytes-avail!* s skip #f peeked-r)]
(loop (- amt bl) (cdr l))))] [(evt? r)
(if unless-evt
;; Technically, there's a race condition here.
;; We might choose r (and return 0) even when
;; unless-evt becomes available first. However,
;; this race is detectable only by the inside
;; of `read'.
(choice-evt r (wrap-evt unless-evt (lambda (x) #f)))
r)]
[else
(set! special-peeked (cons r null))
(set! special-peeked-tail special-peeked)
;; Now try again
(do-peek-it s skip unless-evt)]))]
[else [else
(loop (sub1 amt) (cdr l))]))))) ;; Non-empty special queue, so try to use it
(define (actual-commit p-commit l unless-evt done-evt) (let* ([avail (pipe-content-length peeked-r)]
;; The `finish' proc finally, actually, will commit... [sk (- skip avail)])
(define (finish) (let loop ([sk sk]
(unless (zero? p-commit) [l special-peeked])
(peek-byte peeked-r (sub1 p-commit)) (cond
(port-commit-peeked p-commit unless-evt always-evt peeked-r)) [(null? l)
(set! special-peeked l) ;; Not enough even in the special queue.
(when (null? special-peeked) ;; Read once and add it.
(set! special-peeked-tail #f)) (let* ([t (make-bytes (min 4096 (+ sk (bytes-length s))))]
(when (and progress-requested? (zero? p-commit)) [r (read t)])
(make-progress)) (cond
#t) [(evt? r)
;; If we can sync done-evt immediately, then finish. (if unless-evt
(if (sync/timeout 0 (wrap-evt done-evt (lambda (x) #t))) ;; See note above
(finish) (choice-evt r (wrap-evt unless-evt (lambda (x) #f)))
;; We need to wait, so we'll have to release the lock. r)]
;; Send the work to a manager thread. [(eq? r 0)
(let ([result-ch (make-channel)] ;; Original read thinks a spin is ok,
[w/manager? use-manager?]) ;; so we return 0 to skin, too.
(if w/manager? 0]
;; Resume manager if it was running: [else (let ([v (if (number? r)
(resume-manager) (subbytes t 0 r)
;; Start manager if it wasn't running: r)])
(begin (let ([pr (cons v null)])
(set! manager-th (thread manage-commits)) (set-cdr! special-peeked-tail pr)
(set! use-manager? #t) (set! special-peeked-tail pr))
(thread-resume manager-th (current-thread)))) ;; Got something; now try again
;; Sets use-manager? if the manager wasn't already running: (do-peek-it s skip unless-evt))]))]
(channel-put manager-ch (list finish unless-evt done-evt result-ch)) [(eof-object? (car l))
;; Release locks: ;; No peeking past an EOF
(semaphore-post lock-semaphore) eof]
(begin0 [(procedure? (car l))
;; Wait for manager to complete commit: (if (zero? sk)
(sync result-ch) (car l)
;; Grab locks again, so they're released (loop (sub1 sk) (cdr l)))]
;; properly on exit: [(bytes? (car l))
(semaphore-wait lock-semaphore) (let ([len (bytes-length (car l))])
(when w/manager? (if (sk . < . len)
(suspend-manager)))))) (let ([n (min (bytes-length s)
(define (manage-commits) (- len sk))])
(let loop ([commits null]) (bytes-copy! s 0 (car l) sk (+ sk n))
(apply n)
sync (loop (- sk len) (cdr l))))])))])
(handle-evt manager-ch v)))
(lambda (c) (define (commit-it-with-lock amt unless-evt done-evt)
(case c (if use-manager?
[(suspend) (with-manager-lock (lambda () (do-commit-it amt unless-evt done-evt)))
(channel-get resume-ch) (do-commit-it amt unless-evt done-evt)))
(loop commits)] (define (commit-it amt unless-evt done-evt)
[else (call-with-semaphore
;; adding a commit lock-semaphore
(loop (cons c commits))]))) commit-it-with-lock
(map (lambda (c) #f
(define (send-result v) amt unless-evt done-evt))
;; Create a new thread to send the result asynchronously: (define (do-commit-it amt unless-evt done-evt)
(thread-resume (if (sync/timeout 0 unless-evt)
(thread (lambda () #f
(channel-put (list-ref c 3) v))) (let* ([avail (pipe-content-length peeked-r)]
(current-thread)) [p-commit (min avail amt)])
(when (null? (cdr commits)) (let loop ([amt (- amt p-commit)]
(set! use-manager? #f)) [l special-peeked])
(loop (remq c commits))) (cond
;; Choose between done and unless: [(amt . <= . 0)
(if (sync/timeout 0 (list-ref c 1)) ;; Enough has been peeked. Do commit...
(handle-evt always-evt (actual-commit p-commit l unless-evt done-evt)]
(lambda (x) [(null? l)
(send-result #f))) ;; Requested commit was larger than previous peeks
(choice-evt #f]
(handle-evt (list-ref c 1) [(bytes? (car l))
(lambda (x) (let ([bl (bytes-length (car l))])
;; unless ready, which means that the commit must fail (if (bl . > . amt)
(send-result #f))) ;; Split the string
(handle-evt (list-ref c 2) (let ([next (cons
(lambda (x) (subbytes (car l) amt)
;; done-evt ready, which means that the commit (cdr l))])
;; must succeed. (set-car! l (subbytes (car l) 0 amt))
;; If we get here, then commits are not (set-cdr! l next)
;; suspended, so we implicitly have the (when (eq? l special-peeked-tail)
;; lock. (set! special-peeked-tail next))
((list-ref c 0)) (loop 0 (cdr l)))
(send-result #t)))))) ;; Consume this string...
commits)))) (loop (- amt bl) (cdr l))))]
(make-input-port [else
name (loop (sub1 amt) (cdr l))])))))
;; Read (define (actual-commit p-commit l unless-evt done-evt)
read-it ;; The `finish' proc finally, actually, will commit...
;; Peek (define (finish)
(if fast-peek (unless (zero? p-commit)
(let ([fast-peek-k (lambda (s skip) (peek-byte peeked-r (sub1 p-commit))
(peek-it s skip #f))]) (port-commit-peeked p-commit unless-evt always-evt peeked-r))
(lambda (s skip unless-evt) (set! special-peeked l)
(if (or unless-evt (when (null? special-peeked)
(char-ready? peeked-r) (set! special-peeked-tail #f))
(pair? special-peeked)) (when (and progress-requested? (zero? p-commit))
(peek-it s skip unless-evt) (make-progress))
(fast-peek s skip fast-peek-k)))) #t)
peek-it) ;; If we can sync done-evt immediately, then finish.
close (if (sync/timeout 0 (wrap-evt done-evt (lambda (x) #t)))
(lambda () (finish)
(set! progress-requested? #t) ;; We need to wait, so we'll have to release the lock.
(port-progress-evt peeked-r)) ;; Send the work to a manager thread.
commit-it)) (let ([result-ch (make-channel)]
[w/manager? use-manager?])
(if w/manager?
;; Resume manager if it was running:
(resume-manager)
;; Start manager if it wasn't running:
(begin
(set! manager-th (thread manage-commits))
(set! use-manager? #t)
(thread-resume manager-th (current-thread))))
;; Sets use-manager? if the manager wasn't already running:
(channel-put manager-ch (list finish unless-evt done-evt result-ch))
;; Release locks:
(semaphore-post lock-semaphore)
(begin0
;; Wait for manager to complete commit:
(sync result-ch)
;; Grab locks again, so they're released
;; properly on exit:
(semaphore-wait lock-semaphore)
(when w/manager?
(suspend-manager))))))
(define (manage-commits)
(let loop ([commits null])
(apply
sync
(handle-evt manager-ch
(lambda (c)
(case c
[(suspend)
(channel-get resume-ch)
(loop commits)]
[else
;; adding a commit
(loop (cons c commits))])))
(map (lambda (c)
(define (send-result v)
;; Create a new thread to send the result asynchronously:
(thread-resume
(thread (lambda ()
(channel-put (list-ref c 3) v)))
(current-thread))
(when (null? (cdr commits))
(set! use-manager? #f))
(loop (remq c commits)))
;; Choose between done and unless:
(if (sync/timeout 0 (list-ref c 1))
(handle-evt always-evt
(lambda (x)
(send-result #f)))
(choice-evt
(handle-evt (list-ref c 1)
(lambda (x)
;; unless ready, which means that the commit must fail
(send-result #f)))
(handle-evt (list-ref c 2)
(lambda (x)
;; done-evt ready, which means that the commit
;; must succeed.
;; If we get here, then commits are not
;; suspended, so we implicitly have the
;; lock.
((list-ref c 0))
(send-result #t))))))
commits))))
(make-input-port
name
;; Read
read-it
;; Peek
(if fast-peek
(let ([fast-peek-k (lambda (s skip)
(peek-it s skip #f))])
(lambda (s skip unless-evt)
(if (or unless-evt
(char-ready? peeked-r)
(pair? special-peeked))
(peek-it s skip unless-evt)
(fast-peek s skip fast-peek-k))))
peek-it)
close
(lambda ()
(set! progress-requested? #t)
(port-progress-evt peeked-r))
commit-it
location-proc
count-lines!-proc
init-position
buffer-mode-proc)))
(define peeking-input-port (define peeking-input-port
(opt-lambda (orig-in [name (object-name orig-in)] [delta 0]) (opt-lambda (orig-in [name (object-name orig-in)] [delta 0])
@ -1173,7 +1182,9 @@
[buf-start 0] [buf-start 0]
[buf-end 0] [buf-end 0]
[buf-eof? #f] [buf-eof? #f]
[buf-eof-result #f]) [buf-eof-result #f]
[buffer-mode (or (file-stream-buffer-mode port)
'none)])
;; Main reader entry: ;; Main reader entry:
(define (read-it s) (define (read-it s)
(cond (cond
@ -1210,8 +1221,11 @@
(set! buf-end (- buf-end buf-start)) (set! buf-end (- buf-end buf-start))
(set! buf-start 0)) (set! buf-start 0))
(let* ([amt (bytes-length s)] (let* ([amt (bytes-length s)]
[c (read-bytes-avail!* buf port buf-end (min (bytes-length buf) [c (read-bytes-avail!* buf port buf-end
(+ buf-end amt)))]) (if (eq? buffer-mode 'block)
(bytes-length buf)
(min (bytes-length buf)
(+ buf-end amt))))])
(cond (cond
[(or (eof-object? c) [(or (eof-object? c)
(procedure? c)) (procedure? c))
@ -1271,23 +1285,25 @@
(lambda () (lambda ()
(when close? (when close?
(close-input-port port)) (close-input-port port))
(bytes-close-converter c)))))) (bytes-close-converter c))
#f void 1
(case-lambda
[() buffer-mode]
[(mode) (set! buffer-mode mode)])))))
;; -------------------------------------------------- ;; --------------------------------------------------
(define reencode-output-port (define reencode-output-port
(opt-lambda (port encoding [error-bytes #f] [close? #f] [name (object-name port)] (opt-lambda (port encoding [error-bytes #f] [close? #f] [name (object-name port)])
[buffer-mode (if (and (output-port? port)
(file-stream-port? port))
(file-stream-buffer-mode port)
'block)])
(let ([c (bytes-open-converter "UTF-8" encoding)] (let ([c (bytes-open-converter "UTF-8" encoding)]
[ready-bytes (make-bytes 1024)] [ready-bytes (make-bytes 1024)]
[ready-start 0] [ready-start 0]
[ready-end 0] [ready-end 0]
[out-bytes (make-bytes 1024)] [out-bytes (make-bytes 1024)]
[out-start 0] [out-start 0]
[out-end 0]) [out-end 0]
[buffer-mode (or (file-stream-buffer-mode port)
'block)])
;; The main writing entry point: ;; The main writing entry point:
(define (write-it s start end no-buffer&block? enable-break?) (define (write-it s start end no-buffer&block? enable-break?)
@ -1497,4 +1513,17 @@
(when close? (when close?
(close-output-port port)) (close-output-port port))
(bytes-close-converter c)) (bytes-close-converter c))
write-special-it))))) write-special-it
#f #f
#f void
1
(case-lambda
[() buffer-mode]
[(mode) (let ([old buffer-mode])
(set! buffer-mode mode)
(when (or (and (eq? old 'block)
(memq mode '(none line)))
(and (eq? old 'line)
(memq mode '(none))))
;; Flush output
(write-it #"" 0 0 #f #f)))]))))))