io: change port from table of functions to table of methods

Adjust the internal port representation to that it doesn't inherently
require allocating lots of closures.
This commit is contained in:
Matthew Flatt 2018-12-08 10:34:44 -07:00
parent 0c2ada98ff
commit 9f27b90e31
26 changed files with 238 additions and 180 deletions

View File

@ -20,7 +20,7 @@
(atomically
(check-not-closed who p)
(and buffer-mode
(buffer-mode))))]
(buffer-mode (core-port-self p)))))]
[(p mode)
(unless (or (input-port? p) (output-port? p))
(raise-argument-error who "port?" p))
@ -36,7 +36,7 @@
(define buffer-mode (core-port-buffer-mode p))
(cond
[buffer-mode
(buffer-mode mode)
(buffer-mode (core-port-self p) mode)
#t]
[else #f])))
(cond

View File

@ -1,6 +1,7 @@
#lang racket/base
(require "../common/check.rkt"
"../host/thread.rkt"
"port.rkt"
"output-port.rkt"
"parameter.rkt"
"write.rkt"
@ -81,7 +82,7 @@
(raise-arguments-error who
"port does not support output events"
"port" out))
(get-write-evt bstr start-pos end-pos))))
(get-write-evt (core-port-self out) bstr start-pos end-pos))))
(define/who (port-writes-atomic? out)
(check who output-port? out)

View File

@ -57,13 +57,14 @@
(make-core-input-port
#:name name
#:data (input-bytes-data)
#:self #f
#:prepare-change
(lambda ()
(lambda (self)
(pause-waiting-commit))
#:read-byte
(lambda ()
(lambda (self)
(let ([pos i])
(if (pos . < . len)
(begin
@ -73,7 +74,7 @@
eof)))
#:read-in
(lambda (dest-bstr start end copy?)
(lambda (self dest-bstr start end copy?)
(define pos i)
(cond
[(pos . < . len)
@ -85,14 +86,14 @@
[else eof]))
#:peek-byte
(lambda ()
(lambda (self)
(let ([pos i])
(if (pos . < . len)
(bytes-ref bstr pos)
eof)))
#:peek-in
(lambda (dest-bstr start end skip progress-evt copy?)
(lambda (self dest-bstr start end skip progress-evt copy?)
(define pos (+ i skip))
(cond
[(and progress-evt (sync/timeout 0 progress-evt))
@ -104,22 +105,22 @@
[else eof]))
#:byte-ready
(lambda (work-done!)
(lambda (self work-done!)
(i . < . len))
#:close
(lambda ()
(lambda (self)
(set! commit-manager #f) ; to indicate closed
(progress!))
#:get-progress-evt
(lambda ()
(lambda (self)
(unless progress-sema
(set! progress-sema (make-semaphore)))
(semaphore-peek-evt progress-sema))
#:commit
(lambda (amt progress-evt ext-evt finish)
(lambda (self amt progress-evt ext-evt finish)
(unless commit-manager
(set! commit-manager (make-commit-manager)))
(commit-manager-wait
@ -136,8 +137,8 @@
#:file-position
(case-lambda
[() (or alt-pos i)]
[(new-pos)
[(self) (or alt-pos i)]
[(self new-pos)
(set! i (if (eof-object? new-pos)
len
(min len new-pos)))
@ -161,16 +162,26 @@
(make-core-output-port
#:name name
#:data (output-bytes-data i (lambda () (pipe-discard-all i)))
#:self o
#:evt o
#:write-out (core-output-port-write-out o)
#:close (core-port-close o)
#:get-write-evt (core-output-port-get-write-evt o)
#:get-location (core-port-get-location o)
#:count-lines! (core-port-count-lines! o)
#:write-out o
#:close
(lambda (o) ((core-port-close o) (core-port-self o)))
#:get-write-evt
(and (core-output-port-get-write-evt o)
(lambda (o bstr start-k end-k)
((core-output-port-get-write-evt o) (core-port-self o) bstr start-k end-k)))
#:get-location
(and (core-port-get-location o)
(lambda (o) ((core-port-get-location o) (core-port-self o))))
#:count-lines!
(and (core-port-count-lines! o)
(lambda (o)
((core-port-count-lines! o) (core-port-self o))))
#:file-position
(case-lambda
[() (pipe-write-position o)]
[(new-pos)
[(o) (pipe-write-position o)]
[(o new-pos)
(define len (pipe-content-length i))
(cond
[(eof-object? new-pos)
@ -185,7 +196,7 @@
"position" new-pos))
(pipe-write-position o len)
(define amt (- new-pos len))
((core-output-port-write-out o) (make-bytes amt 0) 0 amt #f #f #f)
((core-output-port-write-out o) (core-port-self o) (make-bytes amt 0) 0 amt #f #f #f)
(void)]
[else
(pipe-write-position o new-pos)])])))

View File

@ -26,7 +26,7 @@
(define closed (core-port-closed p))
(unless (closed-state-closed? closed)
(atomically
((core-port-close p))
((core-port-close p) (core-port-self p))
(set-closed-state! closed))))
;; in atomic mode

View File

@ -38,7 +38,7 @@
(set-core-port-position! p (add1 (or (core-port-offset p) 0)))
(define count-lines! (core-port-count-lines! p))
(when count-lines!
(count-lines!))))))
(count-lines! (core-port-self p)))))))
(define/who (port-counts-lines? p)
(core-port-count?
@ -61,7 +61,7 @@
(define get-location (core-port-get-location p))
(cond
[get-location
(get-location)]
(get-location (core-port-self p))]
[else
(values (core-port-line p)
(core-port-column p)

View File

@ -1,6 +1,7 @@
#lang racket/base
(require "../common/check.rkt"
"../host/thread.rkt"
"port.rkt"
"input-port.rkt"
"custom-port.rkt"
"pipe.rkt"
@ -54,9 +55,9 @@
(define input-pipe #f) ; `user-read-in` can redirect input
(define (protect-in dest-bstr dest-start dest-end copy? read-in)
;; We don't trust `read-in` to refrain from modifying its
;; byte-string argument after it returns, and the `read-in`
(define (protect-in dest-bstr dest-start dest-end copy? user-read-in)
;; We don't trust `user-read-in` to refrain from modifying its
;; byte-string argument after it returns, and the `user-read-in`
;; interface doesn't deal with start and end positions, so copy`
;; dest-bstr` if needed
(define len (- dest-end dest-start))
@ -66,7 +67,7 @@
(not (= len dest-end)))
(make-bytes len)
dest-bstr))
(define n (read-in user-bstr))
(define n (user-read-in user-bstr))
(cond
[(eq? user-bstr dest-bstr)
n]
@ -153,15 +154,16 @@
four-args]))
;; in atomic mode
(define (read-in dest-bstr dest-start dest-end copy?)
(define (read-in self dest-bstr dest-start dest-end copy?)
(cond
[input-pipe
(cond
[(zero? (pipe-content-length input-pipe))
(set! input-pipe #f)
(read-in dest-bstr dest-start dest-end copy?)]
(read-in self dest-bstr dest-start dest-end copy?)]
[else
((core-input-port-read-in input-pipe) dest-bstr dest-start dest-end copy?)])]
(define read-in (core-input-port-read-in input-pipe))
(read-in (core-port-self input-pipe) dest-bstr dest-start dest-end copy?)])]
[else
(define r
(parameterize-break #f
@ -170,7 +172,7 @@
(check-read-result '|user port read| r dest-start dest-end)
(cond
[(pipe-input-port? r)
(read-in dest-bstr dest-start dest-end copy?)]
(read-in self dest-bstr dest-start dest-end copy?)]
[(evt? r)
(wrap-check-read-evt-result '|user port read| r dest-start dest-end #f #f)]
[(procedure? r)
@ -179,15 +181,16 @@
;; in atomic mode
;; Used only if `user-peek-in` is a function:
(define (peek-in dest-bstr dest-start dest-end skip-k progress-evt copy?)
(define (peek-in self dest-bstr dest-start dest-end skip-k progress-evt copy?)
(cond
[input-pipe
(cond
[((pipe-content-length input-pipe) . <= . skip-k)
(set! input-pipe #f)
(peek-in dest-bstr dest-start dest-end skip-k progress-evt copy?)]
(peek-in self dest-bstr dest-start dest-end skip-k progress-evt copy?)]
[else
((core-input-port-peek-in input-pipe) dest-bstr dest-start dest-end skip-k progress-evt copy?)])]
(define peek-in (core-input-port-peek-in input-pipe))
(peek-in (core-port-self input-pipe) dest-bstr dest-start dest-end skip-k progress-evt copy?)])]
[else
(define r
(parameterize-break #f
@ -197,7 +200,7 @@
(check-read-result '|user port peek| r dest-start dest-end #:peek? #t #:ok-false? progress-evt)
(cond
[(pipe-input-port? r)
(peek-in dest-bstr dest-start dest-end skip-k progress-evt copy?)]
(peek-in self dest-bstr dest-start dest-end skip-k progress-evt copy?)]
[(evt? r)
(wrap-check-read-evt-result '|user port peek| r dest-start dest-end #t progress-evt)]
[(procedure? r)
@ -206,33 +209,33 @@
;; in atomic mode
;; Used only if `user-peek-in` is a function:
(define (byte-ready work-done!)
(define (byte-ready self work-done!)
(cond
[(and input-pipe
(positive? (pipe-content-length input-pipe)))
#t]
[else
(define bstr (make-bytes 1))
(define v (peek-in bstr 0 1 0 #f #f))
(define v (peek-in self bstr 0 1 0 #f #f))
(work-done!)
(cond
[(evt? v) v]
[else (not (eqv? v 0))])]))
;; in atomic mode
(define (close)
(define (close self)
(end-atomic)
(user-close)
(start-atomic))
(define (get-progress-evt)
(define (get-progress-evt self)
(define r (user-get-progress-evt))
(unless (evt? r)
(raise-result-error '|user port progress-evt| "evt?" r))
r)
;; in atomic mode
(define (commit amt evt ext-evt finish)
(define (commit self amt evt ext-evt finish)
(define r
(parameterize-break #f
(non-atomically
@ -248,7 +251,7 @@
(define count-lines!
(and user-count-lines!
(lambda () (end-atomic) (user-count-lines!) (start-atomic))))
(lambda (self) (end-atomic) (user-count-lines!) (start-atomic))))
(define-values (init-offset file-position)
(make-init-offset+file-position user-init-position))
@ -261,6 +264,7 @@
[user-peek-in
(make-core-input-port
#:name name
#:self #f
#:read-in
(if (input-port? user-read-in)
user-read-in
@ -285,6 +289,7 @@
(define-values (port buffer-flusher)
(open-input-peek-via-read
#:name name
#:self #f
#:read-in read-in
#:close close
#:get-location get-location

View File

@ -1,6 +1,7 @@
#lang racket/base
(require "../common/check.rkt"
"../host/thread.rkt"
"port.rkt"
"output-port.rkt"
"custom-port.rkt"
"pipe.rkt")
@ -101,7 +102,7 @@
[else r]))))
;; in atomic mode
(define (write-out bstr start end non-block/buffer? enable-break? copy?)
(define (write-out self bstr start end non-block/buffer? enable-break? copy?)
(cond
[output-pipe
(cond
@ -109,9 +110,10 @@
(= start end)
(not (sync/timeout 0 output-pipe)))
(set! output-pipe #f)
(write-out bstr start end non-block/buffer? enable-break? copy?)]
(write-out self bstr start end non-block/buffer? enable-break? copy?)]
[else
((core-output-port-write-out output-pipe) bstr start end non-block/buffer? enable-break? copy?)])]
(define write-out (core-output-port-write-out output-pipe))
(write-out (core-port-self output-pipe) bstr start end non-block/buffer? enable-break? copy?)])]
[else
(define r
;; Always tell user port to re-enable breaks if it blocks, since
@ -125,12 +127,12 @@
(check-write-result '|user port write| r start end non-block/buffer?)
(cond
[(pipe-output-port? r)
(write-out bstr start end non-block/buffer? enable-break? copy?)]
(write-out self bstr start end non-block/buffer? enable-break? copy?)]
[(evt? r)
(wrap-check-write-evt-result '|user port write| r start end non-block/buffer?)]
[else r])]))
(define (get-write-evt bstr start end)
(define (get-write-evt self bstr start end)
(end-atomic)
(define r (user-get-write-evt bstr start end))
(unless (evt? r)
@ -138,7 +140,7 @@
(start-atomic)
(wrap-check-write-evt-result '|user port write-evt| r start end #t))
(define (write-out-special v non-block/buffer? enable-break?)
(define (write-out-special self v non-block/buffer? enable-break?)
(let ([enable-break? (and (not non-block/buffer?) (break-enabled))])
(parameterize-break #f
(non-atomically
@ -150,7 +152,7 @@
(define count-lines!
(and user-count-lines!
(lambda () (end-atomic) (user-count-lines!) (start-atomic))))
(lambda (self) (end-atomic) (user-count-lines!) (start-atomic))))
(define-values (init-offset file-position)
(make-init-offset+file-position user-init-position))
@ -160,13 +162,14 @@
(make-buffer-mode user-buffer-mode #:output? #t)))
;; in atomic mode
(define (close)
(define (close self)
(end-atomic)
(user-close)
(start-atomic))
(make-core-output-port
#:name name
#:self #f
#:evt evt
#:write-out
(if (output-port? user-write-out)
@ -178,7 +181,9 @@
user-write-out-special
(and user-write-out-special write-out-special))
#:get-write-evt (and user-get-write-evt get-write-evt)
#:get-write-special-evt user-get-write-special-evt
#:get-write-special-evt (and user-get-write-special-evt
(lambda (self v)
(user-get-write-special-evt v)))
#:get-location get-location
#:count-lines! count-lines!
#:init-offset init-offset

View File

@ -18,7 +18,7 @@
;; in atomic mode
(define (make-get-location user-get-location)
(lambda ()
(lambda (self)
(end-atomic)
(call-with-values
(lambda () (user-get-location))
@ -58,7 +58,7 @@
[(input-port? user-init-position) user-init-position]
[(output-port? user-init-position) user-init-position]
[(procedure? user-init-position)
(lambda ()
(lambda (self)
(define pos (user-init-position))
(unless (or (not pos) (exact-positive-integer? pos))
(raise-result-error '|user port init-position| "(or/c exact-positive-integer? #f)" pos))
@ -78,7 +78,7 @@
(define (make-buffer-mode user-buffer-mode #:output? [output? #f])
(case-lambda
[()
[(self)
(end-atomic)
(define m (user-buffer-mode))
(cond
@ -91,6 +91,6 @@
"(or/c 'block 'line 'none #f)"
"(or/c 'block 'none #f)")
m)])]
[(m)
[(self m)
(non-atomically
(user-buffer-mode m))]))

View File

@ -69,9 +69,10 @@
(open-input-peek-via-read
#:name name
#:data (fd-data fd extra-data #t file-stream?)
#:self #f
#:read-in
;; in atomic mode
(lambda (dest-bstr start end copy?)
(lambda (self dest-bstr start end copy?)
(define n (rktio_read_in rktio fd dest-bstr start end))
(cond
[(rktio-error? n)
@ -86,7 +87,7 @@
#:read-is-atomic? #t
#:close
;; in atomic mode
(lambda ()
(lambda (self)
(on-close)
(fd-close fd fd-refcount)
(unsafe-custodian-unregister fd custodian-reference))
@ -178,6 +179,7 @@
(define port
(make-core-output-port
#:name name
#:self #f
#:data (fd-output-data fd extra-data #f file-stream?
;; Flush function needed for `file-truncate`:
(lambda ()
@ -188,7 +190,7 @@
#:write-out
;; in atomic mode
(lambda (src-bstr src-start src-end nonbuffer/nonblock? enable-break? copy?)
(lambda (self src-bstr src-start src-end nonbuffer/nonblock? enable-break? copy?)
(cond
[(= src-start src-end)
;; Flush request
@ -218,12 +220,12 @@
[else n])]))
#:count-write-evt-via-write-out
(lambda (v bstr start)
(lambda (self v bstr start)
(port-count! port v bstr start))
#:close
;; in atomic mode
(lambda ()
(lambda (self)
(flush-buffer-fully #f) ; can temporarily leave atomic mode
(when buffer ; <- in case a concurrent close succeeded
(on-close)
@ -282,7 +284,7 @@
(define (make-file-position fd buffer-control)
;; in atomic mode
(case-lambda
[()
[(self)
(define ppos (rktio_get_file_position rktio fd))
(cond
[(rktio-error? ppos)
@ -292,7 +294,7 @@
(define pos (rktio_filesize_ref ppos))
(rktio_free ppos)
(buffer-control pos)])]
[(pos)
[(self pos)
(buffer-control)
(define r
(rktio_set_file_position rktio

View File

@ -34,10 +34,10 @@
[else (->core-output-port p)])])
(define file-position (core-port-file-position cp))
(cond
[(and (procedure? file-position) (procedure-arity-includes? file-position 1))
[(and (procedure? file-position) (procedure-arity-includes? file-position 2))
(atomically
(check-not-closed who cp)
(file-position pos))]
(file-position (core-port-self cp) pos))]
[else
(raise-arguments-error who
"setting position allowed for file-stream and string ports only"
@ -62,7 +62,7 @@
(do-simple-file-position who file-position fail-k)]
[else
(define pos (or (and file-position
(file-position))
(file-position (core-port-self p)))
(core-port-offset p)))
(end-atomic)
(or pos (fail-k))])))

View File

@ -2,6 +2,7 @@
(require "../common/check.rkt"
"../host/thread.rkt"
"parameter.rkt"
"port.rkt"
"output-port.rkt"
"pipe.rkt")
@ -10,21 +11,20 @@
(define/who (flush-output [p (current-output-port)])
(check who output-port? p)
(let ([write-out
(let wo-loop ([p p])
(let ([write-out (core-output-port-write-out (->core-output-port p))])
(let wo-loop ([p p])
(let ([write-out (core-output-port-write-out (->core-output-port p))])
(cond
[(procedure? write-out)
(let loop ()
(define r (atomically
(write-out (core-port-self p) #"" 0 0 #f #f #f)))
(let r-loop ([r r])
(cond
[(procedure? write-out) write-out]
[else (wo-loop write-out)])))])
(let loop ()
(define r (atomically
(write-out #"" 0 0 #f #f #f)))
(let r-loop ([r r])
(cond
[(eq? r 0) (void)]
[(not r) (loop)]
[(evt? r) (r-loop (sync r))]
[else (error 'flush-output "weird result")])))))
[(eq? r 0) (void)]
[(not r) (loop)]
[(evt? r) (r-loop (sync r))]
[else (error 'flush-output "weird result")])))]
[else (wo-loop write-out)]))))
;; ----------------------------------------

View File

@ -62,7 +62,7 @@
;; the burden of re-checking for a closed port. Leave atomic mode
;; explicitly before raising an exception.
prepare-change ; #f or (-> void)
prepare-change ; #f or (-*> void)
;; Called in atomic mode
;; May leave atomic mode temporarily, but on return,
;; ensures that other atomic operations are ok to
@ -73,14 +73,14 @@
;; atomic mode is left. The `close` operation
;; is *not* guarded by a call to `prepare-change`.
read-byte ; #f or (-> (or/c byte? eof-object? evt?))
read-byte ; #f or (-*> (or/c byte? eof-object? evt?))
;; Called in atomic mode.
;; This shortcut is optional.
;; Non-blocking byte read, where an event must be
;; returned if no byte is available. The event's result
;; is ignored, so it should not consume a byte.
read-in ; port or (bytes start-k end-k copy? -> (or/c integer? ...))
read-in ; port or (bytes start-k end-k copy? -*> (or/c integer? ...))
;; Called in atomic mode.
;; A port value redirects to the port. Otherwise, the function
;; never blocks, and can assume `(- end-k start-k)` is non-zero.
@ -90,21 +90,21 @@
;; documented for `make-input-port`, except that a pipe result
;; is not allowed (or, more precisely, it's treated as an event).
peek-byte ; #f or (-> (or/c byte? eof-object? evt?))
peek-byte ; #f or (-*> (or/c byte? eof-object? evt?))
;; Called in atomic mode.
;; This shortcut is optional.
;; Non-blocking byte read, where an event must be
;; returned if no byte is available. The event's result
;; is ignored.
peek-in ; port or (bytes start-k end-k skip-k progress-evt copy? -> (or/c integer? ...))
peek-in ; port or (bytes start-k end-k skip-k progress-evt copy? -*> (or/c integer? ...))
;; Called in atomic mode.
;; A port value redirects to the port. Otherwise, the function
;; never blocks, and it can assume that `(- end-k start-k)` is non-zero.
;; The `copy?` flag is the same as for `read-in`. The return values
;; are the same as documented for `make-input-port`.
byte-ready ; port or ((->) -> (or/c boolean? evt))
byte-ready ; port or ((->) -*> (or/c boolean? evt))
;; Called in atomic mode.
;; A port value makes sense when `peek-in` has a port value.
;; Otherwise, check whether a peek on one byte would succeed
@ -112,15 +112,15 @@
;; that effectively does the same. The event's value doesn't
;; matter, because it will be wrapped to return some original
;; port. When `byte-ready` is a function, it should call the
;; given funciton (for its side effect) when work has been
;; given function (for its side effect) when work has been
;; done that might unblock this port or some other port.
get-progress-evt ; #f or (-> evt?)
get-progress-evt ; #f or (-*> evt?)
;; *Not* called in atomic mode.
;; Optional support for progress events, and may be
;; called on a closed port.
commit ; (amt-k progress-evt? evt? (bytes? -> any) -> boolean)
commit ; (amt-k progress-evt? evt? (bytes? -> any) -*> boolean)
;; Called in atomic mode.
;; Goes with `get-progress-evt`. The final `evt?`
;; argument is constrained to a few kinds of events;
@ -148,7 +148,8 @@
(poller
(lambda (self poll-ctx)
;; atomic mode
(define v (byte-ready (lambda ()
(define v (byte-ready (core-port-self i)
(lambda ()
(schedule-info-did-work! (poll-ctx-sched-info poll-ctx)))))
(cond
[(evt? v)
@ -160,6 +161,7 @@
(define (make-core-input-port #:name name
#:data [data #f]
#:self self
#:prepare-change [prepare-change #f]
#:read-byte [read-byte #f]
#:read-in read-in
@ -176,6 +178,7 @@
#:buffer-mode [buffer-mode #f])
(core-input-port name
data
self
close
count-lines!
@ -205,7 +208,8 @@
(define empty-input-port
(make-core-input-port #:name 'empty
#:read-in (lambda (bstr start-k end-k copy?) eof)
#:peek-in (lambda (bstr start-k end-k skip-k copy?) eof)
#:byte-ready (lambda (did-work!) #f)
#:self #f
#:read-in (lambda (self bstr start-k end-k copy?) eof)
#:peek-in (lambda (self bstr start-k end-k skip-k copy?) eof)
#:byte-ready (lambda (self did-work!) #f)
#:close void))

View File

@ -4,10 +4,11 @@
(provide open-output-nowhere)
(define (open-output-nowhere)
(make-core-output-port #:name 'nowhere
(make-core-output-port #:name 'nowhere
#:self #f
#:evt always-evt
#:write-out (lambda (bstr start-k end-k no-block/buffer? enable-break? copy?)
#:write-out (lambda (self bstr start-k end-k no-block/buffer? enable-break? copy?)
(- end-k start-k))
#:close void
#:write-out-special (lambda (any no-block/buffer? enable-break?)
#:write-out-special (lambda (self any no-block/buffer? enable-break?)
#t)))

View File

@ -58,7 +58,7 @@
evt ; An evt that is ready when writing a byte won't block
write-out ; port or (bstr start-k end-k no-block/buffer? enable-break? copy? -> ...)
write-out ; port or (bstr start-k end-k no-block/buffer? enable-break? copy? -*> ...)
;; Called in atomic mode.
;; Doesn't block if `no-block/buffer?` is true.
;; Does enable breaks while blocking if `enable-break?` is true.
@ -67,14 +67,14 @@
;; copied if necessary. The return values are the same as
;; documented for `make-output-port`.
write-out-special ; (any no-block/buffer? enable-break? -> boolean?)
write-out-special ; (any no-block/buffer? enable-break? -*> boolean?)
;; Called in atomic mode.
get-write-evt ; (bstr start-k end-k -> evt?)
get-write-evt ; (bstr start-k end-k -*> evt?)
;; Called in atomic mode.
;; The given bstr should not be exposed to untrusted code.
get-write-special-evt ; (-> evt?)
get-write-special-evt ; (-*> evt?)
;; *Not* called in atomic mode.
[write-handler #:mutable]
@ -103,6 +103,7 @@
(define (make-core-output-port #:name name
#:data [data #f]
#:self self
#:evt evt
#:write-out write-out
#:close close
@ -117,6 +118,7 @@
#:buffer-mode [buffer-mode #f])
(core-output-port name
data
self
close
count-lines!
@ -140,16 +142,15 @@
(and count-write-evt-via-write-out
;; If `write-out` is always atomic (in no-block, no-buffer mode),
;; then an event can poll `write-out`:
(lambda (src-bstr src-start src-end)
(lambda (self src-bstr src-start src-end)
(write-evt
;; in atomic mode:
(lambda (self)
(define v (write-out src-bstr src-start src-end #f #f #t))
(lambda (self-evt)
(define v (write-out self src-bstr src-start src-end #f #f #t))
(when (exact-integer? v)
(count-write-evt-via-write-out v src-bstr src-start))
(count-write-evt-via-write-out self v src-bstr src-start))
(if (evt? v)
;; FIXME: should be `(replace-evt v self)`
(values #f self)
(values #f (replace-evt v self-evt))
(values (list v) #f)))))))
get-write-special-evt
@ -159,9 +160,10 @@
(define empty-output-port
(make-core-output-port #:name 'empty
#:self #f
#:evt always-evt
#:write-out (lambda (bstr start end no-buffer? enable-break?)
#:write-out (lambda (self bstr start end no-buffer? enable-break?)
(- end start))
#:write-out-special (lambda (v no-buffer? enable-break?)
#:write-out-special (lambda (self v no-buffer? enable-break?)
#t)
#:close void))

View File

@ -1,5 +1,6 @@
#lang racket/base
(require "../host/thread.rkt"
"port.rkt"
"input-port.rkt"
"output-port.rkt"
"pipe.rkt")
@ -7,6 +8,7 @@
(provide open-input-peek-via-read)
(define (open-input-peek-via-read #:name name
#:self next-self
#:data [data #f]
#:read-in read-in
#:read-is-atomic? [read-is-atomic? #f] ; => can implement progress evts
@ -22,12 +24,12 @@
(define buffer-mode 'block)
;; in atomic mode
(define (prepare-change)
((core-input-port-prepare-change peek-pipe-i)))
(define (prepare-change self)
((core-input-port-prepare-change peek-pipe-i) (core-port-self peek-pipe-i)))
;; in atomic mode
(define (pull-some-bytes [amt (if (eq? 'block buffer-mode) (bytes-length buf) 1)] #:keep-eof? [keep-eof? #t])
(define v (read-in buf 0 amt #f))
(define v (read-in next-self buf 0 amt #f))
(cond
[(eof-object? v)
(when keep-eof?
@ -37,7 +39,8 @@
[(eqv? v 0) 0]
[else
(let loop ([wrote 0])
(define just-wrote ((core-output-port-write-out peek-pipe-o) buf wrote v #t #f #f))
(define write-out (core-output-port-write-out peek-pipe-o))
(define just-wrote (write-out (core-port-self peek-pipe-o) buf wrote v #t #f #f))
(define next-wrote (+ wrote just-wrote))
(unless (= v next-wrote)
(loop next-wrote)))
@ -47,11 +50,12 @@
(and (integer? v) (not (eqv? v 0))))
;; in atomic mode
(define (do-read-in dest-bstr start end copy?)
(define (do-read-in self dest-bstr start end copy?)
(let try-again ()
(cond
[(positive? (pipe-content-length peek-pipe-i))
((core-input-port-read-in peek-pipe-i) dest-bstr start end copy?)]
(define read-in (core-input-port-read-in peek-pipe-i))
(read-in (core-port-self peek-pipe-i) dest-bstr start end copy?)]
[peeked-eof?
(set! peeked-eof? #f)
;; an EOF doesn't count as progress
@ -65,14 +69,14 @@
[(or (eqv? v 0) (evt? v)) v]
[else (try-again)])]
[else
(define v (read-in dest-bstr start end copy?))
(define v (read-in next-self dest-bstr start end copy?))
(unless (eq? v 0)
(progress!))
v])])))
;; in atomic mode
(define (read-byte)
(define b ((core-input-port-read-byte peek-pipe-i)))
(define (read-byte self)
(define b ((core-input-port-read-byte peek-pipe-i) (core-port-self peek-pipe-i)))
(cond
[(or (fixnum? b) (eof-object? b))
b]
@ -83,13 +87,13 @@
[else
(define v (pull-some-bytes #:keep-eof? #f))
(cond
[(retry-pull? v) (read-byte)]
[(retry-pull? v) (read-byte self)]
[else
(progress!)
v])]))
;; in atomic mode
(define (do-peek-in dest-bstr start end skip progress-evt copy?)
(define (do-peek-in self dest-bstr start end skip progress-evt copy?)
(let try-again ()
(define peeked-amt (if peek-pipe-i
(pipe-content-length peek-pipe-i)
@ -100,7 +104,8 @@
#f]
[(and peek-pipe-i
(peeked-amt . > . skip))
((core-input-port-peek-in peek-pipe-i) dest-bstr start end skip progress-evt copy?)]
(define peek-in (core-input-port-peek-in peek-pipe-i))
(peek-in (core-port-self peek-pipe-i) dest-bstr start end skip progress-evt copy?)]
[peeked-eof?
eof]
[else
@ -110,20 +115,20 @@
v)])))
;; in atomic mode
(define (peek-byte)
(define (peek-byte self)
(cond
[(positive? (pipe-content-length peek-pipe-i))
((core-input-port-peek-byte peek-pipe-i))]
((core-input-port-peek-byte peek-pipe-i) (core-port-self peek-pipe-i))]
[peeked-eof?
eof]
[else
(define v (pull-some-bytes))
(if (retry-pull? v)
(peek-byte)
(peek-byte self)
v)]))
;; in atomic mode
(define (do-byte-ready work-done!)
(define (do-byte-ready self work-done!)
(cond
[(positive? (pipe-content-length peek-pipe-i))
#t]
@ -134,7 +139,7 @@
(work-done!)
(cond
[(retry-pull? v)
(do-byte-ready void)]
(do-byte-ready self void)]
[(evt? v) v]
[else
(not (eqv? v 0))])]))
@ -145,25 +150,26 @@
(set! peeked-eof? #f))
;; in atomic mode
(define (get-progress-evt)
((core-input-port-get-progress-evt peek-pipe-i)))
(define (get-progress-evt self)
((core-input-port-get-progress-evt peek-pipe-i) (core-port-self peek-pipe-i)))
;; in atomic mode
(define (progress!)
;; Relies on support for `0 #f #f` arguments in pipe implementation:
((core-input-port-commit peek-pipe-i) 0 #f #f void))
((core-input-port-commit peek-pipe-i) (core-port-self peek-pipe-i) 0 #f #f void))
(define (commit amt evt ext-evt finish)
((core-input-port-commit peek-pipe-i) amt evt ext-evt finish))
(define (commit self amt evt ext-evt finish)
((core-input-port-commit peek-pipe-i) (core-port-self peek-pipe-i) amt evt ext-evt finish))
(define do-buffer-mode
(case-lambda
[() buffer-mode]
[(mode) (set! buffer-mode mode)]))
[(self) buffer-mode]
[(self mode) (set! buffer-mode mode)]))
(values (make-core-input-port
#:name name
#:data data
#:self #f
#:prepare-change prepare-change
@ -177,15 +183,24 @@
get-progress-evt)
#:commit commit
#:close (lambda ()
(close)
#:close (lambda (self)
(close next-self)
(purge-buffer))
#:get-location get-location
#:count-lines! count-lines!
#:get-location (and get-location
(lambda (self) (get-location next-self)))
#:count-lines! (and count-lines!
(lambda (self) (count-lines! next-self)))
#:init-offset init-offset
#:file-position file-position
#:buffer-mode (or alt-buffer-mode do-buffer-mode))
#:file-position (and file-position
(case-lambda
[(self) (file-position next-self)]
[(self pos) (file-position next-self pos)]))
#:buffer-mode (or (and alt-buffer-mode
(case-lambda
[(self) (alt-buffer-mode next-self)]
[(self mode) (alt-buffer-mode next-self mode)]))
do-buffer-mode))
;; in atomic mode:
(case-lambda

View File

@ -37,15 +37,16 @@
[(pipe-input-port? p) (->core-input-port p)]
[(pipe-output-port? p) (->core-output-port p)]
[else
(raise-argument-error 'pipe-contact-length "(or/c pipe-input-port? pipe-output-port?)" p)])))))
(raise-argument-error 'pipe-contact-length "(or/c pipe-input-port? pipe-output-port?)" p)])))
(core-port-self p)))
(define pipe-write-position
(case-lambda
[(p) ((pipe-data-write-position (core-port-data p)))]
[(p pos) ((pipe-data-write-position (core-port-data p)) pos)]))
[(p) ((pipe-data-write-position (core-port-data p)) (core-port-self p))]
[(p pos) ((pipe-data-write-position (core-port-data p)) (core-port-self p) pos)]))
(define (pipe-discard-all p)
((pipe-data-discard-all (core-port-data p))))
((pipe-data-discard-all (core-port-data p)) (core-port-self p)))
(define/who (make-pipe [limit #f] [input-name 'pipe] [output-name 'pipe])
(check who #:or-false exact-positive-integer? limit)
@ -70,18 +71,18 @@
(define data
(pipe-data
;; get-content-length
(lambda ()
(lambda (self)
(atomically (content-length)))
;; write-position
(case-lambda
[() (or write-pos end)]
[(pos)
[(self) (or write-pos end)]
[(self pos)
;; `pos` must be between `start` and `end`
(if (fx= pos end)
(set! write-pos #f)
(set! write-pos pos))])
;; discard-all
(lambda ()
(lambda (self)
(set! peeked-amt 0)
(set! start 0)
(set! end 0)
@ -153,13 +154,14 @@
(make-core-input-port
#:name input-name
#:data data
#:self #f
#:prepare-change
(lambda ()
(lambda (self)
(pause-waiting-commit))
#:read-byte
(lambda ()
(lambda (self)
(assert-atomic)
(cond
[(input-empty?)
@ -181,7 +183,7 @@
(bytes-ref bstr pos)]))
#:read-in
(lambda (dest-bstr dest-start dest-end copy?)
(lambda (self dest-bstr dest-start dest-end copy?)
(assert-atomic)
(cond
[(input-empty?)
@ -210,7 +212,7 @@
(progress!))]))
#:peek-byte
(lambda ()
(lambda (self)
(assert-atomic)
(cond
[(input-empty?)
@ -222,7 +224,7 @@
(bytes-ref bstr start)]))
#:peek-in
(lambda (dest-bstr dest-start dest-end skip progress-evt copy?)
(lambda (self dest-bstr dest-start dest-end skip progress-evt copy?)
(assert-atomic)
(define content-amt (content-length))
(cond
@ -257,19 +259,19 @@
amt])]))
#:byte-ready
(lambda (work-done!)
(lambda (self work-done!)
(assert-atomic)
(or output-closed?
(not (fx= 0 (content-length)))))
#:close
(lambda ()
(lambda (self)
(unless input-closed?
(set! input-closed? #t)
(progress!)))
#:get-progress-evt
(lambda ()
(lambda (self)
(atomically
(cond
[input-closed? always-evt]
@ -281,7 +283,7 @@
#:commit
;; Allows `amt` to be zero and #f for other arguments,
;; which is helpful for `open-input-peek-via-read`.
(lambda (amt progress-evt ext-evt finish)
(lambda (self amt progress-evt ext-evt finish)
(assert-atomic)
;; `progress-evt` is a `semepahore-peek-evt`, and `ext-evt`
;; is constrained; we can send them over to different threads
@ -318,12 +320,13 @@
(make-core-output-port
#:name output-name
#:data data
#:self #f
#:evt write-ready-evt
#:write-out
;; in atomic mode
(lambda (src-bstr src-start src-end nonblock? enable-break? copy?)
(lambda (self src-bstr src-start src-end nonblock? enable-break? copy?)
(assert-atomic)
(let try-again ()
(define top-pos (if (fx= start 0)
@ -413,12 +416,12 @@
(maybe-grow)])))
#:count-write-evt-via-write-out
(lambda (v bstr start)
(lambda (self v bstr start)
(port-count! op v bstr start))
#:close
;; in atomic mode
(lambda ()
(lambda (self)
(unless output-closed?
(set! output-closed? #t)
(when write-ready-sema

View File

@ -10,13 +10,17 @@
(struct core-port (name ; anything, reported as `object-name` for the port
data ; anything, effectively a subtype indicator
close ; -> (void)
;; A "method" or "-*>" gets this value back as its
;; first argument:
self ; anything, passed to every method
close ; -*> (void)
;; Called in atomic mode.
count-lines! ; #f or procedure called in atomic mode
get-location ; #f or procedure called in atomic mode
file-position ; #f, port, or procedure called in atomic mode
buffer-mode ; #f or procedure in atomic mode
count-lines! ; #f or method called in atomic mode
get-location ; #f or method called in atomic mode
file-position ; #f, port, or method called in atomic mode
buffer-mode ; #f or method in atomic mode
closed ; `closed-state`

View File

@ -1,5 +1,6 @@
#lang racket/base
(require "input-port.rkt")
(require "port.rkt"
"input-port.rkt")
(provide prepare-change)
@ -8,4 +9,4 @@
(define (prepare-change in)
(define prepare-change (core-input-port-prepare-change in))
(when prepare-change
(prepare-change)))
(prepare-change (core-port-self in))))

View File

@ -2,6 +2,7 @@
(require "../common/check.rkt"
"../host/thread.rkt"
"parameter.rkt"
"port.rkt"
"input-port.rkt"
"count.rkt"
"check.rkt")
@ -40,7 +41,7 @@
(let ([in (->core-input-port orig-in)])
(define get-progress-evt (core-input-port-get-progress-evt in))
(if get-progress-evt
(progress-evt orig-in (get-progress-evt))
(progress-evt orig-in (get-progress-evt (core-port-self in)))
(raise-arguments-error 'port-progress-evt
"port does not provide progress evts"
"port" orig-in))))
@ -58,7 +59,7 @@
(atomically
;; We specially skip a check on whether the port is closed,
;; since that's handled as the progress evt becoming ready
(commit amt (progress-evt-evt progress-evt) evt
(commit (core-port-self in) amt (progress-evt-evt progress-evt) evt
;; in atomic mode (but maybe leaves atomic mode in between)
(lambda (bstr)
(port-count! in (bytes-length bstr) bstr 0))))))

View File

@ -60,7 +60,7 @@
(define read-in (core-input-port-read-in in))
(cond
[(procedure? read-in)
(define v (read-in bstr start end copy-bstr?))
(define v (read-in (core-port-self in) bstr start end copy-bstr?))
(let result-loop ([v v])
(cond
[(and (integer? v) (not (eq? v 0)))
@ -140,7 +140,7 @@
(define peek-in (core-input-port-peek-in in))
(cond
[(procedure? peek-in)
(define v (peek-in bstr start end skip progress-evt copy-bstr?))
(define v (peek-in (core-port-self in) bstr start end skip progress-evt copy-bstr?))
(end-atomic)
(let result-loop ([v v])
(cond
@ -187,7 +187,7 @@
[(closed-state-closed? (core-port-closed in))
(check-not-closed who in)]
[else
(define b (read-byte))
(define b (read-byte (core-port-self in)))
(cond
[(eof-object? b)
(end-atomic)
@ -218,7 +218,7 @@
(start-atomic)
(prepare-change in)
(check-not-closed who in)
(define b (peek-byte))
(define b (peek-byte (core-port-self in)))
(end-atomic)
(cond
[(evt? b)

View File

@ -21,7 +21,7 @@
(start-atomic)
(prepare-change in)
(check-not-closed who in)
(define r (byte-ready void))
(define r (byte-ready (core-port-self in) void))
(end-atomic)
(eq? #t r)])))
@ -31,7 +31,7 @@
(cond
[(byte-ready? in)
(define peek-byte (core-input-port-peek-byte in))
(define b (and peek-byte (atomically (peek-byte))))
(define b (and peek-byte (atomically (peek-byte (core-port-self in)))))
(cond
[(and b
(or (eof-object? b)

View File

@ -39,7 +39,7 @@
(when progress-evt
(check-progress-evt who progress-evt orig-in))
(let ([in (->core-input-port orig-in)])
(define peek-byte (core-input-port-read-byte in))
(define peek-byte (core-input-port-peek-byte in))
(cond
[peek-byte (do-peek-byte who peek-byte in orig-in)]
[else

View File

@ -1,6 +1,7 @@
#lang racket/base
(require "../common/check.rkt"
"../host/thread.rkt"
"port.rkt"
"output-port.rkt"
"parameter.rkt"
"count.rkt")
@ -30,7 +31,7 @@
[else
(let loop ()
(start-atomic)
(define r (write-out-special v (not retry?) #f))
(define r (write-out-special (core-port-self o) v (not retry?) #f))
(let result-loop ([r r])
(cond
[(not r)
@ -61,4 +62,4 @@
(raise-arguments-error who
"port does not support special-value events"
"port" o))
(get-write-special-evt v)))
(get-write-special-evt (core-port-self o) v)))

View File

@ -3,6 +3,7 @@
"../host/thread.rkt"
"parameter.rkt"
"read-and-peek.rkt"
"port.rkt"
"input-port.rkt"
(submod "bytes-input.rkt" internal)
"../string/utf-8-decode.rkt"
@ -200,7 +201,7 @@
(start-atomic)
(prepare-change in)
(check-not-closed who in)
(define b (read-byte))
(define b (read-byte (core-port-self in)))
(cond
[(fixnum? b)
(port-count-byte! in b)
@ -315,7 +316,7 @@
(let ([in (->core-input-port in)])
(define peek-byte (and (zero? skip-k)
(core-input-port-peek-byte in)))
(define b (and peek-byte (atomically (peek-byte))))
(define b (and peek-byte (atomically (peek-byte (core-port-self in)))))
(cond
[(and b
(or (eof-object? b)

View File

@ -24,7 +24,7 @@
(define write-out (core-output-port-write-out out))
(cond
[(procedure? write-out)
(define v (write-out bstr start end (not buffer-ok?) enable-break? copy-bstr?))
(define v (write-out (core-port-self out) bstr start end (not buffer-ok?) enable-break? copy-bstr?))
(let result-loop ([v v])
(cond
[(not v)

View File

@ -47,9 +47,10 @@
(make-core-output-port
#:name (object-name o)
#:data (lambda () max-length)
#:self o
#:evt o
#:write-out
(lambda (src-bstr src-start src-end nonblock? enable-break? copy?)
(lambda (o src-bstr src-start src-end nonblock? enable-break? copy?)
(cond
[max-length
(define len (- src-end src-start))