rktio: finish epoll/kqueue layer

This commit is contained in:
Matthew Flatt 2017-06-10 12:36:02 -06:00
parent 04130638fc
commit adedf861b2
8 changed files with 519 additions and 53 deletions

View File

@ -3064,6 +3064,66 @@ fi
############## platform tests ################
case "$host_os" in
solaris2*)
try_poll_syscall="no" # poll() has performance problems on Solaris?
use_flag_pthread="no"
use_flag_posix_pthread="yes"
;;
aix*)
;;
*freebsd*)
enable_pthread_by_default=yes
try_kqueue_syscall=yes
;;
openbsd*)
enable_pthread_by_default=yes
try_kqueue_syscall=yes
;;
bitrig*)
enable_pthread_by_default=yes
try_kqueue_syscall=yes
;;
dragonfly*)
enable_pthread_by_default=yes
try_kqueue_syscall=yes
;;
netbsd*)
try_kqueue_syscall=yes
;;
irix*)
;;
linux*)
try_poll_syscall=yes
try_epoll_syscall=yes
try_inotify_syscall=yes
;;
osf1*)
;;
hpux*)
;;
*mingw*)
use_flag_pthread=no
skip_iconv_check=yes
;;
cygwin*)
;;
beos*)
;;
darwin*)
PREFLAGS="$PREFLAGS -DOS_X -D_DARWIN_UNLIMITED_SELECT"
try_kqueue_syscall=yes
# Although select() generally works as well as poll() on OS X,
# getdtablesize() appears not to be constant within a process,
# and that breaks static allocation of fd_sets
try_poll_syscall=yes
# -pthread is not needed and triggers a warning
use_flag_pthread=no
;;
nto-qnx*)
use_flag_pthread=no
;;
*)
;;
esac

View File

@ -19,6 +19,66 @@ AC_CHECK_LIB(dl, dlopen)
############## platform tests ################
case "$host_os" in
solaris2*)
try_poll_syscall="no" # poll() has performance problems on Solaris?
use_flag_pthread="no"
use_flag_posix_pthread="yes"
;;
aix*)
;;
*freebsd*)
enable_pthread_by_default=yes
try_kqueue_syscall=yes
;;
openbsd*)
enable_pthread_by_default=yes
try_kqueue_syscall=yes
;;
bitrig*)
enable_pthread_by_default=yes
try_kqueue_syscall=yes
;;
dragonfly*)
enable_pthread_by_default=yes
try_kqueue_syscall=yes
;;
netbsd*)
try_kqueue_syscall=yes
;;
irix*)
;;
linux*)
try_poll_syscall=yes
try_epoll_syscall=yes
try_inotify_syscall=yes
;;
osf1*)
;;
hpux*)
;;
*mingw*)
use_flag_pthread=no
skip_iconv_check=yes
;;
cygwin*)
;;
beos*)
;;
darwin*)
PREFLAGS="$PREFLAGS -DOS_X -D_DARWIN_UNLIMITED_SELECT"
try_kqueue_syscall=yes
# Although select() generally works as well as poll() on OS X,
# getdtablesize() appears not to be constant within a process,
# and that breaks static allocation of fd_sets
try_poll_syscall=yes
# -pthread is not needed and triggers a warning
use_flag_pthread=no
;;
nto-qnx*)
use_flag_pthread=no
;;
*)
;;
esac

View File

@ -45,6 +45,124 @@ static void do_check_expected_racket_error(rktio_t *rktio, int err, int what, in
#define check_expected_error(e) do_check_expected_error(rktio, e, __LINE__)
#define check_expected_racket_error(e, what) do_check_expected_racket_error(rktio, e, what, __LINE__)
static rktio_ltps_t *try_check_ltps(rktio_t *rktio,
rktio_fd_t *fd, /* read mode */
rktio_fd_t *fd2, /* write mode */
rktio_ltps_handle_t **_h1,
rktio_ltps_handle_t **_h2)
{
rktio_ltps_t *lt;
rktio_ltps_handle_t *h1, *h2, *hx, *hy;
lt = rktio_open_ltps(rktio);
/* Add read handle for fd1 */
h1 = rktio_ltps_add(rktio, lt, fd, RKTIO_LTPS_CHECK_READ);
if (!h1
&& (rktio_get_last_error_kind(rktio) == RKTIO_ERROR_KIND_RACKET)
&& (rktio_get_last_error(rktio) == RKTIO_ERROR_UNSUPPORTED)) {
check_valid(rktio_ltps_close(rktio, lt));
return NULL;
}
check_expected_racket_error(!h1, RKTIO_ERROR_LTPS_NOT_FOUND);
h1 = rktio_ltps_add(rktio, lt, fd, RKTIO_LTPS_CREATE_READ);
check_valid(!!h1);
hx = rktio_ltps_add(rktio, lt, fd, RKTIO_LTPS_CREATE_READ);
check_valid(hx == h1);
hx = rktio_ltps_add(rktio, lt, fd, RKTIO_LTPS_CHECK_WRITE);
check_expected_racket_error(!hx, RKTIO_ERROR_LTPS_NOT_FOUND);
/* Add write handle for fd2 */
h2 = rktio_ltps_add(rktio, lt, fd2, RKTIO_LTPS_CHECK_READ);
check_expected_racket_error(!h2, RKTIO_ERROR_LTPS_NOT_FOUND);
h2 = rktio_ltps_add(rktio, lt, fd2, RKTIO_LTPS_CHECK_WRITE);
check_expected_racket_error(!h2, RKTIO_ERROR_LTPS_NOT_FOUND);
h2 = rktio_ltps_add(rktio, lt, fd2, RKTIO_LTPS_CREATE_WRITE);
check_valid(!!h2);
hx = rktio_ltps_add(rktio, lt, fd2, RKTIO_LTPS_CREATE_READ);
check_valid(!!hx);
/* Removing `fd2` should signal the handles `h2` and `hx` */
hy = rktio_ltps_add(rktio, lt, fd2, RKTIO_LTPS_REMOVE);
check_expected_racket_error(!hy, RKTIO_ERROR_LTPS_REMOVED);
hy = rktio_ltps_get_signaled_handle(rktio, lt);
check_valid((hy == h2) || (hy == hx));
free(hy);
hy = rktio_ltps_get_signaled_handle(rktio, lt);
check_valid((hy == h2) || (hy == hx));
free(hy);
hy = rktio_ltps_get_signaled_handle(rktio, lt);
check_expected_racket_error(!hy, RKTIO_ERROR_LTPS_NOT_FOUND);
/* Add write handle for fd2 again: */
h2 = rktio_ltps_add(rktio, lt, fd2, RKTIO_LTPS_CREATE_WRITE);
check_valid(!!h2);
*_h1 = h1;
*_h2 = h2;
return lt;
}
void check_ltps_write_ready(rktio_t *rktio, rktio_ltps_t *lt, rktio_ltps_handle_t *h2)
{
rktio_ltps_handle_t *hy;
hy = rktio_ltps_get_signaled_handle(rktio, lt);
check_expected_racket_error(!hy, RKTIO_ERROR_LTPS_NOT_FOUND);
check_valid(rktio_ltps_poll(rktio, lt));
hy = rktio_ltps_get_signaled_handle(rktio, lt);
check_valid(hy == h2);
rktio_free(hy);
hy = rktio_ltps_get_signaled_handle(rktio, lt);
check_expected_racket_error(!hy, RKTIO_ERROR_LTPS_NOT_FOUND);
}
void check_ltps_read_ready(rktio_t *rktio, rktio_ltps_t *lt, rktio_ltps_handle_t *h1)
{
rktio_ltps_handle_t *hy;
hy = rktio_ltps_get_signaled_handle(rktio, lt);
check_expected_racket_error(!hy, RKTIO_ERROR_LTPS_NOT_FOUND);
check_valid(rktio_ltps_poll(rktio, lt));
hy = rktio_ltps_get_signaled_handle(rktio, lt);
check_valid(hy == h1);
rktio_free(hy);
hy = rktio_ltps_get_signaled_handle(rktio, lt);
check_expected_racket_error(!hy, RKTIO_ERROR_LTPS_NOT_FOUND);
}
void check_ltps_read_and_write_ready(rktio_t *rktio, rktio_ltps_t *lt, rktio_ltps_handle_t *h1, rktio_ltps_handle_t *h2)
{
rktio_ltps_handle_t *hy;
hy = rktio_ltps_get_signaled_handle(rktio, lt);
check_expected_racket_error(!hy, RKTIO_ERROR_LTPS_NOT_FOUND);
check_valid(rktio_ltps_poll(rktio, lt));
hy = rktio_ltps_get_signaled_handle(rktio, lt);
if (hy == h1) {
rktio_free(hy);
hy = rktio_ltps_get_signaled_handle(rktio, lt);
check_valid(hy == h2);
} else {
check_valid(hy == h2);
rktio_free(hy);
hy = rktio_ltps_get_signaled_handle(rktio, lt);
check_valid(hy == h1);
}
rktio_free(hy);
hy = rktio_ltps_get_signaled_handle(rktio, lt);
check_expected_racket_error(!hy, RKTIO_ERROR_LTPS_NOT_FOUND);
}
static void check_hello_content(rktio_t *rktio, char *fn)
{
rktio_fd_t *fd;
@ -73,6 +191,8 @@ int main()
rktio_directory_list_t *ls;
rktio_file_copy_t *cp;
rktio_timestamp_t *ts1, *ts1a;
rktio_ltps_t *lt;
rktio_ltps_handle_t *h1, *h2;
rktio = rktio_init();
@ -191,7 +311,21 @@ int main()
}
check_valid(saw_file);
/* Pipes and non-blocking operations */
/* We expect `lt` to work on regular files everywhere except Windows: */
#if !defined(RKTIO_SYSTEM_WINDOWS) && !defined(HAVE_KQUEUE_SYSCALL)
fd = rktio_open(rktio, "test1", RKTIO_OPEN_READ);
check_valid(!!fd);
fd2 = rktio_open(rktio, "test1", RKTIO_OPEN_WRITE | RKTIO_OPEN_CAN_EXIST);
check_valid(!!fd2);
lt = try_check_ltps(rktio, fd, fd2, &h1, &h2);
check_valid(!!lt);
check_ltps_read_and_write_ready(rktio, lt, h1, h2);
check_valid(rktio_ltps_close(rktio, lt));
check_valid(rktio_close(rktio, fd));
check_valid(rktio_close(rktio, fd2));
#endif
/* Pipes, non-blocking operations, and more long-term poll sets */
fd = rktio_open(rktio, "demo_fifo", RKTIO_OPEN_READ);
check_valid(!!fd);
@ -200,19 +334,39 @@ int main()
check_valid(!!fd2);
check_valid(!rktio_poll_read_ready(rktio, fd));
lt = try_check_ltps(rktio, fd, fd2, &h1, &h2);
/* We expect `lt` to work everywhere exception Windows and with kqueue: */
#if !defined(RKTIO_SYSTEM_WINDOWS) && !defined(HAVE_KQUEUE_SYSCALL)
check_valid(!!lt);
#endif
/* fd2 can write, fd cannot yet read */
check_valid(!rktio_poll_read_ready(rktio, fd));
if (lt)
check_ltps_write_ready(rktio, lt, h2);
/* Round-trip data through pipe: */
amt = rktio_write(rktio, fd2, "hello", 5);
check_valid(amt == 5);
check_valid(rktio_poll_read_ready(rktio, fd));
if (lt) {
check_ltps_read_ready(rktio, lt, h1);
check_valid(rktio_ltps_close(rktio, lt));
}
amt = rktio_read(rktio, fd, buffer, sizeof(buffer));
check_valid(amt == 5);
check_valid(!strncmp(buffer, "hello", 5));
check_valid(!rktio_poll_read_ready(rktio, fd));
/* Close pipe ends: */
check_valid(rktio_close(rktio, fd2));
amt = rktio_read(rktio, fd, buffer, sizeof(buffer));
check_valid(amt == RKTIO_READ_EOF);
check_valid(rktio_close(rktio, fd));
/* Open pipe ends again: */
fd2 = rktio_open(rktio, "demo_fifo", RKTIO_OPEN_WRITE | RKTIO_OPEN_CAN_EXIST);
check_valid(!!fd2);
/* should eventually block: */

View File

@ -31,6 +31,9 @@ typedef struct rktio_fd_t rktio_fd_t;
rktio_fd_t *rktio_system_fd(rktio_t *rktio, intptr_t system_fd, int modes);
intptr_t rktio_fd_system_fd(rktio_t *rktio, rktio_fd_t *rfd);
int rktio_fd_is_regular_file(rktio_t *rktio, rktio_fd_t *rfd);
int rktio_fd_is_socket(rktio_t *rktio, rktio_fd_t *rfd);
rktio_fd_t *rktio_open(rktio_t *rktio, char *src, int modes);
int rktio_close(rktio_t *rktio, rktio_fd_t *fd);
@ -63,6 +66,13 @@ void rktio_poll_set_add_eventmask(rktio_poll_set_t *fds, int mask);
void rktio_poll_set_add_nosleep(rktio_poll_set_t *fds);
#endif
/*************************************************/
/* Long-term poll sets */
/* "Long-term" means that the poll set will be used frequently with
incremental updates, which means that it's worthwhile to use an OS
facililty (epoll, kqueue, etc.) to speed up polling. */
typedef struct rktio_ltps_t rktio_ltps_t;
typedef struct rktio_ltps_handle_t rktio_ltps_handle_t;
@ -79,14 +89,14 @@ enum {
rktio_ltps_t *rktio_open_ltps(rktio_t *rktio);
int rktio_ltps_close(rktio_t *rktio, rktio_ltps_t *lt);
rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t *rfd, int mode);
int rktio_ltps_close(rktio_t *rktio, rktio_ltps_t *lt);
int rktio_ltps_poll(rktio_t *rktio, rktio_ltps_t *lt);
void rktio_ltps_handle_set_data(rktio_ltps_t *lt, rktio_ltps_handle_t *s, void *data);
void *rktio_ltps_handle_get_data(rktio_ltps_t *lt, rktio_ltps_handle_t *s);
int rktio_ltps_poll(rktio_t *rktio, rktio_ltps_t *lt);
rktio_ltps_handle_t *rktio_ltps_get_signaled_handle(rktio_t *rktio, rktio_ltps_t *lt);
void rktio_sleep(rktio_t *rktio, float nsecs, rktio_poll_set_t *fds, rktio_ltps_t *lt);
/*************************************************/
@ -213,6 +223,7 @@ enum {
RKTIO_ERROR_UNKNOWN_USER,
RKTIO_ERROR_INIT_FAILED,
RKTIO_ERROR_LTPS_NOT_FOUND,
RKTIO_ERROR_LTPS_REMOVED, /* indicates success, instead of failure */
};
int rktio_get_last_error(rktio_t *rktio);

View File

@ -3,6 +3,18 @@
#ifdef RKTIO_SYSTEM_UNIX
# include <errno.h>
#endif
#if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL)
# include <stdio.h>
#endif
#ifdef HAVE_KQUEUE_SYSCALL
# include <unistd.h>
# include <sys/types.h>
# include <sys/event.h>
# include <sys/time.h>
#endif
#ifdef HAVE_POLL_SYSCALL
# include <poll.h>
#endif
#include <stdlib.h>
struct rktio_ltps_t {
@ -11,7 +23,11 @@ struct rktio_ltps_t {
#else
rktio_poll_set_t *fd_set;
#endif
/* List of pending signaled handles: */
struct rktio_ltps_handle_t *signaled;
/* Hash table mapping fds to handles */
struct ltps_bucket_t *buckets;
intptr_t size, count;
};
struct rktio_ltps_handle_t {
@ -24,9 +40,12 @@ typedef struct rktio_ltps_handle_pair_t {
rktio_ltps_handle_t *write_handle;
} rktio_ltps_handle_pair_t;
static rktio_ltps_handle_pair_t *ltps_hash_get(rktio_ltps_t *lt, int fd);
static void ltps_hash_set(rktio_ltps_t *lt, int fd, rktio_ltps_handle_pair_t *v);
static rktio_ltps_handle_pair_t *ltps_hash_get(rktio_ltps_t *lt, intptr_t fd);
static void ltps_hash_set(rktio_ltps_t *lt, intptr_t fd, rktio_ltps_handle_pair_t *v);
static void ltps_hash_remove(rktio_ltps_t *lt, intptr_t fd);
static int ltps_is_hash_empty(rktio_ltps_t *lt);
static void ltps_hash_init(rktio_ltps_t *lt);
static void ltps_hash_free(rktio_ltps_t *lt);
/************************************************************/
@ -78,6 +97,8 @@ rktio_ltps_t *rktio_open_ltps(rktio_t *rktio)
lt->signaled = NULL;
ltps_hash_init(lt);
return lt;
}
@ -96,6 +117,13 @@ rktio_poll_set_t *rktio_ltps_get_fd_set(rktio_ltps_t *lt)
int rktio_ltps_close(rktio_t *rktio, rktio_ltps_t *lt)
{
rktio_ltps_handle_t *s;
while ((s = rktio_ltps_get_signaled_handle(rktio, lt)))
free(s);
ltps_hash_free(lt);
#if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL)
if (lt->fd >= 0) {
intptr_t rc;
@ -107,6 +135,7 @@ int rktio_ltps_close(rktio_t *rktio, rktio_ltps_t *lt)
#else
rktio_free_fdset_array(lt->fd_set, 3);
#endif
return 1;
}
@ -116,6 +145,8 @@ rktio_ltps_handle_t *rktio_ltps_get_signaled_handle(rktio_t *rktio, rktio_ltps_t
s = lt->signaled;
if (s)
lt->signaled = s->next;
else
set_racket_error(RKTIO_ERROR_LTPS_NOT_FOUND);
return s;
}
@ -123,7 +154,7 @@ rktio_ltps_handle_t *rktio_ltps_get_signaled_handle(rktio_t *rktio, rktio_ltps_t
static void log_kqueue_error(const char *action, int kr)
{
if (kr < 0) {
fprintf(stderr, "%d error at %s: %E",
fprintf(stderr, "%s error at %s: %d",
#ifdef HAVE_KQUEUE_SYSCALL
"kqueue",
#else
@ -149,15 +180,15 @@ rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t
# endif
rktio_ltps_handle_pair_t *v;
rktio_ltps_handle_t *s;
int fd = rktio_fd_system_fd(rktio, rfd);
intptr_t fd = rktio_fd_system_fd(rktio, rfd);
# ifdef HAVE_KQUEUE_SYSCALL
if (!is_socket) {
if (!rktio_fd_is_socket(rktio, rfd)) {
/* kqueue() might not work on devices, such as ttys; also, while
Mac OS X kqueue() claims to work on FIFOs, there are problems:
watching for reads on a FIFO sometimes disables watching for
writes on the same FIFO with a different file descriptor */
if (!rktio_fd_regular_file(rktio, rfd, 1)) {
if (!rktio_fd_is_regular_file(rktio, rfd)) {
set_racket_error(RKTIO_ERROR_UNSUPPORTED);
return NULL;
}
@ -165,7 +196,7 @@ rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t
if (lt->fd < 0) {
lt->fd = kqueue();
if (lt->fd < 0) {
set_posix_error();
get_posix_error();
log_kqueue_error("create", lt->fd);
return NULL;
}
@ -175,7 +206,7 @@ rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t
if (lt->fd < 0) {
lt->fd = epoll_create(5);
if (lt->fd < 0) {
set_posix_error();
get_posix_error();
log_kqueue_error("create", lt->fd);
return NULL;
}
@ -208,7 +239,7 @@ rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t
if (s) ltps_signal_handle(lt, s);
s = v->write_handle;
if (s) ltps_signal_handle(lt, s);
ltps_hash_set(lt, fd, NULL);
ltps_hash_remove(lt, fd);
free(v);
s = NULL;
# ifdef HAVE_KQUEUE_SYSCALL
@ -220,11 +251,11 @@ rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t
EV_SET(&kev[pos], fd, EVFILT_VNODE, EV_DELETE, 0, 0, NULL);
pos++;
} else {
if (RKTIO_TRUEP(RKTIO_VEC_ELS(v)[0])) {
if (v->read_handle) {
EV_SET(&kev[pos], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
pos++;
}
if (RKTIO_TRUEP(RKTIO_VEC_ELS(v)[1])) {
if (v->write_handle) {
EV_SET(&kev[pos], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
pos++;
}
@ -245,6 +276,7 @@ rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t
RKTIO_FD_CLR(fd, w);
RKTIO_FD_CLR(fd, e);
# endif
set_racket_error(RKTIO_ERROR_LTPS_REMOVED); /* success, not failure */
} else if ((mode == RKTIO_LTPS_CHECK_READ)
|| (mode == RKTIO_LTPS_CREATE_READ)
|| (mode == RKTIO_LTPS_CHECK_VNODE)
@ -257,7 +289,7 @@ rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t
v->read_handle = s;
# ifdef HAVE_KQUEUE_SYSCALL
{
GC_CAN_IGNORE struct kevent kev;
struct kevent kev;
struct timespec timeout = {0, 0};
int kr;
if (mode == RKTIO_LTPS_CREATE_READ)
@ -274,7 +306,7 @@ rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t
}
# elif defined(HAVE_EPOLL_SYSCALL)
{
GC_CAN_IGNORE struct epoll_event ev;
struct epoll_event ev;
int already = !RKTIO_FALSEP(RKTIO_VEC_ELS(v)[1]), kr;
memset(&ev, 0, sizeof(ev));
ev.data.fd = fd;
@ -299,7 +331,7 @@ rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t
v->write_handle = s;
# ifdef HAVE_KQUEUE_SYSCALL
{
GC_CAN_IGNORE struct kevent kev;
struct kevent kev;
struct timespec timeout = {0, 0};
int kr;
EV_SET(&kev, fd, EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, NULL);
@ -311,7 +343,7 @@ rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t
# elif defined(HAVE_EPOLL_SYSCALL)
{
GC_CAN_IGNORE struct epoll_event ev;
struct epoll_event ev;
int already = !RKTIO_FALSEP(RKTIO_VEC_ELS(v)[0]), kr;
memset(&ev, 0, sizeof(ev));
ev.data.fd = fd;
@ -341,7 +373,7 @@ int rktio_ltps_poll(rktio_t *rktio, rktio_ltps_t *lt)
rktio_ltps_handle_t *s;
rktio_ltps_handle_pair_t *v;
int key;
GC_CAN_IGNORE struct kevent kev;
struct kevent kev;
struct timespec timeout = {0, 0};
int kr, hit = 0;
@ -374,7 +406,7 @@ int rktio_ltps_poll(rktio_t *rktio, rktio_ltps_t *lt)
}
}
if (!v->read_handle && !v->write_handle) {
ltps_hash_set(lt, key, NULL);
ltps_hash_remove(lt, key);
free(v);
}
} else {
@ -390,7 +422,7 @@ int rktio_ltps_poll(rktio_t *rktio, rktio_ltps_t *lt)
rktio_ltps_handle_pair_t *v;
int key;
int kr, hit = 0;
GC_CAN_IGNORE struct epoll_event ev;
struct epoll_event ev;
memset(&ev, 0, sizeof(ev));
if (lt->fd < 0)
@ -424,7 +456,7 @@ int rktio_ltps_poll(rktio_t *rktio, rktio_ltps_t *lt)
}
}
if (!v->read_handle && !v->write_handle) {
ltps_hash_set(lt, key, NULL);
ltps_hash_remove(lt, key);
free(v);
kr = epoll_ctl(lt->fd, EPOLL_CTL_DEL, ev.data.fd, NULL);
log_kqueue_error("remove*", kr);
@ -454,7 +486,7 @@ int rktio_ltps_poll(rktio_t *rktio, rktio_ltps_t *lt)
return 0;
rktio_clean_fd_set(lt->fd_set);
c = rkt_io_poll_count(lt->fd_set);
c = rktio_get_poll_count(lt->fd_set);
pfd = rktio_get_poll_fd_array(lt->fd_set);
do {
@ -486,7 +518,7 @@ int rktio_ltps_poll(rktio_t *rktio, rktio_ltps_t *lt)
pfd[i].events -= (pfd[i].events & POLLOUT);
}
if (!v->read_handle && !v->write_handle) {
rktio_hash_set(rktio_semaphore_fd_mapping, key, NULL);
ltps_hash_remove(lt, key);
free(v);
}
}
@ -501,7 +533,7 @@ int rktio_ltps_poll(rktio_t *rktio, rktio_ltps_t *lt)
int i, actual_limit, r, w, e, sr, hit = 0;
rktio_ltps_handle_t *s;
rktio_ltps_handle_pair_t *v;
int key;
intptr_t key;
DECL_FDSET(set, 3);
rktio_poll_set_t *set1, *set2;
@ -555,7 +587,7 @@ int rktio_ltps_poll(rktio_t *rktio, rktio_ltps_t *lt)
}
if (!v->read_handle && !v->write_handle) {
RKTIO_FD_CLR(i, RKTIO_GET_FDSET(lt->fd_set, 2));
ltps_hash_set(lt, key, NULL);
ltps_hash_remove(lt, key);
free(v);
}
}
@ -569,16 +601,133 @@ int rktio_ltps_poll(rktio_t *rktio, rktio_ltps_t *lt)
/************************************************************/
static rktio_ltps_handle_pair_t *ltps_hash_get(rktio_ltps_t *lt, int fd)
typedef struct ltps_bucket_t {
/* v is non-NULL => bucket is filled */
/* v is NULL and fd is -1 => was removed */
intptr_t fd;
rktio_ltps_handle_pair_t *v;
} ltps_bucket_t;
static void ltps_rehash(rktio_ltps_t *lt, intptr_t new_size)
{
if (new_size >= 16) {
ltps_bucket_t *old_buckets = lt->buckets;
intptr_t old_size = lt->size, i;
lt->size = new_size;
lt->buckets = calloc(new_size, sizeof(ltps_bucket_t));
lt->count = 0;
for (i = old_size; --i; ) {
if (lt->buckets[i].v)
ltps_hash_set(lt, lt->buckets[i].fd, lt->buckets[i].v);
}
free(old_buckets);
}
}
static rktio_ltps_handle_pair_t *ltps_hash_get(rktio_ltps_t *lt, intptr_t fd)
{
if (lt->buckets) {
intptr_t mask = (lt->size - 1);
intptr_t hc = fd & mask;
intptr_t d = ((fd >> 3) & mask) | 0x1;
while (1) {
if (lt->buckets[hc].fd == fd)
return lt->buckets[hc].v;
else if (lt->buckets[hc].v
|| (lt->buckets[hc].fd == -1)) {
/* keep looking */
hc = (hc + d) & mask;
} else
return NULL;
}
} else
return NULL;
}
static void ltps_hash_set(rktio_ltps_t *lt, int fd, rktio_ltps_handle_pair_t *v)
static void ltps_hash_remove(rktio_ltps_t *lt, intptr_t fd)
{
if (lt->buckets) {
intptr_t mask = (lt->size - 1);
intptr_t hc = fd & mask;
intptr_t d = ((fd >> 3) & mask) | 0x1;
while (1) {
if (lt->buckets[hc].fd == fd) {
lt->buckets[hc].fd = -1;
lt->buckets[hc].v = NULL;
--lt->count;
if (4 * lt->count <= lt->size)
ltps_rehash(lt, lt->size >> 1);
} else if (lt->buckets[hc].v
|| (lt->buckets[hc].fd == -1)) {
/* keep looking */
hc = (hc + d) & mask;
} else
break;
}
}
}
static void ltps_hash_set(rktio_ltps_t *lt, intptr_t fd, rktio_ltps_handle_pair_t *v)
{
if (!lt->buckets) {
lt->size = 16;
lt->buckets = calloc(lt->size, sizeof(ltps_bucket_t));
}
{
intptr_t mask = (lt->size - 1);
intptr_t hc = fd & mask;
intptr_t d = ((fd >> 3) & mask) | 0x1;
while (1) {
if (lt->buckets[hc].v) {
if (lt->buckets[hc].fd == -1) {
/* use bucket whos content ws previouslt removed */
break;
} else {
/* keep looking for a spot */
hc = (hc + d) & mask;
}
} else
break;
}
lt->buckets[hc].fd = fd;
lt->buckets[hc].v = v;
lt->count++;
if (2 * lt->count >= lt->size)
ltps_rehash(lt, lt->size << 1);
}
}
static int ltps_is_hash_empty(rktio_ltps_t *lt)
{
return 1;
return (lt->count == 0);
}
static void ltps_hash_init(rktio_ltps_t *lt)
{
lt->buckets = NULL;
lt->size = 0;
lt->count = 0;
}
static void ltps_hash_free(rktio_ltps_t *lt)
{
if (lt->buckets) {
intptr_t i;
for (i = lt->size; --i; ) {
if (lt->buckets[i].v)
free(lt->buckets[i].v);
}
free(lt->buckets);
}
}

View File

@ -7,6 +7,10 @@
# include <errno.h>
# include <math.h>
#endif
#ifdef HAVE_POLL_SYSCALL
# include <poll.h>
#endif
#include <string.h>
#include <stdlib.h>
/* Generalize fd arrays (FD_SET, etc) with a runtime-determined size,
@ -66,17 +70,22 @@ rktio_poll_set_t *rktio_alloc_fdset_array(int count)
pfd = malloc(sizeof(struct pollfd) * (32 + PFD_EXTRA_SPACE));
data->pfd = pfd;
return data;
return r;
}
void rktio_free_fdset_array(rktio_poll_set_t *fds, int count)
{
FIXME;
struct rktio_fd_set_data_t *data = fds->data;
free(fds->w);
free(fds->e);
free(fds);
free(data->pfd);
free(data);
}
rktio_poll_set_t *rktio_init_fdset_array(rktio_poll_set_t *fdarray, int count)
{
((struct rktio_fd_set *)fdarray)->data->count = 0;
fdarray->data->count = 0;
return fdarray;
}
@ -163,7 +172,7 @@ void rktio_fdset(rktio_poll_set_t *fd, int n)
int rktio_fdisset(rktio_poll_set_t *fd, int n)
{
struct rktio_fd_set_data_t *data = data->data;
struct rktio_fd_set_data_t *data = fd->data;
intptr_t flag = fd->flags;
intptr_t pos;
@ -244,8 +253,6 @@ void rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *src_fds)
free(pfds);
}
data->count = j;
return fds;
}
void rktio_clean_fd_set(rktio_poll_set_t *fds)
@ -278,7 +285,7 @@ int rktio_get_poll_count(rktio_poll_set_t *fds)
return fds->data->count;
}
int rktio_get_poll_fd_array(rktio_poll_set_t *fds)
struct pollfd *rktio_get_poll_fd_array(rktio_poll_set_t *fds)
{
return fds->data->pfd;
}
@ -881,7 +888,7 @@ void rktio_wait_until_signal_received(rktio_t *rktio)
#ifdef RKTIO_SYSTEM_UNIX
int r;
# ifdef HAVE_POLL_SYSCALL
GC_CAN_IGNORE struct pollfd pfd[1];
struct pollfd pfd[1];
pfd[0].fd = rktio->external_event_fd;
pfd[0].events = POLLIN;
do {
@ -969,9 +976,9 @@ void rktio_sleep(rktio_t *rktio, float nsecs, rktio_poll_set_t *fds, rktio_ltps_
if (timeout < 0)
timeout = 0;
}
if (external_event_fd) {
GC_CAN_IGNORE struct pollfd pfd[1];
pfd[0].fd = external_event_fd;
if (rktio->external_event_fd) {
struct pollfd pfd[1];
pfd[0].fd = rktio->external_event_fd;
pfd[0].events = POLLIN;
poll(pfd, 1, timeout);
} else {
@ -1023,22 +1030,22 @@ void rktio_sleep(rktio_t *rktio, float nsecs, rktio_poll_set_t *fds, rktio_ltps_
/******* poll() variant *******/
{
struct mz_fd_set_data_t *data = fds->data;
struct rktio_fd_set_data_t *data = fds->data;
intptr_t count = data->count;
int timeout;
if (v <= 0.0)
if (nsecs <= 0.0)
timeout = -1;
else if (v > 100000)
else if (nsecs > 100000)
timeout = 100000000;
else {
timeout = (int)(v * 1000.0);
timeout = (int)(nsecs * 1000.0);
if (timeout < 0)
timeout = 0;
}
if (external_event_fd) {
data->pfd[count].fd = external_event_fd;
if (rktio->external_event_fd) {
data->pfd[count].fd = rktio->external_event_fd;
data->pfd[count].events = POLLIN;
count++;
}

View File

@ -39,7 +39,7 @@ struct rktio_t {
#endif
#ifdef USE_FAR_RKTIO_FDCALLS
/* A single fdset that can be reused for immediate actions: */
struct rktio_poll_set_t *rktio_global_fd_set;
struct rktio_poll_set_t *rktio_global_poll_set;
#endif
};
@ -53,6 +53,13 @@ void rktio_alloc_global_poll_set(rktio_t *rktio);
int rktio_initialize_signal(rktio_t *rktio);
#ifdef USE_FAR_RKTIO_FDCALLS
rktio_poll_set_t *rktio_get_fdset(rktio_poll_set_t *fdarray, int pos);
void rktio_fdzero(rktio_poll_set_t *fd);
void rktio_fdset(rktio_poll_set_t *fd, int n);
void rktio_fdclr(rktio_poll_set_t *fd, int n);
int rktio_fdisset(rktio_poll_set_t *fd, int n);
# define DECL_FDSET(n, c) fd_set *n
# define INIT_DECL_FDSET(r, w, e) { \
r = RKTIO_GET_FDSET(rktio->rktio_global_poll_set, 0 ); \
@ -107,6 +114,11 @@ int rktio_ltps_get_fd(rktio_ltps_t *lt);
rktio_poll_set_t *rktio_ltps_get_fd_set(rktio_ltps_t *lt);
#endif
#if defined(HAVE_POLL_SYSCALL)
int rktio_get_poll_count(rktio_poll_set_t *fds);
struct pollfd *rktio_get_poll_fd_array(rktio_poll_set_t *fds);
#endif
/************************************************************/
/* Misc */
/************************************************************/

View File

@ -11,6 +11,9 @@
#ifdef RKTIO_SYSTEM_WINDOWS
# include <windows.h>
#endif
#ifdef HAVE_POLL_SYSCALL
# include <poll.h>
#endif
#ifndef RKTIO_BINARY
# define RKTIO_BINARY 0
@ -127,6 +130,16 @@ intptr_t rktio_fd_system_fd(rktio_t *rktio, rktio_fd_t *rfd)
return rfd->fd;
}
int rktio_fd_is_regular_file(rktio_t *rktio, rktio_fd_t *rfd)
{
return rfd->regfile;
}
int rktio_fd_is_socket(rktio_t *rktio, rktio_fd_t *rfd)
{
return 0;
}
/*************************************************************/
/* opening a file fd */
/*************************************************************/
@ -575,7 +588,7 @@ int poll_write_ready_or_flushed(rktio_t *rktio, rktio_fd_t *rfd, int check_flush
else {
int sr;
# ifdef HAVE_POLL_SYSCALL
GC_CAN_IGNORE struct pollfd pfd[1];
struct pollfd pfd[1];
pfd[0].fd = rfd->fd;
pfd[0].events = POLLOUT;
do {