io: add output fast path

Add a fast path for output that is like input, based on a exposed
buffer. Make bytes-output-port and pipe use it.
This commit is contained in:
Matthew Flatt 2019-02-12 17:58:02 -07:00
parent d6af78cebd
commit 45347465df
7 changed files with 212 additions and 70 deletions

View File

@ -319,7 +319,7 @@
(let ()
(define-values (i o) (make-pipe))
(for ([n 3])
(write-bytes (make-bytes 4096 (char->integer #\a)) o)
(test 4096 (write-bytes (make-bytes 4096 (char->integer #\a)) o))
(for ([j (in-range 4096)])
(read-byte i))
(unless (zero? (pipe-content-length i))
@ -347,7 +347,6 @@
(error))
(loop (add1 x) (cdr content) (list* bstr bstr accum))])))
(let ()
(define path (build-path "compiled" "demo-out"))
(define o (open-output-file path 'truncate))
@ -401,6 +400,7 @@
(test (void) (file-position out 10))
(test #"hola!!\0\0\0\0" (get-output-bytes out)))
(log-error "start")
(let ()
(define-values (i o) (make-pipe))
(port-count-lines! i)
@ -428,6 +428,7 @@
(write-bytes #"!" o)
(test '(3 1 8) (next-location o))
(log-error "here")
(test #"x\r" (read-bytes 2 i))
(test '(3 0 7) (next-location i))
(test #"\n!" (read-bytes 2 i))

View File

@ -4,6 +4,7 @@
"../common/class.rkt"
"../host/thread.rkt"
"port.rkt"
"count.rkt"
"output-port.rkt"
"parameter.rkt"
"write.rkt"
@ -24,7 +25,18 @@
(check who byte? b)
(check who output-port? out)
(let ([out (->core-output-port out)])
(write-some-bytes 'write-byte out (bytes b) 0 1 #:buffer-ok? #t #:copy-bstr? #f))
(start-atomic)
(define pos (core-port-buffer-pos out))
(cond
[(pos . fx< . (core-port-buffer-end out))
(bytes-set! (core-port-buffer out) pos b)
(set-core-port-buffer-pos! out (fx+ pos 1))
(when (core-port-count out)
(port-count-byte! out b))
(end-atomic)]
[else
(end-atomic)
(write-some-bytes 'write-byte out (bytes b) 0 1 #:buffer-ok? #t #:copy-bstr? #f)]))
(void))
(define (do-write-bytes who out bstr start end)

View File

@ -156,9 +156,14 @@
[max-pos 0])
(public
[get-length (lambda () max-pos)]
[get-length (lambda ()
(start-atomic)
(slow-mode!)
(end-atomic)
max-pos)]
[get-bytes (lambda (dest-bstr start-pos discard?)
(start-atomic)
(slow-mode!)
(bytes-copy! dest-bstr 0 bstr start-pos (fx+ start-pos (bytes-length dest-bstr)))
(when discard?
(set! bstr #"")
@ -171,11 +176,29 @@
(lambda (len)
(define new-bstr (make-bytes (fx* 2 len)))
(bytes-copy! new-bstr 0 bstr 0 pos)
(set! bstr new-bstr))])
(set! bstr new-bstr))]
[slow-mode!
(lambda ()
(when buffer
(define s buffer-pos)
(set! pos s)
(set! buffer-pos buffer-end)
(set! buffer #f)
(set! offset s)
(set! max-pos (fxmax s max-pos))))]
[fast-mode!
(lambda ()
(set! buffer bstr)
(set! buffer-pos pos)
(set! buffer-end (bytes-length bstr))
(set! offset 0))])
(override
[write-out
(lambda (src-bstr src-start src-end nonblock? enable-break? copy?)
(slow-mode!)
(define i pos)
(define amt (min (fx- src-end src-start) 4096))
(define end-i (fx+ i amt))
@ -184,6 +207,7 @@
(bytes-copy! bstr i src-bstr src-start (fx+ src-start amt))
(set! pos end-i)
(set! max-pos (fxmax pos max-pos))
(fast-mode!)
amt)]
[get-write-evt
(get-write-evt-via-write-out (lambda (out v bstr start)
@ -192,6 +216,7 @@
(case-lambda
[() pos]
[(new-pos)
(slow-mode!)
(define len (bytes-length bstr))
(cond
[(eof-object? new-pos)

View File

@ -47,7 +47,10 @@
[else
(raise-argument-error 'pipe-contact-length "(or/c pipe-input-port? pipe-output-port?)" p)]))
(atomically
(send pipe-input-port (pipe-data-input d) sync-data)
(let ([input (pipe-data-input d)])
(when input (send pipe-input-port input sync-data)))
(let ([output (pipe-data-output d)])
(when output (send pipe-output-port output sync-data)))
(send pipe-data d content-length)))
;; ----------------------------------------
@ -60,8 +63,8 @@
[peeked-amt 0] ; peeked but not yet read, effectively extends `limit`
[start 0]
[end 0]
[input #f] ; #f => closed
[output-closed? #f]
[input #f] ; #f => closed
[output #f] ; #f => closed
[read-ready-sema #f]
[write-ready-sema #f]
[more-read-ready-sema #f] ; for lookahead peeks
@ -96,12 +99,15 @@
(when (output-full?) (semaphore-post write-ready-sema)))]
[check-input-blocking
(lambda ()
(when (input-empty?) (semaphore-wait read-ready-sema)))]
(when (input-empty?)
(semaphore-wait read-ready-sema)
(when output
(send pipe-output-port output on-input-empty))))]
;; Used before/after write:
[check-input-unblocking
(lambda ()
(when (and (input-empty?) (not output-closed?)) (semaphore-post read-ready-sema))
(when (and (input-empty?) output) (semaphore-post read-ready-sema))
(when more-read-ready-sema
(semaphore-post more-read-ready-sema)
(set! more-read-ready-sema #f)))]
@ -128,31 +134,34 @@
(private
[fast-mode!
(lambda (amt) ; amt = not yet added to `offset`
(unless buffer
(with-object pipe-data d
(define s start)
(define e end)
(unless (fx= s e)
(set! buffer bstr)
(set! buffer-pos s)
(set! buffer-end (if (s . fx< . e) e len))
(define o offset)
(when o
(set! offset (- (+ o amt) s)))))))]
(with-object pipe-data d
(define s start)
(define e end)
(unless (fx= s e)
(set! buffer bstr)
(set! buffer-pos s)
;; don't read last byte, because the output
;; end needs to know about a transition to
;; the empty state
(set! buffer-end (fx- (if (s . fx< . e) e len) 1))
(define o offset)
(when o
(set! offset (- (+ o amt) s))))))]
[slow-mode!
(lambda ()
(when buffer
(with-object pipe-data d
(with-object pipe-data d
(when buffer
(define pos buffer-pos)
(define o offset)
(when o
(set! offset (+ o pos)))
(set! start (if (fx= pos len)
0
pos))
(set! start (if (fx= pos len) 0 pos))
(set! buffer #f)
(set! buffer-pos buffer-end))))])
(set! buffer-pos buffer-end))
(define out output)
(when out
(send pipe-output-port out sync-data))))])
(static
[sync-data
@ -163,6 +172,13 @@
(set! start (if (fx= pos len)
0
pos)))))]
[sync-data-both
(lambda ()
(sync-data)
(with-object pipe-data d
(define out output)
(when out
(send pipe-output-port out sync-data))))]
[on-resize
(lambda ()
(slow-mode!))]
@ -183,9 +199,9 @@
(with-object pipe-data d
(cond
[(input-empty?)
(if output-closed?
eof
read-ready-evt)]
(if output
read-ready-evt
eof)]
[else
(check-output-unblocking)
(define s start)
@ -215,7 +231,7 @@
(lambda (dest-bstr dest-start dest-end skip progress-evt copy?)
(with-object pipe-data d
(assert-atomic)
(sync-data)
(sync-data-both)
(define content-amt (content-length))
(cond
[(and progress-evt
@ -223,10 +239,12 @@
#f]
[(content-amt . <= . skip)
(cond
[output-closed? eof]
[(not output) eof]
[else
(unless (or (zero? skip) more-read-ready-sema)
(set! more-read-ready-sema (make-semaphore)))
(set! more-read-ready-sema (make-semaphore))
(when output
(send pipe-output-port output on-need-more-ready)))
(define evt (if (zero? skip)
read-ready-evt
(wrap-evt (semaphore-peek-evt more-read-ready-sema)
@ -252,9 +270,9 @@
(lambda (work-done!)
(assert-atomic)
(with-object pipe-data d
(or output-closed?
(or (not output)
(begin
(sync-data)
(sync-data-both)
(not (fx= 0 (content-length)))))))]
[close
@ -325,11 +343,78 @@
(field
[d d]) ; pipe-data
(private
[fast-mode!
(lambda (amt) ; amt = not yet added to `offset`
(with-object pipe-data d
(define lim limit)
(define avail (and lim (- lim (content-length)
;; don't fill last byte, because the input
;; end needs to know about a trasition to the
;; full state
1)))
(when (or (not avail) (avail . <= . 0))
(define s start)
(define e end)
(set! buffer bstr)
(set! buffer-pos e)
(set! buffer-end (let ([end (if (s . fx<= . e)
(if (fx= s 0)
(fx- len 1)
len)
(fx- s 1))])
(if (and avail
((fx- end e) . > . avail))
(fx+ e avail)
end)))
(define o offset)
(when o
(set! offset (- (+ o amt) e))))))]
[slow-mode!
(lambda ()
(with-object pipe-data d
(when buffer
(define pos buffer-pos)
(define o offset)
(when o
(set! offset (+ o pos)))
(set! end (if (fx= pos len) 0 pos))
(set! buffer #f)
(set! buffer-pos buffer-end))
(define in input)
(when in
(send pipe-input-port in sync-data))))])
(static
[sync-data
(lambda ()
(when buffer
(with-object pipe-data d
(define pos buffer-pos)
(set! end (if (fx= pos len)
0
pos)))))]
[sync-data-both
(lambda ()
(sync-data)
(with-object pipe-data d
(define in input)
(when in
(send pipe-output-port in sync-data #f))))]
[on-input-empty
(lambda ()
(slow-mode!))]
[on-need-more-ready
(lambda ()
(slow-mode!))])
(override
[write-out
;; in atomic mode
(lambda (src-bstr src-start src-end nonblock? enable-break? copy?)
(assert-atomic)
(slow-mode!)
(with-object pipe-data d
(send pipe-input-port input sync-data)
(let try-again ()
@ -376,6 +461,7 @@
(let ([new-end (fx+ end amt)])
(set! end (if (fx= new-end len) 0 new-end)))
(check-output-blocking)
(fast-mode! amt)
amt])]
[(fx= end top-pos)
(cond
@ -391,6 +477,7 @@
(bytes-copy! bstr 0 src-bstr src-start (fx+ src-start amt))
(set! end amt)
(check-output-blocking)
(fast-mode! amt)
amt])])]
[(end . fx< . (fx- start 1))
(define amt (apply-limit (fxmin (fx- (fx- start 1) end)
@ -402,6 +489,7 @@
(bytes-copy! bstr end src-bstr src-start (fx+ src-start amt))
(set! end (fx+ end amt))
(check-output-blocking)
(fast-mode! amt)
amt])]
[else
(maybe-grow)]))))]
@ -414,8 +502,9 @@
;; in atomic mode
(lambda ()
(with-object pipe-data d
(unless output-closed?
(set! output-closed? #t)
(when output
(slow-mode!)
(set! output #f)
(when write-ready-sema
(semaphore-post write-ready-sema))
(when more-read-ready-sema
@ -450,6 +539,7 @@
[d d]))
(set-pipe-data-input! d input)
(set-pipe-data-output! d output)
(values input output))

View File

@ -46,7 +46,7 @@
(start-atomic)
(prepare-change in)
(cond
[(= start end) ; intentionally before the port-closed check
[(fx= start end) ; intentionally before the port-closed check
(end-atomic)
0]
[(core-port-closed? in)
@ -61,9 +61,9 @@
(define buf-pos (core-port-buffer-pos in))
(define buf-end (core-port-buffer-end in))
(cond
[(buf-pos . < . buf-end)
[(buf-pos . fx< . buf-end)
;; Read bytes from buffer
(define v (min (- buf-end buf-pos) (- end start)))
(define v (fxmin (fx- buf-end buf-pos) (fx- end start)))
(define new-pos (fx+ buf-pos v))
(bytes-copy! bstr start (core-port-buffer in) buf-pos new-pos)
(set-core-port-buffer-pos! in new-pos)

View File

@ -11,7 +11,8 @@
(define/who (write-char ch [out (current-output-port)])
(check who char? ch)
(check who output-port? out)
(write-string (string ch) out 0 1))
(write-string (string ch) out 0 1)
(void))
(define/who (write-string str [out (current-output-port)] [start 0] [end (and (string? str)
(string-length str))])

View File

@ -23,34 +23,47 @@
(end-atomic)
0]
[else
(define write-out (method core-output-port out write-out))
(define buf-pos (core-port-buffer-pos out))
(define buf-end (core-port-buffer-end out))
(cond
[(procedure? write-out)
(define v (write-out out bstr start end (not buffer-ok?) enable-break? copy-bstr?))
(let result-loop ([v v])
(cond
[(not v)
(end-atomic)
(if zero-ok?
0
(try-again out extra-count-outs))]
[(exact-positive-integer? v)
(port-count-all! out extra-count-outs v bstr start)
(end-atomic)
v]
[(evt? v)
(end-atomic)
(cond
[zero-ok? 0]
[else
(define new-v (if enable-break?
(sync/enable-break v)
(sync v)))
(start-atomic)
(result-loop new-v)])]
[else
(end-atomic)
(internal-error (format "write-some-bytes: weird result ~s for ~s ~s ~s at ~s" v bstr start end out))]))]
[else
[(buf-pos . fx< . buf-end)
;; Copy bytes from buffer
(define v (fxmin (fx- buf-end buf-pos) (fx- end start)))
(bytes-copy! (core-port-buffer out) buf-pos bstr start (fx+ start v))
(set-core-port-buffer-pos! out (fx+ buf-pos v))
(when (core-port-count out)
(port-count! out v bstr start))
(end-atomic)
(try-again (->core-output-port write-out) (cons out extra-count-outs))])])))
v]
[else
(define write-out (method core-output-port out write-out))
(cond
[(procedure? write-out)
(define v (write-out out bstr start end (not buffer-ok?) enable-break? copy-bstr?))
(let result-loop ([v v])
(cond
[(not v)
(end-atomic)
(if zero-ok?
0
(try-again out extra-count-outs))]
[(exact-positive-integer? v)
(port-count-all! out extra-count-outs v bstr start)
(end-atomic)
v]
[(evt? v)
(end-atomic)
(cond
[zero-ok? 0]
[else
(define new-v (if enable-break?
(sync/enable-break v)
(sync v)))
(start-atomic)
(result-loop new-v)])]
[else
(end-atomic)
(internal-error (format "write-some-bytes: weird result ~s for ~s ~s ~s at ~s" v bstr start end out))]))]
[else
(end-atomic)
(try-again (->core-output-port write-out) (cons out extra-count-outs))])])])))