From c2d53143c11d3e6a2b667495404dbd105638c465 Mon Sep 17 00:00:00 2001 From: Matthew Flatt Date: Wed, 13 Feb 2019 10:03:00 -0700 Subject: [PATCH] io: improve syntax of internal class forms --- racket/src/io/common/class.rkt | 156 ++--- racket/src/io/common/fixnum.rkt | 31 - racket/src/io/network/tcp-port.rkt | 58 +- racket/src/io/port/bytes-port.rkt | 404 +++++------ racket/src/io/port/commit-port.rkt | 78 +-- racket/src/io/port/count.rkt | 6 + racket/src/io/port/custom-input-port.rkt | 87 ++- racket/src/io/port/custom-output-port.rkt | 50 +- racket/src/io/port/fd-port.rkt | 440 ++++++------ racket/src/io/port/input-port.rkt | 169 ++--- racket/src/io/port/max-output-port.rkt | 43 +- racket/src/io/port/nowhere.rkt | 17 +- racket/src/io/port/output-port.rkt | 85 +-- racket/src/io/port/peek-via-read-port.rkt | 410 +++++------ racket/src/io/port/pipe.rkt | 793 +++++++++++----------- racket/src/io/port/port.rkt | 134 ++-- 16 files changed, 1481 insertions(+), 1480 deletions(-) delete mode 100644 racket/src/io/common/fixnum.rkt diff --git a/racket/src/io/common/class.rkt b/racket/src/io/common/class.rkt index 5786600c96..bded821db7 100644 --- a/racket/src/io/common/class.rkt +++ b/racket/src/io/common/class.rkt @@ -13,12 +13,12 @@ ;; ;; = (class ...) ;; | (class #:extends ...) -;; = (field [ ] ...) -;; | (public [ ] ...) -;; | (private [ ] ...) -;; | (override [ ] ...) -;; | (static [ ] ...) ; not in vtable -;; | (property [ ] ...) +;; = #:field [ ] ... +;; | #:public [ ] ... +;; | #:private [ ] ... ; cannot `send` +;; | #:override [ ] ... +;; | #:static [ ] ... ; cannot override +;; | #:property [ ] ... ;; = #f ;; | (lambda ...+) ;; | (case-lambda [ ...+] ...) @@ -40,12 +40,13 @@ ;; unnecessary indirections through methods that can be overridden). ;; ;; Normally, use -;; (new [ #:field [ gets its default value. To override methods for just ;; this object, use -;; (new #:override ([ ] ...) -;; [ +;; #:field [ ] ...) ;; but beware that it involves allocating a new vtable each ;; time the `new` expression is evaluated. ;; @@ -105,42 +106,20 @@ [(id expr) (list #'id #'expr (combine-ids base-id base-id "-" #'id) (combine-ids #'id "set-" base-id "-" #'id "!"))] [_ (raise-syntax-error #f (format "bad ~a clause" what) stx e)]))) - (define-values (new-fields new-methods override-methods locals statics properties) - (let ([l-stx (syntax-case stx () - [(_ _ #:extends _ . rest) #'rest] - [(_ _ . rest) #'rest])]) - (let loop ([l-stx l-stx] [new-fields null] [new-methods null] [override-methods null] - [locals null] [statics null] [properties null]) - (syntax-case l-stx (field public override private static property) - [() (values new-fields new-methods override-methods locals statics properties)] - [((field fld ...) . rest) - (loop #'rest - (append new-fields - (add-procs id (syntax->list #'(fld ...)) "field" #:can-immutable? #t)) - new-methods override-methods locals statics properties)] - [((public method ...) . rest) - (loop #'rest new-fields - (append new-methods - (add-procs methods-id (syntax->list #'(method ...)) "public")) - override-methods locals statics properties)] - [((override method ...) . rest) - (loop #'rest new-fields new-methods - (append override-methods (syntax->list #'(method ...))) - locals statics properties)] - [((private method ...) . rest) - (loop #'rest new-fields - new-methods override-methods - (append locals (syntax->list #'(method ...))) - statics properties)] - [((static method ...) . rest) - (loop #'rest new-fields new-methods override-methods locals - (append statics (syntax->list #'(method ...))) - properties)] - [((property prop ...) . rest) - (loop #'rest new-fields new-methods override-methods locals statics - (append properties (syntax->list #'((#:property . prop) ...))))] - [(other . _) - (raise-syntax-error #f "unrecognized" stx #'other)])))) + (define groups (extract-groups + stx + (syntax-case stx () + [(_ _ #:extends _ . rest) #'rest] + [(_ _ . rest) #'rest]) + '(#:field #:public #:override #:private #:static #:property))) + (define (extract-group tag) (reverse (hash-ref groups tag '()))) + (define new-fields (add-procs id (extract-group '#:field) "field" #:can-immutable? #t)) + (define new-methods (add-procs methods-id (extract-group '#:public) "public")) + (define override-methods (extract-group '#:override)) + (define locals (extract-group '#:private)) + (define statics (extract-group '#:static)) + (define properties (for/list ([prop (in-list (extract-group '#:property))]) + (cons '#:property prop))) (define all-fields (if super-ci (append (class-info-fields super-ci) new-fields) new-fields)) @@ -321,29 +300,31 @@ (define-syntax (new stx) (syntax-case stx () - [(_ class-id #:override (override ...) init ...) + [(_ class-id clause ...) (let ([ci (and (identifier? #'class-id) (syntax-local-value #'class-id (lambda () #f)))]) (unless (class-info? ci) (raise-syntax-error #f "not a class identifier" stx #'class-id)) - (for ([init (in-list (syntax->list #'(init ...)))]) + (define groups (extract-groups stx #'(clause ...) '(#:field #:override))) + (define inits (reverse (hash-ref groups '#:field '()))) + (define overrides (reverse (hash-ref groups '#:override '()))) + (for ([init (in-list inits)]) (syntax-case init () [(field-id _) (check-member stx #'field-id (class-info-fields ci) "field")] [_ (raise-syntax-error #f "bad field-inialization clause" stx init)])) - (for ([override (in-list (syntax->list #'(override ...)))]) + (for ([override (in-list overrides)]) (syntax-case override () [(method-id _) (check-member stx #'method-id (class-info-methods ci) "method")] [_ (raise-syntax-error #f "bad method-override clause" stx override)])) (define field-exprs (for/list ([field (in-list (class-info-fields ci))]) (syntax-case field () [(field-id field-expr . _) - (or (for/or ([init (in-list (syntax->list #'(init ...)))]) + (or (for/or ([init (in-list inits)]) (syntax-case init () [(id expr) (and (eq? (syntax-e #'id) (syntax-e #'field-id)) #'expr)])) #'field-expr)]))) - (define overrides (syntax->list #'(override ...))) (with-syntax ([make-id (cadr (class-info-struct-info ci))] [vtable-id (class-info-vtable-id ci)] [(field-expr ...) field-exprs]) @@ -428,6 +409,25 @@ #'(let ([o obj-expr]) (bind-locals local-bindings o obj-expr (let () body0 body ...))))))])) +;; ---------------------------------------- + +(define-for-syntax (extract-groups stx l-stx ok-groups) + (let loop ([l-stx l-stx] [current-group #f] [groups #hasheq()]) + (syntax-case l-stx () + [() groups] + [(kw . rest) + (memq (syntax-e #'kw) ok-groups) + (loop #'rest (syntax-e #'kw) groups)] + [(kw . rest) + (keyword? (syntax-e #'kw)) + (raise-syntax-error #f "unrecognized section keyword" stx #'kw)] + [(other . rest) + (if current-group + (loop #'rest + current-group + (hash-update groups current-group (lambda (l) (cons #'other l)) null)) + (raise-syntax-error #f "need an initial section keyword, such as `#:field`" stx #'other))]))) + (define-for-syntax (check-member stx id l what) (or (for/or ([e (in-list l)]) (syntax-case e () @@ -492,37 +492,40 @@ ;; ---------------------------------------- -(module+ test +(module+ main (class example - (field - [a 1 #:immutable] - [b 2]) - (private - [other (lambda (q) (list q this))]) - (static - [enbox (lambda (v #:opt [opt (vector v a)]) - (box (vector a v opt)))]) - (public - [q #f] - [m (lambda (z #:maybe [maybe 9]) (list a (other b) maybe))] - [n (lambda (x y z) (vector a b (enbox x) y z))])) + #:field + [a 1 #:immutable] + [b 2] + #:private + [other (lambda (q) (list q this))] + #:static + [enbox (lambda (v #:opt [opt (vector v a)]) + (box (vector a v opt)))] + #:public + [q #f] + [m (lambda (z #:maybe [maybe 9]) (list a (other b) maybe))] + [n (lambda (x y z) (vector a b (enbox x) y z))]) (class sub #:extends example - (field - [c 3] - [d 4]) - (override - [m (lambda (z) 'other)]) - (property - [prop:custom-write (lambda (s o m) - (write 'sub: o) - (write (sub-d s) o))])) + #:override + [m (lambda (z) 'other)] + #:field + [c 3] + #:property + [prop:custom-write (lambda (s o m) + (write 'sub: o) + (write (sub-d s) o))] + #:field + [d 4]) - (define ex (new example [b 5])) + (define ex (new example + #:field + [b 5])) (send example ex m 'ok #:maybe 'yep) (method example ex m) - (new sub [d 5]) + (new sub #:field [d 5]) (send example (new sub) m 'more) (set-example-b! ex 6) (send example ex enbox 88) @@ -530,8 +533,9 @@ (define ex2 (new example #:override - ([q (lambda (x y z) - (box (vector x y z a b)))]) + [q (lambda (x y z) + (box (vector x y z a b)))] + #:field [b 'b] [a 'a])) (send example ex2 n 1 2 3) diff --git a/racket/src/io/common/fixnum.rkt b/racket/src/io/common/fixnum.rkt deleted file mode 100644 index 70a7b317b5..0000000000 --- a/racket/src/io/common/fixnum.rkt +++ /dev/null @@ -1,31 +0,0 @@ -#lang racket/base -(require (for-syntax racket/base) - racket/fixnum) - -(provide define-fixnum - - ;; to cooperate with macros that explicitly - ;; manage closures, as in "object.rkt" - capture-fixnum - (for-syntax make-fixnum-transformer)) - -;; Representing a mutable, fixnum-valued variable with an fxvector can -;; avoid a write barrier on assignment - -(define-syntax-rule (define-fixnum id v) - (begin - (define cell (fxvector v)) - (define-syntax id (make-fixnum-transformer #'cell)))) - -(define-for-syntax (make-fixnum-transformer cell-id) - (with-syntax ([cell cell-id]) - (make-set!-transformer - (lambda (stx) - (syntax-case stx (set!) - [(set! _ r) #'(fxvector-set! cell 0 r)] - [(_ #:capture-fixnum) #'cell] ; see `capture-fixnum` - [(... (_ ...)) (raise-syntax-error stx "bad use" stx)] - [_ #'(fxvector-ref cell 0)]))))) - -(define-syntax-rule (capture-fixnum id) - (id #:capture-fixnum)) diff --git a/racket/src/io/network/tcp-port.rkt b/racket/src/io/network/tcp-port.rkt index fbf90e40c8..5e8179fc58 100644 --- a/racket/src/io/network/tcp-port.rkt +++ b/racket/src/io/network/tcp-port.rkt @@ -15,49 +15,51 @@ tcp-abandon-port) (class tcp-input-port #:extends fd-input-port - (field - [abandon? #f]) - (override - [on-close - (lambda () - (unless abandon? - (rktio_socket_shutdown rktio fd RKTIO_SHUTDOWN_READ)))] - [raise-read-error - (lambda (n) - (raise-network-error #f n "error reading from stream port"))]) - (property - [prop:file-stream #f] - [prop:fd-place-message-opener (lambda (fd name) - (make-tcp-input-port fd name))])) + #:field + [abandon? #f] + #:override + [on-close + (lambda () + (unless abandon? + (rktio_socket_shutdown rktio fd RKTIO_SHUTDOWN_READ)))] + [raise-read-error + (lambda (n) + (raise-network-error #f n "error reading from stream port"))] + #:property + [prop:file-stream #f] + [prop:fd-place-message-opener (lambda (fd name) + (make-tcp-input-port fd name))]) (define (make-tcp-input-port fd name #:fd-refcount [fd-refcount (box 1)]) (finish-fd-input-port (new tcp-input-port + #:field [name name] [fd fd] [fd-refcount fd-refcount]))) (class tcp-output-port #:extends fd-output-port - (field - [abandon? #f]) - (override - [on-close - (lambda () - (unless abandon? - (rktio_socket_shutdown rktio fd RKTIO_SHUTDOWN_WRITE)))] - [raise-write-error - (lambda (n) - (raise-network-error #f n "error writing to stream port"))]) - (property - [prop:file-stream #f] - [prop:fd-place-message-opener (lambda (fd name) - (make-tcp-output-port fd name))])) + #:field + [abandon? #f] + #:override + [on-close + (lambda () + (unless abandon? + (rktio_socket_shutdown rktio fd RKTIO_SHUTDOWN_WRITE)))] + [raise-write-error + (lambda (n) + (raise-network-error #f n "error writing to stream port"))] + #:property + [prop:file-stream #f] + [prop:fd-place-message-opener (lambda (fd name) + (make-tcp-output-port fd name))]) (define (make-tcp-output-port fd name #:fd-refcount [fd-refcount (box 1)]) (finish-fd-output-port (new tcp-output-port + #:field [name name] [fd fd] [fd-refcount fd-refcount] diff --git a/racket/src/io/port/bytes-port.rkt b/racket/src/io/port/bytes-port.rkt index 03ec2e8463..ad2fcc37be 100644 --- a/racket/src/io/port/bytes-port.rkt +++ b/racket/src/io/port/bytes-port.rkt @@ -1,7 +1,6 @@ #lang racket/base (require racket/fixnum "../common/check.rkt" - "../common/fixnum.rkt" "../common/class.rkt" "../host/thread.rkt" "port.rkt" @@ -24,227 +23,228 @@ p) (class bytes-input-port #:extends commit-input-port - (field - [bstr #f] ; normally installed as buffer - [pos 0] ; used when bstr is not installed as buffer - [alt-pos #f]) + #:field + [bstr #f] ; normally installed as buffer + [pos 0] ; used when bstr is not installed as buffer + [alt-pos #f] - (private - ;; in atomic mode - [in-buffer-pos - (lambda () - (define b buffer) - (if (direct-bstr b) - (direct-pos b) - pos))]) + #:private + ;; in atomic mode + [in-buffer-pos + (lambda () + (define b buffer) + (if (direct-bstr b) + (direct-pos b) + pos))] - (override - [close - (lambda () - (set! commit-manager #f) ; to indicate closed - (progress!) - (set! bstr #f) - (define b buffer) - (when (direct-bstr b) - (set! offset (direct-pos b)) - (set-direct-bstr! b #f)))] - [file-position - (case-lambda - [() (or alt-pos (in-buffer-pos))] - [(given-pos) + #:override + [close + (lambda () + (set! commit-manager #f) ; to indicate closed + (progress!) + (set! bstr #f) + (define b buffer) + (when (direct-bstr b) + (set! offset (direct-pos b)) + (set-direct-bstr! b #f)))] + [file-position + (case-lambda + [() (or alt-pos (in-buffer-pos))] + [(given-pos) + (define b buffer) + (define len (direct-end b)) + (define new-pos (if (eof-object? given-pos) + len + (min len given-pos))) + (if (direct-bstr b) + (set-direct-pos! b new-pos) + (set! pos new-pos)) + (set! alt-pos (and (not (eof-object? given-pos)) + (given-pos . > . new-pos) + given-pos))])] + + [prepare-change + (lambda () + (pause-waiting-commit))] + + [read-in + (lambda (dest-bstr start end copy?) + (define b buffer) + (define len (direct-end b)) + (define i (in-buffer-pos)) + (cond + [(i . < . len) + (define amt (min (- end start) (fx- len i))) + (define new-pos (fx+ i amt)) + ;; Keep/resume fast mode + (set-direct-pos! b new-pos) + (set! offset 0) + (set-direct-bstr! b bstr) + (bytes-copy! dest-bstr start bstr i new-pos) + (progress!) + amt] + [else eof]))] + + [peek-in + (lambda (dest-bstr start end skip progress-evt copy?) + (define b buffer) + (define len (direct-end b)) + (define i (in-buffer-pos)) + (define at-pos (+ i skip)) + (cond + [(and progress-evt (sync/timeout 0 progress-evt)) + #f] + [(at-pos . < . len) + (define amt (min (- end start) (fx- len at-pos))) + (bytes-copy! dest-bstr start bstr at-pos (fx+ at-pos amt)) + amt] + [else eof]))] + + [byte-ready + (lambda (work-done!) + ((in-buffer-pos) . < . (direct-end buffer)))] + + [get-progress-evt + (lambda () + (atomically + (unless progress-sema + ;; set port to slow mode: + (define b buffer) + (when (direct-bstr b) + (define i (direct-pos b)) + (set! pos i) + (set! offset i) + (set-direct-bstr! b #f) + (set-direct-pos! b (direct-end b)))) + (make-progress-evt)))] + + [commit + (lambda (amt progress-evt ext-evt finish) + (wait-commit + progress-evt ext-evt + ;; in atomic mode, maybe in a different thread: + (lambda () (define b buffer) (define len (direct-end b)) - (define new-pos (if (eof-object? given-pos) - len - (min len given-pos))) - (if (direct-bstr b) - (set-direct-pos! b new-pos) - (set! pos new-pos)) - (set! alt-pos (and (not (eof-object? given-pos)) - (given-pos . > . new-pos) - given-pos))])] - - [prepare-change - (lambda () - (pause-waiting-commit))] - - [read-in - (lambda (dest-bstr start end copy?) - (define b buffer) - (define len (direct-end b)) - (define i (in-buffer-pos)) - (cond - [(i . < . len) - (define amt (min (- end start) (fx- len i))) - (define new-pos (fx+ i amt)) + (define i (in-buffer-pos)) + (let ([amt (min amt (- len i))]) + (define dest-bstr (make-bytes amt)) + (bytes-copy! dest-bstr 0 bstr i (+ i amt)) ;; Keep/resume fast mode - (set-direct-pos! b new-pos) - (set! offset 0) + (set-direct-pos! b (fx+ i amt)) (set-direct-bstr! b bstr) - (bytes-copy! dest-bstr start bstr i new-pos) + (set! offset 0) (progress!) - amt] - [else eof]))] - - [peek-in - (lambda (dest-bstr start end skip progress-evt copy?) - (define b buffer) - (define len (direct-end b)) - (define i (in-buffer-pos)) - (define at-pos (+ i skip)) - (cond - [(and progress-evt (sync/timeout 0 progress-evt)) - #f] - [(at-pos . < . len) - (define amt (min (- end start) (fx- len at-pos))) - (bytes-copy! dest-bstr start bstr at-pos (fx+ at-pos amt)) - amt] - [else eof]))] - - [byte-ready - (lambda (work-done!) - ((in-buffer-pos) . < . (direct-end buffer)))] - - [get-progress-evt - (lambda () - (atomically - (unless progress-sema - ;; set port to slow mode: - (define b buffer) - (when (direct-bstr b) - (define i (direct-pos b)) - (set! pos i) - (set! offset i) - (set-direct-bstr! b #f) - (set-direct-pos! b (direct-end b)))) - (make-progress-evt)))] - - [commit - (lambda (amt progress-evt ext-evt finish) - (wait-commit - progress-evt ext-evt - ;; in atomic mode, maybe in a different thread: - (lambda () - (define b buffer) - (define len (direct-end b)) - (define i (in-buffer-pos)) - (let ([amt (min amt (- len i))]) - (define dest-bstr (make-bytes amt)) - (bytes-copy! dest-bstr 0 bstr i (+ i amt)) - ;; Keep/resume fast mode - (set-direct-pos! b (fx+ i amt)) - (set-direct-bstr! b bstr) - (set! offset 0) - (progress!) - (finish dest-bstr)))))])) + (finish dest-bstr)))))]) (define (make-input-bytes bstr name) - (new bytes-input-port - [name name] - [buffer (direct bstr 0 (bytes-length bstr))] - [bstr bstr])) + (finish-port/count + (new bytes-input-port + #:field + [name name] + [buffer (direct bstr 0 (bytes-length bstr))] + [bstr bstr]))) ;; ---------------------------------------- (class bytes-output-port #:extends core-output-port - (field - [bstr #""] - [pos 0] - [max-pos 0]) + #:field + [bstr #""] + [pos 0] + [max-pos 0] - (public - [get-length (lambda () - (start-atomic) - (slow-mode!) - (end-atomic) - max-pos)] - [get-bytes (lambda (dest-bstr start-pos discard?) - (start-atomic) - (slow-mode!) - (bytes-copy! dest-bstr 0 bstr start-pos (fx+ start-pos (bytes-length dest-bstr))) - (when discard? - (set! bstr #"") - (set! pos 0) - (set! max-pos 0)) - (end-atomic))]) + #:public + [get-length (lambda () + (start-atomic) + (slow-mode!) + (end-atomic) + max-pos)] + [get-bytes (lambda (dest-bstr start-pos discard?) + (start-atomic) + (slow-mode!) + (bytes-copy! dest-bstr 0 bstr start-pos (fx+ start-pos (bytes-length dest-bstr))) + (when discard? + (set! bstr #"") + (set! pos 0) + (set! max-pos 0)) + (end-atomic))] - (private - [enlarge! - (lambda (len) - (define new-bstr (make-bytes (fx* 2 len))) - (bytes-copy! new-bstr 0 bstr 0 pos) - (set! bstr new-bstr))] + #:private + [enlarge! + (lambda (len) + (define new-bstr (make-bytes (fx* 2 len))) + (bytes-copy! new-bstr 0 bstr 0 pos) + (set! bstr new-bstr))] - [slow-mode! - (lambda () - (define b buffer) - (when (direct-bstr b) - (define s (direct-pos b)) - (set! pos s) - (set-direct-pos! b (direct-end b)) - (set-direct-bstr! b #f) - (set! offset s) - (set! max-pos (fxmax s max-pos))))] + [slow-mode! + (lambda () + (define b buffer) + (when (direct-bstr b) + (define s (direct-pos b)) + (set! pos s) + (set-direct-pos! b (direct-end b)) + (set-direct-bstr! b #f) + (set! offset s) + (set! max-pos (fxmax s max-pos))))] - [fast-mode! - (lambda () - (define b buffer) - (set-direct-bstr! b bstr) - (set-direct-pos! b pos) - (set-direct-end! b (bytes-length bstr)) - (set! offset 0))]) + [fast-mode! + (lambda () + (define b buffer) + (set-direct-bstr! b bstr) + (set-direct-pos! b pos) + (set-direct-end! b (bytes-length bstr)) + (set! offset 0))] - (override - [write-out - (lambda (src-bstr src-start src-end nonblock? enable-break? copy?) - (slow-mode!) - (define i pos) - (define amt (min (fx- src-end src-start) 4096)) - (define end-i (fx+ i amt)) - (when ((bytes-length bstr) . < . end-i) - (enlarge! end-i)) - (bytes-copy! bstr i src-bstr src-start (fx+ src-start amt)) - (set! pos end-i) - (set! max-pos (fxmax pos max-pos)) - (fast-mode!) - amt)] - [get-write-evt - (get-write-evt-via-write-out (lambda (out v bstr start) - (port-count! out v bstr start)))] - [file-position - (case-lambda - [() - (define b buffer) - (if (direct-bstr b) (direct-pos b) pos)] - [(new-pos) - (slow-mode!) - (define len (bytes-length bstr)) - (cond - [(eof-object? new-pos) - (set! pos max-pos)] - [(new-pos . > . len) - (when (new-pos . >= . (expt 2 48)) - ;; implausibly large - (end-atomic) - (raise-arguments-error 'file-position - "new position is too large" - "port" this - "position" new-pos)) - (enlarge! len) - (set! pos new-pos) - (set! max-pos new-pos)] - [else - (set! pos new-pos) - (set! max-pos (fxmax max-pos new-pos))])])])) + #:override + [write-out + (lambda (src-bstr src-start src-end nonblock? enable-break? copy?) + (slow-mode!) + (define i pos) + (define amt (min (fx- src-end src-start) 4096)) + (define end-i (fx+ i amt)) + (when ((bytes-length bstr) . < . end-i) + (enlarge! end-i)) + (bytes-copy! bstr i src-bstr src-start (fx+ src-start amt)) + (set! pos end-i) + (set! max-pos (fxmax pos max-pos)) + (fast-mode!) + amt)] + [get-write-evt + (get-write-evt-via-write-out (lambda (out v bstr start) + (port-count! out v bstr start)))] + [file-position + (case-lambda + [() + (define b buffer) + (if (direct-bstr b) (direct-pos b) pos)] + [(new-pos) + (slow-mode!) + (define len (bytes-length bstr)) + (cond + [(eof-object? new-pos) + (set! pos max-pos)] + [(new-pos . > . len) + (when (new-pos . >= . (expt 2 48)) + ;; implausibly large + (end-atomic) + (raise-arguments-error 'file-position + "new position is too large" + "port" this + "position" new-pos)) + (enlarge! len) + (set! pos new-pos) + (set! max-pos new-pos)] + [else + (set! pos new-pos) + (set! max-pos (fxmax max-pos new-pos))])])]) (define (open-output-bytes [name 'string]) - (define p (new bytes-output-port - [bstr (make-bytes 16)] - [name name] - [evt always-evt])) - (when (port-count-lines-enabled) - (port-count-lines! p)) - p) + (finish-port/count + (new bytes-output-port + #:field + [bstr (make-bytes 16)] + [name name] + [evt always-evt]))) (define/who (get-output-bytes o [reset? #f] [start-pos 0] [end-pos #f]) (check who (lambda (v) (and (output-port? o) (string-port? o))) diff --git a/racket/src/io/port/commit-port.rkt b/racket/src/io/port/commit-port.rkt index f821ce74cb..f1c464baac 100644 --- a/racket/src/io/port/commit-port.rkt +++ b/racket/src/io/port/commit-port.rkt @@ -8,46 +8,46 @@ (provide commit-input-port) (class commit-input-port #:extends core-input-port - (field - [progress-sema #f] - [commit-manager #f]) + #:field + [progress-sema #f] + [commit-manager #f] - (static - ;; in atomic mode - [progress! - (lambda () - (when progress-sema - (semaphore-post progress-sema) - (set! progress-sema #f)))] + #:static + ;; in atomic mode + [progress! + (lambda () + (when progress-sema + (semaphore-post progress-sema) + (set! progress-sema #f)))] - ;; in atomic mode [can leave atomic mode temporarily] - ;; After this function returns, complete any commit-changing work - ;; before leaving atomic mode again. - [pause-waiting-commit - (lambda () - (when commit-manager - (commit-manager-pause commit-manager)))] + ;; in atomic mode [can leave atomic mode temporarily] + ;; After this function returns, complete any commit-changing work + ;; before leaving atomic mode again. + [pause-waiting-commit + (lambda () + (when commit-manager + (commit-manager-pause commit-manager)))] - ;; in atomic mode [can leave atomic mode temporarily] - [wait-commit - (lambda (progress-evt ext-evt finish) - (cond - [(and (not commit-manager) - ;; Try shortcut: - (not (sync/timeout 0 progress-evt)) - (sync/timeout 0 ext-evt)) - (finish) - #t] - [else - ;; General case to support blocking and potentially multiple - ;; commiting threads: - (unless commit-manager - (set! commit-manager (make-commit-manager))) - (commit-manager-wait commit-manager progress-evt ext-evt finish)]))] + ;; in atomic mode [can leave atomic mode temporarily] + [wait-commit + (lambda (progress-evt ext-evt finish) + (cond + [(and (not commit-manager) + ;; Try shortcut: + (not (sync/timeout 0 progress-evt)) + (sync/timeout 0 ext-evt)) + (finish) + #t] + [else + ;; General case to support blocking and potentially multiple + ;; commiting threads: + (unless commit-manager + (set! commit-manager (make-commit-manager))) + (commit-manager-wait commit-manager progress-evt ext-evt finish)]))] - ;; in atomic mode - [make-progress-evt - (lambda () - (unless progress-sema - (set! progress-sema (make-semaphore))) - (semaphore-peek-evt progress-sema))])) + ;; in atomic mode + [make-progress-evt + (lambda () + (unless progress-sema + (set! progress-sema (make-semaphore))) + (semaphore-peek-evt progress-sema))]) diff --git a/racket/src/io/port/count.rkt b/racket/src/io/port/count.rkt index 85d64f00a6..46273822a4 100644 --- a/racket/src/io/port/count.rkt +++ b/racket/src/io/port/count.rkt @@ -10,6 +10,7 @@ "../string/utf-8-decode.rkt") (provide port-count-lines-enabled + finish-port/count port-count-lines! port-counts-lines? @@ -25,6 +26,11 @@ (define port-count-lines-enabled (make-parameter #f (lambda (v) (and v #t)))) +(define (finish-port/count p) + (when (port-count-lines-enabled) + (port-count-lines! p)) + p) + (define/who (port-count-lines! p) (let ([p (cond [(input-port? p) (->core-input-port p)] diff --git a/racket/src/io/port/custom-input-port.rkt b/racket/src/io/port/custom-input-port.rkt index 936ae05f0f..aaef9a6369 100644 --- a/racket/src/io/port/custom-input-port.rkt +++ b/racket/src/io/port/custom-input-port.rkt @@ -260,48 +260,45 @@ (and user-buffer-mode (make-buffer-mode user-buffer-mode))) - (define port - (cond - [user-peek-in - (new core-input-port - #:override - ([read-in (if (input-port? user-read-in) - user-read-in - read-in)] - [peek-in (if (input-port? user-peek-in) - user-peek-in - peek-in)] - [byte-ready (if (input-port? user-peek-in) - user-peek-in - byte-ready)] - [close close] - [get-progress-evt (and user-get-progress-evt get-progress-evt)] - [commit (and user-commit commit)] - [get-location get-location] - [count-lines! count-lines!] - [file-position file-position] - [buffer-mode buffer-mode]) - [name name] - [offset init-offset])] - [else - (new peek-via-read-input-port - #:override - ([read-in/inner read-in] - [close (values - (lambda (self) - (close self) - (send peek-via-read-input-port self close-peek-buffer)))] - [get-location get-location] - [count-lines! count-lines!] - [file-position file-position] - [buffer-mode (or buffer-mode - (case-lambda - [(self) (send peek-via-read-input-port self default-buffer-mode)] - [(self mode) (send peek-via-read-input-port self default-buffer-mode mode)]))]) - [name name] - [offset init-offset])])) - - (when (port-count-lines-enabled) - (port-count-lines! port)) - - port) + (finish-port/count + (cond + [user-peek-in + (new core-input-port + #:field + [name name] + [offset init-offset] + #:override + [read-in (if (input-port? user-read-in) + user-read-in + read-in)] + [peek-in (if (input-port? user-peek-in) + user-peek-in + peek-in)] + [byte-ready (if (input-port? user-peek-in) + user-peek-in + byte-ready)] + [close close] + [get-progress-evt (and user-get-progress-evt get-progress-evt)] + [commit (and user-commit commit)] + [get-location get-location] + [count-lines! count-lines!] + [file-position file-position] + [buffer-mode buffer-mode])] + [else + (new peek-via-read-input-port + #:field + [name name] + [offset init-offset] + #:override + [read-in/inner read-in] + [close (values + (lambda (self) + (close self) + (send peek-via-read-input-port self close-peek-buffer)))] + [get-location get-location] + [count-lines! count-lines!] + [file-position file-position] + [buffer-mode (or buffer-mode + (case-lambda + [(self) (send peek-via-read-input-port self default-buffer-mode)] + [(self mode) (send peek-via-read-input-port self default-buffer-mode mode)]))])]))) diff --git a/racket/src/io/port/custom-output-port.rkt b/racket/src/io/port/custom-output-port.rkt index 103eab827f..54ed91e7cf 100644 --- a/racket/src/io/port/custom-output-port.rkt +++ b/racket/src/io/port/custom-output-port.rkt @@ -168,30 +168,26 @@ (user-close) (start-atomic)) - (define port - (new core-output-port - #:override - ([write-out (if (output-port? user-write-out) - user-write-out - write-out)] - [close close] - [write-out-special - (if (output-port? user-write-out-special) - user-write-out-special - (and user-write-out-special write-out-special))] - [get-write-evt (and user-get-write-evt get-write-evt)] - [get-write-special-evt (and user-get-write-special-evt - (lambda (self v) - (user-get-write-special-evt v)))] - [get-location get-location] - [count-lines! count-lines!] - [file-position file-position] - [buffer-mode buffer-mode]) - [name name] - [evt evt] - [offset init-offset])) - - (when (port-count-lines-enabled) - (port-count-lines! port)) - - port) + (finish-port/count + (new core-output-port + #:field + [name name] + [evt evt] + [offset init-offset] + #:override + [write-out (if (output-port? user-write-out) + user-write-out + write-out)] + [close close] + [write-out-special + (if (output-port? user-write-out-special) + user-write-out-special + (and user-write-out-special write-out-special))] + [get-write-evt (and user-get-write-evt get-write-evt)] + [get-write-special-evt (and user-get-write-special-evt + (lambda (self v) + (user-get-write-special-evt v)))] + [get-location get-location] + [count-lines! count-lines!] + [file-position file-position] + [buffer-mode buffer-mode]))) diff --git a/racket/src/io/port/fd-port.rkt b/racket/src/io/port/fd-port.rkt index e8b8fb7952..1aedfe8ba2 100644 --- a/racket/src/io/port/fd-port.rkt +++ b/racket/src/io/port/fd-port.rkt @@ -41,50 +41,50 @@ ;; ---------------------------------------- (class fd-input-port #:extends peek-via-read-input-port - (field - [fd #f] - [fd-refcount (box 1)] - [custodian-reference #f]) - - (public - [on-close (lambda () (void))] - [raise-read-error (lambda (n) - (raise-filesystem-error #f n "error reading from stream port"))]) + #:field + [fd #f] + [fd-refcount (box 1)] + [custodian-reference #f] - (override - [read-in/inner - (lambda (dest-bstr start end copy?) - (define n (rktio_read_in rktio fd dest-bstr start end)) - (cond - [(rktio-error? n) - (end-atomic) - (send fd-input-port this raise-read-error n)] - [(eqv? n RKTIO_READ_EOF) eof] - [(eqv? n 0) (wrap-evt (fd-evt fd RKTIO_POLL_READ this) - (lambda (v) 0))] - [else n]))] + #:public + [on-close (lambda () (void))] + [raise-read-error (lambda (n) + (raise-filesystem-error #f n "error reading from stream port"))] + + #:override + [read-in/inner + (lambda (dest-bstr start end copy?) + (define n (rktio_read_in rktio fd dest-bstr start end)) + (cond + [(rktio-error? n) + (end-atomic) + (send fd-input-port this raise-read-error n)] + [(eqv? n RKTIO_READ_EOF) eof] + [(eqv? n 0) (wrap-evt (fd-evt fd RKTIO_POLL_READ this) + (lambda (v) 0))] + [else n]))] - [close - (lambda () - (send fd-input-port this on-close) - (fd-close fd fd-refcount) - (unsafe-custodian-unregister fd custodian-reference) - (close-peek-buffer))] + [close + (lambda () + (send fd-input-port this on-close) + (fd-close fd fd-refcount) + (unsafe-custodian-unregister fd custodian-reference) + (close-peek-buffer))] - [file-position - (case-lambda - [() - (define pos (get-file-position fd)) - (and pos (buffer-adjust-pos pos))] - [(pos) - (purge-buffer) - (set-file-position fd pos)])]) + [file-position + (case-lambda + [() + (define pos (get-file-position fd)) + (and pos (buffer-adjust-pos pos))] + [(pos) + (purge-buffer) + (set-file-position fd pos)])] - (property - [prop:file-stream (lambda (p) (fd-input-port-fd p))] - [prop:data-place-message (lambda (port) - (lambda () - (fd-port->place-message port)))])) + #:property + [prop:file-stream (lambda (p) (fd-input-port-fd p))] + [prop:data-place-message (lambda (port) + (lambda () + (fd-port->place-message port)))]) ;; ---------------------------------------- @@ -95,6 +95,7 @@ #:custodian [cust (current-custodian)]) (finish-fd-input-port (new fd-input-port + #:field [name name] [fd fd] [fd-refcount fd-refcount]) @@ -105,193 +106,191 @@ (define fd (fd-input-port-fd p)) (define fd-refcount (fd-input-port-fd-refcount p)) (set-fd-input-port-custodian-reference! p (register-fd-close cust fd fd-refcount #f p)) - (when (port-count-lines-enabled) - (port-count-lines! p)) - p) + (finish-port/count p)) ;; ---------------------------------------- (class fd-output-port #:extends core-output-port - (field - [fd fd] - [fd-refcount (box 1)] - [bstr (make-bytes 4096)] - [start-pos 0] - [end-pos 0] - [flush-handle #f] - [buffer-mode 'block] - [custodian-reference #f]) + #:field + [fd fd] + [fd-refcount (box 1)] + [bstr (make-bytes 4096)] + [start-pos 0] + [end-pos 0] + [flush-handle #f] + [buffer-mode 'block] + [custodian-reference #f] - (static - [fast-mode! - (lambda (amt) ; amt = not yet added to `offset` - (when (eq? buffer-mode 'block) - (define b buffer) - (define e end-pos) - (set-direct-bstr! b bstr) - (set-direct-pos! b e) - (set-direct-end! b (bytes-length bstr)) - (define o offset) - (when o - (set! offset (- (+ o amt) e)))))] + #:static + [fast-mode! + (lambda (amt) ; amt = not yet added to `offset` + (when (eq? buffer-mode 'block) + (define b buffer) + (define e end-pos) + (set-direct-bstr! b bstr) + (set-direct-pos! b e) + (set-direct-end! b (bytes-length bstr)) + (define o offset) + (when o + (set! offset (- (+ o amt) e)))))] - [slow-mode! - (lambda () + [slow-mode! + (lambda () + (define b buffer) + (when (direct-bstr b) + (set-direct-bstr! b #f) + (define pos (direct-pos b)) + (set! end-pos pos) + (define o offset) + (when o + (set! offset (+ o pos))) + (set-direct-pos! b (direct-end b))))] + + #:public + [on-close (lambda () (void))] + [raise-write-error + (lambda (n) + (raise-filesystem-error #f n "error writing to stream port"))] + + #:private + ;; in atomic mode + ;; Returns `#t` if the buffer is already or successfully flushed + [flush-buffer + (lambda () + (slow-mode!) + (cond + [(not (fx= start-pos end-pos)) + (define n (rktio_write_in rktio fd bstr start-pos end-pos)) + (cond + [(rktio-error? n) + (end-atomic) + (send fd-output-port this raise-write-error n)] + [(fx= n 0) + #f] + [else + (define new-start-pos (fx+ start-pos n)) + (cond + [(fx= new-start-pos end-pos) + (set! start-pos 0) + (set! end-pos 0) + #t] + [else + (set! start-pos new-start-pos) + #f])])] + [else #t]))] + + ;; in atomic mode, but may leave it temporarily + [flush-buffer-fully + (lambda (enable-break?) + (let loop () + (unless (flush-buffer) + (end-atomic) + (if enable-break? + (sync/enable-break evt) + (sync evt)) + (start-atomic) + (when bstr ; in case it was closed + (loop)))))] + + ;; in atomic mode, but may leave it temporarily + [flush-buffer-fully-if-newline + (lambda (src-bstr src-start src-end enable-break?) + (for ([b (in-bytes src-bstr src-start src-end)]) + (define newline? (or (eqv? b (char->integer #\newline)) + (eqv? b (char->integer #\return)))) + (when newline? (flush-buffer-fully enable-break?)) + #:break newline? + (void)))] + + #:static + [flush-buffer/external + (lambda () + (flush-buffer-fully #f))] + + #:override + ;; in atomic mode + [write-out + (lambda (src-bstr src-start src-end nonbuffer/nonblock? enable-break? copy?) + (slow-mode!) + (cond + [(fx= src-start src-end) + ;; Flush request + (and (flush-buffer) 0)] + [(and (not (eq? buffer-mode 'none)) + (not nonbuffer/nonblock?) + (fx< end-pos (bytes-length bstr))) + (define amt (fxmin (fx- src-end src-start) (fx- (bytes-length bstr) end-pos))) + (bytes-copy! bstr end-pos src-bstr src-start (fx+ src-start amt)) + (set! end-pos (fx+ end-pos amt)) + (when (eq? buffer-mode 'line) + ;; can temporarily leave atomic mode: + (flush-buffer-fully-if-newline src-bstr src-start src-end enable-break?)) + (fast-mode! amt) + amt] + [(not (flush-buffer)) ; <- can temporarily leave atomic mode + #f] + [else + (define n (rktio_write_in rktio fd src-bstr src-start src-end)) + (cond + [(rktio-error? n) + (end-atomic) + (send fd-output-port this raise-write-error n)] + [(fx= n 0) (wrap-evt evt (lambda (v) #f))] + [else n])]))] + + [get-write-evt + (get-write-evt-via-write-out (lambda (out v bstr start) + (port-count! out v bstr start)))] + + ;; in atomic mode + [close + (lambda () + (flush-buffer-fully #f) ; can temporarily leave atomic mode + (when bstr ; <- in case a concurrent close succeeded + (send fd-output-port this on-close) + (plumber-flush-handle-remove! flush-handle) + (set! bstr #f) + (fd-close fd fd-refcount) + (unsafe-custodian-unregister fd custodian-reference)))] + + ;; in atomic mode + [file-position + (case-lambda + [() + (define pos (get-file-position fd)) (define b buffer) - (when (direct-bstr b) - (set-direct-bstr! b #f) - (define pos (direct-pos b)) - (set! end-pos pos) - (define o offset) - (when o - (set! offset (+ o pos))) - (set-direct-pos! b (direct-end b))))]) + (and pos (+ pos (fx- (if (direct-bstr b) (direct-pos b) end-pos) start-pos)))] + [(pos) + (flush-buffer-fully #f) + ;; flushing can leave atomic mode, so make sure the + ;; port is still open before continuing + (unless bstr + (check-not-closed 'file-position this)) + (set-file-position fd pos)])] - (public - [on-close (lambda () (void))] - [raise-write-error - (lambda (n) - (raise-filesystem-error #f n "error writing to stream port"))]) + ;; in atomic mode + [buffer-mode + (case-lambda + [(self) buffer-mode] + [(self mode) (set! buffer-mode mode)])] - (private - ;; in atomic mode - ;; Returns `#t` if the buffer is already or successfully flushed - [flush-buffer - (lambda () - (slow-mode!) - (cond - [(not (fx= start-pos end-pos)) - (define n (rktio_write_in rktio fd bstr start-pos end-pos)) - (cond - [(rktio-error? n) - (end-atomic) - (send fd-output-port this raise-write-error n)] - [(fx= n 0) - #f] - [else - (define new-start-pos (fx+ start-pos n)) - (cond - [(fx= new-start-pos end-pos) - (set! start-pos 0) - (set! end-pos 0) - #t] - [else - (set! start-pos new-start-pos) - #f])])] - [else #t]))] - - ;; in atomic mode, but may leave it temporarily - [flush-buffer-fully - (lambda (enable-break?) - (let loop () - (unless (flush-buffer) - (end-atomic) - (if enable-break? - (sync/enable-break evt) - (sync evt)) - (start-atomic) - (when bstr ; in case it was closed - (loop)))))] - - ;; in atomic mode, but may leave it temporarily - [flush-buffer-fully-if-newline - (lambda (src-bstr src-start src-end enable-break?) - (for ([b (in-bytes src-bstr src-start src-end)]) - (define newline? (or (eqv? b (char->integer #\newline)) - (eqv? b (char->integer #\return)))) - (when newline? (flush-buffer-fully enable-break?)) - #:break newline? - (void)))]) - - (static - [flush-buffer/external - (lambda () - (flush-buffer-fully #f))]) - - (override - ;; in atomic mode - [write-out - (lambda (src-bstr src-start src-end nonbuffer/nonblock? enable-break? copy?) - (slow-mode!) - (cond - [(fx= src-start src-end) - ;; Flush request - (and (flush-buffer) 0)] - [(and (not (eq? buffer-mode 'none)) - (not nonbuffer/nonblock?) - (fx< end-pos (bytes-length bstr))) - (define amt (fxmin (fx- src-end src-start) (fx- (bytes-length bstr) end-pos))) - (bytes-copy! bstr end-pos src-bstr src-start (fx+ src-start amt)) - (set! end-pos (fx+ end-pos amt)) - (when (eq? buffer-mode 'line) - ;; can temporarily leave atomic mode: - (flush-buffer-fully-if-newline src-bstr src-start src-end enable-break?)) - (fast-mode! amt) - amt] - [(not (flush-buffer)) ; <- can temporarily leave atomic mode - #f] - [else - (define n (rktio_write_in rktio fd src-bstr src-start src-end)) - (cond - [(rktio-error? n) - (end-atomic) - (send fd-output-port this raise-write-error n)] - [(fx= n 0) (wrap-evt evt (lambda (v) #f))] - [else n])]))] - - [get-write-evt - (get-write-evt-via-write-out (lambda (out v bstr start) - (port-count! out v bstr start)))] - - ;; in atomic mode - [close - (lambda () - (flush-buffer-fully #f) ; can temporarily leave atomic mode - (when bstr ; <- in case a concurrent close succeeded - (send fd-output-port this on-close) - (plumber-flush-handle-remove! flush-handle) - (set! bstr #f) - (fd-close fd fd-refcount) - (unsafe-custodian-unregister fd custodian-reference)))] - - ;; in atomic mode - [file-position - (case-lambda - [() - (define pos (get-file-position fd)) - (define b buffer) - (and pos (+ pos (fx- (if (direct-bstr b) (direct-pos b) end-pos) start-pos)))] - [(pos) - (flush-buffer-fully #f) - ;; flushing can leave atomic mode, so make sure the - ;; port is still open before continuing - (unless bstr - (check-not-closed 'file-position this)) - (set-file-position fd pos)])] - - ;; in atomic mode - [buffer-mode - (case-lambda - [(self) buffer-mode] - [(self mode) (set! buffer-mode mode)])]) - - (property - [prop:file-stream (lambda (p) (fd-output-port-fd p))] - [prop:file-truncate (lambda (p pos) - ;; in atomic mode - (send fd-output-port p flush-buffer/external) - (define result - (rktio_set_file_size rktio - (fd-output-port-fd p) - pos)) - (cond - [(rktio-error? result) - (end-atomic) - (raise-rktio-error 'file-truncate result "error setting file size")] - [else result]))] - [prop:data-place-message (lambda (port) - (lambda () - (fd-port->place-message port)))])) + #:property + [prop:file-stream (lambda (p) (fd-output-port-fd p))] + [prop:file-truncate (lambda (p pos) + ;; in atomic mode + (send fd-output-port p flush-buffer/external) + (define result + (rktio_set_file_size rktio + (fd-output-port-fd p) + pos)) + (cond + [(rktio-error? result) + (end-atomic) + (raise-rktio-error 'file-truncate result "error setting file size")] + [else result]))] + [prop:data-place-message (lambda (port) + (lambda () + (fd-port->place-message port)))]) ;; ---------------------------------------- @@ -304,6 +303,7 @@ #:custodian [cust (current-custodian)]) (finish-fd-output-port (new fd-output-port + #:field [name name] [fd fd] [fd-refcount fd-refcount] @@ -330,9 +330,7 @@ (set-core-output-port-evt! p evt) (set-fd-output-port-flush-handle! p flush-handle) (set-fd-output-port-custodian-reference! p custodian-reference) - (when (port-count-lines-enabled) - (port-count-lines! p)) - p) + (finish-port/count p)) ;; ---------------------------------------- diff --git a/racket/src/io/port/input-port.rkt b/racket/src/io/port/input-port.rkt index 46070ea41b..f25bf29274 100644 --- a/racket/src/io/port/input-port.rkt +++ b/racket/src/io/port/input-port.rkt @@ -49,100 +49,101 @@ [else default])) (class core-input-port #:extends core-port - (field - [pending-eof? #f] - [read-handler #f]) + #:field + [pending-eof? #f] + [read-handler #f] - (public + #:public + + ;; #f or (-*> void) + ;; Called in atomic mode + ;; May leave atomic mode temporarily, but on return, ensures that + ;; other atomic operations are ok to change the port. The main use + ;; of `prepare-change` is to pause and `port-commit-peeked` + ;; attempts to not succeed while a potential change is in progress, + ;; where the commit attempts can resume after atomic mode is left. + ;; The `close` operation is *not* guarded by a call to + ;; `prepare-change`. + [prepare-change #f] - ;; #f or (-*> void) - ;; Called in atomic mode - ;; May leave atomic mode temporarily, but on return, ensures that - ;; other atomic operations are ok to change the port. The main use - ;; of `prepare-change` is to pause and `port-commit-peeked` - ;; attempts to not succeed while a potential change is in progress, - ;; where the commit attempts can resume after atomic mode is left. - ;; The `close` operation is *not* guarded by a call to - ;; `prepare-change`. - [prepare-change #f] + ;; port or (bytes start-k end-k copy? -*> (or/c integer? ...)) + ;; Called in atomic mode. + ;; A port value redirects to the port. Otherwise, the function + ;; never blocks, and can assume `(- end-k start-k)` is non-zero. + ;; The `copy?` flag indicates that the given byte string should not + ;; be exposed to untrusted code, and instead of should be copied if + ;; necessary. The return values are the same as documented for + ;; `make-input-port`, except that a pipe result is not allowed (or, + ;; more precisely, it's treated as an event). + [read-in (lambda (bstr start end copy?) eof)] - ;; port or (bytes start-k end-k copy? -*> (or/c integer? ...)) - ;; Called in atomic mode. - ;; A port value redirects to the port. Otherwise, the function - ;; never blocks, and can assume `(- end-k start-k)` is non-zero. - ;; The `copy?` flag indicates that the given byte string should not - ;; be exposed to untrusted code, and instead of should be copied if - ;; necessary. The return values are the same as documented for - ;; `make-input-port`, except that a pipe result is not allowed (or, - ;; more precisely, it's treated as an event). - [read-in (lambda (bstr start end copy?) eof)] + ;; port or (bytes start-k end-k skip-k progress-evt copy? -*> (or/c integer? ...)) + ;; Called in atomic mode. + ;; A port value redirects to the port. Otherwise, the function + ;; never blocks, and it can assume that `(- end-k start-k)` is + ;; non-zero. The `copy?` flag is the same as for `read-in`. The + ;; return values are the same as documented for `make-input-port`. + [peek-in (lambda (bstr start end progress-evt copy?) eof)] - ;; port or (bytes start-k end-k skip-k progress-evt copy? -*> (or/c integer? ...)) - ;; Called in atomic mode. - ;; A port value redirects to the port. Otherwise, the function - ;; never blocks, and it can assume that `(- end-k start-k)` is - ;; non-zero. The `copy?` flag is the same as for `read-in`. The - ;; return values are the same as documented for `make-input-port`. - [peek-in (lambda (bstr start end progress-evt copy?) eof)] + ;; port or ((->) -*> (or/c boolean? evt)) + ;; Called in atomic mode. + ;; A port value makes sense when `peek-in` has a port value. + ;; Otherwise, check whether a peek on one byte would succeed + ;; without blocking and return a boolean, or return an event that + ;; effectively does the same. The event's value doesn't matter, + ;; because it will be wrapped to return some original port. When + ;; `byte-ready` is a function, it should call the given function + ;; (for its side effect) when work has been done that might unblock + ;; this port or some other port. + [byte-ready (lambda (work-done!) #t)] - ;; port or ((->) -*> (or/c boolean? evt)) - ;; Called in atomic mode. - ;; A port value makes sense when `peek-in` has a port value. - ;; Otherwise, check whether a peek on one byte would succeed - ;; without blocking and return a boolean, or return an event that - ;; effectively does the same. The event's value doesn't matter, - ;; because it will be wrapped to return some original port. When - ;; `byte-ready` is a function, it should call the given function - ;; (for its side effect) when work has been done that might unblock - ;; this port or some other port. - [byte-ready (lambda (work-done!) #t)] + ;; #f or (-*> evt?) + ;; *Not* called in atomic mode. + ;; Optional support for progress events, and may be called on a + ;; closed port. + [get-progress-evt #f] - ;; #f or (-*> evt?) - ;; *Not* called in atomic mode. - ;; Optional support for progress events, and may be called on a - ;; closed port. - [get-progress-evt #f] + ;; (amt-k progress-evt? evt? (bytes? -> any) -*> boolean) + ;; Called in atomic mode. + ;; Goes with `get-progress-evt`. The final `evt?` argument is + ;; constrained to a few kinds of events; see docs for + ;; `port-commit-peeked` for more information. On success, a + ;; completion function is called in atomic mode, but possibly in a + ;; different thread, with the committed bytes. The result is a + ;; boolean indicating success or failure. + [commit (lambda (amt progress-evt ext-evt finish) #f)] - ;; (amt-k progress-evt? evt? (bytes? -> any) -*> boolean) - ;; Called in atomic mode. - ;; Goes with `get-progress-evt`. The final `evt?` argument is - ;; constrained to a few kinds of events; see docs for - ;; `port-commit-peeked` for more information. On success, a - ;; completion function is called in atomic mode, but possibly in a - ;; different thread, with the committed bytes. The result is a - ;; boolean indicating success or failure. - [commit (lambda (amt progress-evt ext-evt finish) #f)]) - - (property - [prop:input-port-evt (lambda (i) - ;; not atomic mode - (let ([i (->core-input-port i)]) - (cond - [(core-port-closed? i) - always-evt] - [else - (define byte-ready (method core-input-port i byte-ready)) - (cond - [(input-port? byte-ready) - byte-ready] - [else - (poller-evt - (poller - (lambda (self poll-ctx) - ;; atomic mode - (define v (byte-ready i - (lambda () - (schedule-info-did-work! (poll-ctx-sched-info poll-ctx))))) - (cond - [(evt? v) - (values #f v)] - [(eq? v #t) - (values (list #t) #f)] - [else - (values #f self)]))))])])))])) + #:property + [prop:input-port-evt (lambda (i) + ;; not atomic mode + (let ([i (->core-input-port i)]) + (cond + [(core-port-closed? i) + always-evt] + [else + (define byte-ready (method core-input-port i byte-ready)) + (cond + [(input-port? byte-ready) + byte-ready] + [else + (poller-evt + (poller + (lambda (self poll-ctx) + ;; atomic mode + (define v (byte-ready i + (lambda () + (schedule-info-did-work! (poll-ctx-sched-info poll-ctx))))) + (cond + [(evt? v) + (values #f v)] + [(eq? v #t) + (values (list #t) #f)] + [else + (values #f self)]))))])])))]) ;; ---------------------------------------- (define empty-input-port (new core-input-port + #:field [name 'empty])) diff --git a/racket/src/io/port/max-output-port.rkt b/racket/src/io/port/max-output-port.rkt index a3accbf5ee..40e2475b67 100644 --- a/racket/src/io/port/max-output-port.rkt +++ b/racket/src/io/port/max-output-port.rkt @@ -8,32 +8,33 @@ max-output-port-max-length) (class max-output-port #:extends core-output-port - (field - [o #f] - [max-length 0]) - (override - [write-out - (lambda (src-bstr src-start src-end nonblock? enable-break? copy?) - (cond - [max-length - (define len (- src-end src-start)) - (unless (eq? max-length 'full) - (define write-len (min len max-length)) - (end-atomic) - (define wrote-len (write-bytes src-bstr o src-start (+ src-start write-len))) - (start-atomic) - (if (= max-length wrote-len) - (set! max-length 'full) - (set! max-length (- max-length wrote-len)))) - len] - [else + #:field + [o #f] + [max-length 0] + #:override + [write-out + (lambda (src-bstr src-start src-end nonblock? enable-break? copy?) + (cond + [max-length + (define len (- src-end src-start)) + (unless (eq? max-length 'full) + (define write-len (min len max-length)) (end-atomic) - (define len (write-bytes src-bstr o src-start src-end)) + (define wrote-len (write-bytes src-bstr o src-start (+ src-start write-len))) (start-atomic) - len]))])) + (if (= max-length wrote-len) + (set! max-length 'full) + (set! max-length (- max-length wrote-len)))) + len] + [else + (end-atomic) + (define len (write-bytes src-bstr o src-start src-end)) + (start-atomic) + len]))]) (define (make-max-output-port o max-length) (new max-output-port + #:field [name (object-name o)] [evt o] [o o] diff --git a/racket/src/io/port/nowhere.rkt b/racket/src/io/port/nowhere.rkt index 9620150a50..6f0aade984 100644 --- a/racket/src/io/port/nowhere.rkt +++ b/racket/src/io/port/nowhere.rkt @@ -1,15 +1,18 @@ #lang racket/base (require "../common/class.rkt" - "output-port.rkt") + "output-port.rkt" + "count.rkt") (provide open-output-nowhere) (class nowhere-output-port #:extends core-output-port - (override - [write-out-special - (lambda (any no-block/buffer? enable-break?) - #t)])) + #:override + [write-out-special + (lambda (any no-block/buffer? enable-break?) + #t)]) (define (open-output-nowhere) - (new nowhere-output-port - [name 'nowhere])) + (finish-port/count + (new nowhere-output-port + #:field + [name 'nowhere]))) diff --git a/racket/src/io/port/output-port.rkt b/racket/src/io/port/output-port.rkt index 7f53b27bf9..46d2ddb931 100644 --- a/racket/src/io/port/output-port.rkt +++ b/racket/src/io/port/output-port.rkt @@ -50,52 +50,52 @@ [else default])) (class core-output-port #:extends core-port - (field - [evt always-evt] ; An evt that is ready when writing a byte won't block - [write-handler #f] - [print-handler #f] - [display-handler #f]) + #:field + [evt always-evt] ; An evt that is ready when writing a byte won't block + [write-handler #f] + [print-handler #f] + [display-handler #f] - (public - ;; port or (bstr start-k end-k no-block/buffer? enable-break? copy? -*> ...) - ;; Called in atomic mode. - ;; Doesn't block if `no-block/buffer?` is true. Does enable breaks - ;; while blocking if `enable-break?` is true. The `copy?` flag - ;; indicates that the given byte string should not be exposed to - ;; untrusted code, and instead of should be copied if necessary. - ;; The return values are the same as documented for - ;; `make-output-port`. - [write-out (lambda (bstr start-k end-k no-block/buffer? enable-break? copy?) - (- end-k start-k))] + #:public + ;; port or (bstr start-k end-k no-block/buffer? enable-break? copy? -*> ...) + ;; Called in atomic mode. + ;; Doesn't block if `no-block/buffer?` is true. Does enable breaks + ;; while blocking if `enable-break?` is true. The `copy?` flag + ;; indicates that the given byte string should not be exposed to + ;; untrusted code, and instead of should be copied if necessary. + ;; The return values are the same as documented for + ;; `make-output-port`. + [write-out (lambda (bstr start-k end-k no-block/buffer? enable-break? copy?) + (- end-k start-k))] - ;; #f or (any no-block/buffer? enable-break? -*> boolean?) - ;; Called in atomic mode. - [write-out-special #f] - - ;; #f or (bstr start-k end-k -*> evt?) - ;; Called in atomic mode. - ;; The given bstr should not be exposed to untrusted code. - [get-write-evt (lambda (bstr start-k end-k) always-evt)] + ;; #f or (any no-block/buffer? enable-break? -*> boolean?) + ;; Called in atomic mode. + [write-out-special #f] + + ;; #f or (bstr start-k end-k -*> evt?) + ;; Called in atomic mode. + ;; The given bstr should not be exposed to untrusted code. + [get-write-evt (lambda (bstr start-k end-k) always-evt)] - ;; #f or (any -*> evt?) - ;; *Not* called in atomic mode. - [get-write-special-evt #f]) + ;; #f or (any -*> evt?) + ;; *Not* called in atomic mode. + [get-write-special-evt #f] - (property - [prop:output-port-evt (lambda (o) - ;; not atomic mode - (let ([o (->core-output-port o)]) - (choice-evt - (list - (poller-evt - (poller - (lambda (self sched-info) - ;; atomic mode - (cond - [(core-port-closed? o) - (values '(#t) #f)] - [else (values #f self)])))) - (core-output-port-evt o)))))])) + #:property + [prop:output-port-evt (lambda (o) + ;; not atomic mode + (let ([o (->core-output-port o)]) + (choice-evt + (list + (poller-evt + (poller + (lambda (self sched-info) + ;; atomic mode + (cond + [(core-port-closed? o) + (values '(#t) #f)] + [else (values #f self)])))) + (core-output-port-evt o)))))]) ;; If `write-out` is always atomic (in no-block, no-buffer mode), ;; then an event can poll `write-out` @@ -118,4 +118,5 @@ (define empty-output-port (new core-output-port + #:field [name 'empty])) diff --git a/racket/src/io/port/peek-via-read-port.rkt b/racket/src/io/port/peek-via-read-port.rkt index 9214776c7c..e4479a8944 100644 --- a/racket/src/io/port/peek-via-read-port.rkt +++ b/racket/src/io/port/peek-via-read-port.rkt @@ -11,227 +11,227 @@ (provide peek-via-read-input-port) (class peek-via-read-input-port #:extends commit-input-port - (field - [bstr (make-bytes 4096)] - [pos 0] - [end-pos 0] - [peeked-eof? #f] - [buffer-mode 'block]) + #:field + [bstr (make-bytes 4096)] + [pos 0] + [end-pos 0] + [peeked-eof? #f] + [buffer-mode 'block] - (public - ;; in atomic mode; must override - [read-in/inner - (lambda (dest-bstr start end copy?) - 0)]) + #:public + ;; in atomic mode; must override + [read-in/inner + (lambda (dest-bstr start end copy?) + 0)] - (static - ;; in atomic mode - [purge-buffer - (lambda () - (slow-mode!) - (set! pos 0) - (set! end-pos 0) - (set! peeked-eof? #f))] + #:static + ;; in atomic mode + [purge-buffer + (lambda () + (slow-mode!) + (set! pos 0) + (set! end-pos 0) + (set! peeked-eof? #f))] - [close-peek-buffer - (lambda () - (purge-buffer) - (set! bstr #""))] + [close-peek-buffer + (lambda () + (purge-buffer) + (set! bstr #""))] - [buffer-adjust-pos - (lambda (i) - (define b buffer) - (- i (fx- end-pos (if (direct-bstr b) (direct-pos b) pos))))] + [buffer-adjust-pos + (lambda (i) + (define b buffer) + (- i (fx- end-pos (if (direct-bstr b) (direct-pos b) pos))))] - ;; in atomic mode - [default-buffer-mode + ;; in atomic mode + [default-buffer-mode (case-lambda [() buffer-mode] - [(mode) (set! buffer-mode mode)])]) + [(mode) (set! buffer-mode mode)])] - (private - ;; in atomic mode - [pull-some-bytes - (lambda ([amt (if (eq? 'block buffer-mode) (bytes-length bstr) 1)] [offset 0] [init-pos 0]) - (define get-end (min (+ amt offset) (bytes-length bstr))) - (define v (send peek-via-read-input-port this read-in/inner bstr offset get-end #f)) - (cond - [(eof-object? v) - (set! peeked-eof? #t) - eof] - [(evt? v) v] - [(eqv? v 0) 0] - [else - (set! pos init-pos) - (set! end-pos (fx+ offset v)) - v]))] + #:private + ;; in atomic mode + [pull-some-bytes + (lambda ([amt (if (eq? 'block buffer-mode) (bytes-length bstr) 1)] [offset 0] [init-pos 0]) + (define get-end (min (+ amt offset) (bytes-length bstr))) + (define v (send peek-via-read-input-port this read-in/inner bstr offset get-end #f)) + (cond + [(eof-object? v) + (set! peeked-eof? #t) + eof] + [(evt? v) v] + [(eqv? v 0) 0] + [else + (set! pos init-pos) + (set! end-pos (fx+ offset v)) + v]))] - ;; in atomic mode - [pull-more-bytes - (lambda (amt) - (cond - [(end-pos . fx< . (bytes-length bstr)) - ;; add to end of buffer - (pull-some-bytes amt end-pos pos)] - [(fx= pos 0) - ;; extend buffer - (define new-bstr (make-bytes (fx* 2 (bytes-length bstr)))) - (bytes-copy! new-bstr 0 bstr 0 end-pos) - (set! bstr new-bstr) - (pull-some-bytes amt end-pos)] - [else - ;; shift to start of buffer and retry - (bytes-copy! bstr 0 bstr pos end-pos) - (set! end-pos (fx- end-pos pos)) - (set! pos 0) - (pull-more-bytes amt)]))] + ;; in atomic mode + [pull-more-bytes + (lambda (amt) + (cond + [(end-pos . fx< . (bytes-length bstr)) + ;; add to end of buffer + (pull-some-bytes amt end-pos pos)] + [(fx= pos 0) + ;; extend buffer + (define new-bstr (make-bytes (fx* 2 (bytes-length bstr)))) + (bytes-copy! new-bstr 0 bstr 0 end-pos) + (set! bstr new-bstr) + (pull-some-bytes amt end-pos)] + [else + ;; shift to start of buffer and retry + (bytes-copy! bstr 0 bstr pos end-pos) + (set! end-pos (fx- end-pos pos)) + (set! pos 0) + (pull-more-bytes amt)]))] - ;; in atomic mode - [retry-pull? - (lambda (v) - (and (integer? v) (not (eqv? v 0))))] + ;; in atomic mode + [retry-pull? + (lambda (v) + (and (integer? v) (not (eqv? v 0))))] - ;; in atomic mode - [fast-mode! - (lambda (amt) ; amt = not yet added to `offset` - (define b buffer) - (set-direct-bstr! b bstr) - (define s pos) - (set-direct-pos! b s) - (set-direct-end! b end-pos) + ;; in atomic mode + [fast-mode! + (lambda (amt) ; amt = not yet added to `offset` + (define b buffer) + (set-direct-bstr! b bstr) + (define s pos) + (set-direct-pos! b s) + (set-direct-end! b end-pos) + (define o offset) + (when o + (set! offset (- (+ o amt) s))))] + + ;; in atomic mode + [slow-mode! + (lambda () + (define b buffer) + (when (direct-bstr b) + (define s (direct-pos b)) (define o offset) (when o - (set! offset (- (+ o amt) s))))] + (set! offset (+ o s))) + (set! pos s) + (set-direct-bstr! b #f) + (set-direct-pos! b (direct-end b))))] - ;; in atomic mode - [slow-mode! - (lambda () + #:override + ;; in atomic mode + [prepare-change + (lambda () + (pause-waiting-commit))] + + ;; in atomic mode + [read-in + (lambda (dest-bstr start end copy?) + (slow-mode!) + (let try-again () + (cond + [(pos . fx< . end-pos) + (define amt (min (fx- end-pos pos) (fx- end start))) + (bytes-copy! dest-bstr start bstr pos (fx+ pos amt)) + (set! pos (fx+ pos amt)) + (progress!) + (fast-mode! amt) + amt] + [peeked-eof? + (set! peeked-eof? #f) + ;; an EOF doesn't count as progress + eof] + [else + (cond + [(and (eq? 'block buffer-mode) + (fx< (fx- end start) (fxrshift (bytes-length bstr) 1))) + (define v (pull-some-bytes)) + (cond + [(or (eqv? v 0) (evt? v)) v] + [else (try-again)])] + [else + (define v (send peek-via-read-input-port this read-in/inner dest-bstr start end copy?)) + (unless (eqv? v 0) + (progress!)) + v])])))] + + ;; in atomic mode + [peek-in + (lambda (dest-bstr start end skip progress-evt copy?) + (let try-again () + (cond + [(and progress-evt + (sync/timeout 0 progress-evt)) + #f] + [else + (define b buffer) + (define s (if (direct-bstr b) (direct-pos b) pos)) + (define peeked-amt (fx- end-pos s)) + (cond + [(peeked-amt . > . skip) + (define amt (min (fx- peeked-amt skip) (fx- end start))) + (define s-pos (fx+ s skip)) + (bytes-copy! dest-bstr start bstr s-pos (fx+ s-pos amt)) + amt] + [peeked-eof? + eof] + [else + (slow-mode!) + (define v (pull-more-bytes (+ (- skip peeked-amt) (fx- end start)))) + (if (retry-pull? v) + (try-again) + v)])])))] + + ;; in atomic mode + [byte-ready + (lambda (work-done!) + (let loop () (define b buffer) - (when (direct-bstr b) - (define s (direct-pos b)) - (define o offset) - (when o - (set! offset (+ o s))) - (set! pos s) - (set-direct-bstr! b #f) - (set-direct-pos! b (direct-end b))))]) + (define peeked-amt (fx- end-pos (if (direct-bstr b) (direct-pos b) pos))) + (cond + [(peeked-amt . fx> . 0) #t] + [peeked-eof? #t] + [else + (slow-mode!) + (define v (pull-some-bytes)) + (work-done!) + (cond + [(retry-pull? v) + (loop)] + [(evt? v) v] + [else + (not (eqv? v 0))])])))] - (override - ;; in atomic mode - [prepare-change - (lambda () - (pause-waiting-commit))] - - ;; in atomic mode - [read-in - (lambda (dest-bstr start end copy?) - (slow-mode!) - (let try-again () - (cond - [(pos . fx< . end-pos) - (define amt (min (fx- end-pos pos) (fx- end start))) - (bytes-copy! dest-bstr start bstr pos (fx+ pos amt)) - (set! pos (fx+ pos amt)) - (progress!) - (fast-mode! amt) - amt] - [peeked-eof? - (set! peeked-eof? #f) - ;; an EOF doesn't count as progress - eof] - [else - (cond - [(and (eq? 'block buffer-mode) - (fx< (fx- end start) (fxrshift (bytes-length bstr) 1))) - (define v (pull-some-bytes)) - (cond - [(or (eqv? v 0) (evt? v)) v] - [else (try-again)])] - [else - (define v (send peek-via-read-input-port this read-in/inner dest-bstr start end copy?)) - (unless (eqv? v 0) - (progress!)) - v])])))] + [get-progress-evt + (lambda () + (atomically + (slow-mode!) + (make-progress-evt)))] - ;; in atomic mode - [peek-in - (lambda (dest-bstr start end skip progress-evt copy?) - (let try-again () - (cond - [(and progress-evt - (sync/timeout 0 progress-evt)) - #f] - [else - (define b buffer) - (define s (if (direct-bstr b) (direct-pos b) pos)) - (define peeked-amt (fx- end-pos s)) - (cond - [(peeked-amt . > . skip) - (define amt (min (fx- peeked-amt skip) (fx- end start))) - (define s-pos (fx+ s skip)) - (bytes-copy! dest-bstr start bstr s-pos (fx+ s-pos amt)) - amt] - [peeked-eof? - eof] - [else - (slow-mode!) - (define v (pull-more-bytes (+ (- skip peeked-amt) (fx- end start)))) - (if (retry-pull? v) - (try-again) - v)])])))] + ;; in atomic mode + [commit + (lambda (amt progress-evt ext-evt finish) + (slow-mode!) + (wait-commit + progress-evt ext-evt + ;; in atomic mode, maybe in a different thread: + (lambda () + (let ([amt (fxmin amt (fx- end-pos pos))]) + (cond + [(fx= 0 amt) + (finish #"")] + [else + (define dest-bstr (make-bytes amt)) + (bytes-copy! dest-bstr 0 bstr pos (fx+ pos amt)) + (set! pos (fx+ pos amt)) + (progress!) + (finish dest-bstr)])))))] - ;; in atomic mode - [byte-ready - (lambda (work-done!) - (let loop () - (define b buffer) - (define peeked-amt (fx- end-pos (if (direct-bstr b) (direct-pos b) pos))) - (cond - [(peeked-amt . fx> . 0) #t] - [peeked-eof? #t] - [else - (slow-mode!) - (define v (pull-some-bytes)) - (work-done!) - (cond - [(retry-pull? v) - (loop)] - [(evt? v) v] - [else - (not (eqv? v 0))])])))] + ;; in atomic mode + [buffer-mode + (case-lambda + [() (default-buffer-mode)] + [(mode) (default-buffer-mode mode)])] - [get-progress-evt - (lambda () - (atomically - (slow-mode!) - (make-progress-evt)))] - - ;; in atomic mode - [commit - (lambda (amt progress-evt ext-evt finish) - (slow-mode!) - (wait-commit - progress-evt ext-evt - ;; in atomic mode, maybe in a different thread: - (lambda () - (let ([amt (fxmin amt (fx- end-pos pos))]) - (cond - [(fx= 0 amt) - (finish #"")] - [else - (define dest-bstr (make-bytes amt)) - (bytes-copy! dest-bstr 0 bstr pos (fx+ pos amt)) - (set! pos (fx+ pos amt)) - (progress!) - (finish dest-bstr)])))))] - - ;; in atomic mode - [buffer-mode - (case-lambda - [() (default-buffer-mode)] - [(mode) (default-buffer-mode mode)])] - - ;; in atomic mode - [close - (lambda () - (close-peek-buffer))])) + ;; in atomic mode + [close + (lambda () + (close-peek-buffer))]) diff --git a/racket/src/io/port/pipe.rkt b/racket/src/io/port/pipe.rkt index 91e7c39dd7..b82be0e8b7 100644 --- a/racket/src/io/port/pipe.rkt +++ b/racket/src/io/port/pipe.rkt @@ -54,96 +54,94 @@ ;; ---------------------------------------- (class pipe-data - (field - [bstr #""] - [len 0] - [limit 0] - [peeked-amt 0] ; peeked but not yet read, effectively extends `limit` - [start 0] - [end 0] - [input-ref #f] ; #f => closed - [output-ref #f] ; #f => closed - [input-buffer #f] - [output-buffer #f] - [read-ready-sema #f] - [write-ready-sema #f] - [more-read-ready-sema #f] ; for lookahead peeks - [read-ready-evt #f] - [write-ready-evt #f]) + #:field + [bstr #""] + [len 0] + [limit 0] + [peeked-amt 0] ; peeked but not yet read, effectively extends `limit` + [start 0] + [end 0] + [input-ref #f] ; #f => closed + [output-ref #f] ; #f => closed + [input-buffer #f] + [output-buffer #f] + [read-ready-sema #f] + [write-ready-sema #f] + [more-read-ready-sema #f] ; for lookahead peeks + [read-ready-evt #f] + [write-ready-evt #f] - (private) + ;; in atomic mode for all static methods - ;; All static methods in atomic mode. - (static - ;; sync local fields with input buffer without implying slow mode - [sync-input - (lambda () - (define b input-buffer) - (when (direct-bstr b) - (define pos (direct-pos b)) - (set! start (if (fx= pos len) - 0 - pos))))] - ;; sync local fields with output buffer without implying slow mode - [sync-output - (lambda () - (define b output-buffer) - (when (direct-bstr b) - (define pos (direct-pos b)) - (set! end (if (fx= pos len) - 0 - pos))))] + #:static + ;; sync local fields with input buffer without implying slow mode + [sync-input + (lambda () + (define b input-buffer) + (when (direct-bstr b) + (define pos (direct-pos b)) + (set! start (if (fx= pos len) + 0 + pos))))] + ;; sync local fields with output buffer without implying slow mode + [sync-output + (lambda () + (define b output-buffer) + (when (direct-bstr b) + (define pos (direct-pos b)) + (set! end (if (fx= pos len) + 0 + pos))))] + + [sync-both + (lambda () + (sync-input) + (sync-output))] + ;; assumes sync'ed + [content-length + (lambda () + (define s start) + (define e end) + (if (s . fx<= . e) + (fx- e s) + (fx+ e (fx- len s))))] - [sync-both - (lambda () - (sync-input) - (sync-output))] + ;; assumes sync'ed + [input-empty? + (lambda () + (fx= start end))] - ;; assumes sync'ed - [content-length - (lambda () - (define s start) - (define e end) - (if (s . fx<= . e) - (fx- e s) - (fx+ e (fx- len s))))] + ;; assumes sync'ed + [output-full? + (lambda () + (define l limit) + (and l + ((content-length) . >= . (+ l peeked-amt))))] - ;; assumes sync'ed - [input-empty? - (lambda () - (fx= start end))] + ;; Used before read: + [check-output-unblocking + (lambda () + (when write-ready-sema + (semaphore-post write-ready-sema) + (set! write-ready-sema #f)))] - ;; assumes sync'ed - [output-full? - (lambda () - (define l limit) - (and l - ((content-length) . >= . (+ l peeked-amt))))] + ;; Used before write: + [check-input-unblocking + (lambda () + (when read-ready-sema + (semaphore-post read-ready-sema) + (set! read-ready-sema #f)) + (when more-read-ready-sema + (semaphore-post more-read-ready-sema) + (set! more-read-ready-sema #f)))] - ;; Used before read: - [check-output-unblocking - (lambda () - (when write-ready-sema - (semaphore-post write-ready-sema) - (set! write-ready-sema #f)))] - - ;; Used before write: - [check-input-unblocking - (lambda () - (when read-ready-sema - (semaphore-post read-ready-sema) - (set! read-ready-sema #f)) - (when more-read-ready-sema - (semaphore-post more-read-ready-sema) - (set! more-read-ready-sema #f)))] - - ;; Used after peeking: - [peeked! - (lambda (amt) - (when (amt . > . peeked-amt) - (check-output-unblocking) - (set! peeked-amt amt)))])) + ;; Used after peeking: + [peeked! + (lambda (amt) + (when (amt . > . peeked-amt) + (check-output-unblocking) + (set! peeked-amt amt)))]) (define (make-ref v) (make-weak-box v)) (define (ref-value r) (weak-box-value r)) @@ -151,351 +149,352 @@ ;; ---------------------------------------- (class pipe-input-port #:extends commit-input-port - (field - [d #f]) ; pipe-data - - (private - [fast-mode! - (lambda (amt) ; amt = not yet added to `offset` - (with-object pipe-data d - (define s start) - (define e end) - (unless (fx= s e) - (define b buffer) - (set-direct-bstr! b bstr) - (set-direct-pos! b s) - (set-direct-end! b (if (s . fx< . e) e len)) - (define o offset) - (when o - (set! offset (- (+ o amt) s))))))] - - [slow-mode! - (lambda () - (with-object pipe-data d + #:field + [d #f] ; pipe-data + + #:private + [fast-mode! + (lambda (amt) ; amt = not yet added to `offset` + (with-object pipe-data d + (define s start) + (define e end) + (unless (fx= s e) (define b buffer) - (when (direct-bstr b) - (define pos (direct-pos b)) - (define o offset) - (when o - (set! offset (+ o pos))) - (set! start (if (fx= pos len) 0 pos)) - (set-direct-bstr! b #f) - (set-direct-pos! b (direct-end b))) - (sync-output)))]) + (set-direct-bstr! b bstr) + (set-direct-pos! b s) + (set-direct-end! b (if (s . fx< . e) e len)) + (define o offset) + (when o + (set! offset (- (+ o amt) s))))))] - (static - [on-resize - (lambda () - (slow-mode!))] - [on-output-full - (lambda () - (slow-mode!))]) + [slow-mode! + (lambda () + (with-object pipe-data d + (define b buffer) + (when (direct-bstr b) + (define pos (direct-pos b)) + (define o offset) + (when o + (set! offset (+ o pos))) + (set! start (if (fx= pos len) 0 pos)) + (set-direct-bstr! b #f) + (set-direct-pos! b (direct-end b))) + (sync-output)))] - (override - [prepare-change - (lambda () - (with-object pipe-data d - (pause-waiting-commit)))] + #:static + [on-resize + (lambda () + (slow-mode!))] + [on-output-full + (lambda () + (slow-mode!))] - [read-in - (lambda (dest-bstr dest-start dest-end copy?) - (assert-atomic) - (slow-mode!) - (with-object pipe-data d - (cond - [(input-empty?) - (if output-ref - read-ready-evt - eof)] - [else - (check-output-unblocking) - (define s start) - (define e end) - (define amt - (cond - [(s . fx< . e) - (define amt (fxmin (fx- dest-end dest-start) - (fx- e s))) - (bytes-copy! dest-bstr dest-start bstr s (fx+ s amt)) - (set! start (fx+ s amt)) - (set! peeked-amt (fxmax 0 (fx- peeked-amt amt))) - amt] - [else - (define amt (fxmin (fx- dest-end dest-start) - (fx- len s))) - (bytes-copy! dest-bstr dest-start bstr s (fx+ s amt)) - (set! start (modulo (fx+ s amt) len)) - (set! peeked-amt (fxmax 0 (fx- peeked-amt amt))) - amt])) - (progress!) - (fast-mode! amt) - amt])))] + #:override + [prepare-change + (lambda () + (with-object pipe-data d + (pause-waiting-commit)))] - [peek-in - (lambda (dest-bstr dest-start dest-end skip progress-evt copy?) - (with-object pipe-data d - (assert-atomic) - (sync-both) - (define content-amt (content-length)) - (cond - [(and progress-evt - (sync/timeout 0 progress-evt)) - #f] - [(content-amt . <= . skip) + [read-in + (lambda (dest-bstr dest-start dest-end copy?) + (assert-atomic) + (slow-mode!) + (with-object pipe-data d + (cond + [(input-empty?) + (if output-ref + read-ready-evt + eof)] + [else + (check-output-unblocking) + (define s start) + (define e end) + (define amt (cond - [(not output-ref) eof] - [else - (unless (or (zero? skip) more-read-ready-sema) - (set! more-read-ready-sema (make-semaphore)) - (define out (ref-value output-ref)) - (when out - (send pipe-output-port out on-need-more-ready))) - (define evt (if (zero? skip) - read-ready-evt - (wrap-evt (semaphore-peek-evt more-read-ready-sema) - (lambda (v) 0)))) - evt])] - [else - (define peek-start (fxmodulo (fx+ start skip) len)) - (cond - [(peek-start . fx< . end) + [(s . fx< . e) (define amt (fxmin (fx- dest-end dest-start) - (fx- end peek-start))) - (bytes-copy! dest-bstr dest-start bstr peek-start (fx+ peek-start amt)) - (peeked! (+ skip amt)) + (fx- e s))) + (bytes-copy! dest-bstr dest-start bstr s (fx+ s amt)) + (set! start (fx+ s amt)) + (set! peeked-amt (fxmax 0 (fx- peeked-amt amt))) amt] [else (define amt (fxmin (fx- dest-end dest-start) - (fx- len peek-start))) - (bytes-copy! dest-bstr dest-start bstr peek-start (fx+ peek-start amt)) - (peeked! (+ skip amt)) - amt])])))] + (fx- len s))) + (bytes-copy! dest-bstr dest-start bstr s (fx+ s amt)) + (set! start (modulo (fx+ s amt) len)) + (set! peeked-amt (fxmax 0 (fx- peeked-amt amt))) + amt])) + (progress!) + (fast-mode! amt) + amt])))] - [byte-ready - (lambda (work-done!) + [peek-in + (lambda (dest-bstr dest-start dest-end skip progress-evt copy?) + (with-object pipe-data d (assert-atomic) - (with-object pipe-data d - (or (not output-ref) - (begin - (sync-both) - (not (fx= 0 (content-length)))))))] - - [close - (lambda () - (with-object pipe-data d - (when input-ref - (slow-mode!) - (set! input-ref #f) - (progress!))))] - - [get-progress-evt - (lambda () - (atomically - (with-object pipe-data d - (cond - [(not input-ref) always-evt] - [else - (slow-mode!) - (make-progress-evt)]))))] - - [commit - ;; Allows `amt` to be zero and #f for other arguments, - ;; which is helpful for `open-input-peek-via-read`. - (lambda (amt progress-evt ext-evt finish) - (assert-atomic) - ;; `progress-evt` is a `semepahore-peek-evt`, and `ext-evt` - ;; is constrained; we can send them over to different threads + (sync-both) + (define content-amt (content-length)) (cond - [(zero? amt) - (progress!)] + [(and progress-evt + (sync/timeout 0 progress-evt)) + #f] + [(content-amt . <= . skip) + (cond + [(not output-ref) eof] + [else + (unless (or (zero? skip) more-read-ready-sema) + (set! more-read-ready-sema (make-semaphore)) + (define out (ref-value output-ref)) + (when out + (send pipe-output-port out on-need-more-ready))) + (define evt (if (zero? skip) + read-ready-evt + (wrap-evt (semaphore-peek-evt more-read-ready-sema) + (lambda (v) 0)))) + evt])] [else - (wait-commit - progress-evt ext-evt - ;; in atomic mode, maybe in a different thread: - (lambda () - (with-object pipe-data d - (slow-mode!) - (let ([amt (min amt (content-length))]) - (cond - [(fx= 0 amt) - ;; There was nothing to commit; claim success for 0 bytes - (finish #"")] - [else - (define dest-bstr (make-bytes amt)) - (define s start) - (define e end) - (cond - [(s . fx< . e) - (bytes-copy! dest-bstr 0 bstr s (fx+ s amt))] - [else - (define amt1 (fxmin (fx- len s) amt)) - (bytes-copy! dest-bstr 0 bstr s (fx+ s amt1)) - (when (amt1 . fx< . amt) - (bytes-copy! dest-bstr amt1 bstr 0 (fx- amt amt1)))]) - (set! start (fxmodulo (fx+ s amt) len)) - (progress!) - (fast-mode! amt) - (finish dest-bstr)])))))]))] + (define peek-start (fxmodulo (fx+ start skip) len)) + (cond + [(peek-start . fx< . end) + (define amt (fxmin (fx- dest-end dest-start) + (fx- end peek-start))) + (bytes-copy! dest-bstr dest-start bstr peek-start (fx+ peek-start amt)) + (peeked! (+ skip amt)) + amt] + [else + (define amt (fxmin (fx- dest-end dest-start) + (fx- len peek-start))) + (bytes-copy! dest-bstr dest-start bstr peek-start (fx+ peek-start amt)) + (peeked! (+ skip amt)) + amt])])))] - [count-lines! - (lambda () - (slow-mode!))])) + [byte-ready + (lambda (work-done!) + (assert-atomic) + (with-object pipe-data d + (or (not output-ref) + (begin + (sync-both) + (not (fx= 0 (content-length)))))))] + + [close + (lambda () + (with-object pipe-data d + (when input-ref + (slow-mode!) + (set! input-ref #f) + (progress!))))] + + [get-progress-evt + (lambda () + (atomically + (with-object pipe-data d + (cond + [(not input-ref) always-evt] + [else + (slow-mode!) + (make-progress-evt)]))))] + + [commit + ;; Allows `amt` to be zero and #f for other arguments, + ;; which is helpful for `open-input-peek-via-read`. + (lambda (amt progress-evt ext-evt finish) + (assert-atomic) + ;; `progress-evt` is a `semepahore-peek-evt`, and `ext-evt` + ;; is constrained; we can send them over to different threads + (cond + [(zero? amt) + (progress!)] + [else + (wait-commit + progress-evt ext-evt + ;; in atomic mode, maybe in a different thread: + (lambda () + (with-object pipe-data d + (slow-mode!) + (let ([amt (min amt (content-length))]) + (cond + [(fx= 0 amt) + ;; There was nothing to commit; claim success for 0 bytes + (finish #"")] + [else + (define dest-bstr (make-bytes amt)) + (define s start) + (define e end) + (cond + [(s . fx< . e) + (bytes-copy! dest-bstr 0 bstr s (fx+ s amt))] + [else + (define amt1 (fxmin (fx- len s) amt)) + (bytes-copy! dest-bstr 0 bstr s (fx+ s amt1)) + (when (amt1 . fx< . amt) + (bytes-copy! dest-bstr amt1 bstr 0 (fx- amt amt1)))]) + (set! start (fxmodulo (fx+ s amt) len)) + (progress!) + (fast-mode! amt) + (finish dest-bstr)])))))]))] + + [count-lines! + (lambda () + (slow-mode!))]) ;; ---------------------------------------- (class pipe-output-port #:extends core-output-port - (field - [d d]) ; pipe-data + #:field + [d d] ; pipe-data - (private - [fast-mode! - (lambda (amt) ; amt = not yet added to `offset` - (with-object pipe-data d - (define lim limit) - (define avail (and lim (- lim (content-length)))) - (when (or (not avail) (avail . <= . 0)) - (define s start) - (define e end) - (define b buffer) - (set-direct-bstr! b bstr) - (set-direct-pos! b e) - (set-direct-end! b (let ([end (if (s . fx<= . e) - (if (fx= s 0) - (fx- len 1) - len) - (fx- s 1))]) - (if (and avail - ((fx- end e) . > . avail)) - (fx+ e avail) - end))) - (define o offset) - (when o - (set! offset (- (+ o amt) e))))))] - - [slow-mode! - (lambda () - (with-object pipe-data d + #:private + [fast-mode! + (lambda (amt) ; amt = not yet added to `offset` + (with-object pipe-data d + (define lim limit) + (define avail (and lim (- lim (content-length)))) + (when (or (not avail) (avail . <= . 0)) + (define s start) + (define e end) (define b buffer) - (when (direct-bstr b) - (define pos (direct-pos b)) - (define o offset) - (when o - (set! offset (+ o pos))) - (set! end (if (fx= pos len) 0 pos)) - (set-direct-bstr! b #f) - (set-direct-pos! b (direct-end b))) - (sync-input)))]) + (set-direct-bstr! b bstr) + (set-direct-pos! b e) + (set-direct-end! b (let ([end (if (s . fx<= . e) + (if (fx= s 0) + (fx- len 1) + len) + (fx- s 1))]) + (if (and avail + ((fx- end e) . > . avail)) + (fx+ e avail) + end))) + (define o offset) + (when o + (set! offset (- (+ o amt) e))))))] - (static - [on-input-empty - (lambda () - (slow-mode!))] - [on-need-more-ready - (lambda () - (slow-mode!))]) + [slow-mode! + (lambda () + (with-object pipe-data d + (define b buffer) + (when (direct-bstr b) + (define pos (direct-pos b)) + (define o offset) + (when o + (set! offset (+ o pos))) + (set! end (if (fx= pos len) 0 pos)) + (set-direct-bstr! b #f) + (set-direct-pos! b (direct-end b))) + (sync-input)))] - (override - [write-out - ;; in atomic mode - (lambda (src-bstr src-start src-end nonblock? enable-break? copy?) - (assert-atomic) - (slow-mode!) - (with-object pipe-data d - (let try-again () - (define top-pos (if (fx= start 0) - (fx- len 1) - len)) - (define (maybe-grow) - (cond - [(or (not limit) - ((+ limit peeked-amt) . > . (fx- len 1))) - ;; grow pipe size - (define in (ref-value input-ref)) - (when in - (send pipe-input-port in on-resize)) - (define new-bstr (make-bytes (min+1 (and limit (+ limit peeked-amt)) (* len 2)))) - (cond - [(fx= 0 start) - (bytes-copy! new-bstr 0 bstr 0 (fx- len 1))] - [else - (bytes-copy! new-bstr 0 bstr start len) - (bytes-copy! new-bstr (fx- len start) bstr 0 end) - (set! start 0) - (set! end (fx- len 1))]) - (set! bstr new-bstr) - (set! len (bytes-length new-bstr)) - (try-again)] - [else (pipe-is-full)])) - (define (pipe-is-full) - (wrap-evt write-ready-evt (lambda (v) #f))) - (define (apply-limit amt) - (if limit - (min amt (- (+ limit peeked-amt) (content-length))) - amt)) + #:static + [on-input-empty + (lambda () + (slow-mode!))] + [on-need-more-ready + (lambda () + (slow-mode!))] + + #:override + [write-out + ;; in atomic mode + (lambda (src-bstr src-start src-end nonblock? enable-break? copy?) + (assert-atomic) + (slow-mode!) + (with-object pipe-data d + (let try-again () + (define top-pos (if (fx= start 0) + (fx- len 1) + len)) + (define (maybe-grow) (cond - [(fx= src-start src-end) ;; => flush - 0] - [(and (end . fx>= . start) - (end . fx< . top-pos)) - (define amt (apply-limit (fxmin (fx- top-pos end) - (fx- src-end src-start)))) + [(or (not limit) + ((+ limit peeked-amt) . > . (fx- len 1))) + ;; grow pipe size + (define in (ref-value input-ref)) + (when in + (send pipe-input-port in on-resize)) + (define new-bstr (make-bytes (min+1 (and limit (+ limit peeked-amt)) (* len 2)))) (cond - [(fx= amt 0) (pipe-is-full)] + [(fx= 0 start) + (bytes-copy! new-bstr 0 bstr 0 (fx- len 1))] [else - (check-input-unblocking) - (bytes-copy! bstr end src-bstr src-start (fx+ src-start amt)) - (let ([new-end (fx+ end amt)]) - (set! end (if (fx= new-end len) 0 new-end))) - (fast-mode! amt) - amt])] - [(fx= end top-pos) - (cond - [(fx= start 0) - (maybe-grow)] - [else - (define amt (fxmin (fx- start 1) - (fx- src-end src-start))) - (cond - [(fx= amt 0) (pipe-is-full)] - [else - (check-input-unblocking) - (bytes-copy! bstr 0 src-bstr src-start (fx+ src-start amt)) - (set! end amt) - (fast-mode! amt) - amt])])] - [(end . fx< . (fx- start 1)) - (define amt (apply-limit (fxmin (fx- (fx- start 1) end) - (fx- src-end src-start)))) - (cond - [(fx= amt 0) (pipe-is-full)] - [else - (check-input-unblocking) - (bytes-copy! bstr end src-bstr src-start (fx+ src-start amt)) - (set! end (fx+ end amt)) - (fast-mode! amt) - amt])] - [else - (maybe-grow)]))))] + (bytes-copy! new-bstr 0 bstr start len) + (bytes-copy! new-bstr (fx- len start) bstr 0 end) + (set! start 0) + (set! end (fx- len 1))]) + (set! bstr new-bstr) + (set! len (bytes-length new-bstr)) + (try-again)] + [else (pipe-is-full)])) + (define (pipe-is-full) + (wrap-evt write-ready-evt (lambda (v) #f))) + (define (apply-limit amt) + (if limit + (min amt (- (+ limit peeked-amt) (content-length))) + amt)) + (cond + [(fx= src-start src-end) ;; => flush + 0] + [(and (end . fx>= . start) + (end . fx< . top-pos)) + (define amt (apply-limit (fxmin (fx- top-pos end) + (fx- src-end src-start)))) + (cond + [(fx= amt 0) (pipe-is-full)] + [else + (check-input-unblocking) + (bytes-copy! bstr end src-bstr src-start (fx+ src-start amt)) + (let ([new-end (fx+ end amt)]) + (set! end (if (fx= new-end len) 0 new-end))) + (fast-mode! amt) + amt])] + [(fx= end top-pos) + (cond + [(fx= start 0) + (maybe-grow)] + [else + (define amt (fxmin (fx- start 1) + (fx- src-end src-start))) + (cond + [(fx= amt 0) (pipe-is-full)] + [else + (check-input-unblocking) + (bytes-copy! bstr 0 src-bstr src-start (fx+ src-start amt)) + (set! end amt) + (fast-mode! amt) + amt])])] + [(end . fx< . (fx- start 1)) + (define amt (apply-limit (fxmin (fx- (fx- start 1) end) + (fx- src-end src-start)))) + (cond + [(fx= amt 0) (pipe-is-full)] + [else + (check-input-unblocking) + (bytes-copy! bstr end src-bstr src-start (fx+ src-start amt)) + (set! end (fx+ end amt)) + (fast-mode! amt) + amt])] + [else + (maybe-grow)]))))] - [get-write-evt - (get-write-evt-via-write-out (lambda (out v bstr start) - (port-count! out v bstr start)))] + [get-write-evt + (get-write-evt-via-write-out (lambda (out v bstr start) + (port-count! out v bstr start)))] - [close - ;; in atomic mode - (lambda () - (with-object pipe-data d - (when output-ref - (slow-mode!) - (set! output-ref #f) - (check-input-unblocking))))])) + [close + ;; in atomic mode + (lambda () + (with-object pipe-data d + (when output-ref + (slow-mode!) + (set! output-ref #f) + (check-input-unblocking))))]) ;; ---------------------------------------- - + (define (make-pipe-ends [limit #f] [input-name 'pipe] [output-name 'pipe]) (define len (min+1 limit 16)) (define d (new pipe-data + #:field [bstr (make-bytes len)] [len len] [limit limit])) @@ -506,9 +505,11 @@ (define read-ready-evt (pipe-read-poller d)) (define input (new pipe-input-port + #:field [name input-name] [d d])) (define output (new pipe-output-port + #:field [name output-name] [evt write-ready-evt] [d d])) @@ -525,10 +526,8 @@ (define/who (make-pipe [limit #f] [input-name 'pipe] [output-name 'pipe]) (check who #:or-false exact-positive-integer? limit) (define-values (ip op) (make-pipe-ends limit input-name output-name)) - (when (port-count-lines-enabled) - (port-count-lines! ip) - (port-count-lines! op)) - (values ip op)) + (values (finish-port/count ip) + (finish-port/count op))) ;; ---------------------------------------- diff --git a/racket/src/io/port/port.rkt b/racket/src/io/port/port.rkt index df3e38b2bf..c264cd0a98 100644 --- a/racket/src/io/port/port.rkt +++ b/racket/src/io/port/port.rkt @@ -9,36 +9,60 @@ (struct-out location) get-core-port-offset) +;; Port class hierarchy +;; - with "virtual" in square brackets +;; - with per-instance in curly braces +;; +;; [core] +;; | +;; ,------------------------------, +;; [input] [output] +;; | | +;; ,---------, ,---------------------------------, +;; | | | | | | | +;; [commit] {custom} {custom} pipe bytes fd max nowhere +;; | (when peek | +;; | provided) tcp +;; -------------------------, +;; ' | | +;; bytes [peek-via-read] pipe +;; | +;; ,-------, +;; | | +;; fd {custom} +;; | (when no peek provided) +;; tcp + (class core-port - (field - [name 'port #:immutable] ; anything, reported as `object-name` for the port + #:field + [name 'port #:immutable] ; anything, reported as `object-name` for the port - [data #f] ; FIXME: remove after all uses are converted - - ;; When `(direct-bstr buffer)` is not #f, it enables a shortcut for - ;; reading and writing, where `(direct-pos buffer)` must also be - ;; less than `(direct-end buffer)` for the shortcut to apply. The - ;; shortcut is not necessarily always taken, just if it is used, - ;; the `(direct-pos buffer)` position can be adjusted and the - ;; port's methods must adapt accordingly. The `(direct-bstr - ;; buffer)` and `(direct-end buffer)` fields are modified only by - ;; the port's methods, however. - ;; - ;; For an input port, shortcut mode implies that `prepare-change` - ;; does not need to be called, and no checking is needed for whether - ;; the port is closed. - ;; - ;; A non-#f `(direct-bstr buffer)` further implies that - ;; `(direct-pos buffer)` should be added to `offset` to get the - ;; true offset. - [buffer (direct #f 0 0)] - - [closed? #f] - [closed-sema #f] - - [offset 0] ; count plain bytes; add `(- buffer-pos buffer-start)` - [count #f]) ; #f or a `location` + ;; When `(direct-bstr buffer)` is not #f, it enables a shortcut for + ;; reading and writing, where `(direct-pos buffer)` must also be + ;; less than `(direct-end buffer)` for the shortcut to apply. The + ;; shortcut is not necessarily always taken, but if it is used, the + ;; `(direct-pos buffer)` position can be adjusted and the port's + ;; methods must adapt accordingly. The `(direct-bstr buffer)` and + ;; `(direct-end buffer)` fields are modified only by the port's + ;; methods, however. + ;; + ;; Shortcut mode implies that the port is still open, so no checking + ;; is needed for whether the port is closed. + ;; + ;; For an input port, shortcut mode implies that `prepare-change` + ;; does not need to be called. + ;; + ;; A non-#f `(direct-bstr buffer)` further implies that + ;; `(direct-pos buffer)` should be added to `offset` to get the + ;; true offset. + [buffer (direct #f 0 0)] + [closed? #f] + [closed-sema #f] ; created on demand + + [offset 0] ; count plain bytes; add `(- buffer-pos buffer-start)` + [count #f] ; #f or a `location` + ;; Various methods below are called in atomic mode. The intent of ;; atomic mode is to ensure that the completion and return of the ;; function is atomic with respect to some further activity, such @@ -48,39 +72,39 @@ ;; the burden of re-checking for a closed port. Leave atomic mode ;; explicitly before raising an exception. - (public - ;; -*> (void) - ;; Called in atomic mode. - ;; Reqeusts a close, and the port is closed if/when - ;; the method returns. - [close (lambda () (void))] + #:public + ;; -*> (void) + ;; Called in atomic mode. + ;; Reqeusts a close, and the port is closed if/when + ;; the method returns. + [close (lambda () (void))] - ;; #f or (-*> (void)) - ;; Called in atomic mode. - ;; Notifies the port that line counting is enabled, and - ;; `get-location` can be called afterward (if it is defined) - [count-lines! #f] + ;; #f or (-*> (void)) + ;; Called in atomic mode. + ;; Notifies the port that line counting is enabled, and + ;; `get-location` can be called afterward (if it is defined) + [count-lines! #f] - ;; #f or (-*> (values line-or-#f column-or-#f position-or-#f)) - ;; Called in atomic mode. - ;; Returns the location of the next character. If #f, this method - ;; is implemented externally. - [get-location #f] ; #f or method called in atomic mode + ;; #f or (-*> (values line-or-#f column-or-#f position-or-#f)) + ;; Called in atomic mode. + ;; Returns the location of the next character. If #f, this method + ;; is implemented externally. + [get-location #f] ; #f or method called in atomic mode - ;; #f or (U (-*> position-k) (position-k -*> (void)) - ;; Called in atomic mode. - ;; If not #f, the port implements `file-position`. - [file-position #f] + ;; #f or (U (-*> position-k) (position-k -*> (void)) + ;; Called in atomic mode. + ;; If not #f, the port implements `file-position`. + [file-position #f] - ;; #f or (U (-*> mode-sym) (mode-sym -*> (void)) - ;; Called in atomic mode. - ;; If not #f, the port implements buffer-mode selection. - [buffer-mode #f]) + ;; #f or (U (-*> mode-sym) (mode-sym -*> (void)) + ;; Called in atomic mode. + ;; If not #f, the port implements buffer-mode selection. + [buffer-mode #f] - (property - [prop:unsafe-authentic-override #t] ; allow evt chaperone - [prop:object-name (struct-field-index name)] - [prop:secondary-evt port->evt])) + #:property + [prop:unsafe-authentic-override #t] ; allow evt chaperone + [prop:object-name (struct-field-index name)] + [prop:secondary-evt port->evt]) (struct direct ([bstr #:mutable] [pos #:mutable]