io: fix pipe slow path and GC

Make the slow path faster by reducing input- and output-end
coordination. Also, avoid retaining one end just because the other end
is retained.

This change involves adding an indirection for the fast-path buffers
so that management for both ends of a pipe can be centralized
independent of the ports.
This commit is contained in:
Matthew Flatt 2019-02-13 07:06:45 -07:00
parent f0aa8573fe
commit e266da929d
10 changed files with 271 additions and 227 deletions

View File

@ -26,11 +26,12 @@
(check who output-port? out) (check who output-port? out)
(let ([out (->core-output-port out)]) (let ([out (->core-output-port out)])
(start-atomic) (start-atomic)
(define pos (core-port-buffer-pos out)) (define buffer (core-port-buffer out))
(define pos (direct-pos buffer))
(cond (cond
[(pos . fx< . (core-port-buffer-end out)) [(pos . fx< . (direct-end buffer))
(bytes-set! (core-port-buffer out) pos b) (bytes-set! (direct-bstr buffer) pos b)
(set-core-port-buffer-pos! out (fx+ pos 1)) (set-direct-pos! buffer (fx+ pos 1))
(when (core-port-count out) (when (core-port-count out)
(port-count-byte! out b)) (port-count-byte! out b))
(end-atomic)] (end-atomic)]

View File

@ -33,8 +33,9 @@
;; in atomic mode ;; in atomic mode
[in-buffer-pos [in-buffer-pos
(lambda () (lambda ()
(if buffer (define b buffer)
buffer-pos (if (direct-bstr b)
(direct-pos b)
pos))]) pos))])
(override (override
@ -43,19 +44,21 @@
(set! commit-manager #f) ; to indicate closed (set! commit-manager #f) ; to indicate closed
(progress!) (progress!)
(set! bstr #f) (set! bstr #f)
(when buffer (define b buffer)
(set! offset buffer-pos) (when (direct-bstr b)
(set! buffer #f)))] (set! offset (direct-pos b))
(set-direct-bstr! b #f)))]
[file-position [file-position
(case-lambda (case-lambda
[() (or alt-pos (in-buffer-pos))] [() (or alt-pos (in-buffer-pos))]
[(given-pos) [(given-pos)
(define len buffer-end) (define b buffer)
(define len (direct-end b))
(define new-pos (if (eof-object? given-pos) (define new-pos (if (eof-object? given-pos)
len len
(min len given-pos))) (min len given-pos)))
(if buffer (if (direct-bstr b)
(set! buffer-pos new-pos) (set-direct-pos! b new-pos)
(set! pos new-pos)) (set! pos new-pos))
(set! alt-pos (and (not (eof-object? given-pos)) (set! alt-pos (and (not (eof-object? given-pos))
(given-pos . > . new-pos) (given-pos . > . new-pos)
@ -67,16 +70,17 @@
[read-in [read-in
(lambda (dest-bstr start end copy?) (lambda (dest-bstr start end copy?)
(define len buffer-end) (define b buffer)
(define len (direct-end b))
(define i (in-buffer-pos)) (define i (in-buffer-pos))
(cond (cond
[(i . < . len) [(i . < . len)
(define amt (min (- end start) (fx- len i))) (define amt (min (- end start) (fx- len i)))
(define new-pos (fx+ i amt)) (define new-pos (fx+ i amt))
;; Keep/resume fast mode ;; Keep/resume fast mode
(set! buffer-pos new-pos) (set-direct-pos! b new-pos)
(set! offset 0) (set! offset 0)
(set! buffer bstr) (set-direct-bstr! b bstr)
(bytes-copy! dest-bstr start bstr i new-pos) (bytes-copy! dest-bstr start bstr i new-pos)
(progress!) (progress!)
amt] amt]
@ -84,8 +88,9 @@
[peek-in [peek-in
(lambda (dest-bstr start end skip progress-evt copy?) (lambda (dest-bstr start end skip progress-evt copy?)
(define b buffer)
(define len (direct-end b))
(define i (in-buffer-pos)) (define i (in-buffer-pos))
(define len buffer-end)
(define at-pos (+ i skip)) (define at-pos (+ i skip))
(cond (cond
[(and progress-evt (sync/timeout 0 progress-evt)) [(and progress-evt (sync/timeout 0 progress-evt))
@ -98,19 +103,20 @@
[byte-ready [byte-ready
(lambda (work-done!) (lambda (work-done!)
((in-buffer-pos) . < . buffer-end))] ((in-buffer-pos) . < . (direct-end buffer)))]
[get-progress-evt [get-progress-evt
(lambda () (lambda ()
(atomically (atomically
(unless progress-sema (unless progress-sema
;; set port to slow mode: ;; set port to slow mode:
(when buffer (define b buffer)
(define i buffer-pos) (when (direct-bstr b)
(define i (direct-pos b))
(set! pos i) (set! pos i)
(set! offset i) (set! offset i)
(set! buffer #f) (set-direct-bstr! b #f)
(set! buffer-pos buffer-end))) (set-direct-pos! b (direct-end b))))
(make-progress-evt)))] (make-progress-evt)))]
[commit [commit
@ -119,32 +125,23 @@
progress-evt ext-evt progress-evt ext-evt
;; in atomic mode, maybe in a different thread: ;; in atomic mode, maybe in a different thread:
(lambda () (lambda ()
(define len buffer-end) (define b buffer)
(define len (direct-end b))
(define i (in-buffer-pos)) (define i (in-buffer-pos))
(let ([amt (min amt (- len i))]) (let ([amt (min amt (- len i))])
(define dest-bstr (make-bytes amt)) (define dest-bstr (make-bytes amt))
(bytes-copy! dest-bstr 0 bstr i (+ i amt)) (bytes-copy! dest-bstr 0 bstr i (+ i amt))
;; Keep/resume fast mode ;; Keep/resume fast mode
(set! buffer-pos (fx+ i amt)) (set-direct-pos! b (fx+ i amt))
(set! buffer bstr) (set-direct-bstr! b bstr)
(set! offset 0) (set! offset 0)
(progress!) (progress!)
(finish dest-bstr)))))] (finish dest-bstr)))))]))
[count-lines!
(lambda ()
(when buffer
(define i buffer-pos)
(set! offset i)
(set! pos i)
(set! buffer #f)
(set! buffer-pos buffer-end)))]))
(define (make-input-bytes bstr name) (define (make-input-bytes bstr name)
(new bytes-input-port (new bytes-input-port
[name name] [name name]
[buffer bstr] [buffer (direct bstr 0 (bytes-length bstr))]
[buffer-end (bytes-length bstr)]
[bstr bstr])) [bstr bstr]))
;; ---------------------------------------- ;; ----------------------------------------
@ -180,19 +177,21 @@
[slow-mode! [slow-mode!
(lambda () (lambda ()
(when buffer (define b buffer)
(define s buffer-pos) (when (direct-bstr b)
(define s (direct-pos b))
(set! pos s) (set! pos s)
(set! buffer-pos buffer-end) (set-direct-pos! b (direct-end b))
(set! buffer #f) (set-direct-bstr! b #f)
(set! offset s) (set! offset s)
(set! max-pos (fxmax s max-pos))))] (set! max-pos (fxmax s max-pos))))]
[fast-mode! [fast-mode!
(lambda () (lambda ()
(set! buffer bstr) (define b buffer)
(set! buffer-pos pos) (set-direct-bstr! b bstr)
(set! buffer-end (bytes-length bstr)) (set-direct-pos! b pos)
(set-direct-end! b (bytes-length bstr))
(set! offset 0))]) (set! offset 0))])
(override (override
@ -214,7 +213,9 @@
(port-count! out v bstr start)))] (port-count! out v bstr start)))]
[file-position [file-position
(case-lambda (case-lambda
[() (if buffer buffer-pos pos)] [()
(define b buffer)
(if (direct-bstr b) (direct-pos b) pos)]
[(new-pos) [(new-pos)
(slow-mode!) (slow-mode!)
(define len (bytes-length bstr)) (define len (bytes-length bstr))

View File

@ -207,7 +207,7 @@
;; in atomic mode ;; in atomic mode
(define (increment-offset! in amt) (define (increment-offset! in amt)
(unless (core-port-buffer in) (unless (direct-bstr (core-port-buffer in))
(define old-offset (core-port-offset in)) (define old-offset (core-port-offset in))
(when old-offset (when old-offset
(set-core-port-offset! in (+ amt old-offset))))) (set-core-port-offset! in (+ amt old-offset)))))

View File

@ -126,24 +126,26 @@
[fast-mode! [fast-mode!
(lambda (amt) ; amt = not yet added to `offset` (lambda (amt) ; amt = not yet added to `offset`
(when (eq? buffer-mode 'block) (when (eq? buffer-mode 'block)
(define b buffer)
(define e end-pos) (define e end-pos)
(set! buffer bstr) (set-direct-bstr! b bstr)
(set! buffer-pos e) (set-direct-pos! b e)
(set! buffer-end (bytes-length bstr)) (set-direct-end! b (bytes-length bstr))
(define o offset) (define o offset)
(when o (when o
(set! offset (- (+ o amt) e)))))] (set! offset (- (+ o amt) e)))))]
[slow-mode! [slow-mode!
(lambda () (lambda ()
(when buffer (define b buffer)
(set! buffer #f) (when (direct-bstr b)
(define pos buffer-pos) (set-direct-bstr! b #f)
(define pos (direct-pos b))
(set! end-pos pos) (set! end-pos pos)
(define o offset) (define o offset)
(when o (when o
(set! offset (+ o pos))) (set! offset (+ o pos)))
(set! buffer-pos buffer-end)))]) (set-direct-pos! b (direct-end b))))])
(public (public
[on-close (lambda () (void))] [on-close (lambda () (void))]
@ -257,7 +259,8 @@
(case-lambda (case-lambda
[() [()
(define pos (get-file-position fd)) (define pos (get-file-position fd))
(and pos (+ pos (fx- (if buffer buffer-pos end-pos) start-pos)))] (define b buffer)
(and pos (+ pos (fx- (if (direct-bstr b) (direct-pos b) end-pos) start-pos)))]
[(pos) [(pos)
(flush-buffer-fully #f) (flush-buffer-fully #f)
;; flushing can leave atomic mode, so make sure the ;; flushing can leave atomic mode, so make sure the

View File

@ -40,7 +40,8 @@
[buffer-adjust-pos [buffer-adjust-pos
(lambda (i) (lambda (i)
(- i (fx- end-pos (if buffer buffer-pos pos))))] (define b buffer)
(- i (fx- end-pos (if (direct-bstr b) (direct-pos b) pos))))]
;; in atomic mode ;; in atomic mode
[default-buffer-mode [default-buffer-mode
@ -93,10 +94,11 @@
;; in atomic mode ;; in atomic mode
[fast-mode! [fast-mode!
(lambda (amt) ; amt = not yet added to `offset` (lambda (amt) ; amt = not yet added to `offset`
(set! buffer bstr) (define b buffer)
(set-direct-bstr! b bstr)
(define s pos) (define s pos)
(set! buffer-pos s) (set-direct-pos! b s)
(set! buffer-end end-pos) (set-direct-end! b end-pos)
(define o offset) (define o offset)
(when o (when o
(set! offset (- (+ o amt) s))))] (set! offset (- (+ o amt) s))))]
@ -104,14 +106,15 @@
;; in atomic mode ;; in atomic mode
[slow-mode! [slow-mode!
(lambda () (lambda ()
(when buffer (define b buffer)
(define s buffer-pos) (when (direct-bstr b)
(define s (direct-pos b))
(define o offset) (define o offset)
(when o (when o
(set! offset (+ o s))) (set! offset (+ o s)))
(set! pos s) (set! pos s)
(set! buffer #f) (set-direct-bstr! b #f)
(set! buffer-pos buffer-end)))]) (set-direct-pos! b (direct-end b))))])
(override (override
;; in atomic mode ;; in atomic mode
@ -159,7 +162,8 @@
(sync/timeout 0 progress-evt)) (sync/timeout 0 progress-evt))
#f] #f]
[else [else
(define s (if buffer buffer-pos pos)) (define b buffer)
(define s (if (direct-bstr b) (direct-pos b) pos))
(define peeked-amt (fx- end-pos s)) (define peeked-amt (fx- end-pos s))
(cond (cond
[(peeked-amt . > . skip) [(peeked-amt . > . skip)
@ -180,7 +184,8 @@
[byte-ready [byte-ready
(lambda (work-done!) (lambda (work-done!)
(let loop () (let loop ()
(define peeked-amt (fx- end-pos (if buffer buffer-pos pos))) (define b buffer)
(define peeked-amt (fx- end-pos (if (direct-bstr b) (direct-pos b) pos)))
(cond (cond
[(peeked-amt . fx> . 0) #t] [(peeked-amt . fx> . 0) #t]
[peeked-eof? #t] [peeked-eof? #t]

View File

@ -47,11 +47,9 @@
[else [else
(raise-argument-error 'pipe-contact-length "(or/c pipe-input-port? pipe-output-port?)" p)])) (raise-argument-error 'pipe-contact-length "(or/c pipe-input-port? pipe-output-port?)" p)]))
(atomically (atomically
(let ([input (pipe-data-input d)]) (with-object pipe-data d
(when input (send pipe-input-port input sync-data))) (sync-both)
(let ([output (pipe-data-output d)]) (content-length))))
(when output (send pipe-output-port output sync-data)))
(send pipe-data d content-length)))
;; ---------------------------------------- ;; ----------------------------------------
@ -63,18 +61,46 @@
[peeked-amt 0] ; peeked but not yet read, effectively extends `limit` [peeked-amt 0] ; peeked but not yet read, effectively extends `limit`
[start 0] [start 0]
[end 0] [end 0]
[input #f] ; #f => closed [input-ref #f] ; #f => closed
[output #f] ; #f => closed [output-ref #f] ; #f => closed
[input-buffer #f]
[output-buffer #f]
[read-ready-sema #f] [read-ready-sema #f]
[write-ready-sema #f] [write-ready-sema #f]
[more-read-ready-sema #f] ; for lookahead peeks [more-read-ready-sema #f] ; for lookahead peeks
[read-ready-evt #f] [read-ready-evt #f]
[write-ready-evt #f]) [write-ready-evt #f])
;; All methods in atomic mode. (private)
;; Beware that the input port must be synced to sure that `start`
;; represents the current position before using these methods. ;; All static methods in atomic mode.
(static (static
;; sync local fields with input buffer without implying slow mode
[sync-input
(lambda ()
(define b input-buffer)
(when (direct-bstr b)
(define pos (direct-pos b))
(set! start (if (fx= pos len)
0
pos))))]
;; sync local fields with output buffer without implying slow mode
[sync-output
(lambda ()
(define b output-buffer)
(when (direct-bstr b)
(define pos (direct-pos b))
(set! end (if (fx= pos len)
0
pos))))]
[sync-both
(lambda ()
(sync-input)
(sync-output))]
;; assumes sync'ed
[content-length [content-length
(lambda () (lambda ()
(define s start) (define s start)
@ -83,40 +109,34 @@
(fx- e s) (fx- e s)
(fx+ e (fx- len s))))] (fx+ e (fx- len s))))]
;; assumes sync'ed
[input-empty? [input-empty?
(lambda () (lambda ()
(fx= start end))] (fx= start end))]
;; assumes sync'ed
[output-full? [output-full?
(lambda () (lambda ()
(define l limit) (define l limit)
(and l (and l
((content-length) . >= . (+ l peeked-amt))))] ((content-length) . >= . (+ l peeked-amt))))]
;; Used before/after read: ;; Used before read:
[check-output-unblocking [check-output-unblocking
(lambda () (lambda ()
(when (output-full?) (semaphore-post write-ready-sema)))] (when write-ready-sema
[check-input-blocking (semaphore-post write-ready-sema)
(lambda () (set! write-ready-sema #f)))]
(when (input-empty?)
(semaphore-wait read-ready-sema)
(when output
(send pipe-output-port output on-input-empty))))]
;; Used before/after write: ;; Used before write:
[check-input-unblocking [check-input-unblocking
(lambda () (lambda ()
(when (and (input-empty?) output) (semaphore-post read-ready-sema)) (when read-ready-sema
(semaphore-post read-ready-sema)
(set! read-ready-sema #f))
(when more-read-ready-sema (when more-read-ready-sema
(semaphore-post more-read-ready-sema) (semaphore-post more-read-ready-sema)
(set! more-read-ready-sema #f)))] (set! more-read-ready-sema #f)))]
[check-output-blocking
(lambda ()
(when (output-full?)
(semaphore-wait write-ready-sema)
(when input
(send pipe-input-port input on-output-full))))]
;; Used after peeking: ;; Used after peeking:
[peeked! [peeked!
@ -125,6 +145,9 @@
(check-output-unblocking) (check-output-unblocking)
(set! peeked-amt amt)))])) (set! peeked-amt amt)))]))
(define (make-ref v) (make-weak-box v))
(define (ref-value r) (weak-box-value r))
;; ---------------------------------------- ;; ----------------------------------------
(class pipe-input-port #:extends commit-input-port (class pipe-input-port #:extends commit-input-port
@ -138,12 +161,10 @@
(define s start) (define s start)
(define e end) (define e end)
(unless (fx= s e) (unless (fx= s e)
(set! buffer bstr) (define b buffer)
(set! buffer-pos s) (set-direct-bstr! b bstr)
;; don't read last byte, because the output (set-direct-pos! b s)
;; end needs to know about a transition to (set-direct-end! b (if (s . fx< . e) e len))
;; the empty state
(set! buffer-end (fx- (if (s . fx< . e) e len) 1))
(define o offset) (define o offset)
(when o (when o
(set! offset (- (+ o amt) s))))))] (set! offset (- (+ o amt) s))))))]
@ -151,34 +172,18 @@
[slow-mode! [slow-mode!
(lambda () (lambda ()
(with-object pipe-data d (with-object pipe-data d
(when buffer (define b buffer)
(define pos buffer-pos) (when (direct-bstr b)
(define pos (direct-pos b))
(define o offset) (define o offset)
(when o (when o
(set! offset (+ o pos))) (set! offset (+ o pos)))
(set! start (if (fx= pos len) 0 pos)) (set! start (if (fx= pos len) 0 pos))
(set! buffer #f) (set-direct-bstr! b #f)
(set! buffer-pos buffer-end)) (set-direct-pos! b (direct-end b)))
(define out output) (sync-output)))])
(when out
(send pipe-output-port out sync-data))))])
(static (static
[sync-data
(lambda ()
(when buffer
(with-object pipe-data d
(define pos buffer-pos)
(set! start (if (fx= pos len)
0
pos)))))]
[sync-data-both
(lambda ()
(sync-data)
(with-object pipe-data d
(define out output)
(when out
(send pipe-output-port out sync-data))))]
[on-resize [on-resize
(lambda () (lambda ()
(slow-mode!))] (slow-mode!))]
@ -199,7 +204,7 @@
(with-object pipe-data d (with-object pipe-data d
(cond (cond
[(input-empty?) [(input-empty?)
(if output (if output-ref
read-ready-evt read-ready-evt
eof)] eof)]
[else [else
@ -222,7 +227,6 @@
(set! start (modulo (fx+ s amt) len)) (set! start (modulo (fx+ s amt) len))
(set! peeked-amt (fxmax 0 (fx- peeked-amt amt))) (set! peeked-amt (fxmax 0 (fx- peeked-amt amt)))
amt])) amt]))
(check-input-blocking)
(progress!) (progress!)
(fast-mode! amt) (fast-mode! amt)
amt])))] amt])))]
@ -231,7 +235,7 @@
(lambda (dest-bstr dest-start dest-end skip progress-evt copy?) (lambda (dest-bstr dest-start dest-end skip progress-evt copy?)
(with-object pipe-data d (with-object pipe-data d
(assert-atomic) (assert-atomic)
(sync-data-both) (sync-both)
(define content-amt (content-length)) (define content-amt (content-length))
(cond (cond
[(and progress-evt [(and progress-evt
@ -239,12 +243,13 @@
#f] #f]
[(content-amt . <= . skip) [(content-amt . <= . skip)
(cond (cond
[(not output) eof] [(not output-ref) eof]
[else [else
(unless (or (zero? skip) more-read-ready-sema) (unless (or (zero? skip) more-read-ready-sema)
(set! more-read-ready-sema (make-semaphore)) (set! more-read-ready-sema (make-semaphore))
(when output (define out (ref-value output-ref))
(send pipe-output-port output on-need-more-ready))) (when out
(send pipe-output-port out on-need-more-ready)))
(define evt (if (zero? skip) (define evt (if (zero? skip)
read-ready-evt read-ready-evt
(wrap-evt (semaphore-peek-evt more-read-ready-sema) (wrap-evt (semaphore-peek-evt more-read-ready-sema)
@ -270,17 +275,17 @@
(lambda (work-done!) (lambda (work-done!)
(assert-atomic) (assert-atomic)
(with-object pipe-data d (with-object pipe-data d
(or (not output) (or (not output-ref)
(begin (begin
(sync-data-both) (sync-both)
(not (fx= 0 (content-length)))))))] (not (fx= 0 (content-length)))))))]
[close [close
(lambda () (lambda ()
(with-object pipe-data d (with-object pipe-data d
(when input (when input-ref
(slow-mode!) (slow-mode!)
(set! input #f) (set! input-ref #f)
(progress!))))] (progress!))))]
[get-progress-evt [get-progress-evt
@ -288,7 +293,7 @@
(atomically (atomically
(with-object pipe-data d (with-object pipe-data d
(cond (cond
[(not input) always-evt] [(not input-ref) always-evt]
[else [else
(slow-mode!) (slow-mode!)
(make-progress-evt)]))))] (make-progress-evt)]))))]
@ -330,7 +335,6 @@
(set! start (fxmodulo (fx+ s amt) len)) (set! start (fxmodulo (fx+ s amt) len))
(progress!) (progress!)
(fast-mode! amt) (fast-mode! amt)
(check-input-blocking)
(finish dest-bstr)])))))]))] (finish dest-bstr)])))))]))]
[count-lines! [count-lines!
@ -348,25 +352,22 @@
(lambda (amt) ; amt = not yet added to `offset` (lambda (amt) ; amt = not yet added to `offset`
(with-object pipe-data d (with-object pipe-data d
(define lim limit) (define lim limit)
(define avail (and lim (- lim (content-length) (define avail (and lim (- lim (content-length))))
;; don't fill last byte, because the input
;; end needs to know about a trasition to the
;; full state
1)))
(when (or (not avail) (avail . <= . 0)) (when (or (not avail) (avail . <= . 0))
(define s start) (define s start)
(define e end) (define e end)
(set! buffer bstr) (define b buffer)
(set! buffer-pos e) (set-direct-bstr! b bstr)
(set! buffer-end (let ([end (if (s . fx<= . e) (set-direct-pos! b e)
(if (fx= s 0) (set-direct-end! b (let ([end (if (s . fx<= . e)
(fx- len 1) (if (fx= s 0)
len) (fx- len 1)
(fx- s 1))]) len)
(if (and avail (fx- s 1))])
((fx- end e) . > . avail)) (if (and avail
(fx+ e avail) ((fx- end e) . > . avail))
end))) (fx+ e avail)
end)))
(define o offset) (define o offset)
(when o (when o
(set! offset (- (+ o amt) e))))))] (set! offset (- (+ o amt) e))))))]
@ -374,34 +375,18 @@
[slow-mode! [slow-mode!
(lambda () (lambda ()
(with-object pipe-data d (with-object pipe-data d
(when buffer (define b buffer)
(define pos buffer-pos) (when (direct-bstr b)
(define pos (direct-pos b))
(define o offset) (define o offset)
(when o (when o
(set! offset (+ o pos))) (set! offset (+ o pos)))
(set! end (if (fx= pos len) 0 pos)) (set! end (if (fx= pos len) 0 pos))
(set! buffer #f) (set-direct-bstr! b #f)
(set! buffer-pos buffer-end)) (set-direct-pos! b (direct-end b)))
(define in input) (sync-input)))])
(when in
(send pipe-input-port in sync-data))))])
(static (static
[sync-data
(lambda ()
(when buffer
(with-object pipe-data d
(define pos buffer-pos)
(set! end (if (fx= pos len)
0
pos)))))]
[sync-data-both
(lambda ()
(sync-data)
(with-object pipe-data d
(define in input)
(when in
(send pipe-output-port in sync-data #f))))]
[on-input-empty [on-input-empty
(lambda () (lambda ()
(slow-mode!))] (slow-mode!))]
@ -416,7 +401,6 @@
(assert-atomic) (assert-atomic)
(slow-mode!) (slow-mode!)
(with-object pipe-data d (with-object pipe-data d
(send pipe-input-port input sync-data)
(let try-again () (let try-again ()
(define top-pos (if (fx= start 0) (define top-pos (if (fx= start 0)
(fx- len 1) (fx- len 1)
@ -426,7 +410,9 @@
[(or (not limit) [(or (not limit)
((+ limit peeked-amt) . > . (fx- len 1))) ((+ limit peeked-amt) . > . (fx- len 1)))
;; grow pipe size ;; grow pipe size
(send pipe-input-port input on-resize) (define in (ref-value input-ref))
(when in
(send pipe-input-port in on-resize))
(define new-bstr (make-bytes (min+1 (and limit (+ limit peeked-amt)) (* len 2)))) (define new-bstr (make-bytes (min+1 (and limit (+ limit peeked-amt)) (* len 2))))
(cond (cond
[(fx= 0 start) [(fx= 0 start)
@ -460,7 +446,6 @@
(bytes-copy! bstr end src-bstr src-start (fx+ src-start amt)) (bytes-copy! bstr end src-bstr src-start (fx+ src-start amt))
(let ([new-end (fx+ end amt)]) (let ([new-end (fx+ end amt)])
(set! end (if (fx= new-end len) 0 new-end))) (set! end (if (fx= new-end len) 0 new-end)))
(check-output-blocking)
(fast-mode! amt) (fast-mode! amt)
amt])] amt])]
[(fx= end top-pos) [(fx= end top-pos)
@ -476,7 +461,6 @@
(check-input-unblocking) (check-input-unblocking)
(bytes-copy! bstr 0 src-bstr src-start (fx+ src-start amt)) (bytes-copy! bstr 0 src-bstr src-start (fx+ src-start amt))
(set! end amt) (set! end amt)
(check-output-blocking)
(fast-mode! amt) (fast-mode! amt)
amt])])] amt])])]
[(end . fx< . (fx- start 1)) [(end . fx< . (fx- start 1))
@ -488,7 +472,6 @@
(check-input-unblocking) (check-input-unblocking)
(bytes-copy! bstr end src-bstr src-start (fx+ src-start amt)) (bytes-copy! bstr end src-bstr src-start (fx+ src-start amt))
(set! end (fx+ end amt)) (set! end (fx+ end amt))
(check-output-blocking)
(fast-mode! amt) (fast-mode! amt)
amt])] amt])]
[else [else
@ -502,33 +485,25 @@
;; in atomic mode ;; in atomic mode
(lambda () (lambda ()
(with-object pipe-data d (with-object pipe-data d
(when output (when output-ref
(slow-mode!) (slow-mode!)
(set! output #f) (set! output-ref #f)
(when write-ready-sema (check-input-unblocking))))]))
(semaphore-post write-ready-sema))
(when more-read-ready-sema
(semaphore-post more-read-ready-sema))
(semaphore-post read-ready-sema))))]))
;; ---------------------------------------- ;; ----------------------------------------
(define (make-pipe-ends [limit #f] [input-name 'pipe] [output-name 'pipe]) (define (make-pipe-ends [limit #f] [input-name 'pipe] [output-name 'pipe])
(define len (min+1 limit 16)) (define len (min+1 limit 16))
(define read-ready-sema (make-semaphore))
(define write-ready-sema (and limit (make-semaphore 1)))
(define write-ready-evt (if limit
(semaphore-peek-evt write-ready-sema)
always-evt))
(define d (new pipe-data (define d (new pipe-data
[bstr (make-bytes len)] [bstr (make-bytes len)]
[len len] [len len]
[limit limit] [limit limit]))
[read-ready-sema read-ready-sema]
[write-ready-sema write-ready-sema] (define write-ready-evt (if limit
[read-ready-evt (wrap-evt (semaphore-peek-evt read-ready-sema) (pipe-write-poller d)
(lambda (v) 0))] always-evt))
[write-ready-evt write-ready-evt])) (define read-ready-evt (pipe-read-poller d))
(define input (new pipe-input-port (define input (new pipe-input-port
[name input-name] [name input-name]
@ -538,8 +513,12 @@
[evt write-ready-evt] [evt write-ready-evt]
[d d])) [d d]))
(set-pipe-data-input! d input) (set-pipe-data-input-buffer! d (core-port-buffer input))
(set-pipe-data-output! d output) (set-pipe-data-output-buffer! d (core-port-buffer output))
(set-pipe-data-input-ref! d (make-ref input))
(set-pipe-data-output-ref! d (make-ref output))
(set-pipe-data-write-ready-evt! d write-ready-evt)
(set-pipe-data-read-ready-evt! d read-ready-evt)
(values input output)) (values input output))
@ -550,3 +529,45 @@
(port-count-lines! ip) (port-count-lines! ip)
(port-count-lines! op)) (port-count-lines! op))
(values ip op)) (values ip op))
;; ----------------------------------------
;; Note: a thread blocked on writing to a limited pipe cannot be GCed
;; due to the use of `replace-evt`.
(struct pipe-write-poller (d)
#:property
prop:evt
(poller
(lambda (pwp ctx)
(with-object pipe-data (pipe-write-poller-d pwp)
(sync-both)
(cond
[(not (output-full?))
(values (list pwp) #f)]
[else
(unless write-ready-sema
(set! write-ready-sema (make-semaphore)))
(define in (ref-value input-ref))
(when in
(send pipe-input-port in on-output-full))
(values #f (replace-evt (semaphore-peek-evt write-ready-sema)
(lambda (v) pwp)))])))))
(struct pipe-read-poller (d)
#:property
prop:evt
(poller
(lambda (prp ctx)
(with-object pipe-data (pipe-read-poller-d prp)
(sync-both)
(cond
[(not (input-empty?))
(values (list 0) #f)]
[else
(unless read-ready-sema
(set! read-ready-sema (make-semaphore)))
(define out (ref-value output-ref))
(when out
(send pipe-output-port out on-input-empty))
(values #f (wrap-evt (semaphore-peek-evt read-ready-sema)
(lambda (v) 0)))])))))

View File

@ -5,6 +5,7 @@
"evt.rkt") "evt.rkt")
(provide (struct-out core-port) (provide (struct-out core-port)
(struct-out direct)
(struct-out location) (struct-out location)
get-core-port-offset) get-core-port-offset)
@ -14,23 +15,23 @@
[data #f] ; FIXME: remove after all uses are converted [data #f] ; FIXME: remove after all uses are converted
;; When `buffer` is #f, it enables a shortcut for reading and ;; When `(direct-bstr buffer)` is not #f, it enables a shortcut for
;; writing, where `buffer-pos` must also be less than `buffer-end` ;; reading and writing, where `(direct-pos buffer)` must also be
;; for the shortcut to apply. The shortcut is not necessarily ;; less than `(direct-end buffer)` for the shortcut to apply. The
;; always taken, just if it is used, the `buffer-pos` position can ;; shortcut is not necessarily always taken, just if it is used,
;; be adjusted and the port's methods must adapt accordingly. The ;; the `(direct-pos buffer)` position can be adjusted and the
;; `buffer` and `buffer-end` fields are modified only by the port's ;; port's methods must adapt accordingly. The `(direct-bstr
;; methods, however. ;; buffer)` and `(direct-end buffer)` fields are modified only by
;; the port's methods, however.
;; ;;
;; For an input port, shortcut mode implies that `prepare-change` ;; For an input port, shortcut mode implies that `prepare-change`
;; does not need to be called, and no checking is needed for whether ;; does not need to be called, and no checking is needed for whether
;; the port is closed. ;; the port is closed.
;; ;;
;; A non-#f `buffer` further implies that `buffer-pos` should be ;; A non-#f `(direct-bstr buffer)` further implies that
;; added to `offset` to get the true offset. ;; `(direct-pos buffer)` should be added to `offset` to get the
[buffer #f] ;; true offset.
[buffer-pos 0] ; if < `buffer-end`, allows direct read/write on `buffer` [buffer (direct #f 0 0)]
[buffer-end 0]
[closed? #f] [closed? #f]
[closed-sema #f] [closed-sema #f]
@ -81,15 +82,22 @@
[prop:object-name (struct-field-index name)] [prop:object-name (struct-field-index name)]
[prop:secondary-evt port->evt])) [prop:secondary-evt port->evt]))
(struct direct ([bstr #:mutable]
[pos #:mutable]
[end #:mutable])
#:authentic)
(struct location ([state #:mutable] ; state of UTF-8 decoding (struct location ([state #:mutable] ; state of UTF-8 decoding
[cr-state #:mutable] ; state of CRLF counting as a single LF [cr-state #:mutable] ; state of CRLF counting as a single LF
[line #:mutable] ; count newlines [line #:mutable] ; count newlines
[column #:mutable] ; count UTF-8 characters in line [column #:mutable] ; count UTF-8 characters in line
[position #:mutable])) ; count UTF-8 characters [position #:mutable]) ; count UTF-8 characters
#:authentic)
(define (get-core-port-offset p) (define (get-core-port-offset p)
(define offset (core-port-offset p)) (define offset (core-port-offset p))
(define buffer (core-port-buffer p))
(and offset (and offset
(if (core-port-buffer p) (if (direct-bstr buffer)
(+ offset (core-port-buffer-pos p)) (+ offset (direct-pos buffer))
offset))) offset)))

View File

@ -58,15 +58,16 @@
(end-atomic) (end-atomic)
eof] eof]
[else [else
(define buf-pos (core-port-buffer-pos in)) (define buffer (core-port-buffer in))
(define buf-end (core-port-buffer-end in)) (define buf-pos (direct-pos buffer))
(define buf-end (direct-end buffer))
(cond (cond
[(buf-pos . fx< . buf-end) [(buf-pos . fx< . buf-end)
;; Read bytes directly from buffer ;; Read bytes directly from buffer
(define v (fxmin (fx- buf-end buf-pos) (fx- end start))) (define v (fxmin (fx- buf-end buf-pos) (fx- end start)))
(define new-pos (fx+ buf-pos v)) (define new-pos (fx+ buf-pos v))
(bytes-copy! bstr start (core-port-buffer in) buf-pos new-pos) (bytes-copy! bstr start (direct-bstr buffer) buf-pos new-pos)
(set-core-port-buffer-pos! in new-pos) (set-direct-pos! buffer new-pos)
(when (or (pair? extra-count-ins) (core-port-count in)) (when (or (pair? extra-count-ins) (core-port-count in))
(port-count-all! in extra-count-ins v bstr start)) (port-count-all! in extra-count-ins v bstr start))
(end-atomic) (end-atomic)
@ -152,13 +153,14 @@
(end-atomic) (end-atomic)
eof] eof]
[else [else
(define buf-pos (+ (core-port-buffer-pos in) skip)) (define buffer (core-port-buffer in))
(define buf-end (core-port-buffer-end in)) (define buf-pos (+ (direct-pos buffer) skip))
(define buf-end (direct-end buffer))
(cond (cond
[(buf-pos . < . buf-end) [(buf-pos . < . buf-end)
;; Copy bytes from buffer ;; Copy bytes from buffer
(define v (min (- buf-end buf-pos) (- end start))) (define v (min (- buf-end buf-pos) (- end start)))
(bytes-copy! bstr start (core-port-buffer in) buf-pos (fx+ buf-pos v)) (bytes-copy! bstr start (direct-bstr buffer) buf-pos (fx+ buf-pos v))
(end-atomic) (end-atomic)
v] v]
[else [else
@ -205,11 +207,12 @@
;; Try the buffer shortcut first ;; Try the buffer shortcut first
(define (read-a-byte who in #:special-ok? [special-ok? #f]) (define (read-a-byte who in #:special-ok? [special-ok? #f])
(start-atomic) (start-atomic)
(define pos (core-port-buffer-pos in)) (define buffer (core-port-buffer in))
(define pos (direct-pos buffer))
(cond (cond
[(pos . fx< . (core-port-buffer-end in)) [(pos . fx< . (direct-end buffer))
(define b (bytes-ref (core-port-buffer in) pos)) (define b (bytes-ref (direct-bstr buffer) pos))
(set-core-port-buffer-pos! in (fx+ pos 1)) (set-direct-pos! buffer (fx+ pos 1))
(when (core-port-count in) (when (core-port-count in)
(port-count-byte! in b)) (port-count-byte! in b))
(end-atomic) (end-atomic)
@ -232,10 +235,11 @@
;; Try the buffer shortcut first ;; Try the buffer shortcut first
(define (peek-a-byte who in skip-k #:special-ok? [special-ok? #f]) (define (peek-a-byte who in skip-k #:special-ok? [special-ok? #f])
(start-atomic) (start-atomic)
(define pos (+ (core-port-buffer-pos in) skip-k)) (define buffer (core-port-buffer in))
(define pos (+ (direct-pos buffer) skip-k))
(cond (cond
[(pos . < . (core-port-buffer-end in)) [(pos . < . (direct-end buffer))
(define b (bytes-ref (core-port-buffer in) pos)) (define b (bytes-ref (direct-bstr buffer) pos))
(end-atomic) (end-atomic)
b] b]
[else [else

View File

@ -23,14 +23,15 @@
(end-atomic) (end-atomic)
0] 0]
[else [else
(define buf-pos (core-port-buffer-pos out)) (define buffer (core-port-buffer out))
(define buf-end (core-port-buffer-end out)) (define buf-pos (direct-pos buffer))
(define buf-end (direct-end buffer))
(cond (cond
[(buf-pos . fx< . buf-end) [(buf-pos . fx< . buf-end)
;; Copy bytes directly to buffer ;; Copy bytes directly to buffer
(define v (fxmin (fx- buf-end buf-pos) (fx- end start))) (define v (fxmin (fx- buf-end buf-pos) (fx- end start)))
(bytes-copy! (core-port-buffer out) buf-pos bstr start (fx+ start v)) (bytes-copy! (direct-bstr buffer) buf-pos bstr start (fx+ start v))
(set-core-port-buffer-pos! out (fx+ buf-pos v)) (set-direct-pos! buffer (fx+ buf-pos v))
(when (or (pair? extra-count-outs) (core-port-count out)) (when (or (pair? extra-count-outs) (core-port-count out))
(port-count-all! out extra-count-outs v bstr start)) (port-count-all! out extra-count-outs v bstr start))
(end-atomic) (end-atomic)

View File

@ -133,7 +133,7 @@
(sandman (sandman
;; sleep ;; sleep
(lambda (timeout-at) (lambda (timeout-at)
(host:sleep (/ (- (or timeout-at (distant-future)) (current-inexact-milliseconds)) 1000.0))) (host:sleep (max 0.0 (/ (- (or timeout-at (distant-future)) (current-inexact-milliseconds)) 1000.0))))
;; poll ;; poll
(lambda (mode wakeup) (lambda (mode wakeup)