From adedf861b233a49bb011192e84832c7f77e9939d Mon Sep 17 00:00:00 2001 From: Matthew Flatt Date: Sat, 10 Jun 2017 12:36:02 -0600 Subject: [PATCH] rktio: finish epoll/kqueue layer --- racket/src/rktio/configure | 60 ++++++++ racket/src/rktio/configure.ac | 60 ++++++++ racket/src/rktio/main.c | 160 +++++++++++++++++++++- racket/src/rktio/rktio.h | 19 ++- racket/src/rktio/rktio_ltps.c | 203 ++++++++++++++++++++++++---- racket/src/rktio/rktio_poll_set.c | 41 +++--- racket/src/rktio/rktio_private.h | 14 +- racket/src/rktio/rktio_read_write.c | 15 +- 8 files changed, 519 insertions(+), 53 deletions(-) diff --git a/racket/src/rktio/configure b/racket/src/rktio/configure index e4231d8330..06944221a0 100755 --- a/racket/src/rktio/configure +++ b/racket/src/rktio/configure @@ -3064,6 +3064,66 @@ fi ############## platform tests ################ case "$host_os" in + solaris2*) + try_poll_syscall="no" # poll() has performance problems on Solaris? + use_flag_pthread="no" + use_flag_posix_pthread="yes" + ;; + aix*) + ;; + *freebsd*) + enable_pthread_by_default=yes + try_kqueue_syscall=yes + ;; + openbsd*) + enable_pthread_by_default=yes + try_kqueue_syscall=yes + ;; + bitrig*) + enable_pthread_by_default=yes + try_kqueue_syscall=yes + ;; + dragonfly*) + enable_pthread_by_default=yes + try_kqueue_syscall=yes + ;; + netbsd*) + try_kqueue_syscall=yes + ;; + irix*) + ;; + linux*) + try_poll_syscall=yes + try_epoll_syscall=yes + try_inotify_syscall=yes + ;; + osf1*) + ;; + hpux*) + ;; + *mingw*) + use_flag_pthread=no + skip_iconv_check=yes + ;; + cygwin*) + ;; + beos*) + ;; + darwin*) + PREFLAGS="$PREFLAGS -DOS_X -D_DARWIN_UNLIMITED_SELECT" + try_kqueue_syscall=yes + + # Although select() generally works as well as poll() on OS X, + # getdtablesize() appears not to be constant within a process, + # and that breaks static allocation of fd_sets + try_poll_syscall=yes + + # -pthread is not needed and triggers a warning + use_flag_pthread=no + ;; + nto-qnx*) + use_flag_pthread=no + ;; *) ;; esac diff --git a/racket/src/rktio/configure.ac b/racket/src/rktio/configure.ac index bbfb58e625..bbef567f87 100644 --- a/racket/src/rktio/configure.ac +++ b/racket/src/rktio/configure.ac @@ -19,6 +19,66 @@ AC_CHECK_LIB(dl, dlopen) ############## platform tests ################ case "$host_os" in + solaris2*) + try_poll_syscall="no" # poll() has performance problems on Solaris? + use_flag_pthread="no" + use_flag_posix_pthread="yes" + ;; + aix*) + ;; + *freebsd*) + enable_pthread_by_default=yes + try_kqueue_syscall=yes + ;; + openbsd*) + enable_pthread_by_default=yes + try_kqueue_syscall=yes + ;; + bitrig*) + enable_pthread_by_default=yes + try_kqueue_syscall=yes + ;; + dragonfly*) + enable_pthread_by_default=yes + try_kqueue_syscall=yes + ;; + netbsd*) + try_kqueue_syscall=yes + ;; + irix*) + ;; + linux*) + try_poll_syscall=yes + try_epoll_syscall=yes + try_inotify_syscall=yes + ;; + osf1*) + ;; + hpux*) + ;; + *mingw*) + use_flag_pthread=no + skip_iconv_check=yes + ;; + cygwin*) + ;; + beos*) + ;; + darwin*) + PREFLAGS="$PREFLAGS -DOS_X -D_DARWIN_UNLIMITED_SELECT" + try_kqueue_syscall=yes + + # Although select() generally works as well as poll() on OS X, + # getdtablesize() appears not to be constant within a process, + # and that breaks static allocation of fd_sets + try_poll_syscall=yes + + # -pthread is not needed and triggers a warning + use_flag_pthread=no + ;; + nto-qnx*) + use_flag_pthread=no + ;; *) ;; esac diff --git a/racket/src/rktio/main.c b/racket/src/rktio/main.c index b834e6bf78..0076d916c3 100644 --- a/racket/src/rktio/main.c +++ b/racket/src/rktio/main.c @@ -45,6 +45,124 @@ static void do_check_expected_racket_error(rktio_t *rktio, int err, int what, in #define check_expected_error(e) do_check_expected_error(rktio, e, __LINE__) #define check_expected_racket_error(e, what) do_check_expected_racket_error(rktio, e, what, __LINE__) +static rktio_ltps_t *try_check_ltps(rktio_t *rktio, + rktio_fd_t *fd, /* read mode */ + rktio_fd_t *fd2, /* write mode */ + rktio_ltps_handle_t **_h1, + rktio_ltps_handle_t **_h2) +{ + rktio_ltps_t *lt; + rktio_ltps_handle_t *h1, *h2, *hx, *hy; + + lt = rktio_open_ltps(rktio); + + /* Add read handle for fd1 */ + h1 = rktio_ltps_add(rktio, lt, fd, RKTIO_LTPS_CHECK_READ); + if (!h1 + && (rktio_get_last_error_kind(rktio) == RKTIO_ERROR_KIND_RACKET) + && (rktio_get_last_error(rktio) == RKTIO_ERROR_UNSUPPORTED)) { + check_valid(rktio_ltps_close(rktio, lt)); + return NULL; + } + check_expected_racket_error(!h1, RKTIO_ERROR_LTPS_NOT_FOUND); + h1 = rktio_ltps_add(rktio, lt, fd, RKTIO_LTPS_CREATE_READ); + check_valid(!!h1); + hx = rktio_ltps_add(rktio, lt, fd, RKTIO_LTPS_CREATE_READ); + check_valid(hx == h1); + hx = rktio_ltps_add(rktio, lt, fd, RKTIO_LTPS_CHECK_WRITE); + check_expected_racket_error(!hx, RKTIO_ERROR_LTPS_NOT_FOUND); + + /* Add write handle for fd2 */ + h2 = rktio_ltps_add(rktio, lt, fd2, RKTIO_LTPS_CHECK_READ); + check_expected_racket_error(!h2, RKTIO_ERROR_LTPS_NOT_FOUND); + h2 = rktio_ltps_add(rktio, lt, fd2, RKTIO_LTPS_CHECK_WRITE); + check_expected_racket_error(!h2, RKTIO_ERROR_LTPS_NOT_FOUND); + h2 = rktio_ltps_add(rktio, lt, fd2, RKTIO_LTPS_CREATE_WRITE); + check_valid(!!h2); + hx = rktio_ltps_add(rktio, lt, fd2, RKTIO_LTPS_CREATE_READ); + check_valid(!!hx); + + /* Removing `fd2` should signal the handles `h2` and `hx` */ + hy = rktio_ltps_add(rktio, lt, fd2, RKTIO_LTPS_REMOVE); + check_expected_racket_error(!hy, RKTIO_ERROR_LTPS_REMOVED); + hy = rktio_ltps_get_signaled_handle(rktio, lt); + check_valid((hy == h2) || (hy == hx)); + free(hy); + hy = rktio_ltps_get_signaled_handle(rktio, lt); + check_valid((hy == h2) || (hy == hx)); + free(hy); + hy = rktio_ltps_get_signaled_handle(rktio, lt); + check_expected_racket_error(!hy, RKTIO_ERROR_LTPS_NOT_FOUND); + /* Add write handle for fd2 again: */ + h2 = rktio_ltps_add(rktio, lt, fd2, RKTIO_LTPS_CREATE_WRITE); + check_valid(!!h2); + + *_h1 = h1; + *_h2 = h2; + + return lt; +} + +void check_ltps_write_ready(rktio_t *rktio, rktio_ltps_t *lt, rktio_ltps_handle_t *h2) +{ + rktio_ltps_handle_t *hy; + + hy = rktio_ltps_get_signaled_handle(rktio, lt); + check_expected_racket_error(!hy, RKTIO_ERROR_LTPS_NOT_FOUND); + + check_valid(rktio_ltps_poll(rktio, lt)); + + hy = rktio_ltps_get_signaled_handle(rktio, lt); + check_valid(hy == h2); + rktio_free(hy); + + hy = rktio_ltps_get_signaled_handle(rktio, lt); + check_expected_racket_error(!hy, RKTIO_ERROR_LTPS_NOT_FOUND); +} + +void check_ltps_read_ready(rktio_t *rktio, rktio_ltps_t *lt, rktio_ltps_handle_t *h1) +{ + rktio_ltps_handle_t *hy; + + hy = rktio_ltps_get_signaled_handle(rktio, lt); + check_expected_racket_error(!hy, RKTIO_ERROR_LTPS_NOT_FOUND); + + check_valid(rktio_ltps_poll(rktio, lt)); + + hy = rktio_ltps_get_signaled_handle(rktio, lt); + check_valid(hy == h1); + rktio_free(hy); + + hy = rktio_ltps_get_signaled_handle(rktio, lt); + check_expected_racket_error(!hy, RKTIO_ERROR_LTPS_NOT_FOUND); +} + +void check_ltps_read_and_write_ready(rktio_t *rktio, rktio_ltps_t *lt, rktio_ltps_handle_t *h1, rktio_ltps_handle_t *h2) +{ + rktio_ltps_handle_t *hy; + + hy = rktio_ltps_get_signaled_handle(rktio, lt); + check_expected_racket_error(!hy, RKTIO_ERROR_LTPS_NOT_FOUND); + + check_valid(rktio_ltps_poll(rktio, lt)); + + hy = rktio_ltps_get_signaled_handle(rktio, lt); + if (hy == h1) { + rktio_free(hy); + hy = rktio_ltps_get_signaled_handle(rktio, lt); + check_valid(hy == h2); + } else { + check_valid(hy == h2); + rktio_free(hy); + hy = rktio_ltps_get_signaled_handle(rktio, lt); + check_valid(hy == h1); + } + rktio_free(hy); + + hy = rktio_ltps_get_signaled_handle(rktio, lt); + check_expected_racket_error(!hy, RKTIO_ERROR_LTPS_NOT_FOUND); +} + static void check_hello_content(rktio_t *rktio, char *fn) { rktio_fd_t *fd; @@ -73,7 +191,9 @@ int main() rktio_directory_list_t *ls; rktio_file_copy_t *cp; rktio_timestamp_t *ts1, *ts1a; - + rktio_ltps_t *lt; + rktio_ltps_handle_t *h1, *h2; + rktio = rktio_init(); /* Basic file I/O */ @@ -191,8 +311,22 @@ int main() } check_valid(saw_file); - /* Pipes and non-blocking operations */ - + /* We expect `lt` to work on regular files everywhere except Windows: */ +#if !defined(RKTIO_SYSTEM_WINDOWS) && !defined(HAVE_KQUEUE_SYSCALL) + fd = rktio_open(rktio, "test1", RKTIO_OPEN_READ); + check_valid(!!fd); + fd2 = rktio_open(rktio, "test1", RKTIO_OPEN_WRITE | RKTIO_OPEN_CAN_EXIST); + check_valid(!!fd2); + lt = try_check_ltps(rktio, fd, fd2, &h1, &h2); + check_valid(!!lt); + check_ltps_read_and_write_ready(rktio, lt, h1, h2); + check_valid(rktio_ltps_close(rktio, lt)); + check_valid(rktio_close(rktio, fd)); + check_valid(rktio_close(rktio, fd2)); +#endif + + /* Pipes, non-blocking operations, and more long-term poll sets */ + fd = rktio_open(rktio, "demo_fifo", RKTIO_OPEN_READ); check_valid(!!fd); check_valid(!rktio_poll_read_ready(rktio, fd)); @@ -200,19 +334,39 @@ int main() check_valid(!!fd2); check_valid(!rktio_poll_read_ready(rktio, fd)); + lt = try_check_ltps(rktio, fd, fd2, &h1, &h2); + /* We expect `lt` to work everywhere exception Windows and with kqueue: */ +#if !defined(RKTIO_SYSTEM_WINDOWS) && !defined(HAVE_KQUEUE_SYSCALL) + check_valid(!!lt); +#endif + + /* fd2 can write, fd cannot yet read */ + check_valid(!rktio_poll_read_ready(rktio, fd)); + if (lt) + check_ltps_write_ready(rktio, lt, h2); + + /* Round-trip data through pipe: */ amt = rktio_write(rktio, fd2, "hello", 5); check_valid(amt == 5); + check_valid(rktio_poll_read_ready(rktio, fd)); + if (lt) { + check_ltps_read_ready(rktio, lt, h1); + check_valid(rktio_ltps_close(rktio, lt)); + } + amt = rktio_read(rktio, fd, buffer, sizeof(buffer)); check_valid(amt == 5); check_valid(!strncmp(buffer, "hello", 5)); check_valid(!rktio_poll_read_ready(rktio, fd)); + /* Close pipe ends: */ check_valid(rktio_close(rktio, fd2)); amt = rktio_read(rktio, fd, buffer, sizeof(buffer)); check_valid(amt == RKTIO_READ_EOF); check_valid(rktio_close(rktio, fd)); + /* Open pipe ends again: */ fd2 = rktio_open(rktio, "demo_fifo", RKTIO_OPEN_WRITE | RKTIO_OPEN_CAN_EXIST); check_valid(!!fd2); /* should eventually block: */ diff --git a/racket/src/rktio/rktio.h b/racket/src/rktio/rktio.h index a8b5ca203e..d03f8a2ad8 100644 --- a/racket/src/rktio/rktio.h +++ b/racket/src/rktio/rktio.h @@ -31,6 +31,9 @@ typedef struct rktio_fd_t rktio_fd_t; rktio_fd_t *rktio_system_fd(rktio_t *rktio, intptr_t system_fd, int modes); intptr_t rktio_fd_system_fd(rktio_t *rktio, rktio_fd_t *rfd); +int rktio_fd_is_regular_file(rktio_t *rktio, rktio_fd_t *rfd); +int rktio_fd_is_socket(rktio_t *rktio, rktio_fd_t *rfd); + rktio_fd_t *rktio_open(rktio_t *rktio, char *src, int modes); int rktio_close(rktio_t *rktio, rktio_fd_t *fd); @@ -63,6 +66,13 @@ void rktio_poll_set_add_eventmask(rktio_poll_set_t *fds, int mask); void rktio_poll_set_add_nosleep(rktio_poll_set_t *fds); #endif +/*************************************************/ +/* Long-term poll sets */ + +/* "Long-term" means that the poll set will be used frequently with + incremental updates, which means that it's worthwhile to use an OS + facililty (epoll, kqueue, etc.) to speed up polling. */ + typedef struct rktio_ltps_t rktio_ltps_t; typedef struct rktio_ltps_handle_t rktio_ltps_handle_t; @@ -79,14 +89,14 @@ enum { rktio_ltps_t *rktio_open_ltps(rktio_t *rktio); int rktio_ltps_close(rktio_t *rktio, rktio_ltps_t *lt); + rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t *rfd, int mode); - -int rktio_ltps_close(rktio_t *rktio, rktio_ltps_t *lt); -int rktio_ltps_poll(rktio_t *rktio, rktio_ltps_t *lt); - void rktio_ltps_handle_set_data(rktio_ltps_t *lt, rktio_ltps_handle_t *s, void *data); void *rktio_ltps_handle_get_data(rktio_ltps_t *lt, rktio_ltps_handle_t *s); +int rktio_ltps_poll(rktio_t *rktio, rktio_ltps_t *lt); +rktio_ltps_handle_t *rktio_ltps_get_signaled_handle(rktio_t *rktio, rktio_ltps_t *lt); + void rktio_sleep(rktio_t *rktio, float nsecs, rktio_poll_set_t *fds, rktio_ltps_t *lt); /*************************************************/ @@ -213,6 +223,7 @@ enum { RKTIO_ERROR_UNKNOWN_USER, RKTIO_ERROR_INIT_FAILED, RKTIO_ERROR_LTPS_NOT_FOUND, + RKTIO_ERROR_LTPS_REMOVED, /* indicates success, instead of failure */ }; int rktio_get_last_error(rktio_t *rktio); diff --git a/racket/src/rktio/rktio_ltps.c b/racket/src/rktio/rktio_ltps.c index 3f839cf62d..36b1c0d4b1 100644 --- a/racket/src/rktio/rktio_ltps.c +++ b/racket/src/rktio/rktio_ltps.c @@ -3,6 +3,18 @@ #ifdef RKTIO_SYSTEM_UNIX # include #endif +#if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL) +# include +#endif +#ifdef HAVE_KQUEUE_SYSCALL +# include +# include +# include +# include +#endif +#ifdef HAVE_POLL_SYSCALL +# include +#endif #include struct rktio_ltps_t { @@ -11,7 +23,11 @@ struct rktio_ltps_t { #else rktio_poll_set_t *fd_set; #endif + /* List of pending signaled handles: */ struct rktio_ltps_handle_t *signaled; + /* Hash table mapping fds to handles */ + struct ltps_bucket_t *buckets; + intptr_t size, count; }; struct rktio_ltps_handle_t { @@ -24,9 +40,12 @@ typedef struct rktio_ltps_handle_pair_t { rktio_ltps_handle_t *write_handle; } rktio_ltps_handle_pair_t; -static rktio_ltps_handle_pair_t *ltps_hash_get(rktio_ltps_t *lt, int fd); -static void ltps_hash_set(rktio_ltps_t *lt, int fd, rktio_ltps_handle_pair_t *v); +static rktio_ltps_handle_pair_t *ltps_hash_get(rktio_ltps_t *lt, intptr_t fd); +static void ltps_hash_set(rktio_ltps_t *lt, intptr_t fd, rktio_ltps_handle_pair_t *v); +static void ltps_hash_remove(rktio_ltps_t *lt, intptr_t fd); static int ltps_is_hash_empty(rktio_ltps_t *lt); +static void ltps_hash_init(rktio_ltps_t *lt); +static void ltps_hash_free(rktio_ltps_t *lt); /************************************************************/ @@ -78,6 +97,8 @@ rktio_ltps_t *rktio_open_ltps(rktio_t *rktio) lt->signaled = NULL; + ltps_hash_init(lt); + return lt; } @@ -96,6 +117,13 @@ rktio_poll_set_t *rktio_ltps_get_fd_set(rktio_ltps_t *lt) int rktio_ltps_close(rktio_t *rktio, rktio_ltps_t *lt) { + rktio_ltps_handle_t *s; + + while ((s = rktio_ltps_get_signaled_handle(rktio, lt))) + free(s); + + ltps_hash_free(lt); + #if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL) if (lt->fd >= 0) { intptr_t rc; @@ -107,6 +135,7 @@ int rktio_ltps_close(rktio_t *rktio, rktio_ltps_t *lt) #else rktio_free_fdset_array(lt->fd_set, 3); #endif + return 1; } @@ -116,6 +145,8 @@ rktio_ltps_handle_t *rktio_ltps_get_signaled_handle(rktio_t *rktio, rktio_ltps_t s = lt->signaled; if (s) lt->signaled = s->next; + else + set_racket_error(RKTIO_ERROR_LTPS_NOT_FOUND); return s; } @@ -123,7 +154,7 @@ rktio_ltps_handle_t *rktio_ltps_get_signaled_handle(rktio_t *rktio, rktio_ltps_t static void log_kqueue_error(const char *action, int kr) { if (kr < 0) { - fprintf(stderr, "%d error at %s: %E", + fprintf(stderr, "%s error at %s: %d", #ifdef HAVE_KQUEUE_SYSCALL "kqueue", #else @@ -149,15 +180,15 @@ rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t # endif rktio_ltps_handle_pair_t *v; rktio_ltps_handle_t *s; - int fd = rktio_fd_system_fd(rktio, rfd); + intptr_t fd = rktio_fd_system_fd(rktio, rfd); # ifdef HAVE_KQUEUE_SYSCALL - if (!is_socket) { + if (!rktio_fd_is_socket(rktio, rfd)) { /* kqueue() might not work on devices, such as ttys; also, while Mac OS X kqueue() claims to work on FIFOs, there are problems: watching for reads on a FIFO sometimes disables watching for writes on the same FIFO with a different file descriptor */ - if (!rktio_fd_regular_file(rktio, rfd, 1)) { + if (!rktio_fd_is_regular_file(rktio, rfd)) { set_racket_error(RKTIO_ERROR_UNSUPPORTED); return NULL; } @@ -165,7 +196,7 @@ rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t if (lt->fd < 0) { lt->fd = kqueue(); if (lt->fd < 0) { - set_posix_error(); + get_posix_error(); log_kqueue_error("create", lt->fd); return NULL; } @@ -175,7 +206,7 @@ rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t if (lt->fd < 0) { lt->fd = epoll_create(5); if (lt->fd < 0) { - set_posix_error(); + get_posix_error(); log_kqueue_error("create", lt->fd); return NULL; } @@ -208,7 +239,7 @@ rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t if (s) ltps_signal_handle(lt, s); s = v->write_handle; if (s) ltps_signal_handle(lt, s); - ltps_hash_set(lt, fd, NULL); + ltps_hash_remove(lt, fd); free(v); s = NULL; # ifdef HAVE_KQUEUE_SYSCALL @@ -220,11 +251,11 @@ rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t EV_SET(&kev[pos], fd, EVFILT_VNODE, EV_DELETE, 0, 0, NULL); pos++; } else { - if (RKTIO_TRUEP(RKTIO_VEC_ELS(v)[0])) { + if (v->read_handle) { EV_SET(&kev[pos], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); pos++; } - if (RKTIO_TRUEP(RKTIO_VEC_ELS(v)[1])) { + if (v->write_handle) { EV_SET(&kev[pos], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); pos++; } @@ -245,6 +276,7 @@ rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t RKTIO_FD_CLR(fd, w); RKTIO_FD_CLR(fd, e); # endif + set_racket_error(RKTIO_ERROR_LTPS_REMOVED); /* success, not failure */ } else if ((mode == RKTIO_LTPS_CHECK_READ) || (mode == RKTIO_LTPS_CREATE_READ) || (mode == RKTIO_LTPS_CHECK_VNODE) @@ -257,7 +289,7 @@ rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t v->read_handle = s; # ifdef HAVE_KQUEUE_SYSCALL { - GC_CAN_IGNORE struct kevent kev; + struct kevent kev; struct timespec timeout = {0, 0}; int kr; if (mode == RKTIO_LTPS_CREATE_READ) @@ -274,7 +306,7 @@ rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t } # elif defined(HAVE_EPOLL_SYSCALL) { - GC_CAN_IGNORE struct epoll_event ev; + struct epoll_event ev; int already = !RKTIO_FALSEP(RKTIO_VEC_ELS(v)[1]), kr; memset(&ev, 0, sizeof(ev)); ev.data.fd = fd; @@ -299,7 +331,7 @@ rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t v->write_handle = s; # ifdef HAVE_KQUEUE_SYSCALL { - GC_CAN_IGNORE struct kevent kev; + struct kevent kev; struct timespec timeout = {0, 0}; int kr; EV_SET(&kev, fd, EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, NULL); @@ -311,7 +343,7 @@ rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t # elif defined(HAVE_EPOLL_SYSCALL) { - GC_CAN_IGNORE struct epoll_event ev; + struct epoll_event ev; int already = !RKTIO_FALSEP(RKTIO_VEC_ELS(v)[0]), kr; memset(&ev, 0, sizeof(ev)); ev.data.fd = fd; @@ -341,7 +373,7 @@ int rktio_ltps_poll(rktio_t *rktio, rktio_ltps_t *lt) rktio_ltps_handle_t *s; rktio_ltps_handle_pair_t *v; int key; - GC_CAN_IGNORE struct kevent kev; + struct kevent kev; struct timespec timeout = {0, 0}; int kr, hit = 0; @@ -374,7 +406,7 @@ int rktio_ltps_poll(rktio_t *rktio, rktio_ltps_t *lt) } } if (!v->read_handle && !v->write_handle) { - ltps_hash_set(lt, key, NULL); + ltps_hash_remove(lt, key); free(v); } } else { @@ -390,7 +422,7 @@ int rktio_ltps_poll(rktio_t *rktio, rktio_ltps_t *lt) rktio_ltps_handle_pair_t *v; int key; int kr, hit = 0; - GC_CAN_IGNORE struct epoll_event ev; + struct epoll_event ev; memset(&ev, 0, sizeof(ev)); if (lt->fd < 0) @@ -424,7 +456,7 @@ int rktio_ltps_poll(rktio_t *rktio, rktio_ltps_t *lt) } } if (!v->read_handle && !v->write_handle) { - ltps_hash_set(lt, key, NULL); + ltps_hash_remove(lt, key); free(v); kr = epoll_ctl(lt->fd, EPOLL_CTL_DEL, ev.data.fd, NULL); log_kqueue_error("remove*", kr); @@ -454,7 +486,7 @@ int rktio_ltps_poll(rktio_t *rktio, rktio_ltps_t *lt) return 0; rktio_clean_fd_set(lt->fd_set); - c = rkt_io_poll_count(lt->fd_set); + c = rktio_get_poll_count(lt->fd_set); pfd = rktio_get_poll_fd_array(lt->fd_set); do { @@ -486,7 +518,7 @@ int rktio_ltps_poll(rktio_t *rktio, rktio_ltps_t *lt) pfd[i].events -= (pfd[i].events & POLLOUT); } if (!v->read_handle && !v->write_handle) { - rktio_hash_set(rktio_semaphore_fd_mapping, key, NULL); + ltps_hash_remove(lt, key); free(v); } } @@ -501,7 +533,7 @@ int rktio_ltps_poll(rktio_t *rktio, rktio_ltps_t *lt) int i, actual_limit, r, w, e, sr, hit = 0; rktio_ltps_handle_t *s; rktio_ltps_handle_pair_t *v; - int key; + intptr_t key; DECL_FDSET(set, 3); rktio_poll_set_t *set1, *set2; @@ -555,7 +587,7 @@ int rktio_ltps_poll(rktio_t *rktio, rktio_ltps_t *lt) } if (!v->read_handle && !v->write_handle) { RKTIO_FD_CLR(i, RKTIO_GET_FDSET(lt->fd_set, 2)); - ltps_hash_set(lt, key, NULL); + ltps_hash_remove(lt, key); free(v); } } @@ -569,16 +601,133 @@ int rktio_ltps_poll(rktio_t *rktio, rktio_ltps_t *lt) /************************************************************/ -static rktio_ltps_handle_pair_t *ltps_hash_get(rktio_ltps_t *lt, int fd) +typedef struct ltps_bucket_t { + /* v is non-NULL => bucket is filled */ + /* v is NULL and fd is -1 => was removed */ + intptr_t fd; + rktio_ltps_handle_pair_t *v; +} ltps_bucket_t; + +static void ltps_rehash(rktio_ltps_t *lt, intptr_t new_size) { - return NULL; + if (new_size >= 16) { + ltps_bucket_t *old_buckets = lt->buckets; + intptr_t old_size = lt->size, i; + + lt->size = new_size; + lt->buckets = calloc(new_size, sizeof(ltps_bucket_t)); + lt->count = 0; + + for (i = old_size; --i; ) { + if (lt->buckets[i].v) + ltps_hash_set(lt, lt->buckets[i].fd, lt->buckets[i].v); + } + + free(old_buckets); + } } -static void ltps_hash_set(rktio_ltps_t *lt, int fd, rktio_ltps_handle_pair_t *v) +static rktio_ltps_handle_pair_t *ltps_hash_get(rktio_ltps_t *lt, intptr_t fd) { + if (lt->buckets) { + intptr_t mask = (lt->size - 1); + intptr_t hc = fd & mask; + intptr_t d = ((fd >> 3) & mask) | 0x1; + + while (1) { + if (lt->buckets[hc].fd == fd) + return lt->buckets[hc].v; + else if (lt->buckets[hc].v + || (lt->buckets[hc].fd == -1)) { + /* keep looking */ + hc = (hc + d) & mask; + } else + return NULL; + } + } else + return NULL; +} + +static void ltps_hash_remove(rktio_ltps_t *lt, intptr_t fd) +{ + if (lt->buckets) { + intptr_t mask = (lt->size - 1); + intptr_t hc = fd & mask; + intptr_t d = ((fd >> 3) & mask) | 0x1; + + while (1) { + if (lt->buckets[hc].fd == fd) { + lt->buckets[hc].fd = -1; + lt->buckets[hc].v = NULL; + --lt->count; + if (4 * lt->count <= lt->size) + ltps_rehash(lt, lt->size >> 1); + } else if (lt->buckets[hc].v + || (lt->buckets[hc].fd == -1)) { + /* keep looking */ + hc = (hc + d) & mask; + } else + break; + } + } +} + +static void ltps_hash_set(rktio_ltps_t *lt, intptr_t fd, rktio_ltps_handle_pair_t *v) +{ + if (!lt->buckets) { + lt->size = 16; + lt->buckets = calloc(lt->size, sizeof(ltps_bucket_t)); + } + + { + intptr_t mask = (lt->size - 1); + intptr_t hc = fd & mask; + intptr_t d = ((fd >> 3) & mask) | 0x1; + + while (1) { + if (lt->buckets[hc].v) { + if (lt->buckets[hc].fd == -1) { + /* use bucket whos content ws previouslt removed */ + break; + } else { + /* keep looking for a spot */ + hc = (hc + d) & mask; + } + } else + break; + } + + lt->buckets[hc].fd = fd; + lt->buckets[hc].v = v; + lt->count++; + + if (2 * lt->count >= lt->size) + ltps_rehash(lt, lt->size << 1); + } } static int ltps_is_hash_empty(rktio_ltps_t *lt) { - return 1; + return (lt->count == 0); +} + +static void ltps_hash_init(rktio_ltps_t *lt) +{ + lt->buckets = NULL; + lt->size = 0; + lt->count = 0; +} + +static void ltps_hash_free(rktio_ltps_t *lt) +{ + if (lt->buckets) { + intptr_t i; + + for (i = lt->size; --i; ) { + if (lt->buckets[i].v) + free(lt->buckets[i].v); + } + + free(lt->buckets); + } } diff --git a/racket/src/rktio/rktio_poll_set.c b/racket/src/rktio/rktio_poll_set.c index 7d62c070b3..2044775496 100644 --- a/racket/src/rktio/rktio_poll_set.c +++ b/racket/src/rktio/rktio_poll_set.c @@ -7,6 +7,10 @@ # include # include #endif +#ifdef HAVE_POLL_SYSCALL +# include +#endif +#include #include /* Generalize fd arrays (FD_SET, etc) with a runtime-determined size, @@ -66,17 +70,22 @@ rktio_poll_set_t *rktio_alloc_fdset_array(int count) pfd = malloc(sizeof(struct pollfd) * (32 + PFD_EXTRA_SPACE)); data->pfd = pfd; - return data; + return r; } void rktio_free_fdset_array(rktio_poll_set_t *fds, int count) { - FIXME; + struct rktio_fd_set_data_t *data = fds->data; + free(fds->w); + free(fds->e); + free(fds); + free(data->pfd); + free(data); } rktio_poll_set_t *rktio_init_fdset_array(rktio_poll_set_t *fdarray, int count) { - ((struct rktio_fd_set *)fdarray)->data->count = 0; + fdarray->data->count = 0; return fdarray; } @@ -163,7 +172,7 @@ void rktio_fdset(rktio_poll_set_t *fd, int n) int rktio_fdisset(rktio_poll_set_t *fd, int n) { - struct rktio_fd_set_data_t *data = data->data; + struct rktio_fd_set_data_t *data = fd->data; intptr_t flag = fd->flags; intptr_t pos; @@ -244,8 +253,6 @@ void rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *src_fds) free(pfds); } data->count = j; - - return fds; } void rktio_clean_fd_set(rktio_poll_set_t *fds) @@ -278,7 +285,7 @@ int rktio_get_poll_count(rktio_poll_set_t *fds) return fds->data->count; } -int rktio_get_poll_fd_array(rktio_poll_set_t *fds) +struct pollfd *rktio_get_poll_fd_array(rktio_poll_set_t *fds) { return fds->data->pfd; } @@ -881,7 +888,7 @@ void rktio_wait_until_signal_received(rktio_t *rktio) #ifdef RKTIO_SYSTEM_UNIX int r; # ifdef HAVE_POLL_SYSCALL - GC_CAN_IGNORE struct pollfd pfd[1]; + struct pollfd pfd[1]; pfd[0].fd = rktio->external_event_fd; pfd[0].events = POLLIN; do { @@ -969,9 +976,9 @@ void rktio_sleep(rktio_t *rktio, float nsecs, rktio_poll_set_t *fds, rktio_ltps_ if (timeout < 0) timeout = 0; } - if (external_event_fd) { - GC_CAN_IGNORE struct pollfd pfd[1]; - pfd[0].fd = external_event_fd; + if (rktio->external_event_fd) { + struct pollfd pfd[1]; + pfd[0].fd = rktio->external_event_fd; pfd[0].events = POLLIN; poll(pfd, 1, timeout); } else { @@ -1023,22 +1030,22 @@ void rktio_sleep(rktio_t *rktio, float nsecs, rktio_poll_set_t *fds, rktio_ltps_ /******* poll() variant *******/ { - struct mz_fd_set_data_t *data = fds->data; + struct rktio_fd_set_data_t *data = fds->data; intptr_t count = data->count; int timeout; - if (v <= 0.0) + if (nsecs <= 0.0) timeout = -1; - else if (v > 100000) + else if (nsecs > 100000) timeout = 100000000; else { - timeout = (int)(v * 1000.0); + timeout = (int)(nsecs * 1000.0); if (timeout < 0) timeout = 0; } - if (external_event_fd) { - data->pfd[count].fd = external_event_fd; + if (rktio->external_event_fd) { + data->pfd[count].fd = rktio->external_event_fd; data->pfd[count].events = POLLIN; count++; } diff --git a/racket/src/rktio/rktio_private.h b/racket/src/rktio/rktio_private.h index 60d8dd967b..4514ddbfb2 100644 --- a/racket/src/rktio/rktio_private.h +++ b/racket/src/rktio/rktio_private.h @@ -39,7 +39,7 @@ struct rktio_t { #endif #ifdef USE_FAR_RKTIO_FDCALLS /* A single fdset that can be reused for immediate actions: */ - struct rktio_poll_set_t *rktio_global_fd_set; + struct rktio_poll_set_t *rktio_global_poll_set; #endif }; @@ -53,6 +53,13 @@ void rktio_alloc_global_poll_set(rktio_t *rktio); int rktio_initialize_signal(rktio_t *rktio); #ifdef USE_FAR_RKTIO_FDCALLS + +rktio_poll_set_t *rktio_get_fdset(rktio_poll_set_t *fdarray, int pos); +void rktio_fdzero(rktio_poll_set_t *fd); +void rktio_fdset(rktio_poll_set_t *fd, int n); +void rktio_fdclr(rktio_poll_set_t *fd, int n); +int rktio_fdisset(rktio_poll_set_t *fd, int n); + # define DECL_FDSET(n, c) fd_set *n # define INIT_DECL_FDSET(r, w, e) { \ r = RKTIO_GET_FDSET(rktio->rktio_global_poll_set, 0 ); \ @@ -107,6 +114,11 @@ int rktio_ltps_get_fd(rktio_ltps_t *lt); rktio_poll_set_t *rktio_ltps_get_fd_set(rktio_ltps_t *lt); #endif +#if defined(HAVE_POLL_SYSCALL) +int rktio_get_poll_count(rktio_poll_set_t *fds); +struct pollfd *rktio_get_poll_fd_array(rktio_poll_set_t *fds); +#endif + /************************************************************/ /* Misc */ /************************************************************/ diff --git a/racket/src/rktio/rktio_read_write.c b/racket/src/rktio/rktio_read_write.c index 6f30838923..31feaa7f99 100644 --- a/racket/src/rktio/rktio_read_write.c +++ b/racket/src/rktio/rktio_read_write.c @@ -11,6 +11,9 @@ #ifdef RKTIO_SYSTEM_WINDOWS # include #endif +#ifdef HAVE_POLL_SYSCALL +# include +#endif #ifndef RKTIO_BINARY # define RKTIO_BINARY 0 @@ -127,6 +130,16 @@ intptr_t rktio_fd_system_fd(rktio_t *rktio, rktio_fd_t *rfd) return rfd->fd; } +int rktio_fd_is_regular_file(rktio_t *rktio, rktio_fd_t *rfd) +{ + return rfd->regfile; +} + +int rktio_fd_is_socket(rktio_t *rktio, rktio_fd_t *rfd) +{ + return 0; +} + /*************************************************************/ /* opening a file fd */ /*************************************************************/ @@ -575,7 +588,7 @@ int poll_write_ready_or_flushed(rktio_t *rktio, rktio_fd_t *rfd, int check_flush else { int sr; # ifdef HAVE_POLL_SYSCALL - GC_CAN_IGNORE struct pollfd pfd[1]; + struct pollfd pfd[1]; pfd[0].fd = rfd->fd; pfd[0].events = POLLOUT; do {