cs & io: fix pipe read/write on concurrent close

This commit is contained in:
Matthew Flatt 2019-10-14 07:53:44 -06:00
parent ca2eafbfc4
commit 2799713ea5
3 changed files with 75 additions and 6 deletions

View File

@ -95,6 +95,51 @@
(test #t string-port? (open-output-bytes))
(test #t string-port? (open-output-string))
;; concurrent close on input fails
(let ()
(define-values (i o) (make-pipe))
(thread (lambda ()
(sync (system-idle-evt))
(close-input-port i)))
(err/rt-test
(peek-bytes-avail! (make-bytes 10) 0 #f i)
exn:fail?))
;; concurrent close on input triggers progress
(let ()
(define-values (i o) (make-pipe))
(thread (lambda ()
(sync (system-idle-evt))
(close-input-port i)))
(test 0 peek-bytes-avail! (make-bytes 10) 0 (port-progress-evt i) i))
;; concurrent close on output fails
(let ()
(define-values (i o) (make-pipe 4096))
(thread (lambda ()
(sync (system-idle-evt))
(close-output-port o)))
(err/rt-test
(let loop ()
(write-bytes #"hello" o)
(loop))
exn:fail?))
;; concurrent close of input unblocks limited output
(let ()
(define-values (i o) (make-pipe 4096))
(define done? #f)
(thread (lambda ()
(sync (system-idle-evt))
(set! done? #t)
(close-input-port i)))
;; Shouldn't get stuck:
(let loop ()
(write-bytes #"hello" o)
(unless done?
(loop))))
;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Based on the Racket manual...

View File

@ -15,6 +15,22 @@
(path->string (current-directory))
(set-string->number?! string->number)
(let ()
(define-values (i o) (make-pipe 4096))
(define done? #f)
(thread (lambda ()
(sync (system-idle-evt))
(set! done? #t)
(close-input-port i)))
;; Should error:
(let loop ()
(write-bytes #"hello" o)
(unless done?
(loop))))
(define-syntax-rule (test expect rhs)
(let ([e expect]
[v rhs])

View File

@ -284,7 +284,9 @@
(when input-ref
(slow-mode!)
(set! input-ref #f)
(progress!))))]
(progress!)
(check-input-unblocking)
(check-output-unblocking))))]
[get-progress-evt
(lambda ()
@ -433,6 +435,9 @@
(cond
[(fx= src-start src-end) ;; => flush
0]
[(not input-ref)
;; No input end => accept all bytes
(fx- src-end src-start)]
[(and (end . fx>= . start)
(end . fx< . top-pos))
(define amt (apply-limit (fxmin (fx- top-pos end)
@ -486,7 +491,8 @@
(when output-ref
(slow-mode!)
(set! output-ref #f)
(check-input-unblocking))))])
(check-input-unblocking)
(check-output-unblocking))))])
;; ----------------------------------------
@ -531,8 +537,6 @@
;; ----------------------------------------
;; Note: a thread blocked on writing to a limited pipe cannot be GCed
;; due to the use of `replace-evt`.
(struct pipe-write-poller (d)
#:property
prop:evt
@ -541,7 +545,9 @@
(with-object pipe-data (pipe-write-poller-d pwp)
(sync-both)
(cond
[(not (output-full?))
[(or (not (output-full?))
(not input-ref)
(not output-ref))
(values (list pwp) #f)]
[else
(unless write-ready-sema
@ -560,7 +566,9 @@
(with-object pipe-data (pipe-read-poller-d prp)
(sync-both)
(cond
[(not (input-empty?))
[(or (not (input-empty?))
(not output-ref)
(not input-ref))
(values (list 0) #f)]
[else
(unless read-ready-sema