Add combine-output-ports
This commit is contained in:
parent
c40a7ae2fc
commit
8464896255
|
@ -297,6 +297,24 @@ The optional @racket[in-name] and @racket[out-name] arguments
|
|||
determine the names of the result ports.}
|
||||
|
||||
|
||||
@defproc[(combine-output [a-out output-port?]
|
||||
[b-out output-port?])
|
||||
output-port?]{
|
||||
|
||||
Accepts two output ports and returns a new output port
|
||||
combining the original ports. When written to, the combined port
|
||||
first writes as many bytes as possible to @racket[a-out], and then
|
||||
tries to write the same number of bytes to @racket[b-out]. If that
|
||||
doesn't succeed, what is left over is buffered and no further writes
|
||||
can go through until the ports are evened out. The port is ready (for
|
||||
the purposes of synchronization) when each port reports being ready.
|
||||
However, the first port may stop being ready while waiting on
|
||||
the second port to sync, so it cannot be guaranteed that both
|
||||
ports are ready at once. Closing the combined port is done
|
||||
after writing all remaining bytes to @racket[b-out].}
|
||||
|
||||
@history[#:added "7.7.0.9"]
|
||||
|
||||
@defproc[(merge-input [a-in input-port?]
|
||||
[b-in input-port?]
|
||||
[buffer-limit (or/c exact-nonnegative-integer? #f) 4096])
|
||||
|
|
|
@ -995,6 +995,72 @@
|
|||
(write-bytes (make-bytes 80) o2)
|
||||
(test 0 file-position o2))
|
||||
|
||||
;; --------------------------------------------------
|
||||
;; test combine-output
|
||||
(let ([port-a (open-output-string)]
|
||||
[port-b (open-output-string)])
|
||||
(define two-byte-port (make-output-port
|
||||
`two-byte-port
|
||||
port-b
|
||||
(lambda (s start end non-blocking? breakable?)
|
||||
(cond
|
||||
[non-blocking?
|
||||
(write-bytes-avail* (subbytes
|
||||
s
|
||||
start
|
||||
(if (< start (- end 1)) (+ start 2) end))
|
||||
port-b)]
|
||||
[breakable?
|
||||
(write-bytes-avail/enable-break
|
||||
(subbytes
|
||||
s
|
||||
start
|
||||
(if (< start (- end 1)) (+ start 2) end))
|
||||
port-b)]
|
||||
[else
|
||||
(write-bytes s port-b)]))
|
||||
void))
|
||||
(define port-ab (combine-output port-a two-byte-port))
|
||||
(test 12 write-bytes #"hello, world" port-ab)
|
||||
(test "hello, world" get-output-string port-a)
|
||||
(test "he" get-output-string port-b)
|
||||
(test 0 write-bytes-avail* #" test" port-ab)
|
||||
(test "hello, world" get-output-string port-a)
|
||||
(test "hell" get-output-string port-b)
|
||||
(test (void) flush-output port-ab)
|
||||
(test "hello, world" get-output-string port-a)
|
||||
(test "hello, world" get-output-string port-b)
|
||||
(define worker1 (thread
|
||||
(lambda ()
|
||||
(for ([i 10])
|
||||
(write-bytes (string->bytes/utf-8 (number->string i)) port-ab)))))
|
||||
(define worker2 (thread
|
||||
(lambda ()
|
||||
(write-bytes-avail* #"0123456789" port-ab))))
|
||||
(thread-wait worker1)
|
||||
(thread-wait worker2)
|
||||
(test "hello, world01234567890123456789" get-output-string port-a)
|
||||
(test "hello, world01234567890123456789" get-output-string port-b)
|
||||
(test (void) close-output-port port-ab)
|
||||
(test (void) close-output-port port-a)
|
||||
(test (void) close-output-port port-b)
|
||||
(define-values (i1 o1) (make-pipe 10 'i1 'o1))
|
||||
(define-values (i2 o2) (make-pipe 10 'i2 'o2))
|
||||
(define two-pipes (combine-output o1 o2))
|
||||
(test 10 write-bytes #"0123456789" two-pipes)
|
||||
(define sync-test-var 0)
|
||||
(define sync-thread (thread (lambda ()
|
||||
(begin
|
||||
(sync two-pipes)
|
||||
(set! sync-test-var 1)))))
|
||||
(test #t equal? sync-test-var 0)
|
||||
(test "01234" read-string 5 i1)
|
||||
(test #t equal? sync-test-var 0)
|
||||
(test "012" read-string 3 i2)
|
||||
(thread-wait sync-thread)
|
||||
(test #t equal? sync-test-var 1)
|
||||
(test 5 write-bytes-avail* #"test123" two-pipes))
|
||||
|
||||
;; --------------------------------------------------
|
||||
|
||||
(let-values ([(in out) (make-pipe)])
|
||||
|
|
|
@ -24,6 +24,8 @@
|
|||
call-with-input-string
|
||||
call-with-input-bytes
|
||||
|
||||
combine-output
|
||||
|
||||
;; `mzlib/port` exports
|
||||
open-output-nowhere
|
||||
make-pipe-with-specials
|
||||
|
@ -164,6 +166,75 @@
|
|||
(define (call-with-input-bytes str proc)
|
||||
(with-input-from-x 'call-with-input-bytes 1 #t str proc))
|
||||
|
||||
(define (combine-output a-out b-out)
|
||||
(struct buffer (bstr out start end) #:authentic #:mutable)
|
||||
(define pending #f)
|
||||
(define lock (make-semaphore 1))
|
||||
(define ready-evt (replace-evt a-out (λ (_) b-out)))
|
||||
(define retry-evt (handle-evt
|
||||
(replace-evt
|
||||
(semaphore-peek-evt lock)
|
||||
(λ (_) (replace-evt a-out (λ (_) b-out))))
|
||||
(λ (_) #f)))
|
||||
(define (write-pending!)
|
||||
(when pending
|
||||
(define bstr (buffer-bstr pending))
|
||||
(define out (buffer-out pending))
|
||||
(define start (buffer-start pending))
|
||||
(define end (buffer-end pending))
|
||||
(define n (write-bytes-avail* bstr out start end))
|
||||
(when n
|
||||
(cond
|
||||
[(= n (- end start))
|
||||
(set! pending #f)]
|
||||
[(> n 0)
|
||||
(set-buffer-start! pending (+ start n))]))))
|
||||
(define (write-out bstr start end non-blocking? enable-break?)
|
||||
(define result
|
||||
(call-with-semaphore
|
||||
lock
|
||||
(λ ()
|
||||
(write-pending!)
|
||||
(cond
|
||||
[pending retry-evt]
|
||||
[(= start end) 0]
|
||||
[enable-break? (write-out* write-bytes-avail/enable-break bstr start end)]
|
||||
[else (write-out* write-bytes-avail* bstr start end)]))
|
||||
(λ () retry-evt)))
|
||||
(when (eqv? result 0)
|
||||
(flush-output a-out)
|
||||
(flush-output b-out))
|
||||
result)
|
||||
(define (write-out* write-initial bstr start end)
|
||||
(define m (write-initial bstr a-out start end))
|
||||
(cond
|
||||
[(or (not m) (= m 0))
|
||||
retry-evt]
|
||||
[else
|
||||
(define n
|
||||
(or (write-bytes-avail* bstr b-out start (+ start m))
|
||||
0))
|
||||
(when (< n m)
|
||||
(set! pending (buffer bstr b-out (+ start n) (+ start m))))
|
||||
m]))
|
||||
(define (close)
|
||||
(call-with-semaphore
|
||||
lock
|
||||
(λ ()
|
||||
(when pending
|
||||
(write-bytes (buffer-bstr pending)
|
||||
(buffer-out pending)
|
||||
(buffer-start pending)
|
||||
(buffer-end pending))
|
||||
(set! pending #f))))
|
||||
(flush-output a-out)
|
||||
(flush-output b-out))
|
||||
(make-output-port
|
||||
'tee
|
||||
ready-evt
|
||||
write-out
|
||||
close))
|
||||
|
||||
;; ----------------------------------------
|
||||
;; the code below used to be in `mzlib/port`
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user