original commit: f20d6adba440da05f369c8eb5bc5d87a89871439
This commit is contained in:
Matthew Flatt 2004-10-21 02:45:15 +00:00
parent da1dca04d3
commit cd545366d4

View File

@ -136,7 +136,6 @@
(define lock-semaphore (make-semaphore 1))
(define commit-semaphore (make-semaphore 1))
(define-values (peeked-r peeked-w) (make-pipe))
(define peeked-end 0)
(define special-peeked null)
(define special-peeked-tail #f)
(define progress-requested? #f)
@ -154,8 +153,7 @@
(lambda (x) 0)))
(define (make-progress)
(write-byte 0 peeked-w)
(read-byte peeked-r)
(set! peeked-end (add1 peeked-end)))
(read-byte peeked-r))
(define (read-it s)
(call-with-semaphore
lock-semaphore
@ -177,7 +175,6 @@
(read s)]
[else (if (bytes? (car special-peeked))
(let ([b (car special-peeked)])
(set! peeked-end (+ peeked-end (bytes-length b)))
(write-bytes b peeked-w)
(set! special-peeked (cdr special-peeked))
(when (null? special-peeked)
@ -215,7 +212,6 @@
(cond
[(number? r)
;; The nice case --- reading gave us more bytes
(set! peeked-end (+ r peeked-end))
(write-bytes t peeked-w 0 r)
;; Now try again
(peek-bytes-avail!* s skip #f peeked-r)]
@ -235,8 +231,7 @@
(do-peek-it s skip unless-evt)]))]
[else
;; Non-empty special queue, so try to use it
(let* ([pos (file-position peeked-r)]
[avail (- peeked-end pos)]
(let* ([avail (pipe-content-length peeked-r)]
[sk (- skip avail)])
(let loop ([sk sk]
[l special-peeked])
@ -291,8 +286,7 @@
(define (do-commit-it amt unless-evt done-evt)
(if (sync/timeout 0 unless-evt)
#f
(let* ([pos (file-position peeked-r)]
[avail (- peeked-end pos)]
(let* ([avail (pipe-content-length peeked-r)]
[p-commit (min avail amt)])
(let loop ([amt (- amt p-commit)]
[l special-peeked])
@ -408,13 +402,21 @@
;; Not kill-safe.
(define make-pipe-with-specials
;; This implementation of pipes is almost CML-style, with a manager thread
;; to guard access to the pipe content. But we only enable the manager
;; thread when write evts are active; otherwise, we use a lock semaphore.
;; (Actually, the lock semaphore has to be used all the time, to guard
;; the flag indicating whether the manager thread is running.)
(opt-lambda ([limit (expt 2 64)] [in-name 'pipe] [out-name 'pipe])
(let-values ([(r w) (make-pipe limit)]
[(more) null]
[(more-last) #f]
[(more-sema) #f]
[(close-w?) #f]
[(sema-semaphore) (make-semaphore 1)])
[(lock-semaphore) (make-semaphore 1)]
[(mgr-th) #f]
[(via-manager?) #f]
[(mgr-ch) (make-channel)])
(define (flush-more)
(if (null? more)
(begin
@ -422,76 +424,246 @@
(when close-w?
(close-output-port w)))
(when (bytes? (car more))
(write-bytes (car more) w)
(set! more (cdr more))
(flush-more))))
(values
(make-input-port/read-to-peek
in-name
(lambda (s)
(let ([v (read-bytes-avail!* s r)])
(if (eq? v 0)
(if more-last
;; Return a special
(let ([a (car more)])
(set! more (cdr more))
(flush-more)
(lambda (file line col ppos)
a))
;; Nothing available, yet.
(call-with-semaphore
sema-semaphore
(lambda ()
(unless more-sema
(set! more-sema (make-semaphore)))
(wrap-evt (semaphore-peek-evt more-sema)
(lambda (x) 0)))))
v)))
#f
void)
(make-output-port
out-name
w
;; write
(lambda (str start end buffer? w/break?)
(if (= start end)
#t
(call-with-semaphore
sema-semaphore
(lambda ()
(begin0
(if more-last
(let ([p (cons (subbytes str start end) null)])
(set-cdr! more-last p)
(set! more-last p)
(- end start))
(let ([v (write-bytes-avail* str w start end)])
(if (zero? v)
(wrap-evt w (lambda (x) #f))
v)))
(when more-sema
(semaphore-post more-sema)
(set! more-sema #f)))))))
;; close
(lambda ()
(call-with-semaphore
sema-semaphore
(lambda ()
(if more-last
(set! close-w? #t)
(close-output-port w))
(when more-sema
(semaphore-post more-sema)))))
;; write-special
(lambda (v buffer? w/break?)
(call-with-semaphore
sema-semaphore
(lambda ()
(let ([p (cons v null)])
(if more-last
(set-cdr! more-last p)
(set! more p))
(set! more-last p))))))))))
(let ([amt (bytes-length (car more))])
(let ([wrote (write-bytes-avail* (car more) w)])
(if (= wrote amt)
(begin
(set! more (cdr more))
(flush-more))
(begin
;; This means that we let too many bytes
;; get written while a special was pending.
;; Too bad...
(set-car! more (subbytes (car more) wrote))
;; By peeking, make room for more:
(peek-byte r (sub1 (min (pipe-content-length w)
(- amt wrote))))
(flush-more))))))))
(define (read-one s)
(let ([v (read-bytes-avail!* s r)])
(if (eq? v 0)
(if more-last
;; Return a special
(let ([a (car more)])
(set! more (cdr more))
(flush-more)
(lambda (file line col ppos)
a))
;; Nothing available, yet.
(begin
(unless more-sema
(set! more-sema (make-semaphore)))
(wrap-evt (semaphore-peek-evt more-sema)
(lambda (x) 0))))
v)))
(define (close-it)
(set! close-w? #t)
(unless more-last
(close-output-port w))
(when more-sema
(semaphore-post more-sema)))
(define (write-these-bytes str start end)
(begin0
(if more-last
(let ([p (cons (subbytes str start end) null)])
(set-cdr! more-last p)
(set! more-last p)
(- end start))
(let ([v (write-bytes-avail* str w start end)])
(if (zero? v)
(wrap-evt w (lambda (x) #f))
v)))
(when more-sema
(semaphore-post more-sema)
(set! more-sema #f))))
(define (write-spec v)
(let ([p (cons v null)])
(if more-last
(set-cdr! more-last p)
(set! more p))
(set! more-last p)
(when more-sema
(semaphore-post more-sema)
(set! more-sema #f))))
(define (serve)
;; A request is
;; (list sym result-ch nack-evt . v)
;; where `v' varies for different `sym's
;; The possible syms are: read, reply, close,
;; write, write-spec, write-evt, write-spec-evt
(let loop ([reqs null])
(apply
sync
;; Listen for a request:
(handle-evt mgr-ch
(lambda (req)
(let ([req
;; Most requests we handle immediately and
;; convert to a reply. The manager thread
;; implicitly has the lock.
(let ([reply (lambda (v)
(list 'reply (cadr req) (caddr req) v))])
(case (car req)
[(read)
(printf "read~n")
(reply (read-one (cadddr req)))]
[(close)
(reply (close-it))]
[(write)
(reply (apply write-these-bytes (cdddr req)))]
[(write-spec)
(reply (write-spec (cadddr req)))]
[else req]))])
(loop (cons req reqs)))))
(if (and (null? reqs)
via-manager?)
;; If we can get the lock before another request
;; turn off manager mode:
(handle-evt lock-semaphore
(lambda (x)
(set! via-manager? #f)
(semaphore-post lock-semaphore)
(loop null)))
never-evt)
(append
(map (lambda (req)
(case (car req)
[(reply) (handle-evt (channel-put-evt (cadr req)
(cadddr req))
(lambda (x)
(loop (remq req reqs))))]
[(write-spec-evt) (if close-w?
;; Report close error:
(handle-evt (channel-put-evt (cadr req) 'closed)
(lambda (x)
(loop (remq req reqs))))
;; Try to write special:
(handle-evt (channel-put-evt (cadr req) #t)
(lambda (x)
;; We sync'd, so now we *must* write
(write-spec (cadddr req))
(loop (remq req reqs)))))]
[(write-evt) (if close-w?
;; Report close error:
(handle-evt (channel-put-evt (cadr req) 'closed)
(lambda (x)
(loop (remq req reqs))))
;; Try to write bytes:
(let* ([start (list-ref req 4)]
[end (list-ref req 5)]
[len (if more-last
(- end start)
(min (- end start)
(max 0
(- limit (pipe-content-length w)))))])
(if (zero? len)
(handle-evt w (lambda (x) (loop reqs)))
(handle-evt (channel-put-evt (cadr req) len)
(lambda (x)
;; We sync'd, so now we *must* write
(write-these-bytes (cadddr req) start (+ start len))
(loop (remq req reqs)))))))]))
reqs)
;; nack => remove request (could be anything)
(map (lambda (req)
(handle-evt (caddr req)
(lambda (x)
(loop (remq req reqs)))))
reqs)))))
(define (via-manager what req-sfx)
(thread-resume mgr-th (current-thread))
(let ([ch (make-channel)])
(sync (nack-guard-evt
(lambda (nack)
(channel-put mgr-ch (list* what ch nack req-sfx))
ch)))))
(define (start-mgr)
(unless mgr-th
(set! mgr-th (thread serve)))
(set! via-manager? #t))
(define (evt what req-sfx)
(nack-guard-evt
(lambda (nack)
(resume-mgr)
(let ([ch (make-channel)])
(call-with-semaphore
lock-semaphore
(lambda ()
(unless via-manager?
(set! mgr-th (thread serve)))
(set! via-manager? #t)
(thread-resume mgr-th (current-thread))
(channel-put mgr-ch (list* what ch nack req-sfx))
(wrap-evt ch (lambda (x)
(if (eq? x 'close)
(raise-mismatch-error 'write-evt "port is closed: " out)
x)))))))))
(define (resume-mgr)
(when mgr-th
(thread-resume mgr-th (current-thread))))
(define in
;; ----- Input ------
(make-input-port/read-to-peek
in-name
(lambda (s)
(let ([v (read-bytes-avail!* s r)])
(if (eq? v 0)
(begin
(resume-mgr)
(call-with-semaphore
lock-semaphore
(lambda ()
(if via-manager?
(via-manager 'read (list s))
(read-one s)))))
v)))
#f
void))
(define out
;; ----- Output ------
(make-output-port
out-name
w
;; write
(lambda (str start end buffer? w/break?)
(if (= start end)
#t
(begin
(resume-mgr)
(call-with-semaphore
lock-semaphore
(lambda ()
(if via-manager?
(via-manager 'write (list str start end))
(write-these-bytes str start end)))))))
;; close
(lambda ()
(resume-mgr)
(call-with-semaphore
lock-semaphore
(lambda ()
(if via-manager?
(via-manager 'close null)
(close-it)))))
;; write-special
(lambda (v buffer? w/break?)
(resume-mgr)
(call-with-semaphore
lock-semaphore
(lambda ()
(if via-manager?
(via-manager 'write-spec (list v))
(write-spec v)))))
;; write-evt
(lambda (str start end)
(if (= start end)
(wrap-evt always-evt (lambda (x) 0))
(evt 'write-evt (list str start end))))
;; write-special-evt
(lambda (v)
(evt 'write-spec-evt (list v)))))
(values in out))))
(define input-port-append
(opt-lambda (close-orig? . ports)