Add combine-output-ports

This commit is contained in:
Luka Hadzi-Djokic 2020-07-02 22:14:48 +02:00 committed by GitHub
parent c40a7ae2fc
commit 8464896255
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 155 additions and 0 deletions

View File

@ -297,6 +297,24 @@ The optional @racket[in-name] and @racket[out-name] arguments
determine the names of the result ports.}
@defproc[(combine-output [a-out output-port?]
[b-out output-port?])
output-port?]{
Accepts two output ports and returns a new output port
combining the original ports. When written to, the combined port
first writes as many bytes as possible to @racket[a-out], and then
tries to write the same number of bytes to @racket[b-out]. If that
doesn't succeed, what is left over is buffered and no further writes
can go through until the ports are evened out. The port is ready (for
the purposes of synchronization) when each port reports being ready.
However, the first port may stop being ready while waiting on
the second port to sync, so it cannot be guaranteed that both
ports are ready at once. Closing the combined port is done
after writing all remaining bytes to @racket[b-out].}
@history[#:added "7.7.0.9"]
@defproc[(merge-input [a-in input-port?]
[b-in input-port?]
[buffer-limit (or/c exact-nonnegative-integer? #f) 4096])

View File

@ -995,6 +995,72 @@
(write-bytes (make-bytes 80) o2)
(test 0 file-position o2))
;; --------------------------------------------------
;; test combine-output
(let ([port-a (open-output-string)]
[port-b (open-output-string)])
(define two-byte-port (make-output-port
`two-byte-port
port-b
(lambda (s start end non-blocking? breakable?)
(cond
[non-blocking?
(write-bytes-avail* (subbytes
s
start
(if (< start (- end 1)) (+ start 2) end))
port-b)]
[breakable?
(write-bytes-avail/enable-break
(subbytes
s
start
(if (< start (- end 1)) (+ start 2) end))
port-b)]
[else
(write-bytes s port-b)]))
void))
(define port-ab (combine-output port-a two-byte-port))
(test 12 write-bytes #"hello, world" port-ab)
(test "hello, world" get-output-string port-a)
(test "he" get-output-string port-b)
(test 0 write-bytes-avail* #" test" port-ab)
(test "hello, world" get-output-string port-a)
(test "hell" get-output-string port-b)
(test (void) flush-output port-ab)
(test "hello, world" get-output-string port-a)
(test "hello, world" get-output-string port-b)
(define worker1 (thread
(lambda ()
(for ([i 10])
(write-bytes (string->bytes/utf-8 (number->string i)) port-ab)))))
(define worker2 (thread
(lambda ()
(write-bytes-avail* #"0123456789" port-ab))))
(thread-wait worker1)
(thread-wait worker2)
(test "hello, world01234567890123456789" get-output-string port-a)
(test "hello, world01234567890123456789" get-output-string port-b)
(test (void) close-output-port port-ab)
(test (void) close-output-port port-a)
(test (void) close-output-port port-b)
(define-values (i1 o1) (make-pipe 10 'i1 'o1))
(define-values (i2 o2) (make-pipe 10 'i2 'o2))
(define two-pipes (combine-output o1 o2))
(test 10 write-bytes #"0123456789" two-pipes)
(define sync-test-var 0)
(define sync-thread (thread (lambda ()
(begin
(sync two-pipes)
(set! sync-test-var 1)))))
(test #t equal? sync-test-var 0)
(test "01234" read-string 5 i1)
(test #t equal? sync-test-var 0)
(test "012" read-string 3 i2)
(thread-wait sync-thread)
(test #t equal? sync-test-var 1)
(test 5 write-bytes-avail* #"test123" two-pipes))
;; --------------------------------------------------
(let-values ([(in out) (make-pipe)])

View File

@ -24,6 +24,8 @@
call-with-input-string
call-with-input-bytes
combine-output
;; `mzlib/port` exports
open-output-nowhere
make-pipe-with-specials
@ -164,6 +166,75 @@
(define (call-with-input-bytes str proc)
(with-input-from-x 'call-with-input-bytes 1 #t str proc))
(define (combine-output a-out b-out)
(struct buffer (bstr out start end) #:authentic #:mutable)
(define pending #f)
(define lock (make-semaphore 1))
(define ready-evt (replace-evt a-out (λ (_) b-out)))
(define retry-evt (handle-evt
(replace-evt
(semaphore-peek-evt lock)
(λ (_) (replace-evt a-out (λ (_) b-out))))
(λ (_) #f)))
(define (write-pending!)
(when pending
(define bstr (buffer-bstr pending))
(define out (buffer-out pending))
(define start (buffer-start pending))
(define end (buffer-end pending))
(define n (write-bytes-avail* bstr out start end))
(when n
(cond
[(= n (- end start))
(set! pending #f)]
[(> n 0)
(set-buffer-start! pending (+ start n))]))))
(define (write-out bstr start end non-blocking? enable-break?)
(define result
(call-with-semaphore
lock
(λ ()
(write-pending!)
(cond
[pending retry-evt]
[(= start end) 0]
[enable-break? (write-out* write-bytes-avail/enable-break bstr start end)]
[else (write-out* write-bytes-avail* bstr start end)]))
(λ () retry-evt)))
(when (eqv? result 0)
(flush-output a-out)
(flush-output b-out))
result)
(define (write-out* write-initial bstr start end)
(define m (write-initial bstr a-out start end))
(cond
[(or (not m) (= m 0))
retry-evt]
[else
(define n
(or (write-bytes-avail* bstr b-out start (+ start m))
0))
(when (< n m)
(set! pending (buffer bstr b-out (+ start n) (+ start m))))
m]))
(define (close)
(call-with-semaphore
lock
(λ ()
(when pending
(write-bytes (buffer-bstr pending)
(buffer-out pending)
(buffer-start pending)
(buffer-end pending))
(set! pending #f))))
(flush-output a-out)
(flush-output b-out))
(make-output-port
'tee
ready-evt
write-out
close))
;; ----------------------------------------
;; the code below used to be in `mzlib/port`