io: convert fd-output-port to object style

This commit is contained in:
Matthew Flatt 2019-02-12 10:27:43 -07:00
parent c28a0f45dc
commit 6e85165b3c
2 changed files with 176 additions and 153 deletions

View File

@ -863,7 +863,7 @@
(thread-resume mgr-th (current-thread)) (thread-resume mgr-th (current-thread))
(channel-put mgr-ch (list* what ch nack req-sfx)) (channel-put mgr-ch (list* what ch nack req-sfx))
(wrap-evt ch (lambda (x) (wrap-evt ch (lambda (x)
(if (eq? x 'close) (if (eq? x 'closed)
(raise-mismatch-error 'write-evt "port is closed: " out) (raise-mismatch-error 'write-evt "port is closed: " out)
x))))))))) x)))))))))
(define (resume-mgr) (define (resume-mgr)

View File

@ -1,5 +1,6 @@
#lang racket/base #lang racket/base
(require "../common/class.rkt" (require racket/fixnum
"../common/class.rkt"
"../host/rktio.rkt" "../host/rktio.rkt"
"../host/error.rkt" "../host/error.rkt"
"../host/thread.rkt" "../host/thread.rkt"
@ -62,7 +63,7 @@
[fd #f] [fd #f]
[fd-refcount #f] [fd-refcount #f]
[custodian-reference #f] [custodian-reference #f]
[on-close on-close] [on-close void]
[network-error? #f]) [network-error? #f])
(override (override
@ -89,8 +90,12 @@
[file-position [file-position
(case-lambda (case-lambda
[() (do-file-position fd (lambda (pos) (buffer-adjust-pos pos)))] [()
[(pos) (do-file-position fd (lambda () (purge-buffer)) pos)])])) (define pos (get-file-position fd))
(and pos (buffer-adjust-pos pos))]
[(pos)
(purge-buffer)
(set-file-position fd pos)])]))
;; ---------------------------------------- ;; ----------------------------------------
@ -115,105 +120,89 @@
;; ---------------------------------------- ;; ----------------------------------------
;; in atomic mode (class fd-output-port #:extends core-output-port
;; Current custodian must not be shut down. (field
(define (open-output-fd fd name [fd fd]
#:extra-data [extra-data #f] [fd-refcount #f]
#:buffer-mode [buffer-mode 'infer] [bstr (make-bytes 4096)]
#:fd-refcount [fd-refcount (box 1)] [start-pos 0]
#:on-close [on-close void] [end-pos 0]
#:plumber [plumber (current-plumber)] [flush-handle #f]
#:custodian [cust (current-custodian)] [buffer-mode 'block]
#:file-stream? [file-stream? #t] [custodian-reference #f]
#:network-error? [network-error? #f]) [on-close void]
(define buffer (make-bytes 4096)) [network-error? network-error?])
(define buffer-start 0)
(define buffer-end 0)
(define flush-handle
(plumber-add-flush! plumber
(lambda (h)
(atomically
(flush-buffer-fully #f)))))
(when (eq? buffer-mode 'infer) (private
(if (rktio_fd_is_terminal rktio fd) ;; in atomic mode
(set! buffer-mode 'line) ;; Returns `#t` if the buffer is already or successfully flushed
(set! buffer-mode 'block))) [flush-buffer
(lambda ()
(cond
[(not (fx= start-pos end-pos))
(define n (rktio_write_in rktio fd bstr start-pos end-pos))
(cond
[(rktio-error? n)
(end-atomic)
(if network-error?
(raise-network-error #f n "error writing to stream port")
(raise-filesystem-error #f n "error writing to stream port"))]
[(fx= n 0)
#f]
[else
(define new-start-pos (fx+ start-pos n))
(cond
[(fx= new-start-pos end-pos)
(set! start-pos 0)
(set! end-pos 0)
#t]
[else
(set! start-pos new-start-pos)
#f])])]
[else #t]))]
(define evt (fd-evt fd RKTIO_POLL_WRITE #f)) ;; in atomic mode, but may leave it temporarily
[flush-buffer-fully
;; in atomic mode (lambda (enable-break?)
;; Returns `#t` if the buffer is already or successfully flushed (let loop ()
(define (flush-buffer) (unless (flush-buffer)
(cond
[(not (= buffer-start buffer-end))
(define n (rktio_write_in rktio fd buffer buffer-start buffer-end))
(cond
[(rktio-error? n)
(end-atomic) (end-atomic)
(if network-error? (if enable-break?
(raise-network-error #f n "error writing to stream port") (sync/enable-break evt)
(raise-filesystem-error #f n "error writing to stream port"))] (sync evt))
[(zero? n) (start-atomic)
#f] (when bstr ; in case it was closed
[else (loop)))))]
(define new-buffer-start (+ buffer-start n))
(cond
[(= new-buffer-start buffer-end)
(set! buffer-start 0)
(set! buffer-end 0)
#t]
[else
(set! buffer-start new-buffer-start)
#f])])]
[else #t]))
;; in atomic mode ;; in atomic mode, but may leave it temporarily
(define (flush-buffer-fully enable-break?) [flush-buffer-fully-if-newline
(let loop () (lambda (src-bstr src-start src-end enable-break?)
(unless (flush-buffer) (for ([b (in-bytes src-bstr src-start src-end)])
(end-atomic) (define newline? (or (eqv? b (char->integer #\newline))
(if enable-break? (eqv? b (char->integer #\return))))
(sync/enable-break evt) (when newline? (flush-buffer-fully enable-break?))
(sync evt)) #:break newline?
(start-atomic) (void)))])
(when buffer ; in case it was closed
(loop)))))
;; in atomic mode (static
(define (flush-buffer-fully-if-newline src-bstr src-start src-end enable-break?) [flush-buffer/external
(for ([b (in-bytes src-bstr src-start src-end)]) (lambda ()
(define newline? (or (eqv? b (char->integer #\newline)) (flush-buffer-fully #f))])
(eqv? b (char->integer #\return))))
(when newline? (flush-buffer-fully enable-break?))
#:break newline?
(void)))
(define port (override
(make-core-output-port ;; in atomic mode
#:name name [write-out
#:self #f (lambda (src-bstr src-start src-end nonbuffer/nonblock? enable-break? copy?)
#:data (fd-output-data fd extra-data #f file-stream?
;; Flush function needed for `file-truncate`:
(lambda ()
(atomically
(flush-buffer-fully #f))))
#:evt evt
#:write-out
;; in atomic mode
(lambda (self src-bstr src-start src-end nonbuffer/nonblock? enable-break? copy?)
(cond (cond
[(= src-start src-end) [(fx= src-start src-end)
;; Flush request ;; Flush request
(and (flush-buffer) 0)] (and (flush-buffer) 0)]
[(and (not (eq? buffer-mode 'none)) [(and (not (eq? buffer-mode 'none))
(not nonbuffer/nonblock?) (not nonbuffer/nonblock?)
(< buffer-end (bytes-length buffer))) (fx< end-pos (bytes-length bstr)))
(define amt (min (- src-end src-start) (- (bytes-length buffer) buffer-end))) (define amt (fxmin (fx- src-end src-start) (fx- (bytes-length bstr) end-pos)))
(bytes-copy! buffer buffer-end src-bstr src-start (+ src-start amt)) (bytes-copy! bstr end-pos src-bstr src-start (fx+ src-start amt))
(set! buffer-end (+ buffer-end amt)) (set! end-pos (fx+ end-pos amt))
(unless nonbuffer/nonblock? (unless nonbuffer/nonblock?
(when (eq? buffer-mode 'line) (when (eq? buffer-mode 'line)
;; can temporarily leave atomic mode: ;; can temporarily leave atomic mode:
@ -229,46 +218,86 @@
(if network-error? (if network-error?
(raise-network-error #f n "error writing to stream port") (raise-network-error #f n "error writing to stream port")
(raise-filesystem-error #f n "error writing to stream port"))] (raise-filesystem-error #f n "error writing to stream port"))]
[(zero? n) (wrap-evt evt (lambda (v) #f))] [(fx= n 0) (wrap-evt evt (lambda (v) #f))]
[else n])])) [else n])]))]
#:count-write-evt-via-write-out [get-write-evt
(lambda (self port v bstr start) (get-write-evt-via-write-out (lambda (out v bstr start)
(port-count! port v bstr start)) (port-count! out v bstr start)))]
#:close ;; in atomic mode
;; in atomic mode [close
(lambda (self) (lambda ()
(flush-buffer-fully #f) ; can temporarily leave atomic mode (flush-buffer-fully #f) ; can temporarily leave atomic mode
(when buffer ; <- in case a concurrent close succeeded (when bstr ; <- in case a concurrent close succeeded
(on-close) (on-close)
(plumber-flush-handle-remove! flush-handle) (plumber-flush-handle-remove! flush-handle)
(set! buffer #f) (set! bstr #f)
(fd-close fd fd-refcount) (fd-close fd fd-refcount)
(unsafe-custodian-unregister fd custodian-reference))) (unsafe-custodian-unregister fd custodian-reference)))]
#:file-position (make-file-position ;; in atomic mode
fd [file-position
;; in atomic mode (case-lambda
(case-lambda [()
[() (define pos (get-file-position fd))
(flush-buffer-fully #f) (and pos (+ pos (fx- end-pos start-pos)))]
;; flushing can leave atomic mode, so make sure the [(pos)
;; port is still open before continuing (flush-buffer-fully #f)
(unless buffer ;; flushing can leave atomic mode, so make sure the
(check-not-closed 'file-position port))] ;; port is still open before continuing
[(pos) (unless bstr
(+ pos (- buffer-end buffer-start))])) (check-not-closed 'file-position this))
#:buffer-mode (case-lambda (set-file-position fd pos)])]
[(self) buffer-mode]
[(self mode) (set! buffer-mode mode)])))
(define custodian-reference ;; in atomic mode
(register-fd-close cust fd fd-refcount flush-handle port)) [buffer-mode
(case-lambda
[(self) buffer-mode]
[(self mode) (set! buffer-mode mode)])]))
(set-fd-evt-closed! evt port) ;; ----------------------------------------
port) ;; in atomic mode
;; Current custodian must not be shut down.
(define (open-output-fd fd name
#:extra-data [extra-data #f]
#:buffer-mode [buffer-mode 'infer]
#:fd-refcount [fd-refcount (box 1)]
#:on-close [on-close void]
#:plumber [plumber (current-plumber)]
#:custodian [cust (current-custodian)]
#:file-stream? [file-stream? #t]
#:network-error? [network-error? #f])
(define evt (fd-evt fd RKTIO_POLL_WRITE #f))
(define p (new fd-output-port
[name name]
[evt evt]
[fd fd]
[fd-refcount fd-refcount]
[flush-handle #f]
[buffer-mode
(if (eq? buffer-mode 'infer)
(if (rktio_fd_is_terminal rktio fd)
(set! buffer-mode 'line)
(set! buffer-mode 'block))
buffer-mode)]
[on-close on-close]
[network-error? network-error?]
[data (fd-output-data fd extra-data #f file-stream?
;; Flush function needed for `file-truncate`:
(lambda ()
(atomically
(send fd-output-port p flush-buffer/external))))]))
(define flush-handle (plumber-add-flush! plumber
(lambda (h)
(atomically
(send fd-output-port p flush-buffer/external)))))
(define custodian-reference (register-fd-close cust fd fd-refcount flush-handle p))
(set-fd-output-port-flush-handle! p flush-handle)
(set-fd-output-port-custodian-reference! p custodian-reference)
(set-fd-evt-closed! evt p)
p)
;; ---------------------------------------- ;; ----------------------------------------
@ -295,37 +324,31 @@
;; ---------------------------------------- ;; ----------------------------------------
;; in atomic mode ;; in atomic mode
(define do-file-position (define (get-file-position fd)
(case-lambda (define ppos (rktio_get_file_position rktio fd))
[(fd buffer-control) (cond
(define ppos (rktio_get_file_position rktio fd)) [(rktio-error? ppos)
(cond ;; #f => not supported, so use port's own counter, instead
[(rktio-error? ppos) #f]
;; #f => not supported, so use port's own counter, instead [else
#f] (define pos (rktio_filesize_ref ppos))
[else (rktio_free ppos)
(define pos (rktio_filesize_ref ppos)) pos]))
(rktio_free ppos)
(buffer-control pos)])]
[(fd buffer-control pos)
(buffer-control)
(define r
(rktio_set_file_position rktio
fd
(if (eof-object? pos)
0
pos)
(if (eof-object? pos)
RKTIO_POSITION_FROM_END
RKTIO_POSITION_FROM_START)))
(when (rktio-error? r)
(end-atomic)
(raise-rktio-error 'file-position r "error setting stream position"))]))
(define (make-file-position fd buffer-control) ;; in atomic mode
(case-lambda (define (set-file-position fd pos)
[(self) (do-file-position fd buffer-control)] (define r
[(self pos) (do-file-position fd buffer-control pos)])) (rktio_set_file_position rktio
fd
(if (eof-object? pos)
0
pos)
(if (eof-object? pos)
RKTIO_POSITION_FROM_END
RKTIO_POSITION_FROM_START)))
(when (rktio-error? r)
(end-atomic)
(raise-rktio-error 'file-position r "error setting stream position")))
;; ---------------------------------------- ;; ----------------------------------------