diff --git a/pkgs/racket-doc/scribblings/reference/port-lib.scrbl b/pkgs/racket-doc/scribblings/reference/port-lib.scrbl index 506a319c2e..dd2e7053c8 100644 --- a/pkgs/racket-doc/scribblings/reference/port-lib.scrbl +++ b/pkgs/racket-doc/scribblings/reference/port-lib.scrbl @@ -297,6 +297,24 @@ The optional @racket[in-name] and @racket[out-name] arguments determine the names of the result ports.} +@defproc[(combine-output [a-out output-port?] + [b-out output-port?]) + output-port?]{ + +Accepts two output ports and returns a new output port +combining the original ports. When written to, the combined port +first writes as many bytes as possible to @racket[a-out], and then +tries to write the same number of bytes to @racket[b-out]. If that +doesn't succeed, what is left over is buffered and no further writes +can go through until the ports are evened out. The port is ready (for +the purposes of synchronization) when each port reports being ready. +However, the first port may stop being ready while waiting on +the second port to sync, so it cannot be guaranteed that both +ports are ready at once. Closing the combined port is done +after writing all remaining bytes to @racket[b-out].} + +@history[#:added "7.7.0.9"] + @defproc[(merge-input [a-in input-port?] [b-in input-port?] [buffer-limit (or/c exact-nonnegative-integer? #f) 4096]) diff --git a/pkgs/racket-test-core/tests/racket/portlib.rktl b/pkgs/racket-test-core/tests/racket/portlib.rktl index f05ff6f5e3..dc9e58ede2 100644 --- a/pkgs/racket-test-core/tests/racket/portlib.rktl +++ b/pkgs/racket-test-core/tests/racket/portlib.rktl @@ -995,6 +995,72 @@ (write-bytes (make-bytes 80) o2) (test 0 file-position o2)) +;; -------------------------------------------------- +;; test combine-output +(let ([port-a (open-output-string)] + [port-b (open-output-string)]) + (define two-byte-port (make-output-port + `two-byte-port + port-b + (lambda (s start end non-blocking? breakable?) + (cond + [non-blocking? + (write-bytes-avail* (subbytes + s + start + (if (< start (- end 1)) (+ start 2) end)) + port-b)] + [breakable? + (write-bytes-avail/enable-break + (subbytes + s + start + (if (< start (- end 1)) (+ start 2) end)) + port-b)] + [else + (write-bytes s port-b)])) + void)) + (define port-ab (combine-output port-a two-byte-port)) + (test 12 write-bytes #"hello, world" port-ab) + (test "hello, world" get-output-string port-a) + (test "he" get-output-string port-b) + (test 0 write-bytes-avail* #" test" port-ab) + (test "hello, world" get-output-string port-a) + (test "hell" get-output-string port-b) + (test (void) flush-output port-ab) + (test "hello, world" get-output-string port-a) + (test "hello, world" get-output-string port-b) + (define worker1 (thread + (lambda () + (for ([i 10]) + (write-bytes (string->bytes/utf-8 (number->string i)) port-ab))))) + (define worker2 (thread + (lambda () + (write-bytes-avail* #"0123456789" port-ab)))) + (thread-wait worker1) + (thread-wait worker2) + (test "hello, world01234567890123456789" get-output-string port-a) + (test "hello, world01234567890123456789" get-output-string port-b) + (test (void) close-output-port port-ab) + (test (void) close-output-port port-a) + (test (void) close-output-port port-b) + (define-values (i1 o1) (make-pipe 10 'i1 'o1)) + (define-values (i2 o2) (make-pipe 10 'i2 'o2)) + (define two-pipes (combine-output o1 o2)) + (test 10 write-bytes #"0123456789" two-pipes) + (define sync-test-var 0) + (define sync-thread (thread (lambda () + (begin + (sync two-pipes) + (set! sync-test-var 1))))) + (test #t equal? sync-test-var 0) + (test "01234" read-string 5 i1) + (test #t equal? sync-test-var 0) + (test "012" read-string 3 i2) + (thread-wait sync-thread) + (test #t equal? sync-test-var 1) + (test 5 write-bytes-avail* #"test123" two-pipes)) + ;; -------------------------------------------------- (let-values ([(in out) (make-pipe)]) diff --git a/racket/collects/racket/port.rkt b/racket/collects/racket/port.rkt index 8ca478168d..b626eb8b5e 100644 --- a/racket/collects/racket/port.rkt +++ b/racket/collects/racket/port.rkt @@ -24,6 +24,8 @@ call-with-input-string call-with-input-bytes + combine-output + ;; `mzlib/port` exports open-output-nowhere make-pipe-with-specials @@ -164,6 +166,75 @@ (define (call-with-input-bytes str proc) (with-input-from-x 'call-with-input-bytes 1 #t str proc)) +(define (combine-output a-out b-out) + (struct buffer (bstr out start end) #:authentic #:mutable) + (define pending #f) + (define lock (make-semaphore 1)) + (define ready-evt (replace-evt a-out (λ (_) b-out))) + (define retry-evt (handle-evt + (replace-evt + (semaphore-peek-evt lock) + (λ (_) (replace-evt a-out (λ (_) b-out)))) + (λ (_) #f))) + (define (write-pending!) + (when pending + (define bstr (buffer-bstr pending)) + (define out (buffer-out pending)) + (define start (buffer-start pending)) + (define end (buffer-end pending)) + (define n (write-bytes-avail* bstr out start end)) + (when n + (cond + [(= n (- end start)) + (set! pending #f)] + [(> n 0) + (set-buffer-start! pending (+ start n))])))) + (define (write-out bstr start end non-blocking? enable-break?) + (define result + (call-with-semaphore + lock + (λ () + (write-pending!) + (cond + [pending retry-evt] + [(= start end) 0] + [enable-break? (write-out* write-bytes-avail/enable-break bstr start end)] + [else (write-out* write-bytes-avail* bstr start end)])) + (λ () retry-evt))) + (when (eqv? result 0) + (flush-output a-out) + (flush-output b-out)) + result) + (define (write-out* write-initial bstr start end) + (define m (write-initial bstr a-out start end)) + (cond + [(or (not m) (= m 0)) + retry-evt] + [else + (define n + (or (write-bytes-avail* bstr b-out start (+ start m)) + 0)) + (when (< n m) + (set! pending (buffer bstr b-out (+ start n) (+ start m)))) + m])) + (define (close) + (call-with-semaphore + lock + (λ () + (when pending + (write-bytes (buffer-bstr pending) + (buffer-out pending) + (buffer-start pending) + (buffer-end pending)) + (set! pending #f)))) + (flush-output a-out) + (flush-output b-out)) + (make-output-port + 'tee + ready-evt + write-out + close)) + ;; ---------------------------------------- ;; the code below used to be in `mzlib/port`