diff --git a/racket/src/io/demo.rkt b/racket/src/io/demo.rkt index e0678b6e11..571a3bf3f0 100644 --- a/racket/src/io/demo.rkt +++ b/racket/src/io/demo.rkt @@ -319,7 +319,7 @@ (let () (define-values (i o) (make-pipe)) (for ([n 3]) - (write-bytes (make-bytes 4096 (char->integer #\a)) o) + (test 4096 (write-bytes (make-bytes 4096 (char->integer #\a)) o)) (for ([j (in-range 4096)]) (read-byte i)) (unless (zero? (pipe-content-length i)) @@ -347,7 +347,6 @@ (error)) (loop (add1 x) (cdr content) (list* bstr bstr accum))]))) - (let () (define path (build-path "compiled" "demo-out")) (define o (open-output-file path 'truncate)) @@ -401,6 +400,7 @@ (test (void) (file-position out 10)) (test #"hola!!\0\0\0\0" (get-output-bytes out))) +(log-error "start") (let () (define-values (i o) (make-pipe)) (port-count-lines! i) @@ -428,6 +428,7 @@ (write-bytes #"!" o) (test '(3 1 8) (next-location o)) +(log-error "here") (test #"x\r" (read-bytes 2 i)) (test '(3 0 7) (next-location i)) (test #"\n!" (read-bytes 2 i)) diff --git a/racket/src/io/port/bytes-output.rkt b/racket/src/io/port/bytes-output.rkt index 47bf5dcc1a..fb21cfd5e1 100644 --- a/racket/src/io/port/bytes-output.rkt +++ b/racket/src/io/port/bytes-output.rkt @@ -4,6 +4,7 @@ "../common/class.rkt" "../host/thread.rkt" "port.rkt" + "count.rkt" "output-port.rkt" "parameter.rkt" "write.rkt" @@ -24,7 +25,18 @@ (check who byte? b) (check who output-port? out) (let ([out (->core-output-port out)]) - (write-some-bytes 'write-byte out (bytes b) 0 1 #:buffer-ok? #t #:copy-bstr? #f)) + (start-atomic) + (define pos (core-port-buffer-pos out)) + (cond + [(pos . fx< . (core-port-buffer-end out)) + (bytes-set! (core-port-buffer out) pos b) + (set-core-port-buffer-pos! out (fx+ pos 1)) + (when (core-port-count out) + (port-count-byte! out b)) + (end-atomic)] + [else + (end-atomic) + (write-some-bytes 'write-byte out (bytes b) 0 1 #:buffer-ok? #t #:copy-bstr? #f)])) (void)) (define (do-write-bytes who out bstr start end) diff --git a/racket/src/io/port/bytes-port.rkt b/racket/src/io/port/bytes-port.rkt index 332c74e030..157cd80b19 100644 --- a/racket/src/io/port/bytes-port.rkt +++ b/racket/src/io/port/bytes-port.rkt @@ -156,9 +156,14 @@ [max-pos 0]) (public - [get-length (lambda () max-pos)] + [get-length (lambda () + (start-atomic) + (slow-mode!) + (end-atomic) + max-pos)] [get-bytes (lambda (dest-bstr start-pos discard?) (start-atomic) + (slow-mode!) (bytes-copy! dest-bstr 0 bstr start-pos (fx+ start-pos (bytes-length dest-bstr))) (when discard? (set! bstr #"") @@ -171,11 +176,29 @@ (lambda (len) (define new-bstr (make-bytes (fx* 2 len))) (bytes-copy! new-bstr 0 bstr 0 pos) - (set! bstr new-bstr))]) + (set! bstr new-bstr))] + + [slow-mode! + (lambda () + (when buffer + (define s buffer-pos) + (set! pos s) + (set! buffer-pos buffer-end) + (set! buffer #f) + (set! offset s) + (set! max-pos (fxmax s max-pos))))] + + [fast-mode! + (lambda () + (set! buffer bstr) + (set! buffer-pos pos) + (set! buffer-end (bytes-length bstr)) + (set! offset 0))]) (override [write-out (lambda (src-bstr src-start src-end nonblock? enable-break? copy?) + (slow-mode!) (define i pos) (define amt (min (fx- src-end src-start) 4096)) (define end-i (fx+ i amt)) @@ -184,6 +207,7 @@ (bytes-copy! bstr i src-bstr src-start (fx+ src-start amt)) (set! pos end-i) (set! max-pos (fxmax pos max-pos)) + (fast-mode!) amt)] [get-write-evt (get-write-evt-via-write-out (lambda (out v bstr start) @@ -192,6 +216,7 @@ (case-lambda [() pos] [(new-pos) + (slow-mode!) (define len (bytes-length bstr)) (cond [(eof-object? new-pos) diff --git a/racket/src/io/port/pipe.rkt b/racket/src/io/port/pipe.rkt index 6c2111b307..908b1dbeb2 100644 --- a/racket/src/io/port/pipe.rkt +++ b/racket/src/io/port/pipe.rkt @@ -47,7 +47,10 @@ [else (raise-argument-error 'pipe-contact-length "(or/c pipe-input-port? pipe-output-port?)" p)])) (atomically - (send pipe-input-port (pipe-data-input d) sync-data) + (let ([input (pipe-data-input d)]) + (when input (send pipe-input-port input sync-data))) + (let ([output (pipe-data-output d)]) + (when output (send pipe-output-port output sync-data))) (send pipe-data d content-length))) ;; ---------------------------------------- @@ -60,8 +63,8 @@ [peeked-amt 0] ; peeked but not yet read, effectively extends `limit` [start 0] [end 0] - [input #f] ; #f => closed - [output-closed? #f] + [input #f] ; #f => closed + [output #f] ; #f => closed [read-ready-sema #f] [write-ready-sema #f] [more-read-ready-sema #f] ; for lookahead peeks @@ -96,12 +99,15 @@ (when (output-full?) (semaphore-post write-ready-sema)))] [check-input-blocking (lambda () - (when (input-empty?) (semaphore-wait read-ready-sema)))] + (when (input-empty?) + (semaphore-wait read-ready-sema) + (when output + (send pipe-output-port output on-input-empty))))] ;; Used before/after write: [check-input-unblocking (lambda () - (when (and (input-empty?) (not output-closed?)) (semaphore-post read-ready-sema)) + (when (and (input-empty?) output) (semaphore-post read-ready-sema)) (when more-read-ready-sema (semaphore-post more-read-ready-sema) (set! more-read-ready-sema #f)))] @@ -128,31 +134,34 @@ (private [fast-mode! (lambda (amt) ; amt = not yet added to `offset` - (unless buffer - (with-object pipe-data d - (define s start) - (define e end) - (unless (fx= s e) - (set! buffer bstr) - (set! buffer-pos s) - (set! buffer-end (if (s . fx< . e) e len)) - (define o offset) - (when o - (set! offset (- (+ o amt) s)))))))] + (with-object pipe-data d + (define s start) + (define e end) + (unless (fx= s e) + (set! buffer bstr) + (set! buffer-pos s) + ;; don't read last byte, because the output + ;; end needs to know about a transition to + ;; the empty state + (set! buffer-end (fx- (if (s . fx< . e) e len) 1)) + (define o offset) + (when o + (set! offset (- (+ o amt) s))))))] [slow-mode! (lambda () - (when buffer - (with-object pipe-data d + (with-object pipe-data d + (when buffer (define pos buffer-pos) (define o offset) (when o (set! offset (+ o pos))) - (set! start (if (fx= pos len) - 0 - pos)) + (set! start (if (fx= pos len) 0 pos)) (set! buffer #f) - (set! buffer-pos buffer-end))))]) + (set! buffer-pos buffer-end)) + (define out output) + (when out + (send pipe-output-port out sync-data))))]) (static [sync-data @@ -163,6 +172,13 @@ (set! start (if (fx= pos len) 0 pos)))))] + [sync-data-both + (lambda () + (sync-data) + (with-object pipe-data d + (define out output) + (when out + (send pipe-output-port out sync-data))))] [on-resize (lambda () (slow-mode!))] @@ -183,9 +199,9 @@ (with-object pipe-data d (cond [(input-empty?) - (if output-closed? - eof - read-ready-evt)] + (if output + read-ready-evt + eof)] [else (check-output-unblocking) (define s start) @@ -215,7 +231,7 @@ (lambda (dest-bstr dest-start dest-end skip progress-evt copy?) (with-object pipe-data d (assert-atomic) - (sync-data) + (sync-data-both) (define content-amt (content-length)) (cond [(and progress-evt @@ -223,10 +239,12 @@ #f] [(content-amt . <= . skip) (cond - [output-closed? eof] + [(not output) eof] [else (unless (or (zero? skip) more-read-ready-sema) - (set! more-read-ready-sema (make-semaphore))) + (set! more-read-ready-sema (make-semaphore)) + (when output + (send pipe-output-port output on-need-more-ready))) (define evt (if (zero? skip) read-ready-evt (wrap-evt (semaphore-peek-evt more-read-ready-sema) @@ -252,9 +270,9 @@ (lambda (work-done!) (assert-atomic) (with-object pipe-data d - (or output-closed? + (or (not output) (begin - (sync-data) + (sync-data-both) (not (fx= 0 (content-length)))))))] [close @@ -325,11 +343,78 @@ (field [d d]) ; pipe-data + (private + [fast-mode! + (lambda (amt) ; amt = not yet added to `offset` + (with-object pipe-data d + (define lim limit) + (define avail (and lim (- lim (content-length) + ;; don't fill last byte, because the input + ;; end needs to know about a trasition to the + ;; full state + 1))) + (when (or (not avail) (avail . <= . 0)) + (define s start) + (define e end) + (set! buffer bstr) + (set! buffer-pos e) + (set! buffer-end (let ([end (if (s . fx<= . e) + (if (fx= s 0) + (fx- len 1) + len) + (fx- s 1))]) + (if (and avail + ((fx- end e) . > . avail)) + (fx+ e avail) + end))) + (define o offset) + (when o + (set! offset (- (+ o amt) e))))))] + + [slow-mode! + (lambda () + (with-object pipe-data d + (when buffer + (define pos buffer-pos) + (define o offset) + (when o + (set! offset (+ o pos))) + (set! end (if (fx= pos len) 0 pos)) + (set! buffer #f) + (set! buffer-pos buffer-end)) + (define in input) + (when in + (send pipe-input-port in sync-data))))]) + + (static + [sync-data + (lambda () + (when buffer + (with-object pipe-data d + (define pos buffer-pos) + (set! end (if (fx= pos len) + 0 + pos)))))] + [sync-data-both + (lambda () + (sync-data) + (with-object pipe-data d + (define in input) + (when in + (send pipe-output-port in sync-data #f))))] + [on-input-empty + (lambda () + (slow-mode!))] + [on-need-more-ready + (lambda () + (slow-mode!))]) + (override [write-out ;; in atomic mode (lambda (src-bstr src-start src-end nonblock? enable-break? copy?) (assert-atomic) + (slow-mode!) (with-object pipe-data d (send pipe-input-port input sync-data) (let try-again () @@ -376,6 +461,7 @@ (let ([new-end (fx+ end amt)]) (set! end (if (fx= new-end len) 0 new-end))) (check-output-blocking) + (fast-mode! amt) amt])] [(fx= end top-pos) (cond @@ -391,6 +477,7 @@ (bytes-copy! bstr 0 src-bstr src-start (fx+ src-start amt)) (set! end amt) (check-output-blocking) + (fast-mode! amt) amt])])] [(end . fx< . (fx- start 1)) (define amt (apply-limit (fxmin (fx- (fx- start 1) end) @@ -402,6 +489,7 @@ (bytes-copy! bstr end src-bstr src-start (fx+ src-start amt)) (set! end (fx+ end amt)) (check-output-blocking) + (fast-mode! amt) amt])] [else (maybe-grow)]))))] @@ -414,8 +502,9 @@ ;; in atomic mode (lambda () (with-object pipe-data d - (unless output-closed? - (set! output-closed? #t) + (when output + (slow-mode!) + (set! output #f) (when write-ready-sema (semaphore-post write-ready-sema)) (when more-read-ready-sema @@ -450,6 +539,7 @@ [d d])) (set-pipe-data-input! d input) + (set-pipe-data-output! d output) (values input output)) diff --git a/racket/src/io/port/read-and-peek.rkt b/racket/src/io/port/read-and-peek.rkt index 4da9075bb2..a9c6cff85c 100644 --- a/racket/src/io/port/read-and-peek.rkt +++ b/racket/src/io/port/read-and-peek.rkt @@ -46,7 +46,7 @@ (start-atomic) (prepare-change in) (cond - [(= start end) ; intentionally before the port-closed check + [(fx= start end) ; intentionally before the port-closed check (end-atomic) 0] [(core-port-closed? in) @@ -61,9 +61,9 @@ (define buf-pos (core-port-buffer-pos in)) (define buf-end (core-port-buffer-end in)) (cond - [(buf-pos . < . buf-end) + [(buf-pos . fx< . buf-end) ;; Read bytes from buffer - (define v (min (- buf-end buf-pos) (- end start))) + (define v (fxmin (fx- buf-end buf-pos) (fx- end start))) (define new-pos (fx+ buf-pos v)) (bytes-copy! bstr start (core-port-buffer in) buf-pos new-pos) (set-core-port-buffer-pos! in new-pos) diff --git a/racket/src/io/port/string-output.rkt b/racket/src/io/port/string-output.rkt index 0e534fb1e5..0e28ad13c7 100644 --- a/racket/src/io/port/string-output.rkt +++ b/racket/src/io/port/string-output.rkt @@ -11,7 +11,8 @@ (define/who (write-char ch [out (current-output-port)]) (check who char? ch) (check who output-port? out) - (write-string (string ch) out 0 1)) + (write-string (string ch) out 0 1) + (void)) (define/who (write-string str [out (current-output-port)] [start 0] [end (and (string? str) (string-length str))]) diff --git a/racket/src/io/port/write.rkt b/racket/src/io/port/write.rkt index 3eda061f9e..dfa59af0c0 100644 --- a/racket/src/io/port/write.rkt +++ b/racket/src/io/port/write.rkt @@ -23,34 +23,47 @@ (end-atomic) 0] [else - (define write-out (method core-output-port out write-out)) + (define buf-pos (core-port-buffer-pos out)) + (define buf-end (core-port-buffer-end out)) (cond - [(procedure? write-out) - (define v (write-out out bstr start end (not buffer-ok?) enable-break? copy-bstr?)) - (let result-loop ([v v]) - (cond - [(not v) - (end-atomic) - (if zero-ok? - 0 - (try-again out extra-count-outs))] - [(exact-positive-integer? v) - (port-count-all! out extra-count-outs v bstr start) - (end-atomic) - v] - [(evt? v) - (end-atomic) - (cond - [zero-ok? 0] - [else - (define new-v (if enable-break? - (sync/enable-break v) - (sync v))) - (start-atomic) - (result-loop new-v)])] - [else - (end-atomic) - (internal-error (format "write-some-bytes: weird result ~s for ~s ~s ~s at ~s" v bstr start end out))]))] - [else + [(buf-pos . fx< . buf-end) + ;; Copy bytes from buffer + (define v (fxmin (fx- buf-end buf-pos) (fx- end start))) + (bytes-copy! (core-port-buffer out) buf-pos bstr start (fx+ start v)) + (set-core-port-buffer-pos! out (fx+ buf-pos v)) + (when (core-port-count out) + (port-count! out v bstr start)) (end-atomic) - (try-again (->core-output-port write-out) (cons out extra-count-outs))])]))) + v] + [else + (define write-out (method core-output-port out write-out)) + (cond + [(procedure? write-out) + (define v (write-out out bstr start end (not buffer-ok?) enable-break? copy-bstr?)) + (let result-loop ([v v]) + (cond + [(not v) + (end-atomic) + (if zero-ok? + 0 + (try-again out extra-count-outs))] + [(exact-positive-integer? v) + (port-count-all! out extra-count-outs v bstr start) + (end-atomic) + v] + [(evt? v) + (end-atomic) + (cond + [zero-ok? 0] + [else + (define new-v (if enable-break? + (sync/enable-break v) + (sync v))) + (start-atomic) + (result-loop new-v)])] + [else + (end-atomic) + (internal-error (format "write-some-bytes: weird result ~s for ~s ~s ~s at ~s" v bstr start end out))]))] + [else + (end-atomic) + (try-again (->core-output-port write-out) (cons out extra-count-outs))])])])))