io: convert tcp-{input,output}-port to object style

This commit is contained in:
Matthew Flatt 2019-02-12 16:16:56 -07:00
parent 01d53378b2
commit d6af78cebd
6 changed files with 160 additions and 172 deletions

View File

@ -1,89 +1,91 @@
#lang racket/base #lang racket/base
(require "../common/check.rkt" (require "../common/check.rkt"
"../common/class.rkt"
"../host/rktio.rkt" "../host/rktio.rkt"
"../port/port.rkt" "../port/port.rkt"
"../port/close.rkt" "../port/close.rkt"
"../port/input-port.rkt" "../port/input-port.rkt"
"../port/output-port.rkt" "../port/output-port.rkt"
"../port/fd-port.rkt" "../port/fd-port.rkt"
"../port/place-message.rkt") "../port/file-stream.rkt"
"error.rkt")
(provide open-input-output-tcp (provide open-input-output-tcp
tcp-port? tcp-port?
tcp-abandon-port) tcp-abandon-port)
(struct tcp-data (abandon-in? abandon-out?) (class tcp-input-port #:extends fd-input-port
#:mutable (field
#:authentic [abandon? #f])
#:property (override
prop:fd-extra-data-place-message [on-close
(lambda (port) (lambda ()
(if (input-port? port) (unless abandon?
(lambda (fd name) (rktio_socket_shutdown rktio fd RKTIO_SHUTDOWN_READ)))]
(open-input-fd fd name [raise-read-error
#:extra-data (tcp-data #f #t) (lambda (n)
#:file-stream? #f (raise-network-error #f n "error reading from stream port"))])
#:network-error? #t)) (property
(lambda (fd name) [prop:file-stream #f]
(open-output-fd fd name [prop:fd-place-message-opener (lambda (fd name)
#:extra-data (tcp-data #t #f) (make-tcp-input-port fd name))]))
#:file-stream? #f
#:network-error? #t))))) (define (make-tcp-input-port fd name
#:fd-refcount [fd-refcount (box 1)])
(finish-fd-input-port
(new tcp-input-port
[name name]
[fd fd]
[fd-refcount fd-refcount])))
(class tcp-output-port #:extends fd-output-port
(field
[abandon? #f])
(override
[on-close
(lambda ()
(unless abandon?
(rktio_socket_shutdown rktio fd RKTIO_SHUTDOWN_WRITE)))]
[raise-write-error
(lambda (n)
(raise-network-error #f n "error writing to stream port"))])
(property
[prop:file-stream #f]
[prop:fd-place-message-opener (lambda (fd name)
(make-tcp-output-port fd name))]))
(define (make-tcp-output-port fd name
#:fd-refcount [fd-refcount (box 1)])
(finish-fd-output-port
(new tcp-output-port
[name name]
[fd fd]
[fd-refcount fd-refcount]
[buffer-mode 'block])))
;; ----------------------------------------
(define (open-input-output-tcp fd name #:close? [close? #t]) (define (open-input-output-tcp fd name #:close? [close? #t])
(define refcount (box (if close? 2 3))) (define refcount (box (if close? 2 3)))
(define extra-data (tcp-data #f #f))
(values (values
(open-input-fd fd name (make-tcp-input-port fd name
#:extra-data extra-data #:fd-refcount refcount)
#:on-close (make-tcp-output-port fd name
;; in atomic mode #:fd-refcount refcount)))
(lambda ()
(unless (tcp-data-abandon-in? extra-data)
(rktio_socket_shutdown rktio fd RKTIO_SHUTDOWN_READ)))
#:fd-refcount refcount
#:file-stream? #f
#:network-error? #t)
(open-output-fd fd name
#:extra-data extra-data
#:on-close
;; in atomic mode
(lambda ()
(unless (tcp-data-abandon-out? extra-data)
(rktio_socket_shutdown rktio fd RKTIO_SHUTDOWN_WRITE)))
#:fd-refcount refcount
#:buffer-mode 'block
#:file-stream? #f
#:network-error? #t)))
(define (port-tcp-data p)
(maybe-fd-data-extra
(cond
[(input-port? p)
(core-port-data
(->core-input-port p))]
[(output-port? p)
(core-port-data
(->core-output-port p))]
[else #f])))
(define/who (tcp-port? p) (define/who (tcp-port? p)
(tcp-data? (port-tcp-data p))) (define cp (or (->core-input-port p #:default #f)
(->core-output-port p #:default #f)))
(or (tcp-input-port? cp)
(tcp-output-port? cp)))
(define/who (tcp-abandon-port given-p) (define/who (tcp-abandon-port p)
(define p (cond (define cp (or (->core-input-port p #:default #f)
[(input-port? given-p) (->core-output-port p #:default #f)))
(->core-input-port given-p)] (cond
[(output-port? given-p) [(tcp-input-port? cp)
(->core-output-port given-p)] (set-tcp-input-port-abandon?! cp #t)
[else #f])) (close-port p)]
(define data (port-tcp-data p)) [(tcp-output-port? cp)
(unless (tcp-data? data) (set-tcp-output-port-abandon?! cp #t)
(raise-argument-error who "tcp-port?" p)) (close-port p)]))
(if (input-port? p)
(begin
(set-tcp-data-abandon-in?! data #t)
(close-port p))
(begin
(set-tcp-data-abandon-out?! data #t)
(close-port p))))

View File

@ -7,7 +7,6 @@
"../host/pthread.rkt" "../host/pthread.rkt"
"../sandman/main.rkt" "../sandman/main.rkt"
"../file/error.rkt" "../file/error.rkt"
"../network/error.rkt"
"port.rkt" "port.rkt"
"input-port.rkt" "input-port.rkt"
"output-port.rkt" "output-port.rkt"
@ -20,17 +19,15 @@
"check.rkt" "check.rkt"
"place-message.rkt") "place-message.rkt")
(provide open-input-fd (provide (struct-out fd-input-port)
open-input-fd
finish-fd-input-port
(struct-out fd-output-port)
open-output-fd open-output-fd
finish-fd-output-port
terminal-port? terminal-port?
fd-port-fd fd-port-fd
maybe-fd-data-extra) prop:fd-place-message-opener)
(struct fd-data (extra))
(define (maybe-fd-data-extra data)
(and (fd-data? data)
(fd-data-extra data)))
;; in atomic mode ;; in atomic mode
(define (fd-close fd fd-refcount) (define (fd-close fd fd-refcount)
@ -46,11 +43,13 @@
(class fd-input-port #:extends peek-via-read-input-port (class fd-input-port #:extends peek-via-read-input-port
(field (field
[fd #f] [fd #f]
[fd-refcount #f] [fd-refcount (box 1)]
[custodian-reference #f] [custodian-reference #f])
[on-close void]
[file-stream? #t] (public
[network-error? #f]) [on-close (lambda () (void))]
[raise-read-error (lambda (n)
(raise-filesystem-error #f n "error reading from stream port"))])
(override (override
[read-in/inner [read-in/inner
@ -59,9 +58,7 @@
(cond (cond
[(rktio-error? n) [(rktio-error? n)
(end-atomic) (end-atomic)
(if network-error? (send fd-input-port this raise-read-error n)]
(raise-network-error #f n "error reading from stream port")
(raise-filesystem-error #f n "error reading from stream port"))]
[(eqv? n RKTIO_READ_EOF) eof] [(eqv? n RKTIO_READ_EOF) eof]
[(eqv? n 0) (wrap-evt (fd-evt fd RKTIO_POLL_READ this) [(eqv? n 0) (wrap-evt (fd-evt fd RKTIO_POLL_READ this)
(lambda (v) 0))] (lambda (v) 0))]
@ -69,7 +66,7 @@
[close [close
(lambda () (lambda ()
(on-close) (send fd-input-port this on-close)
(fd-close fd fd-refcount) (fd-close fd fd-refcount)
(unsafe-custodian-unregister fd custodian-reference) (unsafe-custodian-unregister fd custodian-reference)
(close-peek-buffer))] (close-peek-buffer))]
@ -84,10 +81,7 @@
(set-file-position fd pos)])]) (set-file-position fd pos)])])
(property (property
[prop:file-stream (lambda (p [check? #f]) [prop:file-stream (lambda (p) (fd-input-port-fd p))]
(if check?
(fd-input-port-file-stream? p)
(fd-input-port-fd p)))]
[prop:data-place-message (lambda (port) [prop:data-place-message (lambda (port)
(lambda () (lambda ()
(fd-port->place-message port)))])) (fd-port->place-message port)))]))
@ -97,20 +91,19 @@
;; in atomic mode ;; in atomic mode
;; Current custodian must not be shut down. ;; Current custodian must not be shut down.
(define (open-input-fd fd name (define (open-input-fd fd name
#:extra-data [extra-data #f]
#:on-close [on-close void]
#:fd-refcount [fd-refcount (box 1)] #:fd-refcount [fd-refcount (box 1)]
#:custodian [cust (current-custodian)] #:custodian [cust (current-custodian)])
#:file-stream? [file-stream? #t] (finish-fd-input-port
#:network-error? [network-error? #f]) (new fd-input-port
(define p (new fd-input-port
[name name] [name name]
[data (fd-data extra-data)]
[fd fd] [fd fd]
[fd-refcount fd-refcount] [fd-refcount fd-refcount])
[on-close on-close] #:custodian cust))
[file-stream? file-stream?]
[network-error? network-error?])) (define (finish-fd-input-port p
#:custodian [cust (current-custodian)])
(define fd (fd-input-port-fd p))
(define fd-refcount (fd-input-port-fd-refcount p))
(set-fd-input-port-custodian-reference! p (register-fd-close cust fd fd-refcount #f p)) (set-fd-input-port-custodian-reference! p (register-fd-close cust fd fd-refcount #f p))
p) p)
@ -119,16 +112,19 @@
(class fd-output-port #:extends core-output-port (class fd-output-port #:extends core-output-port
(field (field
[fd fd] [fd fd]
[fd-refcount #f] [fd-refcount (box 1)]
[bstr (make-bytes 4096)] [bstr (make-bytes 4096)]
[start-pos 0] [start-pos 0]
[end-pos 0] [end-pos 0]
[flush-handle #f] [flush-handle #f]
[buffer-mode 'block] [buffer-mode 'block]
[custodian-reference #f] [custodian-reference #f])
[on-close void]
[file-stream? #t] (public
[network-error? network-error?]) [on-close (lambda () (void))]
[raise-write-error
(lambda (n)
(raise-filesystem-error #f n "error writing to stream port"))])
(private (private
;; in atomic mode ;; in atomic mode
@ -141,9 +137,7 @@
(cond (cond
[(rktio-error? n) [(rktio-error? n)
(end-atomic) (end-atomic)
(if network-error? (send fd-output-port this raise-write-error n)]
(raise-network-error #f n "error writing to stream port")
(raise-filesystem-error #f n "error writing to stream port"))]
[(fx= n 0) [(fx= n 0)
#f] #f]
[else [else
@ -212,9 +206,7 @@
(cond (cond
[(rktio-error? n) [(rktio-error? n)
(end-atomic) (end-atomic)
(if network-error? (send fd-output-port this raise-write-error n)]
(raise-network-error #f n "error writing to stream port")
(raise-filesystem-error #f n "error writing to stream port"))]
[(fx= n 0) (wrap-evt evt (lambda (v) #f))] [(fx= n 0) (wrap-evt evt (lambda (v) #f))]
[else n])]))] [else n])]))]
@ -227,7 +219,7 @@
(lambda () (lambda ()
(flush-buffer-fully #f) ; can temporarily leave atomic mode (flush-buffer-fully #f) ; can temporarily leave atomic mode
(when bstr ; <- in case a concurrent close succeeded (when bstr ; <- in case a concurrent close succeeded
(on-close) (send fd-output-port this on-close)
(plumber-flush-handle-remove! flush-handle) (plumber-flush-handle-remove! flush-handle)
(set! bstr #f) (set! bstr #f)
(fd-close fd fd-refcount) (fd-close fd fd-refcount)
@ -254,10 +246,7 @@
[(self mode) (set! buffer-mode mode)])]) [(self mode) (set! buffer-mode mode)])])
(property (property
[prop:file-stream (lambda (p [check? #f]) [prop:file-stream (lambda (p) (fd-output-port-fd p))]
(if check?
(fd-output-port-file-stream? p)
(fd-output-port-fd p)))]
[prop:file-truncate (lambda (p pos) [prop:file-truncate (lambda (p pos)
;; in atomic mode ;; in atomic mode
(send fd-output-port p flush-buffer/external) (send fd-output-port p flush-buffer/external)
@ -279,39 +268,38 @@
;; in atomic mode ;; in atomic mode
;; Current custodian must not be shut down. ;; Current custodian must not be shut down.
(define (open-output-fd fd name (define (open-output-fd fd name
#:extra-data [extra-data #f]
#:buffer-mode [buffer-mode 'infer] #:buffer-mode [buffer-mode 'infer]
#:fd-refcount [fd-refcount (box 1)] #:fd-refcount [fd-refcount (box 1)]
#:on-close [on-close void]
#:plumber [plumber (current-plumber)] #:plumber [plumber (current-plumber)]
#:custodian [cust (current-custodian)] #:custodian [cust (current-custodian)])
#:file-stream? [file-stream? #t] (finish-fd-output-port
#:network-error? [network-error? #f]) (new fd-output-port
(define evt (fd-evt fd RKTIO_POLL_WRITE #f))
(define p (new fd-output-port
[name name] [name name]
[evt evt]
[fd fd] [fd fd]
[fd-refcount fd-refcount] [fd-refcount fd-refcount]
[file-stream? file-stream?]
[flush-handle #f]
[buffer-mode [buffer-mode
(if (eq? buffer-mode 'infer) (if (eq? buffer-mode 'infer)
(if (rktio_fd_is_terminal rktio fd) (if (rktio_fd_is_terminal rktio fd)
'line 'line
'block) 'block)
buffer-mode)] buffer-mode)])
[on-close on-close] #:plumber plumber
[network-error? network-error?] #:custodian cust))
[data (fd-data extra-data)]))
(define (finish-fd-output-port p
#:plumber [plumber (current-plumber)]
#:custodian [cust (current-custodian)])
(define fd (fd-output-port-fd p))
(define fd-refcount (fd-output-port-fd-refcount p))
(define evt (fd-evt fd RKTIO_POLL_WRITE p))
(define flush-handle (plumber-add-flush! plumber (define flush-handle (plumber-add-flush! plumber
(lambda (h) (lambda (h)
(atomically (atomically
(send fd-output-port p flush-buffer/external))))) (send fd-output-port p flush-buffer/external)))))
(define custodian-reference (register-fd-close cust fd fd-refcount flush-handle p)) (define custodian-reference (register-fd-close cust fd fd-refcount flush-handle p))
(set-core-output-port-evt! p evt)
(set-fd-output-port-flush-handle! p flush-handle) (set-fd-output-port-flush-handle! p flush-handle)
(set-fd-output-port-custodian-reference! p custodian-reference) (set-fd-output-port-custodian-reference! p custodian-reference)
(set-fd-evt-closed! evt p)
p) p)
;; ---------------------------------------- ;; ----------------------------------------
@ -322,15 +310,14 @@
(rktio_fd_is_terminal rktio fd))) (rktio_fd_is_terminal rktio fd)))
(define (fd-port-fd p) (define (fd-port-fd p)
(define cp (or (->core-input-port p #:default #f)
(->core-output-port p #:default #f)))
(cond (cond
[(input-port? p) [(fd-input-port? cp)
(define cp (->core-input-port p)) (fd-input-port-fd cp)]
(and (fd-input-port? cp) [(fd-output-port? cp)
(fd-input-port-fd cp))] (fd-output-port-fd cp)]
[else [else #f]))
(define cp (->core-output-port p))
(and (fd-output-port? cp)
(fd-output-port-fd cp))]))
;; ---------------------------------------- ;; ----------------------------------------
@ -418,6 +405,9 @@
;; ---------------------------------------- ;; ----------------------------------------
(define-values (prop:fd-place-message-opener fd-place-message-opener? fd-place-message-opener-ref)
(make-struct-type-property 'fd-place-message-opener))
(define (fd-port->place-message port) (define (fd-port->place-message port)
(start-atomic) (start-atomic)
(cond (cond
@ -426,8 +416,7 @@
(define input? (input-port? port)) (define input? (input-port? port))
(define fd-dup (dup-port-fd port)) (define fd-dup (dup-port-fd port))
(define name (core-port-name port)) (define name (core-port-name port))
(define opener (or (fd-extra-data->opener (fd-data-extra (core-port-data port)) (define opener (or (fd-place-message-opener-ref port #f)
port)
(if input? (if input?
(lambda (port name) (open-input-fd port name)) (lambda (port name) (open-input-fd port name))
(lambda (port name) (open-output-fd port name))))) (lambda (port name) (open-output-fd port name)))))

View File

@ -12,10 +12,9 @@
(define/who (port-file-identity p) (define/who (port-file-identity p)
(check who file-stream-port? p) (check who file-stream-port? p)
(define cp (cond (define cp (or (->core-input-port p #:default #f)
[(input-port? p) (->core-input-port p)] (->core-output-port p #:default #f)))
[else (->core-output-port p)]))
(start-atomic) (start-atomic)
(check-not-closed who cp) (check-not-closed who cp)
(define fd ((file-stream-ref cp) cp #f)) (define fd ((file-stream-ref cp) cp))
(path-or-fd-identity who #:fd fd #:port p)) (path-or-fd-identity who #:fd fd #:port p))

View File

@ -12,14 +12,10 @@
(make-struct-type-property 'file-stream)) (make-struct-type-property 'file-stream))
(define (file-stream-port? p) (define (file-stream-port? p)
(define accessor (file-stream-ref (file-stream-ref
(cond (or (->core-input-port p #:default #f)
[(input-port? p) (->core-input-port p)] (->core-output-port p #:default #f)
[(output-port? p) (->core-output-port p)]
[else
(raise-argument-error 'file-stream-port? (raise-argument-error 'file-stream-port?
"port?" "port?"
p)]) p))
#f)) #f))
(and accessor
(accessor p #t)))

View File

@ -34,7 +34,8 @@
;; This function should not be called in atomic mode, ;; This function should not be called in atomic mode,
;; since it can invoke an artitrary function ;; since it can invoke an artitrary function
(define (->core-input-port v [who #f]) (define (->core-input-port v [who #f]
#:default [default empty-input-port])
(cond (cond
[(core-input-port? v) v] [(core-input-port? v) v]
[(input-port-ref v #f) [(input-port-ref v #f)
@ -45,7 +46,7 @@
[else [else
(->core-input-port p)]))] (->core-input-port p)]))]
[who (raise-argument-error who "input-port?" v)] [who (raise-argument-error who "input-port?" v)]
[else empty-input-port])) [else default]))
(class core-input-port #:extends core-port (class core-input-port #:extends core-port
(field (field

View File

@ -35,7 +35,8 @@
;; This function should not be called in atomic mode, ;; This function should not be called in atomic mode,
;; since it can invoke an arbitrary function ;; since it can invoke an arbitrary function
(define (->core-output-port v [who #f]) (define (->core-output-port v [who #f]
#:default [default empty-output-port])
(cond (cond
[(core-output-port? v) v] [(core-output-port? v) v]
[(output-port-ref v #f) [(output-port-ref v #f)
@ -46,7 +47,7 @@
[else [else
(->core-output-port p)]))] (->core-output-port p)]))]
[who (raise-argument-error who "output-port?" v)] [who (raise-argument-error who "output-port?" v)]
[else empty-output-port])) [else default]))
(class core-output-port #:extends core-port (class core-output-port #:extends core-port
(field (field