original commit: a0885b759a36fad402d0465d5bd336bb3736fffa
This commit is contained in:
Matthew Flatt 2004-05-18 16:11:58 +00:00
parent ae1c288d62
commit de27c8ea0c
3 changed files with 140 additions and 121 deletions

View File

@ -14,12 +14,28 @@
(define (exact-non-negative-integer? i) (define (exact-non-negative-integer? i)
(and (number? i) (exact? i) (integer? i) (i . >= . 0))) (and (number? i) (exact? i) (integer? i) (i . >= . 0)))
(provide/contract (read-bytes-avail!-evt (bytes? input-port? . -> . evt?)) (define (input-port-with-progress-evts? ip)
(read-bytes!-evt (bytes? input-port? . -> . evt?)) (and (input-port? ip)
(read-bytes-evt (exact-non-negative-integer? input-port? . -> . evt?)) (port-provides-progress-evts? ip)))
(read-string!-evt (string? input-port? . -> . evt?))
(read-string-evt (exact-non-negative-integer? input-port? . -> . evt?)) (define (mutable-bytes? b)
(regexp-match-evt ((union regexp? byte-regexp?) input-port? . -> . evt?))) (and (bytes? b) (not (immutable? b))))
(define (mutable-string? b)
(and (string? b) (not (immutable? b))))
(provide/contract (read-bytes-avail!-evt (mutable-bytes? input-port-with-progress-evts?
. -> . evt?))
(read-bytes!-evt (mutable-bytes? input-port-with-progress-evts?
. -> . evt?))
(read-bytes-evt (exact-non-negative-integer? input-port-with-progress-evts?
. -> . evt?))
(read-string!-evt (mutable-string? input-port-with-progress-evts?
. -> . evt?))
(read-string-evt (exact-non-negative-integer? input-port-with-progress-evts?
. -> . evt?))
(regexp-match-evt ((union regexp? byte-regexp? string? bytes?)
input-port-with-progress-evts?
. -> . evt?)))
;; ---------------------------------------- ;; ----------------------------------------
@ -121,7 +137,7 @@
(read s)] (read s)]
[else (if (bytes? (car special-peeked)) [else (if (bytes? (car special-peeked))
(let ([b (car special-peeked)]) (let ([b (car special-peeked)])
(set! peeked-end (+ (file-position peeked-r) (bytes-length b))) (set! peeked-end (+ peeked-end (bytes-length b)))
(write-bytes b peeked-w) (write-bytes b peeked-w)
(set! special-peeked (cdr special-peeked)) (set! special-peeked (cdr special-peeked))
(when (null? special-peeked) (when (null? special-peeked)
@ -163,19 +179,19 @@
(set! special-peeked (cons r null)) (set! special-peeked (cons r null))
(set! special-peeked-tail special-peeked) (set! special-peeked-tail special-peeked)
;; Now try again ;; Now try again
(peek-it s skip)]))] (do-peek-it s skip unless-evt)]))]
[else [else
;; Non-empty special queue, so try to use it ;; Non-empty special queue, so try to use it
(let* ([pos (file-position peeked-r)] (let* ([pos (file-position peeked-r)]
[avail (- peeked-end pos)] [avail (- peeked-end pos)]
[skip (- skip avail)]) [sk (- skip avail)])
(let loop ([skip (- skip avail)] (let loop ([sk sk]
[l special-peeked]) [l special-peeked])
(cond (cond
[(null? l) [(null? l)
;; Not enough even in the special queue. ;; Not enough even in the special queue.
;; Read once and add it. ;; Read once and add it.
(let* ([t (make-bytes (min 4096 (+ skip (bytes-length s))))] (let* ([t (make-bytes (min 4096 (+ sk (bytes-length s))))]
[r (read s)]) [r (read s)])
(cond (cond
[(evt? r) [(evt? r)
@ -192,22 +208,22 @@
r)]) r)])
(set-cdr! special-peeked-tail v) (set-cdr! special-peeked-tail v)
;; Got something; now try again ;; Got something; now try again
(do-peek-it s skip))]))] (do-peek-it s skip unless-evt))]))]
[(eof-object? (car l)) [(eof-object? (car l))
;; No peeking past an EOF ;; No peeking past an EOF
eof] eof]
[(procedure? (car l)) [(procedure? (car l))
(if (skip . < . 1) (if (zero? sk)
(car l) (car l)
(loop (sub1 skip) (cdr l)))] (loop (sub1 sk) (cdr l)))]
[(bytes? (car l)) [(bytes? (car l))
(let ([len (bytes-length (car l))]) (let ([len (bytes-length (car l))])
(if (skip . < . len) (if (sk . < . len)
(let ([n (min (bytes-length s) (let ([n (min (bytes-length s)
(- len skip))]) (- len sk))])
(bytes-copy! s 0 (car l) skip (+ skip n)) (bytes-copy! s 0 (car l) sk (+ sk n))
n) n)
(loop (- skip len) (cdr l))))])))]) (loop (- sk len) (cdr l))))])))])
v))) v)))
(define (commit-it amt unless-evt dont-evt) (define (commit-it amt unless-evt dont-evt)
(call-with-semaphore (call-with-semaphore
@ -389,34 +405,64 @@
;; ---------------------------------------- ;; ----------------------------------------
(define (poll-or-spawn go)
(poll-guard-evt
(lambda (poll?)
(if poll?
;; In poll mode, call `go' directly:
(let ([v (go never-evt #f #t)])
(if v
(convert-evt always-evt (lambda (x) v))
never-evt))
;; In non-poll mode, start a thread to call go
(nack-guard-evt
(lambda (nack)
(define ch (make-channel))
(let ([t (thread (lambda ()
(parameterize-break #t
(with-handlers ([exn:break? void])
(go nack ch #f)))))])
(thread (lambda ()
(sync nack)
(break-thread t))))
ch))))))
(define (read-at-least-bytes!-evt orig-bstr input-port need-more? shrink combo) (define (read-at-least-bytes!-evt orig-bstr input-port need-more? shrink combo)
(nack-guard-evt ;; go is the main reading function, either called directly for
(lambda (nack) ;; a poll, or called in a thread for a non-poll read
(define ch (make-channel)) (define (go nack ch poll?)
(thread (lambda () (let try-again ([pos 0][bstr orig-bstr])
(let try-again ([pos 0][bstr orig-bstr]) (let* ([progress-evt (port-progress-evt input-port)]
(let* ([progress-evt (port-progress-evt input-port)] [v ((if poll?
[v (peek-bytes-avail! bstr pos progress-evt input-port pos)]) peek-bytes-avail!*
(cond peek-bytes-avail!)
[(sync/timeout 0 nack) (void)] bstr pos progress-evt input-port pos)])
[(sync/timeout 0 progress-evt) (try-again pos bstr)] (cond
[(and (number? v) (need-more? bstr (+ pos v))) [(sync/timeout 0 nack) (void)]
=> (lambda (bstr) [(sync/timeout 0 progress-evt) (if poll?
(try-again (+ v pos) bstr))] #f
[else (try-again pos bstr))]
(let ([v2 (cond [(and poll? (equal? v 0)) #f]
[(number? v) (shrink bstr (+ v pos))] [(and (number? v) (need-more? bstr (+ pos v)))
[(positive? pos) pos] => (lambda (bstr)
[else v])]) (try-again (+ v pos) bstr))]
(unless (port-commit-peeked [else
(if (number? v2) v2 1) (let* ([v2 (cond
progress-evt [(number? v) (shrink bstr (+ v pos))]
(channel-put-evt [(positive? pos) pos]
ch [else v])]
(combo bstr v2)) [result (combo bstr v2)])
input-port) (cond
(try-again 0 orig-bstr)))]))))) [(port-commit-peeked (if (number? v2) v2 1)
ch))) progress-evt
(if poll?
always-evt
(channel-put-evt ch result))
input-port)
result]
[poll? #f]
[else (try-again 0 orig-bstr)]))]))))
(poll-or-spawn go))
(define (read-bytes-avail!-evt bstr input-port) (define (read-bytes-avail!-evt bstr input-port)
(read-at-least-bytes!-evt bstr input-port (read-at-least-bytes!-evt bstr input-port
@ -497,38 +543,44 @@
s)))) s))))
(define (regexp-match-evt pattern input-port) (define (regexp-match-evt pattern input-port)
(nack-guard-evt (define (go nack ch poll?)
(lambda (nack) (let try-again ()
(define ch (make-channel)) (let* ([progress-evt (port-progress-evt input-port)]
(thread (lambda () [m ((if poll?
(let try-again () regexp-match-peek-positions*
(let* ([progress-evt (port-progress-evt input-port)] regexp-match-peek-positions)
[m (regexp-match-peek-positions pattern input-port 0 #f progress-evt)]) pattern input-port 0 #f progress-evt)])
(cond (cond
[(sync/timeout 0 nack) (void)] [(sync/timeout 0 nack) (void)]
[(sync/timeout 0 progress-evt) (try-again)] [(sync/timeout 0 progress-evt) (try-again)]
[(not m) [(not m)
(sync nack (if poll?
(finish-evt progress-evt #f
(lambda (x) (try-again))))] (sync nack
[else (finish-evt progress-evt
(let ([m2 (map (lambda (p) (lambda (x) (try-again)))))]
(and p [else
(let ([bstr (make-bytes (- (cdr p) (car p)))]) (let ([m2 (map (lambda (p)
(unless (= (car p) (cdr p)) (and p
(let loop ([offset 0]) (let ([bstr (make-bytes (- (cdr p) (car p)))])
(let ([v (peek-bytes-avail! bstr (car p) progress-evt input-port offset)]) (unless (= (car p) (cdr p))
(unless (zero? v) (let loop ([offset 0])
(when ((+ offset v) . < . (bytes-length bstr)) (let ([v (peek-bytes-avail! bstr (car p) progress-evt input-port offset)])
(loop (+ offset v))))))) (unless (zero? v)
bstr))) (when ((+ offset v) . < . (bytes-length bstr))
m)]) (loop (+ offset v)))))))
(unless (port-commit-peeked (cdar m) bstr)))
progress-evt m)])
(channel-put-evt ch m2) (cond
input-port) [(port-commit-peeked (cdar m)
(try-again)))]))))) progress-evt
ch))) (if poll?
always-evt
(channel-put-evt ch m2))
input-port)
m2]
[poll? #f]
[else (try-again)]))]))))
(poll-or-spawn go))
) )

View File

@ -10,7 +10,7 @@
regexp-replace-quote regexp-replace-quote
regexp-match* regexp-match*
regexp-match-positions* regexp-match-positions*
regexp-match-peek-positions* regexp-match-peek-positions**
regexp-split regexp-split
regexp-match-exact? regexp-match-exact?
regexp-match/fail-without-reading) regexp-match/fail-without-reading)
@ -316,12 +316,12 @@
(wrap regexp-match-positions* -regexp-match-positions*) (wrap regexp-match-positions* -regexp-match-positions*)
;; Returns all the positions at which the pattern matched. ;; Returns all the positions at which the pattern matched.
(define -regexp-match-peek-positions* (define -regexp-match-peek-positions**
(regexp-fn 'regexp-match-peek-positions* (regexp-fn 'regexp-match-peek-positions**
;; success-k: ;; success-k:
(lambda (expr string start end match-start match-end) (lambda (expr string start end match-start match-end)
(cons (cons match-start match-end) (cons (cons match-start match-end)
(regexp-match-peek-positions* expr string match-end end))) (regexp-match-peek-positions** expr string match-end end)))
;; port-success-k --- use string case ;; port-success-k --- use string case
#f #f
;; fail-k: ;; fail-k:
@ -331,7 +331,7 @@
#f #f
#f #f
#t)) #t))
(wrap regexp-match-peek-positions* -regexp-match-peek-positions*) (wrap regexp-match-peek-positions** -regexp-match-peek-positions**)
;; Splits a string into a list by removing any piece which matches ;; Splits a string into a list by removing any piece which matches
;; the pattern. ;; the pattern.

View File

@ -3,14 +3,8 @@
(require "spidey.ss" (require "spidey.ss"
"etc.ss") "etc.ss")
(provide consumer-thread (provide run-server
with-semaphore consumer-thread)
dynamic-disable-break
dynamic-enable-break
make-single-threader
run-server)
#| #|
t accepts a function, f, and creates a thread. It returns the thread and a t accepts a function, f, and creates a thread. It returns the thread and a
@ -56,42 +50,15 @@
(unless (procedure-arity-includes? f num) (unless (procedure-arity-includes? f num)
(raise (raise
(make-exn:fail:contract:arity (make-exn:fail:contract:arity
(format "<procedure-from-consumer-thread>: consumer procedure arity is ~e; provided ~s argument~a" (string->immutable-string
(procedure-arity f) num (if (= 1 num) "" "s")) (format "<procedure-from-consumer-thread>: consumer procedure arity is ~e; provided ~s argument~a"
(procedure-arity f) num (if (= 1 num) "" "s")))
(current-continuation-marks))))) (current-continuation-marks)))))
(semaphore-wait protect) (semaphore-wait protect)
(set! front-state (cons new-state front-state)) (set! front-state (cons new-state front-state))
(semaphore-post protect) (semaphore-post protect)
(semaphore-post sema))))])) (semaphore-post sema))))]))
(define with-semaphore
(lambda (s f)
(semaphore-wait s)
(begin0 (f)
(semaphore-post s))))
(define dynamic-enable-break
(polymorphic
(lambda (thunk)
(parameterize-break #t
(thunk)))))
(define dynamic-disable-break
(polymorphic
(lambda (thunk)
(parameterize-break #f
(thunk)))))
(define make-single-threader
(polymorphic
(lambda ()
(let ([sema (make-semaphore 1)])
(lambda (thunk)
(dynamic-wind
(lambda () (semaphore-wait sema))
thunk
(lambda () (semaphore-post sema))))))))
(define run-server (define run-server
(opt-lambda (port-number (opt-lambda (port-number
handler handler