diff --git a/racket/src/cs/rumble/memory.ss b/racket/src/cs/rumble/memory.ss index 2cc89da2df..d6ccf12c85 100644 --- a/racket/src/cs/rumble/memory.ss +++ b/racket/src/cs/rumble/memory.ss @@ -254,6 +254,8 @@ (lambda (o) (eq? o v)))] [(eq? 'code (car args)) #%$code?] + [(eq? 'procedure (car args)) + #%procedure?] [(eq? 'ephemeron (car args)) ephemeron-pair?] [(symbol? (car args)) diff --git a/racket/src/io/common/fixnum.rkt b/racket/src/io/common/fixnum.rkt index 8c6178318f..70a7b317b5 100644 --- a/racket/src/io/common/fixnum.rkt +++ b/racket/src/io/common/fixnum.rkt @@ -2,7 +2,12 @@ (require (for-syntax racket/base) racket/fixnum) -(provide define-fixnum) +(provide define-fixnum + + ;; to cooperate with macros that explicitly + ;; manage closures, as in "object.rkt" + capture-fixnum + (for-syntax make-fixnum-transformer)) ;; Representing a mutable, fixnum-valued variable with an fxvector can ;; avoid a write barrier on assignment @@ -10,10 +15,17 @@ (define-syntax-rule (define-fixnum id v) (begin (define cell (fxvector v)) - (define-syntax id - (make-set!-transformer - (lambda (stx) - (syntax-case stx (set!) - [(set! _ r) #'(fxvector-set! cell 0 r)] - [(... (_ ...)) (raise-syntax-error stx "bad use" stx)] - [_ #'(fxvector-ref cell 0)])))))) + (define-syntax id (make-fixnum-transformer #'cell)))) + +(define-for-syntax (make-fixnum-transformer cell-id) + (with-syntax ([cell cell-id]) + (make-set!-transformer + (lambda (stx) + (syntax-case stx (set!) + [(set! _ r) #'(fxvector-set! cell 0 r)] + [(_ #:capture-fixnum) #'cell] ; see `capture-fixnum` + [(... (_ ...)) (raise-syntax-error stx "bad use" stx)] + [_ #'(fxvector-ref cell 0)]))))) + +(define-syntax-rule (capture-fixnum id) + (id #:capture-fixnum)) diff --git a/racket/src/io/common/object.rkt b/racket/src/io/common/object.rkt new file mode 100644 index 0000000000..c343f27f8e --- /dev/null +++ b/racket/src/io/common/object.rkt @@ -0,0 +1,267 @@ +#lang racket/base +(require (for-syntax racket/base + racket/pretty) + racket/stxparam + "fixnum.rkt") + +;; The `define-constructor` form implements a basic object system +;; where any `(define (id ....) ....)` or `(define id (case-lambda +;; ....)` in the constructor body is treated as a method that is +;; converted to accept a vector of free variables. +;; +;; A `lambda` or `case-lambda` is any other position can be wrapped in +;; `method` to convert it, too. Note that the caller of such methods +;; must know to pass along the self vector somehow. The `self` +;; identifier is bound to the self vector. +;; +;; Constraints: +;; +;; Macros are not expanded to recognize definitions; use only +;; `define` or `define-fixnum` for definitions. +;; +;; Only `set!` any variables within a method. +;; +;; A named method cannot be used as a value unless it is wrapped +;; with `method`; calls are automatcally converted to pass along the +;; self vector, while the `method` escape obliges a called of the +;; result to pass along the self vector. + +(provide define-constructor + method + self) + +(define-syntax-parameter self + (lambda (stx) + (raise-syntax-error #f "misuse outside of a constructor" stx))) + +(define-syntax-parameter current-constructor-fields #f) +(define-syntax-parameter current-constructor-methods #f) + +(define-for-syntax (get-current-constructor-fields) + (syntax-parameter-value #'current-constructor-fields)) +(define-for-syntax (get-current-constructor-methods) + (syntax-parameter-value #'current-constructor-methods)) + +(define-for-syntax (maybe-lift mode e) + (syntax-case mode () + [#:lift (syntax-local-lift-expression e)] + [#:no-lift e])) + +(define-syntax (method stx) + (syntax-case stx (lambda case-lambda) + [(_ id) + (identifier? #'id) + #'(id #:method)] + [(_ rhs) + #'(method #:lift rhs)] + [(_ lift-mode (lambda (arg ...) body ...)) + (with-syntax ([fields (get-current-constructor-fields)] + [methods (get-current-constructor-methods)] + [(_ _ inside) stx]) + (maybe-lift + #'lift-mode + #`(lambda (this arg ...) + (let-methods + this methods inside + (let-fields + this 0 fields inside + body ...)))))] + [(_ lift-mode (case-lambda [(arg ...) body ...] ...)) + (with-syntax ([fields (get-current-constructor-fields)] + [methods (get-current-constructor-methods)] + [(_ _ inside) stx]) + (maybe-lift + #'lift-mode + #`(case-lambda + [(this arg ...) + (let-methods + this methods inside + (let-fields + this 0 fields inside + body ...))] + ...)))])) + +(define-for-syntax (make-method-transformer self-id id) + (lambda (stx) + (syntax-case stx () + [(_ #:method) id] + [(_ arg ...) (quasisyntax/loc stx + (#,id #,self-id arg ...))]))) + +(define-syntax (define-constructor stx) + (syntax-case stx () + [(_ (name arg ...) body ... last-body) + (andmap identifier? (syntax->list #'(arg ...))) + (let () + (define fields + (apply append + (syntax->list #'(arg ...)) + (for/list ([body (in-list (syntax->list #'(body ...)))]) + (syntax-case body (define case-lambda define-values define-fixnum) + [(define (id . _) . _) + null] + [(define id (case-lambda . _)) + null] + [(define id _) + (list #'id)] + [(define-values (id ...) _) + (syntax->list #'(id ...))] + [(define-fixnum id _) + (list #'(capture-fixnum id))] + [else + null])))) + (define methods + (apply + append + (for/list ([body (in-list (syntax->list #'(body ...)))]) + (syntax-case body (define define-values define-fixnum case-lambda) + [(define (id . args) . bodys) + (list (cons #'id (generate-temporaries #'(id))))] + [(define id (case-lambda . clauses)) + (list (cons #'id (generate-temporaries #'(id))))] + [_ null])))) + (define (find-method-name id) + (for/or ([pr (in-list methods)]) + (and (free-identifier=? id (car pr)) + (cadr pr)))) + (define num-args (length (syntax->list #'(arg ...)))) + (define-values (method-defns other-bodys count) + (for/fold ([method-defns null] [other-bodys null] [n num-args]) ([body (in-list (syntax->list #'(body ...)))]) + (syntax-case body (define define-values define-fixnum case-lambda) + [(define (id . args) . bodys) + (let ([re (lambda (s) (datum->syntax body (syntax-e s) body body))] + [tmp-id (find-method-name #'id)]) + (values (cons #`(define #,tmp-id + (syntax-parameterize ([current-constructor-fields (quote-syntax #,fields)] + [current-constructor-methods (quote-syntax #,methods)]) + (method #:no-lift #,(re #'(lambda args . bodys))))) + method-defns) + other-bodys + n))] + [(define id (case-lambda . clauses)) + (let ([re (lambda (s) (datum->syntax body (syntax-e s) body body))] + [tmp-id (find-method-name #'id)]) + (values (cons #`(define #,tmp-id + (syntax-parameterize ([current-constructor-fields (quote-syntax #,fields)] + [current-constructor-methods (quote-syntax #,methods)]) + (method #:no-lift #,(re #'(case-lambda clauses))))) + method-defns) + other-bodys + n))] + [(define id _) + (values method-defns + (list* #`(vector*-set! self-vec #,n id) + body + other-bodys) + (add1 n))] + [(define-values (id ...) _) + (values method-defns + (list* #`(begin #,@(for/list ([id (in-list (syntax->list #'(id ...)))] + [n (in-naturals n)]) + #`(vector*-set! self-vec #,n #,id))) + body + other-bodys) + (+ n (length (syntax->list #'(id ...)))))] + [(define-fixnum id _) + (values method-defns + (list* #`(vector*-set! self-vec #,n (capture-fixnum id)) + body + other-bodys) + (add1 n))] + [else + (values method-defns + (cons body other-bodys) + n)]))) + (with-syntax ([inside stx] + [count #`#,count] + [fields fields] + [methods methods] + [(init-arg ...) (for/list ([arg (in-list (syntax->list #'(arg ...)))] + [i (in-naturals)]) + #`(vector*-set! self-vec #,i #,arg))] + [(other-body ...) (reverse other-bodys)] + [(method-defn ...) (reverse method-defns)]) + #'(begin + method-defn ... + (define (name arg ...) + (define self-vec (make-vector count)) + init-arg ... + (syntax-parameterize ([current-constructor-fields (quote-syntax fields)] + [current-constructor-methods (quote-syntax methods)] + [self (lambda (stx) #'self-vec)]) + (let-methods + self-vec methods inside + (let () + other-body ... + last-body)))))))] + [(_ (name . formals) body ... last-body) + (with-syntax ([(arg ...) (let loop ([formals #'formals]) + (syntax-case formals () + [() null] + [id + (identifier? #'id) + (list #'id)] + [(kw . formals) + (keyword? (syntax-e #'kw)) + (loop #'formals)] + [([id _] . formals) + (cons #'id (loop #'formals))] + [(id . formals) + (cons #'id (loop #'formals))]))]) + #`(begin + #,(datum->syntax + stx + (syntax-e #'(define-constructor (constructor arg ...) + body ... last-body)) + stx + stx) + (define (name . formals) + (constructor arg ...))))])) + +(define-syntax (let-fields stx) + (syntax-case stx (capture-fixnum) + [(_ self-id n () ctx body ...) + #'(let () body ...)] + [(_ self-id n ((capture-fixnum id) . captureds) ctx . bodys) + (with-syntax ([id (datum->syntax #'ctx (syntax-e #'id))] + [n+1 #`#,(+ (syntax-e #'n) 1)]) + #'(let-syntax ([id (make-fixnum-transformer #'(vector*-ref self-id n))]) + (let-fields self-id n+1 captureds ctx . bodys)))] + [(_ self-id n (id . captureds) ctx . bodys) + (with-syntax ([id (datum->syntax #'ctx (syntax-e #'id))] + [n+1 #`#,(+ (syntax-e #'n) 1)]) + #'(let-syntax ([id (make-set!-transformer + (lambda (stx) + (syntax-case stx (set!) + [(set! _ r) #'(vector*-set! self-id n r)] + [(_ arg (... ...)) #'((vector*-ref self-id n) arg (... ...))] + [_ #'(vector*-ref self-id n)])))]) + (let-fields self-id n+1 captureds ctx . bodys)))])) + +(define-syntax (let-methods stx) + (syntax-case stx () + [(_ self-id () ctx body ...) + #'(let () body ...)] + [(_ self-id ((id tmp-id) . methods) ctx . bodys) + (with-syntax ([id (datum->syntax #'ctx (syntax-e #'id))]) + #'(let-syntax ([id (make-method-transformer #'self-id #'tmp-id)]) + (let-methods self-id methods ctx . bodys)))])) + +;; ---------------------------------------- + +(module+ test + (define-constructor (c x y) + (define a 12) + (define-fixnum z 120) + (define (f) + (set! z 130) + (set! y 8) + (list a x (g))) + (define (g) + (list y z)) + (values (method f) + self)) + + (define-values (f f-self) (c 1 2)) + (f f-self)) + diff --git a/racket/src/io/port/bytes-port.rkt b/racket/src/io/port/bytes-port.rkt index fe8a2a84be..c750676d08 100644 --- a/racket/src/io/port/bytes-port.rkt +++ b/racket/src/io/port/bytes-port.rkt @@ -1,6 +1,7 @@ #lang racket/base (require "../common/check.rkt" "../common/fixnum.rkt" + "../common/object.rkt" "../host/thread.rkt" "port.rkt" "input-port.rkt" @@ -19,6 +20,12 @@ (define/who (open-input-bytes bstr [name 'string]) (check who bytes? bstr) + (define p (make-input-bytes (bytes->immutable-bytes bstr) name)) + (when (port-count-lines-enabled) + (port-count-lines! p)) + p) + +(define-constructor (make-input-bytes bstr name) (define-fixnum i 0) (define alt-pos #f) (define len (bytes-length bstr)) @@ -53,111 +60,116 @@ (set! commit-manager (make-commit-manager))) (commit-manager-wait commit-manager progress-evt ext-evt finish)])) - (define p - (make-core-input-port - #:name name - #:data (input-bytes-data) - #:self #f + (make-core-input-port + #:name name + #:data (input-bytes-data) + #:self self - #:prepare-change - (lambda (self) - (pause-waiting-commit)) + #:prepare-change + (method + (lambda () + (pause-waiting-commit))) - #:read-byte - (lambda (self) - (let ([pos i]) - (if (pos . < . len) - (begin - (set! i (add1 pos)) - (progress!) - (bytes-ref bstr pos)) - eof))) - - #:read-in - (lambda (self dest-bstr start end copy?) - (define pos i) - (cond - [(pos . < . len) - (define amt (min (- end start) (- len pos))) - (set! i (+ pos amt)) - (bytes-copy! dest-bstr start bstr pos (+ pos amt)) - (progress!) - amt] - [else eof])) - - #:peek-byte - (lambda (self) - (let ([pos i]) - (if (pos . < . len) - (bytes-ref bstr pos) - eof))) - - #:peek-in - (lambda (self dest-bstr start end skip progress-evt copy?) - (define pos (+ i skip)) - (cond - [(and progress-evt (sync/timeout 0 progress-evt)) - #f] - [(pos . < . len) - (define amt (min (- end start) (- len pos))) - (bytes-copy! dest-bstr start bstr pos (+ pos amt)) - amt] - [else eof])) + #:read-byte + (method + (lambda () + (let ([pos i]) + (if (pos . < . len) + (begin + (set! i (add1 pos)) + (progress!) + (bytes-ref bstr pos)) + eof)))) - #:byte-ready - (lambda (self work-done!) - (i . < . len)) + #:read-in + (method + (lambda (dest-bstr start end copy?) + (define pos i) + (cond + [(pos . < . len) + (define amt (min (- end start) (- len pos))) + (set! i (+ pos amt)) + (bytes-copy! dest-bstr start bstr pos (+ pos amt)) + (progress!) + amt] + [else eof]))) - #:close - (lambda (self) - (set! commit-manager #f) ; to indicate closed - (progress!)) + #:peek-byte + (method + (lambda () + (let ([pos i]) + (if (pos . < . len) + (bytes-ref bstr pos) + eof)))) - #:get-progress-evt - (lambda (self) - (unless progress-sema - (set! progress-sema (make-semaphore))) - (semaphore-peek-evt progress-sema)) + #:peek-in + (method + (lambda (dest-bstr start end skip progress-evt copy?) + (define pos (+ i skip)) + (cond + [(and progress-evt (sync/timeout 0 progress-evt)) + #f] + [(pos . < . len) + (define amt (min (- end start) (- len pos))) + (bytes-copy! dest-bstr start bstr pos (+ pos amt)) + amt] + [else eof]))) - #:commit - (lambda (self amt progress-evt ext-evt finish) - (unless commit-manager - (set! commit-manager (make-commit-manager))) - (commit-manager-wait - commit-manager - progress-evt ext-evt - ;; in atomic mode, maybe in a different thread: - (lambda () - (let ([amt (min amt (- len i))]) - (define dest-bstr (make-bytes amt)) - (bytes-copy! dest-bstr 0 bstr i (+ i amt)) - (set! i (+ i amt)) - (progress!) - (finish dest-bstr))))) + #:byte-ready + (method + (lambda (work-done!) + (i . < . len))) - #:file-position - (case-lambda - [(self) (or alt-pos i)] - [(self new-pos) - (set! i (if (eof-object? new-pos) - len - (min len new-pos))) - (set! alt-pos - (and new-pos - (not (eof-object? new-pos)) - (new-pos . > . i) - new-pos))]))) + #:close + (method + (lambda () + (set! commit-manager #f) ; to indicate closed + (progress!))) - (when (port-count-lines-enabled) - (port-count-lines! p)) - p) + #:get-progress-evt + (method + (lambda () + (unless progress-sema + (set! progress-sema (make-semaphore))) + (semaphore-peek-evt progress-sema))) + + #:commit + (method + (lambda (amt progress-evt ext-evt finish) + (unless commit-manager + (set! commit-manager (make-commit-manager))) + (commit-manager-wait + commit-manager + progress-evt ext-evt + ;; in atomic mode, maybe in a different thread: + (lambda () + (let ([amt (min amt (- len i))]) + (define dest-bstr (make-bytes amt)) + (bytes-copy! dest-bstr 0 bstr i (+ i amt)) + (set! i (+ i amt)) + (progress!) + (finish dest-bstr)))))) + + #:file-position + (method + (case-lambda + [() (or alt-pos i)] + [(new-pos) + (set! i (if (eof-object? new-pos) + len + (min len new-pos))) + (set! alt-pos + (and new-pos + (not (eof-object? new-pos)) + (new-pos . > . i) + new-pos))])))) ;; ---------------------------------------- (struct output-bytes-data (o get)) (define (open-output-bytes [name 'string]) - (define-values (i/none o) (make-pipe-ends #:need-input? #f)) + (define-values (i/none o) (make-pipe-ends #f name name #:need-input? #f)) (define p (make-core-output-port #:name name @@ -196,7 +208,7 @@ (end-atomic) (raise-arguments-error 'file-position "new position is too large" - "port" p + "port" o "position" new-pos)) (pipe-write-position o len) (define amt (- new-pos len)) diff --git a/racket/src/io/port/fd-port.rkt b/racket/src/io/port/fd-port.rkt index 009b56de03..521d91f866 100644 --- a/racket/src/io/port/fd-port.rkt +++ b/racket/src/io/port/fd-port.rkt @@ -247,8 +247,8 @@ [(pos) (+ pos (- buffer-end buffer-start))])) #:buffer-mode (case-lambda - [() buffer-mode] - [(mode) (set! buffer-mode mode)]))) + [(self) buffer-mode] + [(self mode) (set! buffer-mode mode)]))) (define custodian-reference (register-fd-close cust fd fd-refcount flush-handle port)) diff --git a/racket/src/io/port/output-port.rkt b/racket/src/io/port/output-port.rkt index 650c33df18..49c55f4c74 100644 --- a/racket/src/io/port/output-port.rkt +++ b/racket/src/io/port/output-port.rkt @@ -148,9 +148,9 @@ (write-evt ;; in atomic mode: (lambda (self-evt) - (define v (write-out self o src-bstr src-start src-end #f #f #t)) + (define v (write-out self src-bstr src-start src-end #f #f #t)) (when (exact-integer? v) - (count-write-evt-via-write-out self v src-bstr src-start)) + (count-write-evt-via-write-out self o v src-bstr src-start)) (if (evt? v) (values #f (replace-evt v self-evt)) (values (list v) #f))))))) diff --git a/racket/src/io/port/pipe.rkt b/racket/src/io/port/pipe.rkt index 2e28edb279..d38222655c 100644 --- a/racket/src/io/port/pipe.rkt +++ b/racket/src/io/port/pipe.rkt @@ -2,6 +2,7 @@ (require racket/fixnum "../common/check.rkt" "../common/fixnum.rkt" + "../common/object.rkt" "../host/thread.rkt" "port.rkt" "input-port.rkt" @@ -57,7 +58,8 @@ (define (pipe-get-content p bstr start-pos) ((pipe-data-get-content (core-port-data p)) (core-port-self p) bstr start-pos)) -(define (make-pipe-ends [limit #f] [input-name 'pipe] [output-name 'pipe] #:need-input? [need-input? #t]) +(define-constructor (make-pipe-ends [limit #f] [input-name 'pipe] [output-name 'pipe] + #:need-input? [need-input? #t]) (define bstr (make-bytes (min+1 limit 16))) (define len (bytes-length bstr)) (define-fixnum peeked-amt 0) ; peeked but not yet read effectively extends `limit` @@ -79,38 +81,42 @@ (define data (pipe-data ;; get-content-length - (lambda (self) - (atomically (content-length))) + (method + (lambda () + (atomically (content-length)))) ;; write-position - (case-lambda - ;; in atomic mode - [(self) (or write-pos end)] - [(self pos) - ;; `pos` must be between `start` and `end` - (if (fx= pos end) - (set! write-pos #f) - (set! write-pos pos))]) + (method + (case-lambda + ;; in atomic mode + [() (or write-pos end)] + [(pos) + ;; `pos` must be between `start` and `end` + (if (fx= pos end) + (set! write-pos #f) + (set! write-pos pos))])) ;; get-content - (lambda (self to-bstr start-pos) - ;; in atomic mode - (define pos (let ([p (fx+ start start-pos)]) - (if (p . fx>= . len) - (fx- p len) - p))) - (define end-pos (fx+ pos (bytes-length to-bstr))) - (cond - [(end-pos . fx<= . len) - (bytes-copy! to-bstr 0 bstr pos end-pos)] - [else - (bytes-copy! to-bstr 0 bstr pos len) - (bytes-copy! to-bstr (fx- len pos) bstr 0 (fx- end-pos len))])) + (method + (lambda (to-bstr start-pos) + ;; in atomic mode + (define pos (let ([p (fx+ start start-pos)]) + (if (p . fx>= . len) + (fx- p len) + p))) + (define end-pos (fx+ pos (bytes-length to-bstr))) + (cond + [(end-pos . fx<= . len) + (bytes-copy! to-bstr 0 bstr pos end-pos)] + [else + (bytes-copy! to-bstr 0 bstr pos len) + (bytes-copy! to-bstr (fx- len pos) bstr 0 (fx- end-pos len))]))) ;; discard-all - (lambda (self) - ;; in atomic mode - (set! peeked-amt 0) - (set! start 0) - (set! end 0) - (set! write-pos #f)))) + (method + (lambda () + ;; in atomic mode + (set! peeked-amt 0) + (set! start 0) + (set! end 0) + (set! write-pos #f))))) (define read-ready-sema (make-semaphore)) (define write-ready-sema (and limit (make-semaphore 1))) @@ -180,280 +186,292 @@ (make-core-input-port #:name input-name #:data data - #:self #f + #:self self #:prepare-change - (lambda (self) - (pause-waiting-commit)) - + (method + (lambda () + (pause-waiting-commit))) + #:read-byte - (lambda (self) - (assert-atomic) - (cond - [(input-empty?) - (if output-closed? - eof - ;; event's synchronization value is ignored: - read-ready-evt)] - [else - (define pos start) - (check-output-unblocking) - (unless (fx= 0 peeked-amt) - (set! peeked-amt (fxmax 0 (fx- peeked-amt 1)))) - (define new-pos (fx+ pos 1)) - (if (fx= new-pos len) - (set! start 0) - (set! start new-pos)) - (check-input-blocking) - (progress!) - (bytes-ref bstr pos)])) + (method + (lambda () + (assert-atomic) + (cond + [(input-empty?) + (if output-closed? + eof + ;; event's synchronization value is ignored: + read-ready-evt)] + [else + (define pos start) + (check-output-unblocking) + (unless (fx= 0 peeked-amt) + (set! peeked-amt (fxmax 0 (fx- peeked-amt 1)))) + (define new-pos (fx+ pos 1)) + (if (fx= new-pos len) + (set! start 0) + (set! start new-pos)) + (check-input-blocking) + (progress!) + (bytes-ref bstr pos)]))) #:read-in - (lambda (self dest-bstr dest-start dest-end copy?) - (assert-atomic) - (cond - [(input-empty?) - (if output-closed? - eof - read-ready-evt)] - [else - (check-output-unblocking) - (begin0 - (cond - [(start . fx< . end) - (define amt (fxmin (fx- dest-end dest-start) - (fx- end start))) - (bytes-copy! dest-bstr dest-start bstr start (fx+ start amt)) - (set! start (fx+ start amt)) - (set! peeked-amt (fxmax 0 (fx- peeked-amt amt))) - amt] - [else - (define amt (fxmin (fx- dest-end dest-start) - (fx- len start))) - (bytes-copy! dest-bstr dest-start bstr start (fx+ start amt)) - (set! start (modulo (fx+ start amt) len)) - (set! peeked-amt (fxmax 0 (fx- peeked-amt amt))) - amt]) - (check-input-blocking) - (progress!))])) + (method + (lambda (dest-bstr dest-start dest-end copy?) + (assert-atomic) + (cond + [(input-empty?) + (if output-closed? + eof + read-ready-evt)] + [else + (check-output-unblocking) + (begin0 + (cond + [(start . fx< . end) + (define amt (fxmin (fx- dest-end dest-start) + (fx- end start))) + (bytes-copy! dest-bstr dest-start bstr start (fx+ start amt)) + (set! start (fx+ start amt)) + (set! peeked-amt (fxmax 0 (fx- peeked-amt amt))) + amt] + [else + (define amt (fxmin (fx- dest-end dest-start) + (fx- len start))) + (bytes-copy! dest-bstr dest-start bstr start (fx+ start amt)) + (set! start (modulo (fx+ start amt) len)) + (set! peeked-amt (fxmax 0 (fx- peeked-amt amt))) + amt]) + (check-input-blocking) + (progress!))]))) #:peek-byte - (lambda (self) - (assert-atomic) - (cond - [(input-empty?) - (if output-closed? - eof - read-ready-evt)] - [else - (peeked! 1) - (bytes-ref bstr start)])) - + (method + (lambda () + (assert-atomic) + (cond + [(input-empty?) + (if output-closed? + eof + read-ready-evt)] + [else + (peeked! 1) + (bytes-ref bstr start)]))) + #:peek-in - (lambda (self dest-bstr dest-start dest-end skip progress-evt copy?) - (assert-atomic) - (define content-amt (content-length)) - (cond - [(and progress-evt - (sync/timeout 0 progress-evt)) - #f] - [(content-amt . <= . skip) - (cond - [output-closed? eof] - [else - (unless (or (zero? skip) more-read-ready-sema) - (set! more-read-ready-sema (make-semaphore))) - (define evt (if (zero? skip) - read-ready-evt - (wrap-evt (semaphore-peek-evt more-read-ready-sema) - (lambda (v) 0)))) - evt])] - [else - (define peek-start (fxmodulo (fx+ start skip) len)) - (cond - [(peek-start . fx< . end) - (define amt (fxmin (fx- dest-end dest-start) - (fx- end peek-start))) - (bytes-copy! dest-bstr dest-start bstr peek-start (fx+ peek-start amt)) - (peeked! (+ skip amt)) - amt] - [else - (define amt (fxmin (fx- dest-end dest-start) - (fx- len peek-start))) - (bytes-copy! dest-bstr dest-start bstr peek-start (fx+ peek-start amt)) - (peeked! (+ skip amt)) - amt])])) + (method + (lambda (dest-bstr dest-start dest-end skip progress-evt copy?) + (assert-atomic) + (define content-amt (content-length)) + (cond + [(and progress-evt + (sync/timeout 0 progress-evt)) + #f] + [(content-amt . <= . skip) + (cond + [output-closed? eof] + [else + (unless (or (zero? skip) more-read-ready-sema) + (set! more-read-ready-sema (make-semaphore))) + (define evt (if (zero? skip) + read-ready-evt + (wrap-evt (semaphore-peek-evt more-read-ready-sema) + (lambda (v) 0)))) + evt])] + [else + (define peek-start (fxmodulo (fx+ start skip) len)) + (cond + [(peek-start . fx< . end) + (define amt (fxmin (fx- dest-end dest-start) + (fx- end peek-start))) + (bytes-copy! dest-bstr dest-start bstr peek-start (fx+ peek-start amt)) + (peeked! (+ skip amt)) + amt] + [else + (define amt (fxmin (fx- dest-end dest-start) + (fx- len peek-start))) + (bytes-copy! dest-bstr dest-start bstr peek-start (fx+ peek-start amt)) + (peeked! (+ skip amt)) + amt])]))) #:byte-ready - (lambda (self work-done!) - (assert-atomic) - (or output-closed? - (not (fx= 0 (content-length))))) + (method + (lambda (work-done!) + (assert-atomic) + (or output-closed? + (not (fx= 0 (content-length)))))) #:close - (lambda (self) - (unless input-closed? - (set! input-closed? #t) - (progress!))) + (method + (lambda () + (unless input-closed? + (set! input-closed? #t) + (progress!)))) #:get-progress-evt - (lambda (self) - (atomically - (cond - [input-closed? always-evt] - [else - (unless progress-sema - (set! progress-sema (make-semaphore))) - (semaphore-peek-evt progress-sema)]))) + (method + (lambda () + (atomically + (cond + [input-closed? always-evt] + [else + (unless progress-sema + (set! progress-sema (make-semaphore))) + (semaphore-peek-evt progress-sema)])))) #:commit ;; Allows `amt` to be zero and #f for other arguments, ;; which is helpful for `open-input-peek-via-read`. - (lambda (self amt progress-evt ext-evt finish) - (assert-atomic) - ;; `progress-evt` is a `semepahore-peek-evt`, and `ext-evt` - ;; is constrained; we can send them over to different threads - (cond - [(zero? amt) - (progress!)] - [else - (wait-commit - progress-evt ext-evt - ;; in atomic mode, maybe in a different thread: - (lambda () - (let ([amt (min amt (content-length))]) - (cond - [(fx= 0 amt) - ;; There was nothing to commit; claim success for 0 bytes - (finish #"")] - [else - (define dest-bstr (make-bytes amt)) - (cond - [(start . fx< . end) - (bytes-copy! dest-bstr 0 bstr start (fx+ start amt))] - [else - (define amt1 (fxmin (fx- len start) amt)) - (bytes-copy! dest-bstr 0 bstr start (fx+ start amt1)) - (when (amt1 . fx< . amt) - (bytes-copy! dest-bstr amt1 bstr 0 (fx- amt amt1)))]) - (set! start (fxmodulo (fx+ start amt) len)) - (progress!) - (check-input-blocking) - (finish dest-bstr)]))))])))) + (method + (lambda (amt progress-evt ext-evt finish) + (assert-atomic) + ;; `progress-evt` is a `semepahore-peek-evt`, and `ext-evt` + ;; is constrained; we can send them over to different threads + (cond + [(zero? amt) + (progress!)] + [else + (wait-commit + progress-evt ext-evt + ;; in atomic mode, maybe in a different thread: + (lambda () + (let ([amt (min amt (content-length))]) + (cond + [(fx= 0 amt) + ;; There was nothing to commit; claim success for 0 bytes + (finish #"")] + [else + (define dest-bstr (make-bytes amt)) + (cond + [(start . fx< . end) + (bytes-copy! dest-bstr 0 bstr start (fx+ start amt))] + [else + (define amt1 (fxmin (fx- len start) amt)) + (bytes-copy! dest-bstr 0 bstr start (fx+ start amt1)) + (when (amt1 . fx< . amt) + (bytes-copy! dest-bstr amt1 bstr 0 (fx- amt amt1)))]) + (set! start (fxmodulo (fx+ start amt) len)) + (progress!) + (check-input-blocking) + (finish dest-bstr)]))))]))))) ;; output ---------------------------------------- (make-core-output-port #:name output-name #:data data - #:self #f + #:self self #:evt write-ready-evt #:write-out ;; in atomic mode - (lambda (self src-bstr src-start src-end nonblock? enable-break? copy?) - (assert-atomic) - (let try-again () - (define top-pos (if (fx= start 0) - (fx- len 1) - len)) - (define (maybe-grow) - (cond - [(or (not limit) - ((+ limit peeked-amt) . > . (fx- len 1))) - ;; grow pipe size - (define new-bstr (make-bytes (min+1 (and limit (+ limit peeked-amt)) (* len 2)))) - (cond - [(fx= 0 start) - (bytes-copy! new-bstr 0 bstr 0 (fx- len 1))] - [else - (bytes-copy! new-bstr 0 bstr start len) - (bytes-copy! new-bstr (fx- len start) bstr 0 end) - (set! start 0) - (set! end (fx- len 1))]) - (set! bstr new-bstr) - (set! len (bytes-length new-bstr)) - (try-again)] - [else (pipe-is-full)])) - (define (pipe-is-full) - (wrap-evt write-ready-evt (lambda (v) #f))) - (define (apply-limit amt) - (if limit - (min amt (- (+ limit peeked-amt) (content-length))) - amt)) - (cond - [(fx= src-start src-end) ;; => flush - 0] - [write-pos ; set by `file-position` on a bytes port - (define amt (apply-limit (fxmin (fx- end write-pos) - (fx- src-end src-start)))) + (method + (lambda (src-bstr src-start src-end nonblock? enable-break? copy?) + (assert-atomic) + (let try-again () + (define top-pos (if (fx= start 0) + (fx- len 1) + len)) + (define (maybe-grow) (cond - [(fx= amt 0) (pipe-is-full)] - [else - (check-input-unblocking) - (bytes-copy! bstr write-pos src-bstr src-start (fx+ src-start amt)) - (let ([new-write-pos (fx+ write-pos amt)]) - (if (fx= new-write-pos end) - (set! write-pos #f) ; back to normal mode - (set! write-pos new-write-pos))) - (check-output-blocking) - amt])] - [(and (end . fx>= . start) - (end . fx< . top-pos)) - (define amt (apply-limit (fxmin (fx- top-pos end) - (fx- src-end src-start)))) - (cond - [(fx= amt 0) (pipe-is-full)] - [else - (check-input-unblocking) - (bytes-copy! bstr end src-bstr src-start (fx+ src-start amt)) - (let ([new-end (fx+ end amt)]) - (set! end (if (fx= new-end len) 0 new-end))) - (check-output-blocking) - amt])] - [(fx= end top-pos) - (cond - [(fx= start 0) - (maybe-grow)] - [else - (define amt (fxmin (fx- start 1) - (fx- src-end src-start))) + [(or (not limit) + ((+ limit peeked-amt) . > . (fx- len 1))) + ;; grow pipe size + (define new-bstr (make-bytes (min+1 (and limit (+ limit peeked-amt)) (* len 2)))) (cond - [(fx= amt 0) (pipe-is-full)] + [(fx= 0 start) + (bytes-copy! new-bstr 0 bstr 0 (fx- len 1))] [else - (check-input-unblocking) - (bytes-copy! bstr 0 src-bstr src-start (fx+ src-start amt)) - (set! end amt) - (check-output-blocking) - amt])])] - [(end . fx< . (fx- start 1)) - (define amt (apply-limit (fxmin (fx- (fx- start 1) end) - (fx- src-end src-start)))) - (cond - [(fx= amt 0) (pipe-is-full)] - [else - (check-input-unblocking) - (bytes-copy! bstr end src-bstr src-start (fx+ src-start amt)) - (set! end (fx+ end amt)) - (check-output-blocking) - amt])] - [else - (maybe-grow)]))) + (bytes-copy! new-bstr 0 bstr start len) + (bytes-copy! new-bstr (fx- len start) bstr 0 end) + (set! start 0) + (set! end (fx- len 1))]) + (set! bstr new-bstr) + (set! len (bytes-length new-bstr)) + (try-again)] + [else (pipe-is-full)])) + (define (pipe-is-full) + (wrap-evt write-ready-evt (lambda (v) #f))) + (define (apply-limit amt) + (if limit + (min amt (- (+ limit peeked-amt) (content-length))) + amt)) + (cond + [(fx= src-start src-end) ;; => flush + 0] + [write-pos ; set by `file-position` on a bytes port + (define amt (apply-limit (fxmin (fx- end write-pos) + (fx- src-end src-start)))) + (cond + [(fx= amt 0) (pipe-is-full)] + [else + (check-input-unblocking) + (bytes-copy! bstr write-pos src-bstr src-start (fx+ src-start amt)) + (let ([new-write-pos (fx+ write-pos amt)]) + (if (fx= new-write-pos end) + (set! write-pos #f) ; back to normal mode + (set! write-pos new-write-pos))) + (check-output-blocking) + amt])] + [(and (end . fx>= . start) + (end . fx< . top-pos)) + (define amt (apply-limit (fxmin (fx- top-pos end) + (fx- src-end src-start)))) + (cond + [(fx= amt 0) (pipe-is-full)] + [else + (check-input-unblocking) + (bytes-copy! bstr end src-bstr src-start (fx+ src-start amt)) + (let ([new-end (fx+ end amt)]) + (set! end (if (fx= new-end len) 0 new-end))) + (check-output-blocking) + amt])] + [(fx= end top-pos) + (cond + [(fx= start 0) + (maybe-grow)] + [else + (define amt (fxmin (fx- start 1) + (fx- src-end src-start))) + (cond + [(fx= amt 0) (pipe-is-full)] + [else + (check-input-unblocking) + (bytes-copy! bstr 0 src-bstr src-start (fx+ src-start amt)) + (set! end amt) + (check-output-blocking) + amt])])] + [(end . fx< . (fx- start 1)) + (define amt (apply-limit (fxmin (fx- (fx- start 1) end) + (fx- src-end src-start)))) + (cond + [(fx= amt 0) (pipe-is-full)] + [else + (check-input-unblocking) + (bytes-copy! bstr end src-bstr src-start (fx+ src-start amt)) + (set! end (fx+ end amt)) + (check-output-blocking) + amt])] + [else + (maybe-grow)])))) #:count-write-evt-via-write-out - (lambda (self op v bstr start) - (port-count! op v bstr start)) + (method + (lambda (op v bstr start) + (port-count! op v bstr start))) #:close ;; in atomic mode - (lambda (self) - (unless output-closed? - (set! output-closed? #t) - (when write-ready-sema - (semaphore-post write-ready-sema)) - (when more-read-ready-sema - (semaphore-post more-read-ready-sema)) - (semaphore-post read-ready-sema)))))) + (method + (lambda () + (unless output-closed? + (set! output-closed? #t) + (when write-ready-sema + (semaphore-post write-ready-sema)) + (when more-read-ready-sema + (semaphore-post more-read-ready-sema)) + (semaphore-post read-ready-sema))))))) (define/who (make-pipe [limit #f] [input-name 'pipe] [output-name 'pipe]) (check who #:or-false exact-positive-integer? limit)