io: convert fd-input-port to object style

This commit is contained in:
Matthew Flatt 2019-02-12 09:37:14 -07:00
parent 95083d6add
commit e3b00715bc
4 changed files with 207 additions and 147 deletions

View File

@ -764,6 +764,7 @@
;; ---------------------------------------- ;; ----------------------------------------
'read-string
(time (time
(let loop ([j 10]) (let loop ([j 10])
(unless (zero? j) (unless (zero? j)

View File

@ -1,5 +1,6 @@
#lang racket/base #lang racket/base
(require "../host/rktio.rkt" (require "../common/class.rkt"
"../host/rktio.rkt"
"../host/error.rkt" "../host/error.rkt"
"../host/thread.rkt" "../host/thread.rkt"
"../host/pthread.rkt" "../host/pthread.rkt"
@ -56,6 +57,43 @@
;; ---------------------------------------- ;; ----------------------------------------
(class fd-input-port #:extends peek-via-read-input-port
(field
[fd #f]
[fd-refcount #f]
[custodian-reference #f]
[on-close on-close]
[network-error? #f])
(override
[read-in/inner
(lambda (dest-bstr start end copy?)
(define n (rktio_read_in rktio fd dest-bstr start end))
(cond
[(rktio-error? n)
(end-atomic)
(if network-error?
(raise-network-error #f n "error reading from stream port")
(raise-filesystem-error #f n "error reading from stream port"))]
[(eqv? n RKTIO_READ_EOF) eof]
[(eqv? n 0) (wrap-evt (fd-evt fd RKTIO_POLL_READ this)
(lambda (v) 0))]
[else n]))]
[close
(lambda ()
(on-close)
(fd-close fd fd-refcount)
(unsafe-custodian-unregister fd custodian-reference)
(close-peek-buffer))]
[file-position
(case-lambda
[() (do-file-position fd (lambda (pos) (buffer-adjust-pos pos)))]
[(pos) (do-file-position fd (lambda () (purge-buffer)) pos)])]))
;; ----------------------------------------
;; in atomic mode ;; in atomic mode
;; Current custodian must not be shut down. ;; Current custodian must not be shut down.
(define (open-input-fd fd name (define (open-input-fd fd name
@ -65,40 +103,15 @@
#:custodian [cust (current-custodian)] #:custodian [cust (current-custodian)]
#:file-stream? [file-stream? #t] #:file-stream? [file-stream? #t]
#:network-error? [network-error? #f]) #:network-error? [network-error? #f])
(define-values (port buffer-control) (define p (new fd-input-port
(open-input-peek-via-read [name name]
#:name name [data (fd-data fd extra-data #t file-stream?)]
#:data (fd-data fd extra-data #t file-stream?) [fd fd]
#:self #f [fd-refcount fd-refcount]
#:read-in [on-close on-close]
;; in atomic mode [network-error? network-error?]))
(lambda (self dest-bstr start end copy?) (set-fd-input-port-custodian-reference! p (register-fd-close cust fd fd-refcount #f p))
(define n (rktio_read_in rktio fd dest-bstr start end)) p)
(cond
[(rktio-error? n)
(end-atomic)
(if network-error?
(raise-network-error #f n "error reading from stream port")
(raise-filesystem-error #f n "error reading from stream port"))]
[(eqv? n RKTIO_READ_EOF) eof]
[(eqv? n 0) (wrap-evt (fd-evt fd RKTIO_POLL_READ port)
(lambda (v) 0))]
[else n]))
#:read-is-atomic? #t
#:close
;; in atomic mode
(lambda (self)
(on-close)
(fd-close fd fd-refcount)
(unsafe-custodian-unregister fd custodian-reference))
#:file-position (make-file-position
fd
(case-lambda
[() (buffer-control)]
[(pos) (buffer-control pos)]))))
(define custodian-reference
(register-fd-close cust fd fd-refcount #f port))
port)
;; ---------------------------------------- ;; ----------------------------------------
@ -281,10 +294,10 @@
;; ---------------------------------------- ;; ----------------------------------------
(define (make-file-position fd buffer-control) ;; in atomic mode
;; in atomic mode (define do-file-position
(case-lambda (case-lambda
[(self) [(fd buffer-control)
(define ppos (rktio_get_file_position rktio fd)) (define ppos (rktio_get_file_position rktio fd))
(cond (cond
[(rktio-error? ppos) [(rktio-error? ppos)
@ -294,7 +307,7 @@
(define pos (rktio_filesize_ref ppos)) (define pos (rktio_filesize_ref ppos))
(rktio_free ppos) (rktio_free ppos)
(buffer-control pos)])] (buffer-control pos)])]
[(self pos) [(fd buffer-control pos)
(buffer-control) (buffer-control)
(define r (define r
(rktio_set_file_position rktio (rktio_set_file_position rktio
@ -309,6 +322,11 @@
(end-atomic) (end-atomic)
(raise-rktio-error 'file-position r "error setting stream position"))])) (raise-rktio-error 'file-position r "error setting stream position"))]))
(define (make-file-position fd buffer-control)
(case-lambda
[(self) (do-file-position fd buffer-control)]
[(self pos) (do-file-position fd buffer-control pos)]))
;; ---------------------------------------- ;; ----------------------------------------
(struct fd-evt (fd mode [closed #:mutable]) (struct fd-evt (fd mode [closed #:mutable])

View File

@ -13,18 +13,12 @@
(class peek-via-read-input-port #:extends commit-input-port (class peek-via-read-input-port #:extends commit-input-port
(field (field
[bstr #""] [bstr (make-bytes 4096)]
[pos 0] [pos 0]
[end-pos 0] [end-pos 0]
[peeked-eof? #f] [peeked-eof? #f]
[buffer-mode 'block]) [buffer-mode 'block])
(override
[prepare-change
(lambda ()
(when commit-manager
(commit-manager-pause commit-manager)))])
(public (public
;; in atomic mode; must override ;; in atomic mode; must override
[read-in/inner [read-in/inner
@ -35,9 +29,19 @@
;; in atomic mode ;; in atomic mode
[purge-buffer [purge-buffer
(lambda () (lambda ()
(slow-mode!)
(set! pos 0) (set! pos 0)
(set! end-pos 0) (set! end-pos 0)
(set! peeked-eof? #f))]) (set! peeked-eof? #f))]
[close-peek-buffer
(lambda ()
(purge-buffer)
(set! bstr #""))]
[buffer-adjust-pos
(lambda (i)
(- i (fx- end-pos (if buffer buffer-pos pos))))])
(private (private
;; in atomic mode ;; in atomic mode
@ -74,117 +78,154 @@
(bytes-copy! bstr 0 bstr pos end-pos) (bytes-copy! bstr 0 bstr pos end-pos)
(set! end-pos (fx- end-pos pos)) (set! end-pos (fx- end-pos pos))
(set! pos 0) (set! pos 0)
(pull-more-bytes)]))] (pull-more-bytes amt)]))]
;; in atomic mode ;; in atomic mode
[retry-pull? [retry-pull?
(lambda (v) (lambda (v)
(and (integer? v) (not (eqv? v 0))))]) (and (integer? v) (not (eqv? v 0))))]
;; in atomic mode
[fast-mode!
(lambda (amt) ; amt = not yet added to `count`
(unless count
(set! buffer bstr)
(define s pos)
(set! buffer-pos s)
(set! buffer-end end-pos)
(define o offset)
(when o
(set! offset (- (+ o amt) s)))))]
;; in atomic mode
[slow-mode!
(lambda ()
(when buffer
(define s buffer-pos)
(define o offset)
(when o
(set! offset (+ o s)))
(set! pos s)
(set! buffer #f)
(set! buffer-pos buffer-end)))])
(override (override
;; in atomic mode ;; in atomic mode
[read-in [prepare-change
(lambda (dest-bstr start end copy?) (lambda ()
(let try-again () (pause-waiting-commit))]
(cond
[(pos . fx< . end-pos) ;; in atomic mode
(define amt (min (fx- end-pos pos) (fx- end start))) [read-in
(bytes-copy! dest-bstr start bstr pos (fx+ pos amt)) (lambda (dest-bstr start end copy?)
amt] (slow-mode!)
[peeked-eof? (let try-again ()
(set! peeked-eof? #f) (cond
;; an EOF doesn't count as progress [(pos . fx< . end-pos)
eof] (define amt (min (fx- end-pos pos) (fx- end start)))
[else (bytes-copy! dest-bstr start bstr pos (fx+ pos amt))
(cond (set! pos (fx+ pos amt))
[(and (fx< (fx- end start) (bytes-length bstr)) (progress!)
(eq? 'block buffer-mode)) (fast-mode! amt)
(define v (pull-some-bytes)) amt]
(cond [peeked-eof?
[(or (eqv? v 0) (evt? v)) v] (set! peeked-eof? #f)
[else (try-again)])] ;; an EOF doesn't count as progress
[else eof]
(define v (send peek-via-read-input-port this read-in/inner dest-bstr start end copy?)) [else
(unless (eqv? v 0) (cond
(progress!)) [(and (eq? 'block buffer-mode)
v])])))] (fx< (fx- end start) (fxrshift (bytes-length bstr) 1)))
(define v (pull-some-bytes))
(cond
[(or (eqv? v 0) (evt? v)) v]
[else (try-again)])]
[else
(define v (send peek-via-read-input-port this read-in/inner dest-bstr start end copy?))
(unless (eqv? v 0)
(progress!))
v])])))]
;; in atomic mode ;; in atomic mode
[peek-in [peek-in
(lambda (dest-bstr start end skip progress-evt copy?) (lambda (dest-bstr start end skip progress-evt copy?)
(let try-again () (let try-again ()
(cond (cond
[(and progress-evt [(and progress-evt
(sync/timeout 0 progress-evt)) (sync/timeout 0 progress-evt))
#f] #f]
[else [else
(define peeked-amt (fx- end-pos pos)) (define s (if buffer buffer-pos pos))
(cond (define peeked-amt (fx- end-pos s))
[(peeked-amt . > . skip) (cond
(define amt (min (fx- peeked-amt skip) (fx- end start))) [(peeked-amt . > . skip)
(define s-pos (fx+ pos skip)) (define amt (min (fx- peeked-amt skip) (fx- end start)))
(bytes-copy! dest-bstr start bstr s-pos (fx+ s-pos amt)) (define s-pos (fx+ s skip))
amt] (bytes-copy! dest-bstr start bstr s-pos (fx+ s-pos amt))
[peeked-eof? amt]
eof] [peeked-eof?
[else eof]
(define v (pull-more-bytes (- skip peeked-amt))) [else
(if (retry-pull? v) (slow-mode!)
(try-again) (define v (pull-more-bytes (+ (- skip peeked-amt) (fx- end start))))
v)])])))] (if (retry-pull? v)
(try-again)
v)])])))]
;; in atomic mode ;; in atomic mode
[byte-ready [byte-ready
(lambda (work-done!) (lambda (work-done!)
(let loop () (let loop ()
(define peeked-amt (fx- end-pos pos)) (define peeked-amt (fx- end-pos (if buffer buffer-pos pos)))
(cond (cond
[(peeked-amt . fx> . 0) #t] [(peeked-amt . fx> . 0) #t]
[peeked-eof? #t] [peeked-eof? #t]
[else [else
(define v (pull-some-bytes)) (slow-mode!)
(work-done!) (define v (pull-some-bytes))
(cond (work-done!)
[(retry-pull? v) (cond
(loop)] [(retry-pull? v)
[(evt? v) v] (loop)]
[else [(evt? v) v]
(not (eqv? v 0))])])))] [else
(not (eqv? v 0))])])))]
[get-progress-evt [get-progress-evt
(lambda () (lambda ()
(atomically (atomically
(make-progress-evt)))] (slow-mode!)
(make-progress-evt)))]
;; in atomic mode ;; in atomic mode
[commit [commit
(lambda (amt progress-evt ext-evt finish) (lambda (amt progress-evt ext-evt finish)
(wait-commit (slow-mode!)
progress-evt ext-evt (wait-commit
;; in atomic mode, maybe in a different thread: progress-evt ext-evt
(lambda () ;; in atomic mode, maybe in a different thread:
(let ([amt (fxmin amt (fx- end-pos pos))]) (lambda ()
(cond (let ([amt (fxmin amt (fx- end-pos pos))])
[(fx= 0 amt) (cond
(finish #"")] [(fx= 0 amt)
[else (finish #"")]
(define dest-bstr (make-bytes amt)) [else
(bytes-copy! dest-bstr 0 bstr pos (fx+ pos amt)) (define dest-bstr (make-bytes amt))
(set! pos (fx+ pos amt)) (bytes-copy! dest-bstr 0 bstr pos (fx+ pos amt))
(progress!) (set! pos (fx+ pos amt))
(finish dest-bstr)])))))] (progress!)
(finish dest-bstr)])))))]
;; in atomic mode ;; in atomic mode
[buffer-mode [buffer-mode
(case-lambda (case-lambda
[(self) buffer-mode] [(self) buffer-mode]
[(self mode) (set! buffer-mode mode)])] [(self mode) (set! buffer-mode mode)])]
;; in atomic mode ;; in atomic mode
[close [close
(lambda () (lambda ()
(purge-buffer) (close-peek-buffer))]))
(set! bstr #""))]))
;; ---------------------------------------- ;; ----------------------------------------

View File

@ -127,7 +127,7 @@
(private (private
[fast-mode! [fast-mode!
(lambda (amt) (lambda (amt) ; amt = not yet added to `count`
(unless (or count buffer) (unless (or count buffer)
(with-object pipe-data d (with-object pipe-data d
(define s start) (define s start)