diff --git a/pkgs/racket-test-core/tests/racket/port.rktl b/pkgs/racket-test-core/tests/racket/port.rktl index 1d84f275a9..6b55a1c255 100644 --- a/pkgs/racket-test-core/tests/racket/port.rktl +++ b/pkgs/racket-test-core/tests/racket/port.rktl @@ -95,6 +95,51 @@ (test #t string-port? (open-output-bytes)) (test #t string-port? (open-output-string)) +;; concurrent close on input fails +(let () + (define-values (i o) (make-pipe)) + (thread (lambda () + (sync (system-idle-evt)) + (close-input-port i))) + (err/rt-test + (peek-bytes-avail! (make-bytes 10) 0 #f i) + exn:fail?)) + +;; concurrent close on input triggers progress +(let () + (define-values (i o) (make-pipe)) + (thread (lambda () + (sync (system-idle-evt)) + (close-input-port i))) + (test 0 peek-bytes-avail! (make-bytes 10) 0 (port-progress-evt i) i)) + +;; concurrent close on output fails +(let () + (define-values (i o) (make-pipe 4096)) + (thread (lambda () + (sync (system-idle-evt)) + (close-output-port o))) + (err/rt-test + (let loop () + (write-bytes #"hello" o) + (loop)) + exn:fail?)) + +;; concurrent close of input unblocks limited output +(let () + (define-values (i o) (make-pipe 4096)) + (define done? #f) + (thread (lambda () + (sync (system-idle-evt)) + (set! done? #t) + (close-input-port i))) + + ;; Shouldn't get stuck: + (let loop () + (write-bytes #"hello" o) + (unless done? + (loop)))) + ;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Based on the Racket manual... diff --git a/racket/src/io/demo.rkt b/racket/src/io/demo.rkt index 2ad8e2722a..7294ad6a0b 100644 --- a/racket/src/io/demo.rkt +++ b/racket/src/io/demo.rkt @@ -15,6 +15,22 @@ (path->string (current-directory)) (set-string->number?! string->number) +(let () + (define-values (i o) (make-pipe 4096)) + + (define done? #f) + + (thread (lambda () + (sync (system-idle-evt)) + (set! done? #t) + (close-input-port i))) + + ;; Should error: + (let loop () + (write-bytes #"hello" o) + (unless done? + (loop)))) + (define-syntax-rule (test expect rhs) (let ([e expect] [v rhs]) diff --git a/racket/src/io/port/pipe.rkt b/racket/src/io/port/pipe.rkt index b82be0e8b7..330ca30f4f 100644 --- a/racket/src/io/port/pipe.rkt +++ b/racket/src/io/port/pipe.rkt @@ -284,7 +284,9 @@ (when input-ref (slow-mode!) (set! input-ref #f) - (progress!))))] + (progress!) + (check-input-unblocking) + (check-output-unblocking))))] [get-progress-evt (lambda () @@ -433,6 +435,9 @@ (cond [(fx= src-start src-end) ;; => flush 0] + [(not input-ref) + ;; No input end => accept all bytes + (fx- src-end src-start)] [(and (end . fx>= . start) (end . fx< . top-pos)) (define amt (apply-limit (fxmin (fx- top-pos end) @@ -486,7 +491,8 @@ (when output-ref (slow-mode!) (set! output-ref #f) - (check-input-unblocking))))]) + (check-input-unblocking) + (check-output-unblocking))))]) ;; ---------------------------------------- @@ -531,8 +537,6 @@ ;; ---------------------------------------- -;; Note: a thread blocked on writing to a limited pipe cannot be GCed -;; due to the use of `replace-evt`. (struct pipe-write-poller (d) #:property prop:evt @@ -541,7 +545,9 @@ (with-object pipe-data (pipe-write-poller-d pwp) (sync-both) (cond - [(not (output-full?)) + [(or (not (output-full?)) + (not input-ref) + (not output-ref)) (values (list pwp) #f)] [else (unless write-ready-sema @@ -560,7 +566,9 @@ (with-object pipe-data (pipe-read-poller-d prp) (sync-both) (cond - [(not (input-empty?)) + [(or (not (input-empty?)) + (not output-ref) + (not input-ref)) (values (list 0) #f)] [else (unless read-ready-sema