diff --git a/collects/tests/racket/thread.rktl b/collects/tests/racket/thread.rktl index a3063fd688..53f3956cab 100644 --- a/collects/tests/racket/thread.rktl +++ b/collects/tests/racket/thread.rktl @@ -650,7 +650,7 @@ (loop (add1 n)) n))) (test #f sync/timeout 0 sw sr) - (test cr sync/timeout 0 sw sr cr) + (test cr sync sw sr cr) ; sync/timeout 0 might work, but it depends on the underlying transport ;; Flush cr: (let ([s (make-bytes 4096)]) (let loop () diff --git a/src/configure b/src/configure index cc8abee2a5..19a0a811f6 100755 --- a/src/configure +++ b/src/configure @@ -3943,13 +3943,16 @@ case "$host_os" in LIBS="$LIBS -rdynamic" DYN_CFLAGS="-fPIC" enable_pthread=yes + try_kqueue_syscall=yes ;; openbsd*) LIBS="$LIBS -rdynamic -Wl,--export-dynamic" enable_pthread=yes + try_kqueue_syscall=yes ;; netbsd*) LIBS="$LIBS -rdynamic" + try_kqueue_syscall=yes ;; irix*) enable_cgcdefault="yes" @@ -3964,6 +3967,7 @@ case "$host_os" in DYN_CFLAGS="-fPIC" STRIP_DEBUG="strip -S" try_poll_syscall=yes + try_epoll_syscall=yes case "$host_cpu" in #Required for CentOS 4.6 x86_64) @@ -4046,6 +4050,7 @@ case "$host_os" in esac PREFLAGS="$PREFLAGS -DOS_X -D_DARWIN_UNLIMITED_SELECT" + try_kqueue_syscall=yes STRIP_DEBUG="/usr/bin/strip -S" @@ -5220,6 +5225,143 @@ _ACEOF fi fi +if test "${try_epoll_syscall}" = "yes" ; then + msg="for epoll" + { echo "$as_me:$LINENO: checking $msg" >&5 +echo $ECHO_N "checking $msg... $ECHO_C" >&6; } + if test "$cross_compiling" = yes; then + use_epoll=no +else + cat >conftest.$ac_ext <<_ACEOF +/* confdefs.h. */ +_ACEOF +cat confdefs.h >>conftest.$ac_ext +cat >>conftest.$ac_ext <<_ACEOF +/* end confdefs.h. */ + #include + int main() { + int fd; + struct epoll_event ev; + fd = epoll_create(5); + ev.events = EPOLLIN | EPOLLONESHOT; + epoll_ctl(fd, EPOLL_CTL_ADD, 0, &ev); + return 0; + } +_ACEOF +rm -f conftest$ac_exeext +if { (ac_try="$ac_link" +case "(($ac_try" in + *\"* | *\`* | *\\*) ac_try_echo=\$ac_try;; + *) ac_try_echo=$ac_try;; +esac +eval "echo \"\$as_me:$LINENO: $ac_try_echo\"") >&5 + (eval "$ac_link") 2>&5 + ac_status=$? + echo "$as_me:$LINENO: \$? = $ac_status" >&5 + (exit $ac_status); } && { ac_try='./conftest$ac_exeext' + { (case "(($ac_try" in + *\"* | *\`* | *\\*) ac_try_echo=\$ac_try;; + *) ac_try_echo=$ac_try;; +esac +eval "echo \"\$as_me:$LINENO: $ac_try_echo\"") >&5 + (eval "$ac_try") 2>&5 + ac_status=$? + echo "$as_me:$LINENO: \$? = $ac_status" >&5 + (exit $ac_status); }; }; then + use_epoll=yes +else + echo "$as_me: program exited with status $ac_status" >&5 +echo "$as_me: failed program was:" >&5 +sed 's/^/| /' conftest.$ac_ext >&5 + +( exit $ac_status ) +use_epoll=no +fi +rm -f core *.core core.conftest.* gmon.out bb.out conftest$ac_exeext conftest.$ac_objext conftest.$ac_ext +fi + + + { echo "$as_me:$LINENO: result: $use_epoll" >&5 +echo "${ECHO_T}$use_epoll" >&6; } + if test "${use_epoll}" = "yes" ; then + +cat >>confdefs.h <<\_ACEOF +#define HAVE_EPOLL_SYSCALL 1 +_ACEOF + + fi +fi + +if test "${try_kqueue_syscall}" = "yes" ; then + msg="for kqueue" + { echo "$as_me:$LINENO: checking $msg" >&5 +echo $ECHO_N "checking $msg... $ECHO_C" >&6; } + if test "$cross_compiling" = yes; then + use_kqueue=no +else + cat >conftest.$ac_ext <<_ACEOF +/* confdefs.h. */ +_ACEOF +cat confdefs.h >>conftest.$ac_ext +cat >>conftest.$ac_ext <<_ACEOF +/* end confdefs.h. */ + #include + #include + #include + int main() { + int kq; + struct kevent kev; + struct timespec timeout = {0, 0}; + kq = kqueue(); + EV_SET(&kev, 0, EVFILT_READ, EV_ADD, 0, 0, NULL); + kevent(kq, &kev, 1, NULL, 0, &timeout); + return 0; + } +_ACEOF +rm -f conftest$ac_exeext +if { (ac_try="$ac_link" +case "(($ac_try" in + *\"* | *\`* | *\\*) ac_try_echo=\$ac_try;; + *) ac_try_echo=$ac_try;; +esac +eval "echo \"\$as_me:$LINENO: $ac_try_echo\"") >&5 + (eval "$ac_link") 2>&5 + ac_status=$? + echo "$as_me:$LINENO: \$? = $ac_status" >&5 + (exit $ac_status); } && { ac_try='./conftest$ac_exeext' + { (case "(($ac_try" in + *\"* | *\`* | *\\*) ac_try_echo=\$ac_try;; + *) ac_try_echo=$ac_try;; +esac +eval "echo \"\$as_me:$LINENO: $ac_try_echo\"") >&5 + (eval "$ac_try") 2>&5 + ac_status=$? + echo "$as_me:$LINENO: \$? = $ac_status" >&5 + (exit $ac_status); }; }; then + use_kqueue=yes +else + echo "$as_me: program exited with status $ac_status" >&5 +echo "$as_me: failed program was:" >&5 +sed 's/^/| /' conftest.$ac_ext >&5 + +( exit $ac_status ) +use_kqueue=no +fi +rm -f core *.core core.conftest.* gmon.out bb.out conftest$ac_exeext conftest.$ac_objext conftest.$ac_ext +fi + + + { echo "$as_me:$LINENO: result: $use_kqueue" >&5 +echo "${ECHO_T}$use_kqueue" >&6; } + if test "${use_kqueue}" = "yes" ; then + +cat >>confdefs.h <<\_ACEOF +#define HAVE_KQUEUE_SYSCALL 1 +_ACEOF + + fi +fi + if test "${enable_libffi}" = "yes" ; then if test "${enable_foreign}" = "yes" ; then { echo "$as_me:$LINENO: checking for libffi" >&5 diff --git a/src/racket/configure.ac b/src/racket/configure.ac index 24844be0e8..38b1a0ba0f 100644 --- a/src/racket/configure.ac +++ b/src/racket/configure.ac @@ -473,13 +473,16 @@ case "$host_os" in LIBS="$LIBS -rdynamic" DYN_CFLAGS="-fPIC" enable_pthread=yes + try_kqueue_syscall=yes ;; openbsd*) LIBS="$LIBS -rdynamic -Wl,--export-dynamic" enable_pthread=yes + try_kqueue_syscall=yes ;; netbsd*) LIBS="$LIBS -rdynamic" + try_kqueue_syscall=yes ;; irix*) enable_cgcdefault="yes" @@ -494,6 +497,7 @@ case "$host_os" in DYN_CFLAGS="-fPIC" STRIP_DEBUG="strip -S" try_poll_syscall=yes + try_epoll_syscall=yes case "$host_cpu" in #Required for CentOS 4.6 x86_64) @@ -576,6 +580,7 @@ case "$host_os" in esac PREFLAGS="$PREFLAGS -DOS_X -D_DARWIN_UNLIMITED_SELECT" + try_kqueue_syscall=yes STRIP_DEBUG="/usr/bin/strip -S" @@ -801,7 +806,48 @@ if test "${try_poll_syscall}" = "yes" ; then }, use_poll=yes, use_poll=no, use_poll=no) AC_MSG_RESULT($use_poll) if test "${use_poll}" = "yes" ; then - AC_DEFINE(HAVE_POLL_SYSCALL,1,[Have poll()]) + AC_DEFINE(HAVE_POLL_SYSCALL,1,[Have poll]) + fi +fi + +if test "${try_epoll_syscall}" = "yes" ; then + [ msg="for epoll" ] + AC_MSG_CHECKING($msg) + AC_TRY_RUN( + [ #include ] + int main() { + int fd; + struct epoll_event ev; + fd = epoll_create(5); + ev.events = EPOLLIN | EPOLLONESHOT; + epoll_ctl(fd, EPOLL_CTL_ADD, 0, &ev); + return 0; + }, use_epoll=yes, use_epoll=no, use_epoll=no) + AC_MSG_RESULT($use_epoll) + if test "${use_epoll}" = "yes" ; then + AC_DEFINE(HAVE_EPOLL_SYSCALL,1,[Have epoll]) + fi +fi + +if test "${try_kqueue_syscall}" = "yes" ; then + [ msg="for kqueue" ] + AC_MSG_CHECKING($msg) + AC_TRY_RUN( + [ #include ] + [ #include ] + [ #include ] + int main() { + int kq; + struct kevent kev; + [ struct timespec timeout = {0, 0}; ] + kq = kqueue(); + EV_SET(&kev, 0, EVFILT_READ, EV_ADD, 0, 0, NULL); + kevent(kq, &kev, 1, NULL, 0, &timeout); + return 0; + }, use_kqueue=yes, use_kqueue=no, use_kqueue=no) + AC_MSG_RESULT($use_kqueue) + if test "${use_kqueue}" = "yes" ; then + AC_DEFINE(HAVE_KQUEUE_SYSCALL,1,[Have kqueue]) fi fi diff --git a/src/racket/include/schthread.h b/src/racket/include/schthread.h index 4aa7c59b68..f911a0e6eb 100644 --- a/src/racket/include/schthread.h +++ b/src/racket/include/schthread.h @@ -176,6 +176,7 @@ typedef struct Thread_Local_Variables { struct mz_fd_set *scheme_fd_set_; struct mz_fd_set *scheme_semaphore_fd_set_; struct Scheme_Hash_Table *scheme_semaphore_fd_mapping_; + int scheme_semaphore_fd_kqueue_; #ifdef USE_FCNTL_AND_FORK_FOR_FILE_LOCKS struct Scheme_Hash_Table *locked_fd_process_map_; #endif @@ -522,6 +523,7 @@ XFORM_GC_VARIABLE_STACK_THROUGH_THREAD_LOCAL; #define scheme_fd_set XOA (scheme_get_thread_local_variables()->scheme_fd_set_) #define scheme_semaphore_fd_set XOA (scheme_get_thread_local_variables()->scheme_semaphore_fd_set_) #define scheme_semaphore_fd_mapping XOA (scheme_get_thread_local_variables()->scheme_semaphore_fd_mapping_) +#define scheme_semaphore_fd_kqueue XOA (scheme_get_thread_local_variables()->scheme_semaphore_fd_kqueue_) #define locked_fd_process_map XOA (scheme_get_thread_local_variables()->locked_fd_process_map_) #define new_port_cust XOA (scheme_get_thread_local_variables()->new_port_cust_) #define scheme_break_semaphore XOA (scheme_get_thread_local_variables()->scheme_break_semaphore_) diff --git a/src/racket/mzconfig.h.in b/src/racket/mzconfig.h.in index a8b0d37e9c..68993db32a 100644 --- a/src/racket/mzconfig.h.in +++ b/src/racket/mzconfig.h.in @@ -56,8 +56,10 @@ typedef unsigned long uintptr_t; /* Whether pthread_rwlock is available. */ #undef HAVE_PTHREAD_RWLOCK -/* When poll() is available: */ +/* When poll(), epoll(), and/or kqueue() is available: */ #undef HAVE_POLL_SYSCALL +#undef HAVE_EPOLL_SYSCALL +#undef HAVE_KQUEUE_SYSCALL /* Enable futures: */ #undef MZ_USE_FUTURES diff --git a/src/racket/src/env.c b/src/racket/src/env.c index 7d8591e378..168355b75c 100644 --- a/src/racket/src/env.c +++ b/src/racket/src/env.c @@ -453,6 +453,7 @@ static Scheme_Env *place_instance_init(void *stack_base, int initial_main_os_thr scheme_init_stx_places(initial_main_os_thread); scheme_init_sema_places(); scheme_init_gmp_places(); + scheme_init_kqueue(); scheme_alloc_global_fdset(); scheme_init_file_places(); #ifndef DONT_USE_FOREIGN @@ -558,6 +559,7 @@ void scheme_place_instance_destroy(int force) #endif scheme_free_all_code(); scheme_free_ghbn_data(); + scheme_release_kqueue(); } static void make_kernel_env(void) diff --git a/src/racket/src/network.c b/src/racket/src/network.c index 09c71f5a93..15204eb52e 100644 --- a/src/racket/src/network.c +++ b/src/racket/src/network.c @@ -294,16 +294,16 @@ void scheme_init_network(Scheme_Env *env) #endif } -static int check_fd_sema(tcp_t s, int mode, Scheme_Schedule_Info *sinfo) +static int check_fd_sema(tcp_t s, int mode, Scheme_Schedule_Info *sinfo, Scheme_Object *orig) { Scheme_Object *sema; - sema = scheme_fd_to_semaphore(s, mode); + sema = scheme_fd_to_semaphore(s, mode, 1); if (sema) { if (!scheme_wait_sema(sema, 1)) { - if (sinfo) - scheme_set_sync_target(sinfo, sema, NULL, NULL, 0, 0, NULL); + if (sinfo && !sinfo->no_redirect) + scheme_set_sync_target(sinfo, sema, orig, NULL, 0, 0, NULL); return 0; } } @@ -911,7 +911,7 @@ static Scheme_Object *listener_to_evt(listener_t *listener) a = MALLOC_N(Scheme_Object*, listener->count); for (i = listener->count; i--; ) { - sema = scheme_fd_to_semaphore(listener->s[i], MZFD_CREATE_READ); + sema = scheme_fd_to_semaphore(listener->s[i], MZFD_CREATE_READ, 1); if (!sema) return NULL; a[i] = sema; @@ -932,11 +932,13 @@ static int tcp_check_accept(Scheme_Object *_listener, Scheme_Schedule_Info *sinf struct timeval time = {0, 0}; # endif - for (i = listener->count; i--; ) { - if (check_fd_sema(listener->s[i], MZFD_CHECK_READ, NULL)) - break; + if (!sinfo || !sinfo->is_poll) { + for (i = listener->count; i--; ) { + if (check_fd_sema(listener->s[i], MZFD_CHECK_READ, NULL, NULL)) + break; + } + if (i < 0) return 0; } - if (i < 0) return 0; # ifdef HAVE_POLL_SYSCALL if (LISTENER_WAS_CLOSED(listener)) @@ -992,14 +994,14 @@ static int tcp_check_accept(Scheme_Object *_listener, Scheme_Schedule_Info *sinf return sr; # endif - if (sinfo) { + if (sinfo && !sinfo->no_redirect) { Scheme_Object *evt; evt = listener_to_evt(listener); if (evt) - scheme_set_sync_target(sinfo, evt, NULL, NULL, 0, 0, NULL); + scheme_set_sync_target(sinfo, evt, _listener, NULL, 0, 1, NULL); } else { for (i = listener->count; i--; ) { - check_fd_sema(listener->s[i], MZFD_CREATE_READ, NULL); + check_fd_sema(listener->s[i], MZFD_CREATE_READ, NULL, NULL); } } @@ -1036,8 +1038,10 @@ static int tcp_check_connect(Scheme_Object *connector_p, Scheme_Schedule_Info *s s = *(tcp_t *)connector_p; - if (!check_fd_sema(s, MZFD_CHECK_WRITE, sinfo)) - return 0; + if (!sinfo || !sinfo->is_poll) { + if (!check_fd_sema(s, MZFD_CHECK_WRITE, sinfo, NULL)) + return 0; + } # ifdef HAVE_POLL_SYSCALL { @@ -1083,7 +1087,7 @@ static int tcp_check_connect(Scheme_Object *connector_p, Scheme_Schedule_Info *s } # endif - check_fd_sema(s, MZFD_CREATE_WRITE, sinfo); + check_fd_sema(s, MZFD_CREATE_WRITE, sinfo, NULL); #endif return 0; @@ -1110,8 +1114,10 @@ static int tcp_check_write(Scheme_Object *port, Scheme_Schedule_Info *sinfo) if (((Scheme_Output_Port *)port)->closed) return 1; - if (!check_fd_sema(data->tcp, MZFD_CHECK_WRITE, sinfo)) - return 0; + if (!sinfo || !sinfo->is_poll) { + if (!check_fd_sema(data->tcp, MZFD_CHECK_WRITE, sinfo, port)) + return 0; + } #ifdef USE_SOCKETS_TCP # ifdef HAVE_POLL_SYSCALL @@ -1180,7 +1186,7 @@ static int tcp_check_write(Scheme_Object *port, Scheme_Schedule_Info *sinfo) } #endif - check_fd_sema(data->tcp, MZFD_CREATE_WRITE, sinfo); + check_fd_sema(data->tcp, MZFD_CREATE_WRITE, sinfo, port); return 0; } @@ -1261,8 +1267,10 @@ static int tcp_byte_ready (Scheme_Input_Port *port, Scheme_Schedule_Info *sinfo) if (data->b.bufpos < data->b.bufmax) return 1; - if (!check_fd_sema(data->tcp, MZFD_CHECK_READ, sinfo)) - return 0; + if (!sinfo || !sinfo->is_poll) { + if (!check_fd_sema(data->tcp, MZFD_CHECK_READ, sinfo, (Scheme_Object *)port)) + return 0; + } #ifdef USE_SOCKETS_TCP # ifdef HAVE_POLL_SYSCALL @@ -1295,7 +1303,7 @@ static int tcp_byte_ready (Scheme_Input_Port *port, Scheme_Schedule_Info *sinfo) # endif #endif - check_fd_sema(data->tcp, MZFD_CREATE_READ, sinfo); + check_fd_sema(data->tcp, MZFD_CREATE_READ, sinfo, (Scheme_Object *)port); return 0; } @@ -1339,7 +1347,7 @@ static intptr_t tcp_get_string(Scheme_Input_Port *port, #ifdef USE_SOCKETS_TCP { Scheme_Object *sema; - sema = scheme_fd_to_semaphore(data->tcp, MZFD_CREATE_READ); + sema = scheme_fd_to_semaphore(data->tcp, MZFD_CREATE_READ, 1); if (sema) scheme_wait_sema(sema, nonblock ? -1 : 0); else @@ -1460,7 +1468,7 @@ static void tcp_close_input(Scheme_Input_Port *port) closesocket(data->tcp); #endif - (void)scheme_fd_to_semaphore(data->tcp, MZFD_REMOVE); + (void)scheme_fd_to_semaphore(data->tcp, MZFD_REMOVE, 1); } static int @@ -1542,7 +1550,7 @@ static intptr_t tcp_do_write_string(Scheme_Output_Port *port, /* Block for writing: */ { Scheme_Object *sema; - sema = scheme_fd_to_semaphore(data->tcp, MZFD_CREATE_WRITE); + sema = scheme_fd_to_semaphore(data->tcp, MZFD_CREATE_WRITE, 1); if (sema) scheme_wait_sema(sema, enable_break ? -1 : 0); else @@ -1668,7 +1676,7 @@ static void tcp_close_output(Scheme_Output_Port *port) closesocket(data->tcp); #endif - (void)scheme_fd_to_semaphore(data->tcp, MZFD_REMOVE); + (void)scheme_fd_to_semaphore(data->tcp, MZFD_REMOVE, 1); } static int @@ -1766,7 +1774,7 @@ typedef struct Close_Socket_Data { static void closesocket_w_decrement(Close_Socket_Data *csd) { closesocket(csd->s); - (void)scheme_fd_to_semaphore(csd->s, MZFD_REMOVE); + (void)scheme_fd_to_semaphore(csd->s, MZFD_REMOVE, 1); if (csd->src_addr) mz_freeaddrinfo(csd->src_addr); mz_freeaddrinfo(csd->dest_addr); @@ -1900,7 +1908,7 @@ static Scheme_Object *tcp_connect(int argc, Scheme_Object *argv[]) csd->src_addr = tcp_connect_src; csd->dest_addr = tcp_connect_dest; - sema = scheme_fd_to_semaphore(s, MZFD_CREATE_WRITE); + sema = scheme_fd_to_semaphore(s, MZFD_CREATE_WRITE, 1); BEGIN_ESCAPEABLE(closesocket_w_decrement, csd); if (sema) @@ -1954,7 +1962,7 @@ static Scheme_Object *tcp_connect(int argc, Scheme_Object *argv[]) } else { errid = errno; closesocket(s); - (void)scheme_fd_to_semaphore(s, MZFD_REMOVE); + (void)scheme_fd_to_semaphore(s, MZFD_REMOVE, 1); errpart = 6; } } else { @@ -2159,7 +2167,7 @@ tcp_listen(int argc, Scheme_Object *argv[]) } else { errid = errno; closesocket(s); - (void)scheme_fd_to_semaphore(s, MZFD_REMOVE); + (void)scheme_fd_to_semaphore(s, MZFD_REMOVE, 1); errno = errid; s = INVALID_SOCKET; } @@ -2302,7 +2310,7 @@ static int stop_listener(Scheme_Object *o) s = listener->s[i]; UNREGISTER_SOCKET(s); closesocket(s); - (void)scheme_fd_to_semaphore(s, MZFD_REMOVE); + (void)scheme_fd_to_semaphore(s, MZFD_REMOVE, 1); listener->s[i] = INVALID_SOCKET; } scheme_remove_managed(((listener_t *)o)->mref, o); @@ -2873,7 +2881,7 @@ void scheme_close_socket_fd(intptr_t fd) #ifdef USE_SOCKETS_TCP UNREGISTER_SOCKET(fd); closesocket(fd); - (void)scheme_fd_to_semaphore(fd, MZFD_REMOVE); + (void)scheme_fd_to_semaphore(fd, MZFD_REMOVE, 1); #endif } @@ -2901,7 +2909,7 @@ static int udp_close_it(Scheme_Object *_udp) if (udp->s != INVALID_SOCKET) { closesocket(udp->s); - (void)scheme_fd_to_semaphore(udp->s, MZFD_REMOVE); + (void)scheme_fd_to_semaphore(udp->s, MZFD_REMOVE, 1); udp->s = INVALID_SOCKET; scheme_remove_managed(udp->mref, (Scheme_Object *)udp); @@ -3215,8 +3223,10 @@ static int udp_check_send(Scheme_Object *_udp, Scheme_Schedule_Info *sinfo) if (udp->s == INVALID_SOCKET) return 1; - if (!check_fd_sema(udp->s, MZFD_CHECK_WRITE, sinfo)) - return 0; + if (!sinfo || !sinfo->is_poll) { + if (!check_fd_sema(udp->s, MZFD_CHECK_WRITE, sinfo, NULL)) + return 0; + } # ifdef HAVE_POLL_SYSCALL { @@ -3256,7 +3266,7 @@ static int udp_check_send(Scheme_Object *_udp, Scheme_Schedule_Info *sinfo) } #endif - check_fd_sema(udp->s, MZFD_CREATE_WRITE, sinfo); + check_fd_sema(udp->s, MZFD_CREATE_WRITE, sinfo, NULL); return 0; } @@ -3316,7 +3326,7 @@ static Scheme_Object *do_udp_send_it(const char *name, Scheme_UDP *udp, if (can_block) { /* Block and eventually try again. */ Scheme_Object *sema; - sema = scheme_fd_to_semaphore(udp->s, MZFD_CREATE_WRITE); + sema = scheme_fd_to_semaphore(udp->s, MZFD_CREATE_WRITE, 1); if (sema) scheme_wait_sema(sema, 0); else @@ -3479,8 +3489,10 @@ static int udp_check_recv(Scheme_Object *_udp, Scheme_Schedule_Info *sinfo) if (udp->s == INVALID_SOCKET) return 1; - if (!check_fd_sema(udp->s, MZFD_CHECK_READ, sinfo)) - return 0; + if (!sinfo || !sinfo->is_poll) { + if (!check_fd_sema(udp->s, MZFD_CHECK_READ, sinfo, NULL)) + return 0; + } # ifdef HAVE_POLL_SYSCALL { @@ -3520,7 +3532,7 @@ static int udp_check_recv(Scheme_Object *_udp, Scheme_Schedule_Info *sinfo) } # endif - check_fd_sema(udp->s, MZFD_CREATE_READ, sinfo); + check_fd_sema(udp->s, MZFD_CREATE_READ, sinfo, NULL); return 0; } @@ -3581,7 +3593,7 @@ static int do_udp_recv(const char *name, Scheme_UDP *udp, char *bstr, intptr_t s if (can_block) { /* Block and eventually try again. */ Scheme_Object *sema; - sema = scheme_fd_to_semaphore(udp->s, MZFD_CREATE_READ); + sema = scheme_fd_to_semaphore(udp->s, MZFD_CREATE_READ, 1); if (sema) scheme_wait_sema(sema, 0); else diff --git a/src/racket/src/port.c b/src/racket/src/port.c index 623c7fb555..f93325a394 100644 --- a/src/racket/src/port.c +++ b/src/racket/src/port.c @@ -863,7 +863,6 @@ static int find_fd_pos(struct mz_fd_set_data *data, int n) { intptr_t count = SCHEME_INT_VAL(data->count); intptr_t i; - Scheme_Object *v; /* This linear search probably isn't good enough for hundreds or thousands of descriptors, but epoll()/kqueue() mode should handle @@ -5974,7 +5973,7 @@ static intptr_t fd_get_string_slow(Scheme_Input_Port *port, #ifdef WINDOWS_FILE_HANDLES sema = NULL; #else - sema = scheme_fd_to_semaphore(fip->fd, MZFD_CREATE_READ); + sema = scheme_fd_to_semaphore(fip->fd, MZFD_CREATE_READ, 0); #endif if (sema) scheme_wait_sema(sema, nonblock ? -1 : 0); @@ -6274,7 +6273,7 @@ fd_close_input(Scheme_Input_Port *port) # ifdef USE_FCNTL_AND_FORK_FOR_FILE_LOCKS release_lockf(fip->fd); # endif - (void)scheme_fd_to_semaphore(fip->fd, MZFD_REMOVE); + (void)scheme_fd_to_semaphore(fip->fd, MZFD_REMOVE, 0); } } #endif @@ -7405,7 +7404,7 @@ static intptr_t flush_fd(Scheme_Output_Port *op, #ifdef WINDOWS_FILE_HANDLES sema = NULL; #else - sema = scheme_fd_to_semaphore(fop->fd, MZFD_CREATE_WRITE); + sema = scheme_fd_to_semaphore(fop->fd, MZFD_CREATE_WRITE, 0); #endif BEGIN_ESCAPEABLE(release_flushing_lock, fop); @@ -7577,7 +7576,7 @@ fd_close_output(Scheme_Output_Port *port) # ifdef USE_FCNTL_AND_FORK_FOR_FILE_LOCKS release_lockf(fop->fd); # endif - (void)scheme_fd_to_semaphore(fop->fd, MZFD_REMOVE); + (void)scheme_fd_to_semaphore(fop->fd, MZFD_REMOVE, 0); } } #endif diff --git a/src/racket/src/schemef.h b/src/racket/src/schemef.h index a7c88b5af4..d0217154c8 100644 --- a/src/racket/src/schemef.h +++ b/src/racket/src/schemef.h @@ -949,7 +949,7 @@ MZ_EXTERN int scheme_get_port_socket(Scheme_Object *p, intptr_t *_s); MZ_EXTERN void scheme_socket_to_ports(intptr_t s, const char *name, int takeover, Scheme_Object **_inp, Scheme_Object **_outp); -MZ_EXTERN Scheme_Object *scheme_fd_to_semaphore(intptr_t fd, int mode); +MZ_EXTERN Scheme_Object *scheme_fd_to_semaphore(intptr_t fd, int mode, int is_socket); MZ_EXTERN void scheme_set_type_printer(Scheme_Type stype, Scheme_Type_Printer printer); MZ_EXTERN void scheme_print_bytes(Scheme_Print_Params *pp, const char *str, int offset, int len); diff --git a/src/racket/src/schemex.h b/src/racket/src/schemex.h index cf081dfd22..c17aa87077 100644 --- a/src/racket/src/schemex.h +++ b/src/racket/src/schemex.h @@ -780,7 +780,7 @@ intptr_t (*scheme_get_port_fd)(Scheme_Object *p); int (*scheme_get_port_socket)(Scheme_Object *p, intptr_t *_s); void (*scheme_socket_to_ports)(intptr_t s, const char *name, int takeover, Scheme_Object **_inp, Scheme_Object **_outp); -Scheme_Object *(*scheme_fd_to_semaphore)(intptr_t fd, int mode); +Scheme_Object *(*scheme_fd_to_semaphore)(intptr_t fd, int mode, int is_socket); void (*scheme_set_type_printer)(Scheme_Type stype, Scheme_Type_Printer printer); void (*scheme_print_bytes)(Scheme_Print_Params *pp, const char *str, int offset, int len); void (*scheme_print_utf8)(Scheme_Print_Params *pp, const char *str, int offset, int len); diff --git a/src/racket/src/schpriv.h b/src/racket/src/schpriv.h index 345b33ae9d..7e57086986 100644 --- a/src/racket/src/schpriv.h +++ b/src/racket/src/schpriv.h @@ -598,8 +598,7 @@ typedef struct { Scheme_Object *current_syncing; double sleep_end; int w_i; - short spin; - short is_poll; + char spin, is_poll, no_redirect; } Scheme_Schedule_Info; typedef Scheme_Object *(*Scheme_Accept_Sync)(Scheme_Object *wrap); @@ -3479,6 +3478,12 @@ Scheme_Object *scheme_file_unlock(int argc, Scheme_Object **argv); void scheme_reserve_file_descriptor(void); void scheme_release_file_descriptor(void); +void scheme_init_kqueue(void); +void scheme_release_kqueue(void); + +THREAD_LOCAL_DECL(extern struct mz_fd_set *scheme_semaphore_fd_set); +THREAD_LOCAL_DECL(extern Scheme_Hash_Table *scheme_semaphore_fd_mapping); + intptr_t scheme_get_byte_string_or_ch_put(const char *who, Scheme_Object *port, char *buffer, intptr_t offset, intptr_t size, diff --git a/src/racket/src/thread.c b/src/racket/src/thread.c index 70505747b2..a46fc953d1 100644 --- a/src/racket/src/thread.c +++ b/src/racket/src/thread.c @@ -59,6 +59,14 @@ # ifdef HAVE_POLL_SYSCALL # include # endif +# ifdef HAVE_EPOLL_SYSCALL +# include +# endif +# ifdef HAVE_KQUEUE_SYSCALL +# include +# include +# include +# endif # include #endif #ifdef USE_WINSOCK_TCP @@ -176,6 +184,8 @@ THREAD_LOCAL_DECL(MZ_MARK_STACK_TYPE scheme_current_cont_mark_stack); THREAD_LOCAL_DECL(MZ_MARK_POS_TYPE scheme_current_cont_mark_pos); #endif +THREAD_LOCAL_DECL(int scheme_semaphore_fd_kqueue); + THREAD_LOCAL_DECL(static Scheme_Custodian *main_custodian); THREAD_LOCAL_DECL(static Scheme_Custodian *last_custodian); THREAD_LOCAL_DECL(static Scheme_Hash_Table *limited_custodians = NULL); @@ -3389,17 +3399,92 @@ static Scheme_Object *call_as_nested_thread(int argc, Scheme_Object *argv[]) /* thread scheduling and termination */ /*========================================================================*/ -Scheme_Object *scheme_fd_to_semaphore(intptr_t fd, int mode) +void scheme_init_kqueue(void) +{ +#if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL) + scheme_semaphore_fd_kqueue = -1; +#endif +} + +void scheme_release_kqueue(void) +{ +#if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL) + if (scheme_semaphore_fd_kqueue >= 0) { + intptr_t rc; + do { + rc = close(scheme_semaphore_fd_kqueue); + } while ((rc == -1) && (errno == EINTR)); + } +#endif +} + +#if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL) +static void log_kqueue_error(const char *action, int kr) +{ + if (kr < 0) { + Scheme_Logger *logger; + logger = scheme_get_main_logger(); + scheme_log(logger, SCHEME_LOG_WARNING, 0, +#ifdef HAVE_KQUEUE_SYSCALL + "kqueue" +#else + "epoll" +#endif + " error at %s: %E", + action, errno); + } +} + +static void log_kqueue_fd(int fd, int flags) +{ + Scheme_Logger *logger; + logger = scheme_get_main_logger(); + scheme_log(logger, SCHEME_LOG_WARNING, 0, +#ifdef HAVE_KQUEUE_SYSCALL + "kqueue" +#else + "epoll" +#endif + " expected event %d %d", + fd, flags); +} +#endif + +Scheme_Object *scheme_fd_to_semaphore(intptr_t fd, int mode, int is_socket) { #ifdef USE_WINSOCK_TCP return NULL; #else Scheme_Object *key, *v, *s; +#if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL) +# else void *r, *w, *e; +# endif if (!scheme_semaphore_fd_mapping) return NULL; +# ifdef HAVE_KQUEUE_SYSCALL + if (!is_socket) + return NULL; /* kqueue() might not work on devices, such as ttys */ + if (scheme_semaphore_fd_kqueue < 0) { + scheme_semaphore_fd_kqueue = kqueue(); + if (scheme_semaphore_fd_kqueue < 0) { + log_kqueue_error("create", scheme_semaphore_fd_kqueue); + return NULL; + } + } +# endif +# ifdef HAVE_EPOLL_SYSCALL + if (scheme_semaphore_fd_kqueue < 0) { + scheme_semaphore_fd_kqueue = epoll_create(5); + if (scheme_semaphore_fd_kqueue < 0) { + log_kqueue_error("create", scheme_semaphore_fd_kqueue); + return NULL; + } + } +# endif + key = scheme_make_integer_value(fd); v = scheme_hash_get(scheme_semaphore_fd_mapping, key); if (!v && ((mode == MZFD_CHECK_READ) @@ -3412,15 +3497,44 @@ Scheme_Object *scheme_fd_to_semaphore(intptr_t fd, int mode) scheme_hash_set(scheme_semaphore_fd_mapping, key, v); } +# if !defined(HAVE_KQUEUE_SYSCALL) && !defined(HAVE_EPOLL_SYSCALL) r = MZ_GET_FDSET(scheme_semaphore_fd_set, 0); w = MZ_GET_FDSET(scheme_semaphore_fd_set, 1); e = MZ_GET_FDSET(scheme_semaphore_fd_set, 2); +# endif if (mode == MZFD_REMOVE) { + s = SCHEME_VEC_ELS(v)[0]; + if (!SCHEME_FALSEP(s)) + scheme_post_sema_all(s); + s = SCHEME_VEC_ELS(v)[1]; + if (!SCHEME_FALSEP(s)) + scheme_post_sema_all(s); + s = NULL; scheme_hash_set(scheme_semaphore_fd_mapping, key, NULL); +# ifdef HAVE_KQUEUE_SYSCALL + { + struct kevent kev[2]; + struct timespec timeout = {0, 0}; + int kr; + EV_SET(kev, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); + EV_SET(&kev[1], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); + do { + kr = kevent(scheme_semaphore_fd_kqueue, kev, 2, NULL, 0, &timeout); + } while ((kr == -1) && (errno == EINTR)); + log_kqueue_error("remove", kr); + } +# elif defined(HAVE_EPOLL_SYSCALL) + { + int kr; + kr = epoll_ctl(scheme_semaphore_fd_kqueue, EPOLL_CTL_DEL, fd, NULL); + log_kqueue_error("remove", kr); + } +# else MZ_FD_CLR(fd, r); MZ_FD_CLR(fd, w); MZ_FD_CLR(fd, e); +# endif s = NULL; } else if ((mode == MZFD_CHECK_READ) || (mode == MZFD_CREATE_READ)) { @@ -3429,9 +3543,33 @@ Scheme_Object *scheme_fd_to_semaphore(intptr_t fd, int mode) if (mode == MZFD_CREATE_READ) { s = scheme_make_sema(0); SCHEME_VEC_ELS(v)[0] = s; +# ifdef HAVE_KQUEUE_SYSCALL + { + struct kevent kev; + struct timespec timeout = {0, 0}; + int kr; + EV_SET(&kev, fd, EVFILT_READ, EV_ADD | EV_ONESHOT, 0, 0, NULL); + do { + kr = kevent(scheme_semaphore_fd_kqueue, &kev, 1, NULL, 0, &timeout); + } while ((kr == -1) && (errno == EINTR)); + log_kqueue_error("read", kr); + } +# elif defined(HAVE_EPOLL_SYSCALL) + { + struct epoll_event ev; + int already = !SCHEME_FALSEP(SCHEME_VEC_ELS(v)[1]), kr; + ev.data.fd = fd; + ev.events = EPOLLIN | (already ? EPOLLOUT : 0); + kr = epoll_ctl(scheme_semaphore_fd_kqueue, + (already ? EPOLL_CTL_MOD : EPOLL_CTL_ADD), fd, &ev); + log_kqueue_error("read", kr); + } +# else MZ_FD_SET(fd, r); MZ_FD_SET(fd, e); - } +#endif + } else + s = NULL; } } else { s = SCHEME_VEC_ELS(v)[1]; @@ -3439,9 +3577,34 @@ Scheme_Object *scheme_fd_to_semaphore(intptr_t fd, int mode) if (mode == MZFD_CREATE_WRITE) { s = scheme_make_sema(0); SCHEME_VEC_ELS(v)[1] = s; +# ifdef HAVE_KQUEUE_SYSCALL + { + struct kevent kev; + struct timespec timeout = {0, 0}; + int kr; + EV_SET(&kev, fd, EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, NULL); + do { + kr = kevent(scheme_semaphore_fd_kqueue, &kev, 1, NULL, 0, &timeout); + } while ((kr == -1) && (errno == EINTR)); + log_kqueue_error("write", kr); + } + +# elif defined(HAVE_EPOLL_SYSCALL) + { + struct epoll_event ev; + int already = !SCHEME_FALSEP(SCHEME_VEC_ELS(v)[0]), kr; + ev.data.fd = fd; + ev.events = EPOLLOUT | (already ? EPOLLIN : 0); + kr = epoll_ctl(scheme_semaphore_fd_kqueue, + (already ? EPOLL_CTL_MOD : EPOLL_CTL_ADD), fd, &ev); + log_kqueue_error("write", kr); + } +# else MZ_FD_SET(fd, w); MZ_FD_SET(fd, e); - } +#endif + } else + s = NULL; } } @@ -3453,6 +3616,105 @@ static int check_fd_semaphores() { #ifdef USE_WINSOCK_TCP return 0; +#elif defined(HAVE_KQUEUE_SYSCALL) + Scheme_Object *v, *s, *key; + struct kevent kev; + struct timespec timeout = {0, 0}; + int kr, hit = 0; + + if (!scheme_semaphore_fd_mapping || (scheme_semaphore_fd_kqueue < 0)) + return 0; + + while (1) { + do { + kr = kevent(scheme_semaphore_fd_kqueue, NULL, 0, &kev, 1, &timeout); + } while ((kr == -1) && (errno == EINTR)); + log_kqueue_error("wait", kr); + + if (kr > 0) { + key = scheme_make_integer_value(kev.ident); + v = scheme_hash_get(scheme_semaphore_fd_mapping, key); + if (v) { + if (kev.filter == EVFILT_READ) { + s = SCHEME_VEC_ELS(v)[0]; + if (!SCHEME_FALSEP(s)) { + scheme_post_sema_all(s); + hit = 1; + SCHEME_VEC_ELS(v)[0] = scheme_false; + } + } else if (kev.filter == EVFILT_WRITE) { + s = SCHEME_VEC_ELS(v)[1]; + if (!SCHEME_FALSEP(s)) { + scheme_post_sema_all(s); + hit = 1; + SCHEME_VEC_ELS(v)[1] = scheme_false; + } + } + if (SCHEME_FALSEP(SCHEME_VEC_ELS(v)[0]) + && SCHEME_FALSEP(SCHEME_VEC_ELS(v)[1])) + scheme_hash_set(scheme_semaphore_fd_mapping, key, NULL); + } else { + log_kqueue_fd(kev.ident, kev.filter); + } + } else + break; + } + + return hit; +#elif defined(HAVE_EPOLL_SYSCALL) + Scheme_Object *v, *s, *key; + int kr, hit = 0; + struct epoll_event ev; + + if (!scheme_semaphore_fd_mapping || (scheme_semaphore_fd_kqueue < 0)) + return 0; + + while (1) { + + do { + kr = epoll_wait(scheme_semaphore_fd_kqueue, &ev, 1, 0); + } while ((kr == -1) && (errno == EINTR)); + log_kqueue_error("wait", kr); + + if (kr > 0) { + key = scheme_make_integer_value(ev.data.fd); + v = scheme_hash_get(scheme_semaphore_fd_mapping, key); + if (v) { + if (ev.events & (POLLIN | POLLHUP | POLLERR)) { + s = SCHEME_VEC_ELS(v)[0]; + if (!SCHEME_FALSEP(s)) { + scheme_post_sema_all(s); + hit = 1; + SCHEME_VEC_ELS(v)[0] = scheme_false; + } + } + if (ev.events & (POLLOUT | POLLHUP | POLLERR)) { + s = SCHEME_VEC_ELS(v)[1]; + if (!SCHEME_FALSEP(s)) { + scheme_post_sema_all(s); + hit = 1; + SCHEME_VEC_ELS(v)[1] = scheme_false; + } + } + if (SCHEME_FALSEP(SCHEME_VEC_ELS(v)[0]) + && SCHEME_FALSEP(SCHEME_VEC_ELS(v)[1])) { + scheme_hash_set(scheme_semaphore_fd_mapping, key, NULL); + kr = epoll_ctl(scheme_semaphore_fd_kqueue, EPOLL_CTL_DEL, ev.data.fd, NULL); + log_kqueue_error("remove*", kr); + } else { + ev.events = ((SCHEME_FALSEP(SCHEME_VEC_ELS(v)[0]) ? 0 : POLLIN) + | (SCHEME_FALSEP(SCHEME_VEC_ELS(v)[1]) ? 0 : POLLOUT)); + kr = epoll_ctl(scheme_semaphore_fd_kqueue, EPOLL_CTL_MOD, ev.data.fd, &ev); + log_kqueue_error("update", kr); + } + } else { + log_kqueue_fd(ev.data.fd, ev.events); + } + } else + break; + } + + return hit; #elif defined(HAVE_POLL_SYSCALL) struct pollfd *pfd; intptr_t i, c; @@ -3691,9 +3953,17 @@ static int check_sleep(int need_activity, int sleep_now) if (post_system_idle()) { return 0; } - + +# if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL) + if (scheme_semaphore_fd_mapping && (scheme_semaphore_fd_kqueue >= 0)) { + MZ_FD_SET(scheme_semaphore_fd_kqueue, set); + MZ_FD_SET(scheme_semaphore_fd_kqueue, set2); + } +# else + fds = scheme_merge_fd_sets(fds, scheme_semaphore_fd_set); +# endif + scheme_clean_fd_set(fds); - fds = scheme_merge_fd_sets(fds, scheme_semaphore_fd_set); if (sleep_now) { float mst = (float)max_sleep_time; @@ -3768,13 +4038,14 @@ void scheme_out_of_fuel(void) } static void init_schedule_info(Scheme_Schedule_Info *sinfo, Scheme_Thread *false_pos_ok, - double sleep_end) + int no_redirect, double sleep_end) { sinfo->false_positive_ok = false_pos_ok; sinfo->potentially_false_positive = 0; sinfo->current_syncing = NULL; sinfo->spin = 0; sinfo->is_poll = 0; + sinfo->no_redirect = no_redirect; sinfo->sleep_end = sleep_end; } @@ -4179,7 +4450,7 @@ static void find_next_thread(Scheme_Thread **return_arg) { if (next->block_check) { Scheme_Ready_Fun_FPC f = (Scheme_Ready_Fun_FPC)next->block_check; Scheme_Schedule_Info sinfo; - init_schedule_info(&sinfo, next, next->sleep_end); + init_schedule_info(&sinfo, next, 1, next->sleep_end); if (f(next->blocker, &sinfo)) break; next->sleep_end = sinfo.sleep_end; @@ -4333,7 +4604,7 @@ void scheme_thread_block(float sleep_time) if (p->block_check) { Scheme_Ready_Fun_FPC f = (Scheme_Ready_Fun_FPC)p->block_check; Scheme_Schedule_Info sinfo; - init_schedule_info(&sinfo, p, sleep_end); + init_schedule_info(&sinfo, p, 1, sleep_end); if (f(p->blocker, &sinfo)) { sleep_end = 0; skip_sleep = 1; @@ -4471,7 +4742,7 @@ void scheme_thread_block(float sleep_time) if (p->block_check) { Scheme_Ready_Fun_FPC f = (Scheme_Ready_Fun_FPC)p->block_check; Scheme_Schedule_Info sinfo; - init_schedule_info(&sinfo, p, sleep_end); + init_schedule_info(&sinfo, p, 1, sleep_end); if (f(p->blocker, &sinfo)) { sleep_end = 0; } else { @@ -4520,12 +4791,12 @@ int scheme_block_until(Scheme_Ready_Fun _f, Scheme_Needs_Wakeup_Fun fdf, /* We make an sinfo to be polite, but we also assume that f will not generate any redirections! */ - init_schedule_info(&sinfo, NULL, sleep_end); + init_schedule_info(&sinfo, NULL, 1, sleep_end); while (!(result = f((Scheme_Object *)data, &sinfo))) { sleep_end = sinfo.sleep_end; if (sinfo.spin) { - init_schedule_info(&sinfo, NULL, 0.0); + init_schedule_info(&sinfo, NULL, 1, 0.0); scheme_thread_block(0.0); scheme_current_thread->ran_some = 1; } else { @@ -5844,7 +6115,7 @@ static int syncing_ready(Scheme_Object *s, Scheme_Schedule_Info *sinfo) if (ready) { int yep; - init_schedule_info(&r_sinfo, sinfo->false_positive_ok, sleep_end); + init_schedule_info(&r_sinfo, sinfo->false_positive_ok, 0, sleep_end); r_sinfo.current_syncing = (Scheme_Object *)syncing; r_sinfo.w_i = i;