fix problems with pipe limits

A pipe's limit is supposed to apply only to unpeeked bytes, but
there were problems related to triggering further writes after
a peek, and also triggering further reads after a partial
write.
This commit is contained in:
Matthew Flatt 2013-10-14 14:37:37 -06:00
parent 2bc58b5663
commit e0c143ff33
2 changed files with 47 additions and 33 deletions

View File

@ -505,6 +505,19 @@
(test #"12311-" values s))
(test 3 write-bytes-avail #"1234" out))
;; Further test of peeking in a limited pipe (shouldn't get stuck):
(let-values ([(i o) (make-pipe 50)]
[(s) (make-semaphore)])
(define t
(thread (lambda ()
(peek-bytes 100 0 i)
(semaphore-wait s)
(peek-bytes 200 0 i))))
(display (make-bytes 100 65) o)
(sync (system-idle-evt))
(semaphore-post s)
(display (make-bytes 100 66) o)
(sync t))
;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Provide a location proc:

View File

@ -1655,9 +1655,9 @@ int scheme_is_user_port(Scheme_Object *port)
/* pipe ports */
/*========================================================================*/
static void pipe_did_read(Scheme_Input_Port *port, Scheme_Pipe *pipe)
static void pipe_did_read(Scheme_Input_Port *port, Scheme_Pipe *pipe, int peek)
{
if (port && port->progress_evt) {
if (port && port->progress_evt && !peek) {
scheme_post_sema_all(port->progress_evt);
port->progress_evt = NULL;
}
@ -1789,7 +1789,7 @@ static intptr_t pipe_get_or_peek_bytes(Scheme_Input_Port *p,
else
pipe->bufmaxextra = 0;
}
pipe_did_read(p, pipe);
pipe_did_read(p, pipe, 0);
} else {
if (!c) {
if (size && pipe->eof)
@ -1809,6 +1809,7 @@ static intptr_t pipe_get_or_peek_bytes(Scheme_Input_Port *p,
pipe->bufmaxextra = c + skipped;
}
}
pipe_did_read(p, pipe, 1);
}
}
@ -1897,6 +1898,7 @@ static intptr_t pipe_write_bytes(Scheme_Output_Port *p,
wrote += xavail;
d += xavail;
len -= xavail;
pipe_did_write(pipe);
/* For non-blocking mode, that might be good enough.
rarely_block == 2 means that even nothing is good enough. */
@ -1904,34 +1906,33 @@ static intptr_t pipe_write_bytes(Scheme_Output_Port *p,
return wrote;
/* Now, wait until we can write more, then start over. */
while (1) {
if (pipe->bufstart <= pipe->bufend) {
avail = (pipe->buflen - pipe->bufend) + pipe->bufstart - 1;
} else {
avail = pipe->bufstart - pipe->bufend - 1;
}
if (pipe->bufmax) {
/* Again, it's possible that the port grew to accommodate
past peeks... */
intptr_t extra;
extra = pipe->buflen - (pipe->bufmax + pipe->bufmaxextra);
if (extra > 0)
avail -= extra;
}
if (avail || pipe->eof || p->closed)
goto try_again;
my_sema = scheme_make_sema(0);
{
Scheme_Object *wp;
wp = scheme_make_pair(my_sema, pipe->wakeup_on_read);
pipe->wakeup_on_read = wp;
}
scheme_wait_sema(my_sema, enable_break ? -1 : 0);
if (pipe->bufstart <= pipe->bufend) {
avail = (pipe->buflen - pipe->bufend) + pipe->bufstart - 1;
} else {
avail = pipe->bufstart - pipe->bufend - 1;
}
/* Doesn't get here */
if (pipe->bufmax) {
/* Again, it's possible that the port grew to accommodate
past peeks... */
intptr_t extra;
extra = pipe->buflen - (pipe->bufmax + pipe->bufmaxextra);
if (extra > 0)
avail -= extra;
}
if (avail || pipe->eof || p->closed)
goto try_again;
my_sema = scheme_make_sema(0);
{
Scheme_Object *wp;
wp = scheme_make_pair(my_sema, pipe->wakeup_on_read);
pipe->wakeup_on_read = wp;
}
scheme_wait_sema(my_sema, enable_break ? -1 : 0);
goto try_again;
}
}
@ -1989,7 +1990,7 @@ static intptr_t pipe_write_bytes(Scheme_Output_Port *p,
pipe->bufend = endpos;
pipe_did_write(pipe);
return len + wrote;
}
@ -2014,7 +2015,7 @@ static void pipe_in_close(Scheme_Input_Port *p)
pipe->eof = 1;
/* to wake up any other threads blocked on pipe I/O: */
pipe_did_read(p, pipe);
pipe_did_read(p, pipe, 0);
pipe_did_write(pipe);
}
@ -2027,7 +2028,7 @@ static void pipe_out_close(Scheme_Output_Port *p)
pipe->eof = 1;
/* to wake up any other threads blocked on pipe I/O: */
pipe_did_read(NULL, pipe);
pipe_did_read(NULL, pipe, 0);
pipe_did_write(pipe);
}