diff --git a/collects/mzlib/port.ss b/collects/mzlib/port.ss index 5074e79..6186b08 100644 --- a/collects/mzlib/port.ss +++ b/collects/mzlib/port.ss @@ -1,6 +1,7 @@ (module port mzscheme - (require (lib "etc.ss")) + (require (lib "etc.ss") + (lib "contract.ss")) (provide open-output-nowhere make-input-port/read-to-peek @@ -10,6 +11,18 @@ convert-stream make-limited-input-port) + (define (exact-non-negative-integer? i) + (and (number? i) (exact? i) (integer? i) (i . >= . 0))) + + (provide/contract (read-bytes-avail!-evt (bytes? input-port? . -> . evt?)) + (read-bytes!-evt (bytes? input-port? . -> . evt?)) + (read-bytes-evt (exact-non-negative-integer? input-port? . -> . evt?)) + (read-string!-evt (string? input-port? . -> . evt?)) + (read-string-evt (exact-non-negative-integer? input-port? . -> . evt?)) + (regexp-match-evt ((union regexp? byte-regexp?) input-port? . -> . evt?))) + + ;; ---------------------------------------- + (define open-output-nowhere (opt-lambda ([name 'nowhere]) (make-output-port @@ -17,13 +30,15 @@ always-evt (lambda (s start end non-block? breakable?) (- end start)) void - (lambda (special non-block?) #t) + (lambda (special non-block? breakable?) #t) (lambda (s start end) (convert-evt always-evt (lambda (x) (- end start)))) (lambda (special) (convert-evt always-evt (lambda (x) #t)))))) + ;; ---------------------------------------- + (define (copy-port src dest . dests) (unless (input-port? src) (raise-type-error 'copy-port "input-port" src)) @@ -212,7 +227,7 @@ ;; Enough has been peeked... (unless (zero? p-commit) (peek-byte peeked-r (sub1 amt)) - (port-commit-peeked amt peeked-r)) + (port-commit-peeked amt #f peeked-r)) (set! special-peeked l) (when (null? special-peeked) (set! special-peeked-tail #f)) @@ -242,8 +257,12 @@ read-it ;; Peek (if fast-peek - (lambda (s skip) - (fast-peek s skip peek-it)) + (let ([fast-peek-k (lambda (s skip) + (peek-it s skip #f))]) + (lambda (s skip unless-evt) + (if unless-evt + (peek-it s skip unless-evt) + (fast-peek s skip fast-peek-k)))) peek-it) close (lambda () @@ -366,4 +385,150 @@ n))))) (lambda () (when close-orig? - (close-input-port port)))))))) + (close-input-port port))))))) + + ;; ---------------------------------------- + + (define (read-at-least-bytes!-evt orig-bstr input-port need-more? shrink combo) + (nack-guard-evt + (lambda (nack) + (define ch (make-channel)) + (thread (lambda () + (let try-again ([pos 0][bstr orig-bstr]) + (let* ([progress-evt (port-progress-evt input-port)] + [v (peek-bytes-avail! bstr pos progress-evt input-port pos)]) + (cond + [(sync/timeout 0 nack) (void)] + [(sync/timeout 0 progress-evt) (try-again pos bstr)] + [(and (number? v) (need-more? bstr (+ pos v))) + => (lambda (bstr) + (try-again (+ v pos) bstr))] + [else + (let ([v2 (cond + [(number? v) (shrink bstr (+ v pos))] + [(positive? pos) pos] + [else v])]) + (unless (port-commit-peeked + (if (number? v2) v2 1) + progress-evt + (channel-put-evt + ch + (combo bstr v2)) + input-port) + (try-again 0 orig-bstr)))]))))) + ch))) + + (define (read-bytes-avail!-evt bstr input-port) + (read-at-least-bytes!-evt bstr input-port + (lambda (bstr v) (if (zero? v) + bstr + #f)) + (lambda (bstr v) v) + (lambda (bstr v) v))) + + (define (read-bytes!-evt bstr input-port) + (read-at-least-bytes!-evt bstr input-port + (lambda (bstr v) + (if (v . < . (bytes-length bstr)) + bstr + #f)) + (lambda (bstr v) v) + (lambda (bstr v) v))) + + (define (read-bytes-evt len input-port) + (let ([bstr (make-bytes len)]) + (convert-evt + (read-bytes!-evt bstr input-port) + (lambda (v) + (if (number? v) + (if (= v len) + bstr + (subbytes bstr 0 v)) + v))))) + + (define (read-string-evt goal input-port) + (let ([bstr (make-bytes goal)] + [c (bytes-open-converter "UTF-8-permissive" "UTF-8")]) + (convert-evt + (read-at-least-bytes!-evt bstr input-port + (lambda (bstr v) + (if (= v (bytes-length bstr)) + ;; We can't easily use bytes-utf-8-length here, + ;; because we may need more bytes to figure out + ;; the true role of the last byte. The + ;; `bytes-convert' function lets us deal with + ;; the last byte properly. + (let-values ([(bstr2 used status) + (bytes-convert c bstr 0 v)]) + (let ([got (bytes-utf-8-length bstr2)]) + (if (= got goal) + ;; Done: + #f + ;; Need more bytes: + (let ([bstr2 (make-bytes (+ v (- goal got)))]) + (bytes-copy! bstr2 0 bstr) + bstr2)))) + ;; Need more bytes in bstr: + bstr)) + (lambda (bstr v) + ;; We may need one less than v, + ;; because we may have had to peek + ;; an extra byte to discover an + ;; error in the stream. + (if ((bytes-utf-8-length bstr #\? 0 v) . > . goal) + (sub1 v) + v)) + cons) + (lambda (bstr+v) + (let ([bstr (car bstr+v)] + [v (cdr bstr+v)]) + (if (number? v) + (bytes->string/utf-8 bstr #\? 0 v) + v)))))) + + (define (read-string!-evt str input-port) + (convert-evt + (read-string-evt (string-length str) input-port) + (lambda (s) + (if (string? s) + (begin + (string-copy! str 0 s) + (string-length s)) + s)))) + + (define (regexp-match-evt pattern input-port) + (nack-guard-evt + (lambda (nack) + (define ch (make-channel)) + (thread (lambda () + (let try-again () + (let* ([progress-evt (port-progress-evt input-port)] + [m (regexp-match-peek-positions pattern input-port 0 #f progress-evt)]) + (cond + [(sync/timeout 0 nack) (void)] + [(sync/timeout 0 progress-evt) (try-again)] + [(not m) + (sync nack + (finish-evt progress-evt + (lambda (x) (try-again))))] + [else + (let ([m2 (map (lambda (p) + (and p + (let ([bstr (make-bytes (- (cdr p) (car p)))]) + (unless (= (car p) (cdr p)) + (let loop ([offset 0]) + (let ([v (peek-bytes-avail! bstr (car p) progress-evt input-port offset)]) + (unless (zero? v) + (when ((+ offset v) . < . (bytes-length bstr)) + (loop (+ offset v))))))) + bstr))) + m)]) + (unless (port-commit-peeked (cdar m) + progress-evt + (channel-put-evt ch m2) + input-port) + (try-again)))]))))) + ch))) + + + )