From d6af78cebdb3a912f42718db10e705f8b32c3a24 Mon Sep 17 00:00:00 2001 From: Matthew Flatt Date: Tue, 12 Feb 2019 16:16:56 -0700 Subject: [PATCH] io: convert tcp-{input,output}-port to object style --- racket/src/io/network/tcp-port.rkt | 140 ++++++++++++------------ racket/src/io/port/fd-port.rkt | 157 +++++++++++++-------------- racket/src/io/port/file-identity.rkt | 7 +- racket/src/io/port/file-stream.rkt | 18 ++- racket/src/io/port/input-port.rkt | 5 +- racket/src/io/port/output-port.rkt | 5 +- 6 files changed, 160 insertions(+), 172 deletions(-) diff --git a/racket/src/io/network/tcp-port.rkt b/racket/src/io/network/tcp-port.rkt index 3feb48de2a..fbf90e40c8 100644 --- a/racket/src/io/network/tcp-port.rkt +++ b/racket/src/io/network/tcp-port.rkt @@ -1,89 +1,91 @@ #lang racket/base (require "../common/check.rkt" + "../common/class.rkt" "../host/rktio.rkt" "../port/port.rkt" "../port/close.rkt" "../port/input-port.rkt" "../port/output-port.rkt" "../port/fd-port.rkt" - "../port/place-message.rkt") + "../port/file-stream.rkt" + "error.rkt") (provide open-input-output-tcp tcp-port? tcp-abandon-port) -(struct tcp-data (abandon-in? abandon-out?) - #:mutable - #:authentic - #:property - prop:fd-extra-data-place-message - (lambda (port) - (if (input-port? port) - (lambda (fd name) - (open-input-fd fd name - #:extra-data (tcp-data #f #t) - #:file-stream? #f - #:network-error? #t)) - (lambda (fd name) - (open-output-fd fd name - #:extra-data (tcp-data #t #f) - #:file-stream? #f - #:network-error? #t))))) +(class tcp-input-port #:extends fd-input-port + (field + [abandon? #f]) + (override + [on-close + (lambda () + (unless abandon? + (rktio_socket_shutdown rktio fd RKTIO_SHUTDOWN_READ)))] + [raise-read-error + (lambda (n) + (raise-network-error #f n "error reading from stream port"))]) + (property + [prop:file-stream #f] + [prop:fd-place-message-opener (lambda (fd name) + (make-tcp-input-port fd name))])) + +(define (make-tcp-input-port fd name + #:fd-refcount [fd-refcount (box 1)]) + (finish-fd-input-port + (new tcp-input-port + [name name] + [fd fd] + [fd-refcount fd-refcount]))) + +(class tcp-output-port #:extends fd-output-port + (field + [abandon? #f]) + (override + [on-close + (lambda () + (unless abandon? + (rktio_socket_shutdown rktio fd RKTIO_SHUTDOWN_WRITE)))] + [raise-write-error + (lambda (n) + (raise-network-error #f n "error writing to stream port"))]) + (property + [prop:file-stream #f] + [prop:fd-place-message-opener (lambda (fd name) + (make-tcp-output-port fd name))])) + +(define (make-tcp-output-port fd name + #:fd-refcount [fd-refcount (box 1)]) + (finish-fd-output-port + (new tcp-output-port + [name name] + [fd fd] + [fd-refcount fd-refcount] + [buffer-mode 'block]))) + +;; ---------------------------------------- (define (open-input-output-tcp fd name #:close? [close? #t]) (define refcount (box (if close? 2 3))) - (define extra-data (tcp-data #f #f)) (values - (open-input-fd fd name - #:extra-data extra-data - #:on-close - ;; in atomic mode - (lambda () - (unless (tcp-data-abandon-in? extra-data) - (rktio_socket_shutdown rktio fd RKTIO_SHUTDOWN_READ))) - #:fd-refcount refcount - #:file-stream? #f - #:network-error? #t) - (open-output-fd fd name - #:extra-data extra-data - #:on-close - ;; in atomic mode - (lambda () - (unless (tcp-data-abandon-out? extra-data) - (rktio_socket_shutdown rktio fd RKTIO_SHUTDOWN_WRITE))) - #:fd-refcount refcount - #:buffer-mode 'block - #:file-stream? #f - #:network-error? #t))) - -(define (port-tcp-data p) - (maybe-fd-data-extra - (cond - [(input-port? p) - (core-port-data - (->core-input-port p))] - [(output-port? p) - (core-port-data - (->core-output-port p))] - [else #f]))) + (make-tcp-input-port fd name + #:fd-refcount refcount) + (make-tcp-output-port fd name + #:fd-refcount refcount))) (define/who (tcp-port? p) - (tcp-data? (port-tcp-data p))) + (define cp (or (->core-input-port p #:default #f) + (->core-output-port p #:default #f))) + (or (tcp-input-port? cp) + (tcp-output-port? cp))) -(define/who (tcp-abandon-port given-p) - (define p (cond - [(input-port? given-p) - (->core-input-port given-p)] - [(output-port? given-p) - (->core-output-port given-p)] - [else #f])) - (define data (port-tcp-data p)) - (unless (tcp-data? data) - (raise-argument-error who "tcp-port?" p)) - (if (input-port? p) - (begin - (set-tcp-data-abandon-in?! data #t) - (close-port p)) - (begin - (set-tcp-data-abandon-out?! data #t) - (close-port p)))) +(define/who (tcp-abandon-port p) + (define cp (or (->core-input-port p #:default #f) + (->core-output-port p #:default #f))) + (cond + [(tcp-input-port? cp) + (set-tcp-input-port-abandon?! cp #t) + (close-port p)] + [(tcp-output-port? cp) + (set-tcp-output-port-abandon?! cp #t) + (close-port p)])) diff --git a/racket/src/io/port/fd-port.rkt b/racket/src/io/port/fd-port.rkt index 7bef379e1d..91e7584176 100644 --- a/racket/src/io/port/fd-port.rkt +++ b/racket/src/io/port/fd-port.rkt @@ -7,7 +7,6 @@ "../host/pthread.rkt" "../sandman/main.rkt" "../file/error.rkt" - "../network/error.rkt" "port.rkt" "input-port.rkt" "output-port.rkt" @@ -20,17 +19,15 @@ "check.rkt" "place-message.rkt") -(provide open-input-fd +(provide (struct-out fd-input-port) + open-input-fd + finish-fd-input-port + (struct-out fd-output-port) open-output-fd + finish-fd-output-port terminal-port? fd-port-fd - maybe-fd-data-extra) - -(struct fd-data (extra)) - -(define (maybe-fd-data-extra data) - (and (fd-data? data) - (fd-data-extra data))) + prop:fd-place-message-opener) ;; in atomic mode (define (fd-close fd fd-refcount) @@ -46,11 +43,13 @@ (class fd-input-port #:extends peek-via-read-input-port (field [fd #f] - [fd-refcount #f] - [custodian-reference #f] - [on-close void] - [file-stream? #t] - [network-error? #f]) + [fd-refcount (box 1)] + [custodian-reference #f]) + + (public + [on-close (lambda () (void))] + [raise-read-error (lambda (n) + (raise-filesystem-error #f n "error reading from stream port"))]) (override [read-in/inner @@ -59,9 +58,7 @@ (cond [(rktio-error? n) (end-atomic) - (if network-error? - (raise-network-error #f n "error reading from stream port") - (raise-filesystem-error #f n "error reading from stream port"))] + (send fd-input-port this raise-read-error n)] [(eqv? n RKTIO_READ_EOF) eof] [(eqv? n 0) (wrap-evt (fd-evt fd RKTIO_POLL_READ this) (lambda (v) 0))] @@ -69,7 +66,7 @@ [close (lambda () - (on-close) + (send fd-input-port this on-close) (fd-close fd fd-refcount) (unsafe-custodian-unregister fd custodian-reference) (close-peek-buffer))] @@ -84,10 +81,7 @@ (set-file-position fd pos)])]) (property - [prop:file-stream (lambda (p [check? #f]) - (if check? - (fd-input-port-file-stream? p) - (fd-input-port-fd p)))] + [prop:file-stream (lambda (p) (fd-input-port-fd p))] [prop:data-place-message (lambda (port) (lambda () (fd-port->place-message port)))])) @@ -97,20 +91,19 @@ ;; in atomic mode ;; Current custodian must not be shut down. (define (open-input-fd fd name - #:extra-data [extra-data #f] - #:on-close [on-close void] #:fd-refcount [fd-refcount (box 1)] - #:custodian [cust (current-custodian)] - #:file-stream? [file-stream? #t] - #:network-error? [network-error? #f]) - (define p (new fd-input-port - [name name] - [data (fd-data extra-data)] - [fd fd] - [fd-refcount fd-refcount] - [on-close on-close] - [file-stream? file-stream?] - [network-error? network-error?])) + #:custodian [cust (current-custodian)]) + (finish-fd-input-port + (new fd-input-port + [name name] + [fd fd] + [fd-refcount fd-refcount]) + #:custodian cust)) + +(define (finish-fd-input-port p + #:custodian [cust (current-custodian)]) + (define fd (fd-input-port-fd p)) + (define fd-refcount (fd-input-port-fd-refcount p)) (set-fd-input-port-custodian-reference! p (register-fd-close cust fd fd-refcount #f p)) p) @@ -119,16 +112,19 @@ (class fd-output-port #:extends core-output-port (field [fd fd] - [fd-refcount #f] + [fd-refcount (box 1)] [bstr (make-bytes 4096)] [start-pos 0] [end-pos 0] [flush-handle #f] [buffer-mode 'block] - [custodian-reference #f] - [on-close void] - [file-stream? #t] - [network-error? network-error?]) + [custodian-reference #f]) + + (public + [on-close (lambda () (void))] + [raise-write-error + (lambda (n) + (raise-filesystem-error #f n "error writing to stream port"))]) (private ;; in atomic mode @@ -141,9 +137,7 @@ (cond [(rktio-error? n) (end-atomic) - (if network-error? - (raise-network-error #f n "error writing to stream port") - (raise-filesystem-error #f n "error writing to stream port"))] + (send fd-output-port this raise-write-error n)] [(fx= n 0) #f] [else @@ -212,9 +206,7 @@ (cond [(rktio-error? n) (end-atomic) - (if network-error? - (raise-network-error #f n "error writing to stream port") - (raise-filesystem-error #f n "error writing to stream port"))] + (send fd-output-port this raise-write-error n)] [(fx= n 0) (wrap-evt evt (lambda (v) #f))] [else n])]))] @@ -227,7 +219,7 @@ (lambda () (flush-buffer-fully #f) ; can temporarily leave atomic mode (when bstr ; <- in case a concurrent close succeeded - (on-close) + (send fd-output-port this on-close) (plumber-flush-handle-remove! flush-handle) (set! bstr #f) (fd-close fd fd-refcount) @@ -254,10 +246,7 @@ [(self mode) (set! buffer-mode mode)])]) (property - [prop:file-stream (lambda (p [check? #f]) - (if check? - (fd-output-port-file-stream? p) - (fd-output-port-fd p)))] + [prop:file-stream (lambda (p) (fd-output-port-fd p))] [prop:file-truncate (lambda (p pos) ;; in atomic mode (send fd-output-port p flush-buffer/external) @@ -279,39 +268,38 @@ ;; in atomic mode ;; Current custodian must not be shut down. (define (open-output-fd fd name - #:extra-data [extra-data #f] #:buffer-mode [buffer-mode 'infer] #:fd-refcount [fd-refcount (box 1)] - #:on-close [on-close void] #:plumber [plumber (current-plumber)] - #:custodian [cust (current-custodian)] - #:file-stream? [file-stream? #t] - #:network-error? [network-error? #f]) - (define evt (fd-evt fd RKTIO_POLL_WRITE #f)) - (define p (new fd-output-port - [name name] - [evt evt] - [fd fd] - [fd-refcount fd-refcount] - [file-stream? file-stream?] - [flush-handle #f] - [buffer-mode - (if (eq? buffer-mode 'infer) - (if (rktio_fd_is_terminal rktio fd) - 'line - 'block) - buffer-mode)] - [on-close on-close] - [network-error? network-error?] - [data (fd-data extra-data)])) + #:custodian [cust (current-custodian)]) + (finish-fd-output-port + (new fd-output-port + [name name] + [fd fd] + [fd-refcount fd-refcount] + [buffer-mode + (if (eq? buffer-mode 'infer) + (if (rktio_fd_is_terminal rktio fd) + 'line + 'block) + buffer-mode)]) + #:plumber plumber + #:custodian cust)) + +(define (finish-fd-output-port p + #:plumber [plumber (current-plumber)] + #:custodian [cust (current-custodian)]) + (define fd (fd-output-port-fd p)) + (define fd-refcount (fd-output-port-fd-refcount p)) + (define evt (fd-evt fd RKTIO_POLL_WRITE p)) (define flush-handle (plumber-add-flush! plumber (lambda (h) (atomically (send fd-output-port p flush-buffer/external))))) (define custodian-reference (register-fd-close cust fd fd-refcount flush-handle p)) + (set-core-output-port-evt! p evt) (set-fd-output-port-flush-handle! p flush-handle) (set-fd-output-port-custodian-reference! p custodian-reference) - (set-fd-evt-closed! evt p) p) ;; ---------------------------------------- @@ -322,15 +310,14 @@ (rktio_fd_is_terminal rktio fd))) (define (fd-port-fd p) + (define cp (or (->core-input-port p #:default #f) + (->core-output-port p #:default #f))) (cond - [(input-port? p) - (define cp (->core-input-port p)) - (and (fd-input-port? cp) - (fd-input-port-fd cp))] - [else - (define cp (->core-output-port p)) - (and (fd-output-port? cp) - (fd-output-port-fd cp))])) + [(fd-input-port? cp) + (fd-input-port-fd cp)] + [(fd-output-port? cp) + (fd-output-port-fd cp)] + [else #f])) ;; ---------------------------------------- @@ -418,6 +405,9 @@ ;; ---------------------------------------- +(define-values (prop:fd-place-message-opener fd-place-message-opener? fd-place-message-opener-ref) + (make-struct-type-property 'fd-place-message-opener)) + (define (fd-port->place-message port) (start-atomic) (cond @@ -426,8 +416,7 @@ (define input? (input-port? port)) (define fd-dup (dup-port-fd port)) (define name (core-port-name port)) - (define opener (or (fd-extra-data->opener (fd-data-extra (core-port-data port)) - port) + (define opener (or (fd-place-message-opener-ref port #f) (if input? (lambda (port name) (open-input-fd port name)) (lambda (port name) (open-output-fd port name))))) diff --git a/racket/src/io/port/file-identity.rkt b/racket/src/io/port/file-identity.rkt index 55810d52ac..e585cc22dd 100644 --- a/racket/src/io/port/file-identity.rkt +++ b/racket/src/io/port/file-identity.rkt @@ -12,10 +12,9 @@ (define/who (port-file-identity p) (check who file-stream-port? p) - (define cp (cond - [(input-port? p) (->core-input-port p)] - [else (->core-output-port p)])) + (define cp (or (->core-input-port p #:default #f) + (->core-output-port p #:default #f))) (start-atomic) (check-not-closed who cp) - (define fd ((file-stream-ref cp) cp #f)) + (define fd ((file-stream-ref cp) cp)) (path-or-fd-identity who #:fd fd #:port p)) diff --git a/racket/src/io/port/file-stream.rkt b/racket/src/io/port/file-stream.rkt index f8797e3888..b28090f454 100644 --- a/racket/src/io/port/file-stream.rkt +++ b/racket/src/io/port/file-stream.rkt @@ -12,14 +12,10 @@ (make-struct-type-property 'file-stream)) (define (file-stream-port? p) - (define accessor (file-stream-ref - (cond - [(input-port? p) (->core-input-port p)] - [(output-port? p) (->core-output-port p)] - [else - (raise-argument-error 'file-stream-port? - "port?" - p)]) - #f)) - (and accessor - (accessor p #t))) + (file-stream-ref + (or (->core-input-port p #:default #f) + (->core-output-port p #:default #f) + (raise-argument-error 'file-stream-port? + "port?" + p)) + #f)) diff --git a/racket/src/io/port/input-port.rkt b/racket/src/io/port/input-port.rkt index 13def442a1..46070ea41b 100644 --- a/racket/src/io/port/input-port.rkt +++ b/racket/src/io/port/input-port.rkt @@ -34,7 +34,8 @@ ;; This function should not be called in atomic mode, ;; since it can invoke an artitrary function -(define (->core-input-port v [who #f]) +(define (->core-input-port v [who #f] + #:default [default empty-input-port]) (cond [(core-input-port? v) v] [(input-port-ref v #f) @@ -45,7 +46,7 @@ [else (->core-input-port p)]))] [who (raise-argument-error who "input-port?" v)] - [else empty-input-port])) + [else default])) (class core-input-port #:extends core-port (field diff --git a/racket/src/io/port/output-port.rkt b/racket/src/io/port/output-port.rkt index 8b752b7c21..7f53b27bf9 100644 --- a/racket/src/io/port/output-port.rkt +++ b/racket/src/io/port/output-port.rkt @@ -35,7 +35,8 @@ ;; This function should not be called in atomic mode, ;; since it can invoke an arbitrary function -(define (->core-output-port v [who #f]) +(define (->core-output-port v [who #f] + #:default [default empty-output-port]) (cond [(core-output-port? v) v] [(output-port-ref v #f) @@ -46,7 +47,7 @@ [else (->core-output-port p)]))] [who (raise-argument-error who "output-port?" v)] - [else empty-output-port])) + [else default])) (class core-output-port #:extends core-port (field