diff --git a/racket/src/io/demo.rkt b/racket/src/io/demo.rkt index ea70a76392..797ea3a02b 100644 --- a/racket/src/io/demo.rkt +++ b/racket/src/io/demo.rkt @@ -764,6 +764,7 @@ ;; ---------------------------------------- +'read-string (time (let loop ([j 10]) (unless (zero? j) diff --git a/racket/src/io/port/fd-port.rkt b/racket/src/io/port/fd-port.rkt index 47b5049d6a..7c460b781b 100644 --- a/racket/src/io/port/fd-port.rkt +++ b/racket/src/io/port/fd-port.rkt @@ -1,5 +1,6 @@ #lang racket/base -(require "../host/rktio.rkt" +(require "../common/class.rkt" + "../host/rktio.rkt" "../host/error.rkt" "../host/thread.rkt" "../host/pthread.rkt" @@ -56,6 +57,43 @@ ;; ---------------------------------------- +(class fd-input-port #:extends peek-via-read-input-port + (field + [fd #f] + [fd-refcount #f] + [custodian-reference #f] + [on-close on-close] + [network-error? #f]) + + (override + [read-in/inner + (lambda (dest-bstr start end copy?) + (define n (rktio_read_in rktio fd dest-bstr start end)) + (cond + [(rktio-error? n) + (end-atomic) + (if network-error? + (raise-network-error #f n "error reading from stream port") + (raise-filesystem-error #f n "error reading from stream port"))] + [(eqv? n RKTIO_READ_EOF) eof] + [(eqv? n 0) (wrap-evt (fd-evt fd RKTIO_POLL_READ this) + (lambda (v) 0))] + [else n]))] + + [close + (lambda () + (on-close) + (fd-close fd fd-refcount) + (unsafe-custodian-unregister fd custodian-reference) + (close-peek-buffer))] + + [file-position + (case-lambda + [() (do-file-position fd (lambda (pos) (buffer-adjust-pos pos)))] + [(pos) (do-file-position fd (lambda () (purge-buffer)) pos)])])) + +;; ---------------------------------------- + ;; in atomic mode ;; Current custodian must not be shut down. (define (open-input-fd fd name @@ -65,40 +103,15 @@ #:custodian [cust (current-custodian)] #:file-stream? [file-stream? #t] #:network-error? [network-error? #f]) - (define-values (port buffer-control) - (open-input-peek-via-read - #:name name - #:data (fd-data fd extra-data #t file-stream?) - #:self #f - #:read-in - ;; in atomic mode - (lambda (self dest-bstr start end copy?) - (define n (rktio_read_in rktio fd dest-bstr start end)) - (cond - [(rktio-error? n) - (end-atomic) - (if network-error? - (raise-network-error #f n "error reading from stream port") - (raise-filesystem-error #f n "error reading from stream port"))] - [(eqv? n RKTIO_READ_EOF) eof] - [(eqv? n 0) (wrap-evt (fd-evt fd RKTIO_POLL_READ port) - (lambda (v) 0))] - [else n])) - #:read-is-atomic? #t - #:close - ;; in atomic mode - (lambda (self) - (on-close) - (fd-close fd fd-refcount) - (unsafe-custodian-unregister fd custodian-reference)) - #:file-position (make-file-position - fd - (case-lambda - [() (buffer-control)] - [(pos) (buffer-control pos)])))) - (define custodian-reference - (register-fd-close cust fd fd-refcount #f port)) - port) + (define p (new fd-input-port + [name name] + [data (fd-data fd extra-data #t file-stream?)] + [fd fd] + [fd-refcount fd-refcount] + [on-close on-close] + [network-error? network-error?])) + (set-fd-input-port-custodian-reference! p (register-fd-close cust fd fd-refcount #f p)) + p) ;; ---------------------------------------- @@ -281,10 +294,10 @@ ;; ---------------------------------------- -(define (make-file-position fd buffer-control) - ;; in atomic mode +;; in atomic mode +(define do-file-position (case-lambda - [(self) + [(fd buffer-control) (define ppos (rktio_get_file_position rktio fd)) (cond [(rktio-error? ppos) @@ -294,7 +307,7 @@ (define pos (rktio_filesize_ref ppos)) (rktio_free ppos) (buffer-control pos)])] - [(self pos) + [(fd buffer-control pos) (buffer-control) (define r (rktio_set_file_position rktio @@ -309,6 +322,11 @@ (end-atomic) (raise-rktio-error 'file-position r "error setting stream position"))])) +(define (make-file-position fd buffer-control) + (case-lambda + [(self) (do-file-position fd buffer-control)] + [(self pos) (do-file-position fd buffer-control pos)])) + ;; ---------------------------------------- (struct fd-evt (fd mode [closed #:mutable]) diff --git a/racket/src/io/port/peek-via-read-port.rkt b/racket/src/io/port/peek-via-read-port.rkt index 2d197175f5..c7b9d73fd2 100644 --- a/racket/src/io/port/peek-via-read-port.rkt +++ b/racket/src/io/port/peek-via-read-port.rkt @@ -13,18 +13,12 @@ (class peek-via-read-input-port #:extends commit-input-port (field - [bstr #""] + [bstr (make-bytes 4096)] [pos 0] [end-pos 0] [peeked-eof? #f] [buffer-mode 'block]) - (override - [prepare-change - (lambda () - (when commit-manager - (commit-manager-pause commit-manager)))]) - (public ;; in atomic mode; must override [read-in/inner @@ -35,9 +29,19 @@ ;; in atomic mode [purge-buffer (lambda () + (slow-mode!) (set! pos 0) (set! end-pos 0) - (set! peeked-eof? #f))]) + (set! peeked-eof? #f))] + + [close-peek-buffer + (lambda () + (purge-buffer) + (set! bstr #""))] + + [buffer-adjust-pos + (lambda (i) + (- i (fx- end-pos (if buffer buffer-pos pos))))]) (private ;; in atomic mode @@ -74,117 +78,154 @@ (bytes-copy! bstr 0 bstr pos end-pos) (set! end-pos (fx- end-pos pos)) (set! pos 0) - (pull-more-bytes)]))] + (pull-more-bytes amt)]))] ;; in atomic mode [retry-pull? (lambda (v) - (and (integer? v) (not (eqv? v 0))))]) + (and (integer? v) (not (eqv? v 0))))] + + ;; in atomic mode + [fast-mode! + (lambda (amt) ; amt = not yet added to `count` + (unless count + (set! buffer bstr) + (define s pos) + (set! buffer-pos s) + (set! buffer-end end-pos) + (define o offset) + (when o + (set! offset (- (+ o amt) s)))))] + + ;; in atomic mode + [slow-mode! + (lambda () + (when buffer + (define s buffer-pos) + (define o offset) + (when o + (set! offset (+ o s))) + (set! pos s) + (set! buffer #f) + (set! buffer-pos buffer-end)))]) (override - ;; in atomic mode - [read-in - (lambda (dest-bstr start end copy?) - (let try-again () - (cond - [(pos . fx< . end-pos) - (define amt (min (fx- end-pos pos) (fx- end start))) - (bytes-copy! dest-bstr start bstr pos (fx+ pos amt)) - amt] - [peeked-eof? - (set! peeked-eof? #f) - ;; an EOF doesn't count as progress - eof] - [else - (cond - [(and (fx< (fx- end start) (bytes-length bstr)) - (eq? 'block buffer-mode)) - (define v (pull-some-bytes)) - (cond - [(or (eqv? v 0) (evt? v)) v] - [else (try-again)])] - [else - (define v (send peek-via-read-input-port this read-in/inner dest-bstr start end copy?)) - (unless (eqv? v 0) - (progress!)) - v])])))] + ;; in atomic mode + [prepare-change + (lambda () + (pause-waiting-commit))] + + ;; in atomic mode + [read-in + (lambda (dest-bstr start end copy?) + (slow-mode!) + (let try-again () + (cond + [(pos . fx< . end-pos) + (define amt (min (fx- end-pos pos) (fx- end start))) + (bytes-copy! dest-bstr start bstr pos (fx+ pos amt)) + (set! pos (fx+ pos amt)) + (progress!) + (fast-mode! amt) + amt] + [peeked-eof? + (set! peeked-eof? #f) + ;; an EOF doesn't count as progress + eof] + [else + (cond + [(and (eq? 'block buffer-mode) + (fx< (fx- end start) (fxrshift (bytes-length bstr) 1))) + (define v (pull-some-bytes)) + (cond + [(or (eqv? v 0) (evt? v)) v] + [else (try-again)])] + [else + (define v (send peek-via-read-input-port this read-in/inner dest-bstr start end copy?)) + (unless (eqv? v 0) + (progress!)) + v])])))] - ;; in atomic mode - [peek-in - (lambda (dest-bstr start end skip progress-evt copy?) - (let try-again () - (cond - [(and progress-evt - (sync/timeout 0 progress-evt)) - #f] - [else - (define peeked-amt (fx- end-pos pos)) - (cond - [(peeked-amt . > . skip) - (define amt (min (fx- peeked-amt skip) (fx- end start))) - (define s-pos (fx+ pos skip)) - (bytes-copy! dest-bstr start bstr s-pos (fx+ s-pos amt)) - amt] - [peeked-eof? - eof] - [else - (define v (pull-more-bytes (- skip peeked-amt))) - (if (retry-pull? v) - (try-again) - v)])])))] + ;; in atomic mode + [peek-in + (lambda (dest-bstr start end skip progress-evt copy?) + (let try-again () + (cond + [(and progress-evt + (sync/timeout 0 progress-evt)) + #f] + [else + (define s (if buffer buffer-pos pos)) + (define peeked-amt (fx- end-pos s)) + (cond + [(peeked-amt . > . skip) + (define amt (min (fx- peeked-amt skip) (fx- end start))) + (define s-pos (fx+ s skip)) + (bytes-copy! dest-bstr start bstr s-pos (fx+ s-pos amt)) + amt] + [peeked-eof? + eof] + [else + (slow-mode!) + (define v (pull-more-bytes (+ (- skip peeked-amt) (fx- end start)))) + (if (retry-pull? v) + (try-again) + v)])])))] - ;; in atomic mode - [byte-ready - (lambda (work-done!) - (let loop () - (define peeked-amt (fx- end-pos pos)) - (cond - [(peeked-amt . fx> . 0) #t] - [peeked-eof? #t] - [else - (define v (pull-some-bytes)) - (work-done!) - (cond - [(retry-pull? v) - (loop)] - [(evt? v) v] - [else - (not (eqv? v 0))])])))] + ;; in atomic mode + [byte-ready + (lambda (work-done!) + (let loop () + (define peeked-amt (fx- end-pos (if buffer buffer-pos pos))) + (cond + [(peeked-amt . fx> . 0) #t] + [peeked-eof? #t] + [else + (slow-mode!) + (define v (pull-some-bytes)) + (work-done!) + (cond + [(retry-pull? v) + (loop)] + [(evt? v) v] + [else + (not (eqv? v 0))])])))] - [get-progress-evt - (lambda () - (atomically - (make-progress-evt)))] + [get-progress-evt + (lambda () + (atomically + (slow-mode!) + (make-progress-evt)))] - ;; in atomic mode - [commit - (lambda (amt progress-evt ext-evt finish) - (wait-commit - progress-evt ext-evt - ;; in atomic mode, maybe in a different thread: - (lambda () - (let ([amt (fxmin amt (fx- end-pos pos))]) - (cond - [(fx= 0 amt) - (finish #"")] - [else - (define dest-bstr (make-bytes amt)) - (bytes-copy! dest-bstr 0 bstr pos (fx+ pos amt)) - (set! pos (fx+ pos amt)) - (progress!) - (finish dest-bstr)])))))] + ;; in atomic mode + [commit + (lambda (amt progress-evt ext-evt finish) + (slow-mode!) + (wait-commit + progress-evt ext-evt + ;; in atomic mode, maybe in a different thread: + (lambda () + (let ([amt (fxmin amt (fx- end-pos pos))]) + (cond + [(fx= 0 amt) + (finish #"")] + [else + (define dest-bstr (make-bytes amt)) + (bytes-copy! dest-bstr 0 bstr pos (fx+ pos amt)) + (set! pos (fx+ pos amt)) + (progress!) + (finish dest-bstr)])))))] - ;; in atomic mode - [buffer-mode - (case-lambda - [(self) buffer-mode] - [(self mode) (set! buffer-mode mode)])] + ;; in atomic mode + [buffer-mode + (case-lambda + [(self) buffer-mode] + [(self mode) (set! buffer-mode mode)])] - ;; in atomic mode - [close - (lambda () - (purge-buffer) - (set! bstr #""))])) + ;; in atomic mode + [close + (lambda () + (close-peek-buffer))])) ;; ---------------------------------------- diff --git a/racket/src/io/port/pipe.rkt b/racket/src/io/port/pipe.rkt index b50f7ed0fa..5e3ef973ee 100644 --- a/racket/src/io/port/pipe.rkt +++ b/racket/src/io/port/pipe.rkt @@ -127,7 +127,7 @@ (private [fast-mode! - (lambda (amt) + (lambda (amt) ; amt = not yet added to `count` (unless (or count buffer) (with-object pipe-data d (define s start)