original commit: beef60b4210af451033b663eb7464c6b22ee348c
This commit is contained in:
Matthew Flatt 2004-05-08 04:45:10 +00:00
parent 8723e44ce6
commit 7a0c5c5b48
6 changed files with 373 additions and 252 deletions

View File

@ -8,14 +8,14 @@
;; if the buffer is full).
;; We make a fancy structure just so an async-channel
;; can be supplied directly to 'object-wait-multiple'.
;; can be supplied directly to `sync'.
;; The alternative is to use `define-struct' and supply
;; a `make-async-channel-get-waitable' procedure.
;; a `async-channel-get-evt' procedure.
(define-values (struct:ac make-ac async-channel? ac-ref ac-set!)
(make-struct-type 'async-channel #f 5 0 #f
(list (cons prop:waitable
(list (cons prop:evt
;; This is the guard that is called when
;; we use an async-channel as a waitable
;; we use an async-channel as an event
;; (to get).
(lambda (ac)
(async-channel-get-guard ac))))
@ -36,12 +36,12 @@
[full-ch (make-channel)] ; for put polls
[queue-first (cons #f null)] ; queue head
[queue-last queue-first] ; queue tail
;; Waitables:
;; Events:
[tell-empty
(make-channel-put-waitable empty-ch (make-semaphore))] ; see poll->ch
(channel-put-evt empty-ch (make-semaphore))] ; see poll->ch
[tell-full
(make-channel-put-waitable full-ch (make-semaphore))] ; see poll->ch
[enqueue (make-wrapped-waitable
(channel-put-evt full-ch (make-semaphore))] ; see poll->ch
[enqueue (wrap-evt
enqueue-ch
(lambda (v)
;; We received a put; enqueue it:
@ -51,8 +51,8 @@
(set! queue-last p))))]
[mk-dequeue
(lambda ()
(make-wrapped-waitable
(make-channel-put-waitable dequeue-ch (car queue-first))
(wrap-evt
(channel-put-evt dequeue-ch (car queue-first))
(lambda (ignored)
;; A get succeeded; dequeue it:
(set! queue-first (cdr queue-first)))))]
@ -72,11 +72,11 @@
(cond
[(= 1 (length queue-first))
;; The queue is currently empty:
(object-wait-multiple #f enqueue tell-empty)]
(sync enqueue tell-empty)]
[(or (not limit) ((sub1 (length queue-first)) . < . limit))
(object-wait-multiple #f enqueue (mk-dequeue))]
(sync enqueue (mk-dequeue))]
[else
(object-wait-multiple #f (mk-dequeue) tell-full)])
(sync (mk-dequeue) tell-full)])
(loop))))])
(make-ac enqueue-ch dequeue-ch empty-ch full-ch manager-thread))))
@ -91,29 +91,29 @@
;; block on the dequeue channel and the empty
;; channel, and create a new waitable to report
;; the result.
(make-poll-guard-waitable
(poll-guard-evt
(lambda (poll?)
(if poll?
(poll->ch (ac-dequeue-ch ac) (ac-empty-ch ac))
(ac-dequeue-ch ac)))))
(define (async-channel-get ac)
(object-wait-multiple #f ac))
(sync ac))
(define (async-channel-try-get ac)
(object-wait-multiple 0 ac))
(sync/timeout 0 ac))
;; Put ----------------------------------------
(define (make-async-channel-put-waitable ac v)
(letrec ([p (make-wrapped-waitable
(make-guard-waitable
(define (async-channel-put-evt ac v)
(letrec ([p (wrap-evt
(guard-evt
(lambda ()
;; Make sure queue manager is running:
(thread-resume (ac-thread ac) (current-thread))
(let ([p (make-channel-put-waitable (ac-enqueue-ch ac) v)])
(let ([p (channel-put-evt (ac-enqueue-ch ac) v)])
;; Poll handling, as in `async-channel-get-guard':
(make-poll-guard-waitable
(poll-guard-evt
(lambda (poll?)
(if poll?
(poll->ch p (ac-full-ch ac))
@ -123,27 +123,27 @@
(define (async-channel-put ac v)
(thread-resume (ac-thread ac) (current-thread))
(object-wait-multiple #f (make-channel-put-waitable (ac-enqueue-ch ac) v))
(sync (channel-put-evt (ac-enqueue-ch ac) v))
(void))
;; Poll helper ----------------------------------------
(define (poll->ch normal not-ready)
(object-wait-multiple #f
;; If a value becomes available,
;; create a waitable that returns
;; the value:
(make-wrapped-waitable
normal
(lambda (v)
;; Return a waitable for a successful poll:
(make-wrapped-waitable
(make-semaphore 1)
(lambda (ignored) v))))
;; If not-ready becomes available,
;; the result is supposed to be
;; a never-ready waitable:
not-ready))
(sync
;; If a value becomes available,
;; create a waitable that returns
;; the value:
(wrap-evt
normal
(lambda (v)
;; Return a waitable for a successful poll:
(wrap-evt
always-evt
(lambda (ignored) v))))
;; If not-ready becomes available,
;; the result is supposed to be
;; a never-ready waitable:
not-ready))
;; Provides ----------------------------------------
@ -158,4 +158,4 @@
(async-channel-get (async-channel? . -> . any?))
(async-channel-try-get (async-channel? . -> . any?))
(async-channel-put (async-channel? any? . -> . any?))
(make-async-channel-put-waitable (async-channel? any? . -> . object-waitable?))))
(async-channel-put-evt (async-channel? any? . -> . evt?))))

View File

@ -5,12 +5,6 @@
(define (spawn thunk)
(thread/suspend-to-kill thunk))
(define (sync w)
(object-wait-multiple #f w))
(define (sync/enable-break w)
(object-wait-multiple/enable-break #f w))
(define (channel)
(make-channel))
@ -22,40 +16,21 @@
(make-channel-put-waitable ch v)
void))
(define (choice-evt . l)
(apply waitables->waitable-set l))
(define (wrap-evt w proc)
(make-wrapped-waitable w proc))
(define (guard-evt proc)
(make-guard-waitable proc))
(define (nack-guard-evt proc)
(make-nack-guard-waitable proc))
(define (thread-done-evt th)
(thread-dead-waitable th))
(define (current-time)
(current-inexact-milliseconds))
(define (time-evt t)
(make-alarm-waitable t))
(alarm-evt t))
(provide/contract
(spawn ((-> any) . -> . thread?))
(sync (object-waitable? . -> . any))
(sync/enable-break (object-waitable? . -> . any))
(channel (-> channel?))
(channel-recv-evt (channel? . -> . object-waitable?))
(channel-send-evt (channel? any? . -> . object-waitable?))
(choice-evt (() (listof object-waitable?) . ->* . (object-waitable?)))
(wrap-evt (object-waitable? (any? . -> . any?) . -> . object-waitable?))
(guard-evt ((-> any?) . -> . object-waitable?))
(nack-guard-evt ((object-waitable? . -> . any?) . -> . object-waitable?))
(thread-done-evt (thread? . -> . object-waitable?))
(channel-recv-evt (channel? . -> . evt?))
(channel-send-evt (channel? any? . -> . evt?))
(thread-done-evt (thread? . -> . evt?))
(current-time (-> number?))
(time-evt (number? . -> . object-waitable?))))
(time-evt (real? . -> . evt?))))

315
collects/mzlib/port.ss Normal file
View File

@ -0,0 +1,315 @@
(module port mzscheme
(require (lib "etc.ss"))
(provide open-output-nowhere
make-input-port/read-to-peek
merge-input
copy-port
input-port-append
convert-stream
make-limited-input-port)
(define open-output-nowhere
(opt-lambda ([name 'nowhere])
(make-output-port
name
always-evt
(lambda (s start end non-block?) (- end start))
void
(lambda (special non-block?) #t)
(lambda (s start end) (wrap-evt
always-evt
(lambda (x)
(- end start))))
(lambda (special) (wrap-evt always-evt (lambda (x) #t))))))
(define (copy-port src dest . dests)
(unless (input-port? src)
(raise-type-error 'copy-port "input-port" src))
(for-each
(lambda (dest)
(unless (output-port? dest)
(raise-type-error 'copy-port "output-port" dest)))
(cons dest dests))
(let ([s (make-bytes 4096)])
(let loop ()
(let ([c (read-bytes-avail! s src)])
(unless (eof-object? c)
(for-each
(lambda (dest)
(let loop ([start 0])
(unless (= start c)
(let ([c2 (write-bytes-avail s dest start c)])
(loop (+ start c2))))))
(cons dest dests))
(loop))))))
(define merge-input
(case-lambda
[(a b) (merge-input a b 4096)]
[(a b limit)
(or (input-port? a)
(raise-type-error 'merge-input "input-port" a))
(or (input-port? b)
(raise-type-error 'merge-input "input-port" b))
(or (not limit)
(and (number? limit) (positive? limit) (exact? limit) (integer? limit))
(raise-type-error 'merge-input "positive exact integer or #f" limit))
(let-values ([(rd wt) (make-pipe limit)]
[(other-done?) #f]
[(sema) (make-semaphore 1)])
(let ([copy
(lambda (from)
(thread
(lambda ()
(copy-port from wt)
(semaphore-wait sema)
(if other-done?
(close-output-port wt)
(set! other-done? #t))
(semaphore-post sema))))])
(copy a)
(copy b)
rd))]))
(define (make-input-port/read-to-peek name read fast-peek close)
(define lock-semaphore (make-semaphore 1))
(define-values (peeked-r peeked-w) (make-pipe))
(define peeked-end 0)
(define special-peeked null)
(define special-peeked-tail #f)
(define (try-again)
(wrap-evt
(semaphore-peek-evt lock-semaphore)
(lambda (x) 0)))
(define (read-it s)
(parameterize ([break-enabled #f])
(call-with-semaphore
lock-semaphore
(lambda ()
(do-read-it s))
try-again)))
(define (do-read-it s)
(if (char-ready? peeked-r)
(read-bytes-avail!* s peeked-r)
;; If nothing is saved from a peeking read,
;; dispatch to `read', otherwise
(cond
[(null? special-peeked) (read s)]
[else (if (bytes? (car special-peeked))
(let ([b (car special-peeked)])
(set! peeked-end (+ (file-position peeked-r) (bytes-length b)))
(write-bytes b peeked-w)
(set! special-peeked (cdr special-peeked))
(when (null? special-peeked)
(set! special-peeked-tail #f))
(read-bytes-avail!* s peeked-r))
(begin0
(car special-peeked)
(set! special-peeked (cdr special-peeked))
(when (null? special-peeked)
(set! special-peeked-tail #f))))])))
(define (peek-it s skip)
(parameterize ([break-enabled #f])
(call-with-semaphore
lock-semaphore
(lambda ()
(do-peek-it s skip))
try-again)))
(define (do-peek-it s skip)
(let ([v (peek-bytes-avail!* s skip peeked-r)])
(if (zero? v)
;; The peek may have failed because peeked-r is empty,
;; or because the skip is far. Handle nicely the common
;; case where there are no specials.
(cond
[(null? special-peeked)
;; Empty special queue, so read through the original proc
(let ([r (read s)])
(cond
[(number? r)
;; The nice case --- reading gave us more bytes
(set! peeked-end (+ r peeked-end))
(write-bytes s peeked-w 0 r)
;; Now try again
(peek-bytes-avail!* s skip peeked-r)]
[else
(set! special-peeked (cons r null))
(set! special-peeked-tail special-peeked)
;; Now try again
(peek-it s skip)]))]
[else
;; Non-empty special queue, so try to use it
(let* ([pos (file-position peeked-r)]
[avail (- peeked-end pos)]
[skip (- skip avail)])
(let loop ([skip (- skip avail)]
[l special-peeked])
(cond
[(null? l)
;; Not enough even in the special queue.
;; Read once and add it.
(let* ([t (make-bytes (min 4096 (+ skip (bytes-length s))))]
[r (read s)])
(cond
[(evt? r)
;; We can't deal with an event, so complain
(error 'make-input-port/read-to-peek
"original read produced an event: ~e"
r)]
[(eq? r 0)
;; Original read thinks a spin is ok,
;; so we return 0 to skin, too.
0]
[else (let ([v (if (number? r)
(subbytes t 0 r)
r)])
(set-cdr! special-peeked-tail v)
;; Got something; now try again
(do-peek-it s skip))]))]
[(eof-object? (car l))
;; No peeking past an EOF
eof]
[(pair? (car l))
(if (skip . < . (caar l))
(car l)
(loop (- skip (caar l)) (cdr l)))]
[(bytes? (car l))
(let ([len (bytes-length (car l))])
(if (skip . < . len)
(let ([n (min (bytes-length s)
(- len skip))])
(bytes-copy! s 0 (car l) skip (+ skip n))
n)
(loop (- skip len) (cdr l))))])))])
v)))
(make-input-port
name
;; Read
read-it
;; Peek
(if fast-peek
(lambda (s skip)
(fast-peek s skip peek-it))
peek-it)
close))
(define input-port-append
(opt-lambda (close-orig? . ports)
(make-input-port
(map object-name ports)
(lambda (str)
;; Reading is easy -- read from the first port,
;; and get rid of it if the result is eof
(if (null? ports)
eof
(let ([n (read-bytes-avail!* str (car ports))])
(cond
[(eq? n 0) (car ports)]
[(eof-object? n)
(when close-orig?
(close-input-port (car ports)))
(set! ports (cdr ports))
0]
[else n]))))
(lambda (str skip)
;; Peeking is more difficult, due to skips.
(let loop ([ports ports][skip skip])
(if (null? ports)
eof
(let ([n (peek-bytes-avail!* str skip (car ports))])
(cond
[(eq? n 0)
;; Not ready, yet.
(car ports)]
[(eof-object? n)
;; Port is exhausted, or we skipped past its input.
;; If skip is not zero, we need to figure out
;; how many chars were skipped.
(loop (cdr ports)
(- skip (compute-avail-to-skip skip (car ports))))]
[else n])))))
(lambda ()
(when close-orig?
(map close-input-port ports))))))
(define (convert-stream from from-port
to to-port)
(let ([c (bytes-open-converter from to)]
[in (make-bytes 4096)]
[out (make-bytes 4096)])
(unless c
(error 'convert-stream "could not create converter from ~e to ~e"
from to))
(dynamic-wind
void
(lambda ()
(let loop ([got 0])
(let ([n (read-bytes-avail! in from-port got)])
(let ([got (+ got (if (eof-object? n)
0
n))])
(let-values ([(wrote used status) (bytes-convert c in 0 got out)])
(when (eq? status 'error)
(error 'convert-stream "conversion error"))
(unless (zero? wrote)
(write-bytes out to-port 0 wrote))
(bytes-copy! in 0 in used got)
(if (eof-object? n)
(begin
(unless (= got used)
(error 'convert-stream "input stream ended with a partial conversion"))
(let-values ([(wrote status) (bytes-convert-end c out)])
(when (eq? status 'error)
(error 'convert-stream "conversion-end error"))
(unless (zero? wrote)
(write-bytes out to-port 0 wrote))
;; Success
(void)))
(loop (- got used))))))))
(lambda () (bytes-close-converter c)))))
;; Helper for input-port-append; given a skip count
;; and an input port, determine how many characters
;; (up to upto) are left in the port. We figure this
;; out using binary search.
(define (compute-avail-to-skip upto p)
(let ([str (make-bytes 1)])
(let loop ([upto upto][skip 0])
(if (zero? upto)
skip
(let* ([half (quotient upto 2)]
[n (peek-bytes-avail!* str (+ skip half) p)])
(if (eq? n 1)
(loop (- upto half 1) (+ skip half 1))
(loop half skip)))))))
(define make-limited-input-port
(opt-lambda (port limit [close-orig? #t])
(let ([got 0])
(make-input-port
(object-name port)
(lambda (str)
(let ([count (min (- limit got) (bytes-length str))])
(if (zero? count)
eof
(let ([n (read-bytes-avail!* str port 0 count)])
(cond
[(eq? n 0) port]
[(number? n) (set! got (+ got n)) n]
[else n])))))
(lambda (str skip)
(let ([count (max 0 (min (- limit got skip) (bytes-length str)))])
(if (zero? count)
eof
(let ([n (peek-bytes-avail!* str skip port 0 count)])
(if (eq? n 0)
port
n)))))
(lambda ()
(when close-orig?
(close-input-port port))))))))

View File

@ -9,7 +9,7 @@
system/exit-code
system*/exit-code)
(require (lib "thread.ss"))
(require (lib "port.ss"))
;; Helpers: ----------------------------------------

View File

@ -248,13 +248,13 @@
(and end (if need-leftover? (- end start) end))
(if need-leftover?
leftover-port
(make-custom-output-port
#f
(make-output-port
'counter
always-evt
(lambda (s start end flush?)
(let ([c (- end start)])
(set! discarded (+ c discarded))
c))
void
void)))]
[leftovers (and need-leftover?
(if (and (regexp? pattern)

View File

@ -4,19 +4,13 @@
"etc.ss")
(provide consumer-thread
with-semaphore
dynamic-disable-break
dynamic-enable-break
make-single-threader
with-semaphore
dynamic-disable-break
dynamic-enable-break
make-single-threader
merge-input
copy-port
input-port-append
convert-stream
run-server
make-limited-input-port)
run-server)
#|
t accepts a function, f, and creates a thread. It returns the thread and a
@ -24,7 +18,7 @@
the call of f in the time of the thread that was created. Calls to g do not
block.
|#
(define consumer-thread
(case-lambda
[(f) (consumer-thread f void)]
@ -98,55 +92,6 @@
thunk
(lambda () (semaphore-post sema))))))))
(define (copy-port src dest . dests)
(unless (input-port? src)
(raise-type-error 'copy-port "input-port" src))
(for-each
(lambda (dest)
(unless (output-port? dest)
(raise-type-error 'copy-port "output-port" dest)))
(cons dest dests))
(let ([s (make-bytes 4096)])
(let loop ()
(let ([c (read-bytes-avail! s src)])
(unless (eof-object? c)
(for-each
(lambda (dest)
(let loop ([start 0])
(unless (= start c)
(let ([c2 (write-bytes-avail s dest start c)])
(loop (+ start c2))))))
(cons dest dests))
(loop))))))
(define merge-input
(case-lambda
[(a b) (merge-input a b 4096)]
[(a b limit)
(or (input-port? a)
(raise-type-error 'merge-input "input-port" a))
(or (input-port? b)
(raise-type-error 'merge-input "input-port" b))
(or (not limit)
(and (number? limit) (positive? limit) (exact? limit) (integer? limit))
(raise-type-error 'merge-input "positive exact integer or #f" limit))
(let-values ([(rd wt) (make-pipe limit)]
[(other-done?) #f]
[(sema) (make-semaphore 1)])
(let ([copy
(lambda (from)
(thread
(lambda ()
(copy-port from wt)
(semaphore-wait sema)
(if other-done?
(close-output-port wt)
(set! other-done? #t))
(semaphore-post sema))))])
(copy a)
(copy b)
rd))]))
(define run-server
(opt-lambda (port-number
handler
@ -181,126 +126,12 @@
(handler r w)))])
;; Clean-up and timeout thread:
(thread (lambda ()
(object-wait-multiple connection-timeout t)
(sync/timeout connection-timeout t)
(when (thread-running? t)
;; Only happens if connection-timeout is not #f
(break-thread t))
(object-wait-multiple connection-timeout t)
(sync/timeout connection-timeout t)
(custodian-shutdown-all c)))))))))
(loop)))
(lambda () (tcp-close l))))))
(define input-port-append
(opt-lambda (close-orig? . ports)
(make-custom-input-port
(lambda (str)
;; Reading is easy -- read from the first port,
;; and get rid of it if the result is eof
(if (null? ports)
eof
(let ([n (read-bytes-avail!* str (car ports))])
(cond
[(eq? n 0) (car ports)]
[(eof-object? n)
(when close-orig?
(close-input-port (car ports)))
(set! ports (cdr ports))
0]
[else n]))))
(lambda (str skip)
;; Peeking is more difficult, due to skips.
(let loop ([ports ports][skip skip])
(if (null? ports)
eof
(let ([n (peek-bytes-avail!* str skip (car ports))])
(cond
[(eq? n 0)
;; Not ready, yet.
(car ports)]
[(eof-object? n)
;; Port is exhausted, or we skipped past its input.
;; If skip is not zero, we need to figure out
;; how many chars were skipped.
(loop (cdr ports)
(- skip (compute-avail-to-skip skip (car ports))))]
[else n])))))
(lambda ()
(when close-orig?
(map close-input-port ports))))))
(define (convert-stream from from-port
to to-port)
(let ([c (bytes-open-converter from to)]
[in (make-bytes 4096)]
[out (make-bytes 4096)])
(unless c
(error 'convert-stream "could not create converter from ~e to ~e"
from to))
(dynamic-wind
void
(lambda ()
(let loop ([got 0])
(let ([n (read-bytes-avail! in from-port got)])
(let ([got (+ got (if (eof-object? n)
0
n))])
(let-values ([(wrote used status) (bytes-convert c in 0 got out)])
(when (eq? status 'error)
(error 'convert-stream "conversion error"))
(unless (zero? wrote)
(write-bytes out to-port 0 wrote))
(bytes-copy! in used in 0 got)
(if (eof-object? n)
(begin
(unless (= got used)
(error 'convert-stream "input stream ended with a partial conversion"))
(let-values ([(wrote status) (bytes-convert-end c out)])
(when (eq? status 'error)
(error 'convert-stream "conversion-end error"))
(unless (zero? wrote)
(write-bytes out to-port 0 wrote))
;; Success
(void)))
(loop (- got used))))))))
(lambda () (bytes-close-converter c)))))
;; Helper for input-port-append; given a skip count
;; and an input port, determine how many characters
;; (up to upto) are left in the port. We figure this
;; out using binary search.
(define (compute-avail-to-skip upto p)
(let ([str (make-bytes 1)])
(let loop ([upto upto][skip 0])
(if (zero? upto)
skip
(let* ([half (quotient upto 2)]
[n (peek-bytes-avail!* str (+ skip half) p)])
(if (eq? n 1)
(loop (- upto half 1) (+ skip half 1))
(loop half skip)))))))
(define make-limited-input-port
(opt-lambda (port limit [close-orig? #t])
(let ([got 0])
(make-custom-input-port
(lambda (str)
(let ([count (min (- limit got) (bytes-length str))])
(if (zero? count)
eof
(let ([n (read-bytes-avail!* str port 0 count)])
(cond
[(eq? n 0) port]
[(number? n) (set! got (+ got n)) n]
[else n])))))
(lambda (str skip)
(let ([count (max 0 (min (- limit got skip) (bytes-length str)))])
(if (zero? count)
eof
(let ([n (peek-bytes-avail!* str skip port 0 count)])
(if (eq? n 0)
port
n)))))
(lambda ()
(when close-orig?
(close-input-port port))))))))
(lambda () (tcp-close l)))))))