#lang racket/base (require (for-syntax racket/base) racket/contract/base mzlib/list "private/port.rkt") (define (input-port-with-progress-evts? ip) (and (input-port? ip) (port-provides-progress-evts? ip))) (define (mutable-bytes? b) (and (bytes? b) (not (immutable? b)))) (define (mutable-string? b) (and (string? b) (not (immutable? b)))) (define (line-mode-symbol? s) (memq s '(linefeed return return-linefeed any any-one))) (define (evt?/false v) (or (eq? #f v) (evt? v))) ;; ---------------------------------------- (define (strip-shell-command-start in) (when (regexp-match-peek #rx#"^#![^\r\n]*" in) (let loop ([s (read-line in)]) (when (regexp-match #rx#"\\\\$" s) (loop (read-line in)))))) ;; ---------------------------------------- (define (copy-port src dest . dests) (unless (input-port? src) (raise-type-error 'copy-port "input-port" src)) (for-each (lambda (dest) (unless (output-port? dest) (raise-type-error 'copy-port "output-port" dest))) (cons dest dests)) (let ([s (make-bytes 4096)] [dests (cons dest dests)]) (let loop () (let ([c (read-bytes-avail! s src)]) (cond [(number? c) (let loop ([dests dests]) (unless (null? dests) (let loop ([start 0]) (unless (= start c) (let ([c2 (write-bytes-avail s (car dests) start c)]) (loop (+ start c2))))) (loop (cdr dests)))) (loop)] [(procedure? c) (let ([v (let-values ([(l col p) (port-next-location src)]) (c (object-name src) l col p))]) (let loop ([dests dests]) (unless (null? dests) (write-special v (car dests)) (loop (cdr dests))))) (loop)] [else ;; Must be EOF (void)]))))) (define merge-input (case-lambda [(a b) (merge-input a b 4096)] [(a b limit) (or (input-port? a) (raise-type-error 'merge-input "input-port" a)) (or (input-port? b) (raise-type-error 'merge-input "input-port" b)) (or (not limit) (and (number? limit) (positive? limit) (exact? limit) (integer? limit)) (raise-type-error 'merge-input "positive exact integer or #f" limit)) (let-values ([(rd wt) (make-pipe-with-specials limit)] [(other-done?) #f] [(sema) (make-semaphore 1)]) (let ([copy (lambda (from) (thread (lambda () (copy-port from wt) (semaphore-wait sema) (if other-done? (close-output-port wt) (set! other-done? #t)) (semaphore-post sema))))]) (copy a) (copy b) rd))])) ;; `make-input-port/read-to-peek' sometimes needs to wrap a special-value ;; procedure so that it's only called once when the value is both ;; peeked and read. (define-values (struct:memoized make-memoized memoized? memoized-ref memoized-set!) (make-struct-type 'memoized #f 1 0 #f null (current-inspector) 0)) (define (memoize p) (define result #f) (make-memoized (if (procedure-arity-includes? p 0) ;; original p accepts 0 or 4 arguments: (case-lambda [() (unless result (set! result (box (p)))) (unbox result)] [(src line col pos) (unless result (set! result (box (p src line col pos)))) (unbox result)]) ;; original p accepts only 4 arguments: (lambda (src line col pos) (unless result (set! result (box (p src line col pos)))) (unbox result))))) ;; Not kill-safe. ;; If the `read' proc returns an event, the event must produce ;; 0 always (which implies that the `read' proc must not return ;; a pipe input port). (define make-input-port/read-to-peek (lambda (name read fast-peek close [location-proc #f] [count-lines!-proc void] [init-position 1] [buffer-mode-proc #f] [buffering? #f] [on-consumed #f]) (define lock-semaphore (make-semaphore 1)) (define commit-semaphore (make-semaphore 1)) (define-values (peeked-r peeked-w) (make-pipe)) (define special-peeked null) (define special-peeked-tail #f) (define progress-requested? #f) (define line-counting? #f) (define use-manager? #f) (define manager-th #f) (define manager-ch (make-channel)) (define resume-ch (make-channel)) (define buf (make-bytes 4096)) (define (try-again) (wrap-evt (semaphore-peek-evt lock-semaphore) (lambda (x) 0))) (define (suspend-manager) (channel-put manager-ch 'suspend)) (define (resume-manager) (channel-put resume-ch 'resume)) (define (with-manager-lock thunk) (thread-resume manager-th (current-thread)) (dynamic-wind suspend-manager thunk resume-manager)) (define (make-progress) ;; We dont worry about this byte getting picked up directly ;; from peeked-r, because the pipe must have been empty when ;; we grabed the lock, and since we've grabbed the lock, ;; no other thread could have re-returned the pipe behind ;; our back. (write-byte 0 peeked-w) (read-byte peeked-r)) (define (consume-from-peeked s) (let ([n (read-bytes-avail!* s peeked-r)]) (when on-consumed (on-consumed n)) n)) (define (read-it-with-lock s) (if use-manager? (with-manager-lock (lambda () (do-read-it s))) (do-read-it s))) (define (read-it s) (call-with-semaphore lock-semaphore read-it-with-lock try-again s)) (define (do-read-it s) (if (byte-ready? peeked-r) (if on-consumed (consume-from-peeked s) peeked-r) ;; If nothing is saved from a peeking read, dispatch to ;; `read', otherwise return previously peeked data (cond [(null? special-peeked) (when progress-requested? (make-progress)) (if (and buffering? ((bytes-length s) . < . 10)) ;; Buffering is enabled, so read more to move things ;; along: (let ([r (read buf)]) (if (and (number? r) (positive? r)) (begin (write-bytes buf peeked-w 0 r) (if on-consumed (consume-from-peeked s) peeked-r)) (begin (when on-consumed (on-consumed r)) r))) ;; Just read requested amount: (let ([v (read s)]) (when on-consumed (on-consumed v)) v))] [else (if (bytes? (mcar special-peeked)) (let ([b (mcar special-peeked)]) (write-bytes b peeked-w) (set! special-peeked (mcdr special-peeked)) (when (null? special-peeked) (set! special-peeked-tail #f)) (consume-from-peeked s)) (let ([v (mcar special-peeked)]) (make-progress) (set! special-peeked (mcdr special-peeked)) (when on-consumed (on-consumed v)) (when (null? special-peeked) (set! special-peeked-tail #f)) v))]))) (define (peek-it-with-lock s skip unless-evt) (if use-manager? (with-manager-lock (lambda () (do-peek-it s skip unless-evt))) (do-peek-it s skip unless-evt))) (define (peek-it s skip unless-evt) (let ([v (peek-bytes-avail!* s skip unless-evt peeked-r)]) (if (eq? v 0) (call-with-semaphore lock-semaphore peek-it-with-lock try-again s skip unless-evt) v))) (define (do-peek-it s skip unless-evt) (let ([v (peek-bytes-avail!* s skip unless-evt peeked-r)]) (if (eq? v 0) ;; The peek may have failed because peeked-r is empty, ;; because unless-evt is ready, or because the skip is ;; far. Handle nicely the common case where there are no ;; specials. (cond [(and unless-evt (sync/timeout 0 unless-evt)) #f] [(null? special-peeked) ;; Empty special queue, so read through the original proc. ;; We only only need ;; (- (+ skip (bytes-length s)) (pipe-content-length peeked-w)) ;; bytes, but if buffering is enabled, read more (up to size of ;; buf) to help move things along. (let* ([dest (if buffering? buf (make-bytes (- (+ skip (bytes-length s)) (pipe-content-length peeked-w))))] [r (read dest)]) (cond [(number? r) ;; The nice case --- reading gave us more bytes (write-bytes dest peeked-w 0 r) ;; Now try again (peek-bytes-avail!* s skip #f peeked-r)] [(evt? r) (if unless-evt ;; Technically, there's a race condition here. ;; We might choose r (and return 0) even when ;; unless-evt becomes available first. However, ;; this race is detectable only by the inside ;; of `read'. (choice-evt r (wrap-evt unless-evt (lambda (x) #f))) r)] [else (set! special-peeked (mcons r null)) (set! special-peeked-tail special-peeked) ;; Now try again (do-peek-it s skip unless-evt)]))] [else ;; Non-empty special queue, so try to use it (let* ([avail (pipe-content-length peeked-r)] [sk (- skip avail)]) (let loop ([sk sk] [l special-peeked]) (cond [(null? l) ;; Not enough even in the special queue. ;; Read once and add it. (let* ([t (make-bytes (min 4096 (+ sk (bytes-length s))))] [r (read t)]) (cond [(evt? r) (if unless-evt ;; See note above (choice-evt r (wrap-evt unless-evt (lambda (x) #f))) r)] [(eq? r 0) ;; Original read thinks a spin is ok, ;; so we return 0 to skin, too. 0] [else (let ([v (if (number? r) (subbytes t 0 r) r)]) (let ([pr (mcons v null)]) (set-mcdr! special-peeked-tail pr) (set! special-peeked-tail pr)) ;; Got something; now try again (do-peek-it s skip unless-evt))]))] [(eof-object? (mcar l)) ;; No peeking past an EOF eof] [(procedure? (mcar l)) (if (zero? sk) ;; We should call the procedure only once. Change ;; (mcar l) to a memoizing function, if it isn't already: (let ([proc (mcar l)]) (if (memoized? proc) proc (let ([proc (memoize proc)]) (set-mcar! l proc) proc))) ;; Skipping over special... (loop (sub1 sk) (mcdr l)))] [(bytes? (mcar l)) (let ([len (bytes-length (mcar l))]) (if (sk . < . len) (let ([n (min (bytes-length s) (- len sk))]) (bytes-copy! s 0 (mcar l) sk (+ sk n)) n) (loop (- sk len) (mcdr l))))])))]) v))) (define (commit-it-with-lock amt unless-evt done-evt) (if use-manager? (with-manager-lock (lambda () (do-commit-it amt unless-evt done-evt))) (do-commit-it amt unless-evt done-evt))) (define (commit-it amt unless-evt done-evt) (call-with-semaphore lock-semaphore commit-it-with-lock #f amt unless-evt done-evt)) (define (do-commit-it amt unless-evt done-evt) (if (sync/timeout 0 unless-evt) #f (let* ([avail (pipe-content-length peeked-r)] [p-commit (min avail amt)]) (let loop ([amt (- amt p-commit)] [l special-peeked] ;; result is either bytes (if needed for line ounting) ;; or an integer count (for on-consumed) [result (if line-counting? null 0)]) (cond [(amt . <= . 0) ;; Enough has been peeked. Do commit... (actual-commit p-commit l unless-evt done-evt result)] [(null? l) ;; Requested commit was larger than previous peeks #f] [(bytes? (mcar l)) (let ([bl (bytes-length (mcar l))]) (if (bl . > . amt) ;; Split the string (let ([next (mcons (subbytes (mcar l) amt) (mcdr l))]) (set-mcar! l (subbytes (mcar l) 0 amt)) (set-mcdr! l next) (when (eq? l special-peeked-tail) (set! special-peeked-tail next)) (loop 0 (mcdr l) (if line-counting? (cons (subbytes (mcar l) 0 amt) result) (+ amt result)))) ;; Consume this string... (loop (- amt bl) (mcdr l) (if line-counting? (cons (mcar l) result) (+ bl result)))))] [else (loop (sub1 amt) (mcdr l) (if line-counting? (cons #"." result) (add1 result)))]))))) (define (actual-commit p-commit l unless-evt done-evt result) ;; The `finish' proc finally, actually, will commit... (define (finish) (let ([result (if line-counting? (cons (peek-bytes p-commit 0 peeked-r) result) (+ p-commit result))]) (unless (zero? p-commit) (peek-byte peeked-r (sub1 p-commit)) (port-commit-peeked p-commit unless-evt always-evt peeked-r)) (set! special-peeked l) (when (null? special-peeked) (set! special-peeked-tail #f)) (when (and progress-requested? (zero? p-commit)) (make-progress)) (if line-counting? ;; bytes representation of committed text allows line counting ;; to be updated correctly (when line counting is implemented ;; automatically) (let ([bstr (apply bytes-append (reverse result))]) (when on-consumed (on-consumed (bytes-length bstr))) bstr) (begin (when on-consumed (on-consumed result)) #t)))) ;; If we can sync done-evt immediately, then finish. (if (sync/timeout 0 (wrap-evt done-evt (lambda (x) #t))) (finish) ;; We need to wait, so we'll have to release the lock. ;; Send the work to a manager thread. (let ([result-ch (make-channel)] [w/manager? use-manager?]) (if w/manager? ;; Resume manager if it was running: (resume-manager) ;; Start manager if it wasn't running: (begin (set! manager-th (thread manage-commits)) (set! use-manager? #t) (thread-resume manager-th (current-thread)))) ;; Sets use-manager? if the manager wasn't already running: (channel-put manager-ch (list finish unless-evt done-evt result-ch)) ;; Release locks: (semaphore-post lock-semaphore) (begin0 ;; Wait for manager to complete commit: (sync result-ch) ;; Grab locks again, so they're released ;; properly on exit: (semaphore-wait lock-semaphore) (when w/manager? (suspend-manager)))))) (define (manage-commits) (let loop ([commits null]) (apply sync (handle-evt manager-ch (lambda (c) (case c [(suspend) (channel-get resume-ch) (loop commits)] [else ;; adding a commit (loop (cons c commits))]))) (map (lambda (c) (define (send-result v) ;; Create a new thread to send the result asynchronously: (thread-resume (thread (lambda () (channel-put (list-ref c 3) v))) (current-thread)) (when (null? (cdr commits)) (set! use-manager? #f)) (loop (remq c commits))) ;; Choose between done and unless: (if (sync/timeout 0 (list-ref c 1)) (handle-evt always-evt (lambda (x) (send-result #f))) (choice-evt (handle-evt (list-ref c 1) (lambda (x) ;; unless ready, which means that the commit must fail (send-result #f))) (handle-evt (list-ref c 2) (lambda (x) ;; done-evt ready, which means that the commit ;; must succeed. ;; If we get here, then commits are not ;; suspended, so we implicitly have the ;; lock. ((list-ref c 0)) (send-result #t)))))) commits)))) (make-input-port name ;; Read read-it ;; Peek (if fast-peek (let ([fast-peek-k (lambda (s skip) (peek-it s skip #f))]) (lambda (s skip unless-evt) (if (or unless-evt (byte-ready? peeked-r) (mpair? special-peeked)) (peek-it s skip unless-evt) (fast-peek s skip fast-peek-k)))) peek-it) close (lambda () (set! progress-requested? #t) (port-progress-evt peeked-r)) commit-it location-proc (lambda () (set! line-counting? #t) (count-lines!-proc)) init-position (and buffer-mode-proc (case-lambda [() (buffer-mode-proc)] [(mode) (set! buffering? (eq? mode 'block)) (buffer-mode-proc mode)]))))) (define peeking-input-port (lambda (orig-in [name (object-name orig-in)] [delta 0]) (make-input-port/read-to-peek name (lambda (s) (let ([r (peek-bytes-avail!* s delta #f orig-in)]) (set! delta (+ delta (if (number? r) r 1))) (if (eq? r 0) (handle-evt orig-in (lambda (v) 0)) r))) (lambda (s skip default) (peek-bytes-avail!* s (+ delta skip) #f orig-in)) void))) (define relocate-input-port (lambda (p line col pos [close? #t]) (transplant-to-relocate transplant-input-port p line col pos close?))) (define transplant-input-port (lambda (p location-proc pos [close? #t] [count-lines!-proc void]) (make-input-port (object-name p) (lambda (s) (let ([v (read-bytes-avail!* s p)]) (if (eq? v 0) (wrap-evt p (lambda (x) 0)) v))) (lambda (s skip evt) (let ([v (peek-bytes-avail!* s skip evt p)]) (if (eq? v 0) (choice-evt (wrap-evt p (lambda (x) 0)) (if evt (wrap-evt evt (lambda (x) #f)) never-evt)) v))) (lambda () (when close? (close-input-port p))) (and (port-provides-progress-evts? p) (lambda () (port-progress-evt p))) (and (port-provides-progress-evts? p) (lambda (n evt target-evt) (port-commit-peeked n evt target-evt p))) location-proc count-lines!-proc pos))) (define filter-read-input-port (lambda (p wrap-read wrap-peek [close? #t]) (make-input-port (object-name p) (lambda (s) (let ([v (read-bytes-avail!* s p)]) (wrap-read s (if (eq? v 0) (wrap-evt p (lambda (x) 0)) v)))) (lambda (s skip evt) (let ([v (peek-bytes-avail!* s skip evt p)]) (wrap-peek s skip evt (if (eq? v 0) (choice-evt (wrap-evt p (lambda (x) 0)) (if evt (wrap-evt evt (lambda (x) #f)) never-evt)) v)))) (lambda () (when close? (close-input-port p))) (and (port-provides-progress-evts? p) (lambda () (port-progress-evt p))) (and (port-provides-progress-evts? p) (lambda (n evt target-evt) (port-commit-peeked n evt target-evt p))) (lambda () (port-next-location p)) (lambda () (port-count-lines! p)) (add1 (file-position p))))) ;; Not kill-safe. (define make-pipe-with-specials ;; This implementation of pipes is almost CML-style, with a manager thread ;; to guard access to the pipe content. But we only enable the manager ;; thread when write evts are active; otherwise, we use a lock semaphore. ;; (Actually, the lock semaphore has to be used all the time, to guard ;; the flag indicating whether the manager thread is running.) (lambda ([limit (expt 2 64)] [in-name 'pipe] [out-name 'pipe]) (let-values ([(r w) (make-pipe limit)] [(more) null] [(more-last) #f] [(more-sema) #f] [(close-w?) #f] [(lock-semaphore) (make-semaphore 1)] [(mgr-th) #f] [(via-manager?) #f] [(mgr-ch) (make-channel)]) (define (flush-more) (if (null? more) (begin (set! more-last #f) (when close-w? (close-output-port w))) (when (bytes? (mcar more)) (let ([amt (bytes-length (mcar more))]) (let ([wrote (write-bytes-avail* (mcar more) w)]) (if (= wrote amt) (begin (set! more (mcdr more)) (flush-more)) (begin ;; This means that we let too many bytes ;; get written while a special was pending. ;; (The limit is disabled when a special ;; is in the pipe.) (set-mcar! more (subbytes (mcar more) wrote)) ;; By peeking, make room for more: (peek-byte r (sub1 (min (pipe-content-length w) (- amt wrote)))) (flush-more)))))))) (define (read-one s) (let ([v (read-bytes-avail!* s r)]) (if (eq? v 0) (if more-last ;; Return a special (let ([a (mcar more)]) (set! more (mcdr more)) (flush-more) (lambda (file line col ppos) a)) ;; Nothing available, yet. (begin (unless more-sema (set! more-sema (make-semaphore))) (wrap-evt (semaphore-peek-evt more-sema) (lambda (x) 0)))) v))) (define (close-it) (set! close-w? #t) (unless more-last (close-output-port w)) (when more-sema (semaphore-post more-sema))) (define (write-these-bytes str start end) (begin0 (if more-last (let ([p (mcons (subbytes str start end) null)]) (set-mcdr! more-last p) (set! more-last p) (- end start)) (let ([v (write-bytes-avail* str w start end)]) (if (zero? v) (wrap-evt w (lambda (x) #f)) v))) (when more-sema (semaphore-post more-sema) (set! more-sema #f)))) (define (write-spec v) (let ([p (mcons v null)]) (if more-last (set-mcdr! more-last p) (set! more p)) (set! more-last p) (when more-sema (semaphore-post more-sema) (set! more-sema #f)))) (define (serve) ;; A request is ;; (list sym result-ch nack-evt . v) ;; where `v' varies for different `sym's ;; The possible syms are: read, reply, close, ;; write, write-spec, write-evt, write-spec-evt (let loop ([reqs null]) (apply sync ;; Listen for a request: (handle-evt mgr-ch (lambda (req) (let ([req ;; Most requests we handle immediately and ;; convert to a reply. The manager thread ;; implicitly has the lock. (let ([reply (lambda (v) (list 'reply (cadr req) (caddr req) v))]) (case (car req) [(read) (reply (read-one (cadddr req)))] [(close) (reply (close-it))] [(write) (reply (apply write-these-bytes (cdddr req)))] [(write-spec) (reply (write-spec (cadddr req)))] [else req]))]) (loop (cons req reqs))))) (if (and (null? reqs) via-manager?) ;; If we can get the lock before another request ;; turn off manager mode: (handle-evt lock-semaphore (lambda (x) (set! via-manager? #f) (semaphore-post lock-semaphore) (loop null))) never-evt) (append (map (lambda (req) (case (car req) [(reply) (handle-evt (channel-put-evt (cadr req) (cadddr req)) (lambda (x) (loop (remq req reqs))))] [(write-spec-evt) (if close-w? ;; Report close error: (handle-evt (channel-put-evt (cadr req) 'closed) (lambda (x) (loop (remq req reqs)))) ;; Try to write special: (handle-evt (channel-put-evt (cadr req) #t) (lambda (x) ;; We sync'd, so now we *must* write (write-spec (cadddr req)) (loop (remq req reqs)))))] [(write-evt) (if close-w? ;; Report close error: (handle-evt (channel-put-evt (cadr req) 'closed) (lambda (x) (loop (remq req reqs)))) ;; Try to write bytes: (let* ([start (list-ref req 4)] [end (list-ref req 5)] [len (if more-last (- end start) (min (- end start) (max 0 (- limit (pipe-content-length w)))))]) (if (and (zero? len) (null? more)) (handle-evt w (lambda (x) (loop reqs))) (handle-evt (channel-put-evt (cadr req) len) (lambda (x) ;; We sync'd, so now we *must* write (write-these-bytes (cadddr req) start (+ start len)) (loop (remq req reqs)))))))])) reqs) ;; nack => remove request (could be anything) (map (lambda (req) (handle-evt (caddr req) (lambda (x) (loop (remq req reqs))))) reqs))))) (define (via-manager what req-sfx) (thread-resume mgr-th (current-thread)) (let ([ch (make-channel)]) (sync (nack-guard-evt (lambda (nack) (channel-put mgr-ch (list* what ch nack req-sfx)) ch))))) (define (start-mgr) (unless mgr-th (set! mgr-th (thread serve))) (set! via-manager? #t)) (define (evt what req-sfx) (nack-guard-evt (lambda (nack) (resume-mgr) (let ([ch (make-channel)]) (call-with-semaphore lock-semaphore (lambda () (unless mgr-th (set! mgr-th (thread serve))) (set! via-manager? #t) (thread-resume mgr-th (current-thread)) (channel-put mgr-ch (list* what ch nack req-sfx)) (wrap-evt ch (lambda (x) (if (eq? x 'close) (raise-mismatch-error 'write-evt "port is closed: " out) x))))))))) (define (resume-mgr) (when mgr-th (thread-resume mgr-th (current-thread)))) (define in ;; ----- Input ------ (make-input-port/read-to-peek in-name (lambda (s) (let ([v (read-bytes-avail!* s r)]) (if (eq? v 0) (begin (resume-mgr) (call-with-semaphore lock-semaphore (lambda () (if via-manager? (via-manager 'read (list s)) (read-one s))))) v))) #f void)) (define out ;; ----- Output ------ (make-output-port out-name w ;; write (lambda (str start end buffer? w/break?) (if (= start end) #t (begin (resume-mgr) (call-with-semaphore lock-semaphore (lambda () (if via-manager? (via-manager 'write (list str start end)) (write-these-bytes str start end))))))) ;; close (lambda () (resume-mgr) (call-with-semaphore lock-semaphore (lambda () (if via-manager? (via-manager 'close null) (close-it))))) ;; write-special (lambda (v buffer? w/break?) (resume-mgr) (call-with-semaphore lock-semaphore (lambda () (if via-manager? (via-manager 'write-spec (list v)) (write-spec v))))) ;; write-evt (lambda (str start end) (if (= start end) (wrap-evt always-evt (lambda (x) 0)) (evt 'write-evt (list str start end)))) ;; write-special-evt (lambda (v) (evt 'write-spec-evt (list v))))) (values in out)))) (define input-port-append (lambda (close-orig? . ports) (make-input-port (map object-name ports) (lambda (str) ;; Reading is easy -- read from the first port, ;; and get rid of it if the result is eof (if (null? ports) eof (let ([n (read-bytes-avail!* str (car ports))]) (cond [(eq? n 0) (wrap-evt (car ports) (lambda (x) 0))] [(eof-object? n) (when close-orig? (close-input-port (car ports))) (set! ports (cdr ports)) 0] [else n])))) (lambda (str skip unless-evt) ;; Peeking is more difficult, due to skips. (let loop ([ports ports][skip skip]) (if (null? ports) eof (let ([n (peek-bytes-avail!* str skip unless-evt (car ports))]) (cond [(eq? n 0) ;; Not ready, yet. (peek-bytes-avail!-evt str skip unless-evt (car ports))] [(eof-object? n) ;; Port is exhausted, or we skipped past its input. ;; If skip is not zero, we need to figure out ;; how many chars were skipped. (loop (cdr ports) (- skip (compute-avail-to-skip skip (car ports))))] [else n]))))) (lambda () (when close-orig? (map close-input-port ports)))))) (define (convert-stream from from-port to to-port) (let ([c (bytes-open-converter from to)] [in (make-bytes 4096)] [out (make-bytes 4096)]) (unless c (error 'convert-stream "could not create converter from ~e to ~e" from to)) (dynamic-wind void (lambda () (let loop ([got 0]) (let ([n (read-bytes-avail! in from-port got)]) (let ([got (+ got (if (number? n) n 0))]) (let-values ([(wrote used status) (bytes-convert c in 0 got out)]) (when (eq? status 'error) (error 'convert-stream "conversion error")) (unless (zero? wrote) (write-bytes out to-port 0 wrote)) (bytes-copy! in 0 in used got) (if (not (number? n)) (begin (unless (= got used) (error 'convert-stream "input stream ~a with a partial conversion" (if (eof-object? n) "ended" "hit a special value"))) (let-values ([(wrote status) (bytes-convert-end c out)]) (when (eq? status 'error) (error 'convert-stream "conversion-end error")) (unless (zero? wrote) (write-bytes out to-port 0 wrote)) (if (eof-object? n) ;; Success (void) (begin (write-special n to-port) (loop 0))))) (loop (- got used)))))))) (lambda () (bytes-close-converter c))))) ;; Helper for input-port-append; given a skip count ;; and an input port, determine how many characters ;; (up to upto) are left in the port. We figure this ;; out using binary search. (define (compute-avail-to-skip upto p) (let ([str (make-bytes 1)]) (let loop ([upto upto][skip 0]) (if (zero? upto) skip (let* ([half (quotient upto 2)] [n (peek-bytes-avail!* str (+ skip half) #f p)]) (if (eq? n 1) (loop (- upto half 1) (+ skip half 1)) (loop half skip))))))) (define make-limited-input-port (lambda (port limit [close-orig? #t]) (let ([got 0] [lock-semaphore (make-semaphore 1)]) (define (do-read str) (let ([count (min (- limit got) (bytes-length str))]) (if (zero? count) eof (let ([n (read-bytes-avail!* str port 0 count)]) (cond [(eq? n 0) (wrap-evt port (lambda (x) 0))] [(number? n) (set! got (+ got n)) n] [(procedure? n) (set! got (add1 got)) n] [else n]))))) (define (do-peek str skip progress-evt) (let ([count (max 0 (min (- limit got skip) (bytes-length str)))]) (if (zero? count) eof (let ([n (peek-bytes-avail!* str skip progress-evt port 0 count)]) (if (eq? n 0) (wrap-evt port (lambda (x) 0)) n))))) (define (try-again) (wrap-evt (semaphore-peek-evt lock-semaphore) (lambda (x) 0))) (make-input-port (object-name port) (lambda (str) (call-with-semaphore lock-semaphore do-read try-again str)) (lambda (str skip progress-evt) (call-with-semaphore lock-semaphore do-peek try-again str skip progress-evt)) (lambda () (when close-orig? (close-input-port port))) (and (port-provides-progress-evts? port) (lambda () (port-progress-evt port))) (and (port-provides-progress-evts? port) (lambda (n evt target-evt) (let loop () (if (semaphore-try-wait? lock-semaphore) (let ([ok? (port-commit-peeked n evt target-evt port)]) (when ok? (set! got (+ got n))) (semaphore-post lock-semaphore) ok?) (sync (handle-evt evt (lambda (v) #f)) (handle-evt (semaphore-peek-evt lock-semaphore) (lambda (v) (loop)))))))) (lambda () (port-next-location port)) (lambda () (port-count-lines! port)) (add1 (file-position port)))))) (define special-filter-input-port (lambda (p filter [close? #t]) (unless (input-port? p) (raise-type-error 'special-filter-input-port "input port" p)) (unless (and (procedure? filter) (procedure-arity-includes? filter 2)) (raise-type-error 'special-filter-input-port "procedure (arity 2)" filter)) (make-input-port (object-name p) (lambda (s) (let ([v (read-bytes-avail!* s p)]) (cond [(eq? v 0) (wrap-evt p (lambda (x) 0))] [(procedure? v) (filter v s)] [else v]))) (lambda (s skip evt) (let ([v (peek-bytes-avail!* s skip evt p)]) (cond [(eq? v 0) (choice-evt (wrap-evt p (lambda (x) 0)) (if evt (wrap-evt evt (lambda (x) #f)) never-evt))] [(procedure? v) (filter v s)] [else v]))) (lambda () (when close? (close-input-port p))) (and (port-provides-progress-evts? p) (lambda () (port-progress-evt p))) (and (port-provides-progress-evts? p) (lambda (n evt target-evt) (port-commit-peeked n evt target-evt p))) (lambda () (port-next-location p)) (lambda () (port-count-lines! p)) (add1 (file-position p))))) ;; ---------------------------------------- (define (poll-or-spawn go) (poll-guard-evt (lambda (poll?) (if poll? ;; In poll mode, call `go' directly: (let ([v (go never-evt #f #t)]) (if v (wrap-evt always-evt (lambda (x) v)) never-evt)) ;; In non-poll mode, start a thread to call go (nack-guard-evt (lambda (nack) (define ch (make-channel)) (define ready (make-semaphore)) (let ([t (thread (lambda () (parameterize-break #t (with-handlers ([exn:break? void]) (semaphore-post ready) (go nack ch #f)))))]) (thread (lambda () (sync nack) (semaphore-wait ready) (break-thread t)))) ch)))))) (define (read-at-least-bytes!-evt orig-bstr input-port need-more? shrink combo peek-offset prog-evt) ;; go is the main reading function, either called directly for ;; a poll, or called in a thread for a non-poll read (define (go nack ch poll?) (let try-again ([pos 0][bstr orig-bstr]) (let* ([progress-evt (or prog-evt (port-progress-evt input-port))] [v ((if poll? peek-bytes-avail!* peek-bytes-avail!) bstr (+ pos (or peek-offset 0)) progress-evt input-port pos)]) (cond ;; the first two cases below are shortcuts, and not ;; strictly necessary [(sync/timeout 0 nack) (void)] [(sync/timeout 0 progress-evt) (cond [poll? #f] [prog-evt (void)] [else (try-again pos bstr)])] [(and poll? (equal? v 0)) #f] [(and (number? v) (need-more? bstr (+ pos v))) => (lambda (bstr) (try-again (+ v pos) bstr))] [else (let* ([v2 (cond [(number? v) (shrink bstr (+ v pos))] [(positive? pos) pos] [else v])] [result (combo bstr v2)]) (cond [peek-offset (if poll? result (sync (or prog-evt never-evt) (channel-put-evt ch result)))] [(port-commit-peeked (if (number? v2) v2 1) progress-evt (if poll? always-evt (channel-put-evt ch result)) input-port) result] [(and (eof-object? eof) (zero? pos) (not (sync/timeout 0 progress-evt))) ;; Must be a true end-of-file (let ([result (combo bstr eof)]) (if poll? result (channel-put ch result)))] [poll? #f] [else (try-again 0 orig-bstr)]))])))) (if (zero? (bytes-length orig-bstr)) (wrap-evt always-evt (lambda (x) 0)) (poll-or-spawn go))) (define (-read-bytes-avail!-evt bstr input-port peek-offset prog-evt) (read-at-least-bytes!-evt bstr input-port (lambda (bstr v) (if (zero? v) bstr #f)) (lambda (bstr v) v) (lambda (bstr v) v) peek-offset prog-evt)) (define (read-bytes-avail!-evt bstr input-port) (-read-bytes-avail!-evt bstr input-port #f #f)) (define (peek-bytes-avail!-evt bstr peek-offset prog-evt input-port) (-read-bytes-avail!-evt bstr input-port peek-offset prog-evt)) (define (-read-bytes!-evt bstr input-port peek-offset prog-evt) (read-at-least-bytes!-evt bstr input-port (lambda (bstr v) (if (v . < . (bytes-length bstr)) bstr #f)) (lambda (bstr v) v) (lambda (bstr v) v) peek-offset prog-evt)) (define (read-bytes!-evt bstr input-port) (-read-bytes!-evt bstr input-port #f #f)) (define (peek-bytes!-evt bstr peek-offset prog-evt input-port) (-read-bytes!-evt bstr input-port peek-offset prog-evt)) (define (-read-bytes-evt len input-port peek-offset prog-evt) (let ([bstr (make-bytes len)]) (wrap-evt (-read-bytes!-evt bstr input-port peek-offset prog-evt) (lambda (v) (if (number? v) (if (= v len) bstr (subbytes bstr 0 v)) v))))) (define (read-bytes-evt len input-port) (-read-bytes-evt len input-port #f #f)) (define (peek-bytes-evt len peek-offset prog-evt input-port) (-read-bytes-evt len input-port peek-offset prog-evt)) (define (-read-string-evt goal input-port peek-offset prog-evt) (if (zero? goal) (wrap-evt always-evt (lambda (x) "")) (let ([bstr (make-bytes goal)] [c (bytes-open-converter "UTF-8-permissive" "UTF-8")]) (wrap-evt (read-at-least-bytes!-evt bstr input-port (lambda (bstr v) (if (= v (bytes-length bstr)) ;; We can't easily use bytes-utf-8-length here, ;; because we may need more bytes to figure out ;; the true role of the last byte. The ;; `bytes-convert' function lets us deal with ;; the last byte properly. (let-values ([(bstr2 used status) (bytes-convert c bstr 0 v)]) (let ([got (bytes-utf-8-length bstr2)]) (if (= got goal) ;; Done: #f ;; Need more bytes: (let ([bstr2 (make-bytes (+ v (- goal got)))]) (bytes-copy! bstr2 0 bstr) bstr2)))) ;; Need more bytes in bstr: bstr)) (lambda (bstr v) ;; We may need one less than v, ;; because we may have had to peek ;; an extra byte to discover an ;; error in the stream. (if ((bytes-utf-8-length bstr #\? 0 v) . > . goal) (sub1 v) v)) cons peek-offset prog-evt) (lambda (bstr+v) (let ([bstr (car bstr+v)] [v (cdr bstr+v)]) (if (number? v) (bytes->string/utf-8 bstr #\? 0 v) v))))))) (define (read-string-evt goal input-port) (-read-string-evt goal input-port #f #f)) (define (peek-string-evt goal peek-offset prog-evt input-port) (-read-string-evt goal input-port peek-offset prog-evt)) (define (-read-string!-evt str input-port peek-offset prog-evt) (wrap-evt (-read-string-evt (string-length str) input-port peek-offset prog-evt) (lambda (s) (if (string? s) (begin (string-copy! str 0 s) (string-length s)) s)))) (define (read-string!-evt str input-port) (-read-string!-evt str input-port #f #f)) (define (peek-string!-evt str peek-offset prog-evt input-port) (-read-string!-evt str input-port peek-offset prog-evt)) (define (regexp-match-evt pattern input-port) (define (go nack ch poll?) (let try-again () (if (port-closed? input-port) #f (let* ([progress-evt (port-progress-evt input-port)] [m ((if poll? regexp-match-peek-positions-immediate regexp-match-peek-positions) pattern input-port 0 #f progress-evt)]) (cond [(sync/timeout 0 nack) (void)] [(sync/timeout 0 progress-evt) (try-again)] [(not m) (if poll? #f (sync nack (handle-evt progress-evt (lambda (x) (try-again)))))] [else (let ([m2 (map (lambda (p) (and p (let ([bstr (make-bytes (- (cdr p) (car p)))]) (unless (= (car p) (cdr p)) (let loop ([offset 0]) (let ([v (peek-bytes-avail! bstr (car p) progress-evt input-port offset)]) (unless (zero? v) (when ((+ offset v) . < . (bytes-length bstr)) (loop (+ offset v))))))) bstr))) m)]) (cond [(and (zero? (cdar m)) (or poll? (channel-put ch m2))) m2] [(port-commit-peeked (cdar m) progress-evt (if poll? always-evt (channel-put-evt ch m2)) input-port) m2] [poll? #f] [else (try-again)]))]))))) (poll-or-spawn go)) (define-syntax (newline-rx stx) (syntax-case stx () [(_ str) (datum->syntax #'here (byte-regexp (string->bytes/latin-1 (format "^(?:(.*?)~a)|(.*?$)" (syntax-e #'str)))))])) (define read-bytes-line-evt (lambda (input-port [mode 'linefeed]) (wrap-evt (regexp-match-evt (case mode [(linefeed) (newline-rx "\n")] [(return) (newline-rx "\r")] [(return-linefeed) (newline-rx "\r\n")] [(any) (newline-rx "(?:\r\n|\r|\n)")] [(any-one) (newline-rx "[\r\n]")]) input-port) (lambda (m) (or (cadr m) (let ([l (caddr m)]) (if (and l (zero? (bytes-length l))) eof l))))))) (define read-line-evt (lambda (input-port [mode 'linefeed]) (wrap-evt (read-bytes-line-evt input-port mode) (lambda (s) (if (eof-object? s) s (bytes->string/utf-8 s #\?)))))) (define (eof-evt input-port) (wrap-evt (regexp-match-evt #rx#"^$" input-port) (lambda (x) eof))) ;; -------------------------------------------------- ;; Helper for reencode-input-port: simulate the composition ;; of a CRLF/CRNEL/NEL/LS -> LF decoding and some other ;; decoding. ;; The "converter" `c' is (mcons converter saved), where ;; saved is #f if no byte is saved, otherwise it's a saved ;; byte. It would be nicer and closer to the `bytes-convert' ;; interface to not consume a trailing CR, but we don't ;; know the inner encoding, and so we can't rewind it. (define (bytes-convert/post-nl c buf buf-start buf-end dest) (cond [(and (mcdr c) (= buf-start buf-end)) ;; No more bytes to convert; provide single ;; saved byte if it's not #\return, otherwise report 'aborts (if (eq? (mcdr c) (char->integer #\return)) (values 0 0 'aborts) (begin (bytes-set! dest 0 (mcdr c)) (set-mcdr! c #f) (values 1 0 'complete)))] [(and (mcdr c) (= 1 (bytes-length dest))) ;; We have a saved byte, but the destination is only 1 byte. ;; If the saved byte is a return, we need to try decoding more, ;; which means we may end up saving a non-#\return byte: (if (eq? (mcdr c) (char->integer #\return)) (let-values ([(got-c used-c status) (bytes-convert (mcar c) buf buf-start buf-end dest)]) (if (positive? got-c) (cond [(eq? (bytes-ref dest 0) (char->integer #\newline)) ;; Found CRLF, so just produce LF (and nothing to save) (set-mcdr! c #f) (values 1 used-c status)] [else ;; Next char fits in a byte, so it isn't NEL, etc. ;; Save it, and for now return the #\return. (set-mcdr! c (bytes-ref dest 0)) (bytes-set! dest 0 (char->integer #\newline)) (values 1 used-c 'continues)]) ;; Didn't decode any more; ask for bigger input, etc. (values 0 0 status))) ;; Saved a non-#\return, so use that up now. (begin (bytes-set! dest 0 (mcdr c)) (set-mcdr! c #f) (values 1 0 'continues)))] [else ;; Normal convert, maybe prefixed: (let-values ([(got-c used-c status) (bytes-convert (mcar c) buf buf-start buf-end dest (if (mcdr c) 1 0))]) (let* ([got-c (if (mcdr c) ;; Insert saved character: (begin (bytes-set! dest 0 (char->integer #\return)) (set-mcdr! c #f) (add1 got-c)) got-c)] [got-c (if (and (positive? got-c) (eq? (bytes-ref dest (sub1 got-c)) (char->integer #\return)) (not (eq? status 'error))) ;; Save trailing carriage return: (begin (set-mcdr! c (char->integer #\return)) (sub1 got-c)) got-c)]) ;; Iterate through the converted bytes to apply the newline ;; conversions: (let loop ([i 0] [j 0]) (cond [(= i got-c) (values (- got-c (- i j)) used-c (if (and (eq? 'complete status) (mcdr c)) 'aborts status))] [(eq? (bytes-ref dest i) (char->integer #\return)) (cond [(= (add1 i) got-c) ;; Found lone CR: (bytes-set! dest j (char->integer #\newline)) (loop (add1 i) (add1 j))] [(eq? (bytes-ref dest (add1 i)) (char->integer #\newline)) ;; Found CRLF: (bytes-set! dest j (char->integer #\newline)) (loop (+ i 2) (add1 j))] [(and (eq? (bytes-ref dest (add1 i)) #o302) (eq? (bytes-ref dest (+ i 2)) #o205)) ;; Found CRNEL: (bytes-set! dest j (char->integer #\newline)) (loop (+ i 3) (add1 j))] [else ;; Found lone CR: (bytes-set! dest j (char->integer #\newline)) (loop (add1 i) (add1 j))])] [(and (eq? (bytes-ref dest i) #o302) (eq? (bytes-ref dest (+ i 1)) #o205)) ;; Found NEL: (bytes-set! dest j (char->integer #\newline)) (loop (+ i 2) (add1 j))] [(and (eq? (bytes-ref dest i) #o342) (eq? (bytes-ref dest (+ i 1)) #o200) (eq? (bytes-ref dest (+ i 2)) #o250)) ;; Found LS: (bytes-set! dest j (char->integer #\newline)) (loop (+ i 3) (add1 j))] [else ;; Anything else: (unless (= i j) (bytes-set! dest j (bytes-ref dest i))) (loop (add1 i) (add1 j))]))))])) (define reencode-input-port (lambda (port encoding [error-bytes #f] [close? #f] [name (object-name port)] [newline-convert? #f] [decode-error (lambda (msg port) (error 'reencode-input-port (format "~a: ~e" msg) port))]) (let ([c (let ([c (bytes-open-converter encoding "UTF-8")]) (if newline-convert? (mcons c #f) c))] [ready-bytes (make-bytes 1024)] [ready-start 0] [ready-end 0] [buf (make-bytes 1024)] [buf-start 0] [buf-end 0] [buf-eof? #f] [buf-eof-result #f] [buffer-mode (or (file-stream-buffer-mode port) 'none)]) ;; Main reader entry: (define (read-it s) (cond [(> ready-end ready-start) ;; We have leftover converted bytes: (let ([cnt (min (bytes-length s) (- ready-end ready-start))]) (bytes-copy! s 0 ready-bytes ready-start (+ ready-start cnt)) (set! ready-start (+ ready-start cnt)) cnt)] [else ;; Try converting already-read bytes: (let-values ([(got-c used-c status) (if (= buf-start buf-end) (values 0 0 'aborts) ((if newline-convert? bytes-convert/post-nl bytes-convert) c buf buf-start buf-end s))]) (when (positive? used-c) (set! buf-start (+ used-c buf-start))) (cond [(positive? got-c) ;; We converted some bytes into s. got-c] [(eq? status 'aborts) (if buf-eof? ;; Had an EOF or special in the stream. (if (= buf-start buf-end) (if (and newline-convert? (mcdr c)) ; should be bytes-convert-end ;; Have leftover CR: (begin (bytes-set! s 0 (if (eq? (mcdr c) (char->integer #\return)) (char->integer #\newline) (mcdr c))) (set-mcdr! c #f) 1) ;; Return EOF: (begin0 buf-eof-result (set! buf-eof? #f) (set! buf-eof-result #f))) (handle-error s)) ;; Need more bytes. (begin (when (positive? buf-start) (bytes-copy! buf 0 buf buf-start buf-end) (set! buf-end (- buf-end buf-start)) (set! buf-start 0)) (let* ([amt (bytes-length s)] [c (read-bytes-avail!* buf port buf-end (if (eq? buffer-mode 'block) (bytes-length buf) (min (bytes-length buf) (+ buf-end amt))))]) (cond [(or (eof-object? c) (procedure? c)) ;; Got EOF/procedure (set! buf-eof? #t) (set! buf-eof-result c) (read-it s)] [(zero? c) ;; No bytes ready --- try again later. (wrap-evt port (lambda (v) 0))] [else ;; Got some bytes; loop to decode. (set! buf-end (+ buf-end c)) (read-it s)]))))] [(eq? status 'error) (handle-error s)] [(eq? status 'continues) ;; Need more room to make progress at all. ;; Decode into ready-bytes. (let-values ([(got-c used-c status) ((if newline-convert? bytes-convert/post-nl bytes-convert) c buf buf-start buf-end ready-bytes)]) (unless (memq status '(continues complete)) (decode-error "unable to make decoding progress" port)) (set! ready-start 0) (set! ready-end got-c) (set! buf-start (+ used-c buf-start)) (read-it s))]))])) ;; Raise exception or discard first buffered byte. ;; We assume that read-bytes is empty (define (handle-error s) (if error-bytes (begin (set! buf-start (add1 buf-start)) (let ([cnt (min (bytes-length s) (bytes-length error-bytes))]) (bytes-copy! s 0 error-bytes 0 cnt) (bytes-copy! ready-bytes 0 error-bytes cnt) (set! ready-start 0) (set! ready-end (- (bytes-length error-bytes) cnt)) cnt)) (decode-error "decoding error in input stream" port))) (unless c (error 'reencode-input-port "could not create converter from ~e to UTF-8" encoding)) (make-input-port/read-to-peek name read-it #f (lambda () (when close? (close-input-port port)) (bytes-close-converter (if newline-convert? (mcar c) c))) #f void 1 (case-lambda [() buffer-mode] [(mode) (set! buffer-mode mode)]) (eq? buffer-mode 'block))))) ;; -------------------------------------------------- (define reencode-output-port (lambda (port encoding [error-bytes #f] [close? #f] [name (object-name port)] [convert-newlines-to #f] [decode-error (lambda (msg port) (error 'reencode-input-port (format "~a: ~e" msg) port))]) (let ([c (bytes-open-converter "UTF-8" encoding)] [ready-bytes (make-bytes 1024)] [ready-start 0] [ready-end 0] [out-bytes (make-bytes 1024)] [out-start 0] [out-end 0] [buffer-mode (or (file-stream-buffer-mode port) 'block)] [debuffer-buf #f] [newline-buffer #f]) (define-values (buffered-r buffered-w) (make-pipe 4096)) ;; The main writing entry point: (define (write-it s start end no-buffer&block? enable-break?) (cond [(= start end) ;; This is a flush request; no-buffer&block? must be #f ;; Note: we could get stuck because only half an encoding ;; is available in out-bytes. (flush-buffer-pipe #f enable-break?) (flush-some #f enable-break?) (if (buffer-flushed?) 0 (write-it s start end no-buffer&block? enable-break?))] [no-buffer&block? (case (flush-all #t enable-break?) [(not-done) ;; We couldn't flush right away, so give up. #f] [(done) (non-blocking-write s start end)] [(stuck) ;; We need more bytes to make progress. ;; Add out-bytes and s into one string for non-blocking-write. (let ([s2 (bytes-append (subbytes out-bytes out-start out-end) (subbytes s start end))] [out-len (- out-end out-start)]) (let ([c (non-blocking-write s2 0 (bytes-length s2))]) (and c (begin (set! out-start 0) (set! out-end 0) (- c out-len)))))])] [(and (eq? buffer-mode 'block) (zero? (pipe-content-length buffered-r))) ;; The port system can buffer to a pipe faster, so give it a pipe. buffered-w] [else ;; Flush/buffer from pipe, first: (flush-buffer-pipe #f enable-break?) ;; Flush as needed to make room in the buffer: (make-buffer-room #f enable-break?) ;; Buffer some bytes: (let-values ([(s2 start2 cnt2 used) (convert-newlines s start (- end start) (- (bytes-length out-bytes) out-end))]) (if (zero? used) ;; No room --- try flushing again: (write-it s start end #f enable-break?) ;; Buffer and report success: (begin (bytes-copy! out-bytes out-end s2 start2 (+ start2 cnt2)) (set! out-end (+ cnt2 out-end)) (case buffer-mode [(none) (flush-all-now enable-break?)] [(line) (when (regexp-match-positions #rx#"[\r\n]" s start (+ start used)) (flush-all-now enable-break?))]) used)))])) (define (convert-newlines s start cnt avail) ;; If newline converting is on, try convert up to cnt ;; bytes to produce a result that fits in avail bytes. (if convert-newlines-to ;; Conversion: (let ([end (+ start cnt)] [avail (min avail 1024)]) (unless newline-buffer (set! newline-buffer (make-bytes 1024))) (let loop ([i start][j 0]) (cond [(or (= j avail) (= i end)) (values newline-buffer 0 j i)] [(eq? (char->integer #\newline) (bytes-ref s i)) ;; Newline conversion (let ([len (bytes-length convert-newlines-to)]) (if ((+ j len) . > . avail) ;; No room (values newline-buffer 0 j i) ;; Room (begin (bytes-copy! newline-buffer j convert-newlines-to) (loop (add1 i) (+ j len)))))] [else (bytes-set! newline-buffer j (bytes-ref s i)) (loop (add1 i) (add1 j))]))) ;; No conversion: (let ([cnt (min cnt avail)]) (values s start cnt cnt)))) (define (make-buffer-room non-block? enable-break?) (when (or (> ready-end ready-start) (< (- (bytes-length out-bytes) out-end) 100)) ;; Make room for conversion. (flush-some non-block? enable-break?) ;; convert some (flush-some non-block? enable-break?)) ;; write converted ;; Make room in buffer (when (positive? out-start) (bytes-copy! out-bytes 0 out-bytes out-start out-end) (set! out-end (- out-end out-start)) (set! out-start 0))) (define (flush-buffer-pipe non-block? enable-break?) (let loop () (if (zero? (pipe-content-length buffered-r)) 'done (begin (unless debuffer-buf (set! debuffer-buf (make-bytes 4096))) (make-buffer-room non-block? enable-break?) (let ([amt (- (bytes-length out-bytes) out-end)]) (if (zero? amt) 'stuck (if convert-newlines-to ;; Peek, convert newlines, write, then read converted amount: (let ([cnt (peek-bytes-avail! debuffer-buf 0 #f buffered-r 0 amt)]) (let-values ([(s2 start2 cnt2 used) (convert-newlines debuffer-buf 0 cnt amt)]) (bytes-copy! out-bytes out-end s2 start2 cnt2) (set! out-end (+ cnt2 out-end)) (read-bytes-avail! debuffer-buf buffered-r 0 used) (loop))) ;; Skip an indirection: read directly and write: (let ([cnt (read-bytes-avail! debuffer-buf buffered-r 0 amt)]) (bytes-copy! out-bytes out-end debuffer-buf 0 cnt) (set! out-end (+ cnt out-end)) (loop))))))))) (define (non-blocking-write s start end) ;; For now, everything that we can flushed is flushed. ;; Try to write the minimal number of bytes, and hope for the ;; best. If none of all of the minimal bytes get written, ;; everyone is happy enough. If some of the bytes get written, ;; the we will have buffered bytes when we shouldn't have. ;; That probably won't happen, but we can't guarantee it. (if (sync/timeout 0.0 port) ;; We should be able to write one byte... (let loop ([len 1]) (let*-values ([(s2 start2 len2 used) (convert-newlines s start (- end start) len)] [(got-c used-c status) (bytes-convert c s2 start2 (+ start2 len2) ready-bytes)]) (cond [(positive? got-c) (try-flush-ready got-c used-c) ;; If used-c < len2, then we converted only partially ;; --- which is strange, because we kept adding ;; bytes one at a time. we will just guess is that ;; the unused bytes were not converted bytes, and ;; generally hope that this sort of encoding doesn't ;; show up. (- used (- len2 used-c))] [(eq? status 'aborts) (if (< len (- end start)) ;; Try converting a bigger chunk (loop (add1 len)) ;; We can't flush half an encoding, so just buffer it. (begin (when (> len2 (bytes-length out-bytes)) (raise-insane-decoding-length)) (bytes-copy! out-bytes 0 s2 start2 (+ start2 len2)) (set! out-start 0) (set! out-end len2) used))] [(eq? status 'continues) ;; Not enough room in ready-bytes!? We give up. (raise-insane-decoding-length)] [else ;; Encoding error. Try to flush error bytes. (let ([cnt (bytes-length error-bytes)]) (bytes-copy! ready-bytes 0 error-bytes) (try-flush-ready cnt 1) used)]))) ;; Port is not ready for writing: #f)) (define (write-special-it v no-buffer&block? enable-break?) (cond [(buffer-flushed?) ((if no-buffer&block? write-special-avail* (if enable-break? (lambda (v p) (parameterize-break #t (write-special v p))) write-special)) v port)] [else ;; Note: we could get stuck because only half an encoding ;; is available in out-bytes. (flush-buffer-pipe no-buffer&block? enable-break?) (flush-some no-buffer&block? enable-break?) (if (or (buffer-flushed?) (not no-buffer&block?)) (write-special-it v no-buffer&block? enable-break?) #f)])) ;; flush-all : -> 'done, 'not-done, or 'stuck (define (flush-all non-block? enable-break?) (if (eq? (flush-buffer-pipe non-block? enable-break?) 'done) (let ([orig-none-ready? (= ready-start ready-end)] [orig-out-start out-start] [orig-out-end out-end]) (flush-some non-block? enable-break?) (if (buffer-flushed?) 'done ;; Couldn't flush everything. One possibility is that we need ;; more bytes to convert before a flush. (if (and orig-none-ready? (= ready-start ready-end) (= orig-out-start out-start) (= orig-out-end out-end)) 'stuck 'not-done))) 'stuck)) (define (flush-all-now enable-break?) (case (flush-all #f enable-break?) [(not-done) (flush-all-now enable-break?)])) (define (buffer-flushed?) (and (= ready-start ready-end) (= out-start out-end) (zero? (pipe-content-length buffered-r)))) ;; Try to flush immediately a certain number of bytes. ;; we've already converted them, so we have to keep ;; the bytes in any case. (define (try-flush-ready got-c used-c) (let ([c (write-bytes-avail* ready-bytes port 0 got-c)]) (unless (= c got-c) (set! ready-start c) (set! ready-end got-c)))) ;; Try to make progress flushing buffered bytes (define (flush-some non-block? enable-break?) (unless (= ready-start ready-end) ;; Flush converted bytes: (let ([cnt ((cond [non-block? write-bytes-avail*] [enable-break? write-bytes-avail/enable-break] [else write-bytes-avail]) ready-bytes port ready-start ready-end)]) (set! ready-start (+ ready-start cnt)))) (when (= ready-start ready-end) ;; Convert more, if available: (set! ready-start 0) (set! ready-end 0) (when (> out-end out-start) (let-values ([(got-c used-c status) (bytes-convert c out-bytes out-start out-end ready-bytes)]) (set! ready-end got-c) (set! out-start (+ out-start used-c)) (when (and (eq? status 'continues) (zero? used-c)) ;; Yikes! Size of ready-bytes isn't enough room for progress!? (raise-insane-decoding-length)) (when (and (eq? status 'error) (zero? used-c)) ;; No progress before an encoding error. (if error-bytes ;; Write error bytes and drop an output byte: (begin (set! out-start (add1 out-start)) (bytes-copy! ready-bytes 0 error-bytes) (set! ready-end (bytes-length error-bytes))) ;; Raise an exception: (begin (set! out-start out-end) ;; flush buffer so close can work (decode-error "error decoding output to stream" port)))))))) ;; This error is used when decoding wants more bytes to make ;; progress even though we've supplied hundreds of bytes (define (raise-insane-decoding-length) (decode-error "unable to make decoding progress" port)) ;; Check that a decoder is available: (unless c (error 'reencode-output-port "could not create converter from ~e to UTF-8" encoding)) (make-output-port name port write-it (lambda () ;; Flush output (write-it #"" 0 0 #f #f) (when close? (close-output-port port)) (bytes-close-converter c)) write-special-it #f #f #f void 1 (case-lambda [() buffer-mode] [(mode) (let ([old buffer-mode]) (set! buffer-mode mode) (when (or (and (eq? old 'block) (memq mode '(none line))) (and (eq? old 'line) (memq mode '(none)))) ;; Flush output (write-it #"" 0 0 #f #f)))]))))) ;; ---------------------------------------- (define dup-output-port (lambda (p [close? #f]) (let ([new (transplant-output-port p (lambda () (port-next-location p)) (add1 (file-position p)) close? (lambda () (port-count-lines! p)))]) (port-display-handler new (port-display-handler p)) (port-write-handler new (port-write-handler p)) new))) (define dup-input-port (lambda (p [close? #f]) (let ([new (transplant-input-port p (lambda () (port-next-location p)) (add1 (file-position p)) close? (lambda () (port-count-lines! p)))]) (port-read-handler new (port-read-handler p)) new))) ;; ---------------------------------------- (provide open-output-nowhere make-pipe-with-specials make-input-port/read-to-peek peeking-input-port relocate-input-port transplant-input-port filter-read-input-port special-filter-input-port relocate-output-port transplant-output-port merge-input copy-port input-port-append convert-stream make-limited-input-port reencode-input-port reencode-output-port dup-input-port dup-output-port strip-shell-command-start) (provide/contract (read-bytes-avail!-evt (mutable-bytes? input-port-with-progress-evts? . -> . evt?)) (peek-bytes-avail!-evt (mutable-bytes? exact-nonnegative-integer? evt?/false input-port-with-progress-evts? . -> . evt?)) (read-bytes!-evt (mutable-bytes? input-port-with-progress-evts? . -> . evt?)) (peek-bytes!-evt (mutable-bytes? exact-nonnegative-integer? evt?/false input-port-with-progress-evts? . -> . evt?)) (read-bytes-evt (exact-nonnegative-integer? input-port-with-progress-evts? . -> . evt?)) (peek-bytes-evt (exact-nonnegative-integer? exact-nonnegative-integer? evt?/false input-port-with-progress-evts? . -> . evt?)) (read-string!-evt (mutable-string? input-port-with-progress-evts? . -> . evt?)) (peek-string!-evt (mutable-string? exact-nonnegative-integer? evt?/false input-port-with-progress-evts? . -> . evt?)) (read-string-evt (exact-nonnegative-integer? input-port-with-progress-evts? . -> . evt?)) (peek-string-evt (exact-nonnegative-integer? exact-nonnegative-integer? evt?/false input-port-with-progress-evts? . -> . evt?)) (regexp-match-evt ((or/c regexp? byte-regexp? string? bytes?) input-port-with-progress-evts? . -> . evt?)) (read-bytes-line-evt (case-> (input-port-with-progress-evts? . -> . evt?) (input-port-with-progress-evts? line-mode-symbol? . -> . evt?))) (read-line-evt (case-> (input-port-with-progress-evts? . -> . evt?) (input-port-with-progress-evts? line-mode-symbol? . -> . evt?))) (eof-evt (input-port-with-progress-evts? . -> . evt?)))