add newline handling to reencode-input/output-port; provide a streamlined path for custom-port buffering (via pipes); fix JIT bug related to updating the runstack pointer after a call that turns out to be a direct-native call

svn: r8851

original commit: b3476706ece63ddd451de728d8923837172e481a
This commit is contained in:
Matthew Flatt 2008-03-02 16:00:12 +00:00
parent 143975381c
commit dc5dd14c5d

View File

@ -113,13 +113,15 @@
;; Not kill-safe. ;; Not kill-safe.
;; If the `read' proc returns an event, the event must produce ;; If the `read' proc returns an event, the event must produce
;; 0 always ;; 0 always (which implies that the `read' proc must not return
;; a pipe input port).
(define make-input-port/read-to-peek (define make-input-port/read-to-peek
(opt-lambda (name read fast-peek close (opt-lambda (name read fast-peek close
[location-proc #f] [location-proc #f]
[count-lines!-proc void] [count-lines!-proc void]
[init-position 1] [init-position 1]
[buffer-mode-proc #f]) [buffer-mode-proc #f]
[buffering? #f])
(define lock-semaphore (make-semaphore 1)) (define lock-semaphore (make-semaphore 1))
(define commit-semaphore (make-semaphore 1)) (define commit-semaphore (make-semaphore 1))
(define-values (peeked-r peeked-w) (make-pipe)) (define-values (peeked-r peeked-w) (make-pipe))
@ -143,6 +145,11 @@
(thread-resume manager-th (current-thread)) (thread-resume manager-th (current-thread))
(dynamic-wind suspend-manager thunk resume-manager)) (dynamic-wind suspend-manager thunk resume-manager))
(define (make-progress) (define (make-progress)
;; We dont worry about this byte getting picked up directly
;; from peeked-r, because the pipe must have been empty when
;; we grabed the lock, and since we've grabbed the lock,
;; no other thread could have re-returned the pipe behind
;; our back.
(write-byte 0 peeked-w) (write-byte 0 peeked-w)
(read-byte peeked-r)) (read-byte peeked-r))
(define (read-it-with-lock s) (define (read-it-with-lock s)
@ -157,14 +164,25 @@
s)) s))
(define (do-read-it s) (define (do-read-it s)
(if (byte-ready? peeked-r) (if (byte-ready? peeked-r)
(read-bytes-avail!* s peeked-r) peeked-r
;; If nothing is saved from a peeking read, ;; If nothing is saved from a peeking read,
;; dispatch to `read', otherwise return ;; dispatch to `read', otherwise return
;; previously peeked data ;; previously peeked data
(cond (cond
[(null? special-peeked) [(null? special-peeked)
(when progress-requested? (make-progress)) (when progress-requested? (make-progress))
(read s)] (if (and buffering?
((bytes-length s) . < . 10))
;; Buffering is enabled, so read more to move things
;; along:
(let ([r (read buf)])
(if (and (number? r) (positive? r))
(begin
(write-bytes buf peeked-w 0 r)
peeked-r)
r))
;; Just read requested amount:
(read s))]
[else (if (bytes? (mcar special-peeked)) [else (if (bytes? (mcar special-peeked))
(let ([b (mcar special-peeked)]) (let ([b (mcar special-peeked)])
(write-bytes b peeked-w) (write-bytes b peeked-w)
@ -205,13 +223,17 @@
;; Empty special queue, so read through the original proc. ;; Empty special queue, so read through the original proc.
;; We only only need ;; We only only need
;; (- (+ skip (bytes-length s)) (pipe-content-length peeked-w)) ;; (- (+ skip (bytes-length s)) (pipe-content-length peeked-w))
;; bytes, but read more (up to size of buf) to help move ;; bytes, but if buffering is enabled, read more (up to size of
;; things along. ;; buf) to help move things along.
(let* ([r (read buf)]) (let* ([dest (if buffering?
buf
(make-bytes (- (+ skip (bytes-length s))
(pipe-content-length peeked-w))))]
[r (read dest)])
(cond (cond
[(number? r) [(number? r)
;; The nice case --- reading gave us more bytes ;; The nice case --- reading gave us more bytes
(write-bytes buf peeked-w 0 r) (write-bytes dest peeked-w 0 r)
;; Now try again ;; Now try again
(peek-bytes-avail!* s skip #f peeked-r)] (peek-bytes-avail!* s skip #f peeked-r)]
[(evt? r) [(evt? r)
@ -427,7 +449,12 @@
location-proc location-proc
count-lines!-proc count-lines!-proc
init-position init-position
buffer-mode-proc))) (and buffer-mode-proc
(case-lambda
[() (buffer-mode-proc)]
[(mode)
(set! buffering? (eq? mode 'block))
(buffer-mode-proc mode)])))))
(define peeking-input-port (define peeking-input-port
(opt-lambda (orig-in [name (object-name orig-in)] [delta 0]) (opt-lambda (orig-in [name (object-name orig-in)] [delta 0])
@ -1154,9 +1181,123 @@
;; -------------------------------------------------- ;; --------------------------------------------------
;; Helper for reencode-input-port: simulate the composition
;; of a CRLF/CRNEL/NEL/LS -> LF decoding and some other
;; decoding.
;; The "converter" `c' is (mcons converter saved), where
;; saved is #f if no byte is saved, otherwise it's a saved
;; byte. It would be nicer and closer to the `bytes-convert'
;; interface to not consume a trailing CR, but we don't
;; know the inner encoding, and so we can't rewind it.
(define (bytes-convert/post-nl c buf buf-start buf-end dest)
(cond
[(and (mcdr c) (= buf-start buf-end))
;; No more bytes to convert; provide single
;; saved byte if it's not #\return, other report 'aborts
(if (eq? (mcdr c) (char->integer #\return))
(values 0 0 'aborts)
(begin
(bytes-set! dest 0 (mcdr c))
(set-mcdr! c #f)
(values 1 0 'complete)))]
[(and (mcdr c) (= 1 (bytes-length dest)))
;; We have a saved byte, but the destination is only 1 byte.
;; If the saved byte is a return, we need to try decoding more,
;; which means we may end up saving a non-#\return byte:
(if (eq? (mcdr c) (char->integer #\return))
(let-values ([(got-c used-c status)
(bytes-convert (mcar c) buf buf-start buf-end dest)])
(if (positive? got-c)
(cond
[(eq? (bytes-ref dest 0) (char->integer #\newline))
;; Found CRLF, so just produce LF (and nothing to save)
(set-mcdr! c #f)
(values 1 used-c status)]
[else
;; Next char fits in a byte, so it isn't NEL, etc.
;; Save it, and for now return the #\return.
(set-mcdr! c (bytes-ref dest 0))
(bytes-set! dest 0 (char->integer #\newline))
(values 1 used-c 'continues)])
;; Didn't decode any more; ask for bigger input, etc.
(values 0 0 status)))
;; Saved a non-#\return, so use that up now.
(begin
(bytes-set! dest 0 (mcdr c))
(set-mcdr! c #f)
(values 1 0 'continues)))]
[else
;; Normal convert, maybe prefixed:
(let-values ([(got-c used-c status)
(bytes-convert (mcar c) buf buf-start buf-end dest
(if (mcdr c) 1 0))])
(let* ([got-c (if (mcdr c)
;; Insert saved character:
(begin
(bytes-set! dest 0 (char->integer #\return))
(set-mcdr! c #f)
(add1 got-c))
got-c)]
[got-c (if (and (positive? got-c)
(eq? (bytes-ref dest (sub1 got-c)) (char->integer #\return))
(not (eq? status 'error)))
;; Save trailing carriage return:
(begin
(set-mcdr! c (char->integer #\return))
(sub1 got-c))
got-c)])
;; Iterate through the converted bytes to apply the newline conversions:
(let loop ([i 0]
[j 0])
(cond
[(= i got-c)
(values (- got-c (- i j)) used-c (if (and (eq? 'complete status)
(mcdr c))
'aborts
status))]
[(eq? (bytes-ref dest i) (char->integer #\return))
(cond
[(= (add1 i) got-c)
;; Found lone CR:
(bytes-set! dest j (char->integer #\newline))
(loop (add1 i) (add1 j))]
[(eq? (bytes-ref dest (add1 i)) (char->integer #\newline))
;; Found CRLF:
(bytes-set! dest j (char->integer #\newline))
(loop (+ i 2) (add1 j))]
[(and (eq? (bytes-ref dest (add1 i)) #o302)
(eq? (bytes-ref dest (+ i 2)) #o205))
;; Found CRNEL:
(bytes-set! dest j (char->integer #\newline))
(loop (+ i 3) (add1 j))]
[else
;; Found lone CR:
(bytes-set! dest j (char->integer #\newline))
(loop (add1 i) (add1 j))])]
[(and (eq? (bytes-ref dest i) #o302)
(eq? (bytes-ref dest (+ i 1)) #o205))
;; Found NEL:
(bytes-set! dest j (char->integer #\newline))
(loop (+ i 2) (add1 j))]
[(and (eq? (bytes-ref dest i) #o342)
(eq? (bytes-ref dest (+ i 1)) #o200)
(eq? (bytes-ref dest (+ i 2)) #o250))
;; Found LS:
(bytes-set! dest j (char->integer #\newline))
(loop (+ i 3) (add1 j))]
[else
;; Anything else:
(unless (= i j)
(bytes-set! dest j (bytes-ref dest i)))
(loop (add1 i) (add1 j))]))))]))
(define reencode-input-port (define reencode-input-port
(opt-lambda (port encoding [error-bytes #f] [close? #f] [name (object-name port)]) (opt-lambda (port encoding [error-bytes #f] [close? #f] [name (object-name port)]
(let ([c (bytes-open-converter encoding "UTF-8")] [newline-convert? #f])
(let ([c (let ([c (bytes-open-converter encoding "UTF-8")])
(if newline-convert?
(mcons c #f)
c))]
[ready-bytes (make-bytes 1024)] [ready-bytes (make-bytes 1024)]
[ready-start 0] [ready-start 0]
[ready-end 0] [ready-end 0]
@ -1181,20 +1322,33 @@
;; Try converting already-read bytes: ;; Try converting already-read bytes:
(let-values ([(got-c used-c status) (if (= buf-start buf-end) (let-values ([(got-c used-c status) (if (= buf-start buf-end)
(values 0 0 'aborts) (values 0 0 'aborts)
(bytes-convert c buf buf-start buf-end s))]) ((if newline-convert?
bytes-convert/post-nl
bytes-convert)
c buf buf-start buf-end s))])
(when (positive? used-c)
(set! buf-start (+ used-c buf-start)))
(cond (cond
[(positive? got-c) [(positive? got-c)
;; We converted some bytes into s. ;; We converted some bytes into s.
(set! buf-start (+ used-c buf-start))
got-c] got-c]
[(eq? status 'aborts) [(eq? status 'aborts)
(if buf-eof? (if buf-eof?
;; Had an EOF or special in the stream. ;; Had an EOF or special in the stream.
(if (= buf-start buf-end) (if (= buf-start buf-end)
(if (and newline-convert? (mcdr c)) ; should be bytes-convert-end
;; Have leftover CR:
(begin
(bytes-set! s 0 (if (eq? (mcdr c) (char->integer #\return))
(char->integer #\newline)
(mcdr c)))
(set-mcdr! c #f)
1)
;; Return EOF:
(begin0 (begin0
buf-eof-result buf-eof-result
(set! buf-eof? #f) (set! buf-eof? #f)
(set! buf-eof-result #f)) (set! buf-eof-result #f)))
(handle-error s)) (handle-error s))
;; Need more bytes. ;; Need more bytes.
(begin (begin
@ -1227,7 +1381,10 @@
[(eq? status 'continues) [(eq? status 'continues)
;; Need more room to make progress at all. ;; Need more room to make progress at all.
;; Decode into ready-bytes. ;; Decode into ready-bytes.
(let-values ([(got-c used-c status) (bytes-convert c buf buf-start buf-end ready-bytes)]) (let-values ([(got-c used-c status) ((if newline-convert?
bytes-convert/post-nl
bytes-convert)
c buf buf-start buf-end ready-bytes)])
(unless (memq status '(continues complete)) (unless (memq status '(continues complete))
(error 'reencode-input-port-read (error 'reencode-input-port-read
"unable to make decoding progress: ~e" "unable to make decoding progress: ~e"
@ -1267,16 +1424,20 @@
(lambda () (lambda ()
(when close? (when close?
(close-input-port port)) (close-input-port port))
(bytes-close-converter c)) (bytes-close-converter (if newline-convert?
(mcar c)
c)))
#f void 1 #f void 1
(case-lambda (case-lambda
[() buffer-mode] [() buffer-mode]
[(mode) (set! buffer-mode mode)]))))) [(mode) (set! buffer-mode mode)])
(eq? buffer-mode 'block)))))
;; -------------------------------------------------- ;; --------------------------------------------------
(define reencode-output-port (define reencode-output-port
(opt-lambda (port encoding [error-bytes #f] [close? #f] [name (object-name port)]) (opt-lambda (port encoding [error-bytes #f] [close? #f] [name (object-name port)]
[convert-newlines-to #f])
(let ([c (bytes-open-converter "UTF-8" encoding)] (let ([c (bytes-open-converter "UTF-8" encoding)]
[ready-bytes (make-bytes 1024)] [ready-bytes (make-bytes 1024)]
[ready-start 0] [ready-start 0]
@ -1285,7 +1446,10 @@
[out-start 0] [out-start 0]
[out-end 0] [out-end 0]
[buffer-mode (or (file-stream-buffer-mode port) [buffer-mode (or (file-stream-buffer-mode port)
'block)]) 'block)]
[debuffer-buf #f]
[newline-buffer #f])
(define-values (buffered-r buffered-w) (make-pipe 4096))
;; The main writing entry point: ;; The main writing entry point:
(define (write-it s start end no-buffer&block? enable-break?) (define (write-it s start end no-buffer&block? enable-break?)
@ -1294,6 +1458,7 @@
;; This is a flush request; no-buffer&block? must be #f ;; This is a flush request; no-buffer&block? must be #f
;; Note: we could get stuck because only half an encoding ;; Note: we could get stuck because only half an encoding
;; is available in out-bytes. ;; is available in out-bytes.
(flush-buffer-pipe #f enable-break?)
(flush-some #f enable-break?) (flush-some #f enable-break?)
(if (buffer-flushed?) (if (buffer-flushed?)
0 0
@ -1317,32 +1482,98 @@
(set! out-start 0) (set! out-start 0)
(set! out-end 0) (set! out-end 0)
(- c out-len)))))])] (- c out-len)))))])]
[(and (eq? buffer-mode 'block)
(zero? (pipe-content-length buffered-r)))
;; The port system can buffer to a pipe faster, so give it a pipe.
buffered-w]
[else [else
(when (or (> ready-end ready-start) ;; Flush/buffer from pipe, first:
(< (- (bytes-length out-bytes) out-end) 100)) (flush-buffer-pipe #f enable-break?)
;; Make room for conversion. ;; Flush as needed to make room in the buffer:
(flush-some #f enable-break?) ;; convert some (make-buffer-room #f enable-break?)
(flush-some #f enable-break?)) ;; write converted
;; Make room in buffer
(when (positive? out-start)
(bytes-copy! out-bytes 0 out-bytes out-start out-end)
(set! out-end (- out-end out-start))
(set! out-start 0))
;; Buffer some bytes: ;; Buffer some bytes:
(let ([cnt (min (- end start) (let-values ([(s2 start2 cnt2 used) (convert-newlines s start
(- end start)
(- (bytes-length out-bytes) out-end))]) (- (bytes-length out-bytes) out-end))])
(if (zero? cnt) (if (zero? used)
;; No room --- try flushing again: ;; No room --- try flushing again:
(write-it s start end #f enable-break?) (write-it s start end #f enable-break?)
;; Buffer and report success: ;; Buffer and report success:
(begin (begin
(bytes-copy! out-bytes out-end s start (+ start cnt)) (bytes-copy! out-bytes out-end s2 start2 (+ start2 cnt2))
(set! out-end (+ cnt out-end)) (set! out-end (+ cnt2 out-end))
(case buffer-mode (case buffer-mode
[(none) (flush-all-now enable-break?)] [(none) (flush-all-now enable-break?)]
[(line) (when (regexp-match-positions #rx#"[\r\n]" s start (+ start cnt)) [(line) (when (regexp-match-positions #rx#"[\r\n]" s start (+ start used))
(flush-all-now enable-break?))]) (flush-all-now enable-break?))])
cnt)))])) used)))]))
(define (convert-newlines s start cnt avail)
;; If newline converting is on, try convert up to cnt
;; bytes to produce a result that fits in avail bytes.
(if convert-newlines-to
;; Conversion:
(let ([end (+ start cnt)]
[avail (min avail 1024)])
(unless newline-buffer
(set! newline-buffer (make-bytes 1024)))
(let loop ([i start][j 0])
(cond
[(or (= j avail) (= i end)) (values newline-buffer 0 j i)]
[(eq? (char->integer #\newline) (bytes-ref s i))
;; Newline conversion
(let ([len (bytes-length convert-newlines-to)])
(if ((+ j len) . > . avail)
;; No room
(values newline-buffer 0 j i)
;; Room
(begin
(bytes-copy! newline-buffer j convert-newlines-to)
(loop (add1 i) (+ j len)))))]
[else
(bytes-set! newline-buffer j (bytes-ref s i))
(loop (add1 i) (add1 j))])))
;; No conversion:
(let ([cnt (min cnt avail)])
(values s start cnt cnt))))
(define (make-buffer-room non-block? enable-break?)
(when (or (> ready-end ready-start)
(< (- (bytes-length out-bytes) out-end) 100))
;; Make room for conversion.
(flush-some non-block? enable-break?) ;; convert some
(flush-some non-block? enable-break?)) ;; write converted
;; Make room in buffer
(when (positive? out-start)
(bytes-copy! out-bytes 0 out-bytes out-start out-end)
(set! out-end (- out-end out-start))
(set! out-start 0)))
(define (flush-buffer-pipe non-block? enable-break?)
(let loop ()
(if (zero? (pipe-content-length buffered-r))
'done
(begin
(unless debuffer-buf
(set! debuffer-buf (make-bytes 4096)))
(make-buffer-room non-block? enable-break?)
(let ([amt (- (bytes-length out-bytes) out-end)])
(if (zero? amt)
'stuck
(if convert-newlines-to
;; Peek, convert newlines, write, then read converted amount:
(let ([cnt (peek-bytes-avail! debuffer-buf 0 #f buffered-r 0 amt)])
(let-values ([(s2 start2 cnt2 used)
(convert-newlines debuffer-buf 0 cnt amt)])
(bytes-copy! out-bytes out-end s2 start2 cnt2)
(set! out-end (+ cnt2 out-end))
(read-bytes-avail! debuffer-buf buffered-r 0 used)
(loop)))
;; Skip an indirection: read directly and write:
(let ([cnt (read-bytes-avail! debuffer-buf buffered-r 0 amt)])
(bytes-copy! out-bytes out-end debuffer-buf 0 cnt)
(set! out-end (+ cnt out-end))
(loop)))))))))
(define (non-blocking-write s start end) (define (non-blocking-write s start end)
;; For now, everything that we can flushed is flushed. ;; For now, everything that we can flushed is flushed.
@ -1351,23 +1582,32 @@
;; everyone is happy enough. If some of the bytes get written, ;; everyone is happy enough. If some of the bytes get written,
;; the we will have buffered bytes when we shouldn't have. ;; the we will have buffered bytes when we shouldn't have.
;; That probably won't happen, but we can't guarantee it. ;; That probably won't happen, but we can't guarantee it.
(if (sync/timeout 0.0 port)
;; We should be able to write one byte...
(let loop ([len 1]) (let loop ([len 1])
(let-values ([(got-c used-c status) (bytes-convert c s start (+ start len) ready-bytes)]) (let*-values ([(s2 start2 len2 used) (convert-newlines s start (- end start) len)]
[(got-c used-c status) (bytes-convert c s2 start2 (+ start2 len2) ready-bytes)])
(cond (cond
[(positive? got-c) [(positive? got-c)
(try-flush-ready got-c used-c)] (try-flush-ready got-c used-c)
;; If used-c < len2, then we converted only partially --- which
;; is strange, because we kept adding bytes one at a time.
;; we will just guess is that the unused bytes were not converted
;; bytes, and generally hope that this sort of encoding doesn't
;; show up.
(- used (- len2 used-c))]
[(eq? status 'aborts) [(eq? status 'aborts)
(if (< len (- end start)) (if (< len (- end start))
;; Try converting a bigger chunk ;; Try converting a bigger chunk
(loop (add1 len)) (loop (add1 len))
;; We can't flush half an encoding, so just buffer it. ;; We can't flush half an encoding, so just buffer it.
(let ([cnt (- start end)]) (begin
(when (> (- end start) (bytes-length out-bytes)) (when (> len2 (bytes-length out-bytes))
(raise-insane-decoding-length)) (raise-insane-decoding-length))
(bytes-copy out-bytes 0 s start end) (bytes-copy out-bytes 0 s2 start2 (+ start2 len2))
(set! out-start 0) (set! out-start 0)
(set! out-end cnt) (set! out-end len2)
cnt))] used))]
[(eq? status 'continues) [(eq? status 'continues)
;; Not enough room in ready-bytes!? We give up. ;; Not enough room in ready-bytes!? We give up.
(raise-insane-decoding-length)] (raise-insane-decoding-length)]
@ -1375,7 +1615,10 @@
;; Encoding error. Try to flush error bytes. ;; Encoding error. Try to flush error bytes.
(let ([cnt (bytes-length error-bytes)]) (let ([cnt (bytes-length error-bytes)])
(bytes-copy! ready-bytes 0 error-bytes) (bytes-copy! ready-bytes 0 error-bytes)
(try-flush-ready cnt 1))])))) (try-flush-ready cnt 1)
used)])))
;; Port is not ready for writing:
#f))
(define (write-special-it v no-buffer&block? enable-break?) (define (write-special-it v no-buffer&block? enable-break?)
(cond (cond
@ -1390,6 +1633,7 @@
[else [else
;; Note: we could get stuck because only half an encoding ;; Note: we could get stuck because only half an encoding
;; is available in out-bytes. ;; is available in out-bytes.
(flush-buffer-pipe no-buffer&block? enable-break?)
(flush-some no-buffer&block? enable-break?) (flush-some no-buffer&block? enable-break?)
(if (or (buffer-flushed?) (if (or (buffer-flushed?)
(not no-buffer&block?)) (not no-buffer&block?))
@ -1398,6 +1642,7 @@
;; flush-all : -> 'done, 'not-done, or 'stuck ;; flush-all : -> 'done, 'not-done, or 'stuck
(define (flush-all non-block? enable-break?) (define (flush-all non-block? enable-break?)
(if (eq? (flush-buffer-pipe non-block? enable-break?) 'done)
(let ([orig-none-ready? (= ready-start ready-end)] (let ([orig-none-ready? (= ready-start ready-end)]
[orig-out-start out-start] [orig-out-start out-start]
[orig-out-end out-end]) [orig-out-end out-end])
@ -1411,7 +1656,8 @@
(= orig-out-start out-start) (= orig-out-start out-start)
(= orig-out-end out-end)) (= orig-out-end out-end))
'stuck 'stuck
'not-done)))) 'not-done)))
'stuck))
(define (flush-all-now enable-break?) (define (flush-all-now enable-break?)
(case (flush-all #f enable-break?) (case (flush-all #f enable-break?)
@ -1419,20 +1665,17 @@
(define (buffer-flushed?) (define (buffer-flushed?)
(and (= ready-start ready-end) (and (= ready-start ready-end)
(= out-start out-end))) (= out-start out-end)
(zero? (pipe-content-length buffered-r))))
;; Try to flush immediately a certain number of bytes ;; Try to flush immediately a certain number of bytes.
;; we've already converted them, so we have to keep
;; the bytes in any case.
(define (try-flush-ready got-c used-c) (define (try-flush-ready got-c used-c)
(let ([c (write-bytes-avail* ready-bytes port 0 got-c)]) (let ([c (write-bytes-avail* ready-bytes port 0 got-c)])
(if (zero? c) (unless (= c got-c)
;; Didn't flush any - give up:
#f
;; Hopefully, we flushed them all, but set ready-start and ready-end,
;; just in case.
(begin
(set! ready-start c) (set! ready-start c)
(set! ready-end got-c) (set! ready-end got-c))))
used-c))))
;; Try to make progress flushing buffered bytes ;; Try to make progress flushing buffered bytes
(define (flush-some non-block? enable-break?) (define (flush-some non-block? enable-break?)