From 6e85165b3ce1267292a917b959492e90ba7ff30f Mon Sep 17 00:00:00 2001 From: Matthew Flatt Date: Tue, 12 Feb 2019 10:27:43 -0700 Subject: [PATCH] io: convert fd-output-port to object style --- racket/collects/racket/port.rkt | 2 +- racket/src/io/port/fd-port.rkt | 327 +++++++++++++++++--------------- 2 files changed, 176 insertions(+), 153 deletions(-) diff --git a/racket/collects/racket/port.rkt b/racket/collects/racket/port.rkt index 839bc9016f..e8ac4745a4 100644 --- a/racket/collects/racket/port.rkt +++ b/racket/collects/racket/port.rkt @@ -863,7 +863,7 @@ (thread-resume mgr-th (current-thread)) (channel-put mgr-ch (list* what ch nack req-sfx)) (wrap-evt ch (lambda (x) - (if (eq? x 'close) + (if (eq? x 'closed) (raise-mismatch-error 'write-evt "port is closed: " out) x))))))))) (define (resume-mgr) diff --git a/racket/src/io/port/fd-port.rkt b/racket/src/io/port/fd-port.rkt index 7c460b781b..bbd2f5cd23 100644 --- a/racket/src/io/port/fd-port.rkt +++ b/racket/src/io/port/fd-port.rkt @@ -1,5 +1,6 @@ #lang racket/base -(require "../common/class.rkt" +(require racket/fixnum + "../common/class.rkt" "../host/rktio.rkt" "../host/error.rkt" "../host/thread.rkt" @@ -62,7 +63,7 @@ [fd #f] [fd-refcount #f] [custodian-reference #f] - [on-close on-close] + [on-close void] [network-error? #f]) (override @@ -89,8 +90,12 @@ [file-position (case-lambda - [() (do-file-position fd (lambda (pos) (buffer-adjust-pos pos)))] - [(pos) (do-file-position fd (lambda () (purge-buffer)) pos)])])) + [() + (define pos (get-file-position fd)) + (and pos (buffer-adjust-pos pos))] + [(pos) + (purge-buffer) + (set-file-position fd pos)])])) ;; ---------------------------------------- @@ -115,105 +120,89 @@ ;; ---------------------------------------- -;; in atomic mode -;; Current custodian must not be shut down. -(define (open-output-fd fd name - #:extra-data [extra-data #f] - #:buffer-mode [buffer-mode 'infer] - #:fd-refcount [fd-refcount (box 1)] - #:on-close [on-close void] - #:plumber [plumber (current-plumber)] - #:custodian [cust (current-custodian)] - #:file-stream? [file-stream? #t] - #:network-error? [network-error? #f]) - (define buffer (make-bytes 4096)) - (define buffer-start 0) - (define buffer-end 0) - (define flush-handle - (plumber-add-flush! plumber - (lambda (h) - (atomically - (flush-buffer-fully #f))))) - - (when (eq? buffer-mode 'infer) - (if (rktio_fd_is_terminal rktio fd) - (set! buffer-mode 'line) - (set! buffer-mode 'block))) +(class fd-output-port #:extends core-output-port + (field + [fd fd] + [fd-refcount #f] + [bstr (make-bytes 4096)] + [start-pos 0] + [end-pos 0] + [flush-handle #f] + [buffer-mode 'block] + [custodian-reference #f] + [on-close void] + [network-error? network-error?]) - (define evt (fd-evt fd RKTIO_POLL_WRITE #f)) + (private + ;; in atomic mode + ;; Returns `#t` if the buffer is already or successfully flushed + [flush-buffer + (lambda () + (cond + [(not (fx= start-pos end-pos)) + (define n (rktio_write_in rktio fd bstr start-pos end-pos)) + (cond + [(rktio-error? n) + (end-atomic) + (if network-error? + (raise-network-error #f n "error writing to stream port") + (raise-filesystem-error #f n "error writing to stream port"))] + [(fx= n 0) + #f] + [else + (define new-start-pos (fx+ start-pos n)) + (cond + [(fx= new-start-pos end-pos) + (set! start-pos 0) + (set! end-pos 0) + #t] + [else + (set! start-pos new-start-pos) + #f])])] + [else #t]))] - ;; in atomic mode - ;; Returns `#t` if the buffer is already or successfully flushed - (define (flush-buffer) - (cond - [(not (= buffer-start buffer-end)) - (define n (rktio_write_in rktio fd buffer buffer-start buffer-end)) - (cond - [(rktio-error? n) + ;; in atomic mode, but may leave it temporarily + [flush-buffer-fully + (lambda (enable-break?) + (let loop () + (unless (flush-buffer) (end-atomic) - (if network-error? - (raise-network-error #f n "error writing to stream port") - (raise-filesystem-error #f n "error writing to stream port"))] - [(zero? n) - #f] - [else - (define new-buffer-start (+ buffer-start n)) - (cond - [(= new-buffer-start buffer-end) - (set! buffer-start 0) - (set! buffer-end 0) - #t] - [else - (set! buffer-start new-buffer-start) - #f])])] - [else #t])) + (if enable-break? + (sync/enable-break evt) + (sync evt)) + (start-atomic) + (when bstr ; in case it was closed + (loop)))))] - ;; in atomic mode - (define (flush-buffer-fully enable-break?) - (let loop () - (unless (flush-buffer) - (end-atomic) - (if enable-break? - (sync/enable-break evt) - (sync evt)) - (start-atomic) - (when buffer ; in case it was closed - (loop))))) + ;; in atomic mode, but may leave it temporarily + [flush-buffer-fully-if-newline + (lambda (src-bstr src-start src-end enable-break?) + (for ([b (in-bytes src-bstr src-start src-end)]) + (define newline? (or (eqv? b (char->integer #\newline)) + (eqv? b (char->integer #\return)))) + (when newline? (flush-buffer-fully enable-break?)) + #:break newline? + (void)))]) - ;; in atomic mode - (define (flush-buffer-fully-if-newline src-bstr src-start src-end enable-break?) - (for ([b (in-bytes src-bstr src-start src-end)]) - (define newline? (or (eqv? b (char->integer #\newline)) - (eqv? b (char->integer #\return)))) - (when newline? (flush-buffer-fully enable-break?)) - #:break newline? - (void))) + (static + [flush-buffer/external + (lambda () + (flush-buffer-fully #f))]) - (define port - (make-core-output-port - #:name name - #:self #f - #:data (fd-output-data fd extra-data #f file-stream? - ;; Flush function needed for `file-truncate`: - (lambda () - (atomically - (flush-buffer-fully #f)))) - - #:evt evt - - #:write-out - ;; in atomic mode - (lambda (self src-bstr src-start src-end nonbuffer/nonblock? enable-break? copy?) + (override + ;; in atomic mode + [write-out + (lambda (src-bstr src-start src-end nonbuffer/nonblock? enable-break? copy?) (cond - [(= src-start src-end) + [(fx= src-start src-end) ;; Flush request (and (flush-buffer) 0)] [(and (not (eq? buffer-mode 'none)) (not nonbuffer/nonblock?) - (< buffer-end (bytes-length buffer))) - (define amt (min (- src-end src-start) (- (bytes-length buffer) buffer-end))) - (bytes-copy! buffer buffer-end src-bstr src-start (+ src-start amt)) - (set! buffer-end (+ buffer-end amt)) + (fx< end-pos (bytes-length bstr))) + (define amt (fxmin (fx- src-end src-start) (fx- (bytes-length bstr) end-pos))) + (bytes-copy! bstr end-pos src-bstr src-start (fx+ src-start amt)) + (set! end-pos (fx+ end-pos amt)) (unless nonbuffer/nonblock? (when (eq? buffer-mode 'line) ;; can temporarily leave atomic mode: @@ -229,46 +218,86 @@ (if network-error? (raise-network-error #f n "error writing to stream port") (raise-filesystem-error #f n "error writing to stream port"))] - [(zero? n) (wrap-evt evt (lambda (v) #f))] - [else n])])) + [(fx= n 0) (wrap-evt evt (lambda (v) #f))] + [else n])]))] - #:count-write-evt-via-write-out - (lambda (self port v bstr start) - (port-count! port v bstr start)) + [get-write-evt + (get-write-evt-via-write-out (lambda (out v bstr start) + (port-count! out v bstr start)))] - #:close - ;; in atomic mode - (lambda (self) + ;; in atomic mode + [close + (lambda () (flush-buffer-fully #f) ; can temporarily leave atomic mode - (when buffer ; <- in case a concurrent close succeeded + (when bstr ; <- in case a concurrent close succeeded (on-close) (plumber-flush-handle-remove! flush-handle) - (set! buffer #f) + (set! bstr #f) (fd-close fd fd-refcount) - (unsafe-custodian-unregister fd custodian-reference))) + (unsafe-custodian-unregister fd custodian-reference)))] - #:file-position (make-file-position - fd - ;; in atomic mode - (case-lambda - [() - (flush-buffer-fully #f) - ;; flushing can leave atomic mode, so make sure the - ;; port is still open before continuing - (unless buffer - (check-not-closed 'file-position port))] - [(pos) - (+ pos (- buffer-end buffer-start))])) - #:buffer-mode (case-lambda - [(self) buffer-mode] - [(self mode) (set! buffer-mode mode)]))) + ;; in atomic mode + [file-position + (case-lambda + [() + (define pos (get-file-position fd)) + (and pos (+ pos (fx- end-pos start-pos)))] + [(pos) + (flush-buffer-fully #f) + ;; flushing can leave atomic mode, so make sure the + ;; port is still open before continuing + (unless bstr + (check-not-closed 'file-position this)) + (set-file-position fd pos)])] - (define custodian-reference - (register-fd-close cust fd fd-refcount flush-handle port)) + ;; in atomic mode + [buffer-mode + (case-lambda + [(self) buffer-mode] + [(self mode) (set! buffer-mode mode)])])) - (set-fd-evt-closed! evt port) +;; ---------------------------------------- - port) +;; in atomic mode +;; Current custodian must not be shut down. +(define (open-output-fd fd name + #:extra-data [extra-data #f] + #:buffer-mode [buffer-mode 'infer] + #:fd-refcount [fd-refcount (box 1)] + #:on-close [on-close void] + #:plumber [plumber (current-plumber)] + #:custodian [cust (current-custodian)] + #:file-stream? [file-stream? #t] + #:network-error? [network-error? #f]) + (define evt (fd-evt fd RKTIO_POLL_WRITE #f)) + (define p (new fd-output-port + [name name] + [evt evt] + [fd fd] + [fd-refcount fd-refcount] + [flush-handle #f] + [buffer-mode + (if (eq? buffer-mode 'infer) + (if (rktio_fd_is_terminal rktio fd) + (set! buffer-mode 'line) + (set! buffer-mode 'block)) + buffer-mode)] + [on-close on-close] + [network-error? network-error?] + [data (fd-output-data fd extra-data #f file-stream? + ;; Flush function needed for `file-truncate`: + (lambda () + (atomically + (send fd-output-port p flush-buffer/external))))])) + (define flush-handle (plumber-add-flush! plumber + (lambda (h) + (atomically + (send fd-output-port p flush-buffer/external))))) + (define custodian-reference (register-fd-close cust fd fd-refcount flush-handle p)) + (set-fd-output-port-flush-handle! p flush-handle) + (set-fd-output-port-custodian-reference! p custodian-reference) + (set-fd-evt-closed! evt p) + p) ;; ---------------------------------------- @@ -295,37 +324,31 @@ ;; ---------------------------------------- ;; in atomic mode -(define do-file-position - (case-lambda - [(fd buffer-control) - (define ppos (rktio_get_file_position rktio fd)) - (cond - [(rktio-error? ppos) - ;; #f => not supported, so use port's own counter, instead - #f] - [else - (define pos (rktio_filesize_ref ppos)) - (rktio_free ppos) - (buffer-control pos)])] - [(fd buffer-control pos) - (buffer-control) - (define r - (rktio_set_file_position rktio - fd - (if (eof-object? pos) - 0 - pos) - (if (eof-object? pos) - RKTIO_POSITION_FROM_END - RKTIO_POSITION_FROM_START))) - (when (rktio-error? r) - (end-atomic) - (raise-rktio-error 'file-position r "error setting stream position"))])) +(define (get-file-position fd) + (define ppos (rktio_get_file_position rktio fd)) + (cond + [(rktio-error? ppos) + ;; #f => not supported, so use port's own counter, instead + #f] + [else + (define pos (rktio_filesize_ref ppos)) + (rktio_free ppos) + pos])) -(define (make-file-position fd buffer-control) - (case-lambda - [(self) (do-file-position fd buffer-control)] - [(self pos) (do-file-position fd buffer-control pos)])) +;; in atomic mode +(define (set-file-position fd pos) + (define r + (rktio_set_file_position rktio + fd + (if (eof-object? pos) + 0 + pos) + (if (eof-object? pos) + RKTIO_POSITION_FROM_END + RKTIO_POSITION_FROM_START))) + (when (rktio-error? r) + (end-atomic) + (raise-rktio-error 'file-position r "error setting stream position"))) ;; ----------------------------------------