diff --git a/pkgs/racket-test-core/tests/racket/file.rktl b/pkgs/racket-test-core/tests/racket/file.rktl index a00f172330..c1083ee64c 100644 --- a/pkgs/racket-test-core/tests/racket/file.rktl +++ b/pkgs/racket-test-core/tests/racket/file.rktl @@ -583,16 +583,16 @@ (let ([th1 (thread (lambda () (display "a" out)))] [th2 (thread (lambda () - (display "a" out)))] + (display "a" out)))] [th3 (thread (lambda () - (display "a" out)))]) + (display "a" out)))]) (test #t thread-running? th1) (test #t thread-running? th2) (test #t thread-running? th3) (test 49 read-byte in) - (sleep 0.1) + (sync (system-idle-evt)) (test 2 + (if (thread-running? th1) 1 0) @@ -601,7 +601,7 @@ (test 50 read-byte in) - (sleep 0.1) + (sync (system-idle-evt)) (test 1 + (if (thread-running? th1) 1 0) @@ -610,7 +610,7 @@ (test 51 read-byte in) - (sleep 0.1) + (sync (system-idle-evt)) (test #f thread-running? th1) (test #f thread-running? th2) diff --git a/racket/src/io/common/class.rkt b/racket/src/io/common/class.rkt index edd6711820..b72525dfe9 100644 --- a/racket/src/io/common/class.rkt +++ b/racket/src/io/common/class.rkt @@ -14,6 +14,7 @@ ;; | (public [ ] ...) ;; | (private [ ] ...) ;; | (override [ ] ...) +;; | (static [ ] ...) ; not in vtable ;; | (property [ ] ...) ;; = #f ;; | (lambda ( ...) ...+) @@ -39,24 +40,31 @@ ;; Use ;; (send ...) ;; to call a method, or -;; (mewthod ) +;; (method ) ;; to get a method that expects the object as its first argument. ;; -;; In a method, fields can be accessed directly by name, and `this` is -;; bound to the current object. +;; In a method, `field`s, `private`s, and `static`s can be accessed +;; directly by name, and `this` is bound to the current object. A +;; method overridden in `new` can only access `field`s. +;; +;; Use +;; (with-object ...+) +;; to directly reference `field`s and `static`s in the s. (provide class this new send - method) + method + with-object) (define-syntax-parameter this (lambda (stx) (raise-syntax-error #f "illegal use outside of a method" stx))) (begin-for-syntax - (struct class-info (struct-info methods-id vtable-id vtable-accessor-id fields methods) + (struct class-info (struct-info methods-id vtable-id vtable-accessor-id fields methods statics) #:property prop:struct-info (lambda (ci) (class-info-struct-info ci)))) @@ -85,19 +93,21 @@ [(id expr) (list #'id #'expr (combine-ids base-id base-id "-" #'id) (combine-ids #'id "set-" base-id "-" #'id "!"))] [_ (raise-syntax-error #f (format "bad ~a clause" what) stx e)]))) - (define-values (new-fields new-methods override-methods locals properties) + (define-values (new-fields new-methods override-methods locals statics properties) (let ([l-stx (syntax-case stx () [(_ _ #:extends _ . rest) #'rest] [(_ _ . rest) #'rest])]) - (let loop ([l-stx l-stx] [new-fields null] [new-methods null] [override-methods null] [locals null] [properties null]) - (syntax-case l-stx (field public override private property) - [() (values new-fields new-methods override-methods locals properties)] + (let loop ([l-stx l-stx] [new-fields null] [new-methods null] [override-methods null] + [locals null] [statics null] [properties null]) + (syntax-case l-stx (field public override private static property) + [() (values new-fields new-methods override-methods locals statics properties)] [((field fld ...) . rest) (loop #'rest (add-procs id (syntax->list #'(fld ...)) "field" #:can-immutable? #t) new-methods override-methods locals + statics properties)] [((public method ...) . rest) (loop #'rest @@ -105,13 +115,16 @@ (add-procs methods-id (syntax->list #'(method ...)) "public") override-methods locals + statics properties)] [((override method ...) . rest) - (loop #'rest new-fields new-methods (syntax->list #'(method ...)) locals properties)] + (loop #'rest new-fields new-methods (syntax->list #'(method ...)) locals statics properties)] [((private method ...) . rest) - (loop #'rest new-fields new-methods override-methods (syntax->list #'(method ...)) properties)] + (loop #'rest new-fields new-methods override-methods (syntax->list #'(method ...)) statics properties)] + [((static method ...) . rest) + (loop #'rest new-fields new-methods override-methods locals (syntax->list #'(method ...)) properties)] [((property prop ...) . rest) - (loop #'rest new-fields new-methods override-methods locals (syntax->list #'((#:property . prop) ...)))] + (loop #'rest new-fields new-methods override-methods locals statics (syntax->list #'((#:property . prop) ...)))] [(other . _) (raise-syntax-error #f "unrecognized" stx #'other)])))) (define all-fields (if super-ci @@ -121,118 +134,118 @@ (syntax-case override () [(method-id _) (check-member stx #'method-id (if super-ci (class-info-methods super-ci) null) "method")] [_ (raise-syntax-error #f "bad override clause" stx override)])) - (with-syntax ([((field-id field-init-expr field-accessor-id field-mutator-maybe-id) ...) all-fields]) - (define wrapped-new-methods - (for/list ([new-method (in-list new-methods)]) - (syntax-case new-method () - [(method-id method-init-expr . rest) - #'(method-id (let ([method-id - (bind-fields-in-body - ([field-id field-accessor-id field-mutator-maybe-id] ...) - method-init-expr)]) - method-id) - . rest)]))) - (define all-methods/vtable (if super-ci - (append (for/list ([method (in-list (class-info-methods super-ci))]) - (syntax-case method () - [(method-id method-init-expr . rest) - (or (for/or ([override (in-list override-methods)]) - (syntax-case override () - [(override-id override-init-expr . _) - (and (eq? (syntax-e #'method-id) (syntax-e #'override-id)) - (list* #'method-id - #'(let ([method-id - (bind-fields-in-body - ([field-id field-accessor-id field-mutator-maybe-id] ...) - override-init-expr)]) - method-id) - #'rest))])) - method)])) - wrapped-new-methods) - wrapped-new-methods)) - (define vtable-id (combine-ids #'here id "-vtable")) - (define all-methods/next (for/list ([method (in-list all-methods/vtable)]) - (syntax-case method () - [(method-id method-init-expr method-accessor-id . _) - (with-syntax ([vtable-id vtable-id]) - (list #'method-id - #'(method-accessor-id vtable-id) - #'method-accessor-id))]))) - (with-syntax ([id id] - [(super-ids ...) (if super-id - (list super-id) - null)] - [quoted-super-id (and super-id #`(quote-syntax #,super-id))] - [(vtable-ids ...) (if super-id - null - (list (datum->syntax id 'vtable)))] - [vtable-accessor-id (if super-ci - (class-info-vtable-accessor-id super-ci) - (combine-ids id id "-vtable"))] - [vtable-id vtable-id] - [struct:id (combine-ids id "struct:" id)] - [make-id (combine-ids id "create-" id)] - [id? (combine-ids id id "?")] - [methods-id methods-id] - [(super-methods-ids ...) (if super-ci - (list (class-info-methods-id super-ci)) - null)] - [(new-field-id/annotated ...) (for/list ([new-field (in-list new-fields)]) - (syntax-case new-field () - [(id _ _ #f) #'id] - [(id . _) #'[id #:mutable]]))] - [((new-method-id . _) ...) new-methods] - [((_ _ rev-field-accessor-id . _) ...) (reverse all-fields)] - [((_ _ _ rev-field-mutator-maybe-id) ...) (reverse all-fields)] - [((method-id method-init-expr/vtable . _) ...) all-methods/vtable] - [((_ method-init-expr/next method-accessor-id) ...) all-methods/next] - [((local-id local-expr) ...) locals] - [(local-tmp-id ...) (generate-temporaries locals)] - [((propss ...) ...) properties]) - #`(begin - (struct id super-ids ... (vtable-ids ... new-field-id/annotated ...) - #:omit-define-syntaxes - #:constructor-name make-id - #:authentic - propss ... ...) - (struct methods-id super-methods-ids ... (new-method-id ...)) - (define vtable-id (methods-id method-init-expr/vtable ...)) - (begin - (define local-tmp-id (let ([local-id - (bind-fields-in-body ([field-id field-accessor-id field-mutator-maybe-id] ...) - local-expr)]) + (with-syntax ([((field-id field-init-expr field-accessor-id field-mutator-maybe-id) ...) all-fields] + [((local-id local-expr) ...) locals] + [((static-id static-expr) ...) statics] + [(local-tmp-id ...) (generate-temporaries locals)] + [(static-tmp-id ...) (generate-temporaries statics)]) + (with-syntax ([local-bindings #'[([field-id field-accessor-id field-mutator-maybe-id] ...) + ([local-id local-tmp-id] ... [static-id static-tmp-id] ...)]]) + (define wrapped-new-methods + (for/list ([new-method (in-list new-methods)]) + (syntax-case new-method () + [(method-id method-init-expr . rest) + #'(method-id (let ([method-id (bind-locals-in-body local-bindings method-init-expr)]) + method-id) + . rest)]))) + (define all-methods/vtable (if super-ci + (append (for/list ([method (in-list (class-info-methods super-ci))]) + (syntax-case method () + [(method-id method-init-expr . rest) + (or (for/or ([override (in-list override-methods)]) + (syntax-case override () + [(override-id override-init-expr . _) + (and (eq? (syntax-e #'method-id) (syntax-e #'override-id)) + (list* #'method-id + #'(let ([method-id + (bind-locals-in-body + local-bindings + override-init-expr)]) + method-id) + #'rest))])) + method)])) + wrapped-new-methods) + wrapped-new-methods)) + (define vtable-id (combine-ids #'here id "-vtable")) + (define all-methods/next (for/list ([method (in-list all-methods/vtable)]) + (syntax-case method () + [(method-id method-init-expr method-accessor-id . _) + (with-syntax ([vtable-id vtable-id]) + (list #'method-id + #'(method-accessor-id vtable-id) + #'method-accessor-id))]))) + (with-syntax ([id id] + [(super-ids ...) (if super-id + (list super-id) + null)] + [quoted-super-id (and super-id #`(quote-syntax #,super-id))] + [(vtable-ids ...) (if super-id + null + (list (datum->syntax id 'vtable)))] + [vtable-accessor-id (if super-ci + (class-info-vtable-accessor-id super-ci) + (combine-ids id id "-vtable"))] + [vtable-id vtable-id] + [struct:id (combine-ids id "struct:" id)] + [make-id (combine-ids id "create-" id)] + [id? (combine-ids id id "?")] + [methods-id methods-id] + [(super-methods-ids ...) (if super-ci + (list (class-info-methods-id super-ci)) + null)] + [(new-field-id/annotated ...) (for/list ([new-field (in-list new-fields)]) + (syntax-case new-field () + [(id _ _ #f) #'id] + [(id . _) #'[id #:mutable]]))] + [((new-method-id . _) ...) new-methods] + [((_ _ rev-field-accessor-id . _) ...) (reverse all-fields)] + [((_ _ _ rev-field-mutator-maybe-id) ...) (reverse all-fields)] + [((method-id method-init-expr/vtable . _) ...) all-methods/vtable] + [((_ method-init-expr/next method-accessor-id) ...) all-methods/next] + [((propss ...) ...) properties]) + #`(begin + (struct id super-ids ... (vtable-ids ... new-field-id/annotated ...) + #:omit-define-syntaxes + #:constructor-name make-id + #:authentic + propss ... ...) + (struct methods-id super-methods-ids ... (new-method-id ...)) + (define vtable-id (methods-id method-init-expr/vtable ...)) + (define static-tmp-id (let ([static-id (bind-locals-in-body local-bindings static-expr)]) + static-id)) + ... + (define local-tmp-id (let ([local-id (bind-locals-in-body local-bindings local-expr)]) local-id)) - (define-syntax (local-id stx) - (syntax-case stx () - [(_ arg (... ...)) - (with-syntax ([this-id (datum->syntax #'here 'this stx)]) - (syntax/loc stx (local-tmp-id this-id arg (... ...))))]))) - ... - (define-syntax id - (class-info (list (quote-syntax struct:id) - (quote-syntax make-id) - (quote-syntax id?) - (list (quote-syntax rev-field-accessor-id) ... (quote-syntax vtable-accessor-id)) - (list (maybe-quote-syntax rev-field-mutator-maybe-id) ... #f) - quoted-super-id) - (quote-syntax methods-id) - (quote-syntax vtable-id) - (quote-syntax vtable-accessor-id) - (list (list (quote-syntax field-id) (quote-syntax field-init-expr) - (quote-syntax field-accessor-id) (maybe-quote-syntax field-mutator-maybe-id)) - ...) - (list (list (quote-syntax method-id) (quote-syntax method-init-expr/next) - (quote-syntax method-accessor-id)) - ...))))))) + ... + (define-syntax id + (class-info (list (quote-syntax struct:id) + (quote-syntax make-id) + (quote-syntax id?) + (list (quote-syntax rev-field-accessor-id) ... (quote-syntax vtable-accessor-id)) + (list (maybe-quote-syntax rev-field-mutator-maybe-id) ... #f) + quoted-super-id) + (quote-syntax methods-id) + (quote-syntax vtable-id) + (quote-syntax vtable-accessor-id) + (list (list (quote-syntax field-id) (quote-syntax field-init-expr) + (quote-syntax field-accessor-id) (maybe-quote-syntax field-mutator-maybe-id)) + ...) + (list (list (quote-syntax method-id) (quote-syntax method-init-expr/next) + (quote-syntax method-accessor-id)) + ...) + (list (list (quote-syntax static-id) (quote-syntax static-tmp-id)) + ...)))))))) -(define-syntax (bind-fields-in-body stx) +(define-syntax (bind-locals-in-body stx) (syntax-case stx (lambda case-lambda) - [(_ fields #f) #'#f] - [(_ fields (form . rest)) - #'(bind-fields-in-body fields form (form . rest))] - [(_ fields ctx (lambda (arg ...) body0 body ...)) - #'(bind-fields-in-body fields ctx (case-lambda [(arg ...) body0 body ...]))] - [(_ fields ctx (case-lambda clause ...)) + [(_ locals #f) #'#f] + [(_ locals (form . rest)) + (with-syntax ([(_ _ orig) stx]) + #'(bind-locals-in-body locals form orig))] + [(_ locals expr) #'expr] + [(_ locals ctx (lambda (arg ...) body0 body ...)) + #'(bind-locals-in-body locals ctx (case-lambda [(arg ...) body0 body ...]))] + [(_ locals ctx (case-lambda clause ...)) (with-syntax ([(new-clause ...) (for/list ([clause (in-list (syntax->list #'(clause ...)))]) (syntax-case clause () @@ -240,21 +253,25 @@ (with-syntax ([(arg-tmp ...) (generate-temporaries #'(arg ...))]) #'[(this-id arg-tmp ...) (syntax-parameterize ([this (make-rename-transformer #'this-id)]) - (bind-fields - fields + (bind-locals + locals this-id ctx (let-syntax ([arg (make-rename-transformer #'arg-tmp)] ...) body0 body ...)))])]))]) (syntax/loc (syntax-case stx () [(_ _ _ rhs) #'rhs]) (case-lambda new-clause ...)))] - [(_ fields _ expr) + [(_ locals _ expr) #'expr])) -(define-syntax (bind-fields stx) +(define-syntax (bind-locals stx) (syntax-case stx () - [(_ ([field-id field-accessor-id field-mutator-maybe-id] ...) this-id ctx body) + [(_ [([field-id field-accessor-id field-mutator-maybe-id] ...) + ([static-id static-tmp-id] ...)] + this-id ctx body) (with-syntax ([(field-id ...) (for/list ([field-id (in-list (syntax->list #'(field-id ...)))]) - (datum->syntax #'ctx (syntax-e field-id)))]) + (datum->syntax #'ctx (syntax-e field-id)))] + [(static-id ...) (for/list ([static-id (in-list (syntax->list #'(static-id ...)))]) + (datum->syntax #'ctx (syntax-e static-id)))]) #'(let-syntax ([field-id (make-set!-transformer (lambda (stx) (syntax-case stx (set!) @@ -263,6 +280,11 @@ (raise-syntax-error #f "field is immutable" stx))] [(_ arg (... ...)) (syntax/loc stx ((field-accessor-id this-id) arg (... ...)))] [else (syntax/loc stx (field-accessor-id this-id))])))] + ... + [static-id (lambda (stx) + (syntax-case stx () + [(_ arg (... ...)) + (syntax/loc stx (static-tmp-id this-id arg (... ...)))]))] ...) body))])) @@ -309,8 +331,9 @@ (and (eq? (syntax-e #'override-id) (syntax-e #'id)) (with-syntax ([((field-id _ field-accessor-id field-mutator-maybe-id) ...) (class-info-fields ci)]) - #'(bind-fields-in-body - ([field-id field-accessor-id field-mutator-maybe-id] ...) + #'(bind-locals-in-body + [([field-id field-accessor-id field-mutator-maybe-id] ...) + ()] expr)))])) #'(selector-id vtable-id))]))]) (syntax/loc stx (make-id (methods-id method-expr ...) @@ -325,19 +348,28 @@ (syntax-local-value #'class-id (lambda () #f)))]) (unless (class-info? ci) (raise-syntax-error #f "not a class identifier" stx #'class-id)) - (define method-accessor-id + (define make-access (or (for/or ([method (in-list (class-info-methods ci))]) (syntax-case method () [(id _ accessor-id) (and (eq? (syntax-e #'id) (syntax-e #'method-id)) - #'accessor-id)])) + (lambda (o) + (with-syntax ([vtable-accessor-id (class-info-vtable-accessor-id ci)] + [o o]) + #'(accessor-id (vtable-accessor-id o)))))])) + (and call? + (for/or ([static (in-list (class-info-statics ci))]) + (syntax-case static () + [(id tmp-id) + (and (eq? (syntax-e #'id) (syntax-e #'method-id)) + (lambda (o) + #'tmp-id))]))) (raise-syntax-error #f "cannot find method" stx #'method-id))) - (with-syntax ([vtable-accessor-id (class-info-vtable-accessor-id ci)] - [method-accessor-id method-accessor-id]) - (if call? + (if call? + (with-syntax ([proc (make-access #'o)]) #'(let ([o obj]) - ((method-accessor-id (vtable-accessor-id o)) o arg ...)) - #'(method-accessor-id (vtable-accessor-id obj)))))])) + (proc o arg ...))) + (make-access #'obj)))])) (define-syntax (send stx) (send-or-method stx #t)) @@ -350,6 +382,21 @@ [(_ class-id obj method-id) (send-or-method stx #f)])) +(define-syntax (with-object stx) + (syntax-case stx () + [(_ class-id obj-expr body0 body ...) + (let ([ci (and (identifier? #'class-id) + (syntax-local-value #'class-id (lambda () #f)))]) + (unless (class-info? ci) + (raise-syntax-error #f "not a class identifier" stx #'class-id)) + (with-syntax ([((field-id _ field-accessor-id field-mutator-maybe-id) ...) + (class-info-fields ci)] + [((static-id static-tmp-id) ...) (class-info-statics ci)]) + (with-syntax ([local-bindings #'[([field-id field-accessor-id field-mutator-maybe-id] ...) + ([static-id static-tmp-id] ...)]]) + #'(let ([o obj-expr]) + (bind-locals local-bindings o obj-expr (let () body0 body ...))))))])) + (define-for-syntax (check-member stx id l what) (or (for/or ([e (in-list l)]) (syntax-case e () @@ -372,10 +419,12 @@ [b 2]) (private [other (lambda (q) (list q this))]) + (static + [enbox (lambda (v) (box (vector a v)))]) (public [q #f] [m (lambda (z) (list a (other b)))] - [n (lambda (x y z) (vector a b x y z))])) + [n (lambda (x y z) (vector a b (enbox x) y z))])) (class sub #:extends example (field @@ -395,6 +444,7 @@ (new sub [d 5]) (send example (new sub) m 'more) (set-example-b! ex 6) + (send example ex enbox 88) (define ex2 (new example #:override @@ -402,4 +452,7 @@ (box (vector x y z a b)))]) [b 'b] [a 'a])) - (send example ex2 n 1 2 3)) + (send example ex2 n 1 2 3) + + (with-object example ex + (list a b (enbox 'c)))) diff --git a/racket/src/io/port/pipe.rkt b/racket/src/io/port/pipe.rkt index d356e8bbbf..2deaafed5f 100644 --- a/racket/src/io/port/pipe.rkt +++ b/racket/src/io/port/pipe.rkt @@ -1,8 +1,7 @@ #lang racket/base (require racket/fixnum "../common/check.rkt" - "../common/fixnum.rkt" - "../common/object.rkt" + "../common/class.rkt" "../host/thread.rkt" "port.rkt" "input-port.rkt" @@ -12,468 +11,482 @@ (provide make-pipe make-pipe-ends - pipe-input-port? - pipe-output-port? - pipe-content-length - pipe-write-position - pipe-get-content - pipe-discard-all) + (rename-out [pipe-input-port?* pipe-input-port?] + [pipe-output-port?* pipe-output-port?]) + pipe-content-length) (define (min+1 a b) (if a (min (add1 a) b) b)) -(struct pipe-data (get-content-length - write-position - get-content - discard-all)) +(define pipe-input-port?* + (let ([pipe-input-port? + (lambda (p) + (define cp (->core-input-port p)) + (pipe-input-port? p))]) + pipe-input-port?)) -(define (pipe-input-port? p) - (and (input-port? p) - (pipe-data? (core-port-data (->core-input-port p))))) - -(define (pipe-output-port? p) - (and (output-port? p) - (pipe-data? (core-port-data (->core-output-port p))))) +(define pipe-output-port?* + (let ([pipe-output-port? + (lambda (p) + (define cp (->core-output-port p)) + (pipe-output-port? p))]) + pipe-output-port?)) (define (pipe-content-length p) - (define cp + (define d (cond - [(pipe-input-port? p) (->core-input-port p)] - [(pipe-output-port? p) (->core-output-port p)] + [(let ([p (->core-input-port p)]) + (and p + (pipe-input-port? p) + p)) + => (lambda (p) (pipe-input-port-d p))] + [(let ([p (->core-output-port p)]) + (and p + (pipe-output-port? p) + p)) + => (lambda (p) (pipe-output-port-d p))] [else (raise-argument-error 'pipe-contact-length "(or/c pipe-input-port? pipe-output-port?)" p)])) - ((pipe-data-get-content-length (core-port-data cp)) - (if (core-input-port? cp) - (compat-input-port-self cp) - (compat-output-port-self cp)))) + (atomically + (send pipe-input-port (pipe-data-input d) sync-data) + (send pipe-data d content-length))) -;; in atomic mode: -(define pipe-write-position - (case-lambda - [(p) ((pipe-data-write-position (core-port-data p)) (compat-output-port-self p))] - [(p pos) ((pipe-data-write-position (core-port-data p)) (compat-output-port-self p) pos)])) +;; ---------------------------------------- -;; in atomic mode: -(define (pipe-discard-all p) - ((pipe-data-discard-all (core-port-data p)) (compat-output-port-self p))) +(class pipe-data + (field + [bstr #""] + [len 0] + [limit 0] + [peeked-amt 0] ; peeked but not yet read, effectively extends `limit` + [start 0] + [end 0] + [input #f] ; #f => closed + [output-closed? #f] + [read-ready-sema #f] + [write-ready-sema #f] + [more-read-ready-sema #f] ; for lookahead peeks + [read-ready-evt #f] + [write-ready-evt #f]) -;; in atomic mode:x -(define (pipe-get-content p bstr start-pos) - ((pipe-data-get-content (core-port-data p)) (compat-output-port-self p) bstr start-pos)) + ;; All methods in atomic mode. + ;; Beware that the input port must be synced to sure that `start` + ;; represents the current position before using these methods. + (static + [content-length + (lambda () + (define s start) + (define e end) + (if (s . fx<= . e) + (fx- e s) + (fx+ e (fx- len s))))] -(define-constructor (make-pipe-ends [limit #f] [input-name 'pipe] [output-name 'pipe] - #:need-input? [need-input? #t]) - (define bstr (make-bytes (min+1 limit 16))) - (define len (bytes-length bstr)) - (define-fixnum peeked-amt 0) ; peeked but not yet read effectively extends `limit` - (define-fixnum start 0) - (define-fixnum end 0) - (define write-pos #f) ; to adjust the write position via `file-position` on a string port - (define input-closed? #f) - (define output-closed? #f) + [input-empty? + (lambda () + (fx= start end))] - (define (content-length) - (if (start . fx<= . end) - (fx- end start) - (fx+ end (fx- len start)))) - (define (input-empty?) (fx= start end)) - (define (output-full?) - (and limit - ((content-length) . >= . (+ limit peeked-amt)))) + [output-full? + (lambda () + (define l limit) + (and l + ((content-length) . >= . (+ l peeked-amt))))] - (define data - (pipe-data - ;; get-content-length - (method - (lambda () - (atomically (content-length)))) - ;; write-position - (method - (case-lambda - ;; in atomic mode - [() (or write-pos end)] - [(pos) - ;; `pos` must be between `start` and `end` - (if (fx= pos end) - (set! write-pos #f) - (set! write-pos pos))])) - ;; get-content - (method - (lambda (to-bstr start-pos) - ;; in atomic mode - (define pos (let ([p (fx+ start start-pos)]) - (if (p . fx>= . len) - (fx- p len) - p))) - (define end-pos (fx+ pos (bytes-length to-bstr))) - (cond - [(end-pos . fx<= . len) - (bytes-copy! to-bstr 0 bstr pos end-pos)] - [else - (bytes-copy! to-bstr 0 bstr pos len) - (bytes-copy! to-bstr (fx- len pos) bstr 0 (fx- end-pos len))]))) - ;; discard-all - (method - (lambda () - ;; in atomic mode - (set! peeked-amt 0) - (set! start 0) - (set! end 0) - (set! write-pos #f))))) + ;; Used before/after read: + [check-output-unblocking + (lambda () + (when (output-full?) (semaphore-post write-ready-sema)))] + [check-input-blocking + (lambda () + (when (input-empty?) (semaphore-wait read-ready-sema)))] - (define read-ready-sema (make-semaphore)) - (define write-ready-sema (and limit (make-semaphore 1))) - (define more-read-ready-sema #f) ; for lookahead peeks - (define read-ready-evt (wrap-evt (semaphore-peek-evt read-ready-sema) - (lambda (v) 0))) - (define write-ready-evt (if limit - (semaphore-peek-evt write-ready-sema) - always-evt)) - (define progress-sema #f) + ;; Used before/after write: + [check-input-unblocking + (lambda () + (when (and (input-empty?) (not output-closed?)) (semaphore-post read-ready-sema)) + (when more-read-ready-sema + (semaphore-post more-read-ready-sema) + (set! more-read-ready-sema #f)))] + [check-output-blocking + (lambda () + (when (output-full?) + (semaphore-wait write-ready-sema) + (when input + (send pipe-input-port input on-output-full))))] - ;; Used before/after read: - (define (check-output-unblocking) - (when (output-full?) (semaphore-post write-ready-sema))) - (define (check-input-blocking) - (when (input-empty?) (semaphore-wait read-ready-sema))) + ;; Used after peeking: + [peeked! + (lambda (amt) + (when (amt . > . peeked-amt) + (check-output-unblocking) + (set! peeked-amt amt)))])) - ;; Used before/after write: - (define (check-input-unblocking) - (when (and (input-empty?) (not output-closed?)) (semaphore-post read-ready-sema)) - (when more-read-ready-sema - (semaphore-post more-read-ready-sema) - (set! more-read-ready-sema #f))) - (define (check-output-blocking) - (when (output-full?) (semaphore-wait write-ready-sema))) +;; ---------------------------------------- - ;; Used after peeking: - (define (peeked! amt) - (when (amt . > . peeked-amt) - (check-output-unblocking) - (set! peeked-amt amt))) +(class pipe-input-port #:extends core-input-port + (field + [d #f] ; pipe-data + [progress-sema #f] + [commit-manager #f]) - (define (progress!) - (when progress-sema - (semaphore-post progress-sema) - (set! progress-sema #f))) + (private + [progress! + (lambda () + (when progress-sema + (semaphore-post progress-sema) + (set! progress-sema #f)))] + + ;; [can leave atomic mode temporarily] + ;; After this function returns, complete any commit-changing work + ;; before leaving atomic mode again. + [pause-waiting-commit + (lambda () + (when commit-manager + (commit-manager-pause commit-manager)))] - (define commit-manager #f) + ;; [can leave atomic mode temporarily] + [wait-commit + (lambda (progress-evt ext-evt finish) + (cond + [(and (not commit-manager) + ;; Try shortcut: + (not (sync/timeout 0 progress-evt)) + (sync/timeout 0 ext-evt)) + (finish) + #t] + [else + ;; General case to support blocking and potentially multiple + ;; commiting threads: + (unless commit-manager + (set! commit-manager (make-commit-manager))) + (commit-manager-wait commit-manager progress-evt ext-evt finish)]))] - ;; in atomic mode [can leave atomic mode temporarily] - ;; After this function returns, complete any commit-changing work - ;; before leaving atomic mode again. - (define (pause-waiting-commit) - (when commit-manager - (commit-manager-pause commit-manager))) + [fast-mode! + (lambda (amt) + (unless (or count buffer) + (with-object pipe-data d + (define s start) + (define e end) + (unless (fx= s e) + (set! buffer bstr) + (set! buffer-pos s) + (set! buffer-end (if (s . fx< . e) e len)) + (define o offset) + (when o + (set! offset (- (+ o amt) s)))))))] - ;; in atomic mode [can leave atomic mode temporarily] - (define (wait-commit progress-evt ext-evt finish) - (cond - [(and (not commit-manager) - ;; Try shortcut: - (not (sync/timeout 0 progress-evt)) - (sync/timeout 0 ext-evt)) - (finish) - #t] - [else - ;; General case to support blocking and potentially multiple - ;; commiting threads: - (unless commit-manager - (set! commit-manager (make-commit-manager))) - (commit-manager-wait commit-manager progress-evt ext-evt finish)])) + [slow-mode! + (lambda () + (when buffer + (with-object pipe-data d + (define pos buffer-pos) + (define o offset) + (when o + (set! offset (+ o pos))) + (set! start (if (fx= pos len) + 0 + pos)) + (set! buffer #f) + (set! buffer-pos buffer-end))))]) - (values - ;; input ---------------------------------------- - (and - need-input? - (make-core-input-port - #:name input-name - #:data data - #:self self + (static + [sync-data + (lambda () + (when buffer + (with-object pipe-data d + (define pos buffer-pos) + (set! start (if (fx= pos len) + 0 + pos)))))] + [on-resize + (lambda () + (slow-mode!))] + [on-output-full + (lambda () + (slow-mode!))]) - #:prepare-change - (method - (lambda () - (pause-waiting-commit))) + (override + [prepare-change + (lambda () + (with-object pipe-data d + (pause-waiting-commit)))] - #:read-byte - (method - (lambda () - (assert-atomic) - (cond - [(input-empty?) - (if output-closed? - eof - ;; event's synchronization value is ignored: - read-ready-evt)] - [else - (define pos start) - (check-output-unblocking) - (unless (fx= 0 peeked-amt) - (set! peeked-amt (fxmax 0 (fx- peeked-amt 1)))) - (define new-pos (fx+ pos 1)) - (if (fx= new-pos len) - (set! start 0) - (set! start new-pos)) - (check-input-blocking) - (progress!) - (bytes-ref bstr pos)]))) - - #:read-in - (method - (lambda (dest-bstr dest-start dest-end copy?) - (assert-atomic) - (cond - [(input-empty?) - (if output-closed? - eof - read-ready-evt)] - [else - (check-output-unblocking) - (begin0 - (cond - [(start . fx< . end) + [read-in + (lambda (dest-bstr dest-start dest-end copy?) + (assert-atomic) + (slow-mode!) + (with-object pipe-data d + (cond + [(input-empty?) + (if output-closed? + eof + read-ready-evt)] + [else + (check-output-unblocking) + (define s start) + (define e end) + (define amt + (cond + [(s . fx< . e) (define amt (fxmin (fx- dest-end dest-start) - (fx- end start))) - (bytes-copy! dest-bstr dest-start bstr start (fx+ start amt)) - (set! start (fx+ start amt)) + (fx- e s))) + (bytes-copy! dest-bstr dest-start bstr s (fx+ s amt)) + (set! start (fx+ s amt)) (set! peeked-amt (fxmax 0 (fx- peeked-amt amt))) amt] [else (define amt (fxmin (fx- dest-end dest-start) - (fx- len start))) - (bytes-copy! dest-bstr dest-start bstr start (fx+ start amt)) - (set! start (modulo (fx+ start amt) len)) + (fx- len s))) + (bytes-copy! dest-bstr dest-start bstr s (fx+ s amt)) + (set! start (modulo (fx+ s amt) len)) (set! peeked-amt (fxmax 0 (fx- peeked-amt amt))) - amt]) - (check-input-blocking) - (progress!))]))) + amt])) + (check-input-blocking) + (progress!) + (fast-mode! amt) + amt])))] - #:peek-byte - (method - (lambda () - (assert-atomic) - (cond - [(input-empty?) - (if output-closed? - eof - read-ready-evt)] - [else - (peeked! 1) - (bytes-ref bstr start)]))) - - #:peek-in - (method - (lambda (dest-bstr dest-start dest-end skip progress-evt copy?) - (assert-atomic) - (define content-amt (content-length)) - (cond - [(and progress-evt - (sync/timeout 0 progress-evt)) - #f] - [(content-amt . <= . skip) - (cond - [output-closed? eof] - [else - (unless (or (zero? skip) more-read-ready-sema) - (set! more-read-ready-sema (make-semaphore))) - (define evt (if (zero? skip) - read-ready-evt - (wrap-evt (semaphore-peek-evt more-read-ready-sema) - (lambda (v) 0)))) - evt])] - [else - (define peek-start (fxmodulo (fx+ start skip) len)) - (cond - [(peek-start . fx< . end) - (define amt (fxmin (fx- dest-end dest-start) - (fx- end peek-start))) - (bytes-copy! dest-bstr dest-start bstr peek-start (fx+ peek-start amt)) - (peeked! (+ skip amt)) - amt] - [else - (define amt (fxmin (fx- dest-end dest-start) - (fx- len peek-start))) - (bytes-copy! dest-bstr dest-start bstr peek-start (fx+ peek-start amt)) - (peeked! (+ skip amt)) - amt])]))) - - #:byte-ready - (method - (lambda (work-done!) - (assert-atomic) - (or output-closed? - (not (fx= 0 (content-length)))))) - - #:close - (method - (lambda () - (unless input-closed? - (set! input-closed? #t) - (progress!)))) - - #:get-progress-evt - (method - (lambda () - (atomically + [peek-in + (lambda (dest-bstr dest-start dest-end skip progress-evt copy?) + (with-object pipe-data d + (assert-atomic) + (sync-data) + (define content-amt (content-length)) (cond - [input-closed? always-evt] + [(and progress-evt + (sync/timeout 0 progress-evt)) + #f] + [(content-amt . <= . skip) + (cond + [output-closed? eof] + [else + (unless (or (zero? skip) more-read-ready-sema) + (set! more-read-ready-sema (make-semaphore))) + (define evt (if (zero? skip) + read-ready-evt + (wrap-evt (semaphore-peek-evt more-read-ready-sema) + (lambda (v) 0)))) + evt])] [else - (unless progress-sema - (set! progress-sema (make-semaphore))) - (semaphore-peek-evt progress-sema)])))) + (define peek-start (fxmodulo (fx+ start skip) len)) + (cond + [(peek-start . fx< . end) + (define amt (fxmin (fx- dest-end dest-start) + (fx- end peek-start))) + (bytes-copy! dest-bstr dest-start bstr peek-start (fx+ peek-start amt)) + (peeked! (+ skip amt)) + amt] + [else + (define amt (fxmin (fx- dest-end dest-start) + (fx- len peek-start))) + (bytes-copy! dest-bstr dest-start bstr peek-start (fx+ peek-start amt)) + (peeked! (+ skip amt)) + amt])])))] - #:commit + [byte-ready + (lambda (work-done!) + (assert-atomic) + (with-object pipe-data d + (or output-closed? + (begin + (sync-data) + (not (fx= 0 (content-length)))))))] + + [close + (lambda () + (with-object pipe-data d + (when input + (slow-mode!) + (set! input #f) + (progress!))))] + + [get-progress-evt + (lambda () + (atomically + (with-object pipe-data d + (cond + [(not input) always-evt] + [else + (slow-mode!) + (unless progress-sema + (set! progress-sema (make-semaphore))) + (semaphore-peek-evt progress-sema)]))))] + + [commit ;; Allows `amt` to be zero and #f for other arguments, ;; which is helpful for `open-input-peek-via-read`. - (method - (lambda (amt progress-evt ext-evt finish) - (assert-atomic) - ;; `progress-evt` is a `semepahore-peek-evt`, and `ext-evt` - ;; is constrained; we can send them over to different threads - (cond - [(zero? amt) - (progress!)] - [else - (wait-commit - progress-evt ext-evt - ;; in atomic mode, maybe in a different thread: - (lambda () - (let ([amt (min amt (content-length))]) - (cond - [(fx= 0 amt) - ;; There was nothing to commit; claim success for 0 bytes - (finish #"")] - [else - (define dest-bstr (make-bytes amt)) - (cond - [(start . fx< . end) - (bytes-copy! dest-bstr 0 bstr start (fx+ start amt))] - [else - (define amt1 (fxmin (fx- len start) amt)) - (bytes-copy! dest-bstr 0 bstr start (fx+ start amt1)) - (when (amt1 . fx< . amt) - (bytes-copy! dest-bstr amt1 bstr 0 (fx- amt amt1)))]) - (set! start (fxmodulo (fx+ start amt) len)) - (progress!) - (check-input-blocking) - (finish dest-bstr)]))))]))))) - - ;; output ---------------------------------------- - (make-core-output-port - #:name output-name - #:data data - #:self self + (lambda (amt progress-evt ext-evt finish) + (assert-atomic) + ;; `progress-evt` is a `semepahore-peek-evt`, and `ext-evt` + ;; is constrained; we can send them over to different threads + (cond + [(zero? amt) + (progress!)] + [else + (wait-commit + progress-evt ext-evt + ;; in atomic mode, maybe in a different thread: + (lambda () + (with-object pipe-data d + (slow-mode!) + (let ([amt (min amt (content-length))]) + (cond + [(fx= 0 amt) + ;; There was nothing to commit; claim success for 0 bytes + (finish #"")] + [else + (define dest-bstr (make-bytes amt)) + (define s start) + (define e end) + (cond + [(s . fx< . e) + (bytes-copy! dest-bstr 0 bstr s (fx+ s amt))] + [else + (define amt1 (fxmin (fx- len s) amt)) + (bytes-copy! dest-bstr 0 bstr s (fx+ s amt1)) + (when (amt1 . fx< . amt) + (bytes-copy! dest-bstr amt1 bstr 0 (fx- amt amt1)))]) + (set! start (fxmodulo (fx+ s amt) len)) + (progress!) + (fast-mode! amt) + (check-input-blocking) + (finish dest-bstr)])))))]))] - #:evt write-ready-evt - - #:write-out - ;; in atomic mode - (method - (lambda (src-bstr src-start src-end nonblock? enable-break? copy?) - (assert-atomic) - (let try-again () - (define top-pos (if (fx= start 0) - (fx- len 1) - len)) - (define (maybe-grow) - (cond - [(or (not limit) - ((+ limit peeked-amt) . > . (fx- len 1))) - ;; grow pipe size - (define new-bstr (make-bytes (min+1 (and limit (+ limit peeked-amt)) (* len 2)))) + [count-lines! + (lambda () + (slow-mode!))])) + +;; ---------------------------------------- + +(class pipe-output-port #:extends core-output-port + (field + [d d]) ; pipe-data + + (override + [write-out + ;; in atomic mode + (lambda (src-bstr src-start src-end nonblock? enable-break? copy?) + (assert-atomic) + (with-object pipe-data d + (send pipe-input-port input sync-data) + (let try-again () + (define top-pos (if (fx= start 0) + (fx- len 1) + len)) + (define (maybe-grow) (cond - [(fx= 0 start) - (bytes-copy! new-bstr 0 bstr 0 (fx- len 1))] - [else - (bytes-copy! new-bstr 0 bstr start len) - (bytes-copy! new-bstr (fx- len start) bstr 0 end) - (set! start 0) - (set! end (fx- len 1))]) - (set! bstr new-bstr) - (set! len (bytes-length new-bstr)) - (try-again)] - [else (pipe-is-full)])) - (define (pipe-is-full) - (wrap-evt write-ready-evt (lambda (v) #f))) - (define (apply-limit amt) - (if limit - (min amt (- (+ limit peeked-amt) (content-length))) - amt)) - (cond - [(fx= src-start src-end) ;; => flush - 0] - [write-pos ; set by `file-position` on a bytes port - (define amt (apply-limit (fxmin (fx- end write-pos) - (fx- src-end src-start)))) + [(or (not limit) + ((+ limit peeked-amt) . > . (fx- len 1))) + ;; grow pipe size + (send pipe-input-port input on-resize) + (define new-bstr (make-bytes (min+1 (and limit (+ limit peeked-amt)) (* len 2)))) + (cond + [(fx= 0 start) + (bytes-copy! new-bstr 0 bstr 0 (fx- len 1))] + [else + (bytes-copy! new-bstr 0 bstr start len) + (bytes-copy! new-bstr (fx- len start) bstr 0 end) + (set! start 0) + (set! end (fx- len 1))]) + (set! bstr new-bstr) + (set! len (bytes-length new-bstr)) + (try-again)] + [else (pipe-is-full)])) + (define (pipe-is-full) + (wrap-evt write-ready-evt (lambda (v) #f))) + (define (apply-limit amt) + (if limit + (min amt (- (+ limit peeked-amt) (content-length))) + amt)) (cond - [(fx= amt 0) (pipe-is-full)] - [else - (check-input-unblocking) - (bytes-copy! bstr write-pos src-bstr src-start (fx+ src-start amt)) - (let ([new-write-pos (fx+ write-pos amt)]) - (if (fx= new-write-pos end) - (set! write-pos #f) ; back to normal mode - (set! write-pos new-write-pos))) - (check-output-blocking) - amt])] - [(and (end . fx>= . start) - (end . fx< . top-pos)) - (define amt (apply-limit (fxmin (fx- top-pos end) - (fx- src-end src-start)))) - (cond - [(fx= amt 0) (pipe-is-full)] - [else - (check-input-unblocking) - (bytes-copy! bstr end src-bstr src-start (fx+ src-start amt)) - (let ([new-end (fx+ end amt)]) - (set! end (if (fx= new-end len) 0 new-end))) - (check-output-blocking) - amt])] - [(fx= end top-pos) - (cond - [(fx= start 0) - (maybe-grow)] - [else - (define amt (fxmin (fx- start 1) - (fx- src-end src-start))) + [(fx= src-start src-end) ;; => flush + 0] + [(and (end . fx>= . start) + (end . fx< . top-pos)) + (define amt (apply-limit (fxmin (fx- top-pos end) + (fx- src-end src-start)))) (cond [(fx= amt 0) (pipe-is-full)] [else (check-input-unblocking) - (bytes-copy! bstr 0 src-bstr src-start (fx+ src-start amt)) - (set! end amt) + (bytes-copy! bstr end src-bstr src-start (fx+ src-start amt)) + (let ([new-end (fx+ end amt)]) + (set! end (if (fx= new-end len) 0 new-end))) (check-output-blocking) - amt])])] - [(end . fx< . (fx- start 1)) - (define amt (apply-limit (fxmin (fx- (fx- start 1) end) - (fx- src-end src-start)))) - (cond - [(fx= amt 0) (pipe-is-full)] + amt])] + [(fx= end top-pos) + (cond + [(fx= start 0) + (maybe-grow)] + [else + (define amt (fxmin (fx- start 1) + (fx- src-end src-start))) + (cond + [(fx= amt 0) (pipe-is-full)] + [else + (check-input-unblocking) + (bytes-copy! bstr 0 src-bstr src-start (fx+ src-start amt)) + (set! end amt) + (check-output-blocking) + amt])])] + [(end . fx< . (fx- start 1)) + (define amt (apply-limit (fxmin (fx- (fx- start 1) end) + (fx- src-end src-start)))) + (cond + [(fx= amt 0) (pipe-is-full)] + [else + (check-input-unblocking) + (bytes-copy! bstr end src-bstr src-start (fx+ src-start amt)) + (set! end (fx+ end amt)) + (check-output-blocking) + amt])] [else - (check-input-unblocking) - (bytes-copy! bstr end src-bstr src-start (fx+ src-start amt)) - (set! end (fx+ end amt)) - (check-output-blocking) - amt])] - [else - (maybe-grow)])))) + (maybe-grow)]))))] - #:count-write-evt-via-write-out - (method - (lambda (op v bstr start) - (port-count! op v bstr start))) + [get-write-evt + (get-write-evt-via-write-out (lambda (out v bstr start) + (port-count! out v bstr start)))] - #:close - ;; in atomic mode - (method - (lambda () - (unless output-closed? - (set! output-closed? #t) - (when write-ready-sema - (semaphore-post write-ready-sema)) - (when more-read-ready-sema - (semaphore-post more-read-ready-sema)) - (semaphore-post read-ready-sema))))))) + [close + ;; in atomic mode + (lambda () + (with-object pipe-data d + (unless output-closed? + (set! output-closed? #t) + (when write-ready-sema + (semaphore-post write-ready-sema)) + (when more-read-ready-sema + (semaphore-post more-read-ready-sema)) + (semaphore-post read-ready-sema))))])) + +;; ---------------------------------------- + +(define (make-pipe-ends [limit #f] [input-name 'pipe] [output-name 'pipe]) + (define len (min+1 limit 16)) + (define read-ready-sema (make-semaphore)) + (define write-ready-sema (and limit (make-semaphore 1))) + (define write-ready-evt (if limit + (semaphore-peek-evt write-ready-sema) + always-evt)) + (define d (new pipe-data + [bstr (make-bytes len)] + [len len] + [limit limit] + [read-ready-sema read-ready-sema] + [write-ready-sema write-ready-sema] + [read-ready-evt (wrap-evt (semaphore-peek-evt read-ready-sema) + (lambda (v) 0))] + [write-ready-evt write-ready-evt])) + + (define input (new pipe-input-port + [name input-name] + [d d])) + (define output (new pipe-output-port + [name output-name] + [evt write-ready-evt] + [d d])) + + (set-pipe-data-input! d input) + + (values input output)) (define/who (make-pipe [limit #f] [input-name 'pipe] [output-name 'pipe]) (check who #:or-false exact-positive-integer? limit)