io: add peek-via-read layer as prep for fd and custom ports

This commit is contained in:
Matthew Flatt 2019-02-12 07:47:36 -07:00
parent 35ceb8e3b3
commit 95083d6add
5 changed files with 361 additions and 120 deletions

View File

@ -6,7 +6,10 @@
;; A class system that is somewhat similar to `racket/class`, but
;; completely first order, with its structure nature exposed, and
;; where the notion of "method" is flexible to allow non-procedures in
;; the vtable.
;; the vtable. The run-time componention of a full expansion is
;; efficient, but beware that there are various quadratic factors in
;; intermediate expansions and compile-time data. There should be more
;; checks to make sure that method declarations are distinct, etc.
;;
;; <class-defn> = (class <class-id> <clause> ...)
;; | (class <class-id> #:extends <class-id> <clause> ...)
@ -17,15 +20,24 @@
;; | (static [<method-id> <method>] ...) ; not in vtable
;; | (property [<property-expr> <val-expr>] ...)
;; <method> = #f
;; | (lambda (<id> ...) <expr> ...+)
;; | (case-lambda [(<id> ...) <expr> ...+] ...)
;; | (lambda <formals> <expr> ...+)
;; | (case-lambda [<formals> <expr> ...+] ...)
;; | <expr> ; must have explicit `self`, etc.
;;
;; A <class-id> and its <field>s behave as if they are in
;; a `struct` declaration where `create-<class-id>` is the
;; constructor, but an extra `vtable` field is added to
;; the start of a class's structure if it has no superclass.
;; The `#:authentic` option is added implicitly.
;; A <class-id> and its <field>s behave as if they are in a `struct`
;; declaration where `create-<class-id>` is the constructor, but an
;; extra `vtable` field is added to the start of a class's structure
;; if it has no superclass. The `#:authentic` option is added
;; implicitly. The `property` clause supplies additional structure
;; type properties.
;;
;; A `public` method is one that can be overridden with `override` or
;; called via `send`. A `private` or `static` method cannot be
;; overridden, and a `private` method cannot be called via `send`. Bot
;; `private` and `static` methods can be called directly like
;; functions within another method (but `public` methods cannot be
;; called that way, and that restriction is intended to discourange
;; unnecessary indirections through methods that can be overridden).
;;
;; Normally, use
;; (new <class-id> [<field-id> <expr] ...)
@ -138,9 +150,13 @@
[((local-id local-expr) ...) locals]
[((static-id static-expr) ...) statics]
[(local-tmp-id ...) (generate-temporaries locals)]
[(static-tmp-id ...) (generate-temporaries statics)])
[(static-tmp-id ...) (generate-temporaries statics)]
[((parent-static-id parent-static-tmp-id) ...) (if super-ci
(class-info-statics super-ci)
null)])
(with-syntax ([local-bindings #'[([field-id field-accessor-id field-mutator-maybe-id] ...)
([local-id local-tmp-id] ... [static-id static-tmp-id] ...)]])
([local-id local-tmp-id] ... [static-id static-tmp-id] ...
[parent-static-id parent-static-tmp-id] ...)]])
(define wrapped-new-methods
(for/list ([new-method (in-list new-methods)])
(syntax-case new-method ()
@ -234,6 +250,8 @@
(quote-syntax method-accessor-id))
...)
(list (list (quote-syntax static-id) (quote-syntax static-tmp-id))
...
(list (quote-syntax parent-static-id) (quote-syntax parent-static-tmp-id))
...))))))))
(define-syntax (bind-locals-in-body stx)
@ -243,23 +261,34 @@
(with-syntax ([(_ _ orig) stx])
#'(bind-locals-in-body locals form orig))]
[(_ locals expr) #'expr]
[(_ locals ctx (lambda (arg ...) body0 body ...))
#'(bind-locals-in-body locals ctx (case-lambda [(arg ...) body0 body ...]))]
[(_ locals ctx (lambda args body0 body ...))
#'(bind-locals-in-body locals ctx (case-lambda [args body0 body ...]))]
[(_ locals ctx (case-lambda clause ...))
(with-syntax ([(new-clause ...)
(for/list ([clause (in-list (syntax->list #'(clause ...)))])
(syntax-case clause ()
[[(arg ...) body0 body ...]
(with-syntax ([(arg-tmp ...) (generate-temporaries #'(arg ...))])
#'[(this-id arg-tmp ...)
(syntax-parameterize ([this (make-rename-transformer #'this-id)])
(bind-locals
locals
this-id ctx
(let-syntax ([arg (make-rename-transformer #'arg-tmp)] ...)
body0 body ...)))])]))])
(syntax/loc (syntax-case stx () [(_ _ _ rhs) #'rhs])
(case-lambda new-clause ...)))]
(let ([new-clauses
(for/list ([clause (in-list (syntax->list #'(clause ...)))])
(syntax-case clause ()
[[args body0 body ...]
(with-syntax ([(arg-id ...) (extract-arg-ids #'args)])
(with-syntax ([(arg-tmp ...) (generate-temporaries #'(arg-id ...))])
(with-syntax ([tmp-args (substitute-arg-ids #'args (syntax->list #'(arg-tmp ...))
#'this-id #'locals #'ctx)])
#'[(this-id . tmp-args)
(syntax-parameterize ([this (make-rename-transformer #'this-id)])
(bind-locals
locals
this-id ctx
(let-syntax ([arg-id (make-rename-transformer #'arg-tmp)] ...)
body0 body ...)))])))]))])
(define rhs (syntax-case stx () [(_ _ _ rhs) #'rhs]))
(cond
[(= 1 (length new-clauses))
(with-syntax ([new-clause (car new-clauses)])
(syntax/loc rhs
(lambda . new-clause)))]
[else
(with-syntax ([(new-clause ...) new-clauses])
(syntax/loc rhs
(case-lambda new-clause ...)))]))]
[(_ locals _ expr)
#'expr]))
@ -410,6 +439,55 @@
[(_ #f) #f]
[(_ e) (quote-syntax e)])))
(define-for-syntax (extract-arg-ids args)
(let loop ([args args])
(syntax-case args ()
[() null]
[id
(identifier? #'id)
(list #'id)]
[(id . rest)
(identifier? #'id)
(cons #'id (loop #'rest))]
[(kw . rest)
(keyword? (syntax-e #'kw))
(loop #'rest)]
[([id val-expr] . rest)
(cons #'id (loop #'rest))])))
(define-for-syntax (substitute-arg-ids args tmp-ids this-id locals ctx)
(let loop ([args args] [tmp-ids tmp-ids] [done-ids '()] [done-tmp-ids '()])
(syntax-case args ()
[() null]
[id
(identifier? #'id)
(car tmp-ids)]
[(id . rest)
(identifier? #'id)
(cons (car tmp-ids) (loop #'rest (cdr tmp-ids)
(cons #'id done-ids)
(cons (car tmp-ids) done-tmp-ids)))]
[(kw . rest)
(keyword? (syntax-e #'kw))
(cons #'kw (loop #'rest tmp-ids done-ids done-tmp-ids))]
[([id val-expr] . rest)
(let ([val-expr
(with-syntax ([this-id this-id]
[locals locals]
[ctx ctx]
[(done-id ...) done-ids]
[(done-tmp-id ...) done-tmp-ids])
#'(syntax-parameterize ([this (make-rename-transformer #'this-id)])
(bind-locals
locals
this-id ctx
(let-syntax ([done-id (make-rename-transformer #'done-tmp-id)] ...)
val-expr))))])
(cons (list (car tmp-ids) val-expr)
(loop #'rest (cdr tmp-ids)
(cons #'id done-ids)
(cons (car tmp-ids) done-tmp-ids))))])))
;; ----------------------------------------
(module+ test
@ -420,10 +498,11 @@
(private
[other (lambda (q) (list q this))])
(static
[enbox (lambda (v) (box (vector a v)))])
[enbox (lambda (v #:opt [opt (vector v a)])
(box (vector a v opt)))])
(public
[q #f]
[m (lambda (z) (list a (other b)))]
[m (lambda (z #:maybe [maybe 9]) (list a (other b) maybe))]
[n (lambda (x y z) (vector a b (enbox x) y z))]))
(class sub #:extends example
@ -439,12 +518,13 @@
(define ex (new example [b 5]))
(send example ex m 'ok)
(send example ex m 'ok #:maybe 'yep)
(method example ex m)
(new sub [d 5])
(send example (new sub) m 'more)
(set-example-b! ex 6)
(send example ex enbox 88)
(send example ex enbox 88 #:opt 'given)
(define ex2 (new example
#:override

View File

@ -9,7 +9,7 @@
"output-port.rkt"
"bytes-input.rkt"
"count.rkt"
"commit-manager.rkt")
"commit-port.rkt")
(provide open-input-bytes
open-output-bytes
@ -23,47 +23,13 @@
(port-count-lines! p))
p)
(class bytes-input-port #:extends core-input-port
(class bytes-input-port #:extends commit-input-port
(field
[progress-sema #f]
[commit-manager #f]
[bstr #f] ; normally installed as buffer
[pos 0] ; used when bstr is not installed as buffer
[alt-pos #f])
(private
;; in atomic mode
[progress!
(lambda ()
(when progress-sema
(semaphore-post progress-sema)
(set! progress-sema #f)))]
;; in atomic mode [can leave atomic mode temporarily]
;; After this function returns, complete any commit-changing work
;; before leaving atomic mode again.
[pause-waiting-commit
(lambda ()
(when commit-manager
(commit-manager-pause commit-manager)))]
;; in atomic mode [can leave atomic mode temporarily]
[wait-commit
(lambda (progress-evt ext-evt finish)
(cond
[(and (not commit-manager)
;; Try shortcut:
(not (sync/timeout 0 progress-evt))
(sync/timeout 0 ext-evt))
(finish)
#t]
[else
;; General case to support blocking and potentially multiple
;; commiting threads:
(unless commit-manager
(set! commit-manager (make-commit-manager)))
(commit-manager-wait commit-manager progress-evt ext-evt finish)]))]
;; in atomic mode
[in-buffer-pos
(lambda ()
@ -140,19 +106,16 @@
[get-progress-evt
(lambda ()
(define new-sema
(or progress-sema
(let ([sema (make-semaphore)])
(set! progress-sema sema)
;; set port to slow mode:
(when buffer
(define i buffer-pos)
(set! pos i)
(set! offset i)
(set! buffer #f)
(set! buffer-pos buffer-end))
sema)))
(semaphore-peek-evt new-sema))]
(atomically
(unless progress-sema
;; set port to slow mode:
(when buffer
(define i buffer-pos)
(set! pos i)
(set! offset i)
(set! buffer #f)
(set! buffer-pos buffer-end)))
(make-progress-evt)))]
[commit
(lambda (amt progress-evt ext-evt finish)

View File

@ -0,0 +1,53 @@
#lang racket/base
(require racket/fixnum
"../common/class.rkt"
"port.rkt"
"input-port.rkt"
"commit-manager.rkt")
(provide commit-input-port)
(class commit-input-port #:extends core-input-port
(field
[progress-sema #f]
[commit-manager #f])
(static
;; in atomic mode
[progress!
(lambda ()
(when progress-sema
(semaphore-post progress-sema)
(set! progress-sema #f)))]
;; in atomic mode [can leave atomic mode temporarily]
;; After this function returns, complete any commit-changing work
;; before leaving atomic mode again.
[pause-waiting-commit
(lambda ()
(when commit-manager
(commit-manager-pause commit-manager)))]
;; in atomic mode [can leave atomic mode temporarily]
[wait-commit
(lambda (progress-evt ext-evt finish)
(cond
[(and (not commit-manager)
;; Try shortcut:
(not (sync/timeout 0 progress-evt))
(sync/timeout 0 ext-evt))
(finish)
#t]
[else
;; General case to support blocking and potentially multiple
;; commiting threads:
(unless commit-manager
(set! commit-manager (make-commit-manager)))
(commit-manager-wait commit-manager progress-evt ext-evt finish)]))]
;; in atomic mode
[make-progress-evt
(lambda ()
(unless progress-sema
(set! progress-sema (make-semaphore)))
(semaphore-peek-evt progress-sema))]))

View File

@ -1,12 +1,192 @@
#lang racket/base
(require "../common/class.rkt"
(require racket/fixnum
"../common/class.rkt"
"../host/thread.rkt"
"port.rkt"
"input-port.rkt"
"output-port.rkt"
"pipe.rkt")
"pipe.rkt"
"commit-port.rkt")
(provide open-input-peek-via-read)
(provide peek-via-read-input-port
open-input-peek-via-read)
(class peek-via-read-input-port #:extends commit-input-port
(field
[bstr #""]
[pos 0]
[end-pos 0]
[peeked-eof? #f]
[buffer-mode 'block])
(override
[prepare-change
(lambda ()
(when commit-manager
(commit-manager-pause commit-manager)))])
(public
;; in atomic mode; must override
[read-in/inner
(lambda (dest-bstr start end copy?)
0)])
(static
;; in atomic mode
[purge-buffer
(lambda ()
(set! pos 0)
(set! end-pos 0)
(set! peeked-eof? #f))])
(private
;; in atomic mode
[pull-some-bytes
(lambda ([amt (if (eq? 'block buffer-mode) (bytes-length bstr) 1)] [offset 0] [init-pos 0])
(define get-end (min (+ amt offset) (bytes-length bstr)))
(define v (send peek-via-read-input-port this read-in/inner bstr offset get-end #f))
(cond
[(eof-object? v)
(set! peeked-eof? #t)
eof]
[(evt? v) v]
[(eqv? v 0) 0]
[else
(set! pos init-pos)
(set! end-pos (fx+ offset v))
v]))]
;; in atomic mode
[pull-more-bytes
(lambda (amt)
(cond
[(end-pos . fx< . (bytes-length bstr))
;; add to end of buffer
(pull-some-bytes amt end-pos pos)]
[(fx= pos 0)
;; extend buffer
(define new-bstr (make-bytes (fx* 2 (bytes-length bstr))))
(bytes-copy! new-bstr 0 bstr 0 end-pos)
(set! bstr new-bstr)
(pull-some-bytes amt end-pos)]
[else
;; shift to start of buffer and retry
(bytes-copy! bstr 0 bstr pos end-pos)
(set! end-pos (fx- end-pos pos))
(set! pos 0)
(pull-more-bytes)]))]
;; in atomic mode
[retry-pull?
(lambda (v)
(and (integer? v) (not (eqv? v 0))))])
(override
;; in atomic mode
[read-in
(lambda (dest-bstr start end copy?)
(let try-again ()
(cond
[(pos . fx< . end-pos)
(define amt (min (fx- end-pos pos) (fx- end start)))
(bytes-copy! dest-bstr start bstr pos (fx+ pos amt))
amt]
[peeked-eof?
(set! peeked-eof? #f)
;; an EOF doesn't count as progress
eof]
[else
(cond
[(and (fx< (fx- end start) (bytes-length bstr))
(eq? 'block buffer-mode))
(define v (pull-some-bytes))
(cond
[(or (eqv? v 0) (evt? v)) v]
[else (try-again)])]
[else
(define v (send peek-via-read-input-port this read-in/inner dest-bstr start end copy?))
(unless (eqv? v 0)
(progress!))
v])])))]
;; in atomic mode
[peek-in
(lambda (dest-bstr start end skip progress-evt copy?)
(let try-again ()
(cond
[(and progress-evt
(sync/timeout 0 progress-evt))
#f]
[else
(define peeked-amt (fx- end-pos pos))
(cond
[(peeked-amt . > . skip)
(define amt (min (fx- peeked-amt skip) (fx- end start)))
(define s-pos (fx+ pos skip))
(bytes-copy! dest-bstr start bstr s-pos (fx+ s-pos amt))
amt]
[peeked-eof?
eof]
[else
(define v (pull-more-bytes (- skip peeked-amt)))
(if (retry-pull? v)
(try-again)
v)])])))]
;; in atomic mode
[byte-ready
(lambda (work-done!)
(let loop ()
(define peeked-amt (fx- end-pos pos))
(cond
[(peeked-amt . fx> . 0) #t]
[peeked-eof? #t]
[else
(define v (pull-some-bytes))
(work-done!)
(cond
[(retry-pull? v)
(loop)]
[(evt? v) v]
[else
(not (eqv? v 0))])])))]
[get-progress-evt
(lambda ()
(atomically
(make-progress-evt)))]
;; in atomic mode
[commit
(lambda (amt progress-evt ext-evt finish)
(wait-commit
progress-evt ext-evt
;; in atomic mode, maybe in a different thread:
(lambda ()
(let ([amt (fxmin amt (fx- end-pos pos))])
(cond
[(fx= 0 amt)
(finish #"")]
[else
(define dest-bstr (make-bytes amt))
(bytes-copy! dest-bstr 0 bstr pos (fx+ pos amt))
(set! pos (fx+ pos amt))
(progress!)
(finish dest-bstr)])))))]
;; in atomic mode
[buffer-mode
(case-lambda
[(self) buffer-mode]
[(self mode) (set! buffer-mode mode)])]
;; in atomic mode
[close
(lambda ()
(purge-buffer)
(set! bstr #""))]))
;; ----------------------------------------
(define (open-input-peek-via-read #:name name
#:self next-self

View File

@ -7,7 +7,7 @@
"input-port.rkt"
"output-port.rkt"
"count.rkt"
"commit-manager.rkt")
"commit-port.rkt")
(provide make-pipe
make-pipe-ends
@ -121,44 +121,11 @@
;; ----------------------------------------
(class pipe-input-port #:extends core-input-port
(class pipe-input-port #:extends commit-input-port
(field
[d #f] ; pipe-data
[progress-sema #f]
[commit-manager #f])
[d #f]) ; pipe-data
(private
[progress!
(lambda ()
(when progress-sema
(semaphore-post progress-sema)
(set! progress-sema #f)))]
;; [can leave atomic mode temporarily]
;; After this function returns, complete any commit-changing work
;; before leaving atomic mode again.
[pause-waiting-commit
(lambda ()
(when commit-manager
(commit-manager-pause commit-manager)))]
;; [can leave atomic mode temporarily]
[wait-commit
(lambda (progress-evt ext-evt finish)
(cond
[(and (not commit-manager)
;; Try shortcut:
(not (sync/timeout 0 progress-evt))
(sync/timeout 0 ext-evt))
(finish)
#t]
[else
;; General case to support blocking and potentially multiple
;; commiting threads:
(unless commit-manager
(set! commit-manager (make-commit-manager)))
(commit-manager-wait commit-manager progress-evt ext-evt finish)]))]
[fast-mode!
(lambda (amt)
(unless (or count buffer)
@ -306,9 +273,7 @@
[(not input) always-evt]
[else
(slow-mode!)
(unless progress-sema
(set! progress-sema (make-semaphore)))
(semaphore-peek-evt progress-sema)]))))]
(make-progress-evt)]))))]
[commit
;; Allows `amt` to be zero and #f for other arguments,