io: convert pipe to object style

This commit is contained in:
Matthew Flatt 2019-02-11 20:31:09 -07:00
parent 78136c0613
commit 35ceb8e3b3
3 changed files with 624 additions and 558 deletions

View File

@ -583,16 +583,16 @@
(let ([th1 (thread (lambda ()
(display "a" out)))]
[th2 (thread (lambda ()
(display "a" out)))]
(display "a" out)))]
[th3 (thread (lambda ()
(display "a" out)))])
(display "a" out)))])
(test #t thread-running? th1)
(test #t thread-running? th2)
(test #t thread-running? th3)
(test 49 read-byte in)
(sleep 0.1)
(sync (system-idle-evt))
(test 2 +
(if (thread-running? th1) 1 0)
@ -601,7 +601,7 @@
(test 50 read-byte in)
(sleep 0.1)
(sync (system-idle-evt))
(test 1 +
(if (thread-running? th1) 1 0)
@ -610,7 +610,7 @@
(test 51 read-byte in)
(sleep 0.1)
(sync (system-idle-evt))
(test #f thread-running? th1)
(test #f thread-running? th2)

View File

@ -14,6 +14,7 @@
;; | (public [<method-id> <method>] ...)
;; | (private [<method-id> <method>] ...)
;; | (override [<method-id> <method>] ...)
;; | (static [<method-id> <method>] ...) ; not in vtable
;; | (property [<property-expr> <val-expr>] ...)
;; <method> = #f
;; | (lambda (<id> ...) <expr> ...+)
@ -39,24 +40,31 @@
;; Use
;; (send <class-id> <obj-expr> <method-id> <arg-expr> ...)
;; to call a method, or
;; (mewthod <class-id> <obj-expr> <method-id>)
;; (method <class-id> <obj-expr> <method-id>)
;; to get a method that expects the object as its first argument.
;;
;; In a method, fields can be accessed directly by name, and `this` is
;; bound to the current object.
;; In a method, `field`s, `private`s, and `static`s can be accessed
;; directly by name, and `this` is bound to the current object. A
;; method overridden in `new` can only access `field`s.
;;
;; Use
;; (with-object <class-id> <object-expr)
;; <body> ...+)
;; to directly reference `field`s and `static`s in the <body>s.
(provide class
this
new
send
method)
method
with-object)
(define-syntax-parameter this
(lambda (stx)
(raise-syntax-error #f "illegal use outside of a method" stx)))
(begin-for-syntax
(struct class-info (struct-info methods-id vtable-id vtable-accessor-id fields methods)
(struct class-info (struct-info methods-id vtable-id vtable-accessor-id fields methods statics)
#:property prop:struct-info (lambda (ci)
(class-info-struct-info ci))))
@ -85,19 +93,21 @@
[(id expr)
(list #'id #'expr (combine-ids base-id base-id "-" #'id) (combine-ids #'id "set-" base-id "-" #'id "!"))]
[_ (raise-syntax-error #f (format "bad ~a clause" what) stx e)])))
(define-values (new-fields new-methods override-methods locals properties)
(define-values (new-fields new-methods override-methods locals statics properties)
(let ([l-stx (syntax-case stx ()
[(_ _ #:extends _ . rest) #'rest]
[(_ _ . rest) #'rest])])
(let loop ([l-stx l-stx] [new-fields null] [new-methods null] [override-methods null] [locals null] [properties null])
(syntax-case l-stx (field public override private property)
[() (values new-fields new-methods override-methods locals properties)]
(let loop ([l-stx l-stx] [new-fields null] [new-methods null] [override-methods null]
[locals null] [statics null] [properties null])
(syntax-case l-stx (field public override private static property)
[() (values new-fields new-methods override-methods locals statics properties)]
[((field fld ...) . rest)
(loop #'rest
(add-procs id (syntax->list #'(fld ...)) "field" #:can-immutable? #t)
new-methods
override-methods
locals
statics
properties)]
[((public method ...) . rest)
(loop #'rest
@ -105,13 +115,16 @@
(add-procs methods-id (syntax->list #'(method ...)) "public")
override-methods
locals
statics
properties)]
[((override method ...) . rest)
(loop #'rest new-fields new-methods (syntax->list #'(method ...)) locals properties)]
(loop #'rest new-fields new-methods (syntax->list #'(method ...)) locals statics properties)]
[((private method ...) . rest)
(loop #'rest new-fields new-methods override-methods (syntax->list #'(method ...)) properties)]
(loop #'rest new-fields new-methods override-methods (syntax->list #'(method ...)) statics properties)]
[((static method ...) . rest)
(loop #'rest new-fields new-methods override-methods locals (syntax->list #'(method ...)) properties)]
[((property prop ...) . rest)
(loop #'rest new-fields new-methods override-methods locals (syntax->list #'((#:property . prop) ...)))]
(loop #'rest new-fields new-methods override-methods locals statics (syntax->list #'((#:property . prop) ...)))]
[(other . _)
(raise-syntax-error #f "unrecognized" stx #'other)]))))
(define all-fields (if super-ci
@ -121,118 +134,118 @@
(syntax-case override ()
[(method-id _) (check-member stx #'method-id (if super-ci (class-info-methods super-ci) null) "method")]
[_ (raise-syntax-error #f "bad override clause" stx override)]))
(with-syntax ([((field-id field-init-expr field-accessor-id field-mutator-maybe-id) ...) all-fields])
(define wrapped-new-methods
(for/list ([new-method (in-list new-methods)])
(syntax-case new-method ()
[(method-id method-init-expr . rest)
#'(method-id (let ([method-id
(bind-fields-in-body
([field-id field-accessor-id field-mutator-maybe-id] ...)
method-init-expr)])
method-id)
. rest)])))
(define all-methods/vtable (if super-ci
(append (for/list ([method (in-list (class-info-methods super-ci))])
(syntax-case method ()
[(method-id method-init-expr . rest)
(or (for/or ([override (in-list override-methods)])
(syntax-case override ()
[(override-id override-init-expr . _)
(and (eq? (syntax-e #'method-id) (syntax-e #'override-id))
(list* #'method-id
#'(let ([method-id
(bind-fields-in-body
([field-id field-accessor-id field-mutator-maybe-id] ...)
override-init-expr)])
method-id)
#'rest))]))
method)]))
wrapped-new-methods)
wrapped-new-methods))
(define vtable-id (combine-ids #'here id "-vtable"))
(define all-methods/next (for/list ([method (in-list all-methods/vtable)])
(syntax-case method ()
[(method-id method-init-expr method-accessor-id . _)
(with-syntax ([vtable-id vtable-id])
(list #'method-id
#'(method-accessor-id vtable-id)
#'method-accessor-id))])))
(with-syntax ([id id]
[(super-ids ...) (if super-id
(list super-id)
null)]
[quoted-super-id (and super-id #`(quote-syntax #,super-id))]
[(vtable-ids ...) (if super-id
null
(list (datum->syntax id 'vtable)))]
[vtable-accessor-id (if super-ci
(class-info-vtable-accessor-id super-ci)
(combine-ids id id "-vtable"))]
[vtable-id vtable-id]
[struct:id (combine-ids id "struct:" id)]
[make-id (combine-ids id "create-" id)]
[id? (combine-ids id id "?")]
[methods-id methods-id]
[(super-methods-ids ...) (if super-ci
(list (class-info-methods-id super-ci))
null)]
[(new-field-id/annotated ...) (for/list ([new-field (in-list new-fields)])
(syntax-case new-field ()
[(id _ _ #f) #'id]
[(id . _) #'[id #:mutable]]))]
[((new-method-id . _) ...) new-methods]
[((_ _ rev-field-accessor-id . _) ...) (reverse all-fields)]
[((_ _ _ rev-field-mutator-maybe-id) ...) (reverse all-fields)]
[((method-id method-init-expr/vtable . _) ...) all-methods/vtable]
[((_ method-init-expr/next method-accessor-id) ...) all-methods/next]
[((local-id local-expr) ...) locals]
[(local-tmp-id ...) (generate-temporaries locals)]
[((propss ...) ...) properties])
#`(begin
(struct id super-ids ... (vtable-ids ... new-field-id/annotated ...)
#:omit-define-syntaxes
#:constructor-name make-id
#:authentic
propss ... ...)
(struct methods-id super-methods-ids ... (new-method-id ...))
(define vtable-id (methods-id method-init-expr/vtable ...))
(begin
(define local-tmp-id (let ([local-id
(bind-fields-in-body ([field-id field-accessor-id field-mutator-maybe-id] ...)
local-expr)])
(with-syntax ([((field-id field-init-expr field-accessor-id field-mutator-maybe-id) ...) all-fields]
[((local-id local-expr) ...) locals]
[((static-id static-expr) ...) statics]
[(local-tmp-id ...) (generate-temporaries locals)]
[(static-tmp-id ...) (generate-temporaries statics)])
(with-syntax ([local-bindings #'[([field-id field-accessor-id field-mutator-maybe-id] ...)
([local-id local-tmp-id] ... [static-id static-tmp-id] ...)]])
(define wrapped-new-methods
(for/list ([new-method (in-list new-methods)])
(syntax-case new-method ()
[(method-id method-init-expr . rest)
#'(method-id (let ([method-id (bind-locals-in-body local-bindings method-init-expr)])
method-id)
. rest)])))
(define all-methods/vtable (if super-ci
(append (for/list ([method (in-list (class-info-methods super-ci))])
(syntax-case method ()
[(method-id method-init-expr . rest)
(or (for/or ([override (in-list override-methods)])
(syntax-case override ()
[(override-id override-init-expr . _)
(and (eq? (syntax-e #'method-id) (syntax-e #'override-id))
(list* #'method-id
#'(let ([method-id
(bind-locals-in-body
local-bindings
override-init-expr)])
method-id)
#'rest))]))
method)]))
wrapped-new-methods)
wrapped-new-methods))
(define vtable-id (combine-ids #'here id "-vtable"))
(define all-methods/next (for/list ([method (in-list all-methods/vtable)])
(syntax-case method ()
[(method-id method-init-expr method-accessor-id . _)
(with-syntax ([vtable-id vtable-id])
(list #'method-id
#'(method-accessor-id vtable-id)
#'method-accessor-id))])))
(with-syntax ([id id]
[(super-ids ...) (if super-id
(list super-id)
null)]
[quoted-super-id (and super-id #`(quote-syntax #,super-id))]
[(vtable-ids ...) (if super-id
null
(list (datum->syntax id 'vtable)))]
[vtable-accessor-id (if super-ci
(class-info-vtable-accessor-id super-ci)
(combine-ids id id "-vtable"))]
[vtable-id vtable-id]
[struct:id (combine-ids id "struct:" id)]
[make-id (combine-ids id "create-" id)]
[id? (combine-ids id id "?")]
[methods-id methods-id]
[(super-methods-ids ...) (if super-ci
(list (class-info-methods-id super-ci))
null)]
[(new-field-id/annotated ...) (for/list ([new-field (in-list new-fields)])
(syntax-case new-field ()
[(id _ _ #f) #'id]
[(id . _) #'[id #:mutable]]))]
[((new-method-id . _) ...) new-methods]
[((_ _ rev-field-accessor-id . _) ...) (reverse all-fields)]
[((_ _ _ rev-field-mutator-maybe-id) ...) (reverse all-fields)]
[((method-id method-init-expr/vtable . _) ...) all-methods/vtable]
[((_ method-init-expr/next method-accessor-id) ...) all-methods/next]
[((propss ...) ...) properties])
#`(begin
(struct id super-ids ... (vtable-ids ... new-field-id/annotated ...)
#:omit-define-syntaxes
#:constructor-name make-id
#:authentic
propss ... ...)
(struct methods-id super-methods-ids ... (new-method-id ...))
(define vtable-id (methods-id method-init-expr/vtable ...))
(define static-tmp-id (let ([static-id (bind-locals-in-body local-bindings static-expr)])
static-id))
...
(define local-tmp-id (let ([local-id (bind-locals-in-body local-bindings local-expr)])
local-id))
(define-syntax (local-id stx)
(syntax-case stx ()
[(_ arg (... ...))
(with-syntax ([this-id (datum->syntax #'here 'this stx)])
(syntax/loc stx (local-tmp-id this-id arg (... ...))))])))
...
(define-syntax id
(class-info (list (quote-syntax struct:id)
(quote-syntax make-id)
(quote-syntax id?)
(list (quote-syntax rev-field-accessor-id) ... (quote-syntax vtable-accessor-id))
(list (maybe-quote-syntax rev-field-mutator-maybe-id) ... #f)
quoted-super-id)
(quote-syntax methods-id)
(quote-syntax vtable-id)
(quote-syntax vtable-accessor-id)
(list (list (quote-syntax field-id) (quote-syntax field-init-expr)
(quote-syntax field-accessor-id) (maybe-quote-syntax field-mutator-maybe-id))
...)
(list (list (quote-syntax method-id) (quote-syntax method-init-expr/next)
(quote-syntax method-accessor-id))
...)))))))
...
(define-syntax id
(class-info (list (quote-syntax struct:id)
(quote-syntax make-id)
(quote-syntax id?)
(list (quote-syntax rev-field-accessor-id) ... (quote-syntax vtable-accessor-id))
(list (maybe-quote-syntax rev-field-mutator-maybe-id) ... #f)
quoted-super-id)
(quote-syntax methods-id)
(quote-syntax vtable-id)
(quote-syntax vtable-accessor-id)
(list (list (quote-syntax field-id) (quote-syntax field-init-expr)
(quote-syntax field-accessor-id) (maybe-quote-syntax field-mutator-maybe-id))
...)
(list (list (quote-syntax method-id) (quote-syntax method-init-expr/next)
(quote-syntax method-accessor-id))
...)
(list (list (quote-syntax static-id) (quote-syntax static-tmp-id))
...))))))))
(define-syntax (bind-fields-in-body stx)
(define-syntax (bind-locals-in-body stx)
(syntax-case stx (lambda case-lambda)
[(_ fields #f) #'#f]
[(_ fields (form . rest))
#'(bind-fields-in-body fields form (form . rest))]
[(_ fields ctx (lambda (arg ...) body0 body ...))
#'(bind-fields-in-body fields ctx (case-lambda [(arg ...) body0 body ...]))]
[(_ fields ctx (case-lambda clause ...))
[(_ locals #f) #'#f]
[(_ locals (form . rest))
(with-syntax ([(_ _ orig) stx])
#'(bind-locals-in-body locals form orig))]
[(_ locals expr) #'expr]
[(_ locals ctx (lambda (arg ...) body0 body ...))
#'(bind-locals-in-body locals ctx (case-lambda [(arg ...) body0 body ...]))]
[(_ locals ctx (case-lambda clause ...))
(with-syntax ([(new-clause ...)
(for/list ([clause (in-list (syntax->list #'(clause ...)))])
(syntax-case clause ()
@ -240,21 +253,25 @@
(with-syntax ([(arg-tmp ...) (generate-temporaries #'(arg ...))])
#'[(this-id arg-tmp ...)
(syntax-parameterize ([this (make-rename-transformer #'this-id)])
(bind-fields
fields
(bind-locals
locals
this-id ctx
(let-syntax ([arg (make-rename-transformer #'arg-tmp)] ...)
body0 body ...)))])]))])
(syntax/loc (syntax-case stx () [(_ _ _ rhs) #'rhs])
(case-lambda new-clause ...)))]
[(_ fields _ expr)
[(_ locals _ expr)
#'expr]))
(define-syntax (bind-fields stx)
(define-syntax (bind-locals stx)
(syntax-case stx ()
[(_ ([field-id field-accessor-id field-mutator-maybe-id] ...) this-id ctx body)
[(_ [([field-id field-accessor-id field-mutator-maybe-id] ...)
([static-id static-tmp-id] ...)]
this-id ctx body)
(with-syntax ([(field-id ...) (for/list ([field-id (in-list (syntax->list #'(field-id ...)))])
(datum->syntax #'ctx (syntax-e field-id)))])
(datum->syntax #'ctx (syntax-e field-id)))]
[(static-id ...) (for/list ([static-id (in-list (syntax->list #'(static-id ...)))])
(datum->syntax #'ctx (syntax-e static-id)))])
#'(let-syntax ([field-id (make-set!-transformer
(lambda (stx)
(syntax-case stx (set!)
@ -263,6 +280,11 @@
(raise-syntax-error #f "field is immutable" stx))]
[(_ arg (... ...)) (syntax/loc stx ((field-accessor-id this-id) arg (... ...)))]
[else (syntax/loc stx (field-accessor-id this-id))])))]
...
[static-id (lambda (stx)
(syntax-case stx ()
[(_ arg (... ...))
(syntax/loc stx (static-tmp-id this-id arg (... ...)))]))]
...)
body))]))
@ -309,8 +331,9 @@
(and (eq? (syntax-e #'override-id) (syntax-e #'id))
(with-syntax ([((field-id _ field-accessor-id field-mutator-maybe-id) ...)
(class-info-fields ci)])
#'(bind-fields-in-body
([field-id field-accessor-id field-mutator-maybe-id] ...)
#'(bind-locals-in-body
[([field-id field-accessor-id field-mutator-maybe-id] ...)
()]
expr)))]))
#'(selector-id vtable-id))]))])
(syntax/loc stx (make-id (methods-id method-expr ...)
@ -325,19 +348,28 @@
(syntax-local-value #'class-id (lambda () #f)))])
(unless (class-info? ci)
(raise-syntax-error #f "not a class identifier" stx #'class-id))
(define method-accessor-id
(define make-access
(or (for/or ([method (in-list (class-info-methods ci))])
(syntax-case method ()
[(id _ accessor-id)
(and (eq? (syntax-e #'id) (syntax-e #'method-id))
#'accessor-id)]))
(lambda (o)
(with-syntax ([vtable-accessor-id (class-info-vtable-accessor-id ci)]
[o o])
#'(accessor-id (vtable-accessor-id o)))))]))
(and call?
(for/or ([static (in-list (class-info-statics ci))])
(syntax-case static ()
[(id tmp-id)
(and (eq? (syntax-e #'id) (syntax-e #'method-id))
(lambda (o)
#'tmp-id))])))
(raise-syntax-error #f "cannot find method" stx #'method-id)))
(with-syntax ([vtable-accessor-id (class-info-vtable-accessor-id ci)]
[method-accessor-id method-accessor-id])
(if call?
(if call?
(with-syntax ([proc (make-access #'o)])
#'(let ([o obj])
((method-accessor-id (vtable-accessor-id o)) o arg ...))
#'(method-accessor-id (vtable-accessor-id obj)))))]))
(proc o arg ...)))
(make-access #'obj)))]))
(define-syntax (send stx)
(send-or-method stx #t))
@ -350,6 +382,21 @@
[(_ class-id obj method-id)
(send-or-method stx #f)]))
(define-syntax (with-object stx)
(syntax-case stx ()
[(_ class-id obj-expr body0 body ...)
(let ([ci (and (identifier? #'class-id)
(syntax-local-value #'class-id (lambda () #f)))])
(unless (class-info? ci)
(raise-syntax-error #f "not a class identifier" stx #'class-id))
(with-syntax ([((field-id _ field-accessor-id field-mutator-maybe-id) ...)
(class-info-fields ci)]
[((static-id static-tmp-id) ...) (class-info-statics ci)])
(with-syntax ([local-bindings #'[([field-id field-accessor-id field-mutator-maybe-id] ...)
([static-id static-tmp-id] ...)]])
#'(let ([o obj-expr])
(bind-locals local-bindings o obj-expr (let () body0 body ...))))))]))
(define-for-syntax (check-member stx id l what)
(or (for/or ([e (in-list l)])
(syntax-case e ()
@ -372,10 +419,12 @@
[b 2])
(private
[other (lambda (q) (list q this))])
(static
[enbox (lambda (v) (box (vector a v)))])
(public
[q #f]
[m (lambda (z) (list a (other b)))]
[n (lambda (x y z) (vector a b x y z))]))
[n (lambda (x y z) (vector a b (enbox x) y z))]))
(class sub #:extends example
(field
@ -395,6 +444,7 @@
(new sub [d 5])
(send example (new sub) m 'more)
(set-example-b! ex 6)
(send example ex enbox 88)
(define ex2 (new example
#:override
@ -402,4 +452,7 @@
(box (vector x y z a b)))])
[b 'b]
[a 'a]))
(send example ex2 n 1 2 3))
(send example ex2 n 1 2 3)
(with-object example ex
(list a b (enbox 'c))))

View File

@ -1,8 +1,7 @@
#lang racket/base
(require racket/fixnum
"../common/check.rkt"
"../common/fixnum.rkt"
"../common/object.rkt"
"../common/class.rkt"
"../host/thread.rkt"
"port.rkt"
"input-port.rkt"
@ -12,468 +11,482 @@
(provide make-pipe
make-pipe-ends
pipe-input-port?
pipe-output-port?
pipe-content-length
pipe-write-position
pipe-get-content
pipe-discard-all)
(rename-out [pipe-input-port?* pipe-input-port?]
[pipe-output-port?* pipe-output-port?])
pipe-content-length)
(define (min+1 a b) (if a (min (add1 a) b) b))
(struct pipe-data (get-content-length
write-position
get-content
discard-all))
(define pipe-input-port?*
(let ([pipe-input-port?
(lambda (p)
(define cp (->core-input-port p))
(pipe-input-port? p))])
pipe-input-port?))
(define (pipe-input-port? p)
(and (input-port? p)
(pipe-data? (core-port-data (->core-input-port p)))))
(define (pipe-output-port? p)
(and (output-port? p)
(pipe-data? (core-port-data (->core-output-port p)))))
(define pipe-output-port?*
(let ([pipe-output-port?
(lambda (p)
(define cp (->core-output-port p))
(pipe-output-port? p))])
pipe-output-port?))
(define (pipe-content-length p)
(define cp
(define d
(cond
[(pipe-input-port? p) (->core-input-port p)]
[(pipe-output-port? p) (->core-output-port p)]
[(let ([p (->core-input-port p)])
(and p
(pipe-input-port? p)
p))
=> (lambda (p) (pipe-input-port-d p))]
[(let ([p (->core-output-port p)])
(and p
(pipe-output-port? p)
p))
=> (lambda (p) (pipe-output-port-d p))]
[else
(raise-argument-error 'pipe-contact-length "(or/c pipe-input-port? pipe-output-port?)" p)]))
((pipe-data-get-content-length (core-port-data cp))
(if (core-input-port? cp)
(compat-input-port-self cp)
(compat-output-port-self cp))))
(atomically
(send pipe-input-port (pipe-data-input d) sync-data)
(send pipe-data d content-length)))
;; in atomic mode:
(define pipe-write-position
(case-lambda
[(p) ((pipe-data-write-position (core-port-data p)) (compat-output-port-self p))]
[(p pos) ((pipe-data-write-position (core-port-data p)) (compat-output-port-self p) pos)]))
;; ----------------------------------------
;; in atomic mode:
(define (pipe-discard-all p)
((pipe-data-discard-all (core-port-data p)) (compat-output-port-self p)))
(class pipe-data
(field
[bstr #""]
[len 0]
[limit 0]
[peeked-amt 0] ; peeked but not yet read, effectively extends `limit`
[start 0]
[end 0]
[input #f] ; #f => closed
[output-closed? #f]
[read-ready-sema #f]
[write-ready-sema #f]
[more-read-ready-sema #f] ; for lookahead peeks
[read-ready-evt #f]
[write-ready-evt #f])
;; in atomic mode:x
(define (pipe-get-content p bstr start-pos)
((pipe-data-get-content (core-port-data p)) (compat-output-port-self p) bstr start-pos))
;; All methods in atomic mode.
;; Beware that the input port must be synced to sure that `start`
;; represents the current position before using these methods.
(static
[content-length
(lambda ()
(define s start)
(define e end)
(if (s . fx<= . e)
(fx- e s)
(fx+ e (fx- len s))))]
(define-constructor (make-pipe-ends [limit #f] [input-name 'pipe] [output-name 'pipe]
#:need-input? [need-input? #t])
(define bstr (make-bytes (min+1 limit 16)))
(define len (bytes-length bstr))
(define-fixnum peeked-amt 0) ; peeked but not yet read effectively extends `limit`
(define-fixnum start 0)
(define-fixnum end 0)
(define write-pos #f) ; to adjust the write position via `file-position` on a string port
(define input-closed? #f)
(define output-closed? #f)
[input-empty?
(lambda ()
(fx= start end))]
(define (content-length)
(if (start . fx<= . end)
(fx- end start)
(fx+ end (fx- len start))))
(define (input-empty?) (fx= start end))
(define (output-full?)
(and limit
((content-length) . >= . (+ limit peeked-amt))))
[output-full?
(lambda ()
(define l limit)
(and l
((content-length) . >= . (+ l peeked-amt))))]
(define data
(pipe-data
;; get-content-length
(method
(lambda ()
(atomically (content-length))))
;; write-position
(method
(case-lambda
;; in atomic mode
[() (or write-pos end)]
[(pos)
;; `pos` must be between `start` and `end`
(if (fx= pos end)
(set! write-pos #f)
(set! write-pos pos))]))
;; get-content
(method
(lambda (to-bstr start-pos)
;; in atomic mode
(define pos (let ([p (fx+ start start-pos)])
(if (p . fx>= . len)
(fx- p len)
p)))
(define end-pos (fx+ pos (bytes-length to-bstr)))
(cond
[(end-pos . fx<= . len)
(bytes-copy! to-bstr 0 bstr pos end-pos)]
[else
(bytes-copy! to-bstr 0 bstr pos len)
(bytes-copy! to-bstr (fx- len pos) bstr 0 (fx- end-pos len))])))
;; discard-all
(method
(lambda ()
;; in atomic mode
(set! peeked-amt 0)
(set! start 0)
(set! end 0)
(set! write-pos #f)))))
;; Used before/after read:
[check-output-unblocking
(lambda ()
(when (output-full?) (semaphore-post write-ready-sema)))]
[check-input-blocking
(lambda ()
(when (input-empty?) (semaphore-wait read-ready-sema)))]
(define read-ready-sema (make-semaphore))
(define write-ready-sema (and limit (make-semaphore 1)))
(define more-read-ready-sema #f) ; for lookahead peeks
(define read-ready-evt (wrap-evt (semaphore-peek-evt read-ready-sema)
(lambda (v) 0)))
(define write-ready-evt (if limit
(semaphore-peek-evt write-ready-sema)
always-evt))
(define progress-sema #f)
;; Used before/after write:
[check-input-unblocking
(lambda ()
(when (and (input-empty?) (not output-closed?)) (semaphore-post read-ready-sema))
(when more-read-ready-sema
(semaphore-post more-read-ready-sema)
(set! more-read-ready-sema #f)))]
[check-output-blocking
(lambda ()
(when (output-full?)
(semaphore-wait write-ready-sema)
(when input
(send pipe-input-port input on-output-full))))]
;; Used before/after read:
(define (check-output-unblocking)
(when (output-full?) (semaphore-post write-ready-sema)))
(define (check-input-blocking)
(when (input-empty?) (semaphore-wait read-ready-sema)))
;; Used after peeking:
[peeked!
(lambda (amt)
(when (amt . > . peeked-amt)
(check-output-unblocking)
(set! peeked-amt amt)))]))
;; Used before/after write:
(define (check-input-unblocking)
(when (and (input-empty?) (not output-closed?)) (semaphore-post read-ready-sema))
(when more-read-ready-sema
(semaphore-post more-read-ready-sema)
(set! more-read-ready-sema #f)))
(define (check-output-blocking)
(when (output-full?) (semaphore-wait write-ready-sema)))
;; ----------------------------------------
;; Used after peeking:
(define (peeked! amt)
(when (amt . > . peeked-amt)
(check-output-unblocking)
(set! peeked-amt amt)))
(class pipe-input-port #:extends core-input-port
(field
[d #f] ; pipe-data
[progress-sema #f]
[commit-manager #f])
(define (progress!)
(when progress-sema
(semaphore-post progress-sema)
(set! progress-sema #f)))
(private
[progress!
(lambda ()
(when progress-sema
(semaphore-post progress-sema)
(set! progress-sema #f)))]
;; [can leave atomic mode temporarily]
;; After this function returns, complete any commit-changing work
;; before leaving atomic mode again.
[pause-waiting-commit
(lambda ()
(when commit-manager
(commit-manager-pause commit-manager)))]
(define commit-manager #f)
;; [can leave atomic mode temporarily]
[wait-commit
(lambda (progress-evt ext-evt finish)
(cond
[(and (not commit-manager)
;; Try shortcut:
(not (sync/timeout 0 progress-evt))
(sync/timeout 0 ext-evt))
(finish)
#t]
[else
;; General case to support blocking and potentially multiple
;; commiting threads:
(unless commit-manager
(set! commit-manager (make-commit-manager)))
(commit-manager-wait commit-manager progress-evt ext-evt finish)]))]
;; in atomic mode [can leave atomic mode temporarily]
;; After this function returns, complete any commit-changing work
;; before leaving atomic mode again.
(define (pause-waiting-commit)
(when commit-manager
(commit-manager-pause commit-manager)))
[fast-mode!
(lambda (amt)
(unless (or count buffer)
(with-object pipe-data d
(define s start)
(define e end)
(unless (fx= s e)
(set! buffer bstr)
(set! buffer-pos s)
(set! buffer-end (if (s . fx< . e) e len))
(define o offset)
(when o
(set! offset (- (+ o amt) s)))))))]
;; in atomic mode [can leave atomic mode temporarily]
(define (wait-commit progress-evt ext-evt finish)
(cond
[(and (not commit-manager)
;; Try shortcut:
(not (sync/timeout 0 progress-evt))
(sync/timeout 0 ext-evt))
(finish)
#t]
[else
;; General case to support blocking and potentially multiple
;; commiting threads:
(unless commit-manager
(set! commit-manager (make-commit-manager)))
(commit-manager-wait commit-manager progress-evt ext-evt finish)]))
[slow-mode!
(lambda ()
(when buffer
(with-object pipe-data d
(define pos buffer-pos)
(define o offset)
(when o
(set! offset (+ o pos)))
(set! start (if (fx= pos len)
0
pos))
(set! buffer #f)
(set! buffer-pos buffer-end))))])
(values
;; input ----------------------------------------
(and
need-input?
(make-core-input-port
#:name input-name
#:data data
#:self self
(static
[sync-data
(lambda ()
(when buffer
(with-object pipe-data d
(define pos buffer-pos)
(set! start (if (fx= pos len)
0
pos)))))]
[on-resize
(lambda ()
(slow-mode!))]
[on-output-full
(lambda ()
(slow-mode!))])
#:prepare-change
(method
(lambda ()
(pause-waiting-commit)))
(override
[prepare-change
(lambda ()
(with-object pipe-data d
(pause-waiting-commit)))]
#:read-byte
(method
(lambda ()
(assert-atomic)
(cond
[(input-empty?)
(if output-closed?
eof
;; event's synchronization value is ignored:
read-ready-evt)]
[else
(define pos start)
(check-output-unblocking)
(unless (fx= 0 peeked-amt)
(set! peeked-amt (fxmax 0 (fx- peeked-amt 1))))
(define new-pos (fx+ pos 1))
(if (fx= new-pos len)
(set! start 0)
(set! start new-pos))
(check-input-blocking)
(progress!)
(bytes-ref bstr pos)])))
#:read-in
(method
(lambda (dest-bstr dest-start dest-end copy?)
(assert-atomic)
(cond
[(input-empty?)
(if output-closed?
eof
read-ready-evt)]
[else
(check-output-unblocking)
(begin0
(cond
[(start . fx< . end)
[read-in
(lambda (dest-bstr dest-start dest-end copy?)
(assert-atomic)
(slow-mode!)
(with-object pipe-data d
(cond
[(input-empty?)
(if output-closed?
eof
read-ready-evt)]
[else
(check-output-unblocking)
(define s start)
(define e end)
(define amt
(cond
[(s . fx< . e)
(define amt (fxmin (fx- dest-end dest-start)
(fx- end start)))
(bytes-copy! dest-bstr dest-start bstr start (fx+ start amt))
(set! start (fx+ start amt))
(fx- e s)))
(bytes-copy! dest-bstr dest-start bstr s (fx+ s amt))
(set! start (fx+ s amt))
(set! peeked-amt (fxmax 0 (fx- peeked-amt amt)))
amt]
[else
(define amt (fxmin (fx- dest-end dest-start)
(fx- len start)))
(bytes-copy! dest-bstr dest-start bstr start (fx+ start amt))
(set! start (modulo (fx+ start amt) len))
(fx- len s)))
(bytes-copy! dest-bstr dest-start bstr s (fx+ s amt))
(set! start (modulo (fx+ s amt) len))
(set! peeked-amt (fxmax 0 (fx- peeked-amt amt)))
amt])
(check-input-blocking)
(progress!))])))
amt]))
(check-input-blocking)
(progress!)
(fast-mode! amt)
amt])))]
#:peek-byte
(method
(lambda ()
(assert-atomic)
(cond
[(input-empty?)
(if output-closed?
eof
read-ready-evt)]
[else
(peeked! 1)
(bytes-ref bstr start)])))
#:peek-in
(method
(lambda (dest-bstr dest-start dest-end skip progress-evt copy?)
(assert-atomic)
(define content-amt (content-length))
(cond
[(and progress-evt
(sync/timeout 0 progress-evt))
#f]
[(content-amt . <= . skip)
(cond
[output-closed? eof]
[else
(unless (or (zero? skip) more-read-ready-sema)
(set! more-read-ready-sema (make-semaphore)))
(define evt (if (zero? skip)
read-ready-evt
(wrap-evt (semaphore-peek-evt more-read-ready-sema)
(lambda (v) 0))))
evt])]
[else
(define peek-start (fxmodulo (fx+ start skip) len))
(cond
[(peek-start . fx< . end)
(define amt (fxmin (fx- dest-end dest-start)
(fx- end peek-start)))
(bytes-copy! dest-bstr dest-start bstr peek-start (fx+ peek-start amt))
(peeked! (+ skip amt))
amt]
[else
(define amt (fxmin (fx- dest-end dest-start)
(fx- len peek-start)))
(bytes-copy! dest-bstr dest-start bstr peek-start (fx+ peek-start amt))
(peeked! (+ skip amt))
amt])])))
#:byte-ready
(method
(lambda (work-done!)
(assert-atomic)
(or output-closed?
(not (fx= 0 (content-length))))))
#:close
(method
(lambda ()
(unless input-closed?
(set! input-closed? #t)
(progress!))))
#:get-progress-evt
(method
(lambda ()
(atomically
[peek-in
(lambda (dest-bstr dest-start dest-end skip progress-evt copy?)
(with-object pipe-data d
(assert-atomic)
(sync-data)
(define content-amt (content-length))
(cond
[input-closed? always-evt]
[(and progress-evt
(sync/timeout 0 progress-evt))
#f]
[(content-amt . <= . skip)
(cond
[output-closed? eof]
[else
(unless (or (zero? skip) more-read-ready-sema)
(set! more-read-ready-sema (make-semaphore)))
(define evt (if (zero? skip)
read-ready-evt
(wrap-evt (semaphore-peek-evt more-read-ready-sema)
(lambda (v) 0))))
evt])]
[else
(unless progress-sema
(set! progress-sema (make-semaphore)))
(semaphore-peek-evt progress-sema)]))))
(define peek-start (fxmodulo (fx+ start skip) len))
(cond
[(peek-start . fx< . end)
(define amt (fxmin (fx- dest-end dest-start)
(fx- end peek-start)))
(bytes-copy! dest-bstr dest-start bstr peek-start (fx+ peek-start amt))
(peeked! (+ skip amt))
amt]
[else
(define amt (fxmin (fx- dest-end dest-start)
(fx- len peek-start)))
(bytes-copy! dest-bstr dest-start bstr peek-start (fx+ peek-start amt))
(peeked! (+ skip amt))
amt])])))]
#:commit
[byte-ready
(lambda (work-done!)
(assert-atomic)
(with-object pipe-data d
(or output-closed?
(begin
(sync-data)
(not (fx= 0 (content-length)))))))]
[close
(lambda ()
(with-object pipe-data d
(when input
(slow-mode!)
(set! input #f)
(progress!))))]
[get-progress-evt
(lambda ()
(atomically
(with-object pipe-data d
(cond
[(not input) always-evt]
[else
(slow-mode!)
(unless progress-sema
(set! progress-sema (make-semaphore)))
(semaphore-peek-evt progress-sema)]))))]
[commit
;; Allows `amt` to be zero and #f for other arguments,
;; which is helpful for `open-input-peek-via-read`.
(method
(lambda (amt progress-evt ext-evt finish)
(assert-atomic)
;; `progress-evt` is a `semepahore-peek-evt`, and `ext-evt`
;; is constrained; we can send them over to different threads
(cond
[(zero? amt)
(progress!)]
[else
(wait-commit
progress-evt ext-evt
;; in atomic mode, maybe in a different thread:
(lambda ()
(let ([amt (min amt (content-length))])
(cond
[(fx= 0 amt)
;; There was nothing to commit; claim success for 0 bytes
(finish #"")]
[else
(define dest-bstr (make-bytes amt))
(cond
[(start . fx< . end)
(bytes-copy! dest-bstr 0 bstr start (fx+ start amt))]
[else
(define amt1 (fxmin (fx- len start) amt))
(bytes-copy! dest-bstr 0 bstr start (fx+ start amt1))
(when (amt1 . fx< . amt)
(bytes-copy! dest-bstr amt1 bstr 0 (fx- amt amt1)))])
(set! start (fxmodulo (fx+ start amt) len))
(progress!)
(check-input-blocking)
(finish dest-bstr)]))))])))))
;; output ----------------------------------------
(make-core-output-port
#:name output-name
#:data data
#:self self
(lambda (amt progress-evt ext-evt finish)
(assert-atomic)
;; `progress-evt` is a `semepahore-peek-evt`, and `ext-evt`
;; is constrained; we can send them over to different threads
(cond
[(zero? amt)
(progress!)]
[else
(wait-commit
progress-evt ext-evt
;; in atomic mode, maybe in a different thread:
(lambda ()
(with-object pipe-data d
(slow-mode!)
(let ([amt (min amt (content-length))])
(cond
[(fx= 0 amt)
;; There was nothing to commit; claim success for 0 bytes
(finish #"")]
[else
(define dest-bstr (make-bytes amt))
(define s start)
(define e end)
(cond
[(s . fx< . e)
(bytes-copy! dest-bstr 0 bstr s (fx+ s amt))]
[else
(define amt1 (fxmin (fx- len s) amt))
(bytes-copy! dest-bstr 0 bstr s (fx+ s amt1))
(when (amt1 . fx< . amt)
(bytes-copy! dest-bstr amt1 bstr 0 (fx- amt amt1)))])
(set! start (fxmodulo (fx+ s amt) len))
(progress!)
(fast-mode! amt)
(check-input-blocking)
(finish dest-bstr)])))))]))]
#:evt write-ready-evt
#:write-out
;; in atomic mode
(method
(lambda (src-bstr src-start src-end nonblock? enable-break? copy?)
(assert-atomic)
(let try-again ()
(define top-pos (if (fx= start 0)
(fx- len 1)
len))
(define (maybe-grow)
(cond
[(or (not limit)
((+ limit peeked-amt) . > . (fx- len 1)))
;; grow pipe size
(define new-bstr (make-bytes (min+1 (and limit (+ limit peeked-amt)) (* len 2))))
[count-lines!
(lambda ()
(slow-mode!))]))
;; ----------------------------------------
(class pipe-output-port #:extends core-output-port
(field
[d d]) ; pipe-data
(override
[write-out
;; in atomic mode
(lambda (src-bstr src-start src-end nonblock? enable-break? copy?)
(assert-atomic)
(with-object pipe-data d
(send pipe-input-port input sync-data)
(let try-again ()
(define top-pos (if (fx= start 0)
(fx- len 1)
len))
(define (maybe-grow)
(cond
[(fx= 0 start)
(bytes-copy! new-bstr 0 bstr 0 (fx- len 1))]
[else
(bytes-copy! new-bstr 0 bstr start len)
(bytes-copy! new-bstr (fx- len start) bstr 0 end)
(set! start 0)
(set! end (fx- len 1))])
(set! bstr new-bstr)
(set! len (bytes-length new-bstr))
(try-again)]
[else (pipe-is-full)]))
(define (pipe-is-full)
(wrap-evt write-ready-evt (lambda (v) #f)))
(define (apply-limit amt)
(if limit
(min amt (- (+ limit peeked-amt) (content-length)))
amt))
(cond
[(fx= src-start src-end) ;; => flush
0]
[write-pos ; set by `file-position` on a bytes port
(define amt (apply-limit (fxmin (fx- end write-pos)
(fx- src-end src-start))))
[(or (not limit)
((+ limit peeked-amt) . > . (fx- len 1)))
;; grow pipe size
(send pipe-input-port input on-resize)
(define new-bstr (make-bytes (min+1 (and limit (+ limit peeked-amt)) (* len 2))))
(cond
[(fx= 0 start)
(bytes-copy! new-bstr 0 bstr 0 (fx- len 1))]
[else
(bytes-copy! new-bstr 0 bstr start len)
(bytes-copy! new-bstr (fx- len start) bstr 0 end)
(set! start 0)
(set! end (fx- len 1))])
(set! bstr new-bstr)
(set! len (bytes-length new-bstr))
(try-again)]
[else (pipe-is-full)]))
(define (pipe-is-full)
(wrap-evt write-ready-evt (lambda (v) #f)))
(define (apply-limit amt)
(if limit
(min amt (- (+ limit peeked-amt) (content-length)))
amt))
(cond
[(fx= amt 0) (pipe-is-full)]
[else
(check-input-unblocking)
(bytes-copy! bstr write-pos src-bstr src-start (fx+ src-start amt))
(let ([new-write-pos (fx+ write-pos amt)])
(if (fx= new-write-pos end)
(set! write-pos #f) ; back to normal mode
(set! write-pos new-write-pos)))
(check-output-blocking)
amt])]
[(and (end . fx>= . start)
(end . fx< . top-pos))
(define amt (apply-limit (fxmin (fx- top-pos end)
(fx- src-end src-start))))
(cond
[(fx= amt 0) (pipe-is-full)]
[else
(check-input-unblocking)
(bytes-copy! bstr end src-bstr src-start (fx+ src-start amt))
(let ([new-end (fx+ end amt)])
(set! end (if (fx= new-end len) 0 new-end)))
(check-output-blocking)
amt])]
[(fx= end top-pos)
(cond
[(fx= start 0)
(maybe-grow)]
[else
(define amt (fxmin (fx- start 1)
(fx- src-end src-start)))
[(fx= src-start src-end) ;; => flush
0]
[(and (end . fx>= . start)
(end . fx< . top-pos))
(define amt (apply-limit (fxmin (fx- top-pos end)
(fx- src-end src-start))))
(cond
[(fx= amt 0) (pipe-is-full)]
[else
(check-input-unblocking)
(bytes-copy! bstr 0 src-bstr src-start (fx+ src-start amt))
(set! end amt)
(bytes-copy! bstr end src-bstr src-start (fx+ src-start amt))
(let ([new-end (fx+ end amt)])
(set! end (if (fx= new-end len) 0 new-end)))
(check-output-blocking)
amt])])]
[(end . fx< . (fx- start 1))
(define amt (apply-limit (fxmin (fx- (fx- start 1) end)
(fx- src-end src-start))))
(cond
[(fx= amt 0) (pipe-is-full)]
amt])]
[(fx= end top-pos)
(cond
[(fx= start 0)
(maybe-grow)]
[else
(define amt (fxmin (fx- start 1)
(fx- src-end src-start)))
(cond
[(fx= amt 0) (pipe-is-full)]
[else
(check-input-unblocking)
(bytes-copy! bstr 0 src-bstr src-start (fx+ src-start amt))
(set! end amt)
(check-output-blocking)
amt])])]
[(end . fx< . (fx- start 1))
(define amt (apply-limit (fxmin (fx- (fx- start 1) end)
(fx- src-end src-start))))
(cond
[(fx= amt 0) (pipe-is-full)]
[else
(check-input-unblocking)
(bytes-copy! bstr end src-bstr src-start (fx+ src-start amt))
(set! end (fx+ end amt))
(check-output-blocking)
amt])]
[else
(check-input-unblocking)
(bytes-copy! bstr end src-bstr src-start (fx+ src-start amt))
(set! end (fx+ end amt))
(check-output-blocking)
amt])]
[else
(maybe-grow)]))))
(maybe-grow)]))))]
#:count-write-evt-via-write-out
(method
(lambda (op v bstr start)
(port-count! op v bstr start)))
[get-write-evt
(get-write-evt-via-write-out (lambda (out v bstr start)
(port-count! out v bstr start)))]
#:close
;; in atomic mode
(method
(lambda ()
(unless output-closed?
(set! output-closed? #t)
(when write-ready-sema
(semaphore-post write-ready-sema))
(when more-read-ready-sema
(semaphore-post more-read-ready-sema))
(semaphore-post read-ready-sema)))))))
[close
;; in atomic mode
(lambda ()
(with-object pipe-data d
(unless output-closed?
(set! output-closed? #t)
(when write-ready-sema
(semaphore-post write-ready-sema))
(when more-read-ready-sema
(semaphore-post more-read-ready-sema))
(semaphore-post read-ready-sema))))]))
;; ----------------------------------------
(define (make-pipe-ends [limit #f] [input-name 'pipe] [output-name 'pipe])
(define len (min+1 limit 16))
(define read-ready-sema (make-semaphore))
(define write-ready-sema (and limit (make-semaphore 1)))
(define write-ready-evt (if limit
(semaphore-peek-evt write-ready-sema)
always-evt))
(define d (new pipe-data
[bstr (make-bytes len)]
[len len]
[limit limit]
[read-ready-sema read-ready-sema]
[write-ready-sema write-ready-sema]
[read-ready-evt (wrap-evt (semaphore-peek-evt read-ready-sema)
(lambda (v) 0))]
[write-ready-evt write-ready-evt]))
(define input (new pipe-input-port
[name input-name]
[d d]))
(define output (new pipe-output-port
[name output-name]
[evt write-ready-evt]
[d d]))
(set-pipe-data-input! d input)
(values input output))
(define/who (make-pipe [limit #f] [input-name 'pipe] [output-name 'pipe])
(check who #:or-false exact-positive-integer? limit)