io: avoid closure allocations for pipes and byte-string ports

This commit is contained in:
Matthew Flatt 2018-12-08 18:43:39 -07:00
parent bcf6492d56
commit 0261332ac3
7 changed files with 683 additions and 372 deletions

View File

@ -254,6 +254,8 @@
(lambda (o) (eq? o v)))]
[(eq? 'code (car args))
#%$code?]
[(eq? 'procedure (car args))
#%procedure?]
[(eq? 'ephemeron (car args))
ephemeron-pair?]
[(symbol? (car args))

View File

@ -2,7 +2,12 @@
(require (for-syntax racket/base)
racket/fixnum)
(provide define-fixnum)
(provide define-fixnum
;; to cooperate with macros that explicitly
;; manage closures, as in "object.rkt"
capture-fixnum
(for-syntax make-fixnum-transformer))
;; Representing a mutable, fixnum-valued variable with an fxvector can
;; avoid a write barrier on assignment
@ -10,10 +15,17 @@
(define-syntax-rule (define-fixnum id v)
(begin
(define cell (fxvector v))
(define-syntax id
(make-set!-transformer
(lambda (stx)
(syntax-case stx (set!)
[(set! _ r) #'(fxvector-set! cell 0 r)]
[(... (_ ...)) (raise-syntax-error stx "bad use" stx)]
[_ #'(fxvector-ref cell 0)]))))))
(define-syntax id (make-fixnum-transformer #'cell))))
(define-for-syntax (make-fixnum-transformer cell-id)
(with-syntax ([cell cell-id])
(make-set!-transformer
(lambda (stx)
(syntax-case stx (set!)
[(set! _ r) #'(fxvector-set! cell 0 r)]
[(_ #:capture-fixnum) #'cell] ; see `capture-fixnum`
[(... (_ ...)) (raise-syntax-error stx "bad use" stx)]
[_ #'(fxvector-ref cell 0)])))))
(define-syntax-rule (capture-fixnum id)
(id #:capture-fixnum))

View File

@ -0,0 +1,267 @@
#lang racket/base
(require (for-syntax racket/base
racket/pretty)
racket/stxparam
"fixnum.rkt")
;; The `define-constructor` form implements a basic object system
;; where any `(define (id ....) ....)` or `(define id (case-lambda
;; ....)` in the constructor body is treated as a method that is
;; converted to accept a vector of free variables.
;;
;; A `lambda` or `case-lambda` is any other position can be wrapped in
;; `method` to convert it, too. Note that the caller of such methods
;; must know to pass along the self vector somehow. The `self`
;; identifier is bound to the self vector.
;;
;; Constraints:
;;
;; Macros are not expanded to recognize definitions; use only
;; `define` or `define-fixnum` for definitions.
;;
;; Only `set!` any variables within a method.
;;
;; A named method cannot be used as a value unless it is wrapped
;; with `method`; calls are automatcally converted to pass along the
;; self vector, while the `method` escape obliges a called of the
;; result to pass along the self vector.
(provide define-constructor
method
self)
(define-syntax-parameter self
(lambda (stx)
(raise-syntax-error #f "misuse outside of a constructor" stx)))
(define-syntax-parameter current-constructor-fields #f)
(define-syntax-parameter current-constructor-methods #f)
(define-for-syntax (get-current-constructor-fields)
(syntax-parameter-value #'current-constructor-fields))
(define-for-syntax (get-current-constructor-methods)
(syntax-parameter-value #'current-constructor-methods))
(define-for-syntax (maybe-lift mode e)
(syntax-case mode ()
[#:lift (syntax-local-lift-expression e)]
[#:no-lift e]))
(define-syntax (method stx)
(syntax-case stx (lambda case-lambda)
[(_ id)
(identifier? #'id)
#'(id #:method)]
[(_ rhs)
#'(method #:lift rhs)]
[(_ lift-mode (lambda (arg ...) body ...))
(with-syntax ([fields (get-current-constructor-fields)]
[methods (get-current-constructor-methods)]
[(_ _ inside) stx])
(maybe-lift
#'lift-mode
#`(lambda (this arg ...)
(let-methods
this methods inside
(let-fields
this 0 fields inside
body ...)))))]
[(_ lift-mode (case-lambda [(arg ...) body ...] ...))
(with-syntax ([fields (get-current-constructor-fields)]
[methods (get-current-constructor-methods)]
[(_ _ inside) stx])
(maybe-lift
#'lift-mode
#`(case-lambda
[(this arg ...)
(let-methods
this methods inside
(let-fields
this 0 fields inside
body ...))]
...)))]))
(define-for-syntax (make-method-transformer self-id id)
(lambda (stx)
(syntax-case stx ()
[(_ #:method) id]
[(_ arg ...) (quasisyntax/loc stx
(#,id #,self-id arg ...))])))
(define-syntax (define-constructor stx)
(syntax-case stx ()
[(_ (name arg ...) body ... last-body)
(andmap identifier? (syntax->list #'(arg ...)))
(let ()
(define fields
(apply append
(syntax->list #'(arg ...))
(for/list ([body (in-list (syntax->list #'(body ...)))])
(syntax-case body (define case-lambda define-values define-fixnum)
[(define (id . _) . _)
null]
[(define id (case-lambda . _))
null]
[(define id _)
(list #'id)]
[(define-values (id ...) _)
(syntax->list #'(id ...))]
[(define-fixnum id _)
(list #'(capture-fixnum id))]
[else
null]))))
(define methods
(apply
append
(for/list ([body (in-list (syntax->list #'(body ...)))])
(syntax-case body (define define-values define-fixnum case-lambda)
[(define (id . args) . bodys)
(list (cons #'id (generate-temporaries #'(id))))]
[(define id (case-lambda . clauses))
(list (cons #'id (generate-temporaries #'(id))))]
[_ null]))))
(define (find-method-name id)
(for/or ([pr (in-list methods)])
(and (free-identifier=? id (car pr))
(cadr pr))))
(define num-args (length (syntax->list #'(arg ...))))
(define-values (method-defns other-bodys count)
(for/fold ([method-defns null] [other-bodys null] [n num-args]) ([body (in-list (syntax->list #'(body ...)))])
(syntax-case body (define define-values define-fixnum case-lambda)
[(define (id . args) . bodys)
(let ([re (lambda (s) (datum->syntax body (syntax-e s) body body))]
[tmp-id (find-method-name #'id)])
(values (cons #`(define #,tmp-id
(syntax-parameterize ([current-constructor-fields (quote-syntax #,fields)]
[current-constructor-methods (quote-syntax #,methods)])
(method #:no-lift #,(re #'(lambda args . bodys)))))
method-defns)
other-bodys
n))]
[(define id (case-lambda . clauses))
(let ([re (lambda (s) (datum->syntax body (syntax-e s) body body))]
[tmp-id (find-method-name #'id)])
(values (cons #`(define #,tmp-id
(syntax-parameterize ([current-constructor-fields (quote-syntax #,fields)]
[current-constructor-methods (quote-syntax #,methods)])
(method #:no-lift #,(re #'(case-lambda clauses)))))
method-defns)
other-bodys
n))]
[(define id _)
(values method-defns
(list* #`(vector*-set! self-vec #,n id)
body
other-bodys)
(add1 n))]
[(define-values (id ...) _)
(values method-defns
(list* #`(begin #,@(for/list ([id (in-list (syntax->list #'(id ...)))]
[n (in-naturals n)])
#`(vector*-set! self-vec #,n #,id)))
body
other-bodys)
(+ n (length (syntax->list #'(id ...)))))]
[(define-fixnum id _)
(values method-defns
(list* #`(vector*-set! self-vec #,n (capture-fixnum id))
body
other-bodys)
(add1 n))]
[else
(values method-defns
(cons body other-bodys)
n)])))
(with-syntax ([inside stx]
[count #`#,count]
[fields fields]
[methods methods]
[(init-arg ...) (for/list ([arg (in-list (syntax->list #'(arg ...)))]
[i (in-naturals)])
#`(vector*-set! self-vec #,i #,arg))]
[(other-body ...) (reverse other-bodys)]
[(method-defn ...) (reverse method-defns)])
#'(begin
method-defn ...
(define (name arg ...)
(define self-vec (make-vector count))
init-arg ...
(syntax-parameterize ([current-constructor-fields (quote-syntax fields)]
[current-constructor-methods (quote-syntax methods)]
[self (lambda (stx) #'self-vec)])
(let-methods
self-vec methods inside
(let ()
other-body ...
last-body)))))))]
[(_ (name . formals) body ... last-body)
(with-syntax ([(arg ...) (let loop ([formals #'formals])
(syntax-case formals ()
[() null]
[id
(identifier? #'id)
(list #'id)]
[(kw . formals)
(keyword? (syntax-e #'kw))
(loop #'formals)]
[([id _] . formals)
(cons #'id (loop #'formals))]
[(id . formals)
(cons #'id (loop #'formals))]))])
#`(begin
#,(datum->syntax
stx
(syntax-e #'(define-constructor (constructor arg ...)
body ... last-body))
stx
stx)
(define (name . formals)
(constructor arg ...))))]))
(define-syntax (let-fields stx)
(syntax-case stx (capture-fixnum)
[(_ self-id n () ctx body ...)
#'(let () body ...)]
[(_ self-id n ((capture-fixnum id) . captureds) ctx . bodys)
(with-syntax ([id (datum->syntax #'ctx (syntax-e #'id))]
[n+1 #`#,(+ (syntax-e #'n) 1)])
#'(let-syntax ([id (make-fixnum-transformer #'(vector*-ref self-id n))])
(let-fields self-id n+1 captureds ctx . bodys)))]
[(_ self-id n (id . captureds) ctx . bodys)
(with-syntax ([id (datum->syntax #'ctx (syntax-e #'id))]
[n+1 #`#,(+ (syntax-e #'n) 1)])
#'(let-syntax ([id (make-set!-transformer
(lambda (stx)
(syntax-case stx (set!)
[(set! _ r) #'(vector*-set! self-id n r)]
[(_ arg (... ...)) #'((vector*-ref self-id n) arg (... ...))]
[_ #'(vector*-ref self-id n)])))])
(let-fields self-id n+1 captureds ctx . bodys)))]))
(define-syntax (let-methods stx)
(syntax-case stx ()
[(_ self-id () ctx body ...)
#'(let () body ...)]
[(_ self-id ((id tmp-id) . methods) ctx . bodys)
(with-syntax ([id (datum->syntax #'ctx (syntax-e #'id))])
#'(let-syntax ([id (make-method-transformer #'self-id #'tmp-id)])
(let-methods self-id methods ctx . bodys)))]))
;; ----------------------------------------
(module+ test
(define-constructor (c x y)
(define a 12)
(define-fixnum z 120)
(define (f)
(set! z 130)
(set! y 8)
(list a x (g)))
(define (g)
(list y z))
(values (method f)
self))
(define-values (f f-self) (c 1 2))
(f f-self))

View File

@ -1,6 +1,7 @@
#lang racket/base
(require "../common/check.rkt"
"../common/fixnum.rkt"
"../common/object.rkt"
"../host/thread.rkt"
"port.rkt"
"input-port.rkt"
@ -19,6 +20,12 @@
(define/who (open-input-bytes bstr [name 'string])
(check who bytes? bstr)
(define p (make-input-bytes (bytes->immutable-bytes bstr) name))
(when (port-count-lines-enabled)
(port-count-lines! p))
p)
(define-constructor (make-input-bytes bstr name)
(define-fixnum i 0)
(define alt-pos #f)
(define len (bytes-length bstr))
@ -53,111 +60,116 @@
(set! commit-manager (make-commit-manager)))
(commit-manager-wait commit-manager progress-evt ext-evt finish)]))
(define p
(make-core-input-port
#:name name
#:data (input-bytes-data)
#:self #f
(make-core-input-port
#:name name
#:data (input-bytes-data)
#:self self
#:prepare-change
(lambda (self)
(pause-waiting-commit))
#:prepare-change
(method
(lambda ()
(pause-waiting-commit)))
#:read-byte
(lambda (self)
(let ([pos i])
(if (pos . < . len)
(begin
(set! i (add1 pos))
(progress!)
(bytes-ref bstr pos))
eof)))
#:read-in
(lambda (self dest-bstr start end copy?)
(define pos i)
(cond
[(pos . < . len)
(define amt (min (- end start) (- len pos)))
(set! i (+ pos amt))
(bytes-copy! dest-bstr start bstr pos (+ pos amt))
(progress!)
amt]
[else eof]))
#:peek-byte
(lambda (self)
(let ([pos i])
(if (pos . < . len)
(bytes-ref bstr pos)
eof)))
#:peek-in
(lambda (self dest-bstr start end skip progress-evt copy?)
(define pos (+ i skip))
(cond
[(and progress-evt (sync/timeout 0 progress-evt))
#f]
[(pos . < . len)
(define amt (min (- end start) (- len pos)))
(bytes-copy! dest-bstr start bstr pos (+ pos amt))
amt]
[else eof]))
#:read-byte
(method
(lambda ()
(let ([pos i])
(if (pos . < . len)
(begin
(set! i (add1 pos))
(progress!)
(bytes-ref bstr pos))
eof))))
#:byte-ready
(lambda (self work-done!)
(i . < . len))
#:read-in
(method
(lambda (dest-bstr start end copy?)
(define pos i)
(cond
[(pos . < . len)
(define amt (min (- end start) (- len pos)))
(set! i (+ pos amt))
(bytes-copy! dest-bstr start bstr pos (+ pos amt))
(progress!)
amt]
[else eof])))
#:close
(lambda (self)
(set! commit-manager #f) ; to indicate closed
(progress!))
#:peek-byte
(method
(lambda ()
(let ([pos i])
(if (pos . < . len)
(bytes-ref bstr pos)
eof))))
#:get-progress-evt
(lambda (self)
(unless progress-sema
(set! progress-sema (make-semaphore)))
(semaphore-peek-evt progress-sema))
#:peek-in
(method
(lambda (dest-bstr start end skip progress-evt copy?)
(define pos (+ i skip))
(cond
[(and progress-evt (sync/timeout 0 progress-evt))
#f]
[(pos . < . len)
(define amt (min (- end start) (- len pos)))
(bytes-copy! dest-bstr start bstr pos (+ pos amt))
amt]
[else eof])))
#:commit
(lambda (self amt progress-evt ext-evt finish)
(unless commit-manager
(set! commit-manager (make-commit-manager)))
(commit-manager-wait
commit-manager
progress-evt ext-evt
;; in atomic mode, maybe in a different thread:
(lambda ()
(let ([amt (min amt (- len i))])
(define dest-bstr (make-bytes amt))
(bytes-copy! dest-bstr 0 bstr i (+ i amt))
(set! i (+ i amt))
(progress!)
(finish dest-bstr)))))
#:byte-ready
(method
(lambda (work-done!)
(i . < . len)))
#:file-position
(case-lambda
[(self) (or alt-pos i)]
[(self new-pos)
(set! i (if (eof-object? new-pos)
len
(min len new-pos)))
(set! alt-pos
(and new-pos
(not (eof-object? new-pos))
(new-pos . > . i)
new-pos))])))
#:close
(method
(lambda ()
(set! commit-manager #f) ; to indicate closed
(progress!)))
(when (port-count-lines-enabled)
(port-count-lines! p))
p)
#:get-progress-evt
(method
(lambda ()
(unless progress-sema
(set! progress-sema (make-semaphore)))
(semaphore-peek-evt progress-sema)))
#:commit
(method
(lambda (amt progress-evt ext-evt finish)
(unless commit-manager
(set! commit-manager (make-commit-manager)))
(commit-manager-wait
commit-manager
progress-evt ext-evt
;; in atomic mode, maybe in a different thread:
(lambda ()
(let ([amt (min amt (- len i))])
(define dest-bstr (make-bytes amt))
(bytes-copy! dest-bstr 0 bstr i (+ i amt))
(set! i (+ i amt))
(progress!)
(finish dest-bstr))))))
#:file-position
(method
(case-lambda
[() (or alt-pos i)]
[(new-pos)
(set! i (if (eof-object? new-pos)
len
(min len new-pos)))
(set! alt-pos
(and new-pos
(not (eof-object? new-pos))
(new-pos . > . i)
new-pos))]))))
;; ----------------------------------------
(struct output-bytes-data (o get))
(define (open-output-bytes [name 'string])
(define-values (i/none o) (make-pipe-ends #:need-input? #f))
(define-values (i/none o) (make-pipe-ends #f name name #:need-input? #f))
(define p
(make-core-output-port
#:name name
@ -196,7 +208,7 @@
(end-atomic)
(raise-arguments-error 'file-position
"new position is too large"
"port" p
"port" o
"position" new-pos))
(pipe-write-position o len)
(define amt (- new-pos len))

View File

@ -247,8 +247,8 @@
[(pos)
(+ pos (- buffer-end buffer-start))]))
#:buffer-mode (case-lambda
[() buffer-mode]
[(mode) (set! buffer-mode mode)])))
[(self) buffer-mode]
[(self mode) (set! buffer-mode mode)])))
(define custodian-reference
(register-fd-close cust fd fd-refcount flush-handle port))

View File

@ -148,9 +148,9 @@
(write-evt
;; in atomic mode:
(lambda (self-evt)
(define v (write-out self o src-bstr src-start src-end #f #f #t))
(define v (write-out self src-bstr src-start src-end #f #f #t))
(when (exact-integer? v)
(count-write-evt-via-write-out self v src-bstr src-start))
(count-write-evt-via-write-out self o v src-bstr src-start))
(if (evt? v)
(values #f (replace-evt v self-evt))
(values (list v) #f)))))))

View File

@ -2,6 +2,7 @@
(require racket/fixnum
"../common/check.rkt"
"../common/fixnum.rkt"
"../common/object.rkt"
"../host/thread.rkt"
"port.rkt"
"input-port.rkt"
@ -57,7 +58,8 @@
(define (pipe-get-content p bstr start-pos)
((pipe-data-get-content (core-port-data p)) (core-port-self p) bstr start-pos))
(define (make-pipe-ends [limit #f] [input-name 'pipe] [output-name 'pipe] #:need-input? [need-input? #t])
(define-constructor (make-pipe-ends [limit #f] [input-name 'pipe] [output-name 'pipe]
#:need-input? [need-input? #t])
(define bstr (make-bytes (min+1 limit 16)))
(define len (bytes-length bstr))
(define-fixnum peeked-amt 0) ; peeked but not yet read effectively extends `limit`
@ -79,38 +81,42 @@
(define data
(pipe-data
;; get-content-length
(lambda (self)
(atomically (content-length)))
(method
(lambda ()
(atomically (content-length))))
;; write-position
(case-lambda
;; in atomic mode
[(self) (or write-pos end)]
[(self pos)
;; `pos` must be between `start` and `end`
(if (fx= pos end)
(set! write-pos #f)
(set! write-pos pos))])
(method
(case-lambda
;; in atomic mode
[() (or write-pos end)]
[(pos)
;; `pos` must be between `start` and `end`
(if (fx= pos end)
(set! write-pos #f)
(set! write-pos pos))]))
;; get-content
(lambda (self to-bstr start-pos)
;; in atomic mode
(define pos (let ([p (fx+ start start-pos)])
(if (p . fx>= . len)
(fx- p len)
p)))
(define end-pos (fx+ pos (bytes-length to-bstr)))
(cond
[(end-pos . fx<= . len)
(bytes-copy! to-bstr 0 bstr pos end-pos)]
[else
(bytes-copy! to-bstr 0 bstr pos len)
(bytes-copy! to-bstr (fx- len pos) bstr 0 (fx- end-pos len))]))
(method
(lambda (to-bstr start-pos)
;; in atomic mode
(define pos (let ([p (fx+ start start-pos)])
(if (p . fx>= . len)
(fx- p len)
p)))
(define end-pos (fx+ pos (bytes-length to-bstr)))
(cond
[(end-pos . fx<= . len)
(bytes-copy! to-bstr 0 bstr pos end-pos)]
[else
(bytes-copy! to-bstr 0 bstr pos len)
(bytes-copy! to-bstr (fx- len pos) bstr 0 (fx- end-pos len))])))
;; discard-all
(lambda (self)
;; in atomic mode
(set! peeked-amt 0)
(set! start 0)
(set! end 0)
(set! write-pos #f))))
(method
(lambda ()
;; in atomic mode
(set! peeked-amt 0)
(set! start 0)
(set! end 0)
(set! write-pos #f)))))
(define read-ready-sema (make-semaphore))
(define write-ready-sema (and limit (make-semaphore 1)))
@ -180,280 +186,292 @@
(make-core-input-port
#:name input-name
#:data data
#:self #f
#:self self
#:prepare-change
(lambda (self)
(pause-waiting-commit))
(method
(lambda ()
(pause-waiting-commit)))
#:read-byte
(lambda (self)
(assert-atomic)
(cond
[(input-empty?)
(if output-closed?
eof
;; event's synchronization value is ignored:
read-ready-evt)]
[else
(define pos start)
(check-output-unblocking)
(unless (fx= 0 peeked-amt)
(set! peeked-amt (fxmax 0 (fx- peeked-amt 1))))
(define new-pos (fx+ pos 1))
(if (fx= new-pos len)
(set! start 0)
(set! start new-pos))
(check-input-blocking)
(progress!)
(bytes-ref bstr pos)]))
(method
(lambda ()
(assert-atomic)
(cond
[(input-empty?)
(if output-closed?
eof
;; event's synchronization value is ignored:
read-ready-evt)]
[else
(define pos start)
(check-output-unblocking)
(unless (fx= 0 peeked-amt)
(set! peeked-amt (fxmax 0 (fx- peeked-amt 1))))
(define new-pos (fx+ pos 1))
(if (fx= new-pos len)
(set! start 0)
(set! start new-pos))
(check-input-blocking)
(progress!)
(bytes-ref bstr pos)])))
#:read-in
(lambda (self dest-bstr dest-start dest-end copy?)
(assert-atomic)
(cond
[(input-empty?)
(if output-closed?
eof
read-ready-evt)]
[else
(check-output-unblocking)
(begin0
(cond
[(start . fx< . end)
(define amt (fxmin (fx- dest-end dest-start)
(fx- end start)))
(bytes-copy! dest-bstr dest-start bstr start (fx+ start amt))
(set! start (fx+ start amt))
(set! peeked-amt (fxmax 0 (fx- peeked-amt amt)))
amt]
[else
(define amt (fxmin (fx- dest-end dest-start)
(fx- len start)))
(bytes-copy! dest-bstr dest-start bstr start (fx+ start amt))
(set! start (modulo (fx+ start amt) len))
(set! peeked-amt (fxmax 0 (fx- peeked-amt amt)))
amt])
(check-input-blocking)
(progress!))]))
(method
(lambda (dest-bstr dest-start dest-end copy?)
(assert-atomic)
(cond
[(input-empty?)
(if output-closed?
eof
read-ready-evt)]
[else
(check-output-unblocking)
(begin0
(cond
[(start . fx< . end)
(define amt (fxmin (fx- dest-end dest-start)
(fx- end start)))
(bytes-copy! dest-bstr dest-start bstr start (fx+ start amt))
(set! start (fx+ start amt))
(set! peeked-amt (fxmax 0 (fx- peeked-amt amt)))
amt]
[else
(define amt (fxmin (fx- dest-end dest-start)
(fx- len start)))
(bytes-copy! dest-bstr dest-start bstr start (fx+ start amt))
(set! start (modulo (fx+ start amt) len))
(set! peeked-amt (fxmax 0 (fx- peeked-amt amt)))
amt])
(check-input-blocking)
(progress!))])))
#:peek-byte
(lambda (self)
(assert-atomic)
(cond
[(input-empty?)
(if output-closed?
eof
read-ready-evt)]
[else
(peeked! 1)
(bytes-ref bstr start)]))
(method
(lambda ()
(assert-atomic)
(cond
[(input-empty?)
(if output-closed?
eof
read-ready-evt)]
[else
(peeked! 1)
(bytes-ref bstr start)])))
#:peek-in
(lambda (self dest-bstr dest-start dest-end skip progress-evt copy?)
(assert-atomic)
(define content-amt (content-length))
(cond
[(and progress-evt
(sync/timeout 0 progress-evt))
#f]
[(content-amt . <= . skip)
(cond
[output-closed? eof]
[else
(unless (or (zero? skip) more-read-ready-sema)
(set! more-read-ready-sema (make-semaphore)))
(define evt (if (zero? skip)
read-ready-evt
(wrap-evt (semaphore-peek-evt more-read-ready-sema)
(lambda (v) 0))))
evt])]
[else
(define peek-start (fxmodulo (fx+ start skip) len))
(cond
[(peek-start . fx< . end)
(define amt (fxmin (fx- dest-end dest-start)
(fx- end peek-start)))
(bytes-copy! dest-bstr dest-start bstr peek-start (fx+ peek-start amt))
(peeked! (+ skip amt))
amt]
[else
(define amt (fxmin (fx- dest-end dest-start)
(fx- len peek-start)))
(bytes-copy! dest-bstr dest-start bstr peek-start (fx+ peek-start amt))
(peeked! (+ skip amt))
amt])]))
(method
(lambda (dest-bstr dest-start dest-end skip progress-evt copy?)
(assert-atomic)
(define content-amt (content-length))
(cond
[(and progress-evt
(sync/timeout 0 progress-evt))
#f]
[(content-amt . <= . skip)
(cond
[output-closed? eof]
[else
(unless (or (zero? skip) more-read-ready-sema)
(set! more-read-ready-sema (make-semaphore)))
(define evt (if (zero? skip)
read-ready-evt
(wrap-evt (semaphore-peek-evt more-read-ready-sema)
(lambda (v) 0))))
evt])]
[else
(define peek-start (fxmodulo (fx+ start skip) len))
(cond
[(peek-start . fx< . end)
(define amt (fxmin (fx- dest-end dest-start)
(fx- end peek-start)))
(bytes-copy! dest-bstr dest-start bstr peek-start (fx+ peek-start amt))
(peeked! (+ skip amt))
amt]
[else
(define amt (fxmin (fx- dest-end dest-start)
(fx- len peek-start)))
(bytes-copy! dest-bstr dest-start bstr peek-start (fx+ peek-start amt))
(peeked! (+ skip amt))
amt])])))
#:byte-ready
(lambda (self work-done!)
(assert-atomic)
(or output-closed?
(not (fx= 0 (content-length)))))
(method
(lambda (work-done!)
(assert-atomic)
(or output-closed?
(not (fx= 0 (content-length))))))
#:close
(lambda (self)
(unless input-closed?
(set! input-closed? #t)
(progress!)))
(method
(lambda ()
(unless input-closed?
(set! input-closed? #t)
(progress!))))
#:get-progress-evt
(lambda (self)
(atomically
(cond
[input-closed? always-evt]
[else
(unless progress-sema
(set! progress-sema (make-semaphore)))
(semaphore-peek-evt progress-sema)])))
(method
(lambda ()
(atomically
(cond
[input-closed? always-evt]
[else
(unless progress-sema
(set! progress-sema (make-semaphore)))
(semaphore-peek-evt progress-sema)]))))
#:commit
;; Allows `amt` to be zero and #f for other arguments,
;; which is helpful for `open-input-peek-via-read`.
(lambda (self amt progress-evt ext-evt finish)
(assert-atomic)
;; `progress-evt` is a `semepahore-peek-evt`, and `ext-evt`
;; is constrained; we can send them over to different threads
(cond
[(zero? amt)
(progress!)]
[else
(wait-commit
progress-evt ext-evt
;; in atomic mode, maybe in a different thread:
(lambda ()
(let ([amt (min amt (content-length))])
(cond
[(fx= 0 amt)
;; There was nothing to commit; claim success for 0 bytes
(finish #"")]
[else
(define dest-bstr (make-bytes amt))
(cond
[(start . fx< . end)
(bytes-copy! dest-bstr 0 bstr start (fx+ start amt))]
[else
(define amt1 (fxmin (fx- len start) amt))
(bytes-copy! dest-bstr 0 bstr start (fx+ start amt1))
(when (amt1 . fx< . amt)
(bytes-copy! dest-bstr amt1 bstr 0 (fx- amt amt1)))])
(set! start (fxmodulo (fx+ start amt) len))
(progress!)
(check-input-blocking)
(finish dest-bstr)]))))]))))
(method
(lambda (amt progress-evt ext-evt finish)
(assert-atomic)
;; `progress-evt` is a `semepahore-peek-evt`, and `ext-evt`
;; is constrained; we can send them over to different threads
(cond
[(zero? amt)
(progress!)]
[else
(wait-commit
progress-evt ext-evt
;; in atomic mode, maybe in a different thread:
(lambda ()
(let ([amt (min amt (content-length))])
(cond
[(fx= 0 amt)
;; There was nothing to commit; claim success for 0 bytes
(finish #"")]
[else
(define dest-bstr (make-bytes amt))
(cond
[(start . fx< . end)
(bytes-copy! dest-bstr 0 bstr start (fx+ start amt))]
[else
(define amt1 (fxmin (fx- len start) amt))
(bytes-copy! dest-bstr 0 bstr start (fx+ start amt1))
(when (amt1 . fx< . amt)
(bytes-copy! dest-bstr amt1 bstr 0 (fx- amt amt1)))])
(set! start (fxmodulo (fx+ start amt) len))
(progress!)
(check-input-blocking)
(finish dest-bstr)]))))])))))
;; output ----------------------------------------
(make-core-output-port
#:name output-name
#:data data
#:self #f
#:self self
#:evt write-ready-evt
#:write-out
;; in atomic mode
(lambda (self src-bstr src-start src-end nonblock? enable-break? copy?)
(assert-atomic)
(let try-again ()
(define top-pos (if (fx= start 0)
(fx- len 1)
len))
(define (maybe-grow)
(cond
[(or (not limit)
((+ limit peeked-amt) . > . (fx- len 1)))
;; grow pipe size
(define new-bstr (make-bytes (min+1 (and limit (+ limit peeked-amt)) (* len 2))))
(cond
[(fx= 0 start)
(bytes-copy! new-bstr 0 bstr 0 (fx- len 1))]
[else
(bytes-copy! new-bstr 0 bstr start len)
(bytes-copy! new-bstr (fx- len start) bstr 0 end)
(set! start 0)
(set! end (fx- len 1))])
(set! bstr new-bstr)
(set! len (bytes-length new-bstr))
(try-again)]
[else (pipe-is-full)]))
(define (pipe-is-full)
(wrap-evt write-ready-evt (lambda (v) #f)))
(define (apply-limit amt)
(if limit
(min amt (- (+ limit peeked-amt) (content-length)))
amt))
(cond
[(fx= src-start src-end) ;; => flush
0]
[write-pos ; set by `file-position` on a bytes port
(define amt (apply-limit (fxmin (fx- end write-pos)
(fx- src-end src-start))))
(method
(lambda (src-bstr src-start src-end nonblock? enable-break? copy?)
(assert-atomic)
(let try-again ()
(define top-pos (if (fx= start 0)
(fx- len 1)
len))
(define (maybe-grow)
(cond
[(fx= amt 0) (pipe-is-full)]
[else
(check-input-unblocking)
(bytes-copy! bstr write-pos src-bstr src-start (fx+ src-start amt))
(let ([new-write-pos (fx+ write-pos amt)])
(if (fx= new-write-pos end)
(set! write-pos #f) ; back to normal mode
(set! write-pos new-write-pos)))
(check-output-blocking)
amt])]
[(and (end . fx>= . start)
(end . fx< . top-pos))
(define amt (apply-limit (fxmin (fx- top-pos end)
(fx- src-end src-start))))
(cond
[(fx= amt 0) (pipe-is-full)]
[else
(check-input-unblocking)
(bytes-copy! bstr end src-bstr src-start (fx+ src-start amt))
(let ([new-end (fx+ end amt)])
(set! end (if (fx= new-end len) 0 new-end)))
(check-output-blocking)
amt])]
[(fx= end top-pos)
(cond
[(fx= start 0)
(maybe-grow)]
[else
(define amt (fxmin (fx- start 1)
(fx- src-end src-start)))
[(or (not limit)
((+ limit peeked-amt) . > . (fx- len 1)))
;; grow pipe size
(define new-bstr (make-bytes (min+1 (and limit (+ limit peeked-amt)) (* len 2))))
(cond
[(fx= amt 0) (pipe-is-full)]
[(fx= 0 start)
(bytes-copy! new-bstr 0 bstr 0 (fx- len 1))]
[else
(check-input-unblocking)
(bytes-copy! bstr 0 src-bstr src-start (fx+ src-start amt))
(set! end amt)
(check-output-blocking)
amt])])]
[(end . fx< . (fx- start 1))
(define amt (apply-limit (fxmin (fx- (fx- start 1) end)
(fx- src-end src-start))))
(cond
[(fx= amt 0) (pipe-is-full)]
[else
(check-input-unblocking)
(bytes-copy! bstr end src-bstr src-start (fx+ src-start amt))
(set! end (fx+ end amt))
(check-output-blocking)
amt])]
[else
(maybe-grow)])))
(bytes-copy! new-bstr 0 bstr start len)
(bytes-copy! new-bstr (fx- len start) bstr 0 end)
(set! start 0)
(set! end (fx- len 1))])
(set! bstr new-bstr)
(set! len (bytes-length new-bstr))
(try-again)]
[else (pipe-is-full)]))
(define (pipe-is-full)
(wrap-evt write-ready-evt (lambda (v) #f)))
(define (apply-limit amt)
(if limit
(min amt (- (+ limit peeked-amt) (content-length)))
amt))
(cond
[(fx= src-start src-end) ;; => flush
0]
[write-pos ; set by `file-position` on a bytes port
(define amt (apply-limit (fxmin (fx- end write-pos)
(fx- src-end src-start))))
(cond
[(fx= amt 0) (pipe-is-full)]
[else
(check-input-unblocking)
(bytes-copy! bstr write-pos src-bstr src-start (fx+ src-start amt))
(let ([new-write-pos (fx+ write-pos amt)])
(if (fx= new-write-pos end)
(set! write-pos #f) ; back to normal mode
(set! write-pos new-write-pos)))
(check-output-blocking)
amt])]
[(and (end . fx>= . start)
(end . fx< . top-pos))
(define amt (apply-limit (fxmin (fx- top-pos end)
(fx- src-end src-start))))
(cond
[(fx= amt 0) (pipe-is-full)]
[else
(check-input-unblocking)
(bytes-copy! bstr end src-bstr src-start (fx+ src-start amt))
(let ([new-end (fx+ end amt)])
(set! end (if (fx= new-end len) 0 new-end)))
(check-output-blocking)
amt])]
[(fx= end top-pos)
(cond
[(fx= start 0)
(maybe-grow)]
[else
(define amt (fxmin (fx- start 1)
(fx- src-end src-start)))
(cond
[(fx= amt 0) (pipe-is-full)]
[else
(check-input-unblocking)
(bytes-copy! bstr 0 src-bstr src-start (fx+ src-start amt))
(set! end amt)
(check-output-blocking)
amt])])]
[(end . fx< . (fx- start 1))
(define amt (apply-limit (fxmin (fx- (fx- start 1) end)
(fx- src-end src-start))))
(cond
[(fx= amt 0) (pipe-is-full)]
[else
(check-input-unblocking)
(bytes-copy! bstr end src-bstr src-start (fx+ src-start amt))
(set! end (fx+ end amt))
(check-output-blocking)
amt])]
[else
(maybe-grow)]))))
#:count-write-evt-via-write-out
(lambda (self op v bstr start)
(port-count! op v bstr start))
(method
(lambda (op v bstr start)
(port-count! op v bstr start)))
#:close
;; in atomic mode
(lambda (self)
(unless output-closed?
(set! output-closed? #t)
(when write-ready-sema
(semaphore-post write-ready-sema))
(when more-read-ready-sema
(semaphore-post more-read-ready-sema))
(semaphore-post read-ready-sema))))))
(method
(lambda ()
(unless output-closed?
(set! output-closed? #t)
(when write-ready-sema
(semaphore-post write-ready-sema))
(when more-read-ready-sema
(semaphore-post more-read-ready-sema))
(semaphore-post read-ready-sema)))))))
(define/who (make-pipe [limit #f] [input-name 'pipe] [output-name 'pipe])
(check who #:or-false exact-positive-integer? limit)