io: improve syntax of internal class forms

This commit is contained in:
Matthew Flatt 2019-02-13 10:03:00 -07:00
parent d197e1b8f4
commit c2d53143c1
16 changed files with 1481 additions and 1480 deletions

View File

@ -13,12 +13,12 @@
;;
;; <class-defn> = (class <class-id> <clause> ...)
;; | (class <class-id> #:extends <class-id> <clause> ...)
;; <clause> = (field [<field-id> <duplicatable-init-expr>] ...)
;; | (public [<method-id> <method>] ...)
;; | (private [<method-id> <method>] ...)
;; | (override [<method-id> <method>] ...)
;; | (static [<method-id> <method>] ...) ; not in vtable
;; | (property [<property-expr> <val-expr>] ...)
;; <clause> = #:field [<field-id> <duplicatable-init-expr>] ...
;; | #:public [<method-id> <method>] ...
;; | #:private [<method-id> <method>] ... ; cannot `send`
;; | #:override [<method-id> <method>] ...
;; | #:static [<method-id> <method>] ... ; cannot override
;; | #:property [<property-expr> <val-expr>] ...
;; <method> = #f
;; | (lambda <formals> <expr> ...+)
;; | (case-lambda [<formals> <expr> ...+] ...)
@ -40,12 +40,13 @@
;; unnecessary indirections through methods that can be overridden).
;;
;; Normally, use
;; (new <class-id> [<field-id> <expr] ...)
;; (new <class-id> #:field [<field-id> <expr] ...)
;; to create an instance of the class, where each unmentioned
;; <field-id> gets its default value. To override methods for just
;; this object, use
;; (new <class-id> #:override ([<method-id> <method>] ...)
;; [<field-id> <expr] ...)
;; (new <class-id>
;; #:field [<field-id> <expr] ...
;; #:override [<method-id> <method>] ...)
;; but beware that it involves allocating a new vtable each
;; time the `new` expression is evaluated.
;;
@ -105,42 +106,20 @@
[(id expr)
(list #'id #'expr (combine-ids base-id base-id "-" #'id) (combine-ids #'id "set-" base-id "-" #'id "!"))]
[_ (raise-syntax-error #f (format "bad ~a clause" what) stx e)])))
(define-values (new-fields new-methods override-methods locals statics properties)
(let ([l-stx (syntax-case stx ()
[(_ _ #:extends _ . rest) #'rest]
[(_ _ . rest) #'rest])])
(let loop ([l-stx l-stx] [new-fields null] [new-methods null] [override-methods null]
[locals null] [statics null] [properties null])
(syntax-case l-stx (field public override private static property)
[() (values new-fields new-methods override-methods locals statics properties)]
[((field fld ...) . rest)
(loop #'rest
(append new-fields
(add-procs id (syntax->list #'(fld ...)) "field" #:can-immutable? #t))
new-methods override-methods locals statics properties)]
[((public method ...) . rest)
(loop #'rest new-fields
(append new-methods
(add-procs methods-id (syntax->list #'(method ...)) "public"))
override-methods locals statics properties)]
[((override method ...) . rest)
(loop #'rest new-fields new-methods
(append override-methods (syntax->list #'(method ...)))
locals statics properties)]
[((private method ...) . rest)
(loop #'rest new-fields
new-methods override-methods
(append locals (syntax->list #'(method ...)))
statics properties)]
[((static method ...) . rest)
(loop #'rest new-fields new-methods override-methods locals
(append statics (syntax->list #'(method ...)))
properties)]
[((property prop ...) . rest)
(loop #'rest new-fields new-methods override-methods locals statics
(append properties (syntax->list #'((#:property . prop) ...))))]
[(other . _)
(raise-syntax-error #f "unrecognized" stx #'other)]))))
(define groups (extract-groups
stx
(syntax-case stx ()
[(_ _ #:extends _ . rest) #'rest]
[(_ _ . rest) #'rest])
'(#:field #:public #:override #:private #:static #:property)))
(define (extract-group tag) (reverse (hash-ref groups tag '())))
(define new-fields (add-procs id (extract-group '#:field) "field" #:can-immutable? #t))
(define new-methods (add-procs methods-id (extract-group '#:public) "public"))
(define override-methods (extract-group '#:override))
(define locals (extract-group '#:private))
(define statics (extract-group '#:static))
(define properties (for/list ([prop (in-list (extract-group '#:property))])
(cons '#:property prop)))
(define all-fields (if super-ci
(append (class-info-fields super-ci) new-fields)
new-fields))
@ -321,29 +300,31 @@
(define-syntax (new stx)
(syntax-case stx ()
[(_ class-id #:override (override ...) init ...)
[(_ class-id clause ...)
(let ([ci (and (identifier? #'class-id)
(syntax-local-value #'class-id (lambda () #f)))])
(unless (class-info? ci)
(raise-syntax-error #f "not a class identifier" stx #'class-id))
(for ([init (in-list (syntax->list #'(init ...)))])
(define groups (extract-groups stx #'(clause ...) '(#:field #:override)))
(define inits (reverse (hash-ref groups '#:field '())))
(define overrides (reverse (hash-ref groups '#:override '())))
(for ([init (in-list inits)])
(syntax-case init ()
[(field-id _) (check-member stx #'field-id (class-info-fields ci) "field")]
[_ (raise-syntax-error #f "bad field-inialization clause" stx init)]))
(for ([override (in-list (syntax->list #'(override ...)))])
(for ([override (in-list overrides)])
(syntax-case override ()
[(method-id _) (check-member stx #'method-id (class-info-methods ci) "method")]
[_ (raise-syntax-error #f "bad method-override clause" stx override)]))
(define field-exprs (for/list ([field (in-list (class-info-fields ci))])
(syntax-case field ()
[(field-id field-expr . _)
(or (for/or ([init (in-list (syntax->list #'(init ...)))])
(or (for/or ([init (in-list inits)])
(syntax-case init ()
[(id expr)
(and (eq? (syntax-e #'id) (syntax-e #'field-id))
#'expr)]))
#'field-expr)])))
(define overrides (syntax->list #'(override ...)))
(with-syntax ([make-id (cadr (class-info-struct-info ci))]
[vtable-id (class-info-vtable-id ci)]
[(field-expr ...) field-exprs])
@ -428,6 +409,25 @@
#'(let ([o obj-expr])
(bind-locals local-bindings o obj-expr (let () body0 body ...))))))]))
;; ----------------------------------------
(define-for-syntax (extract-groups stx l-stx ok-groups)
(let loop ([l-stx l-stx] [current-group #f] [groups #hasheq()])
(syntax-case l-stx ()
[() groups]
[(kw . rest)
(memq (syntax-e #'kw) ok-groups)
(loop #'rest (syntax-e #'kw) groups)]
[(kw . rest)
(keyword? (syntax-e #'kw))
(raise-syntax-error #f "unrecognized section keyword" stx #'kw)]
[(other . rest)
(if current-group
(loop #'rest
current-group
(hash-update groups current-group (lambda (l) (cons #'other l)) null))
(raise-syntax-error #f "need an initial section keyword, such as `#:field`" stx #'other))])))
(define-for-syntax (check-member stx id l what)
(or (for/or ([e (in-list l)])
(syntax-case e ()
@ -492,37 +492,40 @@
;; ----------------------------------------
(module+ test
(module+ main
(class example
(field
[a 1 #:immutable]
[b 2])
(private
[other (lambda (q) (list q this))])
(static
[enbox (lambda (v #:opt [opt (vector v a)])
(box (vector a v opt)))])
(public
[q #f]
[m (lambda (z #:maybe [maybe 9]) (list a (other b) maybe))]
[n (lambda (x y z) (vector a b (enbox x) y z))]))
#:field
[a 1 #:immutable]
[b 2]
#:private
[other (lambda (q) (list q this))]
#:static
[enbox (lambda (v #:opt [opt (vector v a)])
(box (vector a v opt)))]
#:public
[q #f]
[m (lambda (z #:maybe [maybe 9]) (list a (other b) maybe))]
[n (lambda (x y z) (vector a b (enbox x) y z))])
(class sub #:extends example
(field
[c 3]
[d 4])
(override
[m (lambda (z) 'other)])
(property
[prop:custom-write (lambda (s o m)
(write 'sub: o)
(write (sub-d s) o))]))
#:override
[m (lambda (z) 'other)]
#:field
[c 3]
#:property
[prop:custom-write (lambda (s o m)
(write 'sub: o)
(write (sub-d s) o))]
#:field
[d 4])
(define ex (new example [b 5]))
(define ex (new example
#:field
[b 5]))
(send example ex m 'ok #:maybe 'yep)
(method example ex m)
(new sub [d 5])
(new sub #:field [d 5])
(send example (new sub) m 'more)
(set-example-b! ex 6)
(send example ex enbox 88)
@ -530,8 +533,9 @@
(define ex2 (new example
#:override
([q (lambda (x y z)
(box (vector x y z a b)))])
[q (lambda (x y z)
(box (vector x y z a b)))]
#:field
[b 'b]
[a 'a]))
(send example ex2 n 1 2 3)

View File

@ -1,31 +0,0 @@
#lang racket/base
(require (for-syntax racket/base)
racket/fixnum)
(provide define-fixnum
;; to cooperate with macros that explicitly
;; manage closures, as in "object.rkt"
capture-fixnum
(for-syntax make-fixnum-transformer))
;; Representing a mutable, fixnum-valued variable with an fxvector can
;; avoid a write barrier on assignment
(define-syntax-rule (define-fixnum id v)
(begin
(define cell (fxvector v))
(define-syntax id (make-fixnum-transformer #'cell))))
(define-for-syntax (make-fixnum-transformer cell-id)
(with-syntax ([cell cell-id])
(make-set!-transformer
(lambda (stx)
(syntax-case stx (set!)
[(set! _ r) #'(fxvector-set! cell 0 r)]
[(_ #:capture-fixnum) #'cell] ; see `capture-fixnum`
[(... (_ ...)) (raise-syntax-error stx "bad use" stx)]
[_ #'(fxvector-ref cell 0)])))))
(define-syntax-rule (capture-fixnum id)
(id #:capture-fixnum))

View File

@ -15,49 +15,51 @@
tcp-abandon-port)
(class tcp-input-port #:extends fd-input-port
(field
[abandon? #f])
(override
[on-close
(lambda ()
(unless abandon?
(rktio_socket_shutdown rktio fd RKTIO_SHUTDOWN_READ)))]
[raise-read-error
(lambda (n)
(raise-network-error #f n "error reading from stream port"))])
(property
[prop:file-stream #f]
[prop:fd-place-message-opener (lambda (fd name)
(make-tcp-input-port fd name))]))
#:field
[abandon? #f]
#:override
[on-close
(lambda ()
(unless abandon?
(rktio_socket_shutdown rktio fd RKTIO_SHUTDOWN_READ)))]
[raise-read-error
(lambda (n)
(raise-network-error #f n "error reading from stream port"))]
#:property
[prop:file-stream #f]
[prop:fd-place-message-opener (lambda (fd name)
(make-tcp-input-port fd name))])
(define (make-tcp-input-port fd name
#:fd-refcount [fd-refcount (box 1)])
(finish-fd-input-port
(new tcp-input-port
#:field
[name name]
[fd fd]
[fd-refcount fd-refcount])))
(class tcp-output-port #:extends fd-output-port
(field
[abandon? #f])
(override
[on-close
(lambda ()
(unless abandon?
(rktio_socket_shutdown rktio fd RKTIO_SHUTDOWN_WRITE)))]
[raise-write-error
(lambda (n)
(raise-network-error #f n "error writing to stream port"))])
(property
[prop:file-stream #f]
[prop:fd-place-message-opener (lambda (fd name)
(make-tcp-output-port fd name))]))
#:field
[abandon? #f]
#:override
[on-close
(lambda ()
(unless abandon?
(rktio_socket_shutdown rktio fd RKTIO_SHUTDOWN_WRITE)))]
[raise-write-error
(lambda (n)
(raise-network-error #f n "error writing to stream port"))]
#:property
[prop:file-stream #f]
[prop:fd-place-message-opener (lambda (fd name)
(make-tcp-output-port fd name))])
(define (make-tcp-output-port fd name
#:fd-refcount [fd-refcount (box 1)])
(finish-fd-output-port
(new tcp-output-port
#:field
[name name]
[fd fd]
[fd-refcount fd-refcount]

View File

@ -1,7 +1,6 @@
#lang racket/base
(require racket/fixnum
"../common/check.rkt"
"../common/fixnum.rkt"
"../common/class.rkt"
"../host/thread.rkt"
"port.rkt"
@ -24,227 +23,228 @@
p)
(class bytes-input-port #:extends commit-input-port
(field
[bstr #f] ; normally installed as buffer
[pos 0] ; used when bstr is not installed as buffer
[alt-pos #f])
#:field
[bstr #f] ; normally installed as buffer
[pos 0] ; used when bstr is not installed as buffer
[alt-pos #f]
(private
;; in atomic mode
[in-buffer-pos
(lambda ()
(define b buffer)
(if (direct-bstr b)
(direct-pos b)
pos))])
#:private
;; in atomic mode
[in-buffer-pos
(lambda ()
(define b buffer)
(if (direct-bstr b)
(direct-pos b)
pos))]
(override
[close
(lambda ()
(set! commit-manager #f) ; to indicate closed
(progress!)
(set! bstr #f)
(define b buffer)
(when (direct-bstr b)
(set! offset (direct-pos b))
(set-direct-bstr! b #f)))]
[file-position
(case-lambda
[() (or alt-pos (in-buffer-pos))]
[(given-pos)
#:override
[close
(lambda ()
(set! commit-manager #f) ; to indicate closed
(progress!)
(set! bstr #f)
(define b buffer)
(when (direct-bstr b)
(set! offset (direct-pos b))
(set-direct-bstr! b #f)))]
[file-position
(case-lambda
[() (or alt-pos (in-buffer-pos))]
[(given-pos)
(define b buffer)
(define len (direct-end b))
(define new-pos (if (eof-object? given-pos)
len
(min len given-pos)))
(if (direct-bstr b)
(set-direct-pos! b new-pos)
(set! pos new-pos))
(set! alt-pos (and (not (eof-object? given-pos))
(given-pos . > . new-pos)
given-pos))])]
[prepare-change
(lambda ()
(pause-waiting-commit))]
[read-in
(lambda (dest-bstr start end copy?)
(define b buffer)
(define len (direct-end b))
(define i (in-buffer-pos))
(cond
[(i . < . len)
(define amt (min (- end start) (fx- len i)))
(define new-pos (fx+ i amt))
;; Keep/resume fast mode
(set-direct-pos! b new-pos)
(set! offset 0)
(set-direct-bstr! b bstr)
(bytes-copy! dest-bstr start bstr i new-pos)
(progress!)
amt]
[else eof]))]
[peek-in
(lambda (dest-bstr start end skip progress-evt copy?)
(define b buffer)
(define len (direct-end b))
(define i (in-buffer-pos))
(define at-pos (+ i skip))
(cond
[(and progress-evt (sync/timeout 0 progress-evt))
#f]
[(at-pos . < . len)
(define amt (min (- end start) (fx- len at-pos)))
(bytes-copy! dest-bstr start bstr at-pos (fx+ at-pos amt))
amt]
[else eof]))]
[byte-ready
(lambda (work-done!)
((in-buffer-pos) . < . (direct-end buffer)))]
[get-progress-evt
(lambda ()
(atomically
(unless progress-sema
;; set port to slow mode:
(define b buffer)
(when (direct-bstr b)
(define i (direct-pos b))
(set! pos i)
(set! offset i)
(set-direct-bstr! b #f)
(set-direct-pos! b (direct-end b))))
(make-progress-evt)))]
[commit
(lambda (amt progress-evt ext-evt finish)
(wait-commit
progress-evt ext-evt
;; in atomic mode, maybe in a different thread:
(lambda ()
(define b buffer)
(define len (direct-end b))
(define new-pos (if (eof-object? given-pos)
len
(min len given-pos)))
(if (direct-bstr b)
(set-direct-pos! b new-pos)
(set! pos new-pos))
(set! alt-pos (and (not (eof-object? given-pos))
(given-pos . > . new-pos)
given-pos))])]
[prepare-change
(lambda ()
(pause-waiting-commit))]
[read-in
(lambda (dest-bstr start end copy?)
(define b buffer)
(define len (direct-end b))
(define i (in-buffer-pos))
(cond
[(i . < . len)
(define amt (min (- end start) (fx- len i)))
(define new-pos (fx+ i amt))
(define i (in-buffer-pos))
(let ([amt (min amt (- len i))])
(define dest-bstr (make-bytes amt))
(bytes-copy! dest-bstr 0 bstr i (+ i amt))
;; Keep/resume fast mode
(set-direct-pos! b new-pos)
(set! offset 0)
(set-direct-pos! b (fx+ i amt))
(set-direct-bstr! b bstr)
(bytes-copy! dest-bstr start bstr i new-pos)
(set! offset 0)
(progress!)
amt]
[else eof]))]
[peek-in
(lambda (dest-bstr start end skip progress-evt copy?)
(define b buffer)
(define len (direct-end b))
(define i (in-buffer-pos))
(define at-pos (+ i skip))
(cond
[(and progress-evt (sync/timeout 0 progress-evt))
#f]
[(at-pos . < . len)
(define amt (min (- end start) (fx- len at-pos)))
(bytes-copy! dest-bstr start bstr at-pos (fx+ at-pos amt))
amt]
[else eof]))]
[byte-ready
(lambda (work-done!)
((in-buffer-pos) . < . (direct-end buffer)))]
[get-progress-evt
(lambda ()
(atomically
(unless progress-sema
;; set port to slow mode:
(define b buffer)
(when (direct-bstr b)
(define i (direct-pos b))
(set! pos i)
(set! offset i)
(set-direct-bstr! b #f)
(set-direct-pos! b (direct-end b))))
(make-progress-evt)))]
[commit
(lambda (amt progress-evt ext-evt finish)
(wait-commit
progress-evt ext-evt
;; in atomic mode, maybe in a different thread:
(lambda ()
(define b buffer)
(define len (direct-end b))
(define i (in-buffer-pos))
(let ([amt (min amt (- len i))])
(define dest-bstr (make-bytes amt))
(bytes-copy! dest-bstr 0 bstr i (+ i amt))
;; Keep/resume fast mode
(set-direct-pos! b (fx+ i amt))
(set-direct-bstr! b bstr)
(set! offset 0)
(progress!)
(finish dest-bstr)))))]))
(finish dest-bstr)))))])
(define (make-input-bytes bstr name)
(new bytes-input-port
[name name]
[buffer (direct bstr 0 (bytes-length bstr))]
[bstr bstr]))
(finish-port/count
(new bytes-input-port
#:field
[name name]
[buffer (direct bstr 0 (bytes-length bstr))]
[bstr bstr])))
;; ----------------------------------------
(class bytes-output-port #:extends core-output-port
(field
[bstr #""]
[pos 0]
[max-pos 0])
#:field
[bstr #""]
[pos 0]
[max-pos 0]
(public
[get-length (lambda ()
(start-atomic)
(slow-mode!)
(end-atomic)
max-pos)]
[get-bytes (lambda (dest-bstr start-pos discard?)
(start-atomic)
(slow-mode!)
(bytes-copy! dest-bstr 0 bstr start-pos (fx+ start-pos (bytes-length dest-bstr)))
(when discard?
(set! bstr #"")
(set! pos 0)
(set! max-pos 0))
(end-atomic))])
#:public
[get-length (lambda ()
(start-atomic)
(slow-mode!)
(end-atomic)
max-pos)]
[get-bytes (lambda (dest-bstr start-pos discard?)
(start-atomic)
(slow-mode!)
(bytes-copy! dest-bstr 0 bstr start-pos (fx+ start-pos (bytes-length dest-bstr)))
(when discard?
(set! bstr #"")
(set! pos 0)
(set! max-pos 0))
(end-atomic))]
(private
[enlarge!
(lambda (len)
(define new-bstr (make-bytes (fx* 2 len)))
(bytes-copy! new-bstr 0 bstr 0 pos)
(set! bstr new-bstr))]
#:private
[enlarge!
(lambda (len)
(define new-bstr (make-bytes (fx* 2 len)))
(bytes-copy! new-bstr 0 bstr 0 pos)
(set! bstr new-bstr))]
[slow-mode!
(lambda ()
(define b buffer)
(when (direct-bstr b)
(define s (direct-pos b))
(set! pos s)
(set-direct-pos! b (direct-end b))
(set-direct-bstr! b #f)
(set! offset s)
(set! max-pos (fxmax s max-pos))))]
[slow-mode!
(lambda ()
(define b buffer)
(when (direct-bstr b)
(define s (direct-pos b))
(set! pos s)
(set-direct-pos! b (direct-end b))
(set-direct-bstr! b #f)
(set! offset s)
(set! max-pos (fxmax s max-pos))))]
[fast-mode!
(lambda ()
(define b buffer)
(set-direct-bstr! b bstr)
(set-direct-pos! b pos)
(set-direct-end! b (bytes-length bstr))
(set! offset 0))])
[fast-mode!
(lambda ()
(define b buffer)
(set-direct-bstr! b bstr)
(set-direct-pos! b pos)
(set-direct-end! b (bytes-length bstr))
(set! offset 0))]
(override
[write-out
(lambda (src-bstr src-start src-end nonblock? enable-break? copy?)
(slow-mode!)
(define i pos)
(define amt (min (fx- src-end src-start) 4096))
(define end-i (fx+ i amt))
(when ((bytes-length bstr) . < . end-i)
(enlarge! end-i))
(bytes-copy! bstr i src-bstr src-start (fx+ src-start amt))
(set! pos end-i)
(set! max-pos (fxmax pos max-pos))
(fast-mode!)
amt)]
[get-write-evt
(get-write-evt-via-write-out (lambda (out v bstr start)
(port-count! out v bstr start)))]
[file-position
(case-lambda
[()
(define b buffer)
(if (direct-bstr b) (direct-pos b) pos)]
[(new-pos)
(slow-mode!)
(define len (bytes-length bstr))
(cond
[(eof-object? new-pos)
(set! pos max-pos)]
[(new-pos . > . len)
(when (new-pos . >= . (expt 2 48))
;; implausibly large
(end-atomic)
(raise-arguments-error 'file-position
"new position is too large"
"port" this
"position" new-pos))
(enlarge! len)
(set! pos new-pos)
(set! max-pos new-pos)]
[else
(set! pos new-pos)
(set! max-pos (fxmax max-pos new-pos))])])]))
#:override
[write-out
(lambda (src-bstr src-start src-end nonblock? enable-break? copy?)
(slow-mode!)
(define i pos)
(define amt (min (fx- src-end src-start) 4096))
(define end-i (fx+ i amt))
(when ((bytes-length bstr) . < . end-i)
(enlarge! end-i))
(bytes-copy! bstr i src-bstr src-start (fx+ src-start amt))
(set! pos end-i)
(set! max-pos (fxmax pos max-pos))
(fast-mode!)
amt)]
[get-write-evt
(get-write-evt-via-write-out (lambda (out v bstr start)
(port-count! out v bstr start)))]
[file-position
(case-lambda
[()
(define b buffer)
(if (direct-bstr b) (direct-pos b) pos)]
[(new-pos)
(slow-mode!)
(define len (bytes-length bstr))
(cond
[(eof-object? new-pos)
(set! pos max-pos)]
[(new-pos . > . len)
(when (new-pos . >= . (expt 2 48))
;; implausibly large
(end-atomic)
(raise-arguments-error 'file-position
"new position is too large"
"port" this
"position" new-pos))
(enlarge! len)
(set! pos new-pos)
(set! max-pos new-pos)]
[else
(set! pos new-pos)
(set! max-pos (fxmax max-pos new-pos))])])])
(define (open-output-bytes [name 'string])
(define p (new bytes-output-port
[bstr (make-bytes 16)]
[name name]
[evt always-evt]))
(when (port-count-lines-enabled)
(port-count-lines! p))
p)
(finish-port/count
(new bytes-output-port
#:field
[bstr (make-bytes 16)]
[name name]
[evt always-evt])))
(define/who (get-output-bytes o [reset? #f] [start-pos 0] [end-pos #f])
(check who (lambda (v) (and (output-port? o) (string-port? o)))

View File

@ -8,46 +8,46 @@
(provide commit-input-port)
(class commit-input-port #:extends core-input-port
(field
[progress-sema #f]
[commit-manager #f])
#:field
[progress-sema #f]
[commit-manager #f]
(static
;; in atomic mode
[progress!
(lambda ()
(when progress-sema
(semaphore-post progress-sema)
(set! progress-sema #f)))]
#:static
;; in atomic mode
[progress!
(lambda ()
(when progress-sema
(semaphore-post progress-sema)
(set! progress-sema #f)))]
;; in atomic mode [can leave atomic mode temporarily]
;; After this function returns, complete any commit-changing work
;; before leaving atomic mode again.
[pause-waiting-commit
(lambda ()
(when commit-manager
(commit-manager-pause commit-manager)))]
;; in atomic mode [can leave atomic mode temporarily]
;; After this function returns, complete any commit-changing work
;; before leaving atomic mode again.
[pause-waiting-commit
(lambda ()
(when commit-manager
(commit-manager-pause commit-manager)))]
;; in atomic mode [can leave atomic mode temporarily]
[wait-commit
(lambda (progress-evt ext-evt finish)
(cond
[(and (not commit-manager)
;; Try shortcut:
(not (sync/timeout 0 progress-evt))
(sync/timeout 0 ext-evt))
(finish)
#t]
[else
;; General case to support blocking and potentially multiple
;; commiting threads:
(unless commit-manager
(set! commit-manager (make-commit-manager)))
(commit-manager-wait commit-manager progress-evt ext-evt finish)]))]
;; in atomic mode [can leave atomic mode temporarily]
[wait-commit
(lambda (progress-evt ext-evt finish)
(cond
[(and (not commit-manager)
;; Try shortcut:
(not (sync/timeout 0 progress-evt))
(sync/timeout 0 ext-evt))
(finish)
#t]
[else
;; General case to support blocking and potentially multiple
;; commiting threads:
(unless commit-manager
(set! commit-manager (make-commit-manager)))
(commit-manager-wait commit-manager progress-evt ext-evt finish)]))]
;; in atomic mode
[make-progress-evt
(lambda ()
(unless progress-sema
(set! progress-sema (make-semaphore)))
(semaphore-peek-evt progress-sema))]))
;; in atomic mode
[make-progress-evt
(lambda ()
(unless progress-sema
(set! progress-sema (make-semaphore)))
(semaphore-peek-evt progress-sema))])

View File

@ -10,6 +10,7 @@
"../string/utf-8-decode.rkt")
(provide port-count-lines-enabled
finish-port/count
port-count-lines!
port-counts-lines?
@ -25,6 +26,11 @@
(define port-count-lines-enabled
(make-parameter #f (lambda (v) (and v #t))))
(define (finish-port/count p)
(when (port-count-lines-enabled)
(port-count-lines! p))
p)
(define/who (port-count-lines! p)
(let ([p (cond
[(input-port? p) (->core-input-port p)]

View File

@ -260,48 +260,45 @@
(and user-buffer-mode
(make-buffer-mode user-buffer-mode)))
(define port
(cond
[user-peek-in
(new core-input-port
#:override
([read-in (if (input-port? user-read-in)
user-read-in
read-in)]
[peek-in (if (input-port? user-peek-in)
user-peek-in
peek-in)]
[byte-ready (if (input-port? user-peek-in)
user-peek-in
byte-ready)]
[close close]
[get-progress-evt (and user-get-progress-evt get-progress-evt)]
[commit (and user-commit commit)]
[get-location get-location]
[count-lines! count-lines!]
[file-position file-position]
[buffer-mode buffer-mode])
[name name]
[offset init-offset])]
[else
(new peek-via-read-input-port
#:override
([read-in/inner read-in]
[close (values
(lambda (self)
(close self)
(send peek-via-read-input-port self close-peek-buffer)))]
[get-location get-location]
[count-lines! count-lines!]
[file-position file-position]
[buffer-mode (or buffer-mode
(case-lambda
[(self) (send peek-via-read-input-port self default-buffer-mode)]
[(self mode) (send peek-via-read-input-port self default-buffer-mode mode)]))])
[name name]
[offset init-offset])]))
(when (port-count-lines-enabled)
(port-count-lines! port))
port)
(finish-port/count
(cond
[user-peek-in
(new core-input-port
#:field
[name name]
[offset init-offset]
#:override
[read-in (if (input-port? user-read-in)
user-read-in
read-in)]
[peek-in (if (input-port? user-peek-in)
user-peek-in
peek-in)]
[byte-ready (if (input-port? user-peek-in)
user-peek-in
byte-ready)]
[close close]
[get-progress-evt (and user-get-progress-evt get-progress-evt)]
[commit (and user-commit commit)]
[get-location get-location]
[count-lines! count-lines!]
[file-position file-position]
[buffer-mode buffer-mode])]
[else
(new peek-via-read-input-port
#:field
[name name]
[offset init-offset]
#:override
[read-in/inner read-in]
[close (values
(lambda (self)
(close self)
(send peek-via-read-input-port self close-peek-buffer)))]
[get-location get-location]
[count-lines! count-lines!]
[file-position file-position]
[buffer-mode (or buffer-mode
(case-lambda
[(self) (send peek-via-read-input-port self default-buffer-mode)]
[(self mode) (send peek-via-read-input-port self default-buffer-mode mode)]))])])))

View File

@ -168,30 +168,26 @@
(user-close)
(start-atomic))
(define port
(new core-output-port
#:override
([write-out (if (output-port? user-write-out)
user-write-out
write-out)]
[close close]
[write-out-special
(if (output-port? user-write-out-special)
user-write-out-special
(and user-write-out-special write-out-special))]
[get-write-evt (and user-get-write-evt get-write-evt)]
[get-write-special-evt (and user-get-write-special-evt
(lambda (self v)
(user-get-write-special-evt v)))]
[get-location get-location]
[count-lines! count-lines!]
[file-position file-position]
[buffer-mode buffer-mode])
[name name]
[evt evt]
[offset init-offset]))
(when (port-count-lines-enabled)
(port-count-lines! port))
port)
(finish-port/count
(new core-output-port
#:field
[name name]
[evt evt]
[offset init-offset]
#:override
[write-out (if (output-port? user-write-out)
user-write-out
write-out)]
[close close]
[write-out-special
(if (output-port? user-write-out-special)
user-write-out-special
(and user-write-out-special write-out-special))]
[get-write-evt (and user-get-write-evt get-write-evt)]
[get-write-special-evt (and user-get-write-special-evt
(lambda (self v)
(user-get-write-special-evt v)))]
[get-location get-location]
[count-lines! count-lines!]
[file-position file-position]
[buffer-mode buffer-mode])))

View File

@ -41,50 +41,50 @@
;; ----------------------------------------
(class fd-input-port #:extends peek-via-read-input-port
(field
[fd #f]
[fd-refcount (box 1)]
[custodian-reference #f])
(public
[on-close (lambda () (void))]
[raise-read-error (lambda (n)
(raise-filesystem-error #f n "error reading from stream port"))])
#:field
[fd #f]
[fd-refcount (box 1)]
[custodian-reference #f]
(override
[read-in/inner
(lambda (dest-bstr start end copy?)
(define n (rktio_read_in rktio fd dest-bstr start end))
(cond
[(rktio-error? n)
(end-atomic)
(send fd-input-port this raise-read-error n)]
[(eqv? n RKTIO_READ_EOF) eof]
[(eqv? n 0) (wrap-evt (fd-evt fd RKTIO_POLL_READ this)
(lambda (v) 0))]
[else n]))]
#:public
[on-close (lambda () (void))]
[raise-read-error (lambda (n)
(raise-filesystem-error #f n "error reading from stream port"))]
#:override
[read-in/inner
(lambda (dest-bstr start end copy?)
(define n (rktio_read_in rktio fd dest-bstr start end))
(cond
[(rktio-error? n)
(end-atomic)
(send fd-input-port this raise-read-error n)]
[(eqv? n RKTIO_READ_EOF) eof]
[(eqv? n 0) (wrap-evt (fd-evt fd RKTIO_POLL_READ this)
(lambda (v) 0))]
[else n]))]
[close
(lambda ()
(send fd-input-port this on-close)
(fd-close fd fd-refcount)
(unsafe-custodian-unregister fd custodian-reference)
(close-peek-buffer))]
[close
(lambda ()
(send fd-input-port this on-close)
(fd-close fd fd-refcount)
(unsafe-custodian-unregister fd custodian-reference)
(close-peek-buffer))]
[file-position
(case-lambda
[()
(define pos (get-file-position fd))
(and pos (buffer-adjust-pos pos))]
[(pos)
(purge-buffer)
(set-file-position fd pos)])])
[file-position
(case-lambda
[()
(define pos (get-file-position fd))
(and pos (buffer-adjust-pos pos))]
[(pos)
(purge-buffer)
(set-file-position fd pos)])]
(property
[prop:file-stream (lambda (p) (fd-input-port-fd p))]
[prop:data-place-message (lambda (port)
(lambda ()
(fd-port->place-message port)))]))
#:property
[prop:file-stream (lambda (p) (fd-input-port-fd p))]
[prop:data-place-message (lambda (port)
(lambda ()
(fd-port->place-message port)))])
;; ----------------------------------------
@ -95,6 +95,7 @@
#:custodian [cust (current-custodian)])
(finish-fd-input-port
(new fd-input-port
#:field
[name name]
[fd fd]
[fd-refcount fd-refcount])
@ -105,193 +106,191 @@
(define fd (fd-input-port-fd p))
(define fd-refcount (fd-input-port-fd-refcount p))
(set-fd-input-port-custodian-reference! p (register-fd-close cust fd fd-refcount #f p))
(when (port-count-lines-enabled)
(port-count-lines! p))
p)
(finish-port/count p))
;; ----------------------------------------
(class fd-output-port #:extends core-output-port
(field
[fd fd]
[fd-refcount (box 1)]
[bstr (make-bytes 4096)]
[start-pos 0]
[end-pos 0]
[flush-handle #f]
[buffer-mode 'block]
[custodian-reference #f])
#:field
[fd fd]
[fd-refcount (box 1)]
[bstr (make-bytes 4096)]
[start-pos 0]
[end-pos 0]
[flush-handle #f]
[buffer-mode 'block]
[custodian-reference #f]
(static
[fast-mode!
(lambda (amt) ; amt = not yet added to `offset`
(when (eq? buffer-mode 'block)
(define b buffer)
(define e end-pos)
(set-direct-bstr! b bstr)
(set-direct-pos! b e)
(set-direct-end! b (bytes-length bstr))
(define o offset)
(when o
(set! offset (- (+ o amt) e)))))]
#:static
[fast-mode!
(lambda (amt) ; amt = not yet added to `offset`
(when (eq? buffer-mode 'block)
(define b buffer)
(define e end-pos)
(set-direct-bstr! b bstr)
(set-direct-pos! b e)
(set-direct-end! b (bytes-length bstr))
(define o offset)
(when o
(set! offset (- (+ o amt) e)))))]
[slow-mode!
(lambda ()
[slow-mode!
(lambda ()
(define b buffer)
(when (direct-bstr b)
(set-direct-bstr! b #f)
(define pos (direct-pos b))
(set! end-pos pos)
(define o offset)
(when o
(set! offset (+ o pos)))
(set-direct-pos! b (direct-end b))))]
#:public
[on-close (lambda () (void))]
[raise-write-error
(lambda (n)
(raise-filesystem-error #f n "error writing to stream port"))]
#:private
;; in atomic mode
;; Returns `#t` if the buffer is already or successfully flushed
[flush-buffer
(lambda ()
(slow-mode!)
(cond
[(not (fx= start-pos end-pos))
(define n (rktio_write_in rktio fd bstr start-pos end-pos))
(cond
[(rktio-error? n)
(end-atomic)
(send fd-output-port this raise-write-error n)]
[(fx= n 0)
#f]
[else
(define new-start-pos (fx+ start-pos n))
(cond
[(fx= new-start-pos end-pos)
(set! start-pos 0)
(set! end-pos 0)
#t]
[else
(set! start-pos new-start-pos)
#f])])]
[else #t]))]
;; in atomic mode, but may leave it temporarily
[flush-buffer-fully
(lambda (enable-break?)
(let loop ()
(unless (flush-buffer)
(end-atomic)
(if enable-break?
(sync/enable-break evt)
(sync evt))
(start-atomic)
(when bstr ; in case it was closed
(loop)))))]
;; in atomic mode, but may leave it temporarily
[flush-buffer-fully-if-newline
(lambda (src-bstr src-start src-end enable-break?)
(for ([b (in-bytes src-bstr src-start src-end)])
(define newline? (or (eqv? b (char->integer #\newline))
(eqv? b (char->integer #\return))))
(when newline? (flush-buffer-fully enable-break?))
#:break newline?
(void)))]
#:static
[flush-buffer/external
(lambda ()
(flush-buffer-fully #f))]
#:override
;; in atomic mode
[write-out
(lambda (src-bstr src-start src-end nonbuffer/nonblock? enable-break? copy?)
(slow-mode!)
(cond
[(fx= src-start src-end)
;; Flush request
(and (flush-buffer) 0)]
[(and (not (eq? buffer-mode 'none))
(not nonbuffer/nonblock?)
(fx< end-pos (bytes-length bstr)))
(define amt (fxmin (fx- src-end src-start) (fx- (bytes-length bstr) end-pos)))
(bytes-copy! bstr end-pos src-bstr src-start (fx+ src-start amt))
(set! end-pos (fx+ end-pos amt))
(when (eq? buffer-mode 'line)
;; can temporarily leave atomic mode:
(flush-buffer-fully-if-newline src-bstr src-start src-end enable-break?))
(fast-mode! amt)
amt]
[(not (flush-buffer)) ; <- can temporarily leave atomic mode
#f]
[else
(define n (rktio_write_in rktio fd src-bstr src-start src-end))
(cond
[(rktio-error? n)
(end-atomic)
(send fd-output-port this raise-write-error n)]
[(fx= n 0) (wrap-evt evt (lambda (v) #f))]
[else n])]))]
[get-write-evt
(get-write-evt-via-write-out (lambda (out v bstr start)
(port-count! out v bstr start)))]
;; in atomic mode
[close
(lambda ()
(flush-buffer-fully #f) ; can temporarily leave atomic mode
(when bstr ; <- in case a concurrent close succeeded
(send fd-output-port this on-close)
(plumber-flush-handle-remove! flush-handle)
(set! bstr #f)
(fd-close fd fd-refcount)
(unsafe-custodian-unregister fd custodian-reference)))]
;; in atomic mode
[file-position
(case-lambda
[()
(define pos (get-file-position fd))
(define b buffer)
(when (direct-bstr b)
(set-direct-bstr! b #f)
(define pos (direct-pos b))
(set! end-pos pos)
(define o offset)
(when o
(set! offset (+ o pos)))
(set-direct-pos! b (direct-end b))))])
(and pos (+ pos (fx- (if (direct-bstr b) (direct-pos b) end-pos) start-pos)))]
[(pos)
(flush-buffer-fully #f)
;; flushing can leave atomic mode, so make sure the
;; port is still open before continuing
(unless bstr
(check-not-closed 'file-position this))
(set-file-position fd pos)])]
(public
[on-close (lambda () (void))]
[raise-write-error
(lambda (n)
(raise-filesystem-error #f n "error writing to stream port"))])
;; in atomic mode
[buffer-mode
(case-lambda
[(self) buffer-mode]
[(self mode) (set! buffer-mode mode)])]
(private
;; in atomic mode
;; Returns `#t` if the buffer is already or successfully flushed
[flush-buffer
(lambda ()
(slow-mode!)
(cond
[(not (fx= start-pos end-pos))
(define n (rktio_write_in rktio fd bstr start-pos end-pos))
(cond
[(rktio-error? n)
(end-atomic)
(send fd-output-port this raise-write-error n)]
[(fx= n 0)
#f]
[else
(define new-start-pos (fx+ start-pos n))
(cond
[(fx= new-start-pos end-pos)
(set! start-pos 0)
(set! end-pos 0)
#t]
[else
(set! start-pos new-start-pos)
#f])])]
[else #t]))]
;; in atomic mode, but may leave it temporarily
[flush-buffer-fully
(lambda (enable-break?)
(let loop ()
(unless (flush-buffer)
(end-atomic)
(if enable-break?
(sync/enable-break evt)
(sync evt))
(start-atomic)
(when bstr ; in case it was closed
(loop)))))]
;; in atomic mode, but may leave it temporarily
[flush-buffer-fully-if-newline
(lambda (src-bstr src-start src-end enable-break?)
(for ([b (in-bytes src-bstr src-start src-end)])
(define newline? (or (eqv? b (char->integer #\newline))
(eqv? b (char->integer #\return))))
(when newline? (flush-buffer-fully enable-break?))
#:break newline?
(void)))])
(static
[flush-buffer/external
(lambda ()
(flush-buffer-fully #f))])
(override
;; in atomic mode
[write-out
(lambda (src-bstr src-start src-end nonbuffer/nonblock? enable-break? copy?)
(slow-mode!)
(cond
[(fx= src-start src-end)
;; Flush request
(and (flush-buffer) 0)]
[(and (not (eq? buffer-mode 'none))
(not nonbuffer/nonblock?)
(fx< end-pos (bytes-length bstr)))
(define amt (fxmin (fx- src-end src-start) (fx- (bytes-length bstr) end-pos)))
(bytes-copy! bstr end-pos src-bstr src-start (fx+ src-start amt))
(set! end-pos (fx+ end-pos amt))
(when (eq? buffer-mode 'line)
;; can temporarily leave atomic mode:
(flush-buffer-fully-if-newline src-bstr src-start src-end enable-break?))
(fast-mode! amt)
amt]
[(not (flush-buffer)) ; <- can temporarily leave atomic mode
#f]
[else
(define n (rktio_write_in rktio fd src-bstr src-start src-end))
(cond
[(rktio-error? n)
(end-atomic)
(send fd-output-port this raise-write-error n)]
[(fx= n 0) (wrap-evt evt (lambda (v) #f))]
[else n])]))]
[get-write-evt
(get-write-evt-via-write-out (lambda (out v bstr start)
(port-count! out v bstr start)))]
;; in atomic mode
[close
(lambda ()
(flush-buffer-fully #f) ; can temporarily leave atomic mode
(when bstr ; <- in case a concurrent close succeeded
(send fd-output-port this on-close)
(plumber-flush-handle-remove! flush-handle)
(set! bstr #f)
(fd-close fd fd-refcount)
(unsafe-custodian-unregister fd custodian-reference)))]
;; in atomic mode
[file-position
(case-lambda
[()
(define pos (get-file-position fd))
(define b buffer)
(and pos (+ pos (fx- (if (direct-bstr b) (direct-pos b) end-pos) start-pos)))]
[(pos)
(flush-buffer-fully #f)
;; flushing can leave atomic mode, so make sure the
;; port is still open before continuing
(unless bstr
(check-not-closed 'file-position this))
(set-file-position fd pos)])]
;; in atomic mode
[buffer-mode
(case-lambda
[(self) buffer-mode]
[(self mode) (set! buffer-mode mode)])])
(property
[prop:file-stream (lambda (p) (fd-output-port-fd p))]
[prop:file-truncate (lambda (p pos)
;; in atomic mode
(send fd-output-port p flush-buffer/external)
(define result
(rktio_set_file_size rktio
(fd-output-port-fd p)
pos))
(cond
[(rktio-error? result)
(end-atomic)
(raise-rktio-error 'file-truncate result "error setting file size")]
[else result]))]
[prop:data-place-message (lambda (port)
(lambda ()
(fd-port->place-message port)))]))
#:property
[prop:file-stream (lambda (p) (fd-output-port-fd p))]
[prop:file-truncate (lambda (p pos)
;; in atomic mode
(send fd-output-port p flush-buffer/external)
(define result
(rktio_set_file_size rktio
(fd-output-port-fd p)
pos))
(cond
[(rktio-error? result)
(end-atomic)
(raise-rktio-error 'file-truncate result "error setting file size")]
[else result]))]
[prop:data-place-message (lambda (port)
(lambda ()
(fd-port->place-message port)))])
;; ----------------------------------------
@ -304,6 +303,7 @@
#:custodian [cust (current-custodian)])
(finish-fd-output-port
(new fd-output-port
#:field
[name name]
[fd fd]
[fd-refcount fd-refcount]
@ -330,9 +330,7 @@
(set-core-output-port-evt! p evt)
(set-fd-output-port-flush-handle! p flush-handle)
(set-fd-output-port-custodian-reference! p custodian-reference)
(when (port-count-lines-enabled)
(port-count-lines! p))
p)
(finish-port/count p))
;; ----------------------------------------

View File

@ -49,100 +49,101 @@
[else default]))
(class core-input-port #:extends core-port
(field
[pending-eof? #f]
[read-handler #f])
#:field
[pending-eof? #f]
[read-handler #f]
(public
#:public
;; #f or (-*> void)
;; Called in atomic mode
;; May leave atomic mode temporarily, but on return, ensures that
;; other atomic operations are ok to change the port. The main use
;; of `prepare-change` is to pause and `port-commit-peeked`
;; attempts to not succeed while a potential change is in progress,
;; where the commit attempts can resume after atomic mode is left.
;; The `close` operation is *not* guarded by a call to
;; `prepare-change`.
[prepare-change #f]
;; #f or (-*> void)
;; Called in atomic mode
;; May leave atomic mode temporarily, but on return, ensures that
;; other atomic operations are ok to change the port. The main use
;; of `prepare-change` is to pause and `port-commit-peeked`
;; attempts to not succeed while a potential change is in progress,
;; where the commit attempts can resume after atomic mode is left.
;; The `close` operation is *not* guarded by a call to
;; `prepare-change`.
[prepare-change #f]
;; port or (bytes start-k end-k copy? -*> (or/c integer? ...))
;; Called in atomic mode.
;; A port value redirects to the port. Otherwise, the function
;; never blocks, and can assume `(- end-k start-k)` is non-zero.
;; The `copy?` flag indicates that the given byte string should not
;; be exposed to untrusted code, and instead of should be copied if
;; necessary. The return values are the same as documented for
;; `make-input-port`, except that a pipe result is not allowed (or,
;; more precisely, it's treated as an event).
[read-in (lambda (bstr start end copy?) eof)]
;; port or (bytes start-k end-k copy? -*> (or/c integer? ...))
;; Called in atomic mode.
;; A port value redirects to the port. Otherwise, the function
;; never blocks, and can assume `(- end-k start-k)` is non-zero.
;; The `copy?` flag indicates that the given byte string should not
;; be exposed to untrusted code, and instead of should be copied if
;; necessary. The return values are the same as documented for
;; `make-input-port`, except that a pipe result is not allowed (or,
;; more precisely, it's treated as an event).
[read-in (lambda (bstr start end copy?) eof)]
;; port or (bytes start-k end-k skip-k progress-evt copy? -*> (or/c integer? ...))
;; Called in atomic mode.
;; A port value redirects to the port. Otherwise, the function
;; never blocks, and it can assume that `(- end-k start-k)` is
;; non-zero. The `copy?` flag is the same as for `read-in`. The
;; return values are the same as documented for `make-input-port`.
[peek-in (lambda (bstr start end progress-evt copy?) eof)]
;; port or (bytes start-k end-k skip-k progress-evt copy? -*> (or/c integer? ...))
;; Called in atomic mode.
;; A port value redirects to the port. Otherwise, the function
;; never blocks, and it can assume that `(- end-k start-k)` is
;; non-zero. The `copy?` flag is the same as for `read-in`. The
;; return values are the same as documented for `make-input-port`.
[peek-in (lambda (bstr start end progress-evt copy?) eof)]
;; port or ((->) -*> (or/c boolean? evt))
;; Called in atomic mode.
;; A port value makes sense when `peek-in` has a port value.
;; Otherwise, check whether a peek on one byte would succeed
;; without blocking and return a boolean, or return an event that
;; effectively does the same. The event's value doesn't matter,
;; because it will be wrapped to return some original port. When
;; `byte-ready` is a function, it should call the given function
;; (for its side effect) when work has been done that might unblock
;; this port or some other port.
[byte-ready (lambda (work-done!) #t)]
;; port or ((->) -*> (or/c boolean? evt))
;; Called in atomic mode.
;; A port value makes sense when `peek-in` has a port value.
;; Otherwise, check whether a peek on one byte would succeed
;; without blocking and return a boolean, or return an event that
;; effectively does the same. The event's value doesn't matter,
;; because it will be wrapped to return some original port. When
;; `byte-ready` is a function, it should call the given function
;; (for its side effect) when work has been done that might unblock
;; this port or some other port.
[byte-ready (lambda (work-done!) #t)]
;; #f or (-*> evt?)
;; *Not* called in atomic mode.
;; Optional support for progress events, and may be called on a
;; closed port.
[get-progress-evt #f]
;; #f or (-*> evt?)
;; *Not* called in atomic mode.
;; Optional support for progress events, and may be called on a
;; closed port.
[get-progress-evt #f]
;; (amt-k progress-evt? evt? (bytes? -> any) -*> boolean)
;; Called in atomic mode.
;; Goes with `get-progress-evt`. The final `evt?` argument is
;; constrained to a few kinds of events; see docs for
;; `port-commit-peeked` for more information. On success, a
;; completion function is called in atomic mode, but possibly in a
;; different thread, with the committed bytes. The result is a
;; boolean indicating success or failure.
[commit (lambda (amt progress-evt ext-evt finish) #f)]
;; (amt-k progress-evt? evt? (bytes? -> any) -*> boolean)
;; Called in atomic mode.
;; Goes with `get-progress-evt`. The final `evt?` argument is
;; constrained to a few kinds of events; see docs for
;; `port-commit-peeked` for more information. On success, a
;; completion function is called in atomic mode, but possibly in a
;; different thread, with the committed bytes. The result is a
;; boolean indicating success or failure.
[commit (lambda (amt progress-evt ext-evt finish) #f)])
(property
[prop:input-port-evt (lambda (i)
;; not atomic mode
(let ([i (->core-input-port i)])
(cond
[(core-port-closed? i)
always-evt]
[else
(define byte-ready (method core-input-port i byte-ready))
(cond
[(input-port? byte-ready)
byte-ready]
[else
(poller-evt
(poller
(lambda (self poll-ctx)
;; atomic mode
(define v (byte-ready i
(lambda ()
(schedule-info-did-work! (poll-ctx-sched-info poll-ctx)))))
(cond
[(evt? v)
(values #f v)]
[(eq? v #t)
(values (list #t) #f)]
[else
(values #f self)]))))])])))]))
#:property
[prop:input-port-evt (lambda (i)
;; not atomic mode
(let ([i (->core-input-port i)])
(cond
[(core-port-closed? i)
always-evt]
[else
(define byte-ready (method core-input-port i byte-ready))
(cond
[(input-port? byte-ready)
byte-ready]
[else
(poller-evt
(poller
(lambda (self poll-ctx)
;; atomic mode
(define v (byte-ready i
(lambda ()
(schedule-info-did-work! (poll-ctx-sched-info poll-ctx)))))
(cond
[(evt? v)
(values #f v)]
[(eq? v #t)
(values (list #t) #f)]
[else
(values #f self)]))))])])))])
;; ----------------------------------------
(define empty-input-port
(new core-input-port
#:field
[name 'empty]))

View File

@ -8,32 +8,33 @@
max-output-port-max-length)
(class max-output-port #:extends core-output-port
(field
[o #f]
[max-length 0])
(override
[write-out
(lambda (src-bstr src-start src-end nonblock? enable-break? copy?)
(cond
[max-length
(define len (- src-end src-start))
(unless (eq? max-length 'full)
(define write-len (min len max-length))
(end-atomic)
(define wrote-len (write-bytes src-bstr o src-start (+ src-start write-len)))
(start-atomic)
(if (= max-length wrote-len)
(set! max-length 'full)
(set! max-length (- max-length wrote-len))))
len]
[else
#:field
[o #f]
[max-length 0]
#:override
[write-out
(lambda (src-bstr src-start src-end nonblock? enable-break? copy?)
(cond
[max-length
(define len (- src-end src-start))
(unless (eq? max-length 'full)
(define write-len (min len max-length))
(end-atomic)
(define len (write-bytes src-bstr o src-start src-end))
(define wrote-len (write-bytes src-bstr o src-start (+ src-start write-len)))
(start-atomic)
len]))]))
(if (= max-length wrote-len)
(set! max-length 'full)
(set! max-length (- max-length wrote-len))))
len]
[else
(end-atomic)
(define len (write-bytes src-bstr o src-start src-end))
(start-atomic)
len]))])
(define (make-max-output-port o max-length)
(new max-output-port
#:field
[name (object-name o)]
[evt o]
[o o]

View File

@ -1,15 +1,18 @@
#lang racket/base
(require "../common/class.rkt"
"output-port.rkt")
"output-port.rkt"
"count.rkt")
(provide open-output-nowhere)
(class nowhere-output-port #:extends core-output-port
(override
[write-out-special
(lambda (any no-block/buffer? enable-break?)
#t)]))
#:override
[write-out-special
(lambda (any no-block/buffer? enable-break?)
#t)])
(define (open-output-nowhere)
(new nowhere-output-port
[name 'nowhere]))
(finish-port/count
(new nowhere-output-port
#:field
[name 'nowhere])))

View File

@ -50,52 +50,52 @@
[else default]))
(class core-output-port #:extends core-port
(field
[evt always-evt] ; An evt that is ready when writing a byte won't block
[write-handler #f]
[print-handler #f]
[display-handler #f])
#:field
[evt always-evt] ; An evt that is ready when writing a byte won't block
[write-handler #f]
[print-handler #f]
[display-handler #f]
(public
;; port or (bstr start-k end-k no-block/buffer? enable-break? copy? -*> ...)
;; Called in atomic mode.
;; Doesn't block if `no-block/buffer?` is true. Does enable breaks
;; while blocking if `enable-break?` is true. The `copy?` flag
;; indicates that the given byte string should not be exposed to
;; untrusted code, and instead of should be copied if necessary.
;; The return values are the same as documented for
;; `make-output-port`.
[write-out (lambda (bstr start-k end-k no-block/buffer? enable-break? copy?)
(- end-k start-k))]
#:public
;; port or (bstr start-k end-k no-block/buffer? enable-break? copy? -*> ...)
;; Called in atomic mode.
;; Doesn't block if `no-block/buffer?` is true. Does enable breaks
;; while blocking if `enable-break?` is true. The `copy?` flag
;; indicates that the given byte string should not be exposed to
;; untrusted code, and instead of should be copied if necessary.
;; The return values are the same as documented for
;; `make-output-port`.
[write-out (lambda (bstr start-k end-k no-block/buffer? enable-break? copy?)
(- end-k start-k))]
;; #f or (any no-block/buffer? enable-break? -*> boolean?)
;; Called in atomic mode.
[write-out-special #f]
;; #f or (bstr start-k end-k -*> evt?)
;; Called in atomic mode.
;; The given bstr should not be exposed to untrusted code.
[get-write-evt (lambda (bstr start-k end-k) always-evt)]
;; #f or (any no-block/buffer? enable-break? -*> boolean?)
;; Called in atomic mode.
[write-out-special #f]
;; #f or (bstr start-k end-k -*> evt?)
;; Called in atomic mode.
;; The given bstr should not be exposed to untrusted code.
[get-write-evt (lambda (bstr start-k end-k) always-evt)]
;; #f or (any -*> evt?)
;; *Not* called in atomic mode.
[get-write-special-evt #f])
;; #f or (any -*> evt?)
;; *Not* called in atomic mode.
[get-write-special-evt #f]
(property
[prop:output-port-evt (lambda (o)
;; not atomic mode
(let ([o (->core-output-port o)])
(choice-evt
(list
(poller-evt
(poller
(lambda (self sched-info)
;; atomic mode
(cond
[(core-port-closed? o)
(values '(#t) #f)]
[else (values #f self)]))))
(core-output-port-evt o)))))]))
#:property
[prop:output-port-evt (lambda (o)
;; not atomic mode
(let ([o (->core-output-port o)])
(choice-evt
(list
(poller-evt
(poller
(lambda (self sched-info)
;; atomic mode
(cond
[(core-port-closed? o)
(values '(#t) #f)]
[else (values #f self)]))))
(core-output-port-evt o)))))])
;; If `write-out` is always atomic (in no-block, no-buffer mode),
;; then an event can poll `write-out`
@ -118,4 +118,5 @@
(define empty-output-port
(new core-output-port
#:field
[name 'empty]))

View File

@ -11,227 +11,227 @@
(provide peek-via-read-input-port)
(class peek-via-read-input-port #:extends commit-input-port
(field
[bstr (make-bytes 4096)]
[pos 0]
[end-pos 0]
[peeked-eof? #f]
[buffer-mode 'block])
#:field
[bstr (make-bytes 4096)]
[pos 0]
[end-pos 0]
[peeked-eof? #f]
[buffer-mode 'block]
(public
;; in atomic mode; must override
[read-in/inner
(lambda (dest-bstr start end copy?)
0)])
#:public
;; in atomic mode; must override
[read-in/inner
(lambda (dest-bstr start end copy?)
0)]
(static
;; in atomic mode
[purge-buffer
(lambda ()
(slow-mode!)
(set! pos 0)
(set! end-pos 0)
(set! peeked-eof? #f))]
#:static
;; in atomic mode
[purge-buffer
(lambda ()
(slow-mode!)
(set! pos 0)
(set! end-pos 0)
(set! peeked-eof? #f))]
[close-peek-buffer
(lambda ()
(purge-buffer)
(set! bstr #""))]
[close-peek-buffer
(lambda ()
(purge-buffer)
(set! bstr #""))]
[buffer-adjust-pos
(lambda (i)
(define b buffer)
(- i (fx- end-pos (if (direct-bstr b) (direct-pos b) pos))))]
[buffer-adjust-pos
(lambda (i)
(define b buffer)
(- i (fx- end-pos (if (direct-bstr b) (direct-pos b) pos))))]
;; in atomic mode
[default-buffer-mode
;; in atomic mode
[default-buffer-mode
(case-lambda
[() buffer-mode]
[(mode) (set! buffer-mode mode)])])
[(mode) (set! buffer-mode mode)])]
(private
;; in atomic mode
[pull-some-bytes
(lambda ([amt (if (eq? 'block buffer-mode) (bytes-length bstr) 1)] [offset 0] [init-pos 0])
(define get-end (min (+ amt offset) (bytes-length bstr)))
(define v (send peek-via-read-input-port this read-in/inner bstr offset get-end #f))
(cond
[(eof-object? v)
(set! peeked-eof? #t)
eof]
[(evt? v) v]
[(eqv? v 0) 0]
[else
(set! pos init-pos)
(set! end-pos (fx+ offset v))
v]))]
#:private
;; in atomic mode
[pull-some-bytes
(lambda ([amt (if (eq? 'block buffer-mode) (bytes-length bstr) 1)] [offset 0] [init-pos 0])
(define get-end (min (+ amt offset) (bytes-length bstr)))
(define v (send peek-via-read-input-port this read-in/inner bstr offset get-end #f))
(cond
[(eof-object? v)
(set! peeked-eof? #t)
eof]
[(evt? v) v]
[(eqv? v 0) 0]
[else
(set! pos init-pos)
(set! end-pos (fx+ offset v))
v]))]
;; in atomic mode
[pull-more-bytes
(lambda (amt)
(cond
[(end-pos . fx< . (bytes-length bstr))
;; add to end of buffer
(pull-some-bytes amt end-pos pos)]
[(fx= pos 0)
;; extend buffer
(define new-bstr (make-bytes (fx* 2 (bytes-length bstr))))
(bytes-copy! new-bstr 0 bstr 0 end-pos)
(set! bstr new-bstr)
(pull-some-bytes amt end-pos)]
[else
;; shift to start of buffer and retry
(bytes-copy! bstr 0 bstr pos end-pos)
(set! end-pos (fx- end-pos pos))
(set! pos 0)
(pull-more-bytes amt)]))]
;; in atomic mode
[pull-more-bytes
(lambda (amt)
(cond
[(end-pos . fx< . (bytes-length bstr))
;; add to end of buffer
(pull-some-bytes amt end-pos pos)]
[(fx= pos 0)
;; extend buffer
(define new-bstr (make-bytes (fx* 2 (bytes-length bstr))))
(bytes-copy! new-bstr 0 bstr 0 end-pos)
(set! bstr new-bstr)
(pull-some-bytes amt end-pos)]
[else
;; shift to start of buffer and retry
(bytes-copy! bstr 0 bstr pos end-pos)
(set! end-pos (fx- end-pos pos))
(set! pos 0)
(pull-more-bytes amt)]))]
;; in atomic mode
[retry-pull?
(lambda (v)
(and (integer? v) (not (eqv? v 0))))]
;; in atomic mode
[retry-pull?
(lambda (v)
(and (integer? v) (not (eqv? v 0))))]
;; in atomic mode
[fast-mode!
(lambda (amt) ; amt = not yet added to `offset`
(define b buffer)
(set-direct-bstr! b bstr)
(define s pos)
(set-direct-pos! b s)
(set-direct-end! b end-pos)
;; in atomic mode
[fast-mode!
(lambda (amt) ; amt = not yet added to `offset`
(define b buffer)
(set-direct-bstr! b bstr)
(define s pos)
(set-direct-pos! b s)
(set-direct-end! b end-pos)
(define o offset)
(when o
(set! offset (- (+ o amt) s))))]
;; in atomic mode
[slow-mode!
(lambda ()
(define b buffer)
(when (direct-bstr b)
(define s (direct-pos b))
(define o offset)
(when o
(set! offset (- (+ o amt) s))))]
(set! offset (+ o s)))
(set! pos s)
(set-direct-bstr! b #f)
(set-direct-pos! b (direct-end b))))]
;; in atomic mode
[slow-mode!
(lambda ()
#:override
;; in atomic mode
[prepare-change
(lambda ()
(pause-waiting-commit))]
;; in atomic mode
[read-in
(lambda (dest-bstr start end copy?)
(slow-mode!)
(let try-again ()
(cond
[(pos . fx< . end-pos)
(define amt (min (fx- end-pos pos) (fx- end start)))
(bytes-copy! dest-bstr start bstr pos (fx+ pos amt))
(set! pos (fx+ pos amt))
(progress!)
(fast-mode! amt)
amt]
[peeked-eof?
(set! peeked-eof? #f)
;; an EOF doesn't count as progress
eof]
[else
(cond
[(and (eq? 'block buffer-mode)
(fx< (fx- end start) (fxrshift (bytes-length bstr) 1)))
(define v (pull-some-bytes))
(cond
[(or (eqv? v 0) (evt? v)) v]
[else (try-again)])]
[else
(define v (send peek-via-read-input-port this read-in/inner dest-bstr start end copy?))
(unless (eqv? v 0)
(progress!))
v])])))]
;; in atomic mode
[peek-in
(lambda (dest-bstr start end skip progress-evt copy?)
(let try-again ()
(cond
[(and progress-evt
(sync/timeout 0 progress-evt))
#f]
[else
(define b buffer)
(define s (if (direct-bstr b) (direct-pos b) pos))
(define peeked-amt (fx- end-pos s))
(cond
[(peeked-amt . > . skip)
(define amt (min (fx- peeked-amt skip) (fx- end start)))
(define s-pos (fx+ s skip))
(bytes-copy! dest-bstr start bstr s-pos (fx+ s-pos amt))
amt]
[peeked-eof?
eof]
[else
(slow-mode!)
(define v (pull-more-bytes (+ (- skip peeked-amt) (fx- end start))))
(if (retry-pull? v)
(try-again)
v)])])))]
;; in atomic mode
[byte-ready
(lambda (work-done!)
(let loop ()
(define b buffer)
(when (direct-bstr b)
(define s (direct-pos b))
(define o offset)
(when o
(set! offset (+ o s)))
(set! pos s)
(set-direct-bstr! b #f)
(set-direct-pos! b (direct-end b))))])
(define peeked-amt (fx- end-pos (if (direct-bstr b) (direct-pos b) pos)))
(cond
[(peeked-amt . fx> . 0) #t]
[peeked-eof? #t]
[else
(slow-mode!)
(define v (pull-some-bytes))
(work-done!)
(cond
[(retry-pull? v)
(loop)]
[(evt? v) v]
[else
(not (eqv? v 0))])])))]
(override
;; in atomic mode
[prepare-change
(lambda ()
(pause-waiting-commit))]
;; in atomic mode
[read-in
(lambda (dest-bstr start end copy?)
(slow-mode!)
(let try-again ()
(cond
[(pos . fx< . end-pos)
(define amt (min (fx- end-pos pos) (fx- end start)))
(bytes-copy! dest-bstr start bstr pos (fx+ pos amt))
(set! pos (fx+ pos amt))
(progress!)
(fast-mode! amt)
amt]
[peeked-eof?
(set! peeked-eof? #f)
;; an EOF doesn't count as progress
eof]
[else
(cond
[(and (eq? 'block buffer-mode)
(fx< (fx- end start) (fxrshift (bytes-length bstr) 1)))
(define v (pull-some-bytes))
(cond
[(or (eqv? v 0) (evt? v)) v]
[else (try-again)])]
[else
(define v (send peek-via-read-input-port this read-in/inner dest-bstr start end copy?))
(unless (eqv? v 0)
(progress!))
v])])))]
[get-progress-evt
(lambda ()
(atomically
(slow-mode!)
(make-progress-evt)))]
;; in atomic mode
[peek-in
(lambda (dest-bstr start end skip progress-evt copy?)
(let try-again ()
(cond
[(and progress-evt
(sync/timeout 0 progress-evt))
#f]
[else
(define b buffer)
(define s (if (direct-bstr b) (direct-pos b) pos))
(define peeked-amt (fx- end-pos s))
(cond
[(peeked-amt . > . skip)
(define amt (min (fx- peeked-amt skip) (fx- end start)))
(define s-pos (fx+ s skip))
(bytes-copy! dest-bstr start bstr s-pos (fx+ s-pos amt))
amt]
[peeked-eof?
eof]
[else
(slow-mode!)
(define v (pull-more-bytes (+ (- skip peeked-amt) (fx- end start))))
(if (retry-pull? v)
(try-again)
v)])])))]
;; in atomic mode
[commit
(lambda (amt progress-evt ext-evt finish)
(slow-mode!)
(wait-commit
progress-evt ext-evt
;; in atomic mode, maybe in a different thread:
(lambda ()
(let ([amt (fxmin amt (fx- end-pos pos))])
(cond
[(fx= 0 amt)
(finish #"")]
[else
(define dest-bstr (make-bytes amt))
(bytes-copy! dest-bstr 0 bstr pos (fx+ pos amt))
(set! pos (fx+ pos amt))
(progress!)
(finish dest-bstr)])))))]
;; in atomic mode
[byte-ready
(lambda (work-done!)
(let loop ()
(define b buffer)
(define peeked-amt (fx- end-pos (if (direct-bstr b) (direct-pos b) pos)))
(cond
[(peeked-amt . fx> . 0) #t]
[peeked-eof? #t]
[else
(slow-mode!)
(define v (pull-some-bytes))
(work-done!)
(cond
[(retry-pull? v)
(loop)]
[(evt? v) v]
[else
(not (eqv? v 0))])])))]
;; in atomic mode
[buffer-mode
(case-lambda
[() (default-buffer-mode)]
[(mode) (default-buffer-mode mode)])]
[get-progress-evt
(lambda ()
(atomically
(slow-mode!)
(make-progress-evt)))]
;; in atomic mode
[commit
(lambda (amt progress-evt ext-evt finish)
(slow-mode!)
(wait-commit
progress-evt ext-evt
;; in atomic mode, maybe in a different thread:
(lambda ()
(let ([amt (fxmin amt (fx- end-pos pos))])
(cond
[(fx= 0 amt)
(finish #"")]
[else
(define dest-bstr (make-bytes amt))
(bytes-copy! dest-bstr 0 bstr pos (fx+ pos amt))
(set! pos (fx+ pos amt))
(progress!)
(finish dest-bstr)])))))]
;; in atomic mode
[buffer-mode
(case-lambda
[() (default-buffer-mode)]
[(mode) (default-buffer-mode mode)])]
;; in atomic mode
[close
(lambda ()
(close-peek-buffer))]))
;; in atomic mode
[close
(lambda ()
(close-peek-buffer))])

View File

@ -54,96 +54,94 @@
;; ----------------------------------------
(class pipe-data
(field
[bstr #""]
[len 0]
[limit 0]
[peeked-amt 0] ; peeked but not yet read, effectively extends `limit`
[start 0]
[end 0]
[input-ref #f] ; #f => closed
[output-ref #f] ; #f => closed
[input-buffer #f]
[output-buffer #f]
[read-ready-sema #f]
[write-ready-sema #f]
[more-read-ready-sema #f] ; for lookahead peeks
[read-ready-evt #f]
[write-ready-evt #f])
#:field
[bstr #""]
[len 0]
[limit 0]
[peeked-amt 0] ; peeked but not yet read, effectively extends `limit`
[start 0]
[end 0]
[input-ref #f] ; #f => closed
[output-ref #f] ; #f => closed
[input-buffer #f]
[output-buffer #f]
[read-ready-sema #f]
[write-ready-sema #f]
[more-read-ready-sema #f] ; for lookahead peeks
[read-ready-evt #f]
[write-ready-evt #f]
(private)
;; in atomic mode for all static methods
;; All static methods in atomic mode.
(static
;; sync local fields with input buffer without implying slow mode
[sync-input
(lambda ()
(define b input-buffer)
(when (direct-bstr b)
(define pos (direct-pos b))
(set! start (if (fx= pos len)
0
pos))))]
;; sync local fields with output buffer without implying slow mode
[sync-output
(lambda ()
(define b output-buffer)
(when (direct-bstr b)
(define pos (direct-pos b))
(set! end (if (fx= pos len)
0
pos))))]
#:static
;; sync local fields with input buffer without implying slow mode
[sync-input
(lambda ()
(define b input-buffer)
(when (direct-bstr b)
(define pos (direct-pos b))
(set! start (if (fx= pos len)
0
pos))))]
;; sync local fields with output buffer without implying slow mode
[sync-output
(lambda ()
(define b output-buffer)
(when (direct-bstr b)
(define pos (direct-pos b))
(set! end (if (fx= pos len)
0
pos))))]
[sync-both
(lambda ()
(sync-input)
(sync-output))]
;; assumes sync'ed
[content-length
(lambda ()
(define s start)
(define e end)
(if (s . fx<= . e)
(fx- e s)
(fx+ e (fx- len s))))]
[sync-both
(lambda ()
(sync-input)
(sync-output))]
;; assumes sync'ed
[input-empty?
(lambda ()
(fx= start end))]
;; assumes sync'ed
[content-length
(lambda ()
(define s start)
(define e end)
(if (s . fx<= . e)
(fx- e s)
(fx+ e (fx- len s))))]
;; assumes sync'ed
[output-full?
(lambda ()
(define l limit)
(and l
((content-length) . >= . (+ l peeked-amt))))]
;; assumes sync'ed
[input-empty?
(lambda ()
(fx= start end))]
;; Used before read:
[check-output-unblocking
(lambda ()
(when write-ready-sema
(semaphore-post write-ready-sema)
(set! write-ready-sema #f)))]
;; assumes sync'ed
[output-full?
(lambda ()
(define l limit)
(and l
((content-length) . >= . (+ l peeked-amt))))]
;; Used before write:
[check-input-unblocking
(lambda ()
(when read-ready-sema
(semaphore-post read-ready-sema)
(set! read-ready-sema #f))
(when more-read-ready-sema
(semaphore-post more-read-ready-sema)
(set! more-read-ready-sema #f)))]
;; Used before read:
[check-output-unblocking
(lambda ()
(when write-ready-sema
(semaphore-post write-ready-sema)
(set! write-ready-sema #f)))]
;; Used before write:
[check-input-unblocking
(lambda ()
(when read-ready-sema
(semaphore-post read-ready-sema)
(set! read-ready-sema #f))
(when more-read-ready-sema
(semaphore-post more-read-ready-sema)
(set! more-read-ready-sema #f)))]
;; Used after peeking:
[peeked!
(lambda (amt)
(when (amt . > . peeked-amt)
(check-output-unblocking)
(set! peeked-amt amt)))]))
;; Used after peeking:
[peeked!
(lambda (amt)
(when (amt . > . peeked-amt)
(check-output-unblocking)
(set! peeked-amt amt)))])
(define (make-ref v) (make-weak-box v))
(define (ref-value r) (weak-box-value r))
@ -151,351 +149,352 @@
;; ----------------------------------------
(class pipe-input-port #:extends commit-input-port
(field
[d #f]) ; pipe-data
(private
[fast-mode!
(lambda (amt) ; amt = not yet added to `offset`
(with-object pipe-data d
(define s start)
(define e end)
(unless (fx= s e)
(define b buffer)
(set-direct-bstr! b bstr)
(set-direct-pos! b s)
(set-direct-end! b (if (s . fx< . e) e len))
(define o offset)
(when o
(set! offset (- (+ o amt) s))))))]
[slow-mode!
(lambda ()
(with-object pipe-data d
#:field
[d #f] ; pipe-data
#:private
[fast-mode!
(lambda (amt) ; amt = not yet added to `offset`
(with-object pipe-data d
(define s start)
(define e end)
(unless (fx= s e)
(define b buffer)
(when (direct-bstr b)
(define pos (direct-pos b))
(define o offset)
(when o
(set! offset (+ o pos)))
(set! start (if (fx= pos len) 0 pos))
(set-direct-bstr! b #f)
(set-direct-pos! b (direct-end b)))
(sync-output)))])
(set-direct-bstr! b bstr)
(set-direct-pos! b s)
(set-direct-end! b (if (s . fx< . e) e len))
(define o offset)
(when o
(set! offset (- (+ o amt) s))))))]
(static
[on-resize
(lambda ()
(slow-mode!))]
[on-output-full
(lambda ()
(slow-mode!))])
[slow-mode!
(lambda ()
(with-object pipe-data d
(define b buffer)
(when (direct-bstr b)
(define pos (direct-pos b))
(define o offset)
(when o
(set! offset (+ o pos)))
(set! start (if (fx= pos len) 0 pos))
(set-direct-bstr! b #f)
(set-direct-pos! b (direct-end b)))
(sync-output)))]
(override
[prepare-change
(lambda ()
(with-object pipe-data d
(pause-waiting-commit)))]
#:static
[on-resize
(lambda ()
(slow-mode!))]
[on-output-full
(lambda ()
(slow-mode!))]
[read-in
(lambda (dest-bstr dest-start dest-end copy?)
(assert-atomic)
(slow-mode!)
(with-object pipe-data d
(cond
[(input-empty?)
(if output-ref
read-ready-evt
eof)]
[else
(check-output-unblocking)
(define s start)
(define e end)
(define amt
(cond
[(s . fx< . e)
(define amt (fxmin (fx- dest-end dest-start)
(fx- e s)))
(bytes-copy! dest-bstr dest-start bstr s (fx+ s amt))
(set! start (fx+ s amt))
(set! peeked-amt (fxmax 0 (fx- peeked-amt amt)))
amt]
[else
(define amt (fxmin (fx- dest-end dest-start)
(fx- len s)))
(bytes-copy! dest-bstr dest-start bstr s (fx+ s amt))
(set! start (modulo (fx+ s amt) len))
(set! peeked-amt (fxmax 0 (fx- peeked-amt amt)))
amt]))
(progress!)
(fast-mode! amt)
amt])))]
#:override
[prepare-change
(lambda ()
(with-object pipe-data d
(pause-waiting-commit)))]
[peek-in
(lambda (dest-bstr dest-start dest-end skip progress-evt copy?)
(with-object pipe-data d
(assert-atomic)
(sync-both)
(define content-amt (content-length))
(cond
[(and progress-evt
(sync/timeout 0 progress-evt))
#f]
[(content-amt . <= . skip)
[read-in
(lambda (dest-bstr dest-start dest-end copy?)
(assert-atomic)
(slow-mode!)
(with-object pipe-data d
(cond
[(input-empty?)
(if output-ref
read-ready-evt
eof)]
[else
(check-output-unblocking)
(define s start)
(define e end)
(define amt
(cond
[(not output-ref) eof]
[else
(unless (or (zero? skip) more-read-ready-sema)
(set! more-read-ready-sema (make-semaphore))
(define out (ref-value output-ref))
(when out
(send pipe-output-port out on-need-more-ready)))
(define evt (if (zero? skip)
read-ready-evt
(wrap-evt (semaphore-peek-evt more-read-ready-sema)
(lambda (v) 0))))
evt])]
[else
(define peek-start (fxmodulo (fx+ start skip) len))
(cond
[(peek-start . fx< . end)
[(s . fx< . e)
(define amt (fxmin (fx- dest-end dest-start)
(fx- end peek-start)))
(bytes-copy! dest-bstr dest-start bstr peek-start (fx+ peek-start amt))
(peeked! (+ skip amt))
(fx- e s)))
(bytes-copy! dest-bstr dest-start bstr s (fx+ s amt))
(set! start (fx+ s amt))
(set! peeked-amt (fxmax 0 (fx- peeked-amt amt)))
amt]
[else
(define amt (fxmin (fx- dest-end dest-start)
(fx- len peek-start)))
(bytes-copy! dest-bstr dest-start bstr peek-start (fx+ peek-start amt))
(peeked! (+ skip amt))
amt])])))]
(fx- len s)))
(bytes-copy! dest-bstr dest-start bstr s (fx+ s amt))
(set! start (modulo (fx+ s amt) len))
(set! peeked-amt (fxmax 0 (fx- peeked-amt amt)))
amt]))
(progress!)
(fast-mode! amt)
amt])))]
[byte-ready
(lambda (work-done!)
[peek-in
(lambda (dest-bstr dest-start dest-end skip progress-evt copy?)
(with-object pipe-data d
(assert-atomic)
(with-object pipe-data d
(or (not output-ref)
(begin
(sync-both)
(not (fx= 0 (content-length)))))))]
[close
(lambda ()
(with-object pipe-data d
(when input-ref
(slow-mode!)
(set! input-ref #f)
(progress!))))]
[get-progress-evt
(lambda ()
(atomically
(with-object pipe-data d
(cond
[(not input-ref) always-evt]
[else
(slow-mode!)
(make-progress-evt)]))))]
[commit
;; Allows `amt` to be zero and #f for other arguments,
;; which is helpful for `open-input-peek-via-read`.
(lambda (amt progress-evt ext-evt finish)
(assert-atomic)
;; `progress-evt` is a `semepahore-peek-evt`, and `ext-evt`
;; is constrained; we can send them over to different threads
(sync-both)
(define content-amt (content-length))
(cond
[(zero? amt)
(progress!)]
[(and progress-evt
(sync/timeout 0 progress-evt))
#f]
[(content-amt . <= . skip)
(cond
[(not output-ref) eof]
[else
(unless (or (zero? skip) more-read-ready-sema)
(set! more-read-ready-sema (make-semaphore))
(define out (ref-value output-ref))
(when out
(send pipe-output-port out on-need-more-ready)))
(define evt (if (zero? skip)
read-ready-evt
(wrap-evt (semaphore-peek-evt more-read-ready-sema)
(lambda (v) 0))))
evt])]
[else
(wait-commit
progress-evt ext-evt
;; in atomic mode, maybe in a different thread:
(lambda ()
(with-object pipe-data d
(slow-mode!)
(let ([amt (min amt (content-length))])
(cond
[(fx= 0 amt)
;; There was nothing to commit; claim success for 0 bytes
(finish #"")]
[else
(define dest-bstr (make-bytes amt))
(define s start)
(define e end)
(cond
[(s . fx< . e)
(bytes-copy! dest-bstr 0 bstr s (fx+ s amt))]
[else
(define amt1 (fxmin (fx- len s) amt))
(bytes-copy! dest-bstr 0 bstr s (fx+ s amt1))
(when (amt1 . fx< . amt)
(bytes-copy! dest-bstr amt1 bstr 0 (fx- amt amt1)))])
(set! start (fxmodulo (fx+ s amt) len))
(progress!)
(fast-mode! amt)
(finish dest-bstr)])))))]))]
(define peek-start (fxmodulo (fx+ start skip) len))
(cond
[(peek-start . fx< . end)
(define amt (fxmin (fx- dest-end dest-start)
(fx- end peek-start)))
(bytes-copy! dest-bstr dest-start bstr peek-start (fx+ peek-start amt))
(peeked! (+ skip amt))
amt]
[else
(define amt (fxmin (fx- dest-end dest-start)
(fx- len peek-start)))
(bytes-copy! dest-bstr dest-start bstr peek-start (fx+ peek-start amt))
(peeked! (+ skip amt))
amt])])))]
[count-lines!
(lambda ()
(slow-mode!))]))
[byte-ready
(lambda (work-done!)
(assert-atomic)
(with-object pipe-data d
(or (not output-ref)
(begin
(sync-both)
(not (fx= 0 (content-length)))))))]
[close
(lambda ()
(with-object pipe-data d
(when input-ref
(slow-mode!)
(set! input-ref #f)
(progress!))))]
[get-progress-evt
(lambda ()
(atomically
(with-object pipe-data d
(cond
[(not input-ref) always-evt]
[else
(slow-mode!)
(make-progress-evt)]))))]
[commit
;; Allows `amt` to be zero and #f for other arguments,
;; which is helpful for `open-input-peek-via-read`.
(lambda (amt progress-evt ext-evt finish)
(assert-atomic)
;; `progress-evt` is a `semepahore-peek-evt`, and `ext-evt`
;; is constrained; we can send them over to different threads
(cond
[(zero? amt)
(progress!)]
[else
(wait-commit
progress-evt ext-evt
;; in atomic mode, maybe in a different thread:
(lambda ()
(with-object pipe-data d
(slow-mode!)
(let ([amt (min amt (content-length))])
(cond
[(fx= 0 amt)
;; There was nothing to commit; claim success for 0 bytes
(finish #"")]
[else
(define dest-bstr (make-bytes amt))
(define s start)
(define e end)
(cond
[(s . fx< . e)
(bytes-copy! dest-bstr 0 bstr s (fx+ s amt))]
[else
(define amt1 (fxmin (fx- len s) amt))
(bytes-copy! dest-bstr 0 bstr s (fx+ s amt1))
(when (amt1 . fx< . amt)
(bytes-copy! dest-bstr amt1 bstr 0 (fx- amt amt1)))])
(set! start (fxmodulo (fx+ s amt) len))
(progress!)
(fast-mode! amt)
(finish dest-bstr)])))))]))]
[count-lines!
(lambda ()
(slow-mode!))])
;; ----------------------------------------
(class pipe-output-port #:extends core-output-port
(field
[d d]) ; pipe-data
#:field
[d d] ; pipe-data
(private
[fast-mode!
(lambda (amt) ; amt = not yet added to `offset`
(with-object pipe-data d
(define lim limit)
(define avail (and lim (- lim (content-length))))
(when (or (not avail) (avail . <= . 0))
(define s start)
(define e end)
(define b buffer)
(set-direct-bstr! b bstr)
(set-direct-pos! b e)
(set-direct-end! b (let ([end (if (s . fx<= . e)
(if (fx= s 0)
(fx- len 1)
len)
(fx- s 1))])
(if (and avail
((fx- end e) . > . avail))
(fx+ e avail)
end)))
(define o offset)
(when o
(set! offset (- (+ o amt) e))))))]
[slow-mode!
(lambda ()
(with-object pipe-data d
#:private
[fast-mode!
(lambda (amt) ; amt = not yet added to `offset`
(with-object pipe-data d
(define lim limit)
(define avail (and lim (- lim (content-length))))
(when (or (not avail) (avail . <= . 0))
(define s start)
(define e end)
(define b buffer)
(when (direct-bstr b)
(define pos (direct-pos b))
(define o offset)
(when o
(set! offset (+ o pos)))
(set! end (if (fx= pos len) 0 pos))
(set-direct-bstr! b #f)
(set-direct-pos! b (direct-end b)))
(sync-input)))])
(set-direct-bstr! b bstr)
(set-direct-pos! b e)
(set-direct-end! b (let ([end (if (s . fx<= . e)
(if (fx= s 0)
(fx- len 1)
len)
(fx- s 1))])
(if (and avail
((fx- end e) . > . avail))
(fx+ e avail)
end)))
(define o offset)
(when o
(set! offset (- (+ o amt) e))))))]
(static
[on-input-empty
(lambda ()
(slow-mode!))]
[on-need-more-ready
(lambda ()
(slow-mode!))])
[slow-mode!
(lambda ()
(with-object pipe-data d
(define b buffer)
(when (direct-bstr b)
(define pos (direct-pos b))
(define o offset)
(when o
(set! offset (+ o pos)))
(set! end (if (fx= pos len) 0 pos))
(set-direct-bstr! b #f)
(set-direct-pos! b (direct-end b)))
(sync-input)))]
(override
[write-out
;; in atomic mode
(lambda (src-bstr src-start src-end nonblock? enable-break? copy?)
(assert-atomic)
(slow-mode!)
(with-object pipe-data d
(let try-again ()
(define top-pos (if (fx= start 0)
(fx- len 1)
len))
(define (maybe-grow)
(cond
[(or (not limit)
((+ limit peeked-amt) . > . (fx- len 1)))
;; grow pipe size
(define in (ref-value input-ref))
(when in
(send pipe-input-port in on-resize))
(define new-bstr (make-bytes (min+1 (and limit (+ limit peeked-amt)) (* len 2))))
(cond
[(fx= 0 start)
(bytes-copy! new-bstr 0 bstr 0 (fx- len 1))]
[else
(bytes-copy! new-bstr 0 bstr start len)
(bytes-copy! new-bstr (fx- len start) bstr 0 end)
(set! start 0)
(set! end (fx- len 1))])
(set! bstr new-bstr)
(set! len (bytes-length new-bstr))
(try-again)]
[else (pipe-is-full)]))
(define (pipe-is-full)
(wrap-evt write-ready-evt (lambda (v) #f)))
(define (apply-limit amt)
(if limit
(min amt (- (+ limit peeked-amt) (content-length)))
amt))
#:static
[on-input-empty
(lambda ()
(slow-mode!))]
[on-need-more-ready
(lambda ()
(slow-mode!))]
#:override
[write-out
;; in atomic mode
(lambda (src-bstr src-start src-end nonblock? enable-break? copy?)
(assert-atomic)
(slow-mode!)
(with-object pipe-data d
(let try-again ()
(define top-pos (if (fx= start 0)
(fx- len 1)
len))
(define (maybe-grow)
(cond
[(fx= src-start src-end) ;; => flush
0]
[(and (end . fx>= . start)
(end . fx< . top-pos))
(define amt (apply-limit (fxmin (fx- top-pos end)
(fx- src-end src-start))))
[(or (not limit)
((+ limit peeked-amt) . > . (fx- len 1)))
;; grow pipe size
(define in (ref-value input-ref))
(when in
(send pipe-input-port in on-resize))
(define new-bstr (make-bytes (min+1 (and limit (+ limit peeked-amt)) (* len 2))))
(cond
[(fx= amt 0) (pipe-is-full)]
[(fx= 0 start)
(bytes-copy! new-bstr 0 bstr 0 (fx- len 1))]
[else
(check-input-unblocking)
(bytes-copy! bstr end src-bstr src-start (fx+ src-start amt))
(let ([new-end (fx+ end amt)])
(set! end (if (fx= new-end len) 0 new-end)))
(fast-mode! amt)
amt])]
[(fx= end top-pos)
(cond
[(fx= start 0)
(maybe-grow)]
[else
(define amt (fxmin (fx- start 1)
(fx- src-end src-start)))
(cond
[(fx= amt 0) (pipe-is-full)]
[else
(check-input-unblocking)
(bytes-copy! bstr 0 src-bstr src-start (fx+ src-start amt))
(set! end amt)
(fast-mode! amt)
amt])])]
[(end . fx< . (fx- start 1))
(define amt (apply-limit (fxmin (fx- (fx- start 1) end)
(fx- src-end src-start))))
(cond
[(fx= amt 0) (pipe-is-full)]
[else
(check-input-unblocking)
(bytes-copy! bstr end src-bstr src-start (fx+ src-start amt))
(set! end (fx+ end amt))
(fast-mode! amt)
amt])]
[else
(maybe-grow)]))))]
(bytes-copy! new-bstr 0 bstr start len)
(bytes-copy! new-bstr (fx- len start) bstr 0 end)
(set! start 0)
(set! end (fx- len 1))])
(set! bstr new-bstr)
(set! len (bytes-length new-bstr))
(try-again)]
[else (pipe-is-full)]))
(define (pipe-is-full)
(wrap-evt write-ready-evt (lambda (v) #f)))
(define (apply-limit amt)
(if limit
(min amt (- (+ limit peeked-amt) (content-length)))
amt))
(cond
[(fx= src-start src-end) ;; => flush
0]
[(and (end . fx>= . start)
(end . fx< . top-pos))
(define amt (apply-limit (fxmin (fx- top-pos end)
(fx- src-end src-start))))
(cond
[(fx= amt 0) (pipe-is-full)]
[else
(check-input-unblocking)
(bytes-copy! bstr end src-bstr src-start (fx+ src-start amt))
(let ([new-end (fx+ end amt)])
(set! end (if (fx= new-end len) 0 new-end)))
(fast-mode! amt)
amt])]
[(fx= end top-pos)
(cond
[(fx= start 0)
(maybe-grow)]
[else
(define amt (fxmin (fx- start 1)
(fx- src-end src-start)))
(cond
[(fx= amt 0) (pipe-is-full)]
[else
(check-input-unblocking)
(bytes-copy! bstr 0 src-bstr src-start (fx+ src-start amt))
(set! end amt)
(fast-mode! amt)
amt])])]
[(end . fx< . (fx- start 1))
(define amt (apply-limit (fxmin (fx- (fx- start 1) end)
(fx- src-end src-start))))
(cond
[(fx= amt 0) (pipe-is-full)]
[else
(check-input-unblocking)
(bytes-copy! bstr end src-bstr src-start (fx+ src-start amt))
(set! end (fx+ end amt))
(fast-mode! amt)
amt])]
[else
(maybe-grow)]))))]
[get-write-evt
(get-write-evt-via-write-out (lambda (out v bstr start)
(port-count! out v bstr start)))]
[get-write-evt
(get-write-evt-via-write-out (lambda (out v bstr start)
(port-count! out v bstr start)))]
[close
;; in atomic mode
(lambda ()
(with-object pipe-data d
(when output-ref
(slow-mode!)
(set! output-ref #f)
(check-input-unblocking))))]))
[close
;; in atomic mode
(lambda ()
(with-object pipe-data d
(when output-ref
(slow-mode!)
(set! output-ref #f)
(check-input-unblocking))))])
;; ----------------------------------------
(define (make-pipe-ends [limit #f] [input-name 'pipe] [output-name 'pipe])
(define len (min+1 limit 16))
(define d (new pipe-data
#:field
[bstr (make-bytes len)]
[len len]
[limit limit]))
@ -506,9 +505,11 @@
(define read-ready-evt (pipe-read-poller d))
(define input (new pipe-input-port
#:field
[name input-name]
[d d]))
(define output (new pipe-output-port
#:field
[name output-name]
[evt write-ready-evt]
[d d]))
@ -525,10 +526,8 @@
(define/who (make-pipe [limit #f] [input-name 'pipe] [output-name 'pipe])
(check who #:or-false exact-positive-integer? limit)
(define-values (ip op) (make-pipe-ends limit input-name output-name))
(when (port-count-lines-enabled)
(port-count-lines! ip)
(port-count-lines! op))
(values ip op))
(values (finish-port/count ip)
(finish-port/count op)))
;; ----------------------------------------

View File

@ -9,36 +9,60 @@
(struct-out location)
get-core-port-offset)
;; Port class hierarchy
;; - with "virtual" in square brackets
;; - with per-instance in curly braces
;;
;; [core]
;; |
;; ,------------------------------,
;; [input] [output]
;; | |
;; ,---------, ,---------------------------------,
;; | | | | | | |
;; [commit] {custom} {custom} pipe bytes fd max nowhere
;; | (when peek |
;; | provided) tcp
;; -------------------------,
;; ' | |
;; bytes [peek-via-read] pipe
;; |
;; ,-------,
;; | |
;; fd {custom}
;; | (when no peek provided)
;; tcp
(class core-port
(field
[name 'port #:immutable] ; anything, reported as `object-name` for the port
#:field
[name 'port #:immutable] ; anything, reported as `object-name` for the port
[data #f] ; FIXME: remove after all uses are converted
;; When `(direct-bstr buffer)` is not #f, it enables a shortcut for
;; reading and writing, where `(direct-pos buffer)` must also be
;; less than `(direct-end buffer)` for the shortcut to apply. The
;; shortcut is not necessarily always taken, just if it is used,
;; the `(direct-pos buffer)` position can be adjusted and the
;; port's methods must adapt accordingly. The `(direct-bstr
;; buffer)` and `(direct-end buffer)` fields are modified only by
;; the port's methods, however.
;;
;; For an input port, shortcut mode implies that `prepare-change`
;; does not need to be called, and no checking is needed for whether
;; the port is closed.
;;
;; A non-#f `(direct-bstr buffer)` further implies that
;; `(direct-pos buffer)` should be added to `offset` to get the
;; true offset.
[buffer (direct #f 0 0)]
[closed? #f]
[closed-sema #f]
[offset 0] ; count plain bytes; add `(- buffer-pos buffer-start)`
[count #f]) ; #f or a `location`
;; When `(direct-bstr buffer)` is not #f, it enables a shortcut for
;; reading and writing, where `(direct-pos buffer)` must also be
;; less than `(direct-end buffer)` for the shortcut to apply. The
;; shortcut is not necessarily always taken, but if it is used, the
;; `(direct-pos buffer)` position can be adjusted and the port's
;; methods must adapt accordingly. The `(direct-bstr buffer)` and
;; `(direct-end buffer)` fields are modified only by the port's
;; methods, however.
;;
;; Shortcut mode implies that the port is still open, so no checking
;; is needed for whether the port is closed.
;;
;; For an input port, shortcut mode implies that `prepare-change`
;; does not need to be called.
;;
;; A non-#f `(direct-bstr buffer)` further implies that
;; `(direct-pos buffer)` should be added to `offset` to get the
;; true offset.
[buffer (direct #f 0 0)]
[closed? #f]
[closed-sema #f] ; created on demand
[offset 0] ; count plain bytes; add `(- buffer-pos buffer-start)`
[count #f] ; #f or a `location`
;; Various methods below are called in atomic mode. The intent of
;; atomic mode is to ensure that the completion and return of the
;; function is atomic with respect to some further activity, such
@ -48,39 +72,39 @@
;; the burden of re-checking for a closed port. Leave atomic mode
;; explicitly before raising an exception.
(public
;; -*> (void)
;; Called in atomic mode.
;; Reqeusts a close, and the port is closed if/when
;; the method returns.
[close (lambda () (void))]
#:public
;; -*> (void)
;; Called in atomic mode.
;; Reqeusts a close, and the port is closed if/when
;; the method returns.
[close (lambda () (void))]
;; #f or (-*> (void))
;; Called in atomic mode.
;; Notifies the port that line counting is enabled, and
;; `get-location` can be called afterward (if it is defined)
[count-lines! #f]
;; #f or (-*> (void))
;; Called in atomic mode.
;; Notifies the port that line counting is enabled, and
;; `get-location` can be called afterward (if it is defined)
[count-lines! #f]
;; #f or (-*> (values line-or-#f column-or-#f position-or-#f))
;; Called in atomic mode.
;; Returns the location of the next character. If #f, this method
;; is implemented externally.
[get-location #f] ; #f or method called in atomic mode
;; #f or (-*> (values line-or-#f column-or-#f position-or-#f))
;; Called in atomic mode.
;; Returns the location of the next character. If #f, this method
;; is implemented externally.
[get-location #f] ; #f or method called in atomic mode
;; #f or (U (-*> position-k) (position-k -*> (void))
;; Called in atomic mode.
;; If not #f, the port implements `file-position`.
[file-position #f]
;; #f or (U (-*> position-k) (position-k -*> (void))
;; Called in atomic mode.
;; If not #f, the port implements `file-position`.
[file-position #f]
;; #f or (U (-*> mode-sym) (mode-sym -*> (void))
;; Called in atomic mode.
;; If not #f, the port implements buffer-mode selection.
[buffer-mode #f])
;; #f or (U (-*> mode-sym) (mode-sym -*> (void))
;; Called in atomic mode.
;; If not #f, the port implements buffer-mode selection.
[buffer-mode #f]
(property
[prop:unsafe-authentic-override #t] ; allow evt chaperone
[prop:object-name (struct-field-index name)]
[prop:secondary-evt port->evt]))
#:property
[prop:unsafe-authentic-override #t] ; allow evt chaperone
[prop:object-name (struct-field-index name)]
[prop:secondary-evt port->evt])
(struct direct ([bstr #:mutable]
[pos #:mutable]