diff --git a/collects/mzlib/port.ss b/collects/mzlib/port.ss index 6186b08..1d0e9e2 100644 --- a/collects/mzlib/port.ss +++ b/collects/mzlib/port.ss @@ -13,13 +13,29 @@ (define (exact-non-negative-integer? i) (and (number? i) (exact? i) (integer? i) (i . >= . 0))) + + (define (input-port-with-progress-evts? ip) + (and (input-port? ip) + (port-provides-progress-evts? ip))) + + (define (mutable-bytes? b) + (and (bytes? b) (not (immutable? b)))) + (define (mutable-string? b) + (and (string? b) (not (immutable? b)))) - (provide/contract (read-bytes-avail!-evt (bytes? input-port? . -> . evt?)) - (read-bytes!-evt (bytes? input-port? . -> . evt?)) - (read-bytes-evt (exact-non-negative-integer? input-port? . -> . evt?)) - (read-string!-evt (string? input-port? . -> . evt?)) - (read-string-evt (exact-non-negative-integer? input-port? . -> . evt?)) - (regexp-match-evt ((union regexp? byte-regexp?) input-port? . -> . evt?))) + (provide/contract (read-bytes-avail!-evt (mutable-bytes? input-port-with-progress-evts? + . -> . evt?)) + (read-bytes!-evt (mutable-bytes? input-port-with-progress-evts? + . -> . evt?)) + (read-bytes-evt (exact-non-negative-integer? input-port-with-progress-evts? + . -> . evt?)) + (read-string!-evt (mutable-string? input-port-with-progress-evts? + . -> . evt?)) + (read-string-evt (exact-non-negative-integer? input-port-with-progress-evts? + . -> . evt?)) + (regexp-match-evt ((union regexp? byte-regexp? string? bytes?) + input-port-with-progress-evts? + . -> . evt?))) ;; ---------------------------------------- @@ -121,7 +137,7 @@ (read s)] [else (if (bytes? (car special-peeked)) (let ([b (car special-peeked)]) - (set! peeked-end (+ (file-position peeked-r) (bytes-length b))) + (set! peeked-end (+ peeked-end (bytes-length b))) (write-bytes b peeked-w) (set! special-peeked (cdr special-peeked)) (when (null? special-peeked) @@ -163,19 +179,19 @@ (set! special-peeked (cons r null)) (set! special-peeked-tail special-peeked) ;; Now try again - (peek-it s skip)]))] + (do-peek-it s skip unless-evt)]))] [else ;; Non-empty special queue, so try to use it (let* ([pos (file-position peeked-r)] [avail (- peeked-end pos)] - [skip (- skip avail)]) - (let loop ([skip (- skip avail)] + [sk (- skip avail)]) + (let loop ([sk sk] [l special-peeked]) (cond [(null? l) ;; Not enough even in the special queue. ;; Read once and add it. - (let* ([t (make-bytes (min 4096 (+ skip (bytes-length s))))] + (let* ([t (make-bytes (min 4096 (+ sk (bytes-length s))))] [r (read s)]) (cond [(evt? r) @@ -192,22 +208,22 @@ r)]) (set-cdr! special-peeked-tail v) ;; Got something; now try again - (do-peek-it s skip))]))] + (do-peek-it s skip unless-evt))]))] [(eof-object? (car l)) ;; No peeking past an EOF eof] [(procedure? (car l)) - (if (skip . < . 1) + (if (zero? sk) (car l) - (loop (sub1 skip) (cdr l)))] + (loop (sub1 sk) (cdr l)))] [(bytes? (car l)) (let ([len (bytes-length (car l))]) - (if (skip . < . len) + (if (sk . < . len) (let ([n (min (bytes-length s) - (- len skip))]) - (bytes-copy! s 0 (car l) skip (+ skip n)) + (- len sk))]) + (bytes-copy! s 0 (car l) sk (+ sk n)) n) - (loop (- skip len) (cdr l))))])))]) + (loop (- sk len) (cdr l))))])))]) v))) (define (commit-it amt unless-evt dont-evt) (call-with-semaphore @@ -389,34 +405,64 @@ ;; ---------------------------------------- + (define (poll-or-spawn go) + (poll-guard-evt + (lambda (poll?) + (if poll? + ;; In poll mode, call `go' directly: + (let ([v (go never-evt #f #t)]) + (if v + (convert-evt always-evt (lambda (x) v)) + never-evt)) + ;; In non-poll mode, start a thread to call go + (nack-guard-evt + (lambda (nack) + (define ch (make-channel)) + (let ([t (thread (lambda () + (parameterize-break #t + (with-handlers ([exn:break? void]) + (go nack ch #f)))))]) + (thread (lambda () + (sync nack) + (break-thread t)))) + ch)))))) + (define (read-at-least-bytes!-evt orig-bstr input-port need-more? shrink combo) - (nack-guard-evt - (lambda (nack) - (define ch (make-channel)) - (thread (lambda () - (let try-again ([pos 0][bstr orig-bstr]) - (let* ([progress-evt (port-progress-evt input-port)] - [v (peek-bytes-avail! bstr pos progress-evt input-port pos)]) - (cond - [(sync/timeout 0 nack) (void)] - [(sync/timeout 0 progress-evt) (try-again pos bstr)] - [(and (number? v) (need-more? bstr (+ pos v))) - => (lambda (bstr) - (try-again (+ v pos) bstr))] - [else - (let ([v2 (cond - [(number? v) (shrink bstr (+ v pos))] - [(positive? pos) pos] - [else v])]) - (unless (port-commit-peeked - (if (number? v2) v2 1) - progress-evt - (channel-put-evt - ch - (combo bstr v2)) - input-port) - (try-again 0 orig-bstr)))]))))) - ch))) + ;; go is the main reading function, either called directly for + ;; a poll, or called in a thread for a non-poll read + (define (go nack ch poll?) + (let try-again ([pos 0][bstr orig-bstr]) + (let* ([progress-evt (port-progress-evt input-port)] + [v ((if poll? + peek-bytes-avail!* + peek-bytes-avail!) + bstr pos progress-evt input-port pos)]) + (cond + [(sync/timeout 0 nack) (void)] + [(sync/timeout 0 progress-evt) (if poll? + #f + (try-again pos bstr))] + [(and poll? (equal? v 0)) #f] + [(and (number? v) (need-more? bstr (+ pos v))) + => (lambda (bstr) + (try-again (+ v pos) bstr))] + [else + (let* ([v2 (cond + [(number? v) (shrink bstr (+ v pos))] + [(positive? pos) pos] + [else v])] + [result (combo bstr v2)]) + (cond + [(port-commit-peeked (if (number? v2) v2 1) + progress-evt + (if poll? + always-evt + (channel-put-evt ch result)) + input-port) + result] + [poll? #f] + [else (try-again 0 orig-bstr)]))])))) + (poll-or-spawn go)) (define (read-bytes-avail!-evt bstr input-port) (read-at-least-bytes!-evt bstr input-port @@ -497,38 +543,44 @@ s)))) (define (regexp-match-evt pattern input-port) - (nack-guard-evt - (lambda (nack) - (define ch (make-channel)) - (thread (lambda () - (let try-again () - (let* ([progress-evt (port-progress-evt input-port)] - [m (regexp-match-peek-positions pattern input-port 0 #f progress-evt)]) - (cond - [(sync/timeout 0 nack) (void)] - [(sync/timeout 0 progress-evt) (try-again)] - [(not m) - (sync nack - (finish-evt progress-evt - (lambda (x) (try-again))))] - [else - (let ([m2 (map (lambda (p) - (and p - (let ([bstr (make-bytes (- (cdr p) (car p)))]) - (unless (= (car p) (cdr p)) - (let loop ([offset 0]) - (let ([v (peek-bytes-avail! bstr (car p) progress-evt input-port offset)]) - (unless (zero? v) - (when ((+ offset v) . < . (bytes-length bstr)) - (loop (+ offset v))))))) - bstr))) - m)]) - (unless (port-commit-peeked (cdar m) - progress-evt - (channel-put-evt ch m2) - input-port) - (try-again)))]))))) - ch))) - + (define (go nack ch poll?) + (let try-again () + (let* ([progress-evt (port-progress-evt input-port)] + [m ((if poll? + regexp-match-peek-positions* + regexp-match-peek-positions) + pattern input-port 0 #f progress-evt)]) + (cond + [(sync/timeout 0 nack) (void)] + [(sync/timeout 0 progress-evt) (try-again)] + [(not m) + (if poll? + #f + (sync nack + (finish-evt progress-evt + (lambda (x) (try-again)))))] + [else + (let ([m2 (map (lambda (p) + (and p + (let ([bstr (make-bytes (- (cdr p) (car p)))]) + (unless (= (car p) (cdr p)) + (let loop ([offset 0]) + (let ([v (peek-bytes-avail! bstr (car p) progress-evt input-port offset)]) + (unless (zero? v) + (when ((+ offset v) . < . (bytes-length bstr)) + (loop (+ offset v))))))) + bstr))) + m)]) + (cond + [(port-commit-peeked (cdar m) + progress-evt + (if poll? + always-evt + (channel-put-evt ch m2)) + input-port) + m2] + [poll? #f] + [else (try-again)]))])))) + (poll-or-spawn go)) ) diff --git a/collects/mzlib/string.ss b/collects/mzlib/string.ss index dbe8609..6f45605 100644 --- a/collects/mzlib/string.ss +++ b/collects/mzlib/string.ss @@ -10,7 +10,7 @@ regexp-replace-quote regexp-match* regexp-match-positions* - regexp-match-peek-positions* + regexp-match-peek-positions** regexp-split regexp-match-exact? regexp-match/fail-without-reading) @@ -316,12 +316,12 @@ (wrap regexp-match-positions* -regexp-match-positions*) ;; Returns all the positions at which the pattern matched. - (define -regexp-match-peek-positions* - (regexp-fn 'regexp-match-peek-positions* + (define -regexp-match-peek-positions** + (regexp-fn 'regexp-match-peek-positions** ;; success-k: (lambda (expr string start end match-start match-end) (cons (cons match-start match-end) - (regexp-match-peek-positions* expr string match-end end))) + (regexp-match-peek-positions** expr string match-end end))) ;; port-success-k --- use string case #f ;; fail-k: @@ -331,7 +331,7 @@ #f #f #t)) - (wrap regexp-match-peek-positions* -regexp-match-peek-positions*) + (wrap regexp-match-peek-positions** -regexp-match-peek-positions**) ;; Splits a string into a list by removing any piece which matches ;; the pattern. diff --git a/collects/mzlib/thread.ss b/collects/mzlib/thread.ss index d839ead..9755992 100644 --- a/collects/mzlib/thread.ss +++ b/collects/mzlib/thread.ss @@ -3,14 +3,8 @@ (require "spidey.ss" "etc.ss") - (provide consumer-thread - with-semaphore - - dynamic-disable-break - dynamic-enable-break - make-single-threader - - run-server) + (provide run-server + consumer-thread) #| t accepts a function, f, and creates a thread. It returns the thread and a @@ -56,42 +50,15 @@ (unless (procedure-arity-includes? f num) (raise (make-exn:fail:contract:arity - (format ": consumer procedure arity is ~e; provided ~s argument~a" - (procedure-arity f) num (if (= 1 num) "" "s")) + (string->immutable-string + (format ": consumer procedure arity is ~e; provided ~s argument~a" + (procedure-arity f) num (if (= 1 num) "" "s"))) (current-continuation-marks))))) (semaphore-wait protect) (set! front-state (cons new-state front-state)) (semaphore-post protect) (semaphore-post sema))))])) - (define with-semaphore - (lambda (s f) - (semaphore-wait s) - (begin0 (f) - (semaphore-post s)))) - - (define dynamic-enable-break - (polymorphic - (lambda (thunk) - (parameterize-break #t - (thunk))))) - - (define dynamic-disable-break - (polymorphic - (lambda (thunk) - (parameterize-break #f - (thunk))))) - - (define make-single-threader - (polymorphic - (lambda () - (let ([sema (make-semaphore 1)]) - (lambda (thunk) - (dynamic-wind - (lambda () (semaphore-wait sema)) - thunk - (lambda () (semaphore-post sema)))))))) - (define run-server (opt-lambda (port-number handler