change handling of blocking I/O to collapse sources to a single poll

This improvement applies to both poll() and select() modes, and it
can reduce scheduling overhead when blocking on many I/O sources
at once.

This mode is not enabled for Windows, however, since Racket doesn't
exactly use select() on Windows.
This commit is contained in:
Matthew Flatt 2011-11-09 12:13:26 -07:00
parent dfe5e599e4
commit 67df1f7bce
10 changed files with 715 additions and 127 deletions

View File

@ -2004,6 +2004,13 @@ extern Scheme_Extension_Table *scheme_extension_table;
# define MZ_FD_ISSET(n, p) FD_ISSET(n, p) # define MZ_FD_ISSET(n, p) FD_ISSET(n, p)
#endif #endif
/* For scheme_fd_to_semaphore(): */
#define MZFD_CREATE_READ 1
#define MZFD_CREATE_WRITE 2
#define MZFD_CHECK_READ 3
#define MZFD_CHECK_WRITE 4
#define MZFD_REMOVE 5
/*========================================================================*/ /*========================================================================*/
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -174,6 +174,8 @@ typedef struct Thread_Local_Variables {
struct Scheme_Object *scheme_orig_stderr_port_; struct Scheme_Object *scheme_orig_stderr_port_;
struct Scheme_Object *scheme_orig_stdin_port_; struct Scheme_Object *scheme_orig_stdin_port_;
struct mz_fd_set *scheme_fd_set_; struct mz_fd_set *scheme_fd_set_;
struct mz_fd_set *scheme_semaphore_fd_set_;
struct Scheme_Hash_Table *scheme_semaphore_fd_mapping_;
#ifdef USE_FCNTL_AND_FORK_FOR_FILE_LOCKS #ifdef USE_FCNTL_AND_FORK_FOR_FILE_LOCKS
struct Scheme_Hash_Table *locked_fd_process_map_; struct Scheme_Hash_Table *locked_fd_process_map_;
#endif #endif
@ -518,6 +520,8 @@ XFORM_GC_VARIABLE_STACK_THROUGH_THREAD_LOCAL;
#define scheme_orig_stderr_port XOA (scheme_get_thread_local_variables()->scheme_orig_stderr_port_) #define scheme_orig_stderr_port XOA (scheme_get_thread_local_variables()->scheme_orig_stderr_port_)
#define scheme_orig_stdin_port XOA (scheme_get_thread_local_variables()->scheme_orig_stdin_port_) #define scheme_orig_stdin_port XOA (scheme_get_thread_local_variables()->scheme_orig_stdin_port_)
#define scheme_fd_set XOA (scheme_get_thread_local_variables()->scheme_fd_set_) #define scheme_fd_set XOA (scheme_get_thread_local_variables()->scheme_fd_set_)
#define scheme_semaphore_fd_set XOA (scheme_get_thread_local_variables()->scheme_semaphore_fd_set_)
#define scheme_semaphore_fd_mapping XOA (scheme_get_thread_local_variables()->scheme_semaphore_fd_mapping_)
#define locked_fd_process_map XOA (scheme_get_thread_local_variables()->locked_fd_process_map_) #define locked_fd_process_map XOA (scheme_get_thread_local_variables()->locked_fd_process_map_)
#define new_port_cust XOA (scheme_get_thread_local_variables()->new_port_cust_) #define new_port_cust XOA (scheme_get_thread_local_variables()->new_port_cust_)
#define scheme_break_semaphore XOA (scheme_get_thread_local_variables()->scheme_break_semaphore_) #define scheme_break_semaphore XOA (scheme_get_thread_local_variables()->scheme_break_semaphore_)

View File

@ -12,6 +12,9 @@ static int mark_listener_MARK(void *p, struct NewGC *gc) {
gcMARK2(l->mref, gc); gcMARK2(l->mref, gc);
# ifdef HAVE_POLL_SYSCALL
gcMARK2(l->pfd, gc);
# endif
return return
gcBYTES_TO_WORDS(sizeof(listener_t) + ((l->count - mzFLEX_DELTA) * sizeof(tcp_t))); gcBYTES_TO_WORDS(sizeof(listener_t) + ((l->count - mzFLEX_DELTA) * sizeof(tcp_t)));
@ -22,6 +25,9 @@ static int mark_listener_FIXUP(void *p, struct NewGC *gc) {
gcFIXUP2(l->mref, gc); gcFIXUP2(l->mref, gc);
# ifdef HAVE_POLL_SYSCALL
gcFIXUP2(l->pfd, gc);
# endif
return return
gcBYTES_TO_WORDS(sizeof(listener_t) + ((l->count - mzFLEX_DELTA) * sizeof(tcp_t))); gcBYTES_TO_WORDS(sizeof(listener_t) + ((l->count - mzFLEX_DELTA) * sizeof(tcp_t)));

View File

@ -294,6 +294,22 @@ void scheme_init_network(Scheme_Env *env)
#endif #endif
} }
static int check_fd_sema(tcp_t s, int mode, Scheme_Schedule_Info *sinfo)
{
Scheme_Object *sema;
sema = scheme_fd_to_semaphore(s, mode);
if (sema) {
if (!scheme_wait_sema(sema, 1)) {
if (sinfo)
scheme_set_sync_target(sinfo, sema, NULL, NULL, 0, 0, NULL);
return 0;
}
}
return 1;
}
/*========================================================================*/ /*========================================================================*/
/* TCP glue */ /* TCP glue */
@ -888,13 +904,41 @@ static void TCP_INIT(char *name)
/* Forward declaration */ /* Forward declaration */
static int stop_listener(Scheme_Object *o); static int stop_listener(Scheme_Object *o);
static int tcp_check_accept(Scheme_Object *_listener) static Scheme_Object *listener_to_evt(listener_t *listener)
{
Scheme_Object **a, *sema;
int i;
a = MALLOC_N(Scheme_Object*, listener->count);
for (i = listener->count; i--; ) {
sema = scheme_fd_to_semaphore(listener->s[i], MZFD_CREATE_READ);
if (!sema)
return NULL;
a[i] = sema;
}
return scheme_make_evt_set(listener->count, a);
}
static int tcp_check_accept(Scheme_Object *_listener, Scheme_Schedule_Info *sinfo)
{ {
#ifdef USE_SOCKETS_TCP #ifdef USE_SOCKETS_TCP
listener_t *listener = (listener_t *)_listener; listener_t *listener = (listener_t *)_listener;
# ifdef HAVE_POLL_SYSCALL
int sr, i; int sr, i;
# ifndef HAVE_POLL_SYSCALL
tcp_t s, mx;
DECL_OS_FDSET(readfds);
DECL_OS_FDSET(exnfds);
struct timeval time = {0, 0};
# endif
for (i = listener->count; i--; ) {
if (check_fd_sema(listener->s[i], MZFD_CHECK_READ, NULL))
break;
}
if (i < 0) return 0;
# ifdef HAVE_POLL_SYSCALL
if (LISTENER_WAS_CLOSED(listener)) if (LISTENER_WAS_CLOSED(listener))
return 1; return 1;
@ -909,13 +953,9 @@ static int tcp_check_accept(Scheme_Object *_listener)
} }
} }
return sr; if (sr)
return sr;
# else # else
tcp_t s, mx;
DECL_OS_FDSET(readfds);
DECL_OS_FDSET(exnfds);
struct timeval time = {0, 0};
int sr, i;
INIT_DECL_OS_RD_FDSET(readfds); INIT_DECL_OS_RD_FDSET(readfds);
INIT_DECL_OS_ER_FDSET(exnfds); INIT_DECL_OS_ER_FDSET(exnfds);
@ -948,9 +988,24 @@ static int tcp_check_accept(Scheme_Object *_listener)
} }
} }
return sr; if (sr)
return sr;
# endif # endif
if (sinfo) {
Scheme_Object *evt;
evt = listener_to_evt(listener);
if (evt)
scheme_set_sync_target(sinfo, evt, NULL, NULL, 0, 0, NULL);
} else {
for (i = listener->count; i--; ) {
check_fd_sema(listener->s[i], MZFD_CREATE_READ, NULL);
}
}
#endif #endif
return 0;
} }
static void tcp_accept_needs_wakeup(Scheme_Object *_listener, void *fds) static void tcp_accept_needs_wakeup(Scheme_Object *_listener, void *fds)
@ -973,7 +1028,7 @@ static void tcp_accept_needs_wakeup(Scheme_Object *_listener, void *fds)
#endif #endif
} }
static int tcp_check_connect(Scheme_Object *connector_p) static int tcp_check_connect(Scheme_Object *connector_p, Scheme_Schedule_Info *sinfo)
{ {
#ifdef USE_SOCKETS_TCP #ifdef USE_SOCKETS_TCP
tcp_t s; tcp_t s;
@ -981,6 +1036,9 @@ static int tcp_check_connect(Scheme_Object *connector_p)
s = *(tcp_t *)connector_p; s = *(tcp_t *)connector_p;
if (!check_fd_sema(s, MZFD_CHECK_WRITE, sinfo))
return 0;
# ifdef HAVE_POLL_SYSCALL # ifdef HAVE_POLL_SYSCALL
{ {
GC_CAN_IGNORE struct pollfd pfd[1]; GC_CAN_IGNORE struct pollfd pfd[1];
@ -990,9 +1048,9 @@ static int tcp_check_connect(Scheme_Object *connector_p)
sr = poll(pfd, 1, 0); sr = poll(pfd, 1, 0);
} while ((sr == -1) && (errno == EINTR)); } while ((sr == -1) && (errno == EINTR));
if (!sr) if (!sr) {
return 0; /* fall through */
else if (pfd[0].revents & POLLOUT) } else if (pfd[0].revents & POLLOUT)
return 1; return 1;
else else
return -1; return -1;
@ -1016,17 +1074,19 @@ static int tcp_check_connect(Scheme_Object *connector_p)
sr = select(s + 1, NULL, writefds, exnfds, &time); sr = select(s + 1, NULL, writefds, exnfds, &time);
} while ((sr == -1) && (NOT_WINSOCK(errno) == EINTR)); } while ((sr == -1) && (NOT_WINSOCK(errno) == EINTR));
if (!sr) if (!sr) {
return 0; /* fall through */
else if (FD_ISSET(s, exnfds)) } else if (FD_ISSET(s, exnfds))
return -1; return -1;
else else
return 1; return 1;
} }
# endif # endif
#else
return 0; check_fd_sema(s, MZFD_CREATE_WRITE, sinfo);
#endif #endif
return 0;
} }
static void tcp_connect_needs_wakeup(Scheme_Object *connector_p, void *fds) static void tcp_connect_needs_wakeup(Scheme_Object *connector_p, void *fds)
@ -1043,13 +1103,16 @@ static void tcp_connect_needs_wakeup(Scheme_Object *connector_p, void *fds)
#endif #endif
} }
static int tcp_check_write(Scheme_Object *port) static int tcp_check_write(Scheme_Object *port, Scheme_Schedule_Info *sinfo)
{ {
Scheme_Tcp *data = (Scheme_Tcp *)((Scheme_Output_Port *)port)->port_data; Scheme_Tcp *data = (Scheme_Tcp *)((Scheme_Output_Port *)port)->port_data;
if (((Scheme_Output_Port *)port)->closed) if (((Scheme_Output_Port *)port)->closed)
return 1; return 1;
if (!check_fd_sema(data->tcp, MZFD_CHECK_WRITE, sinfo))
return 0;
#ifdef USE_SOCKETS_TCP #ifdef USE_SOCKETS_TCP
# ifdef HAVE_POLL_SYSCALL # ifdef HAVE_POLL_SYSCALL
{ {
@ -1062,9 +1125,9 @@ static int tcp_check_write(Scheme_Object *port)
sr = poll(pfd, 1, 0); sr = poll(pfd, 1, 0);
} while ((sr == -1) && (errno == EINTR)); } while ((sr == -1) && (errno == EINTR));
if (!sr) if (!sr) {
return 0; /* fall through */
else if (pfd[0].revents & POLLOUT) } else if (pfd[0].revents & POLLOUT)
return 1; return 1;
else else
return -1; return -1;
@ -1091,7 +1154,8 @@ static int tcp_check_write(Scheme_Object *port)
sr = select(s + 1, NULL, writefds, exnfds, &time); sr = select(s + 1, NULL, writefds, exnfds, &time);
} while ((sr == -1) && (NOT_WINSOCK(errno) == EINTR)); } while ((sr == -1) && (NOT_WINSOCK(errno) == EINTR));
return sr; if (sr)
return sr;
} }
# endif # endif
#else #else
@ -1115,6 +1179,10 @@ static int tcp_check_write(Scheme_Object *port)
return !!bytes; return !!bytes;
} }
#endif #endif
check_fd_sema(data->tcp, MZFD_CREATE_WRITE, sinfo);
return 0;
} }
static void tcp_write_needs_wakeup(Scheme_Object *port, void *fds) static void tcp_write_needs_wakeup(Scheme_Object *port, void *fds)
@ -1168,7 +1236,7 @@ static Scheme_Tcp *make_tcp_port_data(MAKE_TCP_ARG int refcount)
return data; return data;
} }
static int tcp_byte_ready (Scheme_Input_Port *port) static int tcp_byte_ready (Scheme_Input_Port *port, Scheme_Schedule_Info *sinfo)
{ {
Scheme_Tcp *data; Scheme_Tcp *data;
#ifdef USE_SOCKETS_TCP #ifdef USE_SOCKETS_TCP
@ -1193,6 +1261,9 @@ static int tcp_byte_ready (Scheme_Input_Port *port)
if (data->b.bufpos < data->b.bufmax) if (data->b.bufpos < data->b.bufmax)
return 1; return 1;
if (!check_fd_sema(data->tcp, MZFD_CHECK_READ, sinfo))
return 0;
#ifdef USE_SOCKETS_TCP #ifdef USE_SOCKETS_TCP
# ifdef HAVE_POLL_SYSCALL # ifdef HAVE_POLL_SYSCALL
{ {
@ -1204,7 +1275,8 @@ static int tcp_byte_ready (Scheme_Input_Port *port)
sr = poll(pfd, 1, 0); sr = poll(pfd, 1, 0);
} while ((sr == -1) && (errno == EINTR)); } while ((sr == -1) && (errno == EINTR));
return sr; if (sr)
return sr;
} }
# else # else
{ {
@ -1217,11 +1289,14 @@ static int tcp_byte_ready (Scheme_Input_Port *port)
sr = select(data->tcp + 1, readfds, NULL, exfds, &time); sr = select(data->tcp + 1, readfds, NULL, exfds, &time);
} while ((sr == -1) && (NOT_WINSOCK(errno) == EINTR)); } while ((sr == -1) && (NOT_WINSOCK(errno) == EINTR));
return sr; if (sr)
return sr;
} }
# endif # endif
#endif #endif
check_fd_sema(data->tcp, MZFD_CREATE_READ, sinfo);
return 0; return 0;
} }
@ -1257,16 +1332,23 @@ static intptr_t tcp_get_string(Scheme_Input_Port *port,
return n; return n;
} }
while (!tcp_byte_ready(port)) { while (!tcp_byte_ready(port, NULL)) {
if (nonblock > 0) if (nonblock > 0)
return 0; return 0;
#ifdef USE_SOCKETS_TCP #ifdef USE_SOCKETS_TCP
scheme_block_until_unless((Scheme_Ready_Fun)tcp_byte_ready, {
scheme_need_wakeup, Scheme_Object *sema;
(Scheme_Object *)port, sema = scheme_fd_to_semaphore(data->tcp, MZFD_CREATE_READ);
0.0, unless, if (sema)
nonblock); scheme_wait_sema(sema, nonblock ? -1 : 0);
else
scheme_block_until_unless((Scheme_Ready_Fun)tcp_byte_ready,
scheme_need_wakeup,
(Scheme_Object *)port,
0.0, unless,
nonblock);
}
#else #else
do { do {
scheme_thread_block_enable_break((float)0.0, nonblock); scheme_thread_block_enable_break((float)0.0, nonblock);
@ -1378,6 +1460,7 @@ static void tcp_close_input(Scheme_Input_Port *port)
closesocket(data->tcp); closesocket(data->tcp);
#endif #endif
(void)scheme_fd_to_semaphore(data->tcp, MZFD_REMOVE);
} }
static int static int
@ -1457,8 +1540,17 @@ static intptr_t tcp_do_write_string(Scheme_Output_Port *port,
return 0; return 0;
/* Block for writing: */ /* Block for writing: */
scheme_block_until_enable_break(tcp_check_write, tcp_write_needs_wakeup, (Scheme_Object *)port, {
(float)0.0, enable_break); Scheme_Object *sema;
sema = scheme_fd_to_semaphore(data->tcp, MZFD_CREATE_WRITE);
if (sema)
scheme_wait_sema(sema, enable_break ? -1 : 0);
else
scheme_block_until_enable_break((Scheme_Ready_Fun)tcp_check_write,
tcp_write_needs_wakeup,
(Scheme_Object *)port,
(float)0.0, enable_break);
}
/* Closed while blocking? */ /* Closed while blocking? */
if (((Scheme_Output_Port *)port)->closed) { if (((Scheme_Output_Port *)port)->closed) {
@ -1576,6 +1668,7 @@ static void tcp_close_output(Scheme_Output_Port *port)
closesocket(data->tcp); closesocket(data->tcp);
#endif #endif
(void)scheme_fd_to_semaphore(data->tcp, MZFD_REMOVE);
} }
static int static int
@ -1611,7 +1704,7 @@ make_tcp_input_port_symbol_name(void *data, Scheme_Object *name, Scheme_Object *
NULL, NULL,
scheme_progress_evt_via_get, scheme_progress_evt_via_get,
scheme_peeked_read_via_get, scheme_peeked_read_via_get,
tcp_byte_ready, (Scheme_In_Ready_Fun)tcp_byte_ready,
tcp_close_input, tcp_close_input,
tcp_need_wakeup, tcp_need_wakeup,
1); 1);
@ -1673,6 +1766,7 @@ typedef struct Close_Socket_Data {
static void closesocket_w_decrement(Close_Socket_Data *csd) static void closesocket_w_decrement(Close_Socket_Data *csd)
{ {
closesocket(csd->s); closesocket(csd->s);
(void)scheme_fd_to_semaphore(csd->s, MZFD_REMOVE);
if (csd->src_addr) if (csd->src_addr)
mz_freeaddrinfo(csd->src_addr); mz_freeaddrinfo(csd->src_addr);
mz_freeaddrinfo(csd->dest_addr); mz_freeaddrinfo(csd->dest_addr);
@ -1796,6 +1890,7 @@ static Scheme_Object *tcp_connect(int argc, Scheme_Object *argv[])
if (inprogress) { if (inprogress) {
tcp_t *sptr; tcp_t *sptr;
Close_Socket_Data *csd; Close_Socket_Data *csd;
Scheme_Object *sema;
sptr = (tcp_t *)scheme_malloc_atomic(sizeof(tcp_t)); sptr = (tcp_t *)scheme_malloc_atomic(sizeof(tcp_t));
*sptr = s; *sptr = s;
@ -1805,8 +1900,16 @@ static Scheme_Object *tcp_connect(int argc, Scheme_Object *argv[])
csd->src_addr = tcp_connect_src; csd->src_addr = tcp_connect_src;
csd->dest_addr = tcp_connect_dest; csd->dest_addr = tcp_connect_dest;
sema = scheme_fd_to_semaphore(s, MZFD_CREATE_WRITE);
BEGIN_ESCAPEABLE(closesocket_w_decrement, csd); BEGIN_ESCAPEABLE(closesocket_w_decrement, csd);
scheme_block_until(tcp_check_connect, tcp_connect_needs_wakeup, (void *)sptr, (float)0.0); if (sema)
scheme_wait_sema(sema, 0);
else
scheme_block_until((Scheme_Ready_Fun)tcp_check_connect,
tcp_connect_needs_wakeup,
(void *)sptr,
(float)0.0);
END_ESCAPEABLE(); END_ESCAPEABLE();
/* Check whether connect succeeded, or get error: */ /* Check whether connect succeeded, or get error: */
@ -1823,7 +1926,7 @@ static Scheme_Object *tcp_connect(int argc, Scheme_Object *argv[])
/* getsockopt() seems not to work in Windows 95, so use the /* getsockopt() seems not to work in Windows 95, so use the
result from select(), which seems to reliably detect an error condition */ result from select(), which seems to reliably detect an error condition */
if (!status) { if (!status) {
if (tcp_check_connect((Scheme_Object *)sptr) == -1) { if (tcp_check_connect((Scheme_Object *)sptr, NULL) == -1) {
status = 1; status = 1;
errno = WSAECONNREFUSED; /* guess! */ errno = WSAECONNREFUSED; /* guess! */
} }
@ -1851,6 +1954,7 @@ static Scheme_Object *tcp_connect(int argc, Scheme_Object *argv[])
} else { } else {
errid = errno; errid = errno;
closesocket(s); closesocket(s);
(void)scheme_fd_to_semaphore(s, MZFD_REMOVE);
errpart = 6; errpart = 6;
} }
} else { } else {
@ -2055,6 +2159,7 @@ tcp_listen(int argc, Scheme_Object *argv[])
} else { } else {
errid = errno; errid = errno;
closesocket(s); closesocket(s);
(void)scheme_fd_to_semaphore(s, MZFD_REMOVE);
errno = errid; errno = errid;
s = INVALID_SOCKET; s = INVALID_SOCKET;
} }
@ -2197,6 +2302,7 @@ static int stop_listener(Scheme_Object *o)
s = listener->s[i]; s = listener->s[i];
UNREGISTER_SOCKET(s); UNREGISTER_SOCKET(s);
closesocket(s); closesocket(s);
(void)scheme_fd_to_semaphore(s, MZFD_REMOVE);
listener->s[i] = INVALID_SOCKET; listener->s[i] = INVALID_SOCKET;
} }
scheme_remove_managed(((listener_t *)o)->mref, o); scheme_remove_managed(((listener_t *)o)->mref, o);
@ -2251,7 +2357,7 @@ tcp_accept_ready(int argc, Scheme_Object *argv[])
return NULL; return NULL;
} }
ready = tcp_check_accept(argv[0]); ready = tcp_check_accept(argv[0], NULL);
return (ready ? scheme_true : scheme_false); return (ready ? scheme_true : scheme_false);
#else #else
@ -2283,10 +2389,18 @@ do_tcp_accept(int argc, Scheme_Object *argv[], Scheme_Object *cust, char **_fail
was_closed = LISTENER_WAS_CLOSED(listener); was_closed = LISTENER_WAS_CLOSED(listener);
if (!was_closed) { if (!was_closed) {
ready_pos = tcp_check_accept(listener); ready_pos = tcp_check_accept(listener, NULL);
if (!ready_pos) { if (!ready_pos) {
scheme_block_until(tcp_check_accept, tcp_accept_needs_wakeup, listener, 0.0); Scheme_Object *evt;
ready_pos = tcp_check_accept(listener); evt = listener_to_evt((listener_t *)listener);
if (evt)
scheme_sync(1, &evt);
else
scheme_block_until((Scheme_Ready_Fun)tcp_check_accept,
tcp_accept_needs_wakeup,
listener,
0.0);
ready_pos = tcp_check_accept(listener, NULL);
} }
was_closed = LISTENER_WAS_CLOSED(listener); was_closed = LISTENER_WAS_CLOSED(listener);
} else } else
@ -2369,7 +2483,7 @@ tcp_accept_break(int argc, Scheme_Object *argv[])
void register_network_evts() void register_network_evts()
{ {
#ifdef USE_TCP #ifdef USE_TCP
scheme_add_evt(scheme_listener_type, tcp_check_accept, tcp_accept_needs_wakeup, NULL, 0); scheme_add_evt(scheme_listener_type, (Scheme_Ready_Fun)tcp_check_accept, tcp_accept_needs_wakeup, NULL, 0);
scheme_add_evt(scheme_tcp_accept_evt_type, (Scheme_Ready_Fun)tcp_check_accept_evt, tcp_accept_evt_needs_wakeup, NULL, 0); scheme_add_evt(scheme_tcp_accept_evt_type, (Scheme_Ready_Fun)tcp_check_accept_evt, tcp_accept_evt_needs_wakeup, NULL, 0);
# ifdef UDP_IS_SUPPORTED # ifdef UDP_IS_SUPPORTED
scheme_add_evt(scheme_udp_evt_type, (Scheme_Ready_Fun)udp_evt_check_ready, udp_evt_needs_wakeup, NULL, 0); scheme_add_evt(scheme_udp_evt_type, (Scheme_Ready_Fun)udp_evt_check_ready, udp_evt_needs_wakeup, NULL, 0);
@ -2621,7 +2735,7 @@ static Scheme_Object *accept_failed(void *msg, int argc, Scheme_Object **argv)
static int tcp_check_accept_evt(Scheme_Object *ae, Scheme_Schedule_Info *sinfo) static int tcp_check_accept_evt(Scheme_Object *ae, Scheme_Schedule_Info *sinfo)
{ {
if (tcp_check_accept(SCHEME_PTR1_VAL(ae))) { if (tcp_check_accept(SCHEME_PTR1_VAL(ae), NULL)) {
Scheme_Object *a[2]; Scheme_Object *a[2];
char *fail_reason = NULL; char *fail_reason = NULL;
a[0] = SCHEME_PTR1_VAL(ae); a[0] = SCHEME_PTR1_VAL(ae);
@ -2759,6 +2873,7 @@ void scheme_close_socket_fd(intptr_t fd)
#ifdef USE_SOCKETS_TCP #ifdef USE_SOCKETS_TCP
UNREGISTER_SOCKET(fd); UNREGISTER_SOCKET(fd);
closesocket(fd); closesocket(fd);
(void)scheme_fd_to_semaphore(fd, MZFD_REMOVE);
#endif #endif
} }
@ -2786,6 +2901,7 @@ static int udp_close_it(Scheme_Object *_udp)
if (udp->s != INVALID_SOCKET) { if (udp->s != INVALID_SOCKET) {
closesocket(udp->s); closesocket(udp->s);
(void)scheme_fd_to_semaphore(udp->s, MZFD_REMOVE);
udp->s = INVALID_SOCKET; udp->s = INVALID_SOCKET;
scheme_remove_managed(udp->mref, (Scheme_Object *)udp); scheme_remove_managed(udp->mref, (Scheme_Object *)udp);
@ -3092,13 +3208,16 @@ static Scheme_Object *udp_connect(int argc, Scheme_Object *argv[])
#ifdef UDP_IS_SUPPORTED #ifdef UDP_IS_SUPPORTED
static int udp_check_send(Scheme_Object *_udp) static int udp_check_send(Scheme_Object *_udp, Scheme_Schedule_Info *sinfo)
{ {
Scheme_UDP *udp = (Scheme_UDP *)_udp; Scheme_UDP *udp = (Scheme_UDP *)_udp;
if (udp->s == INVALID_SOCKET) if (udp->s == INVALID_SOCKET)
return 1; return 1;
if (!check_fd_sema(udp->s, MZFD_CHECK_WRITE, sinfo))
return 0;
# ifdef HAVE_POLL_SYSCALL # ifdef HAVE_POLL_SYSCALL
{ {
GC_CAN_IGNORE struct pollfd pfd[1]; GC_CAN_IGNORE struct pollfd pfd[1];
@ -3110,7 +3229,8 @@ static int udp_check_send(Scheme_Object *_udp)
sr = poll(pfd, 1, 0); sr = poll(pfd, 1, 0);
} while ((sr == -1) && (errno == EINTR)); } while ((sr == -1) && (errno == EINTR));
return sr; if (sr)
return sr;
} }
# else # else
{ {
@ -3131,9 +3251,14 @@ static int udp_check_send(Scheme_Object *_udp)
sr = select(udp->s + 1, NULL, writefds, exnfds, &time); sr = select(udp->s + 1, NULL, writefds, exnfds, &time);
} while ((sr == -1) && (NOT_WINSOCK(errno) == EINTR)); } while ((sr == -1) && (NOT_WINSOCK(errno) == EINTR));
return sr; if (sr)
return sr;
} }
#endif #endif
check_fd_sema(udp->s, MZFD_CREATE_WRITE, sinfo);
return 0;
} }
static void udp_send_needs_wakeup(Scheme_Object *_udp, void *fds) static void udp_send_needs_wakeup(Scheme_Object *_udp, void *fds)
@ -3190,7 +3315,15 @@ static Scheme_Object *do_udp_send_it(const char *name, Scheme_UDP *udp,
if (WAS_EAGAIN(errid)) { if (WAS_EAGAIN(errid)) {
if (can_block) { if (can_block) {
/* Block and eventually try again. */ /* Block and eventually try again. */
scheme_block_until(udp_check_send, udp_send_needs_wakeup, (Scheme_Object *)udp, 0); Scheme_Object *sema;
sema = scheme_fd_to_semaphore(udp->s, MZFD_CREATE_WRITE);
if (sema)
scheme_wait_sema(sema, 0);
else
scheme_block_until((Scheme_Ready_Fun)udp_check_send,
udp_send_needs_wakeup,
(Scheme_Object *)udp,
0);
} else } else
return scheme_false; return scheme_false;
} else if (NOT_WINSOCK(errid) != EINTR) } else if (NOT_WINSOCK(errid) != EINTR)
@ -3339,13 +3472,16 @@ static Scheme_Object *udp_send_enable_break(int argc, Scheme_Object *argv[])
#ifdef UDP_IS_SUPPORTED #ifdef UDP_IS_SUPPORTED
static int udp_check_recv(Scheme_Object *_udp) static int udp_check_recv(Scheme_Object *_udp, Scheme_Schedule_Info *sinfo)
{ {
Scheme_UDP *udp = (Scheme_UDP *)_udp; Scheme_UDP *udp = (Scheme_UDP *)_udp;
if (udp->s == INVALID_SOCKET) if (udp->s == INVALID_SOCKET)
return 1; return 1;
if (!check_fd_sema(udp->s, MZFD_CHECK_READ, sinfo))
return 0;
# ifdef HAVE_POLL_SYSCALL # ifdef HAVE_POLL_SYSCALL
{ {
GC_CAN_IGNORE struct pollfd pfd[1]; GC_CAN_IGNORE struct pollfd pfd[1];
@ -3357,7 +3493,8 @@ static int udp_check_recv(Scheme_Object *_udp)
sr = poll(pfd, 1, 0); sr = poll(pfd, 1, 0);
} while ((sr == -1) && (errno == EINTR)); } while ((sr == -1) && (errno == EINTR));
return sr; if (sr)
return sr;
} }
# else # else
{ {
@ -3378,9 +3515,14 @@ static int udp_check_recv(Scheme_Object *_udp)
sr = select(udp->s + 1, readfds, NULL, exnfds, &time); sr = select(udp->s + 1, readfds, NULL, exnfds, &time);
} while ((sr == -1) && (NOT_WINSOCK(errno) == EINTR)); } while ((sr == -1) && (NOT_WINSOCK(errno) == EINTR));
return sr; if (sr)
return sr;
} }
# endif # endif
check_fd_sema(udp->s, MZFD_CREATE_READ, sinfo);
return 0;
} }
static void udp_recv_needs_wakeup(Scheme_Object *_udp, void *fds) static void udp_recv_needs_wakeup(Scheme_Object *_udp, void *fds)
@ -3438,7 +3580,15 @@ static int do_udp_recv(const char *name, Scheme_UDP *udp, char *bstr, intptr_t s
} if (WAS_EAGAIN(errid)) { } if (WAS_EAGAIN(errid)) {
if (can_block) { if (can_block) {
/* Block and eventually try again. */ /* Block and eventually try again. */
scheme_block_until(udp_check_recv, udp_recv_needs_wakeup, (Scheme_Object *)udp, 0); Scheme_Object *sema;
sema = scheme_fd_to_semaphore(udp->s, MZFD_CREATE_READ);
if (sema)
scheme_wait_sema(sema, 0);
else
scheme_block_until((Scheme_Ready_Fun)udp_check_recv,
udp_recv_needs_wakeup,
(Scheme_Object *)udp,
0);
} else { } else {
v[0] = scheme_false; v[0] = scheme_false;
v[1] = scheme_false; v[1] = scheme_false;
@ -3614,7 +3764,7 @@ static int udp_evt_check_ready(Scheme_Object *_uw, Scheme_Schedule_Info *sinfo)
} else } else
return 0; return 0;
} else { } else {
return udp_check_recv((Scheme_Object *)uw->udp); return udp_check_recv((Scheme_Object *)uw->udp, NULL);
} }
} else { } else {
if (uw->str) { if (uw->str) {
@ -3629,7 +3779,7 @@ static int udp_evt_check_ready(Scheme_Object *_uw, Scheme_Schedule_Info *sinfo)
} else } else
return 0; return 0;
} else } else
return udp_check_send((Scheme_Object *)uw->udp); return udp_check_send((Scheme_Object *)uw->udp, NULL);
} }
} }

View File

@ -344,6 +344,8 @@ THREAD_LOCAL_DECL(Scheme_Object *scheme_orig_stderr_port);
THREAD_LOCAL_DECL(Scheme_Object *scheme_orig_stdin_port); THREAD_LOCAL_DECL(Scheme_Object *scheme_orig_stdin_port);
THREAD_LOCAL_DECL(struct mz_fd_set *scheme_fd_set); THREAD_LOCAL_DECL(struct mz_fd_set *scheme_fd_set);
THREAD_LOCAL_DECL(struct mz_fd_set *scheme_semaphore_fd_set);
THREAD_LOCAL_DECL(Scheme_Hash_Table *scheme_semaphore_fd_mapping);
#ifdef USE_FCNTL_AND_FORK_FOR_FILE_LOCKS #ifdef USE_FCNTL_AND_FORK_FOR_FILE_LOCKS
THREAD_LOCAL_DECL(Scheme_Hash_Table *locked_fd_process_map); THREAD_LOCAL_DECL(Scheme_Hash_Table *locked_fd_process_map);
@ -781,6 +783,19 @@ void scheme_alloc_global_fdset() {
REGISTER_SO(scheme_fd_set); REGISTER_SO(scheme_fd_set);
scheme_fd_set = (struct mz_fd_set *)scheme_alloc_fdset_array(3, 0); scheme_fd_set = (struct mz_fd_set *)scheme_alloc_fdset_array(3, 0);
#endif #endif
REGISTER_SO(scheme_semaphore_fd_set);
#ifdef USE_FAR_MZ_FDCALLS
scheme_semaphore_fd_set = (struct mz_fd_set *)scheme_alloc_fdset_array(3, 0);
#else
scheme_semaphore_fd_set = (struct mz_fd_set *)scheme_malloc_atomic(sizeof(struct mz_fd_set));
#endif
scheme_fdzero(MZ_GET_FDSET(scheme_semaphore_fd_set, 0));
scheme_fdzero(MZ_GET_FDSET(scheme_semaphore_fd_set, 1));
scheme_fdzero(MZ_GET_FDSET(scheme_semaphore_fd_set, 2));
REGISTER_SO(scheme_semaphore_fd_mapping);
scheme_semaphore_fd_mapping = scheme_make_hash_table_eqv();
} }
#ifdef HAVE_POLL_SYSCALL #ifdef HAVE_POLL_SYSCALL
@ -815,7 +830,7 @@ void *scheme_alloc_fdset_array(int count, int permanent)
data->pfd = pfd; data->pfd = pfd;
if (permanent) if (permanent)
scheme_dont_gc_ptr(data); scheme_dont_gc_ptr(r);
return r; return r;
} }
@ -844,18 +859,35 @@ void scheme_fdzero(void *fd)
((struct mz_fd_set *)fd)->data->count = scheme_make_integer(0); ((struct mz_fd_set *)fd)->data->count = scheme_make_integer(0);
} }
static int find_fd_pos(struct mz_fd_set_data *data, int n)
{
intptr_t count = SCHEME_INT_VAL(data->count);
intptr_t i;
Scheme_Object *v;
/* This linear search probably isn't good enough for hundreds or
thousands of descriptors, but epoll()/kqueue() mode should handle
that case, anyway. */
for (i = 0; i < count; i++) {
if (data->pfd[i].fd == n) {
return i;
}
}
return -1;
}
void scheme_fdclr(void *fd, int n) void scheme_fdclr(void *fd, int n)
{ {
struct mz_fd_set_data *data = ((struct mz_fd_set *)fd)->data; struct mz_fd_set_data *data = ((struct mz_fd_set *)fd)->data;
intptr_t flag = SCHEME_INT_VAL(((struct mz_fd_set *)fd)->flags); intptr_t flag = SCHEME_INT_VAL(((struct mz_fd_set *)fd)->flags);
intptr_t count = SCHEME_INT_VAL(data->count); intptr_t pos;
intptr_t i;
for (i = 0; i < count; i++) { if (!flag) return;
if (data->pfd[i].fd == n) {
data->pfd[i].events -= (data->pfd[i].events & flag); pos = find_fd_pos(data, n);
return; if (pos >= 0) {
} data->pfd[pos].events -= (data->pfd[pos].events & flag);
} }
} }
@ -863,17 +895,18 @@ void scheme_fdset(void *fd, int n)
{ {
struct mz_fd_set_data *data = ((struct mz_fd_set *)fd)->data; struct mz_fd_set_data *data = ((struct mz_fd_set *)fd)->data;
intptr_t flag = SCHEME_INT_VAL(((struct mz_fd_set *)fd)->flags); intptr_t flag = SCHEME_INT_VAL(((struct mz_fd_set *)fd)->flags);
intptr_t count = SCHEME_INT_VAL(data->count); intptr_t count, size, pos;
intptr_t size, i;
struct pollfd *pfd; struct pollfd *pfd;
for (i = 0; i < count; i++) { if (!flag) return;
if (data->pfd[i].fd == n) {
data->pfd[i].events |= flag; pos = find_fd_pos(data, n);
return; if (pos >= 0) {
} data->pfd[pos].events |= flag;
return;
} }
count = SCHEME_INT_VAL(data->count);
size = SCHEME_INT_VAL(data->size); size = SCHEME_INT_VAL(data->size);
if (count >= size) { if (count >= size) {
size = size * 2; size = size * 2;
@ -893,20 +926,110 @@ int scheme_fdisset(void *fd, int n)
{ {
struct mz_fd_set_data *data = ((struct mz_fd_set *)fd)->data; struct mz_fd_set_data *data = ((struct mz_fd_set *)fd)->data;
intptr_t flag = SCHEME_INT_VAL(((struct mz_fd_set *)fd)->flags); intptr_t flag = SCHEME_INT_VAL(((struct mz_fd_set *)fd)->flags);
intptr_t count = SCHEME_INT_VAL(data->count); intptr_t pos;
intptr_t i;
if (!flag) flag = (POLLERR | POLLHUP); if (!flag) flag = (POLLERR | POLLHUP);
pos = find_fd_pos(data, n);
if (pos >= 0) {
if (data->pfd[pos].revents & flag)
return 1;
else
return 0;
}
return 0;
}
static int cmp_fd(const void *_a, const void *_b)
{
struct pollfd *a = (struct pollfd *)_a;
struct pollfd *b = (struct pollfd *)_b;
return a->fd - b->fd;
}
void *scheme_merge_fd_sets(void *fds, void *src_fds)
{
struct mz_fd_set_data *data = ((struct mz_fd_set *)fds)->data;
struct mz_fd_set_data *src_data = ((struct mz_fd_set *)src_fds)->data;
int i, si, c, sc, j, nc;
struct pollfd *pfds;
scheme_clean_fd_set(fds);
scheme_clean_fd_set(src_fds);
c = SCHEME_INT_VAL(data->count);
sc = SCHEME_INT_VAL(src_data->count);
if (!c)
return src_fds;
if (!sc)
return fds;
qsort(data->pfd, c, sizeof(struct pollfd), cmp_fd);
qsort(src_data->pfd, sc, sizeof(struct pollfd), cmp_fd);
nc = c + sc;
pfds = (struct pollfd *)scheme_malloc_atomic(sizeof(struct pollfd) * (nc + PFD_EXTRA_SPACE));
j = 0;
for (i = 0, si = 0; (i < c) && (si < sc); ) {
if (data->pfd[i].fd == src_data->pfd[si].fd) {
pfds[j].fd = data->pfd[i].fd;
pfds[j].events = (data->pfd[i].events | src_data->pfd[si].events);
i++;
si++;
} else if (data->pfd[i].fd < src_data->pfd[si].fd) {
pfds[j].fd = data->pfd[i].fd;
pfds[j].events = data->pfd[i].events;
i++;
} else {
pfds[j].fd = src_data->pfd[si].fd;
pfds[j].events = src_data->pfd[si].events;
si++;
}
j++;
}
for ( ; i < c; i++, j++) {
pfds[j].fd = data->pfd[i].fd;
pfds[j].events = data->pfd[i].events;
}
for ( ; si < sc; si++, j++) {
pfds[j].fd = src_data->pfd[si].fd;
pfds[j].events = src_data->pfd[si].events;
}
if (nc > SCHEME_INT_VAL(data->size)) {
data->pfd = pfds;
data->size = scheme_make_integer(nc);
} else
memcpy(data->pfd, pfds, j * sizeof(struct pollfd));
data->count = scheme_make_integer(j);
return fds;
}
void scheme_clean_fd_set(void *fds)
{
struct mz_fd_set_data *data = ((struct mz_fd_set *)fds)->data;
intptr_t count = SCHEME_INT_VAL(data->count);
intptr_t i, j = 0;
for (i = 0; i < count; i++) { for (i = 0; i < count; i++) {
if (data->pfd[i].fd == n) { if (data->pfd[i].events) {
if (data->pfd[i].revents & flag) if (j < i) {
return 1; data->pfd[j].fd = data->pfd[i].fd;
else data->pfd[j].events = data->pfd[i].events;
return 0; }
j++;
} }
} }
count = j;
data->count = scheme_make_integer(count);
}
int scheme_get_fd_limit(void *fds)
{
return 0; return 0;
} }
@ -1115,6 +1238,79 @@ int scheme_fdisset(void *fd, int n)
#endif #endif
} }
void *scheme_merge_fd_sets(void *fds, void *src_fds)
{
#if defined(WIN32_FD_HANDLES)
win_extended_fd_set *efd = (win_extended_fd_set *)src_fds;
int i;
for (i = SCHEME_INT_VAL(efd->added); i--; ) {
if (efd->sockets[i] != INVALID_SOCKET)
scheme_fdset(fds, efd->sockets[i]);
}
return fds;
#else
int i, j;
GC_CAN_IGNORE unsigned char *p, *sp;
for (j = 0; j < 3; j++) {
p = scheme_get_fdset(fds, j);
sp = scheme_get_fdset(src_fds, j);
if (FDSET_LIMIT(sp) > FDSET_LIMIT(p)) {
i = FDSET_LIMIT(sp);
FDSET_LIMIT(p) = i;
}
# if defined(USE_DYNAMIC_FDSET_SIZE)
i = dynamic_fd_size;
# else
i = sizeof(fd_set);
# endif
for (; i--; p++, sp++) {
*p |= *sp;
}
}
return fds;
#endif
}
void scheme_clean_fd_set(void *fds)
{
}
int scheme_get_fd_limit(void *fds)
{
int limit, actual_limit;
fd_set *rd, *wr, *ex;
# ifdef USE_WINSOCK_TCP
limit = 0;
# else
# ifdef USE_ULIMIT
limit = ulimit(4, 0);
# else
# ifdef FIXED_FD_LIMIT
limit = FIXED_FD_LIMIT;
# else
limit = getdtablesize();
# endif
# endif
# endif
rd = (fd_set *)fds;
wr = (fd_set *)MZ_GET_FDSET(fds, 1);
ex = (fd_set *)MZ_GET_FDSET(fds, 2);
# ifdef STORED_ACTUAL_FDSET_LIMIT
actual_limit = FDSET_LIMIT(rd);
if (FDSET_LIMIT(wr) > actual_limit)
actual_limit = FDSET_LIMIT(wr);
if (FDSET_LIMIT(ex) > actual_limit)
actual_limit = FDSET_LIMIT(ex);
actual_limit++;
# else
actual_limit = limit;
# endif
return actual_limit;
}
#endif #endif
void scheme_add_fd_handle(void *h, void *fds, int repost) void scheme_add_fd_handle(void *h, void *fds, int repost)
@ -1691,9 +1887,9 @@ static int output_ready(Scheme_Object *port, Scheme_Schedule_Info *sinfo)
} }
if (op->ready_fun) { if (op->ready_fun) {
Scheme_Out_Ready_Fun rf; Scheme_Out_Ready_Fun_FPC rf;
rf = op->ready_fun; rf = (Scheme_Out_Ready_Fun_FPC)op->ready_fun;
return rf(op); return rf(op, sinfo);
} }
return 1; return 1;
@ -1714,6 +1910,8 @@ static void output_need_wakeup (Scheme_Object *port, void *fds)
} }
} }
static int byte_input_ready (Scheme_Object *port, Scheme_Schedule_Info *sinfo);
int scheme_byte_ready_or_user_port_ready(Scheme_Object *p, Scheme_Schedule_Info *sinfo) int scheme_byte_ready_or_user_port_ready(Scheme_Object *p, Scheme_Schedule_Info *sinfo)
{ {
Scheme_Input_Port *ip; Scheme_Input_Port *ip;
@ -1732,14 +1930,14 @@ int scheme_byte_ready_or_user_port_ready(Scheme_Object *p, Scheme_Schedule_Info
scheduler isn't requesting the status, we need sinfo. */ scheduler isn't requesting the status, we need sinfo. */
return scheme_user_port_byte_probably_ready(ip, sinfo); return scheme_user_port_byte_probably_ready(ip, sinfo);
} else } else
return scheme_byte_ready(p); return byte_input_ready(p, sinfo);
} }
static void register_port_wait() static void register_port_wait()
{ {
scheme_add_evt(scheme_input_port_type, scheme_add_evt(scheme_input_port_type,
(Scheme_Ready_Fun)scheme_byte_ready_or_user_port_ready, scheme_need_wakeup, (Scheme_Ready_Fun)scheme_byte_ready_or_user_port_ready, scheme_need_wakeup,
evt_input_port_p, 1); evt_input_port_p, 1);
scheme_add_evt(scheme_output_port_type, scheme_add_evt(scheme_output_port_type,
(Scheme_Ready_Fun)output_ready, output_need_wakeup, (Scheme_Ready_Fun)output_ready, output_need_wakeup,
evt_output_port_p, 1); evt_output_port_p, 1);
@ -3381,8 +3579,7 @@ scheme_ungetc (int ch, Scheme_Object *port)
} }
} }
int int byte_input_ready (Scheme_Object *port, Scheme_Schedule_Info *sinfo)
scheme_byte_ready (Scheme_Object *port)
{ {
Scheme_Input_Port *ip; Scheme_Input_Port *ip;
int retval; int retval;
@ -3397,13 +3594,20 @@ scheme_byte_ready (Scheme_Object *port)
|| pipe_char_count(ip->peeked_read))) || pipe_char_count(ip->peeked_read)))
retval = 1; retval = 1;
else { else {
Scheme_In_Ready_Fun f = ip->byte_ready_fun; Scheme_In_Ready_Fun_FPC f;
retval = f(ip); f = (Scheme_In_Ready_Fun_FPC)ip->byte_ready_fun;
retval = f(ip, NULL);
} }
return retval; return retval;
} }
int
scheme_byte_ready (Scheme_Object *port)
{
return byte_input_ready(port, NULL);
}
int int
scheme_char_ready (Scheme_Object *port) scheme_char_ready (Scheme_Object *port)
{ {
@ -5760,17 +5964,26 @@ static intptr_t fd_get_string_slow(Scheme_Input_Port *port,
int none_avail = 0; int none_avail = 0;
int target_size, target_offset, ext_target; int target_size, target_offset, ext_target;
char *target; char *target;
Scheme_Object *sema;
/* If no chars appear to be ready, go to sleep. */ /* If no chars appear to be ready, go to sleep. */
while (!fd_byte_ready(port)) { while (!fd_byte_ready(port)) {
if (nonblock > 0) if (nonblock > 0)
return 0; return 0;
scheme_block_until_unless((Scheme_Ready_Fun)fd_byte_ready, #ifdef WINDOWS_FILE_HANDLES
(Scheme_Needs_Wakeup_Fun)fd_need_wakeup, sema = NULL;
(Scheme_Object *)port, #else
0.0, unless, sema = scheme_fd_to_semaphore(fip->fd, MZFD_CREATE_READ);
nonblock); #endif
if (sema)
scheme_wait_sema(sema, nonblock ? -1 : 0);
else
scheme_block_until_unless((Scheme_Ready_Fun)fd_byte_ready,
(Scheme_Needs_Wakeup_Fun)fd_need_wakeup,
(Scheme_Object *)port,
0.0, unless,
nonblock);
scheme_wait_input_allowed(port, nonblock); scheme_wait_input_allowed(port, nonblock);
@ -6061,6 +6274,7 @@ fd_close_input(Scheme_Input_Port *port)
# ifdef USE_FCNTL_AND_FORK_FOR_FILE_LOCKS # ifdef USE_FCNTL_AND_FORK_FOR_FILE_LOCKS
release_lockf(fip->fd); release_lockf(fip->fd);
# endif # endif
(void)scheme_fd_to_semaphore(fip->fd, MZFD_REMOVE);
} }
} }
#endif #endif
@ -7181,16 +7395,27 @@ static intptr_t flush_fd(Scheme_Output_Port *op,
return wrote; return wrote;
} else if (full_write_buffer) { } else if (full_write_buffer) {
/* Need to block; remember that we're holding a lock. */ /* Need to block; remember that we're holding a lock. */
Scheme_Object *sema;
if (immediate_only == 2) { if (immediate_only == 2) {
fop->flushing = 0; fop->flushing = 0;
return wrote; return wrote;
} }
#ifdef WINDOWS_FILE_HANDLES
sema = NULL;
#else
sema = scheme_fd_to_semaphore(fop->fd, MZFD_CREATE_WRITE);
#endif
BEGIN_ESCAPEABLE(release_flushing_lock, fop); BEGIN_ESCAPEABLE(release_flushing_lock, fop);
scheme_block_until_enable_break(fd_write_ready, if (sema)
fd_write_need_wakeup, scheme_wait_sema(sema, enable_break ? -1 : 0);
(Scheme_Object *)op, 0.0, else
enable_break); scheme_block_until_enable_break(fd_write_ready,
fd_write_need_wakeup,
(Scheme_Object *)op, 0.0,
enable_break);
END_ESCAPEABLE(); END_ESCAPEABLE();
} else { } else {
fop->flushing = 0; fop->flushing = 0;
@ -7352,6 +7577,7 @@ fd_close_output(Scheme_Output_Port *port)
# ifdef USE_FCNTL_AND_FORK_FOR_FILE_LOCKS # ifdef USE_FCNTL_AND_FORK_FOR_FILE_LOCKS
release_lockf(fop->fd); release_lockf(fop->fd);
# endif # endif
(void)scheme_fd_to_semaphore(fop->fd, MZFD_REMOVE);
} }
} }
#endif #endif
@ -9387,7 +9613,7 @@ static void default_sleep(float v, void *fds)
#if defined(FILES_HAVE_FDS) || defined(USE_WINSOCK_TCP) #if defined(FILES_HAVE_FDS) || defined(USE_WINSOCK_TCP)
# ifndef HAVE_POLL_SYSCALL # ifndef HAVE_POLL_SYSCALL
int limit, actual_limit; int actual_limit;
fd_set *rd, *wr, *ex; fd_set *rd, *wr, *ex;
struct timeval time; struct timeval time;
# endif # endif
@ -9416,33 +9642,11 @@ static void default_sleep(float v, void *fds)
time.tv_usec = usecs; time.tv_usec = usecs;
} }
# ifdef USE_WINSOCK_TCP
limit = 0;
# else
# ifdef USE_ULIMIT
limit = ulimit(4, 0);
# else
# ifdef FIXED_FD_LIMIT
limit = FIXED_FD_LIMIT;
# else
limit = getdtablesize();
# endif
# endif
# endif
rd = (fd_set *)fds; rd = (fd_set *)fds;
wr = (fd_set *)MZ_GET_FDSET(fds, 1); wr = (fd_set *)MZ_GET_FDSET(fds, 1);
ex = (fd_set *)MZ_GET_FDSET(fds, 2); ex = (fd_set *)MZ_GET_FDSET(fds, 2);
# ifdef STORED_ACTUAL_FDSET_LIMIT
actual_limit = FDSET_LIMIT(rd); actual_limit = scheme_get_fd_limit(fds);
if (FDSET_LIMIT(wr) > actual_limit)
actual_limit = FDSET_LIMIT(wr);
if (FDSET_LIMIT(ex) > actual_limit)
actual_limit = FDSET_LIMIT(ex);
actual_limit++;
# else
actual_limit = limit;
# endif
# endif # endif
/******* Start Windows stuff *******/ /******* Start Windows stuff *******/

View File

@ -949,6 +949,8 @@ MZ_EXTERN int scheme_get_port_socket(Scheme_Object *p, intptr_t *_s);
MZ_EXTERN void scheme_socket_to_ports(intptr_t s, const char *name, int takeover, MZ_EXTERN void scheme_socket_to_ports(intptr_t s, const char *name, int takeover,
Scheme_Object **_inp, Scheme_Object **_outp); Scheme_Object **_inp, Scheme_Object **_outp);
MZ_EXTERN Scheme_Object *scheme_fd_to_semaphore(intptr_t fd, int mode);
MZ_EXTERN void scheme_set_type_printer(Scheme_Type stype, Scheme_Type_Printer printer); MZ_EXTERN void scheme_set_type_printer(Scheme_Type stype, Scheme_Type_Printer printer);
MZ_EXTERN void scheme_print_bytes(Scheme_Print_Params *pp, const char *str, int offset, int len); MZ_EXTERN void scheme_print_bytes(Scheme_Print_Params *pp, const char *str, int offset, int len);
MZ_EXTERN void scheme_print_utf8(Scheme_Print_Params *pp, const char *str, int offset, int len); MZ_EXTERN void scheme_print_utf8(Scheme_Print_Params *pp, const char *str, int offset, int len);

View File

@ -780,6 +780,7 @@ intptr_t (*scheme_get_port_fd)(Scheme_Object *p);
int (*scheme_get_port_socket)(Scheme_Object *p, intptr_t *_s); int (*scheme_get_port_socket)(Scheme_Object *p, intptr_t *_s);
void (*scheme_socket_to_ports)(intptr_t s, const char *name, int takeover, void (*scheme_socket_to_ports)(intptr_t s, const char *name, int takeover,
Scheme_Object **_inp, Scheme_Object **_outp); Scheme_Object **_inp, Scheme_Object **_outp);
Scheme_Object *(*scheme_fd_to_semaphore)(intptr_t fd, int mode);
void (*scheme_set_type_printer)(Scheme_Type stype, Scheme_Type_Printer printer); void (*scheme_set_type_printer)(Scheme_Type stype, Scheme_Type_Printer printer);
void (*scheme_print_bytes)(Scheme_Print_Params *pp, const char *str, int offset, int len); void (*scheme_print_bytes)(Scheme_Print_Params *pp, const char *str, int offset, int len);
void (*scheme_print_utf8)(Scheme_Print_Params *pp, const char *str, int offset, int len); void (*scheme_print_utf8)(Scheme_Print_Params *pp, const char *str, int offset, int len);

View File

@ -31,3 +31,7 @@ struct mz_fd_set { fd_set data; };
# define INIT_DECL_WR_FDSET(r) /* empty */ # define INIT_DECL_WR_FDSET(r) /* empty */
# define INIT_DECL_ER_FDSET(r) /* empty */ # define INIT_DECL_ER_FDSET(r) /* empty */
#endif #endif
void *scheme_merge_fd_sets(void *fds, void *src_fds);
void scheme_clean_fd_set(void *fds);
int scheme_get_fd_limit(void *fds);

View File

@ -611,6 +611,8 @@ struct Syncing;
void scheme_accept_sync(struct Syncing *syncing, int i); void scheme_accept_sync(struct Syncing *syncing, int i);
typedef int (*Scheme_Ready_Fun_FPC)(Scheme_Object *o, Scheme_Schedule_Info *sinfo); typedef int (*Scheme_Ready_Fun_FPC)(Scheme_Object *o, Scheme_Schedule_Info *sinfo);
typedef int (*Scheme_Out_Ready_Fun_FPC)(Scheme_Output_Port *port, Scheme_Schedule_Info *sinfo);
typedef int (*Scheme_In_Ready_Fun_FPC)(Scheme_Input_Port *port, Scheme_Schedule_Info *sinfo);
void scheme_check_break_now(void); void scheme_check_break_now(void);

View File

@ -56,6 +56,10 @@
# ifdef USE_BEOS_SOCKET_INCLUDE # ifdef USE_BEOS_SOCKET_INCLUDE
# include <be/net/socket.h> # include <be/net/socket.h>
# endif # endif
# ifdef HAVE_POLL_SYSCALL
# include <poll.h>
# endif
# include <errno.h>
#endif #endif
#ifdef USE_WINSOCK_TCP #ifdef USE_WINSOCK_TCP
# ifdef USE_TCP # ifdef USE_TCP
@ -3385,12 +3389,194 @@ static Scheme_Object *call_as_nested_thread(int argc, Scheme_Object *argv[])
/* thread scheduling and termination */ /* thread scheduling and termination */
/*========================================================================*/ /*========================================================================*/
Scheme_Object *scheme_fd_to_semaphore(intptr_t fd, int mode)
{
#ifdef USE_WINSOCK_TCP
return NULL;
#else
Scheme_Object *key, *v, *s;
void *r, *w, *e;
if (!scheme_semaphore_fd_mapping)
return NULL;
key = scheme_make_integer_value(fd);
v = scheme_hash_get(scheme_semaphore_fd_mapping, key);
if (!v && ((mode == MZFD_CHECK_READ)
|| (mode == MZFD_CHECK_WRITE)
|| (mode == MZFD_REMOVE)))
return NULL;
if (!v) {
v = scheme_make_vector(2, scheme_false);
scheme_hash_set(scheme_semaphore_fd_mapping, key, v);
}
r = MZ_GET_FDSET(scheme_semaphore_fd_set, 0);
w = MZ_GET_FDSET(scheme_semaphore_fd_set, 1);
e = MZ_GET_FDSET(scheme_semaphore_fd_set, 2);
if (mode == MZFD_REMOVE) {
scheme_hash_set(scheme_semaphore_fd_mapping, key, NULL);
MZ_FD_CLR(fd, r);
MZ_FD_CLR(fd, w);
MZ_FD_CLR(fd, e);
s = NULL;
} else if ((mode == MZFD_CHECK_READ)
|| (mode == MZFD_CREATE_READ)) {
s = SCHEME_VEC_ELS(v)[0];
if (SCHEME_FALSEP(s)) {
if (mode == MZFD_CREATE_READ) {
s = scheme_make_sema(0);
SCHEME_VEC_ELS(v)[0] = s;
MZ_FD_SET(fd, r);
MZ_FD_SET(fd, e);
}
}
} else {
s = SCHEME_VEC_ELS(v)[1];
if (SCHEME_FALSEP(s)) {
if (mode == MZFD_CREATE_WRITE) {
s = scheme_make_sema(0);
SCHEME_VEC_ELS(v)[1] = s;
MZ_FD_SET(fd, w);
MZ_FD_SET(fd, e);
}
}
}
return s;
#endif
}
static int check_fd_semaphores()
{
#ifdef USE_WINSOCK_TCP
return 0;
#elif defined(HAVE_POLL_SYSCALL)
struct pollfd *pfd;
intptr_t i, c;
Scheme_Object *v, *s, *key;
int sr, hit = 0;
if (!scheme_semaphore_fd_mapping || !scheme_semaphore_fd_mapping->count)
return 0;
scheme_clean_fd_set(scheme_semaphore_fd_set);
c = SCHEME_INT_VAL(scheme_semaphore_fd_set->data->count);
pfd = scheme_semaphore_fd_set->data->pfd;
do {
sr = poll(pfd, c, 0);
} while ((sr == -1) && (errno == EINTR));
if (sr > 0) {
for (i = 0; i < c; i++) {
if (pfd[i].revents) {
key = scheme_make_integer_value(pfd[i].fd);
v = scheme_hash_get(scheme_semaphore_fd_mapping, key);
if (v) {
if (pfd[i].revents & (POLLIN | POLLHUP | POLLERR)) {
s = SCHEME_VEC_ELS(v)[0];
if (!SCHEME_FALSEP(s)) {
scheme_post_sema_all(s);
hit = 1;
SCHEME_VEC_ELS(v)[0] = scheme_false;
}
pfd[i].revents -= (pfd[i].revents & POLLIN);
}
if (pfd[i].revents & (POLLOUT | POLLHUP | POLLERR)) {
s = SCHEME_VEC_ELS(v)[1];
if (!SCHEME_FALSEP(s)) {
scheme_post_sema_all(s);
hit = 1;
SCHEME_VEC_ELS(v)[1] = scheme_false;
}
pfd[i].revents -= (pfd[i].revents & POLLOUT);
}
if (SCHEME_FALSEP(SCHEME_VEC_ELS(v)[0])
&& SCHEME_FALSEP(SCHEME_VEC_ELS(v)[1]))
scheme_hash_set(scheme_semaphore_fd_mapping, key, NULL);
}
}
}
}
return hit;
#else
void *fds;
struct timeval time = {0, 0};
int i, actual_limit, r, w, e, sr, hit = 0;
Scheme_Object *key, *v, *s;
DECL_FDSET(set, 3);
fd_set *set1, *set2;
if (!scheme_semaphore_fd_mapping || !scheme_semaphore_fd_mapping->count)
return 0;
INIT_DECL_FDSET(set, set1, set2);
set1 = (fd_set *) MZ_GET_FDSET(set, 1);
set2 = (fd_set *) MZ_GET_FDSET(set, 2);
fds = (void *)set;
MZ_FD_ZERO(set);
MZ_FD_ZERO(set1);
MZ_FD_ZERO(set2);
scheme_merge_fd_sets(fds, scheme_semaphore_fd_set);
actual_limit = scheme_get_fd_limit(fds);
do {
sr = select(actual_limit, set, set1, set2, &time);
} while ((sr == -1) && (errno == EINTR));
if (sr > 0) {
for (i = 0; i < actual_limit; i++) {
r = MZ_FD_ISSET(i, set);
w = MZ_FD_ISSET(i, set1);
e = MZ_FD_ISSET(i, set2);
if (r || w || e) {
key = scheme_make_integer_value(i);
v = scheme_hash_get(scheme_semaphore_fd_mapping, key);
if (v) {
if (r || e) {
s = SCHEME_VEC_ELS(v)[0];
if (!SCHEME_FALSEP(s)) {
scheme_post_sema_all(s);
hit = 1;
SCHEME_VEC_ELS(v)[0] = scheme_false;
}
MZ_FD_CLR(i, MZ_GET_FDSET(scheme_semaphore_fd_set, 0));
}
if (w || e) {
s = SCHEME_VEC_ELS(v)[1];
if (!SCHEME_FALSEP(s)) {
scheme_post_sema_all(s);
hit = 1;
SCHEME_VEC_ELS(v)[1] = scheme_false;
}
MZ_FD_CLR(i, MZ_GET_FDSET(scheme_semaphore_fd_set, 1));
}
if (SCHEME_FALSEP(SCHEME_VEC_ELS(v)[0])
&& SCHEME_FALSEP(SCHEME_VEC_ELS(v)[1])) {
MZ_FD_CLR(i, MZ_GET_FDSET(scheme_semaphore_fd_set, 2));
scheme_hash_set(scheme_semaphore_fd_mapping, key, NULL);
}
}
}
}
}
return hit;
#endif
}
static int check_sleep(int need_activity, int sleep_now) static int check_sleep(int need_activity, int sleep_now)
/* Signals should be suspended */ /* Signals should be suspended */
{ {
Scheme_Thread *p, *p2; Scheme_Thread *p, *p2;
int end_with_act; int end_with_act;
#if defined(USING_FDS) #if defined(USING_FDS)
DECL_FDSET(set, 3); DECL_FDSET(set, 3);
fd_set *set1, *set2; fd_set *set1, *set2;
@ -3506,6 +3692,9 @@ static int check_sleep(int need_activity, int sleep_now)
return 0; return 0;
} }
scheme_clean_fd_set(fds);
fds = scheme_merge_fd_sets(fds, scheme_semaphore_fd_set);
if (sleep_now) { if (sleep_now) {
float mst = (float)max_sleep_time; float mst = (float)max_sleep_time;
@ -4066,6 +4255,7 @@ void scheme_thread_block(float sleep_time)
double sleep_end; double sleep_end;
Scheme_Thread *next; Scheme_Thread *next;
Scheme_Thread *p = scheme_current_thread; Scheme_Thread *p = scheme_current_thread;
int skip_sleep;
if (p->return_marks_to) /* just in case we get here */ if (p->return_marks_to) /* just in case we get here */
return; return;
@ -4136,6 +4326,24 @@ void scheme_thread_block(float sleep_time)
scheme_check_foreign_work(); scheme_check_foreign_work();
#endif #endif
skip_sleep = 0;
if (check_fd_semaphores()) {
/* double check whether a semaphore for this thread woke up: */
if (p->block_descriptor == GENERIC_BLOCKED) {
if (p->block_check) {
Scheme_Ready_Fun_FPC f = (Scheme_Ready_Fun_FPC)p->block_check;
Scheme_Schedule_Info sinfo;
init_schedule_info(&sinfo, p, sleep_end);
if (f(p->blocker, &sinfo)) {
sleep_end = 0;
skip_sleep = 1;
} else {
sleep_end = sinfo.sleep_end;
}
}
}
}
if (!do_atomic && (sleep_end >= 0.0)) { if (!do_atomic && (sleep_end >= 0.0)) {
find_next_thread(&next); find_next_thread(&next);
} else } else
@ -4204,7 +4412,7 @@ void scheme_thread_block(float sleep_time)
} }
} else { } else {
/* If all processes are blocked, check for total process sleeping: */ /* If all processes are blocked, check for total process sleeping: */
if (p->block_descriptor != NOT_BLOCKED) { if ((p->block_descriptor != NOT_BLOCKED) && !skip_sleep) {
check_sleep(1, 1); check_sleep(1, 1);
} }
} }