diff --git a/racket/src/io/port/buffer-mode.rkt b/racket/src/io/port/buffer-mode.rkt index cf5dc932e2..50be72adfc 100644 --- a/racket/src/io/port/buffer-mode.rkt +++ b/racket/src/io/port/buffer-mode.rkt @@ -20,7 +20,7 @@ (atomically (check-not-closed who p) (and buffer-mode - (buffer-mode))))] + (buffer-mode (core-port-self p)))))] [(p mode) (unless (or (input-port? p) (output-port? p)) (raise-argument-error who "port?" p)) @@ -36,7 +36,7 @@ (define buffer-mode (core-port-buffer-mode p)) (cond [buffer-mode - (buffer-mode mode) + (buffer-mode (core-port-self p) mode) #t] [else #f]))) (cond diff --git a/racket/src/io/port/bytes-output.rkt b/racket/src/io/port/bytes-output.rkt index 021b81bda5..3565a39236 100644 --- a/racket/src/io/port/bytes-output.rkt +++ b/racket/src/io/port/bytes-output.rkt @@ -1,6 +1,7 @@ #lang racket/base (require "../common/check.rkt" "../host/thread.rkt" + "port.rkt" "output-port.rkt" "parameter.rkt" "write.rkt" @@ -81,7 +82,7 @@ (raise-arguments-error who "port does not support output events" "port" out)) - (get-write-evt bstr start-pos end-pos)))) + (get-write-evt (core-port-self out) bstr start-pos end-pos)))) (define/who (port-writes-atomic? out) (check who output-port? out) diff --git a/racket/src/io/port/bytes-port.rkt b/racket/src/io/port/bytes-port.rkt index 8b22d0d268..9ea0976104 100644 --- a/racket/src/io/port/bytes-port.rkt +++ b/racket/src/io/port/bytes-port.rkt @@ -57,13 +57,14 @@ (make-core-input-port #:name name #:data (input-bytes-data) + #:self #f #:prepare-change - (lambda () + (lambda (self) (pause-waiting-commit)) #:read-byte - (lambda () + (lambda (self) (let ([pos i]) (if (pos . < . len) (begin @@ -73,7 +74,7 @@ eof))) #:read-in - (lambda (dest-bstr start end copy?) + (lambda (self dest-bstr start end copy?) (define pos i) (cond [(pos . < . len) @@ -85,14 +86,14 @@ [else eof])) #:peek-byte - (lambda () + (lambda (self) (let ([pos i]) (if (pos . < . len) (bytes-ref bstr pos) eof))) #:peek-in - (lambda (dest-bstr start end skip progress-evt copy?) + (lambda (self dest-bstr start end skip progress-evt copy?) (define pos (+ i skip)) (cond [(and progress-evt (sync/timeout 0 progress-evt)) @@ -104,22 +105,22 @@ [else eof])) #:byte-ready - (lambda (work-done!) + (lambda (self work-done!) (i . < . len)) #:close - (lambda () + (lambda (self) (set! commit-manager #f) ; to indicate closed (progress!)) #:get-progress-evt - (lambda () + (lambda (self) (unless progress-sema (set! progress-sema (make-semaphore))) (semaphore-peek-evt progress-sema)) #:commit - (lambda (amt progress-evt ext-evt finish) + (lambda (self amt progress-evt ext-evt finish) (unless commit-manager (set! commit-manager (make-commit-manager))) (commit-manager-wait @@ -136,8 +137,8 @@ #:file-position (case-lambda - [() (or alt-pos i)] - [(new-pos) + [(self) (or alt-pos i)] + [(self new-pos) (set! i (if (eof-object? new-pos) len (min len new-pos))) @@ -161,16 +162,26 @@ (make-core-output-port #:name name #:data (output-bytes-data i (lambda () (pipe-discard-all i))) + #:self o #:evt o - #:write-out (core-output-port-write-out o) - #:close (core-port-close o) - #:get-write-evt (core-output-port-get-write-evt o) - #:get-location (core-port-get-location o) - #:count-lines! (core-port-count-lines! o) + #:write-out o + #:close + (lambda (o) ((core-port-close o) (core-port-self o))) + #:get-write-evt + (and (core-output-port-get-write-evt o) + (lambda (o bstr start-k end-k) + ((core-output-port-get-write-evt o) (core-port-self o) bstr start-k end-k))) + #:get-location + (and (core-port-get-location o) + (lambda (o) ((core-port-get-location o) (core-port-self o)))) + #:count-lines! + (and (core-port-count-lines! o) + (lambda (o) + ((core-port-count-lines! o) (core-port-self o)))) #:file-position (case-lambda - [() (pipe-write-position o)] - [(new-pos) + [(o) (pipe-write-position o)] + [(o new-pos) (define len (pipe-content-length i)) (cond [(eof-object? new-pos) @@ -185,7 +196,7 @@ "position" new-pos)) (pipe-write-position o len) (define amt (- new-pos len)) - ((core-output-port-write-out o) (make-bytes amt 0) 0 amt #f #f #f) + ((core-output-port-write-out o) (core-port-self o) (make-bytes amt 0) 0 amt #f #f #f) (void)] [else (pipe-write-position o new-pos)])]))) diff --git a/racket/src/io/port/close.rkt b/racket/src/io/port/close.rkt index ea12d1f079..1202512031 100644 --- a/racket/src/io/port/close.rkt +++ b/racket/src/io/port/close.rkt @@ -26,7 +26,7 @@ (define closed (core-port-closed p)) (unless (closed-state-closed? closed) (atomically - ((core-port-close p)) + ((core-port-close p) (core-port-self p)) (set-closed-state! closed)))) ;; in atomic mode diff --git a/racket/src/io/port/count.rkt b/racket/src/io/port/count.rkt index 883f0e1728..690a0a6d37 100644 --- a/racket/src/io/port/count.rkt +++ b/racket/src/io/port/count.rkt @@ -38,7 +38,7 @@ (set-core-port-position! p (add1 (or (core-port-offset p) 0))) (define count-lines! (core-port-count-lines! p)) (when count-lines! - (count-lines!)))))) + (count-lines! (core-port-self p))))))) (define/who (port-counts-lines? p) (core-port-count? @@ -61,7 +61,7 @@ (define get-location (core-port-get-location p)) (cond [get-location - (get-location)] + (get-location (core-port-self p))] [else (values (core-port-line p) (core-port-column p) diff --git a/racket/src/io/port/custom-input-port.rkt b/racket/src/io/port/custom-input-port.rkt index c6fb4da96a..a2bed29191 100644 --- a/racket/src/io/port/custom-input-port.rkt +++ b/racket/src/io/port/custom-input-port.rkt @@ -1,6 +1,7 @@ #lang racket/base (require "../common/check.rkt" "../host/thread.rkt" + "port.rkt" "input-port.rkt" "custom-port.rkt" "pipe.rkt" @@ -54,9 +55,9 @@ (define input-pipe #f) ; `user-read-in` can redirect input - (define (protect-in dest-bstr dest-start dest-end copy? read-in) - ;; We don't trust `read-in` to refrain from modifying its - ;; byte-string argument after it returns, and the `read-in` + (define (protect-in dest-bstr dest-start dest-end copy? user-read-in) + ;; We don't trust `user-read-in` to refrain from modifying its + ;; byte-string argument after it returns, and the `user-read-in` ;; interface doesn't deal with start and end positions, so copy` ;; dest-bstr` if needed (define len (- dest-end dest-start)) @@ -66,7 +67,7 @@ (not (= len dest-end))) (make-bytes len) dest-bstr)) - (define n (read-in user-bstr)) + (define n (user-read-in user-bstr)) (cond [(eq? user-bstr dest-bstr) n] @@ -153,15 +154,16 @@ four-args])) ;; in atomic mode - (define (read-in dest-bstr dest-start dest-end copy?) + (define (read-in self dest-bstr dest-start dest-end copy?) (cond [input-pipe (cond [(zero? (pipe-content-length input-pipe)) (set! input-pipe #f) - (read-in dest-bstr dest-start dest-end copy?)] + (read-in self dest-bstr dest-start dest-end copy?)] [else - ((core-input-port-read-in input-pipe) dest-bstr dest-start dest-end copy?)])] + (define read-in (core-input-port-read-in input-pipe)) + (read-in (core-port-self input-pipe) dest-bstr dest-start dest-end copy?)])] [else (define r (parameterize-break #f @@ -170,7 +172,7 @@ (check-read-result '|user port read| r dest-start dest-end) (cond [(pipe-input-port? r) - (read-in dest-bstr dest-start dest-end copy?)] + (read-in self dest-bstr dest-start dest-end copy?)] [(evt? r) (wrap-check-read-evt-result '|user port read| r dest-start dest-end #f #f)] [(procedure? r) @@ -179,15 +181,16 @@ ;; in atomic mode ;; Used only if `user-peek-in` is a function: - (define (peek-in dest-bstr dest-start dest-end skip-k progress-evt copy?) + (define (peek-in self dest-bstr dest-start dest-end skip-k progress-evt copy?) (cond [input-pipe (cond [((pipe-content-length input-pipe) . <= . skip-k) (set! input-pipe #f) - (peek-in dest-bstr dest-start dest-end skip-k progress-evt copy?)] + (peek-in self dest-bstr dest-start dest-end skip-k progress-evt copy?)] [else - ((core-input-port-peek-in input-pipe) dest-bstr dest-start dest-end skip-k progress-evt copy?)])] + (define peek-in (core-input-port-peek-in input-pipe)) + (peek-in (core-port-self input-pipe) dest-bstr dest-start dest-end skip-k progress-evt copy?)])] [else (define r (parameterize-break #f @@ -197,7 +200,7 @@ (check-read-result '|user port peek| r dest-start dest-end #:peek? #t #:ok-false? progress-evt) (cond [(pipe-input-port? r) - (peek-in dest-bstr dest-start dest-end skip-k progress-evt copy?)] + (peek-in self dest-bstr dest-start dest-end skip-k progress-evt copy?)] [(evt? r) (wrap-check-read-evt-result '|user port peek| r dest-start dest-end #t progress-evt)] [(procedure? r) @@ -206,33 +209,33 @@ ;; in atomic mode ;; Used only if `user-peek-in` is a function: - (define (byte-ready work-done!) + (define (byte-ready self work-done!) (cond [(and input-pipe (positive? (pipe-content-length input-pipe))) #t] [else (define bstr (make-bytes 1)) - (define v (peek-in bstr 0 1 0 #f #f)) + (define v (peek-in self bstr 0 1 0 #f #f)) (work-done!) (cond [(evt? v) v] [else (not (eqv? v 0))])])) ;; in atomic mode - (define (close) + (define (close self) (end-atomic) (user-close) (start-atomic)) - (define (get-progress-evt) + (define (get-progress-evt self) (define r (user-get-progress-evt)) (unless (evt? r) (raise-result-error '|user port progress-evt| "evt?" r)) r) ;; in atomic mode - (define (commit amt evt ext-evt finish) + (define (commit self amt evt ext-evt finish) (define r (parameterize-break #f (non-atomically @@ -248,7 +251,7 @@ (define count-lines! (and user-count-lines! - (lambda () (end-atomic) (user-count-lines!) (start-atomic)))) + (lambda (self) (end-atomic) (user-count-lines!) (start-atomic)))) (define-values (init-offset file-position) (make-init-offset+file-position user-init-position)) @@ -261,6 +264,7 @@ [user-peek-in (make-core-input-port #:name name + #:self #f #:read-in (if (input-port? user-read-in) user-read-in @@ -285,6 +289,7 @@ (define-values (port buffer-flusher) (open-input-peek-via-read #:name name + #:self #f #:read-in read-in #:close close #:get-location get-location diff --git a/racket/src/io/port/custom-output-port.rkt b/racket/src/io/port/custom-output-port.rkt index f6fb6000bb..03ad7eb918 100644 --- a/racket/src/io/port/custom-output-port.rkt +++ b/racket/src/io/port/custom-output-port.rkt @@ -1,6 +1,7 @@ #lang racket/base (require "../common/check.rkt" "../host/thread.rkt" + "port.rkt" "output-port.rkt" "custom-port.rkt" "pipe.rkt") @@ -101,7 +102,7 @@ [else r])))) ;; in atomic mode - (define (write-out bstr start end non-block/buffer? enable-break? copy?) + (define (write-out self bstr start end non-block/buffer? enable-break? copy?) (cond [output-pipe (cond @@ -109,9 +110,10 @@ (= start end) (not (sync/timeout 0 output-pipe))) (set! output-pipe #f) - (write-out bstr start end non-block/buffer? enable-break? copy?)] + (write-out self bstr start end non-block/buffer? enable-break? copy?)] [else - ((core-output-port-write-out output-pipe) bstr start end non-block/buffer? enable-break? copy?)])] + (define write-out (core-output-port-write-out output-pipe)) + (write-out (core-port-self output-pipe) bstr start end non-block/buffer? enable-break? copy?)])] [else (define r ;; Always tell user port to re-enable breaks if it blocks, since @@ -125,12 +127,12 @@ (check-write-result '|user port write| r start end non-block/buffer?) (cond [(pipe-output-port? r) - (write-out bstr start end non-block/buffer? enable-break? copy?)] + (write-out self bstr start end non-block/buffer? enable-break? copy?)] [(evt? r) (wrap-check-write-evt-result '|user port write| r start end non-block/buffer?)] [else r])])) - (define (get-write-evt bstr start end) + (define (get-write-evt self bstr start end) (end-atomic) (define r (user-get-write-evt bstr start end)) (unless (evt? r) @@ -138,7 +140,7 @@ (start-atomic) (wrap-check-write-evt-result '|user port write-evt| r start end #t)) - (define (write-out-special v non-block/buffer? enable-break?) + (define (write-out-special self v non-block/buffer? enable-break?) (let ([enable-break? (and (not non-block/buffer?) (break-enabled))]) (parameterize-break #f (non-atomically @@ -150,7 +152,7 @@ (define count-lines! (and user-count-lines! - (lambda () (end-atomic) (user-count-lines!) (start-atomic)))) + (lambda (self) (end-atomic) (user-count-lines!) (start-atomic)))) (define-values (init-offset file-position) (make-init-offset+file-position user-init-position)) @@ -160,13 +162,14 @@ (make-buffer-mode user-buffer-mode #:output? #t))) ;; in atomic mode - (define (close) + (define (close self) (end-atomic) (user-close) (start-atomic)) (make-core-output-port #:name name + #:self #f #:evt evt #:write-out (if (output-port? user-write-out) @@ -178,7 +181,9 @@ user-write-out-special (and user-write-out-special write-out-special)) #:get-write-evt (and user-get-write-evt get-write-evt) - #:get-write-special-evt user-get-write-special-evt + #:get-write-special-evt (and user-get-write-special-evt + (lambda (self v) + (user-get-write-special-evt v))) #:get-location get-location #:count-lines! count-lines! #:init-offset init-offset diff --git a/racket/src/io/port/custom-port.rkt b/racket/src/io/port/custom-port.rkt index 78f91ed16c..2d06b4868c 100644 --- a/racket/src/io/port/custom-port.rkt +++ b/racket/src/io/port/custom-port.rkt @@ -18,7 +18,7 @@ ;; in atomic mode (define (make-get-location user-get-location) - (lambda () + (lambda (self) (end-atomic) (call-with-values (lambda () (user-get-location)) @@ -58,7 +58,7 @@ [(input-port? user-init-position) user-init-position] [(output-port? user-init-position) user-init-position] [(procedure? user-init-position) - (lambda () + (lambda (self) (define pos (user-init-position)) (unless (or (not pos) (exact-positive-integer? pos)) (raise-result-error '|user port init-position| "(or/c exact-positive-integer? #f)" pos)) @@ -78,7 +78,7 @@ (define (make-buffer-mode user-buffer-mode #:output? [output? #f]) (case-lambda - [() + [(self) (end-atomic) (define m (user-buffer-mode)) (cond @@ -91,6 +91,6 @@ "(or/c 'block 'line 'none #f)" "(or/c 'block 'none #f)") m)])] - [(m) + [(self m) (non-atomically (user-buffer-mode m))])) diff --git a/racket/src/io/port/fd-port.rkt b/racket/src/io/port/fd-port.rkt index 05418038fd..749e8cda0b 100644 --- a/racket/src/io/port/fd-port.rkt +++ b/racket/src/io/port/fd-port.rkt @@ -69,9 +69,10 @@ (open-input-peek-via-read #:name name #:data (fd-data fd extra-data #t file-stream?) + #:self #f #:read-in ;; in atomic mode - (lambda (dest-bstr start end copy?) + (lambda (self dest-bstr start end copy?) (define n (rktio_read_in rktio fd dest-bstr start end)) (cond [(rktio-error? n) @@ -86,7 +87,7 @@ #:read-is-atomic? #t #:close ;; in atomic mode - (lambda () + (lambda (self) (on-close) (fd-close fd fd-refcount) (unsafe-custodian-unregister fd custodian-reference)) @@ -178,6 +179,7 @@ (define port (make-core-output-port #:name name + #:self #f #:data (fd-output-data fd extra-data #f file-stream? ;; Flush function needed for `file-truncate`: (lambda () @@ -188,7 +190,7 @@ #:write-out ;; in atomic mode - (lambda (src-bstr src-start src-end nonbuffer/nonblock? enable-break? copy?) + (lambda (self src-bstr src-start src-end nonbuffer/nonblock? enable-break? copy?) (cond [(= src-start src-end) ;; Flush request @@ -218,12 +220,12 @@ [else n])])) #:count-write-evt-via-write-out - (lambda (v bstr start) + (lambda (self v bstr start) (port-count! port v bstr start)) #:close ;; in atomic mode - (lambda () + (lambda (self) (flush-buffer-fully #f) ; can temporarily leave atomic mode (when buffer ; <- in case a concurrent close succeeded (on-close) @@ -282,7 +284,7 @@ (define (make-file-position fd buffer-control) ;; in atomic mode (case-lambda - [() + [(self) (define ppos (rktio_get_file_position rktio fd)) (cond [(rktio-error? ppos) @@ -292,7 +294,7 @@ (define pos (rktio_filesize_ref ppos)) (rktio_free ppos) (buffer-control pos)])] - [(pos) + [(self pos) (buffer-control) (define r (rktio_set_file_position rktio diff --git a/racket/src/io/port/file-position.rkt b/racket/src/io/port/file-position.rkt index 488caabcda..a1f3b60d58 100644 --- a/racket/src/io/port/file-position.rkt +++ b/racket/src/io/port/file-position.rkt @@ -34,10 +34,10 @@ [else (->core-output-port p)])]) (define file-position (core-port-file-position cp)) (cond - [(and (procedure? file-position) (procedure-arity-includes? file-position 1)) + [(and (procedure? file-position) (procedure-arity-includes? file-position 2)) (atomically (check-not-closed who cp) - (file-position pos))] + (file-position (core-port-self cp) pos))] [else (raise-arguments-error who "setting position allowed for file-stream and string ports only" @@ -62,7 +62,7 @@ (do-simple-file-position who file-position fail-k)] [else (define pos (or (and file-position - (file-position)) + (file-position (core-port-self p))) (core-port-offset p))) (end-atomic) (or pos (fail-k))]))) diff --git a/racket/src/io/port/flush-output.rkt b/racket/src/io/port/flush-output.rkt index 36f27eb540..4173b0eb62 100644 --- a/racket/src/io/port/flush-output.rkt +++ b/racket/src/io/port/flush-output.rkt @@ -2,6 +2,7 @@ (require "../common/check.rkt" "../host/thread.rkt" "parameter.rkt" + "port.rkt" "output-port.rkt" "pipe.rkt") @@ -10,21 +11,20 @@ (define/who (flush-output [p (current-output-port)]) (check who output-port? p) - (let ([write-out - (let wo-loop ([p p]) - (let ([write-out (core-output-port-write-out (->core-output-port p))]) + (let wo-loop ([p p]) + (let ([write-out (core-output-port-write-out (->core-output-port p))]) + (cond + [(procedure? write-out) + (let loop () + (define r (atomically + (write-out (core-port-self p) #"" 0 0 #f #f #f))) + (let r-loop ([r r]) (cond - [(procedure? write-out) write-out] - [else (wo-loop write-out)])))]) - (let loop () - (define r (atomically - (write-out #"" 0 0 #f #f #f))) - (let r-loop ([r r]) - (cond - [(eq? r 0) (void)] - [(not r) (loop)] - [(evt? r) (r-loop (sync r))] - [else (error 'flush-output "weird result")]))))) + [(eq? r 0) (void)] + [(not r) (loop)] + [(evt? r) (r-loop (sync r))] + [else (error 'flush-output "weird result")])))] + [else (wo-loop write-out)])))) ;; ---------------------------------------- diff --git a/racket/src/io/port/input-port.rkt b/racket/src/io/port/input-port.rkt index f3fac3c0bc..5682bb9bd5 100644 --- a/racket/src/io/port/input-port.rkt +++ b/racket/src/io/port/input-port.rkt @@ -62,7 +62,7 @@ ;; the burden of re-checking for a closed port. Leave atomic mode ;; explicitly before raising an exception. - prepare-change ; #f or (-> void) + prepare-change ; #f or (-*> void) ;; Called in atomic mode ;; May leave atomic mode temporarily, but on return, ;; ensures that other atomic operations are ok to @@ -73,14 +73,14 @@ ;; atomic mode is left. The `close` operation ;; is *not* guarded by a call to `prepare-change`. - read-byte ; #f or (-> (or/c byte? eof-object? evt?)) + read-byte ; #f or (-*> (or/c byte? eof-object? evt?)) ;; Called in atomic mode. ;; This shortcut is optional. ;; Non-blocking byte read, where an event must be ;; returned if no byte is available. The event's result ;; is ignored, so it should not consume a byte. - read-in ; port or (bytes start-k end-k copy? -> (or/c integer? ...)) + read-in ; port or (bytes start-k end-k copy? -*> (or/c integer? ...)) ;; Called in atomic mode. ;; A port value redirects to the port. Otherwise, the function ;; never blocks, and can assume `(- end-k start-k)` is non-zero. @@ -90,21 +90,21 @@ ;; documented for `make-input-port`, except that a pipe result ;; is not allowed (or, more precisely, it's treated as an event). - peek-byte ; #f or (-> (or/c byte? eof-object? evt?)) + peek-byte ; #f or (-*> (or/c byte? eof-object? evt?)) ;; Called in atomic mode. ;; This shortcut is optional. ;; Non-blocking byte read, where an event must be ;; returned if no byte is available. The event's result ;; is ignored. - peek-in ; port or (bytes start-k end-k skip-k progress-evt copy? -> (or/c integer? ...)) + peek-in ; port or (bytes start-k end-k skip-k progress-evt copy? -*> (or/c integer? ...)) ;; Called in atomic mode. ;; A port value redirects to the port. Otherwise, the function ;; never blocks, and it can assume that `(- end-k start-k)` is non-zero. ;; The `copy?` flag is the same as for `read-in`. The return values ;; are the same as documented for `make-input-port`. - byte-ready ; port or ((->) -> (or/c boolean? evt)) + byte-ready ; port or ((->) -*> (or/c boolean? evt)) ;; Called in atomic mode. ;; A port value makes sense when `peek-in` has a port value. ;; Otherwise, check whether a peek on one byte would succeed @@ -112,15 +112,15 @@ ;; that effectively does the same. The event's value doesn't ;; matter, because it will be wrapped to return some original ;; port. When `byte-ready` is a function, it should call the - ;; given funciton (for its side effect) when work has been + ;; given function (for its side effect) when work has been ;; done that might unblock this port or some other port. - get-progress-evt ; #f or (-> evt?) + get-progress-evt ; #f or (-*> evt?) ;; *Not* called in atomic mode. ;; Optional support for progress events, and may be ;; called on a closed port. - commit ; (amt-k progress-evt? evt? (bytes? -> any) -> boolean) + commit ; (amt-k progress-evt? evt? (bytes? -> any) -*> boolean) ;; Called in atomic mode. ;; Goes with `get-progress-evt`. The final `evt?` ;; argument is constrained to a few kinds of events; @@ -148,7 +148,8 @@ (poller (lambda (self poll-ctx) ;; atomic mode - (define v (byte-ready (lambda () + (define v (byte-ready (core-port-self i) + (lambda () (schedule-info-did-work! (poll-ctx-sched-info poll-ctx))))) (cond [(evt? v) @@ -160,6 +161,7 @@ (define (make-core-input-port #:name name #:data [data #f] + #:self self #:prepare-change [prepare-change #f] #:read-byte [read-byte #f] #:read-in read-in @@ -176,6 +178,7 @@ #:buffer-mode [buffer-mode #f]) (core-input-port name data + self close count-lines! @@ -205,7 +208,8 @@ (define empty-input-port (make-core-input-port #:name 'empty - #:read-in (lambda (bstr start-k end-k copy?) eof) - #:peek-in (lambda (bstr start-k end-k skip-k copy?) eof) - #:byte-ready (lambda (did-work!) #f) + #:self #f + #:read-in (lambda (self bstr start-k end-k copy?) eof) + #:peek-in (lambda (self bstr start-k end-k skip-k copy?) eof) + #:byte-ready (lambda (self did-work!) #f) #:close void)) diff --git a/racket/src/io/port/nowhere.rkt b/racket/src/io/port/nowhere.rkt index 5e13ff2a91..f149a7a408 100644 --- a/racket/src/io/port/nowhere.rkt +++ b/racket/src/io/port/nowhere.rkt @@ -4,10 +4,11 @@ (provide open-output-nowhere) (define (open-output-nowhere) - (make-core-output-port #:name 'nowhere + (make-core-output-port #:name 'nowhere + #:self #f #:evt always-evt - #:write-out (lambda (bstr start-k end-k no-block/buffer? enable-break? copy?) + #:write-out (lambda (self bstr start-k end-k no-block/buffer? enable-break? copy?) (- end-k start-k)) #:close void - #:write-out-special (lambda (any no-block/buffer? enable-break?) + #:write-out-special (lambda (self any no-block/buffer? enable-break?) #t))) diff --git a/racket/src/io/port/output-port.rkt b/racket/src/io/port/output-port.rkt index e827b6a377..d945374c65 100644 --- a/racket/src/io/port/output-port.rkt +++ b/racket/src/io/port/output-port.rkt @@ -58,7 +58,7 @@ evt ; An evt that is ready when writing a byte won't block - write-out ; port or (bstr start-k end-k no-block/buffer? enable-break? copy? -> ...) + write-out ; port or (bstr start-k end-k no-block/buffer? enable-break? copy? -*> ...) ;; Called in atomic mode. ;; Doesn't block if `no-block/buffer?` is true. ;; Does enable breaks while blocking if `enable-break?` is true. @@ -67,14 +67,14 @@ ;; copied if necessary. The return values are the same as ;; documented for `make-output-port`. - write-out-special ; (any no-block/buffer? enable-break? -> boolean?) + write-out-special ; (any no-block/buffer? enable-break? -*> boolean?) ;; Called in atomic mode. - get-write-evt ; (bstr start-k end-k -> evt?) + get-write-evt ; (bstr start-k end-k -*> evt?) ;; Called in atomic mode. ;; The given bstr should not be exposed to untrusted code. - get-write-special-evt ; (-> evt?) + get-write-special-evt ; (-*> evt?) ;; *Not* called in atomic mode. [write-handler #:mutable] @@ -103,6 +103,7 @@ (define (make-core-output-port #:name name #:data [data #f] + #:self self #:evt evt #:write-out write-out #:close close @@ -117,6 +118,7 @@ #:buffer-mode [buffer-mode #f]) (core-output-port name data + self close count-lines! @@ -140,16 +142,15 @@ (and count-write-evt-via-write-out ;; If `write-out` is always atomic (in no-block, no-buffer mode), ;; then an event can poll `write-out`: - (lambda (src-bstr src-start src-end) + (lambda (self src-bstr src-start src-end) (write-evt ;; in atomic mode: - (lambda (self) - (define v (write-out src-bstr src-start src-end #f #f #t)) + (lambda (self-evt) + (define v (write-out self src-bstr src-start src-end #f #f #t)) (when (exact-integer? v) - (count-write-evt-via-write-out v src-bstr src-start)) + (count-write-evt-via-write-out self v src-bstr src-start)) (if (evt? v) - ;; FIXME: should be `(replace-evt v self)` - (values #f self) + (values #f (replace-evt v self-evt)) (values (list v) #f))))))) get-write-special-evt @@ -159,9 +160,10 @@ (define empty-output-port (make-core-output-port #:name 'empty + #:self #f #:evt always-evt - #:write-out (lambda (bstr start end no-buffer? enable-break?) + #:write-out (lambda (self bstr start end no-buffer? enable-break?) (- end start)) - #:write-out-special (lambda (v no-buffer? enable-break?) + #:write-out-special (lambda (self v no-buffer? enable-break?) #t) #:close void)) diff --git a/racket/src/io/port/peek-via-read-port.rkt b/racket/src/io/port/peek-via-read-port.rkt index 39ab3cf051..20b1431ddf 100644 --- a/racket/src/io/port/peek-via-read-port.rkt +++ b/racket/src/io/port/peek-via-read-port.rkt @@ -1,5 +1,6 @@ #lang racket/base (require "../host/thread.rkt" + "port.rkt" "input-port.rkt" "output-port.rkt" "pipe.rkt") @@ -7,6 +8,7 @@ (provide open-input-peek-via-read) (define (open-input-peek-via-read #:name name + #:self next-self #:data [data #f] #:read-in read-in #:read-is-atomic? [read-is-atomic? #f] ; => can implement progress evts @@ -22,12 +24,12 @@ (define buffer-mode 'block) ;; in atomic mode - (define (prepare-change) - ((core-input-port-prepare-change peek-pipe-i))) + (define (prepare-change self) + ((core-input-port-prepare-change peek-pipe-i) (core-port-self peek-pipe-i))) ;; in atomic mode (define (pull-some-bytes [amt (if (eq? 'block buffer-mode) (bytes-length buf) 1)] #:keep-eof? [keep-eof? #t]) - (define v (read-in buf 0 amt #f)) + (define v (read-in next-self buf 0 amt #f)) (cond [(eof-object? v) (when keep-eof? @@ -37,7 +39,8 @@ [(eqv? v 0) 0] [else (let loop ([wrote 0]) - (define just-wrote ((core-output-port-write-out peek-pipe-o) buf wrote v #t #f #f)) + (define write-out (core-output-port-write-out peek-pipe-o)) + (define just-wrote (write-out (core-port-self peek-pipe-o) buf wrote v #t #f #f)) (define next-wrote (+ wrote just-wrote)) (unless (= v next-wrote) (loop next-wrote))) @@ -47,11 +50,12 @@ (and (integer? v) (not (eqv? v 0)))) ;; in atomic mode - (define (do-read-in dest-bstr start end copy?) + (define (do-read-in self dest-bstr start end copy?) (let try-again () (cond [(positive? (pipe-content-length peek-pipe-i)) - ((core-input-port-read-in peek-pipe-i) dest-bstr start end copy?)] + (define read-in (core-input-port-read-in peek-pipe-i)) + (read-in (core-port-self peek-pipe-i) dest-bstr start end copy?)] [peeked-eof? (set! peeked-eof? #f) ;; an EOF doesn't count as progress @@ -65,14 +69,14 @@ [(or (eqv? v 0) (evt? v)) v] [else (try-again)])] [else - (define v (read-in dest-bstr start end copy?)) + (define v (read-in next-self dest-bstr start end copy?)) (unless (eq? v 0) (progress!)) v])]))) ;; in atomic mode - (define (read-byte) - (define b ((core-input-port-read-byte peek-pipe-i))) + (define (read-byte self) + (define b ((core-input-port-read-byte peek-pipe-i) (core-port-self peek-pipe-i))) (cond [(or (fixnum? b) (eof-object? b)) b] @@ -83,13 +87,13 @@ [else (define v (pull-some-bytes #:keep-eof? #f)) (cond - [(retry-pull? v) (read-byte)] + [(retry-pull? v) (read-byte self)] [else (progress!) v])])) ;; in atomic mode - (define (do-peek-in dest-bstr start end skip progress-evt copy?) + (define (do-peek-in self dest-bstr start end skip progress-evt copy?) (let try-again () (define peeked-amt (if peek-pipe-i (pipe-content-length peek-pipe-i) @@ -100,7 +104,8 @@ #f] [(and peek-pipe-i (peeked-amt . > . skip)) - ((core-input-port-peek-in peek-pipe-i) dest-bstr start end skip progress-evt copy?)] + (define peek-in (core-input-port-peek-in peek-pipe-i)) + (peek-in (core-port-self peek-pipe-i) dest-bstr start end skip progress-evt copy?)] [peeked-eof? eof] [else @@ -110,20 +115,20 @@ v)]))) ;; in atomic mode - (define (peek-byte) + (define (peek-byte self) (cond [(positive? (pipe-content-length peek-pipe-i)) - ((core-input-port-peek-byte peek-pipe-i))] + ((core-input-port-peek-byte peek-pipe-i) (core-port-self peek-pipe-i))] [peeked-eof? eof] [else (define v (pull-some-bytes)) (if (retry-pull? v) - (peek-byte) + (peek-byte self) v)])) ;; in atomic mode - (define (do-byte-ready work-done!) + (define (do-byte-ready self work-done!) (cond [(positive? (pipe-content-length peek-pipe-i)) #t] @@ -134,7 +139,7 @@ (work-done!) (cond [(retry-pull? v) - (do-byte-ready void)] + (do-byte-ready self void)] [(evt? v) v] [else (not (eqv? v 0))])])) @@ -145,25 +150,26 @@ (set! peeked-eof? #f)) ;; in atomic mode - (define (get-progress-evt) - ((core-input-port-get-progress-evt peek-pipe-i))) + (define (get-progress-evt self) + ((core-input-port-get-progress-evt peek-pipe-i) (core-port-self peek-pipe-i))) ;; in atomic mode (define (progress!) ;; Relies on support for `0 #f #f` arguments in pipe implementation: - ((core-input-port-commit peek-pipe-i) 0 #f #f void)) + ((core-input-port-commit peek-pipe-i) (core-port-self peek-pipe-i) 0 #f #f void)) - (define (commit amt evt ext-evt finish) - ((core-input-port-commit peek-pipe-i) amt evt ext-evt finish)) + (define (commit self amt evt ext-evt finish) + ((core-input-port-commit peek-pipe-i) (core-port-self peek-pipe-i) amt evt ext-evt finish)) (define do-buffer-mode (case-lambda - [() buffer-mode] - [(mode) (set! buffer-mode mode)])) + [(self) buffer-mode] + [(self mode) (set! buffer-mode mode)])) (values (make-core-input-port #:name name #:data data + #:self #f #:prepare-change prepare-change @@ -177,15 +183,24 @@ get-progress-evt) #:commit commit - #:close (lambda () - (close) + #:close (lambda (self) + (close next-self) (purge-buffer)) - #:get-location get-location - #:count-lines! count-lines! + #:get-location (and get-location + (lambda (self) (get-location next-self))) + #:count-lines! (and count-lines! + (lambda (self) (count-lines! next-self))) #:init-offset init-offset - #:file-position file-position - #:buffer-mode (or alt-buffer-mode do-buffer-mode)) + #:file-position (and file-position + (case-lambda + [(self) (file-position next-self)] + [(self pos) (file-position next-self pos)])) + #:buffer-mode (or (and alt-buffer-mode + (case-lambda + [(self) (alt-buffer-mode next-self)] + [(self mode) (alt-buffer-mode next-self mode)])) + do-buffer-mode)) ;; in atomic mode: (case-lambda diff --git a/racket/src/io/port/pipe.rkt b/racket/src/io/port/pipe.rkt index 437dcae74b..d1f50b30d2 100644 --- a/racket/src/io/port/pipe.rkt +++ b/racket/src/io/port/pipe.rkt @@ -37,15 +37,16 @@ [(pipe-input-port? p) (->core-input-port p)] [(pipe-output-port? p) (->core-output-port p)] [else - (raise-argument-error 'pipe-contact-length "(or/c pipe-input-port? pipe-output-port?)" p)]))))) + (raise-argument-error 'pipe-contact-length "(or/c pipe-input-port? pipe-output-port?)" p)]))) + (core-port-self p))) (define pipe-write-position (case-lambda - [(p) ((pipe-data-write-position (core-port-data p)))] - [(p pos) ((pipe-data-write-position (core-port-data p)) pos)])) + [(p) ((pipe-data-write-position (core-port-data p)) (core-port-self p))] + [(p pos) ((pipe-data-write-position (core-port-data p)) (core-port-self p) pos)])) (define (pipe-discard-all p) - ((pipe-data-discard-all (core-port-data p)))) + ((pipe-data-discard-all (core-port-data p)) (core-port-self p))) (define/who (make-pipe [limit #f] [input-name 'pipe] [output-name 'pipe]) (check who #:or-false exact-positive-integer? limit) @@ -70,18 +71,18 @@ (define data (pipe-data ;; get-content-length - (lambda () + (lambda (self) (atomically (content-length))) ;; write-position (case-lambda - [() (or write-pos end)] - [(pos) + [(self) (or write-pos end)] + [(self pos) ;; `pos` must be between `start` and `end` (if (fx= pos end) (set! write-pos #f) (set! write-pos pos))]) ;; discard-all - (lambda () + (lambda (self) (set! peeked-amt 0) (set! start 0) (set! end 0) @@ -153,13 +154,14 @@ (make-core-input-port #:name input-name #:data data + #:self #f #:prepare-change - (lambda () + (lambda (self) (pause-waiting-commit)) #:read-byte - (lambda () + (lambda (self) (assert-atomic) (cond [(input-empty?) @@ -181,7 +183,7 @@ (bytes-ref bstr pos)])) #:read-in - (lambda (dest-bstr dest-start dest-end copy?) + (lambda (self dest-bstr dest-start dest-end copy?) (assert-atomic) (cond [(input-empty?) @@ -210,7 +212,7 @@ (progress!))])) #:peek-byte - (lambda () + (lambda (self) (assert-atomic) (cond [(input-empty?) @@ -222,7 +224,7 @@ (bytes-ref bstr start)])) #:peek-in - (lambda (dest-bstr dest-start dest-end skip progress-evt copy?) + (lambda (self dest-bstr dest-start dest-end skip progress-evt copy?) (assert-atomic) (define content-amt (content-length)) (cond @@ -257,19 +259,19 @@ amt])])) #:byte-ready - (lambda (work-done!) + (lambda (self work-done!) (assert-atomic) (or output-closed? (not (fx= 0 (content-length))))) #:close - (lambda () + (lambda (self) (unless input-closed? (set! input-closed? #t) (progress!))) #:get-progress-evt - (lambda () + (lambda (self) (atomically (cond [input-closed? always-evt] @@ -281,7 +283,7 @@ #:commit ;; Allows `amt` to be zero and #f for other arguments, ;; which is helpful for `open-input-peek-via-read`. - (lambda (amt progress-evt ext-evt finish) + (lambda (self amt progress-evt ext-evt finish) (assert-atomic) ;; `progress-evt` is a `semepahore-peek-evt`, and `ext-evt` ;; is constrained; we can send them over to different threads @@ -318,12 +320,13 @@ (make-core-output-port #:name output-name #:data data + #:self #f #:evt write-ready-evt #:write-out ;; in atomic mode - (lambda (src-bstr src-start src-end nonblock? enable-break? copy?) + (lambda (self src-bstr src-start src-end nonblock? enable-break? copy?) (assert-atomic) (let try-again () (define top-pos (if (fx= start 0) @@ -413,12 +416,12 @@ (maybe-grow)]))) #:count-write-evt-via-write-out - (lambda (v bstr start) + (lambda (self v bstr start) (port-count! op v bstr start)) #:close ;; in atomic mode - (lambda () + (lambda (self) (unless output-closed? (set! output-closed? #t) (when write-ready-sema diff --git a/racket/src/io/port/port.rkt b/racket/src/io/port/port.rkt index 28076995d8..9eb7b8c6d2 100644 --- a/racket/src/io/port/port.rkt +++ b/racket/src/io/port/port.rkt @@ -10,13 +10,17 @@ (struct core-port (name ; anything, reported as `object-name` for the port data ; anything, effectively a subtype indicator - close ; -> (void) + ;; A "method" or "-*>" gets this value back as its + ;; first argument: + self ; anything, passed to every method + + close ; -*> (void) ;; Called in atomic mode. - count-lines! ; #f or procedure called in atomic mode - get-location ; #f or procedure called in atomic mode - file-position ; #f, port, or procedure called in atomic mode - buffer-mode ; #f or procedure in atomic mode + count-lines! ; #f or method called in atomic mode + get-location ; #f or method called in atomic mode + file-position ; #f, port, or method called in atomic mode + buffer-mode ; #f or method in atomic mode closed ; `closed-state` diff --git a/racket/src/io/port/prepare-change.rkt b/racket/src/io/port/prepare-change.rkt index cfe580c9de..cc8a27c8d1 100644 --- a/racket/src/io/port/prepare-change.rkt +++ b/racket/src/io/port/prepare-change.rkt @@ -1,5 +1,6 @@ #lang racket/base -(require "input-port.rkt") +(require "port.rkt" + "input-port.rkt") (provide prepare-change) @@ -8,4 +9,4 @@ (define (prepare-change in) (define prepare-change (core-input-port-prepare-change in)) (when prepare-change - (prepare-change))) + (prepare-change (core-port-self in)))) diff --git a/racket/src/io/port/progress-evt.rkt b/racket/src/io/port/progress-evt.rkt index 9a57aa8de3..6fc559efb9 100644 --- a/racket/src/io/port/progress-evt.rkt +++ b/racket/src/io/port/progress-evt.rkt @@ -2,6 +2,7 @@ (require "../common/check.rkt" "../host/thread.rkt" "parameter.rkt" + "port.rkt" "input-port.rkt" "count.rkt" "check.rkt") @@ -40,7 +41,7 @@ (let ([in (->core-input-port orig-in)]) (define get-progress-evt (core-input-port-get-progress-evt in)) (if get-progress-evt - (progress-evt orig-in (get-progress-evt)) + (progress-evt orig-in (get-progress-evt (core-port-self in))) (raise-arguments-error 'port-progress-evt "port does not provide progress evts" "port" orig-in)))) @@ -58,7 +59,7 @@ (atomically ;; We specially skip a check on whether the port is closed, ;; since that's handled as the progress evt becoming ready - (commit amt (progress-evt-evt progress-evt) evt + (commit (core-port-self in) amt (progress-evt-evt progress-evt) evt ;; in atomic mode (but maybe leaves atomic mode in between) (lambda (bstr) (port-count! in (bytes-length bstr) bstr 0)))))) diff --git a/racket/src/io/port/read-and-peek.rkt b/racket/src/io/port/read-and-peek.rkt index 6e87a709c3..99d41cdca9 100644 --- a/racket/src/io/port/read-and-peek.rkt +++ b/racket/src/io/port/read-and-peek.rkt @@ -60,7 +60,7 @@ (define read-in (core-input-port-read-in in)) (cond [(procedure? read-in) - (define v (read-in bstr start end copy-bstr?)) + (define v (read-in (core-port-self in) bstr start end copy-bstr?)) (let result-loop ([v v]) (cond [(and (integer? v) (not (eq? v 0))) @@ -140,7 +140,7 @@ (define peek-in (core-input-port-peek-in in)) (cond [(procedure? peek-in) - (define v (peek-in bstr start end skip progress-evt copy-bstr?)) + (define v (peek-in (core-port-self in) bstr start end skip progress-evt copy-bstr?)) (end-atomic) (let result-loop ([v v]) (cond @@ -187,7 +187,7 @@ [(closed-state-closed? (core-port-closed in)) (check-not-closed who in)] [else - (define b (read-byte)) + (define b (read-byte (core-port-self in))) (cond [(eof-object? b) (end-atomic) @@ -218,7 +218,7 @@ (start-atomic) (prepare-change in) (check-not-closed who in) - (define b (peek-byte)) + (define b (peek-byte (core-port-self in))) (end-atomic) (cond [(evt? b) diff --git a/racket/src/io/port/ready.rkt b/racket/src/io/port/ready.rkt index c4d7f67d79..2b27454bf8 100644 --- a/racket/src/io/port/ready.rkt +++ b/racket/src/io/port/ready.rkt @@ -21,7 +21,7 @@ (start-atomic) (prepare-change in) (check-not-closed who in) - (define r (byte-ready void)) + (define r (byte-ready (core-port-self in) void)) (end-atomic) (eq? #t r)]))) @@ -31,7 +31,7 @@ (cond [(byte-ready? in) (define peek-byte (core-input-port-peek-byte in)) - (define b (and peek-byte (atomically (peek-byte)))) + (define b (and peek-byte (atomically (peek-byte (core-port-self in))))) (cond [(and b (or (eof-object? b) diff --git a/racket/src/io/port/special-input.rkt b/racket/src/io/port/special-input.rkt index 9e3d9f0def..ea1e707a02 100644 --- a/racket/src/io/port/special-input.rkt +++ b/racket/src/io/port/special-input.rkt @@ -39,7 +39,7 @@ (when progress-evt (check-progress-evt who progress-evt orig-in)) (let ([in (->core-input-port orig-in)]) - (define peek-byte (core-input-port-read-byte in)) + (define peek-byte (core-input-port-peek-byte in)) (cond [peek-byte (do-peek-byte who peek-byte in orig-in)] [else diff --git a/racket/src/io/port/special-output.rkt b/racket/src/io/port/special-output.rkt index e414f1ef6f..b55cb80055 100644 --- a/racket/src/io/port/special-output.rkt +++ b/racket/src/io/port/special-output.rkt @@ -1,6 +1,7 @@ #lang racket/base (require "../common/check.rkt" "../host/thread.rkt" + "port.rkt" "output-port.rkt" "parameter.rkt" "count.rkt") @@ -30,7 +31,7 @@ [else (let loop () (start-atomic) - (define r (write-out-special v (not retry?) #f)) + (define r (write-out-special (core-port-self o) v (not retry?) #f)) (let result-loop ([r r]) (cond [(not r) @@ -61,4 +62,4 @@ (raise-arguments-error who "port does not support special-value events" "port" o)) - (get-write-special-evt v))) + (get-write-special-evt (core-port-self o) v))) diff --git a/racket/src/io/port/string-input.rkt b/racket/src/io/port/string-input.rkt index 434b0c0dcf..671b784a54 100644 --- a/racket/src/io/port/string-input.rkt +++ b/racket/src/io/port/string-input.rkt @@ -3,6 +3,7 @@ "../host/thread.rkt" "parameter.rkt" "read-and-peek.rkt" + "port.rkt" "input-port.rkt" (submod "bytes-input.rkt" internal) "../string/utf-8-decode.rkt" @@ -200,7 +201,7 @@ (start-atomic) (prepare-change in) (check-not-closed who in) - (define b (read-byte)) + (define b (read-byte (core-port-self in))) (cond [(fixnum? b) (port-count-byte! in b) @@ -315,7 +316,7 @@ (let ([in (->core-input-port in)]) (define peek-byte (and (zero? skip-k) (core-input-port-peek-byte in))) - (define b (and peek-byte (atomically (peek-byte)))) + (define b (and peek-byte (atomically (peek-byte (core-port-self in))))) (cond [(and b (or (eof-object? b) diff --git a/racket/src/io/port/write.rkt b/racket/src/io/port/write.rkt index ac91e8aaf9..859e2bfcab 100644 --- a/racket/src/io/port/write.rkt +++ b/racket/src/io/port/write.rkt @@ -24,7 +24,7 @@ (define write-out (core-output-port-write-out out)) (cond [(procedure? write-out) - (define v (write-out bstr start end (not buffer-ok?) enable-break? copy-bstr?)) + (define v (write-out (core-port-self out) bstr start end (not buffer-ok?) enable-break? copy-bstr?)) (let result-loop ([v v]) (cond [(not v) diff --git a/racket/src/io/print/write-with-max.rkt b/racket/src/io/print/write-with-max.rkt index c76118746b..295e4c198f 100644 --- a/racket/src/io/print/write-with-max.rkt +++ b/racket/src/io/print/write-with-max.rkt @@ -47,9 +47,10 @@ (make-core-output-port #:name (object-name o) #:data (lambda () max-length) + #:self o #:evt o #:write-out - (lambda (src-bstr src-start src-end nonblock? enable-break? copy?) + (lambda (o src-bstr src-start src-end nonblock? enable-break? copy?) (cond [max-length (define len (- src-end src-start))