add epoll() support for Linux, kqueue() support for BSDs

This change builds on the blocking-I/O change to provide substantially
better performance when waiting on lots of sockets.
This commit is contained in:
Matthew Flatt 2011-11-09 14:40:30 -07:00
parent 67df1f7bce
commit eb25a2db47
12 changed files with 545 additions and 64 deletions

View File

@ -650,7 +650,7 @@
(loop (add1 n))
n)))
(test #f sync/timeout 0 sw sr)
(test cr sync/timeout 0 sw sr cr)
(test cr sync sw sr cr) ; sync/timeout 0 might work, but it depends on the underlying transport
;; Flush cr:
(let ([s (make-bytes 4096)])
(let loop ()

142
src/configure vendored
View File

@ -3943,13 +3943,16 @@ case "$host_os" in
LIBS="$LIBS -rdynamic"
DYN_CFLAGS="-fPIC"
enable_pthread=yes
try_kqueue_syscall=yes
;;
openbsd*)
LIBS="$LIBS -rdynamic -Wl,--export-dynamic"
enable_pthread=yes
try_kqueue_syscall=yes
;;
netbsd*)
LIBS="$LIBS -rdynamic"
try_kqueue_syscall=yes
;;
irix*)
enable_cgcdefault="yes"
@ -3964,6 +3967,7 @@ case "$host_os" in
DYN_CFLAGS="-fPIC"
STRIP_DEBUG="strip -S"
try_poll_syscall=yes
try_epoll_syscall=yes
case "$host_cpu" in
#Required for CentOS 4.6
x86_64)
@ -4046,6 +4050,7 @@ case "$host_os" in
esac
PREFLAGS="$PREFLAGS -DOS_X -D_DARWIN_UNLIMITED_SELECT"
try_kqueue_syscall=yes
STRIP_DEBUG="/usr/bin/strip -S"
@ -5220,6 +5225,143 @@ _ACEOF
fi
fi
if test "${try_epoll_syscall}" = "yes" ; then
msg="for epoll"
{ echo "$as_me:$LINENO: checking $msg" >&5
echo $ECHO_N "checking $msg... $ECHO_C" >&6; }
if test "$cross_compiling" = yes; then
use_epoll=no
else
cat >conftest.$ac_ext <<_ACEOF
/* confdefs.h. */
_ACEOF
cat confdefs.h >>conftest.$ac_ext
cat >>conftest.$ac_ext <<_ACEOF
/* end confdefs.h. */
#include <sys/epoll.h>
int main() {
int fd;
struct epoll_event ev;
fd = epoll_create(5);
ev.events = EPOLLIN | EPOLLONESHOT;
epoll_ctl(fd, EPOLL_CTL_ADD, 0, &ev);
return 0;
}
_ACEOF
rm -f conftest$ac_exeext
if { (ac_try="$ac_link"
case "(($ac_try" in
*\"* | *\`* | *\\*) ac_try_echo=\$ac_try;;
*) ac_try_echo=$ac_try;;
esac
eval "echo \"\$as_me:$LINENO: $ac_try_echo\"") >&5
(eval "$ac_link") 2>&5
ac_status=$?
echo "$as_me:$LINENO: \$? = $ac_status" >&5
(exit $ac_status); } && { ac_try='./conftest$ac_exeext'
{ (case "(($ac_try" in
*\"* | *\`* | *\\*) ac_try_echo=\$ac_try;;
*) ac_try_echo=$ac_try;;
esac
eval "echo \"\$as_me:$LINENO: $ac_try_echo\"") >&5
(eval "$ac_try") 2>&5
ac_status=$?
echo "$as_me:$LINENO: \$? = $ac_status" >&5
(exit $ac_status); }; }; then
use_epoll=yes
else
echo "$as_me: program exited with status $ac_status" >&5
echo "$as_me: failed program was:" >&5
sed 's/^/| /' conftest.$ac_ext >&5
( exit $ac_status )
use_epoll=no
fi
rm -f core *.core core.conftest.* gmon.out bb.out conftest$ac_exeext conftest.$ac_objext conftest.$ac_ext
fi
{ echo "$as_me:$LINENO: result: $use_epoll" >&5
echo "${ECHO_T}$use_epoll" >&6; }
if test "${use_epoll}" = "yes" ; then
cat >>confdefs.h <<\_ACEOF
#define HAVE_EPOLL_SYSCALL 1
_ACEOF
fi
fi
if test "${try_kqueue_syscall}" = "yes" ; then
msg="for kqueue"
{ echo "$as_me:$LINENO: checking $msg" >&5
echo $ECHO_N "checking $msg... $ECHO_C" >&6; }
if test "$cross_compiling" = yes; then
use_kqueue=no
else
cat >conftest.$ac_ext <<_ACEOF
/* confdefs.h. */
_ACEOF
cat confdefs.h >>conftest.$ac_ext
cat >>conftest.$ac_ext <<_ACEOF
/* end confdefs.h. */
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
int main() {
int kq;
struct kevent kev;
struct timespec timeout = {0, 0};
kq = kqueue();
EV_SET(&kev, 0, EVFILT_READ, EV_ADD, 0, 0, NULL);
kevent(kq, &kev, 1, NULL, 0, &timeout);
return 0;
}
_ACEOF
rm -f conftest$ac_exeext
if { (ac_try="$ac_link"
case "(($ac_try" in
*\"* | *\`* | *\\*) ac_try_echo=\$ac_try;;
*) ac_try_echo=$ac_try;;
esac
eval "echo \"\$as_me:$LINENO: $ac_try_echo\"") >&5
(eval "$ac_link") 2>&5
ac_status=$?
echo "$as_me:$LINENO: \$? = $ac_status" >&5
(exit $ac_status); } && { ac_try='./conftest$ac_exeext'
{ (case "(($ac_try" in
*\"* | *\`* | *\\*) ac_try_echo=\$ac_try;;
*) ac_try_echo=$ac_try;;
esac
eval "echo \"\$as_me:$LINENO: $ac_try_echo\"") >&5
(eval "$ac_try") 2>&5
ac_status=$?
echo "$as_me:$LINENO: \$? = $ac_status" >&5
(exit $ac_status); }; }; then
use_kqueue=yes
else
echo "$as_me: program exited with status $ac_status" >&5
echo "$as_me: failed program was:" >&5
sed 's/^/| /' conftest.$ac_ext >&5
( exit $ac_status )
use_kqueue=no
fi
rm -f core *.core core.conftest.* gmon.out bb.out conftest$ac_exeext conftest.$ac_objext conftest.$ac_ext
fi
{ echo "$as_me:$LINENO: result: $use_kqueue" >&5
echo "${ECHO_T}$use_kqueue" >&6; }
if test "${use_kqueue}" = "yes" ; then
cat >>confdefs.h <<\_ACEOF
#define HAVE_KQUEUE_SYSCALL 1
_ACEOF
fi
fi
if test "${enable_libffi}" = "yes" ; then
if test "${enable_foreign}" = "yes" ; then
{ echo "$as_me:$LINENO: checking for libffi" >&5

View File

@ -473,13 +473,16 @@ case "$host_os" in
LIBS="$LIBS -rdynamic"
DYN_CFLAGS="-fPIC"
enable_pthread=yes
try_kqueue_syscall=yes
;;
openbsd*)
LIBS="$LIBS -rdynamic -Wl,--export-dynamic"
enable_pthread=yes
try_kqueue_syscall=yes
;;
netbsd*)
LIBS="$LIBS -rdynamic"
try_kqueue_syscall=yes
;;
irix*)
enable_cgcdefault="yes"
@ -494,6 +497,7 @@ case "$host_os" in
DYN_CFLAGS="-fPIC"
STRIP_DEBUG="strip -S"
try_poll_syscall=yes
try_epoll_syscall=yes
case "$host_cpu" in
#Required for CentOS 4.6
x86_64)
@ -576,6 +580,7 @@ case "$host_os" in
esac
PREFLAGS="$PREFLAGS -DOS_X -D_DARWIN_UNLIMITED_SELECT"
try_kqueue_syscall=yes
STRIP_DEBUG="/usr/bin/strip -S"
@ -801,7 +806,48 @@ if test "${try_poll_syscall}" = "yes" ; then
}, use_poll=yes, use_poll=no, use_poll=no)
AC_MSG_RESULT($use_poll)
if test "${use_poll}" = "yes" ; then
AC_DEFINE(HAVE_POLL_SYSCALL,1,[Have poll()])
AC_DEFINE(HAVE_POLL_SYSCALL,1,[Have poll])
fi
fi
if test "${try_epoll_syscall}" = "yes" ; then
[ msg="for epoll" ]
AC_MSG_CHECKING($msg)
AC_TRY_RUN(
[ #include <sys/epoll.h> ]
int main() {
int fd;
struct epoll_event ev;
fd = epoll_create(5);
ev.events = EPOLLIN | EPOLLONESHOT;
epoll_ctl(fd, EPOLL_CTL_ADD, 0, &ev);
return 0;
}, use_epoll=yes, use_epoll=no, use_epoll=no)
AC_MSG_RESULT($use_epoll)
if test "${use_epoll}" = "yes" ; then
AC_DEFINE(HAVE_EPOLL_SYSCALL,1,[Have epoll])
fi
fi
if test "${try_kqueue_syscall}" = "yes" ; then
[ msg="for kqueue" ]
AC_MSG_CHECKING($msg)
AC_TRY_RUN(
[ #include <sys/types.h> ]
[ #include <sys/event.h> ]
[ #include <sys/time.h> ]
int main() {
int kq;
struct kevent kev;
[ struct timespec timeout = {0, 0}; ]
kq = kqueue();
EV_SET(&kev, 0, EVFILT_READ, EV_ADD, 0, 0, NULL);
kevent(kq, &kev, 1, NULL, 0, &timeout);
return 0;
}, use_kqueue=yes, use_kqueue=no, use_kqueue=no)
AC_MSG_RESULT($use_kqueue)
if test "${use_kqueue}" = "yes" ; then
AC_DEFINE(HAVE_KQUEUE_SYSCALL,1,[Have kqueue])
fi
fi

View File

@ -176,6 +176,7 @@ typedef struct Thread_Local_Variables {
struct mz_fd_set *scheme_fd_set_;
struct mz_fd_set *scheme_semaphore_fd_set_;
struct Scheme_Hash_Table *scheme_semaphore_fd_mapping_;
int scheme_semaphore_fd_kqueue_;
#ifdef USE_FCNTL_AND_FORK_FOR_FILE_LOCKS
struct Scheme_Hash_Table *locked_fd_process_map_;
#endif
@ -522,6 +523,7 @@ XFORM_GC_VARIABLE_STACK_THROUGH_THREAD_LOCAL;
#define scheme_fd_set XOA (scheme_get_thread_local_variables()->scheme_fd_set_)
#define scheme_semaphore_fd_set XOA (scheme_get_thread_local_variables()->scheme_semaphore_fd_set_)
#define scheme_semaphore_fd_mapping XOA (scheme_get_thread_local_variables()->scheme_semaphore_fd_mapping_)
#define scheme_semaphore_fd_kqueue XOA (scheme_get_thread_local_variables()->scheme_semaphore_fd_kqueue_)
#define locked_fd_process_map XOA (scheme_get_thread_local_variables()->locked_fd_process_map_)
#define new_port_cust XOA (scheme_get_thread_local_variables()->new_port_cust_)
#define scheme_break_semaphore XOA (scheme_get_thread_local_variables()->scheme_break_semaphore_)

View File

@ -56,8 +56,10 @@ typedef unsigned long uintptr_t;
/* Whether pthread_rwlock is available. */
#undef HAVE_PTHREAD_RWLOCK
/* When poll() is available: */
/* When poll(), epoll(), and/or kqueue() is available: */
#undef HAVE_POLL_SYSCALL
#undef HAVE_EPOLL_SYSCALL
#undef HAVE_KQUEUE_SYSCALL
/* Enable futures: */
#undef MZ_USE_FUTURES

View File

@ -453,6 +453,7 @@ static Scheme_Env *place_instance_init(void *stack_base, int initial_main_os_thr
scheme_init_stx_places(initial_main_os_thread);
scheme_init_sema_places();
scheme_init_gmp_places();
scheme_init_kqueue();
scheme_alloc_global_fdset();
scheme_init_file_places();
#ifndef DONT_USE_FOREIGN
@ -558,6 +559,7 @@ void scheme_place_instance_destroy(int force)
#endif
scheme_free_all_code();
scheme_free_ghbn_data();
scheme_release_kqueue();
}
static void make_kernel_env(void)

View File

@ -294,16 +294,16 @@ void scheme_init_network(Scheme_Env *env)
#endif
}
static int check_fd_sema(tcp_t s, int mode, Scheme_Schedule_Info *sinfo)
static int check_fd_sema(tcp_t s, int mode, Scheme_Schedule_Info *sinfo, Scheme_Object *orig)
{
Scheme_Object *sema;
sema = scheme_fd_to_semaphore(s, mode);
sema = scheme_fd_to_semaphore(s, mode, 1);
if (sema) {
if (!scheme_wait_sema(sema, 1)) {
if (sinfo)
scheme_set_sync_target(sinfo, sema, NULL, NULL, 0, 0, NULL);
if (sinfo && !sinfo->no_redirect)
scheme_set_sync_target(sinfo, sema, orig, NULL, 0, 0, NULL);
return 0;
}
}
@ -911,7 +911,7 @@ static Scheme_Object *listener_to_evt(listener_t *listener)
a = MALLOC_N(Scheme_Object*, listener->count);
for (i = listener->count; i--; ) {
sema = scheme_fd_to_semaphore(listener->s[i], MZFD_CREATE_READ);
sema = scheme_fd_to_semaphore(listener->s[i], MZFD_CREATE_READ, 1);
if (!sema)
return NULL;
a[i] = sema;
@ -932,11 +932,13 @@ static int tcp_check_accept(Scheme_Object *_listener, Scheme_Schedule_Info *sinf
struct timeval time = {0, 0};
# endif
if (!sinfo || !sinfo->is_poll) {
for (i = listener->count; i--; ) {
if (check_fd_sema(listener->s[i], MZFD_CHECK_READ, NULL))
if (check_fd_sema(listener->s[i], MZFD_CHECK_READ, NULL, NULL))
break;
}
if (i < 0) return 0;
}
# ifdef HAVE_POLL_SYSCALL
if (LISTENER_WAS_CLOSED(listener))
@ -992,14 +994,14 @@ static int tcp_check_accept(Scheme_Object *_listener, Scheme_Schedule_Info *sinf
return sr;
# endif
if (sinfo) {
if (sinfo && !sinfo->no_redirect) {
Scheme_Object *evt;
evt = listener_to_evt(listener);
if (evt)
scheme_set_sync_target(sinfo, evt, NULL, NULL, 0, 0, NULL);
scheme_set_sync_target(sinfo, evt, _listener, NULL, 0, 1, NULL);
} else {
for (i = listener->count; i--; ) {
check_fd_sema(listener->s[i], MZFD_CREATE_READ, NULL);
check_fd_sema(listener->s[i], MZFD_CREATE_READ, NULL, NULL);
}
}
@ -1036,8 +1038,10 @@ static int tcp_check_connect(Scheme_Object *connector_p, Scheme_Schedule_Info *s
s = *(tcp_t *)connector_p;
if (!check_fd_sema(s, MZFD_CHECK_WRITE, sinfo))
if (!sinfo || !sinfo->is_poll) {
if (!check_fd_sema(s, MZFD_CHECK_WRITE, sinfo, NULL))
return 0;
}
# ifdef HAVE_POLL_SYSCALL
{
@ -1083,7 +1087,7 @@ static int tcp_check_connect(Scheme_Object *connector_p, Scheme_Schedule_Info *s
}
# endif
check_fd_sema(s, MZFD_CREATE_WRITE, sinfo);
check_fd_sema(s, MZFD_CREATE_WRITE, sinfo, NULL);
#endif
return 0;
@ -1110,8 +1114,10 @@ static int tcp_check_write(Scheme_Object *port, Scheme_Schedule_Info *sinfo)
if (((Scheme_Output_Port *)port)->closed)
return 1;
if (!check_fd_sema(data->tcp, MZFD_CHECK_WRITE, sinfo))
if (!sinfo || !sinfo->is_poll) {
if (!check_fd_sema(data->tcp, MZFD_CHECK_WRITE, sinfo, port))
return 0;
}
#ifdef USE_SOCKETS_TCP
# ifdef HAVE_POLL_SYSCALL
@ -1180,7 +1186,7 @@ static int tcp_check_write(Scheme_Object *port, Scheme_Schedule_Info *sinfo)
}
#endif
check_fd_sema(data->tcp, MZFD_CREATE_WRITE, sinfo);
check_fd_sema(data->tcp, MZFD_CREATE_WRITE, sinfo, port);
return 0;
}
@ -1261,8 +1267,10 @@ static int tcp_byte_ready (Scheme_Input_Port *port, Scheme_Schedule_Info *sinfo)
if (data->b.bufpos < data->b.bufmax)
return 1;
if (!check_fd_sema(data->tcp, MZFD_CHECK_READ, sinfo))
if (!sinfo || !sinfo->is_poll) {
if (!check_fd_sema(data->tcp, MZFD_CHECK_READ, sinfo, (Scheme_Object *)port))
return 0;
}
#ifdef USE_SOCKETS_TCP
# ifdef HAVE_POLL_SYSCALL
@ -1295,7 +1303,7 @@ static int tcp_byte_ready (Scheme_Input_Port *port, Scheme_Schedule_Info *sinfo)
# endif
#endif
check_fd_sema(data->tcp, MZFD_CREATE_READ, sinfo);
check_fd_sema(data->tcp, MZFD_CREATE_READ, sinfo, (Scheme_Object *)port);
return 0;
}
@ -1339,7 +1347,7 @@ static intptr_t tcp_get_string(Scheme_Input_Port *port,
#ifdef USE_SOCKETS_TCP
{
Scheme_Object *sema;
sema = scheme_fd_to_semaphore(data->tcp, MZFD_CREATE_READ);
sema = scheme_fd_to_semaphore(data->tcp, MZFD_CREATE_READ, 1);
if (sema)
scheme_wait_sema(sema, nonblock ? -1 : 0);
else
@ -1460,7 +1468,7 @@ static void tcp_close_input(Scheme_Input_Port *port)
closesocket(data->tcp);
#endif
(void)scheme_fd_to_semaphore(data->tcp, MZFD_REMOVE);
(void)scheme_fd_to_semaphore(data->tcp, MZFD_REMOVE, 1);
}
static int
@ -1542,7 +1550,7 @@ static intptr_t tcp_do_write_string(Scheme_Output_Port *port,
/* Block for writing: */
{
Scheme_Object *sema;
sema = scheme_fd_to_semaphore(data->tcp, MZFD_CREATE_WRITE);
sema = scheme_fd_to_semaphore(data->tcp, MZFD_CREATE_WRITE, 1);
if (sema)
scheme_wait_sema(sema, enable_break ? -1 : 0);
else
@ -1668,7 +1676,7 @@ static void tcp_close_output(Scheme_Output_Port *port)
closesocket(data->tcp);
#endif
(void)scheme_fd_to_semaphore(data->tcp, MZFD_REMOVE);
(void)scheme_fd_to_semaphore(data->tcp, MZFD_REMOVE, 1);
}
static int
@ -1766,7 +1774,7 @@ typedef struct Close_Socket_Data {
static void closesocket_w_decrement(Close_Socket_Data *csd)
{
closesocket(csd->s);
(void)scheme_fd_to_semaphore(csd->s, MZFD_REMOVE);
(void)scheme_fd_to_semaphore(csd->s, MZFD_REMOVE, 1);
if (csd->src_addr)
mz_freeaddrinfo(csd->src_addr);
mz_freeaddrinfo(csd->dest_addr);
@ -1900,7 +1908,7 @@ static Scheme_Object *tcp_connect(int argc, Scheme_Object *argv[])
csd->src_addr = tcp_connect_src;
csd->dest_addr = tcp_connect_dest;
sema = scheme_fd_to_semaphore(s, MZFD_CREATE_WRITE);
sema = scheme_fd_to_semaphore(s, MZFD_CREATE_WRITE, 1);
BEGIN_ESCAPEABLE(closesocket_w_decrement, csd);
if (sema)
@ -1954,7 +1962,7 @@ static Scheme_Object *tcp_connect(int argc, Scheme_Object *argv[])
} else {
errid = errno;
closesocket(s);
(void)scheme_fd_to_semaphore(s, MZFD_REMOVE);
(void)scheme_fd_to_semaphore(s, MZFD_REMOVE, 1);
errpart = 6;
}
} else {
@ -2159,7 +2167,7 @@ tcp_listen(int argc, Scheme_Object *argv[])
} else {
errid = errno;
closesocket(s);
(void)scheme_fd_to_semaphore(s, MZFD_REMOVE);
(void)scheme_fd_to_semaphore(s, MZFD_REMOVE, 1);
errno = errid;
s = INVALID_SOCKET;
}
@ -2302,7 +2310,7 @@ static int stop_listener(Scheme_Object *o)
s = listener->s[i];
UNREGISTER_SOCKET(s);
closesocket(s);
(void)scheme_fd_to_semaphore(s, MZFD_REMOVE);
(void)scheme_fd_to_semaphore(s, MZFD_REMOVE, 1);
listener->s[i] = INVALID_SOCKET;
}
scheme_remove_managed(((listener_t *)o)->mref, o);
@ -2873,7 +2881,7 @@ void scheme_close_socket_fd(intptr_t fd)
#ifdef USE_SOCKETS_TCP
UNREGISTER_SOCKET(fd);
closesocket(fd);
(void)scheme_fd_to_semaphore(fd, MZFD_REMOVE);
(void)scheme_fd_to_semaphore(fd, MZFD_REMOVE, 1);
#endif
}
@ -2901,7 +2909,7 @@ static int udp_close_it(Scheme_Object *_udp)
if (udp->s != INVALID_SOCKET) {
closesocket(udp->s);
(void)scheme_fd_to_semaphore(udp->s, MZFD_REMOVE);
(void)scheme_fd_to_semaphore(udp->s, MZFD_REMOVE, 1);
udp->s = INVALID_SOCKET;
scheme_remove_managed(udp->mref, (Scheme_Object *)udp);
@ -3215,8 +3223,10 @@ static int udp_check_send(Scheme_Object *_udp, Scheme_Schedule_Info *sinfo)
if (udp->s == INVALID_SOCKET)
return 1;
if (!check_fd_sema(udp->s, MZFD_CHECK_WRITE, sinfo))
if (!sinfo || !sinfo->is_poll) {
if (!check_fd_sema(udp->s, MZFD_CHECK_WRITE, sinfo, NULL))
return 0;
}
# ifdef HAVE_POLL_SYSCALL
{
@ -3256,7 +3266,7 @@ static int udp_check_send(Scheme_Object *_udp, Scheme_Schedule_Info *sinfo)
}
#endif
check_fd_sema(udp->s, MZFD_CREATE_WRITE, sinfo);
check_fd_sema(udp->s, MZFD_CREATE_WRITE, sinfo, NULL);
return 0;
}
@ -3316,7 +3326,7 @@ static Scheme_Object *do_udp_send_it(const char *name, Scheme_UDP *udp,
if (can_block) {
/* Block and eventually try again. */
Scheme_Object *sema;
sema = scheme_fd_to_semaphore(udp->s, MZFD_CREATE_WRITE);
sema = scheme_fd_to_semaphore(udp->s, MZFD_CREATE_WRITE, 1);
if (sema)
scheme_wait_sema(sema, 0);
else
@ -3479,8 +3489,10 @@ static int udp_check_recv(Scheme_Object *_udp, Scheme_Schedule_Info *sinfo)
if (udp->s == INVALID_SOCKET)
return 1;
if (!check_fd_sema(udp->s, MZFD_CHECK_READ, sinfo))
if (!sinfo || !sinfo->is_poll) {
if (!check_fd_sema(udp->s, MZFD_CHECK_READ, sinfo, NULL))
return 0;
}
# ifdef HAVE_POLL_SYSCALL
{
@ -3520,7 +3532,7 @@ static int udp_check_recv(Scheme_Object *_udp, Scheme_Schedule_Info *sinfo)
}
# endif
check_fd_sema(udp->s, MZFD_CREATE_READ, sinfo);
check_fd_sema(udp->s, MZFD_CREATE_READ, sinfo, NULL);
return 0;
}
@ -3581,7 +3593,7 @@ static int do_udp_recv(const char *name, Scheme_UDP *udp, char *bstr, intptr_t s
if (can_block) {
/* Block and eventually try again. */
Scheme_Object *sema;
sema = scheme_fd_to_semaphore(udp->s, MZFD_CREATE_READ);
sema = scheme_fd_to_semaphore(udp->s, MZFD_CREATE_READ, 1);
if (sema)
scheme_wait_sema(sema, 0);
else

View File

@ -863,7 +863,6 @@ static int find_fd_pos(struct mz_fd_set_data *data, int n)
{
intptr_t count = SCHEME_INT_VAL(data->count);
intptr_t i;
Scheme_Object *v;
/* This linear search probably isn't good enough for hundreds or
thousands of descriptors, but epoll()/kqueue() mode should handle
@ -5974,7 +5973,7 @@ static intptr_t fd_get_string_slow(Scheme_Input_Port *port,
#ifdef WINDOWS_FILE_HANDLES
sema = NULL;
#else
sema = scheme_fd_to_semaphore(fip->fd, MZFD_CREATE_READ);
sema = scheme_fd_to_semaphore(fip->fd, MZFD_CREATE_READ, 0);
#endif
if (sema)
scheme_wait_sema(sema, nonblock ? -1 : 0);
@ -6274,7 +6273,7 @@ fd_close_input(Scheme_Input_Port *port)
# ifdef USE_FCNTL_AND_FORK_FOR_FILE_LOCKS
release_lockf(fip->fd);
# endif
(void)scheme_fd_to_semaphore(fip->fd, MZFD_REMOVE);
(void)scheme_fd_to_semaphore(fip->fd, MZFD_REMOVE, 0);
}
}
#endif
@ -7405,7 +7404,7 @@ static intptr_t flush_fd(Scheme_Output_Port *op,
#ifdef WINDOWS_FILE_HANDLES
sema = NULL;
#else
sema = scheme_fd_to_semaphore(fop->fd, MZFD_CREATE_WRITE);
sema = scheme_fd_to_semaphore(fop->fd, MZFD_CREATE_WRITE, 0);
#endif
BEGIN_ESCAPEABLE(release_flushing_lock, fop);
@ -7577,7 +7576,7 @@ fd_close_output(Scheme_Output_Port *port)
# ifdef USE_FCNTL_AND_FORK_FOR_FILE_LOCKS
release_lockf(fop->fd);
# endif
(void)scheme_fd_to_semaphore(fop->fd, MZFD_REMOVE);
(void)scheme_fd_to_semaphore(fop->fd, MZFD_REMOVE, 0);
}
}
#endif

View File

@ -949,7 +949,7 @@ MZ_EXTERN int scheme_get_port_socket(Scheme_Object *p, intptr_t *_s);
MZ_EXTERN void scheme_socket_to_ports(intptr_t s, const char *name, int takeover,
Scheme_Object **_inp, Scheme_Object **_outp);
MZ_EXTERN Scheme_Object *scheme_fd_to_semaphore(intptr_t fd, int mode);
MZ_EXTERN Scheme_Object *scheme_fd_to_semaphore(intptr_t fd, int mode, int is_socket);
MZ_EXTERN void scheme_set_type_printer(Scheme_Type stype, Scheme_Type_Printer printer);
MZ_EXTERN void scheme_print_bytes(Scheme_Print_Params *pp, const char *str, int offset, int len);

View File

@ -780,7 +780,7 @@ intptr_t (*scheme_get_port_fd)(Scheme_Object *p);
int (*scheme_get_port_socket)(Scheme_Object *p, intptr_t *_s);
void (*scheme_socket_to_ports)(intptr_t s, const char *name, int takeover,
Scheme_Object **_inp, Scheme_Object **_outp);
Scheme_Object *(*scheme_fd_to_semaphore)(intptr_t fd, int mode);
Scheme_Object *(*scheme_fd_to_semaphore)(intptr_t fd, int mode, int is_socket);
void (*scheme_set_type_printer)(Scheme_Type stype, Scheme_Type_Printer printer);
void (*scheme_print_bytes)(Scheme_Print_Params *pp, const char *str, int offset, int len);
void (*scheme_print_utf8)(Scheme_Print_Params *pp, const char *str, int offset, int len);

View File

@ -598,8 +598,7 @@ typedef struct {
Scheme_Object *current_syncing;
double sleep_end;
int w_i;
short spin;
short is_poll;
char spin, is_poll, no_redirect;
} Scheme_Schedule_Info;
typedef Scheme_Object *(*Scheme_Accept_Sync)(Scheme_Object *wrap);
@ -3479,6 +3478,12 @@ Scheme_Object *scheme_file_unlock(int argc, Scheme_Object **argv);
void scheme_reserve_file_descriptor(void);
void scheme_release_file_descriptor(void);
void scheme_init_kqueue(void);
void scheme_release_kqueue(void);
THREAD_LOCAL_DECL(extern struct mz_fd_set *scheme_semaphore_fd_set);
THREAD_LOCAL_DECL(extern Scheme_Hash_Table *scheme_semaphore_fd_mapping);
intptr_t scheme_get_byte_string_or_ch_put(const char *who,
Scheme_Object *port,
char *buffer, intptr_t offset, intptr_t size,

View File

@ -59,6 +59,14 @@
# ifdef HAVE_POLL_SYSCALL
# include <poll.h>
# endif
# ifdef HAVE_EPOLL_SYSCALL
# include <sys/epoll.h>
# endif
# ifdef HAVE_KQUEUE_SYSCALL
# include <sys/types.h>
# include <sys/event.h>
# include <sys/time.h>
# endif
# include <errno.h>
#endif
#ifdef USE_WINSOCK_TCP
@ -176,6 +184,8 @@ THREAD_LOCAL_DECL(MZ_MARK_STACK_TYPE scheme_current_cont_mark_stack);
THREAD_LOCAL_DECL(MZ_MARK_POS_TYPE scheme_current_cont_mark_pos);
#endif
THREAD_LOCAL_DECL(int scheme_semaphore_fd_kqueue);
THREAD_LOCAL_DECL(static Scheme_Custodian *main_custodian);
THREAD_LOCAL_DECL(static Scheme_Custodian *last_custodian);
THREAD_LOCAL_DECL(static Scheme_Hash_Table *limited_custodians = NULL);
@ -3389,17 +3399,92 @@ static Scheme_Object *call_as_nested_thread(int argc, Scheme_Object *argv[])
/* thread scheduling and termination */
/*========================================================================*/
Scheme_Object *scheme_fd_to_semaphore(intptr_t fd, int mode)
void scheme_init_kqueue(void)
{
#if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL)
scheme_semaphore_fd_kqueue = -1;
#endif
}
void scheme_release_kqueue(void)
{
#if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL)
if (scheme_semaphore_fd_kqueue >= 0) {
intptr_t rc;
do {
rc = close(scheme_semaphore_fd_kqueue);
} while ((rc == -1) && (errno == EINTR));
}
#endif
}
#if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL)
static void log_kqueue_error(const char *action, int kr)
{
if (kr < 0) {
Scheme_Logger *logger;
logger = scheme_get_main_logger();
scheme_log(logger, SCHEME_LOG_WARNING, 0,
#ifdef HAVE_KQUEUE_SYSCALL
"kqueue"
#else
"epoll"
#endif
" error at %s: %E",
action, errno);
}
}
static void log_kqueue_fd(int fd, int flags)
{
Scheme_Logger *logger;
logger = scheme_get_main_logger();
scheme_log(logger, SCHEME_LOG_WARNING, 0,
#ifdef HAVE_KQUEUE_SYSCALL
"kqueue"
#else
"epoll"
#endif
" expected event %d %d",
fd, flags);
}
#endif
Scheme_Object *scheme_fd_to_semaphore(intptr_t fd, int mode, int is_socket)
{
#ifdef USE_WINSOCK_TCP
return NULL;
#else
Scheme_Object *key, *v, *s;
#if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL)
# else
void *r, *w, *e;
# endif
if (!scheme_semaphore_fd_mapping)
return NULL;
# ifdef HAVE_KQUEUE_SYSCALL
if (!is_socket)
return NULL; /* kqueue() might not work on devices, such as ttys */
if (scheme_semaphore_fd_kqueue < 0) {
scheme_semaphore_fd_kqueue = kqueue();
if (scheme_semaphore_fd_kqueue < 0) {
log_kqueue_error("create", scheme_semaphore_fd_kqueue);
return NULL;
}
}
# endif
# ifdef HAVE_EPOLL_SYSCALL
if (scheme_semaphore_fd_kqueue < 0) {
scheme_semaphore_fd_kqueue = epoll_create(5);
if (scheme_semaphore_fd_kqueue < 0) {
log_kqueue_error("create", scheme_semaphore_fd_kqueue);
return NULL;
}
}
# endif
key = scheme_make_integer_value(fd);
v = scheme_hash_get(scheme_semaphore_fd_mapping, key);
if (!v && ((mode == MZFD_CHECK_READ)
@ -3412,15 +3497,44 @@ Scheme_Object *scheme_fd_to_semaphore(intptr_t fd, int mode)
scheme_hash_set(scheme_semaphore_fd_mapping, key, v);
}
# if !defined(HAVE_KQUEUE_SYSCALL) && !defined(HAVE_EPOLL_SYSCALL)
r = MZ_GET_FDSET(scheme_semaphore_fd_set, 0);
w = MZ_GET_FDSET(scheme_semaphore_fd_set, 1);
e = MZ_GET_FDSET(scheme_semaphore_fd_set, 2);
# endif
if (mode == MZFD_REMOVE) {
s = SCHEME_VEC_ELS(v)[0];
if (!SCHEME_FALSEP(s))
scheme_post_sema_all(s);
s = SCHEME_VEC_ELS(v)[1];
if (!SCHEME_FALSEP(s))
scheme_post_sema_all(s);
s = NULL;
scheme_hash_set(scheme_semaphore_fd_mapping, key, NULL);
# ifdef HAVE_KQUEUE_SYSCALL
{
struct kevent kev[2];
struct timespec timeout = {0, 0};
int kr;
EV_SET(kev, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
EV_SET(&kev[1], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
do {
kr = kevent(scheme_semaphore_fd_kqueue, kev, 2, NULL, 0, &timeout);
} while ((kr == -1) && (errno == EINTR));
log_kqueue_error("remove", kr);
}
# elif defined(HAVE_EPOLL_SYSCALL)
{
int kr;
kr = epoll_ctl(scheme_semaphore_fd_kqueue, EPOLL_CTL_DEL, fd, NULL);
log_kqueue_error("remove", kr);
}
# else
MZ_FD_CLR(fd, r);
MZ_FD_CLR(fd, w);
MZ_FD_CLR(fd, e);
# endif
s = NULL;
} else if ((mode == MZFD_CHECK_READ)
|| (mode == MZFD_CREATE_READ)) {
@ -3429,9 +3543,33 @@ Scheme_Object *scheme_fd_to_semaphore(intptr_t fd, int mode)
if (mode == MZFD_CREATE_READ) {
s = scheme_make_sema(0);
SCHEME_VEC_ELS(v)[0] = s;
# ifdef HAVE_KQUEUE_SYSCALL
{
struct kevent kev;
struct timespec timeout = {0, 0};
int kr;
EV_SET(&kev, fd, EVFILT_READ, EV_ADD | EV_ONESHOT, 0, 0, NULL);
do {
kr = kevent(scheme_semaphore_fd_kqueue, &kev, 1, NULL, 0, &timeout);
} while ((kr == -1) && (errno == EINTR));
log_kqueue_error("read", kr);
}
# elif defined(HAVE_EPOLL_SYSCALL)
{
struct epoll_event ev;
int already = !SCHEME_FALSEP(SCHEME_VEC_ELS(v)[1]), kr;
ev.data.fd = fd;
ev.events = EPOLLIN | (already ? EPOLLOUT : 0);
kr = epoll_ctl(scheme_semaphore_fd_kqueue,
(already ? EPOLL_CTL_MOD : EPOLL_CTL_ADD), fd, &ev);
log_kqueue_error("read", kr);
}
# else
MZ_FD_SET(fd, r);
MZ_FD_SET(fd, e);
}
#endif
} else
s = NULL;
}
} else {
s = SCHEME_VEC_ELS(v)[1];
@ -3439,9 +3577,34 @@ Scheme_Object *scheme_fd_to_semaphore(intptr_t fd, int mode)
if (mode == MZFD_CREATE_WRITE) {
s = scheme_make_sema(0);
SCHEME_VEC_ELS(v)[1] = s;
# ifdef HAVE_KQUEUE_SYSCALL
{
struct kevent kev;
struct timespec timeout = {0, 0};
int kr;
EV_SET(&kev, fd, EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, NULL);
do {
kr = kevent(scheme_semaphore_fd_kqueue, &kev, 1, NULL, 0, &timeout);
} while ((kr == -1) && (errno == EINTR));
log_kqueue_error("write", kr);
}
# elif defined(HAVE_EPOLL_SYSCALL)
{
struct epoll_event ev;
int already = !SCHEME_FALSEP(SCHEME_VEC_ELS(v)[0]), kr;
ev.data.fd = fd;
ev.events = EPOLLOUT | (already ? EPOLLIN : 0);
kr = epoll_ctl(scheme_semaphore_fd_kqueue,
(already ? EPOLL_CTL_MOD : EPOLL_CTL_ADD), fd, &ev);
log_kqueue_error("write", kr);
}
# else
MZ_FD_SET(fd, w);
MZ_FD_SET(fd, e);
}
#endif
} else
s = NULL;
}
}
@ -3453,6 +3616,105 @@ static int check_fd_semaphores()
{
#ifdef USE_WINSOCK_TCP
return 0;
#elif defined(HAVE_KQUEUE_SYSCALL)
Scheme_Object *v, *s, *key;
struct kevent kev;
struct timespec timeout = {0, 0};
int kr, hit = 0;
if (!scheme_semaphore_fd_mapping || (scheme_semaphore_fd_kqueue < 0))
return 0;
while (1) {
do {
kr = kevent(scheme_semaphore_fd_kqueue, NULL, 0, &kev, 1, &timeout);
} while ((kr == -1) && (errno == EINTR));
log_kqueue_error("wait", kr);
if (kr > 0) {
key = scheme_make_integer_value(kev.ident);
v = scheme_hash_get(scheme_semaphore_fd_mapping, key);
if (v) {
if (kev.filter == EVFILT_READ) {
s = SCHEME_VEC_ELS(v)[0];
if (!SCHEME_FALSEP(s)) {
scheme_post_sema_all(s);
hit = 1;
SCHEME_VEC_ELS(v)[0] = scheme_false;
}
} else if (kev.filter == EVFILT_WRITE) {
s = SCHEME_VEC_ELS(v)[1];
if (!SCHEME_FALSEP(s)) {
scheme_post_sema_all(s);
hit = 1;
SCHEME_VEC_ELS(v)[1] = scheme_false;
}
}
if (SCHEME_FALSEP(SCHEME_VEC_ELS(v)[0])
&& SCHEME_FALSEP(SCHEME_VEC_ELS(v)[1]))
scheme_hash_set(scheme_semaphore_fd_mapping, key, NULL);
} else {
log_kqueue_fd(kev.ident, kev.filter);
}
} else
break;
}
return hit;
#elif defined(HAVE_EPOLL_SYSCALL)
Scheme_Object *v, *s, *key;
int kr, hit = 0;
struct epoll_event ev;
if (!scheme_semaphore_fd_mapping || (scheme_semaphore_fd_kqueue < 0))
return 0;
while (1) {
do {
kr = epoll_wait(scheme_semaphore_fd_kqueue, &ev, 1, 0);
} while ((kr == -1) && (errno == EINTR));
log_kqueue_error("wait", kr);
if (kr > 0) {
key = scheme_make_integer_value(ev.data.fd);
v = scheme_hash_get(scheme_semaphore_fd_mapping, key);
if (v) {
if (ev.events & (POLLIN | POLLHUP | POLLERR)) {
s = SCHEME_VEC_ELS(v)[0];
if (!SCHEME_FALSEP(s)) {
scheme_post_sema_all(s);
hit = 1;
SCHEME_VEC_ELS(v)[0] = scheme_false;
}
}
if (ev.events & (POLLOUT | POLLHUP | POLLERR)) {
s = SCHEME_VEC_ELS(v)[1];
if (!SCHEME_FALSEP(s)) {
scheme_post_sema_all(s);
hit = 1;
SCHEME_VEC_ELS(v)[1] = scheme_false;
}
}
if (SCHEME_FALSEP(SCHEME_VEC_ELS(v)[0])
&& SCHEME_FALSEP(SCHEME_VEC_ELS(v)[1])) {
scheme_hash_set(scheme_semaphore_fd_mapping, key, NULL);
kr = epoll_ctl(scheme_semaphore_fd_kqueue, EPOLL_CTL_DEL, ev.data.fd, NULL);
log_kqueue_error("remove*", kr);
} else {
ev.events = ((SCHEME_FALSEP(SCHEME_VEC_ELS(v)[0]) ? 0 : POLLIN)
| (SCHEME_FALSEP(SCHEME_VEC_ELS(v)[1]) ? 0 : POLLOUT));
kr = epoll_ctl(scheme_semaphore_fd_kqueue, EPOLL_CTL_MOD, ev.data.fd, &ev);
log_kqueue_error("update", kr);
}
} else {
log_kqueue_fd(ev.data.fd, ev.events);
}
} else
break;
}
return hit;
#elif defined(HAVE_POLL_SYSCALL)
struct pollfd *pfd;
intptr_t i, c;
@ -3692,8 +3954,16 @@ static int check_sleep(int need_activity, int sleep_now)
return 0;
}
scheme_clean_fd_set(fds);
# if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL)
if (scheme_semaphore_fd_mapping && (scheme_semaphore_fd_kqueue >= 0)) {
MZ_FD_SET(scheme_semaphore_fd_kqueue, set);
MZ_FD_SET(scheme_semaphore_fd_kqueue, set2);
}
# else
fds = scheme_merge_fd_sets(fds, scheme_semaphore_fd_set);
# endif
scheme_clean_fd_set(fds);
if (sleep_now) {
float mst = (float)max_sleep_time;
@ -3768,13 +4038,14 @@ void scheme_out_of_fuel(void)
}
static void init_schedule_info(Scheme_Schedule_Info *sinfo, Scheme_Thread *false_pos_ok,
double sleep_end)
int no_redirect, double sleep_end)
{
sinfo->false_positive_ok = false_pos_ok;
sinfo->potentially_false_positive = 0;
sinfo->current_syncing = NULL;
sinfo->spin = 0;
sinfo->is_poll = 0;
sinfo->no_redirect = no_redirect;
sinfo->sleep_end = sleep_end;
}
@ -4179,7 +4450,7 @@ static void find_next_thread(Scheme_Thread **return_arg) {
if (next->block_check) {
Scheme_Ready_Fun_FPC f = (Scheme_Ready_Fun_FPC)next->block_check;
Scheme_Schedule_Info sinfo;
init_schedule_info(&sinfo, next, next->sleep_end);
init_schedule_info(&sinfo, next, 1, next->sleep_end);
if (f(next->blocker, &sinfo))
break;
next->sleep_end = sinfo.sleep_end;
@ -4333,7 +4604,7 @@ void scheme_thread_block(float sleep_time)
if (p->block_check) {
Scheme_Ready_Fun_FPC f = (Scheme_Ready_Fun_FPC)p->block_check;
Scheme_Schedule_Info sinfo;
init_schedule_info(&sinfo, p, sleep_end);
init_schedule_info(&sinfo, p, 1, sleep_end);
if (f(p->blocker, &sinfo)) {
sleep_end = 0;
skip_sleep = 1;
@ -4471,7 +4742,7 @@ void scheme_thread_block(float sleep_time)
if (p->block_check) {
Scheme_Ready_Fun_FPC f = (Scheme_Ready_Fun_FPC)p->block_check;
Scheme_Schedule_Info sinfo;
init_schedule_info(&sinfo, p, sleep_end);
init_schedule_info(&sinfo, p, 1, sleep_end);
if (f(p->blocker, &sinfo)) {
sleep_end = 0;
} else {
@ -4520,12 +4791,12 @@ int scheme_block_until(Scheme_Ready_Fun _f, Scheme_Needs_Wakeup_Fun fdf,
/* We make an sinfo to be polite, but we also assume
that f will not generate any redirections! */
init_schedule_info(&sinfo, NULL, sleep_end);
init_schedule_info(&sinfo, NULL, 1, sleep_end);
while (!(result = f((Scheme_Object *)data, &sinfo))) {
sleep_end = sinfo.sleep_end;
if (sinfo.spin) {
init_schedule_info(&sinfo, NULL, 0.0);
init_schedule_info(&sinfo, NULL, 1, 0.0);
scheme_thread_block(0.0);
scheme_current_thread->ran_some = 1;
} else {
@ -5844,7 +6115,7 @@ static int syncing_ready(Scheme_Object *s, Scheme_Schedule_Info *sinfo)
if (ready) {
int yep;
init_schedule_info(&r_sinfo, sinfo->false_positive_ok, sleep_end);
init_schedule_info(&r_sinfo, sinfo->false_positive_ok, 0, sleep_end);
r_sinfo.current_syncing = (Scheme_Object *)syncing;
r_sinfo.w_i = i;