io: reduce overhead of a byte string output port

Make it a thinner wrapper around the output half of a pipe.
This commit is contained in:
Matthew Flatt 2018-12-08 11:09:18 -07:00
parent 9f27b90e31
commit bcf6492d56
6 changed files with 168 additions and 133 deletions

View File

@ -82,7 +82,7 @@
(raise-arguments-error who
"port does not support output events"
"port" out))
(get-write-evt (core-port-self out) bstr start-pos end-pos))))
(get-write-evt (core-port-self out) out bstr start-pos end-pos))))
(define/who (port-writes-atomic? out)
(check who output-port? out)

View File

@ -154,14 +154,18 @@
;; ----------------------------------------
(struct output-bytes-data (i reset))
(struct output-bytes-data (o get))
(define (open-output-bytes [name 'string])
(define-values (i o) (make-pipe))
(define-values (i/none o) (make-pipe-ends #:need-input? #f))
(define p
(make-core-output-port
#:name name
#:data (output-bytes-data i (lambda () (pipe-discard-all i)))
#:data (output-bytes-data o (lambda (o bstr start-pos discard?)
;; in atomic mode
(pipe-get-content o bstr start-pos)
(when discard?
(pipe-discard-all o))))
#:self o
#:evt o
#:write-out o
@ -169,8 +173,8 @@
(lambda (o) ((core-port-close o) (core-port-self o)))
#:get-write-evt
(and (core-output-port-get-write-evt o)
(lambda (o bstr start-k end-k)
((core-output-port-get-write-evt o) (core-port-self o) bstr start-k end-k)))
(lambda (o orig-o bstr start-k end-k)
((core-output-port-get-write-evt o) (core-port-self o) o bstr start-k end-k)))
#:get-location
(and (core-port-get-location o)
(lambda (o) ((core-port-get-location o) (core-port-self o))))
@ -182,7 +186,7 @@
(case-lambda
[(o) (pipe-write-position o)]
[(o new-pos)
(define len (pipe-content-length i))
(define len (pipe-content-length o))
(cond
[(eof-object? new-pos)
(pipe-write-position o len)]
@ -201,6 +205,7 @@
[else
(pipe-write-position o new-pos)])])))
(when (port-count-lines-enabled)
(port-count-lines! o)
(port-count-lines! p))
p)
@ -210,19 +215,21 @@
o)
(check who exact-nonnegative-integer? start-pos)
(check who exact-nonnegative-integer? #:or-false end-pos)
(let ([o (->core-output-port o)])
(define i (output-bytes-data-i (core-port-data o)))
(define len (pipe-content-length i))
(let ([bstr-o (->core-output-port o)])
(define o (output-bytes-data-o (core-port-data bstr-o)))
(start-atomic)
(define len (pipe-content-length o))
(when (start-pos . > . len)
(end-atomic)
(raise-range-error who "port content" "starting " start-pos o 0 len #f))
(when end-pos
(unless (<= start-pos end-pos len)
(end-atomic)
(raise-range-error who "port content" "ending " end-pos o 0 len start-pos)))
(define amt (- (min len (or end-pos len)) start-pos))
(define bstr (make-bytes amt))
(peek-bytes! bstr start-pos i)
(when reset?
((output-bytes-data-reset (core-port-data o))))
((output-bytes-data-get (core-port-data bstr-o)) o bstr start-pos reset?)
(end-atomic)
bstr))
;; ----------------------------------------

View File

@ -132,7 +132,7 @@
(wrap-check-write-evt-result '|user port write| r start end non-block/buffer?)]
[else r])]))
(define (get-write-evt self bstr start end)
(define (get-write-evt self orig-out bstr start end)
(end-atomic)
(define r (user-get-write-evt bstr start end))
(unless (evt? r)

View File

@ -220,7 +220,7 @@
[else n])]))
#:count-write-evt-via-write-out
(lambda (self v bstr start)
(lambda (self port v bstr start)
(port-count! port v bstr start))
#:close

View File

@ -70,8 +70,10 @@
write-out-special ; (any no-block/buffer? enable-break? -*> boolean?)
;; Called in atomic mode.
get-write-evt ; (bstr start-k end-k -*> evt?)
get-write-evt ; (port bstr start-k end-k -*> evt?)
;; Called in atomic mode.
;; Note the extra "self" argument as a port, which is useful
;; for implementing `count-write-evt-via-write-out`.
;; The given bstr should not be exposed to untrusted code.
get-write-special-evt ; (-*> evt?)
@ -142,11 +144,11 @@
(and count-write-evt-via-write-out
;; If `write-out` is always atomic (in no-block, no-buffer mode),
;; then an event can poll `write-out`:
(lambda (self src-bstr src-start src-end)
(lambda (self o src-bstr src-start src-end)
(write-evt
;; in atomic mode:
(lambda (self-evt)
(define v (write-out self src-bstr src-start src-end #f #f #t))
(define v (write-out self o src-bstr src-start src-end #f #f #t))
(when (exact-integer? v)
(count-write-evt-via-write-out self v src-bstr src-start))
(if (evt? v)

View File

@ -10,16 +10,19 @@
"commit-manager.rkt")
(provide make-pipe
make-pipe-ends
pipe-input-port?
pipe-output-port?
pipe-content-length
pipe-write-position
pipe-get-content
pipe-discard-all)
(define (min+1 a b) (if a (min (add1 a) b) b))
(struct pipe-data (get-content-length
write-position
get-content
discard-all))
(define (pipe-input-port? p)
@ -40,16 +43,21 @@
(raise-argument-error 'pipe-contact-length "(or/c pipe-input-port? pipe-output-port?)" p)])))
(core-port-self p)))
;; in atomic mode:
(define pipe-write-position
(case-lambda
[(p) ((pipe-data-write-position (core-port-data p)) (core-port-self p))]
[(p pos) ((pipe-data-write-position (core-port-data p)) (core-port-self p) pos)]))
;; in atomic mode:
(define (pipe-discard-all p)
((pipe-data-discard-all (core-port-data p)) (core-port-self p)))
(define/who (make-pipe [limit #f] [input-name 'pipe] [output-name 'pipe])
(check who #:or-false exact-positive-integer? limit)
;; in atomic mode:
(define (pipe-get-content p bstr start-pos)
((pipe-data-get-content (core-port-data p)) (core-port-self p) bstr start-pos))
(define (make-pipe-ends [limit #f] [input-name 'pipe] [output-name 'pipe] #:need-input? [need-input? #t])
(define bstr (make-bytes (min+1 limit 16)))
(define len (bytes-length bstr))
(define-fixnum peeked-amt 0) ; peeked but not yet read effectively extends `limit`
@ -75,14 +83,30 @@
(atomically (content-length)))
;; write-position
(case-lambda
;; in atomic mode
[(self) (or write-pos end)]
[(self pos)
;; `pos` must be between `start` and `end`
(if (fx= pos end)
(set! write-pos #f)
(set! write-pos pos))])
;; get-content
(lambda (self to-bstr start-pos)
;; in atomic mode
(define pos (let ([p (fx+ start start-pos)])
(if (p . fx>= . len)
(fx- p len)
p)))
(define end-pos (fx+ pos (bytes-length to-bstr)))
(cond
[(end-pos . fx<= . len)
(bytes-copy! to-bstr 0 bstr pos end-pos)]
[else
(bytes-copy! to-bstr 0 bstr pos len)
(bytes-copy! to-bstr (fx- len pos) bstr 0 (fx- end-pos len))]))
;; discard-all
(lambda (self)
;; in atomic mode
(set! peeked-amt 0)
(set! start 0)
(set! end 0)
@ -149,8 +173,10 @@
(set! commit-manager (make-commit-manager)))
(commit-manager-wait commit-manager progress-evt ext-evt finish)]))
;; input ----------------------------------------
(define ip
(values
;; input ----------------------------------------
(and
need-input?
(make-core-input-port
#:name input-name
#:data data
@ -314,125 +340,125 @@
(progress!)
(check-input-blocking)
(finish dest-bstr)]))))]))))
;; output ----------------------------------------
(make-core-output-port
#:name output-name
#:data data
#:self #f
;; out ----------------------------------------
(define op
(make-core-output-port
#:name output-name
#:data data
#:self #f
#:evt write-ready-evt
#:write-out
;; in atomic mode
(lambda (self src-bstr src-start src-end nonblock? enable-break? copy?)
(assert-atomic)
(let try-again ()
(define top-pos (if (fx= start 0)
(fx- len 1)
len))
(define (maybe-grow)
(cond
[(or (not limit)
((+ limit peeked-amt) . > . (fx- len 1)))
;; grow pipe size
(define new-bstr (make-bytes (min+1 (and limit (+ limit peeked-amt)) (* len 2))))
(cond
[(fx= 0 start)
(bytes-copy! new-bstr 0 bstr 0 (fx- len 1))]
[else
(bytes-copy! new-bstr 0 bstr start len)
(bytes-copy! new-bstr (fx- len start) bstr 0 end)
(set! start 0)
(set! end (fx- len 1))])
(set! bstr new-bstr)
(set! len (bytes-length new-bstr))
(try-again)]
[else (pipe-is-full)]))
(define (pipe-is-full)
(wrap-evt write-ready-evt (lambda (v) #f)))
(define (apply-limit amt)
(if limit
(min amt (- (+ limit peeked-amt) (content-length)))
amt))
#:evt write-ready-evt
#:write-out
;; in atomic mode
(lambda (self src-bstr src-start src-end nonblock? enable-break? copy?)
(assert-atomic)
(let try-again ()
(define top-pos (if (fx= start 0)
(fx- len 1)
len))
(define (maybe-grow)
(cond
[(fx= src-start src-end) ;; => flush
0]
[write-pos ; set by `file-position` on a bytes port
(define amt (apply-limit (fxmin (fx- end write-pos)
(fx- src-end src-start))))
[(or (not limit)
((+ limit peeked-amt) . > . (fx- len 1)))
;; grow pipe size
(define new-bstr (make-bytes (min+1 (and limit (+ limit peeked-amt)) (* len 2))))
(cond
[(fx= amt 0) (pipe-is-full)]
[(fx= 0 start)
(bytes-copy! new-bstr 0 bstr 0 (fx- len 1))]
[else
(check-input-unblocking)
(bytes-copy! bstr write-pos src-bstr src-start (fx+ src-start amt))
(let ([new-write-pos (fx+ write-pos amt)])
(if (fx= new-write-pos end)
(set! write-pos #f) ; back to normal mode
(set! write-pos new-write-pos)))
(check-output-blocking)
amt])]
[(and (end . fx>= . start)
(end . fx< . top-pos))
(define amt (apply-limit (fxmin (fx- top-pos end)
(fx- src-end src-start))))
(cond
[(fx= amt 0) (pipe-is-full)]
[else
(check-input-unblocking)
(bytes-copy! bstr end src-bstr src-start (fx+ src-start amt))
(let ([new-end (fx+ end amt)])
(set! end (if (fx= new-end len) 0 new-end)))
(check-output-blocking)
amt])]
[(fx= end top-pos)
(cond
[(fx= start 0)
(maybe-grow)]
[else
(define amt (fxmin (fx- start 1)
(fx- src-end src-start)))
(cond
[(fx= amt 0) (pipe-is-full)]
[else
(check-input-unblocking)
(bytes-copy! bstr 0 src-bstr src-start (fx+ src-start amt))
(set! end amt)
(check-output-blocking)
amt])])]
[(end . fx< . (fx- start 1))
(define amt (apply-limit (fxmin (fx- (fx- start 1) end)
(fx- src-end src-start))))
(cond
[(fx= amt 0) (pipe-is-full)]
[else
(check-input-unblocking)
(bytes-copy! bstr end src-bstr src-start (fx+ src-start amt))
(set! end (fx+ end amt))
(check-output-blocking)
amt])]
[else
(maybe-grow)])))
(bytes-copy! new-bstr 0 bstr start len)
(bytes-copy! new-bstr (fx- len start) bstr 0 end)
(set! start 0)
(set! end (fx- len 1))])
(set! bstr new-bstr)
(set! len (bytes-length new-bstr))
(try-again)]
[else (pipe-is-full)]))
(define (pipe-is-full)
(wrap-evt write-ready-evt (lambda (v) #f)))
(define (apply-limit amt)
(if limit
(min amt (- (+ limit peeked-amt) (content-length)))
amt))
(cond
[(fx= src-start src-end) ;; => flush
0]
[write-pos ; set by `file-position` on a bytes port
(define amt (apply-limit (fxmin (fx- end write-pos)
(fx- src-end src-start))))
(cond
[(fx= amt 0) (pipe-is-full)]
[else
(check-input-unblocking)
(bytes-copy! bstr write-pos src-bstr src-start (fx+ src-start amt))
(let ([new-write-pos (fx+ write-pos amt)])
(if (fx= new-write-pos end)
(set! write-pos #f) ; back to normal mode
(set! write-pos new-write-pos)))
(check-output-blocking)
amt])]
[(and (end . fx>= . start)
(end . fx< . top-pos))
(define amt (apply-limit (fxmin (fx- top-pos end)
(fx- src-end src-start))))
(cond
[(fx= amt 0) (pipe-is-full)]
[else
(check-input-unblocking)
(bytes-copy! bstr end src-bstr src-start (fx+ src-start amt))
(let ([new-end (fx+ end amt)])
(set! end (if (fx= new-end len) 0 new-end)))
(check-output-blocking)
amt])]
[(fx= end top-pos)
(cond
[(fx= start 0)
(maybe-grow)]
[else
(define amt (fxmin (fx- start 1)
(fx- src-end src-start)))
(cond
[(fx= amt 0) (pipe-is-full)]
[else
(check-input-unblocking)
(bytes-copy! bstr 0 src-bstr src-start (fx+ src-start amt))
(set! end amt)
(check-output-blocking)
amt])])]
[(end . fx< . (fx- start 1))
(define amt (apply-limit (fxmin (fx- (fx- start 1) end)
(fx- src-end src-start))))
(cond
[(fx= amt 0) (pipe-is-full)]
[else
(check-input-unblocking)
(bytes-copy! bstr end src-bstr src-start (fx+ src-start amt))
(set! end (fx+ end amt))
(check-output-blocking)
amt])]
[else
(maybe-grow)])))
#:count-write-evt-via-write-out
(lambda (self v bstr start)
(port-count! op v bstr start))
#:count-write-evt-via-write-out
(lambda (self op v bstr start)
(port-count! op v bstr start))
#:close
;; in atomic mode
(lambda (self)
(unless output-closed?
(set! output-closed? #t)
(when write-ready-sema
(semaphore-post write-ready-sema))
(when more-read-ready-sema
(semaphore-post more-read-ready-sema))
(semaphore-post read-ready-sema)))))
#:close
;; in atomic mode
(lambda (self)
(unless output-closed?
(set! output-closed? #t)
(when write-ready-sema
(semaphore-post write-ready-sema))
(when more-read-ready-sema
(semaphore-post more-read-ready-sema))
(semaphore-post read-ready-sema))))))
;; Results ----------------------------------------
(define/who (make-pipe [limit #f] [input-name 'pipe] [output-name 'pipe])
(check who #:or-false exact-positive-integer? limit)
(define-values (ip op) (make-pipe-ends limit input-name output-name))
(when (port-count-lines-enabled)
(port-count-lines! ip)
(port-count-lines! op))
(values ip op))