diff --git a/racket/src/io/port/bytes-output.rkt b/racket/src/io/port/bytes-output.rkt index 3565a39236..8acac742cd 100644 --- a/racket/src/io/port/bytes-output.rkt +++ b/racket/src/io/port/bytes-output.rkt @@ -82,7 +82,7 @@ (raise-arguments-error who "port does not support output events" "port" out)) - (get-write-evt (core-port-self out) bstr start-pos end-pos)))) + (get-write-evt (core-port-self out) out bstr start-pos end-pos)))) (define/who (port-writes-atomic? out) (check who output-port? out) diff --git a/racket/src/io/port/bytes-port.rkt b/racket/src/io/port/bytes-port.rkt index 9ea0976104..fe8a2a84be 100644 --- a/racket/src/io/port/bytes-port.rkt +++ b/racket/src/io/port/bytes-port.rkt @@ -154,14 +154,18 @@ ;; ---------------------------------------- -(struct output-bytes-data (i reset)) +(struct output-bytes-data (o get)) (define (open-output-bytes [name 'string]) - (define-values (i o) (make-pipe)) + (define-values (i/none o) (make-pipe-ends #:need-input? #f)) (define p (make-core-output-port #:name name - #:data (output-bytes-data i (lambda () (pipe-discard-all i))) + #:data (output-bytes-data o (lambda (o bstr start-pos discard?) + ;; in atomic mode + (pipe-get-content o bstr start-pos) + (when discard? + (pipe-discard-all o)))) #:self o #:evt o #:write-out o @@ -169,8 +173,8 @@ (lambda (o) ((core-port-close o) (core-port-self o))) #:get-write-evt (and (core-output-port-get-write-evt o) - (lambda (o bstr start-k end-k) - ((core-output-port-get-write-evt o) (core-port-self o) bstr start-k end-k))) + (lambda (o orig-o bstr start-k end-k) + ((core-output-port-get-write-evt o) (core-port-self o) o bstr start-k end-k))) #:get-location (and (core-port-get-location o) (lambda (o) ((core-port-get-location o) (core-port-self o)))) @@ -182,7 +186,7 @@ (case-lambda [(o) (pipe-write-position o)] [(o new-pos) - (define len (pipe-content-length i)) + (define len (pipe-content-length o)) (cond [(eof-object? new-pos) (pipe-write-position o len)] @@ -201,6 +205,7 @@ [else (pipe-write-position o new-pos)])]))) (when (port-count-lines-enabled) + (port-count-lines! o) (port-count-lines! p)) p) @@ -210,19 +215,21 @@ o) (check who exact-nonnegative-integer? start-pos) (check who exact-nonnegative-integer? #:or-false end-pos) - (let ([o (->core-output-port o)]) - (define i (output-bytes-data-i (core-port-data o))) - (define len (pipe-content-length i)) + (let ([bstr-o (->core-output-port o)]) + (define o (output-bytes-data-o (core-port-data bstr-o))) + (start-atomic) + (define len (pipe-content-length o)) (when (start-pos . > . len) + (end-atomic) (raise-range-error who "port content" "starting " start-pos o 0 len #f)) (when end-pos (unless (<= start-pos end-pos len) + (end-atomic) (raise-range-error who "port content" "ending " end-pos o 0 len start-pos))) (define amt (- (min len (or end-pos len)) start-pos)) (define bstr (make-bytes amt)) - (peek-bytes! bstr start-pos i) - (when reset? - ((output-bytes-data-reset (core-port-data o)))) + ((output-bytes-data-get (core-port-data bstr-o)) o bstr start-pos reset?) + (end-atomic) bstr)) ;; ---------------------------------------- diff --git a/racket/src/io/port/custom-output-port.rkt b/racket/src/io/port/custom-output-port.rkt index 03ad7eb918..9ea9bb416c 100644 --- a/racket/src/io/port/custom-output-port.rkt +++ b/racket/src/io/port/custom-output-port.rkt @@ -132,7 +132,7 @@ (wrap-check-write-evt-result '|user port write| r start end non-block/buffer?)] [else r])])) - (define (get-write-evt self bstr start end) + (define (get-write-evt self orig-out bstr start end) (end-atomic) (define r (user-get-write-evt bstr start end)) (unless (evt? r) diff --git a/racket/src/io/port/fd-port.rkt b/racket/src/io/port/fd-port.rkt index 749e8cda0b..009b56de03 100644 --- a/racket/src/io/port/fd-port.rkt +++ b/racket/src/io/port/fd-port.rkt @@ -220,7 +220,7 @@ [else n])])) #:count-write-evt-via-write-out - (lambda (self v bstr start) + (lambda (self port v bstr start) (port-count! port v bstr start)) #:close diff --git a/racket/src/io/port/output-port.rkt b/racket/src/io/port/output-port.rkt index d945374c65..650c33df18 100644 --- a/racket/src/io/port/output-port.rkt +++ b/racket/src/io/port/output-port.rkt @@ -70,8 +70,10 @@ write-out-special ; (any no-block/buffer? enable-break? -*> boolean?) ;; Called in atomic mode. - get-write-evt ; (bstr start-k end-k -*> evt?) + get-write-evt ; (port bstr start-k end-k -*> evt?) ;; Called in atomic mode. + ;; Note the extra "self" argument as a port, which is useful + ;; for implementing `count-write-evt-via-write-out`. ;; The given bstr should not be exposed to untrusted code. get-write-special-evt ; (-*> evt?) @@ -142,11 +144,11 @@ (and count-write-evt-via-write-out ;; If `write-out` is always atomic (in no-block, no-buffer mode), ;; then an event can poll `write-out`: - (lambda (self src-bstr src-start src-end) + (lambda (self o src-bstr src-start src-end) (write-evt ;; in atomic mode: (lambda (self-evt) - (define v (write-out self src-bstr src-start src-end #f #f #t)) + (define v (write-out self o src-bstr src-start src-end #f #f #t)) (when (exact-integer? v) (count-write-evt-via-write-out self v src-bstr src-start)) (if (evt? v) diff --git a/racket/src/io/port/pipe.rkt b/racket/src/io/port/pipe.rkt index d1f50b30d2..2e28edb279 100644 --- a/racket/src/io/port/pipe.rkt +++ b/racket/src/io/port/pipe.rkt @@ -10,16 +10,19 @@ "commit-manager.rkt") (provide make-pipe + make-pipe-ends pipe-input-port? pipe-output-port? pipe-content-length pipe-write-position + pipe-get-content pipe-discard-all) (define (min+1 a b) (if a (min (add1 a) b) b)) (struct pipe-data (get-content-length write-position + get-content discard-all)) (define (pipe-input-port? p) @@ -40,16 +43,21 @@ (raise-argument-error 'pipe-contact-length "(or/c pipe-input-port? pipe-output-port?)" p)]))) (core-port-self p))) +;; in atomic mode: (define pipe-write-position (case-lambda [(p) ((pipe-data-write-position (core-port-data p)) (core-port-self p))] [(p pos) ((pipe-data-write-position (core-port-data p)) (core-port-self p) pos)])) +;; in atomic mode: (define (pipe-discard-all p) ((pipe-data-discard-all (core-port-data p)) (core-port-self p))) -(define/who (make-pipe [limit #f] [input-name 'pipe] [output-name 'pipe]) - (check who #:or-false exact-positive-integer? limit) +;; in atomic mode: +(define (pipe-get-content p bstr start-pos) + ((pipe-data-get-content (core-port-data p)) (core-port-self p) bstr start-pos)) + +(define (make-pipe-ends [limit #f] [input-name 'pipe] [output-name 'pipe] #:need-input? [need-input? #t]) (define bstr (make-bytes (min+1 limit 16))) (define len (bytes-length bstr)) (define-fixnum peeked-amt 0) ; peeked but not yet read effectively extends `limit` @@ -75,14 +83,30 @@ (atomically (content-length))) ;; write-position (case-lambda + ;; in atomic mode [(self) (or write-pos end)] [(self pos) ;; `pos` must be between `start` and `end` (if (fx= pos end) (set! write-pos #f) (set! write-pos pos))]) + ;; get-content + (lambda (self to-bstr start-pos) + ;; in atomic mode + (define pos (let ([p (fx+ start start-pos)]) + (if (p . fx>= . len) + (fx- p len) + p))) + (define end-pos (fx+ pos (bytes-length to-bstr))) + (cond + [(end-pos . fx<= . len) + (bytes-copy! to-bstr 0 bstr pos end-pos)] + [else + (bytes-copy! to-bstr 0 bstr pos len) + (bytes-copy! to-bstr (fx- len pos) bstr 0 (fx- end-pos len))])) ;; discard-all (lambda (self) + ;; in atomic mode (set! peeked-amt 0) (set! start 0) (set! end 0) @@ -149,8 +173,10 @@ (set! commit-manager (make-commit-manager))) (commit-manager-wait commit-manager progress-evt ext-evt finish)])) - ;; input ---------------------------------------- - (define ip + (values + ;; input ---------------------------------------- + (and + need-input? (make-core-input-port #:name input-name #:data data @@ -314,125 +340,125 @@ (progress!) (check-input-blocking) (finish dest-bstr)]))))])))) + + ;; output ---------------------------------------- + (make-core-output-port + #:name output-name + #:data data + #:self #f - ;; out ---------------------------------------- - (define op - (make-core-output-port - #:name output-name - #:data data - #:self #f - - #:evt write-ready-evt - - #:write-out - ;; in atomic mode - (lambda (self src-bstr src-start src-end nonblock? enable-break? copy?) - (assert-atomic) - (let try-again () - (define top-pos (if (fx= start 0) - (fx- len 1) - len)) - (define (maybe-grow) - (cond - [(or (not limit) - ((+ limit peeked-amt) . > . (fx- len 1))) - ;; grow pipe size - (define new-bstr (make-bytes (min+1 (and limit (+ limit peeked-amt)) (* len 2)))) - (cond - [(fx= 0 start) - (bytes-copy! new-bstr 0 bstr 0 (fx- len 1))] - [else - (bytes-copy! new-bstr 0 bstr start len) - (bytes-copy! new-bstr (fx- len start) bstr 0 end) - (set! start 0) - (set! end (fx- len 1))]) - (set! bstr new-bstr) - (set! len (bytes-length new-bstr)) - (try-again)] - [else (pipe-is-full)])) - (define (pipe-is-full) - (wrap-evt write-ready-evt (lambda (v) #f))) - (define (apply-limit amt) - (if limit - (min amt (- (+ limit peeked-amt) (content-length))) - amt)) + #:evt write-ready-evt + + #:write-out + ;; in atomic mode + (lambda (self src-bstr src-start src-end nonblock? enable-break? copy?) + (assert-atomic) + (let try-again () + (define top-pos (if (fx= start 0) + (fx- len 1) + len)) + (define (maybe-grow) (cond - [(fx= src-start src-end) ;; => flush - 0] - [write-pos ; set by `file-position` on a bytes port - (define amt (apply-limit (fxmin (fx- end write-pos) - (fx- src-end src-start)))) + [(or (not limit) + ((+ limit peeked-amt) . > . (fx- len 1))) + ;; grow pipe size + (define new-bstr (make-bytes (min+1 (and limit (+ limit peeked-amt)) (* len 2)))) (cond - [(fx= amt 0) (pipe-is-full)] + [(fx= 0 start) + (bytes-copy! new-bstr 0 bstr 0 (fx- len 1))] [else - (check-input-unblocking) - (bytes-copy! bstr write-pos src-bstr src-start (fx+ src-start amt)) - (let ([new-write-pos (fx+ write-pos amt)]) - (if (fx= new-write-pos end) - (set! write-pos #f) ; back to normal mode - (set! write-pos new-write-pos))) - (check-output-blocking) - amt])] - [(and (end . fx>= . start) - (end . fx< . top-pos)) - (define amt (apply-limit (fxmin (fx- top-pos end) - (fx- src-end src-start)))) - (cond - [(fx= amt 0) (pipe-is-full)] - [else - (check-input-unblocking) - (bytes-copy! bstr end src-bstr src-start (fx+ src-start amt)) - (let ([new-end (fx+ end amt)]) - (set! end (if (fx= new-end len) 0 new-end))) - (check-output-blocking) - amt])] - [(fx= end top-pos) - (cond - [(fx= start 0) - (maybe-grow)] - [else - (define amt (fxmin (fx- start 1) - (fx- src-end src-start))) - (cond - [(fx= amt 0) (pipe-is-full)] - [else - (check-input-unblocking) - (bytes-copy! bstr 0 src-bstr src-start (fx+ src-start amt)) - (set! end amt) - (check-output-blocking) - amt])])] - [(end . fx< . (fx- start 1)) - (define amt (apply-limit (fxmin (fx- (fx- start 1) end) - (fx- src-end src-start)))) - (cond - [(fx= amt 0) (pipe-is-full)] - [else - (check-input-unblocking) - (bytes-copy! bstr end src-bstr src-start (fx+ src-start amt)) - (set! end (fx+ end amt)) - (check-output-blocking) - amt])] - [else - (maybe-grow)]))) + (bytes-copy! new-bstr 0 bstr start len) + (bytes-copy! new-bstr (fx- len start) bstr 0 end) + (set! start 0) + (set! end (fx- len 1))]) + (set! bstr new-bstr) + (set! len (bytes-length new-bstr)) + (try-again)] + [else (pipe-is-full)])) + (define (pipe-is-full) + (wrap-evt write-ready-evt (lambda (v) #f))) + (define (apply-limit amt) + (if limit + (min amt (- (+ limit peeked-amt) (content-length))) + amt)) + (cond + [(fx= src-start src-end) ;; => flush + 0] + [write-pos ; set by `file-position` on a bytes port + (define amt (apply-limit (fxmin (fx- end write-pos) + (fx- src-end src-start)))) + (cond + [(fx= amt 0) (pipe-is-full)] + [else + (check-input-unblocking) + (bytes-copy! bstr write-pos src-bstr src-start (fx+ src-start amt)) + (let ([new-write-pos (fx+ write-pos amt)]) + (if (fx= new-write-pos end) + (set! write-pos #f) ; back to normal mode + (set! write-pos new-write-pos))) + (check-output-blocking) + amt])] + [(and (end . fx>= . start) + (end . fx< . top-pos)) + (define amt (apply-limit (fxmin (fx- top-pos end) + (fx- src-end src-start)))) + (cond + [(fx= amt 0) (pipe-is-full)] + [else + (check-input-unblocking) + (bytes-copy! bstr end src-bstr src-start (fx+ src-start amt)) + (let ([new-end (fx+ end amt)]) + (set! end (if (fx= new-end len) 0 new-end))) + (check-output-blocking) + amt])] + [(fx= end top-pos) + (cond + [(fx= start 0) + (maybe-grow)] + [else + (define amt (fxmin (fx- start 1) + (fx- src-end src-start))) + (cond + [(fx= amt 0) (pipe-is-full)] + [else + (check-input-unblocking) + (bytes-copy! bstr 0 src-bstr src-start (fx+ src-start amt)) + (set! end amt) + (check-output-blocking) + amt])])] + [(end . fx< . (fx- start 1)) + (define amt (apply-limit (fxmin (fx- (fx- start 1) end) + (fx- src-end src-start)))) + (cond + [(fx= amt 0) (pipe-is-full)] + [else + (check-input-unblocking) + (bytes-copy! bstr end src-bstr src-start (fx+ src-start amt)) + (set! end (fx+ end amt)) + (check-output-blocking) + amt])] + [else + (maybe-grow)]))) - #:count-write-evt-via-write-out - (lambda (self v bstr start) - (port-count! op v bstr start)) + #:count-write-evt-via-write-out + (lambda (self op v bstr start) + (port-count! op v bstr start)) - #:close - ;; in atomic mode - (lambda (self) - (unless output-closed? - (set! output-closed? #t) - (when write-ready-sema - (semaphore-post write-ready-sema)) - (when more-read-ready-sema - (semaphore-post more-read-ready-sema)) - (semaphore-post read-ready-sema))))) + #:close + ;; in atomic mode + (lambda (self) + (unless output-closed? + (set! output-closed? #t) + (when write-ready-sema + (semaphore-post write-ready-sema)) + (when more-read-ready-sema + (semaphore-post more-read-ready-sema)) + (semaphore-post read-ready-sema)))))) - ;; Results ---------------------------------------- +(define/who (make-pipe [limit #f] [input-name 'pipe] [output-name 'pipe]) + (check who #:or-false exact-positive-integer? limit) + (define-values (ip op) (make-pipe-ends limit input-name output-name)) (when (port-count-lines-enabled) (port-count-lines! ip) (port-count-lines! op)) - (values ip op))