diff --git a/collects/mzlib/port.ss b/collects/mzlib/port.ss index 775d152..7b29341 100644 --- a/collects/mzlib/port.ss +++ b/collects/mzlib/port.ss @@ -136,7 +136,6 @@ (define lock-semaphore (make-semaphore 1)) (define commit-semaphore (make-semaphore 1)) (define-values (peeked-r peeked-w) (make-pipe)) - (define peeked-end 0) (define special-peeked null) (define special-peeked-tail #f) (define progress-requested? #f) @@ -154,8 +153,7 @@ (lambda (x) 0))) (define (make-progress) (write-byte 0 peeked-w) - (read-byte peeked-r) - (set! peeked-end (add1 peeked-end))) + (read-byte peeked-r)) (define (read-it s) (call-with-semaphore lock-semaphore @@ -177,7 +175,6 @@ (read s)] [else (if (bytes? (car special-peeked)) (let ([b (car special-peeked)]) - (set! peeked-end (+ peeked-end (bytes-length b))) (write-bytes b peeked-w) (set! special-peeked (cdr special-peeked)) (when (null? special-peeked) @@ -215,7 +212,6 @@ (cond [(number? r) ;; The nice case --- reading gave us more bytes - (set! peeked-end (+ r peeked-end)) (write-bytes t peeked-w 0 r) ;; Now try again (peek-bytes-avail!* s skip #f peeked-r)] @@ -235,8 +231,7 @@ (do-peek-it s skip unless-evt)]))] [else ;; Non-empty special queue, so try to use it - (let* ([pos (file-position peeked-r)] - [avail (- peeked-end pos)] + (let* ([avail (pipe-content-length peeked-r)] [sk (- skip avail)]) (let loop ([sk sk] [l special-peeked]) @@ -291,8 +286,7 @@ (define (do-commit-it amt unless-evt done-evt) (if (sync/timeout 0 unless-evt) #f - (let* ([pos (file-position peeked-r)] - [avail (- peeked-end pos)] + (let* ([avail (pipe-content-length peeked-r)] [p-commit (min avail amt)]) (let loop ([amt (- amt p-commit)] [l special-peeked]) @@ -408,13 +402,21 @@ ;; Not kill-safe. (define make-pipe-with-specials + ;; This implementation of pipes is almost CML-style, with a manager thread + ;; to guard access to the pipe content. But we only enable the manager + ;; thread when write evts are active; otherwise, we use a lock semaphore. + ;; (Actually, the lock semaphore has to be used all the time, to guard + ;; the flag indicating whether the manager thread is running.) (opt-lambda ([limit (expt 2 64)] [in-name 'pipe] [out-name 'pipe]) (let-values ([(r w) (make-pipe limit)] [(more) null] [(more-last) #f] [(more-sema) #f] [(close-w?) #f] - [(sema-semaphore) (make-semaphore 1)]) + [(lock-semaphore) (make-semaphore 1)] + [(mgr-th) #f] + [(via-manager?) #f] + [(mgr-ch) (make-channel)]) (define (flush-more) (if (null? more) (begin @@ -422,76 +424,246 @@ (when close-w? (close-output-port w))) (when (bytes? (car more)) - (write-bytes (car more) w) - (set! more (cdr more)) - (flush-more)))) - (values - (make-input-port/read-to-peek - in-name - (lambda (s) - (let ([v (read-bytes-avail!* s r)]) - (if (eq? v 0) - (if more-last - ;; Return a special - (let ([a (car more)]) - (set! more (cdr more)) - (flush-more) - (lambda (file line col ppos) - a)) - ;; Nothing available, yet. - (call-with-semaphore - sema-semaphore - (lambda () - (unless more-sema - (set! more-sema (make-semaphore))) - (wrap-evt (semaphore-peek-evt more-sema) - (lambda (x) 0))))) - v))) - #f - void) - (make-output-port - out-name - w - ;; write - (lambda (str start end buffer? w/break?) - (if (= start end) - #t - (call-with-semaphore - sema-semaphore - (lambda () - (begin0 - (if more-last - (let ([p (cons (subbytes str start end) null)]) - (set-cdr! more-last p) - (set! more-last p) - (- end start)) - (let ([v (write-bytes-avail* str w start end)]) - (if (zero? v) - (wrap-evt w (lambda (x) #f)) - v))) - (when more-sema - (semaphore-post more-sema) - (set! more-sema #f))))))) - ;; close - (lambda () - (call-with-semaphore - sema-semaphore - (lambda () - (if more-last - (set! close-w? #t) - (close-output-port w)) - (when more-sema - (semaphore-post more-sema))))) - ;; write-special - (lambda (v buffer? w/break?) - (call-with-semaphore - sema-semaphore - (lambda () - (let ([p (cons v null)]) - (if more-last - (set-cdr! more-last p) - (set! more p)) - (set! more-last p)))))))))) + (let ([amt (bytes-length (car more))]) + (let ([wrote (write-bytes-avail* (car more) w)]) + (if (= wrote amt) + (begin + (set! more (cdr more)) + (flush-more)) + (begin + ;; This means that we let too many bytes + ;; get written while a special was pending. + ;; Too bad... + (set-car! more (subbytes (car more) wrote)) + ;; By peeking, make room for more: + (peek-byte r (sub1 (min (pipe-content-length w) + (- amt wrote)))) + (flush-more)))))))) + (define (read-one s) + (let ([v (read-bytes-avail!* s r)]) + (if (eq? v 0) + (if more-last + ;; Return a special + (let ([a (car more)]) + (set! more (cdr more)) + (flush-more) + (lambda (file line col ppos) + a)) + ;; Nothing available, yet. + (begin + (unless more-sema + (set! more-sema (make-semaphore))) + (wrap-evt (semaphore-peek-evt more-sema) + (lambda (x) 0)))) + v))) + (define (close-it) + (set! close-w? #t) + (unless more-last + (close-output-port w)) + (when more-sema + (semaphore-post more-sema))) + (define (write-these-bytes str start end) + (begin0 + (if more-last + (let ([p (cons (subbytes str start end) null)]) + (set-cdr! more-last p) + (set! more-last p) + (- end start)) + (let ([v (write-bytes-avail* str w start end)]) + (if (zero? v) + (wrap-evt w (lambda (x) #f)) + v))) + (when more-sema + (semaphore-post more-sema) + (set! more-sema #f)))) + (define (write-spec v) + (let ([p (cons v null)]) + (if more-last + (set-cdr! more-last p) + (set! more p)) + (set! more-last p) + (when more-sema + (semaphore-post more-sema) + (set! more-sema #f)))) + (define (serve) + ;; A request is + ;; (list sym result-ch nack-evt . v) + ;; where `v' varies for different `sym's + ;; The possible syms are: read, reply, close, + ;; write, write-spec, write-evt, write-spec-evt + (let loop ([reqs null]) + (apply + sync + ;; Listen for a request: + (handle-evt mgr-ch + (lambda (req) + (let ([req + ;; Most requests we handle immediately and + ;; convert to a reply. The manager thread + ;; implicitly has the lock. + (let ([reply (lambda (v) + (list 'reply (cadr req) (caddr req) v))]) + (case (car req) + [(read) + (printf "read~n") + (reply (read-one (cadddr req)))] + [(close) + (reply (close-it))] + [(write) + (reply (apply write-these-bytes (cdddr req)))] + [(write-spec) + (reply (write-spec (cadddr req)))] + [else req]))]) + (loop (cons req reqs))))) + (if (and (null? reqs) + via-manager?) + ;; If we can get the lock before another request + ;; turn off manager mode: + (handle-evt lock-semaphore + (lambda (x) + (set! via-manager? #f) + (semaphore-post lock-semaphore) + (loop null))) + never-evt) + (append + (map (lambda (req) + (case (car req) + [(reply) (handle-evt (channel-put-evt (cadr req) + (cadddr req)) + (lambda (x) + (loop (remq req reqs))))] + [(write-spec-evt) (if close-w? + ;; Report close error: + (handle-evt (channel-put-evt (cadr req) 'closed) + (lambda (x) + (loop (remq req reqs)))) + ;; Try to write special: + (handle-evt (channel-put-evt (cadr req) #t) + (lambda (x) + ;; We sync'd, so now we *must* write + (write-spec (cadddr req)) + (loop (remq req reqs)))))] + [(write-evt) (if close-w? + ;; Report close error: + (handle-evt (channel-put-evt (cadr req) 'closed) + (lambda (x) + (loop (remq req reqs)))) + ;; Try to write bytes: + (let* ([start (list-ref req 4)] + [end (list-ref req 5)] + [len (if more-last + (- end start) + (min (- end start) + (max 0 + (- limit (pipe-content-length w)))))]) + (if (zero? len) + (handle-evt w (lambda (x) (loop reqs))) + (handle-evt (channel-put-evt (cadr req) len) + (lambda (x) + ;; We sync'd, so now we *must* write + (write-these-bytes (cadddr req) start (+ start len)) + (loop (remq req reqs)))))))])) + reqs) + ;; nack => remove request (could be anything) + (map (lambda (req) + (handle-evt (caddr req) + (lambda (x) + (loop (remq req reqs))))) + reqs))))) + (define (via-manager what req-sfx) + (thread-resume mgr-th (current-thread)) + (let ([ch (make-channel)]) + (sync (nack-guard-evt + (lambda (nack) + (channel-put mgr-ch (list* what ch nack req-sfx)) + ch))))) + (define (start-mgr) + (unless mgr-th + (set! mgr-th (thread serve))) + (set! via-manager? #t)) + (define (evt what req-sfx) + (nack-guard-evt + (lambda (nack) + (resume-mgr) + (let ([ch (make-channel)]) + (call-with-semaphore + lock-semaphore + (lambda () + (unless via-manager? + (set! mgr-th (thread serve))) + (set! via-manager? #t) + (thread-resume mgr-th (current-thread)) + (channel-put mgr-ch (list* what ch nack req-sfx)) + (wrap-evt ch (lambda (x) + (if (eq? x 'close) + (raise-mismatch-error 'write-evt "port is closed: " out) + x))))))))) + (define (resume-mgr) + (when mgr-th + (thread-resume mgr-th (current-thread)))) + (define in + ;; ----- Input ------ + (make-input-port/read-to-peek + in-name + (lambda (s) + (let ([v (read-bytes-avail!* s r)]) + (if (eq? v 0) + (begin + (resume-mgr) + (call-with-semaphore + lock-semaphore + (lambda () + (if via-manager? + (via-manager 'read (list s)) + (read-one s))))) + v))) + #f + void)) + (define out + ;; ----- Output ------ + (make-output-port + out-name + w + ;; write + (lambda (str start end buffer? w/break?) + (if (= start end) + #t + (begin + (resume-mgr) + (call-with-semaphore + lock-semaphore + (lambda () + (if via-manager? + (via-manager 'write (list str start end)) + (write-these-bytes str start end))))))) + ;; close + (lambda () + (resume-mgr) + (call-with-semaphore + lock-semaphore + (lambda () + (if via-manager? + (via-manager 'close null) + (close-it))))) + ;; write-special + (lambda (v buffer? w/break?) + (resume-mgr) + (call-with-semaphore + lock-semaphore + (lambda () + (if via-manager? + (via-manager 'write-spec (list v)) + (write-spec v))))) + ;; write-evt + (lambda (str start end) + (if (= start end) + (wrap-evt always-evt (lambda (x) 0)) + (evt 'write-evt (list str start end)))) + ;; write-special-evt + (lambda (v) + (evt 'write-spec-evt (list v))))) + (values in out)))) + (define input-port-append (opt-lambda (close-orig? . ports)