rktio: add kqueue/epoll layer

This commit is contained in:
Matthew Flatt 2017-06-09 15:45:23 -06:00
parent 09703b94f7
commit 04130638fc
5 changed files with 693 additions and 26 deletions

View File

@ -9,6 +9,7 @@ RKTIO_HEADERS = $(srcdir)/rktio.h $(srcdir)/rktio_private.h rktio_config.h
OBJS = rktio_filesystem.o \ OBJS = rktio_filesystem.o \
rktio_read_write.o \ rktio_read_write.o \
rktio_poll_set.o \ rktio_poll_set.o \
rktio_ltps.o \
rktio_error.o \ rktio_error.o \
rktio_main.o rktio_main.o
@ -29,6 +30,9 @@ rktio_read_write.o: $(srcdir)/rktio_read_write.c $(RKTIO_HEADERS)
rktio_poll_set.o: $(srcdir)/rktio_poll_set.c $(RKTIO_HEADERS) rktio_poll_set.o: $(srcdir)/rktio_poll_set.c $(RKTIO_HEADERS)
$(CC) $(CFLAGS) -I$(srcdir) -I. -o rktio_poll_set.o -c $(srcdir)/rktio_poll_set.c $(CC) $(CFLAGS) -I$(srcdir) -I. -o rktio_poll_set.o -c $(srcdir)/rktio_poll_set.c
rktio_ltps.o: $(srcdir)/rktio_ltps.c $(RKTIO_HEADERS)
$(CC) $(CFLAGS) -I$(srcdir) -I. -o rktio_ltps.o -c $(srcdir)/rktio_ltps.c
rktio_error.o: $(srcdir)/rktio_error.c $(RKTIO_HEADERS) rktio_error.o: $(srcdir)/rktio_error.c $(RKTIO_HEADERS)
$(CC) $(CFLAGS) -I$(srcdir) -I. -o rktio_error.o -c $(srcdir)/rktio_error.c $(CC) $(CFLAGS) -I$(srcdir) -I. -o rktio_error.o -c $(srcdir)/rktio_error.c

View File

@ -47,15 +47,48 @@ int rktio_poll_write_ready(rktio_t *rktio, rktio_fd_t *rfd);
int rktio_poll_write_flushed(rktio_t *rktio, rktio_fd_t *rfd); int rktio_poll_write_flushed(rktio_t *rktio, rktio_fd_t *rfd);
/*************************************************/ /*************************************************/
/* File-descriptor sets */ /* File-descriptor sets for polling */
typedef struct rktio_poll_set_t rktio_poll_set_t; typedef struct rktio_poll_set_t rktio_poll_set_t;
#define RKTIO_POLL_READ RKTIO_OPEN_READ #define RKTIO_POLL_READ RKTIO_OPEN_READ
#define RKTIO_POLL_WRITE RKTIO_OPEN_WRITE #define RKTIO_POLL_WRITE RKTIO_OPEN_WRITE
rktio_poll_set_t *rktio_make_poll_set();
void rktio_poll_add(rktio_t *rktio, rktio_fd_t *rfd, rktio_poll_set_t *fds, int modes); void rktio_poll_add(rktio_t *rktio, rktio_fd_t *rfd, rktio_poll_set_t *fds, int modes);
#ifdef RKTIO_SYSTEM_WINDOWS
void rktio_poll_set_add_handle(HANDLE h, rktio_poll_set_t *fds, int repost);
void rktio_poll_set_add_eventmask(rktio_poll_set_t *fds, int mask);
void rktio_poll_set_add_nosleep(rktio_poll_set_t *fds);
#endif
typedef struct rktio_ltps_t rktio_ltps_t;
typedef struct rktio_ltps_handle_t rktio_ltps_handle_t;
enum {
RKTIO_LTPS_CREATE_READ = 1,
RKTIO_LTPS_CREATE_WRITE,
RKTIO_LTPS_CHECK_READ,
RKTIO_LTPS_CHECK_WRITE,
RKTIO_LTPS_REMOVE,
RKTIO_LTPS_CREATE_VNODE,
RKTIO_LTPS_CHECK_VNODE,
RKTIO_LTPS_REMOVE_VNODE
};
rktio_ltps_t *rktio_open_ltps(rktio_t *rktio);
int rktio_ltps_close(rktio_t *rktio, rktio_ltps_t *lt);
rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t *rfd, int mode);
int rktio_ltps_close(rktio_t *rktio, rktio_ltps_t *lt);
int rktio_ltps_poll(rktio_t *rktio, rktio_ltps_t *lt);
void rktio_ltps_handle_set_data(rktio_ltps_t *lt, rktio_ltps_handle_t *s, void *data);
void *rktio_ltps_handle_get_data(rktio_ltps_t *lt, rktio_ltps_handle_t *s);
void rktio_sleep(rktio_t *rktio, float nsecs, rktio_poll_set_t *fds, rktio_ltps_t *lt);
/*************************************************/ /*************************************************/
/* Files, directories, and links */ /* Files, directories, and links */
@ -178,7 +211,8 @@ enum {
RKTIO_ERROR_NO_TILDE, RKTIO_ERROR_NO_TILDE,
RKTIO_ERROR_ILL_FORMED_USER, RKTIO_ERROR_ILL_FORMED_USER,
RKTIO_ERROR_UNKNOWN_USER, RKTIO_ERROR_UNKNOWN_USER,
RKTIO_ERROR_INIT_FAILED RKTIO_ERROR_INIT_FAILED,
RKTIO_ERROR_LTPS_NOT_FOUND,
}; };
int rktio_get_last_error(rktio_t *rktio); int rktio_get_last_error(rktio_t *rktio);

View File

@ -0,0 +1,584 @@
#include "rktio.h"
#include "rktio_private.h"
#ifdef RKTIO_SYSTEM_UNIX
# include <errno.h>
#endif
#include <stdlib.h>
struct rktio_ltps_t {
#if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL)
int fd;
#else
rktio_poll_set_t *fd_set;
#endif
struct rktio_ltps_handle_t *signaled;
};
struct rktio_ltps_handle_t {
void *data; /* arbitrary data from client */
struct rktio_ltps_handle_t *next; /* in signaled chain */
};
typedef struct rktio_ltps_handle_pair_t {
rktio_ltps_handle_t *read_handle;
rktio_ltps_handle_t *write_handle;
} rktio_ltps_handle_pair_t;
static rktio_ltps_handle_pair_t *ltps_hash_get(rktio_ltps_t *lt, int fd);
static void ltps_hash_set(rktio_ltps_t *lt, int fd, rktio_ltps_handle_pair_t *v);
static int ltps_is_hash_empty(rktio_ltps_t *lt);
/************************************************************/
rktio_ltps_handle_pair_t *make_ltps_handle_pair()
{
rktio_ltps_handle_pair_t *v;
v = malloc(sizeof(rktio_ltps_handle_pair_t));
v->read_handle = NULL;
v->write_handle = NULL;
return v;
}
rktio_ltps_handle_t *make_ltps_handle()
{
rktio_ltps_handle_t *s;
s = malloc(sizeof(rktio_ltps_handle_t));
s->data = NULL;
return s;
}
void ltps_signal_handle(rktio_ltps_t *lt, rktio_ltps_handle_t *s)
{
s->next = lt->signaled;
lt->signaled = s;
}
void rktio_ltps_handle_set_data(rktio_ltps_t *lt, rktio_ltps_handle_t *s, void *data)
{
s->data = data;
}
void *rktio_ltps_handle_get_data(rktio_ltps_t *lt, rktio_ltps_handle_t *s)
{
return s->data;
}
/************************************************************/
rktio_ltps_t *rktio_open_ltps(rktio_t *rktio)
{
rktio_ltps_t *lt;
lt = malloc(sizeof(rktio_ltps_t));
#if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL)
lt->fd = -1;
#else
lt->fd_set = rktio_alloc_fdset_array(3);
#endif
lt->signaled = NULL;
return lt;
}
#if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL)
int rktio_ltps_get_fd(rktio_ltps_t *lt)
{
return lt->fd;
}
#else
rktio_poll_set_t *rktio_ltps_get_fd_set(rktio_ltps_t *lt)
{
return lt->fd_set;
}
#endif
int rktio_ltps_close(rktio_t *rktio, rktio_ltps_t *lt)
{
#if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL)
if (lt->fd >= 0) {
intptr_t rc;
do {
rc = close(lt->fd);
} while ((rc == -1) && (errno == EINTR));
}
free(lt);
#else
rktio_free_fdset_array(lt->fd_set, 3);
#endif
return 1;
}
rktio_ltps_handle_t *rktio_ltps_get_signaled_handle(rktio_t *rktio, rktio_ltps_t *lt)
{
rktio_ltps_handle_t *s;
s = lt->signaled;
if (s)
lt->signaled = s->next;
return s;
}
#if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL)
static void log_kqueue_error(const char *action, int kr)
{
if (kr < 0) {
fprintf(stderr, "%d error at %s: %E",
#ifdef HAVE_KQUEUE_SYSCALL
"kqueue",
#else
"epoll",
#endif
action, errno);
}
}
static void log_kqueue_fd(int fd, int flags)
{
}
#endif
rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t *rfd, int mode)
{
#ifdef RKTIO_SYSTEM_WINDOWS
set_racket_error(RKTIO_ERROR_UNSUPPORTED);
return NULL;
#else
# if !defined(HAVE_KQUEUE_SYSCALL) && !defined(HAVE_EPOLL_SYSCALL)
rktio_poll_set_t *r, *w, *e;
# endif
rktio_ltps_handle_pair_t *v;
rktio_ltps_handle_t *s;
int fd = rktio_fd_system_fd(rktio, rfd);
# ifdef HAVE_KQUEUE_SYSCALL
if (!is_socket) {
/* kqueue() might not work on devices, such as ttys; also, while
Mac OS X kqueue() claims to work on FIFOs, there are problems:
watching for reads on a FIFO sometimes disables watching for
writes on the same FIFO with a different file descriptor */
if (!rktio_fd_regular_file(rktio, rfd, 1)) {
set_racket_error(RKTIO_ERROR_UNSUPPORTED);
return NULL;
}
}
if (lt->fd < 0) {
lt->fd = kqueue();
if (lt->fd < 0) {
set_posix_error();
log_kqueue_error("create", lt->fd);
return NULL;
}
}
# endif
# ifdef HAVE_EPOLL_SYSCALL
if (lt->fd < 0) {
lt->fd = epoll_create(5);
if (lt->fd < 0) {
set_posix_error();
log_kqueue_error("create", lt->fd);
return NULL;
}
}
# endif
v = ltps_hash_get(lt, fd);
if (!v && ((mode == RKTIO_LTPS_CHECK_READ)
|| (mode == RKTIO_LTPS_CHECK_WRITE)
|| (mode == RKTIO_LTPS_CHECK_VNODE)
|| (mode == RKTIO_LTPS_REMOVE)
|| (mode == RKTIO_LTPS_REMOVE_VNODE))) {
set_racket_error(RKTIO_ERROR_LTPS_NOT_FOUND);
return NULL;
}
if (!v) {
v = make_ltps_handle_pair();
ltps_hash_set(lt, fd, v);
}
# if !defined(HAVE_KQUEUE_SYSCALL) && !defined(HAVE_EPOLL_SYSCALL)
r = RKTIO_GET_FDSET(lt->fd_set, 0);
w = RKTIO_GET_FDSET(lt->fd_set, 1);
e = RKTIO_GET_FDSET(lt->fd_set, 2);
# endif
if ((mode == RKTIO_LTPS_REMOVE) || (mode == RKTIO_LTPS_REMOVE_VNODE)) {
s = v->read_handle;
if (s) ltps_signal_handle(lt, s);
s = v->write_handle;
if (s) ltps_signal_handle(lt, s);
ltps_hash_set(lt, fd, NULL);
free(v);
s = NULL;
# ifdef HAVE_KQUEUE_SYSCALL
{
struct kevent kev[2];
struct timespec timeout = {0, 0};
int kr, pos = 0;
if (mode == RKTIO_LTPS_REMOVE_VNODE) {
EV_SET(&kev[pos], fd, EVFILT_VNODE, EV_DELETE, 0, 0, NULL);
pos++;
} else {
if (RKTIO_TRUEP(RKTIO_VEC_ELS(v)[0])) {
EV_SET(&kev[pos], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
pos++;
}
if (RKTIO_TRUEP(RKTIO_VEC_ELS(v)[1])) {
EV_SET(&kev[pos], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
pos++;
}
}
do {
kr = kevent(lt->fd, kev, pos, NULL, 0, &timeout);
} while ((kr == -1) && (errno == EINTR));
log_kqueue_error("remove", kr);
}
# elif defined(HAVE_EPOLL_SYSCALL)
{
int kr;
kr = epoll_ctl(lt->fd, EPOLL_CTL_DEL, fd, NULL);
log_kqueue_error("remove", kr);
}
# else
RKTIO_FD_CLR(fd, r);
RKTIO_FD_CLR(fd, w);
RKTIO_FD_CLR(fd, e);
# endif
} else if ((mode == RKTIO_LTPS_CHECK_READ)
|| (mode == RKTIO_LTPS_CREATE_READ)
|| (mode == RKTIO_LTPS_CHECK_VNODE)
|| (mode == RKTIO_LTPS_CREATE_VNODE)) {
s = v->read_handle;
if (!s) {
if ((mode == RKTIO_LTPS_CREATE_READ)
|| (mode == RKTIO_LTPS_CREATE_VNODE)) {
s = make_ltps_handle();
v->read_handle = s;
# ifdef HAVE_KQUEUE_SYSCALL
{
GC_CAN_IGNORE struct kevent kev;
struct timespec timeout = {0, 0};
int kr;
if (mode == RKTIO_LTPS_CREATE_READ)
EV_SET(&kev, fd, EVFILT_READ, EV_ADD | EV_ONESHOT, 0, 0, NULL);
else
EV_SET(&kev, fd, EVFILT_VNODE, EV_ADD | EV_ONESHOT,
(NOTE_DELETE | NOTE_WRITE | NOTE_EXTEND
| NOTE_RENAME | NOTE_ATTRIB),
0, NULL);
do {
kr = kevent(lt->fd, &kev, 1, NULL, 0, &timeout);
} while ((kr == -1) && (errno == EINTR));
log_kqueue_error("read", kr);
}
# elif defined(HAVE_EPOLL_SYSCALL)
{
GC_CAN_IGNORE struct epoll_event ev;
int already = !RKTIO_FALSEP(RKTIO_VEC_ELS(v)[1]), kr;
memset(&ev, 0, sizeof(ev));
ev.data.fd = fd;
ev.events = EPOLLIN | (already ? EPOLLOUT : 0);
kr = epoll_ctl(lt->fd,
(already ? EPOLL_CTL_MOD : EPOLL_CTL_ADD), fd, &ev);
log_kqueue_error("read", kr);
}
# else
RKTIO_FD_SET(fd, r);
RKTIO_FD_SET(fd, e);
#endif
} else
s = NULL;
}
} else if ((mode == RKTIO_LTPS_CHECK_WRITE)
|| (mode == RKTIO_LTPS_CREATE_WRITE)) {
s = v->write_handle;
if (!s) {
if (mode == RKTIO_LTPS_CREATE_WRITE) {
s = make_ltps_handle();
v->write_handle = s;
# ifdef HAVE_KQUEUE_SYSCALL
{
GC_CAN_IGNORE struct kevent kev;
struct timespec timeout = {0, 0};
int kr;
EV_SET(&kev, fd, EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, NULL);
do {
kr = kevent(lt->fd, &kev, 1, NULL, 0, &timeout);
} while ((kr == -1) && (errno == EINTR));
log_kqueue_error("write", kr);
}
# elif defined(HAVE_EPOLL_SYSCALL)
{
GC_CAN_IGNORE struct epoll_event ev;
int already = !RKTIO_FALSEP(RKTIO_VEC_ELS(v)[0]), kr;
memset(&ev, 0, sizeof(ev));
ev.data.fd = fd;
ev.events = EPOLLOUT | (already ? EPOLLIN : 0);
kr = epoll_ctl(lt->fd,
(already ? EPOLL_CTL_MOD : EPOLL_CTL_ADD), fd, &ev);
log_kqueue_error("write", kr);
}
# else
RKTIO_FD_SET(fd, w);
RKTIO_FD_SET(fd, e);
#endif
} else
s = NULL;
}
}
return s;
#endif
}
int rktio_ltps_poll(rktio_t *rktio, rktio_ltps_t *lt)
{
#ifdef RKTIO_SYSTEM_WINDOWS
return 0;
#elif defined(HAVE_KQUEUE_SYSCALL)
rktio_ltps_handle_t *s;
rktio_ltps_handle_pair_t *v;
int key;
GC_CAN_IGNORE struct kevent kev;
struct timespec timeout = {0, 0};
int kr, hit = 0;
if (lt->fd < 0)
return 0;
while (1) {
do {
kr = kevent(lt->fd, NULL, 0, &kev, 1, &timeout);
} while ((kr == -1) && (errno == EINTR));
log_kqueue_error("wait", kr);
if (kr > 0) {
key = kev.ident;
v = ltps_hash_get(lt, key);
if (v) {
if ((kev.filter == EVFILT_READ) || (kev.filter == EVFILT_VNODE)) {
s = v->read_handle;
if (s) {
ltps_signal_handle(lt, s);
hit = 1;
v->read_handle = NULL;
}
} else if (kev.filter == EVFILT_WRITE) {
s = v->write_handle;
if (s) {
ltps_signal_handle(lt, s);
hit = 1;
v->write_handle = NULL;
}
}
if (!v->read_handle && !v->write_handle) {
ltps_hash_set(lt, key, NULL);
free(v);
}
} else {
log_kqueue_fd(kev.ident, kev.filter);
}
} else
break;
}
return hit;
#elif defined(HAVE_EPOLL_SYSCALL)
rktio_ltps_handle_t *s;
rktio_ltps_handle_pair_t *v;
int key;
int kr, hit = 0;
GC_CAN_IGNORE struct epoll_event ev;
memset(&ev, 0, sizeof(ev));
if (lt->fd < 0)
return 0;
while (1) {
do {
kr = epoll_wait(lt->fd, &ev, 1, 0);
} while ((kr == -1) && (errno == EINTR));
log_kqueue_error("wait", kr);
if (kr > 0) {
key = ev.data.fd;
v = ltps_hash_get(lt, key);
if (v) {
if (ev.events & (POLLIN | POLLHUP | POLLERR)) {
s = v->read_handle;
if (s) {
ltps_signal_handle(lt, s);
hit = 1;
v->read_handle = NULL;
}
}
if (ev.events & (POLLOUT | POLLHUP | POLLERR)) {
s = v->write_handle;
if (s) {
ltps_signal_handle(lt, s);
hit = 1;
v->write_handle = NULL;
}
}
if (!v->read_handle && !v->write_handle) {
ltps_hash_set(lt, key, NULL);
free(v);
kr = epoll_ctl(lt->fd, EPOLL_CTL_DEL, ev.data.fd, NULL);
log_kqueue_error("remove*", kr);
} else {
ev.events = ((!v->read_handle ? 0 : POLLIN)
| (!v->write_handle ? 0 : POLLOUT));
kr = epoll_ctl(lt->fd, EPOLL_CTL_MOD, ev.data.fd, &ev);
log_kqueue_error("update", kr);
}
} else {
log_kqueue_fd(ev.data.fd, ev.events);
}
} else
break;
}
return hit;
#elif defined(HAVE_POLL_SYSCALL)
struct pollfd *pfd;
intptr_t i, c;
rktio_ltps_handle_t *s;
rktio_ltps_handle_pair_t *v;
int key;
int sr, hit = 0;
if (ltps_is_hash_empty(lt))
return 0;
rktio_clean_fd_set(lt->fd_set);
c = rkt_io_poll_count(lt->fd_set);
pfd = rktio_get_poll_fd_array(lt->fd_set);
do {
sr = poll(pfd, c, 0);
} while ((sr == -1) && (errno == EINTR));
if (sr > 0) {
for (i = 0; i < c; i++) {
if (pfd[i].revents) {
key = pfd[i].fd;
v = ltps_hash_get(lt, key);
if (v) {
if (pfd[i].revents & (POLLIN | POLLHUP | POLLERR)) {
s = v->read_handle;
if (s) {
ltps_signal_handle(lt, s);
hit = 1;
v->read_handle = NULL;
}
pfd[i].events -= (pfd[i].events & POLLIN);
}
if (pfd[i].revents & (POLLOUT | POLLHUP | POLLERR)) {
s = v->write_handle;
if (s) {
ltps_signal_handle(lt, s);
hit = 1;
v->write_handle = NULL;
}
pfd[i].events -= (pfd[i].events & POLLOUT);
}
if (!v->read_handle && !v->write_handle) {
rktio_hash_set(rktio_semaphore_fd_mapping, key, NULL);
free(v);
}
}
}
}
}
return hit;
#else
rktio_poll_set_t *fds;
struct timeval time = {0, 0};
int i, actual_limit, r, w, e, sr, hit = 0;
rktio_ltps_handle_t *s;
rktio_ltps_handle_pair_t *v;
int key;
DECL_FDSET(set, 3);
rktio_poll_set_t *set1, *set2;
INIT_DECL_FDSET(set, set1, set2);
set1 = RKTIO_GET_FDSET(set, 1);
set2 = RKTIO_GET_FDSET(set, 2);
fds = set;
RKTIO_FD_ZERO(set);
RKTIO_FD_ZERO(set1);
RKTIO_FD_ZERO(set2);
if (ltps_is_hash_empty(lt))
return 0;
rktio_merge_fd_sets(fds, lt->fd_set);
actual_limit = rktio_get_fd_limit(fds);
do {
sr = select(actual_limit, RKTIO_FDS(set), RKTIO_FDS(set1), RKTIO_FDS(set2), &time);
} while ((sr == -1) && (errno == EINTR));
if (sr > 0) {
for (i = 0; i < actual_limit; i++) {
r = RKTIO_FD_ISSET(i, set);
w = RKTIO_FD_ISSET(i, set1);
e = RKTIO_FD_ISSET(i, set2);
if (r || w || e) {
key = i;
v = ltps_hash_get(lt, key);
if (v) {
if (r || e) {
s = v->read_handle;
if (s) {
ltps_signal_handle(lt, s);
hit = 1;
v->read_handle = NULL;
}
RKTIO_FD_CLR(i, RKTIO_GET_FDSET(lt->fd_set, 0));
}
if (w || e) {
s = v->write_handle;
if (s) {
ltps_signal_handle(lt, s);
hit = 1;
v->write_handle = NULL;
}
RKTIO_FD_CLR(i, RKTIO_GET_FDSET(lt->fd_set, 1));
}
if (!v->read_handle && !v->write_handle) {
RKTIO_FD_CLR(i, RKTIO_GET_FDSET(lt->fd_set, 2));
ltps_hash_set(lt, key, NULL);
free(v);
}
}
}
}
}
return hit;
#endif
}
/************************************************************/
static rktio_ltps_handle_pair_t *ltps_hash_get(rktio_ltps_t *lt, int fd)
{
return NULL;
}
static void ltps_hash_set(rktio_ltps_t *lt, int fd, rktio_ltps_handle_pair_t *v)
{
}
static int ltps_is_hash_empty(rktio_ltps_t *lt)
{
return 1;
}

View File

@ -7,9 +7,6 @@
# include <errno.h> # include <errno.h>
# include <math.h> # include <math.h>
#endif #endif
#ifdef RKTIO_SYSTEM_WINDOWS
# include <windows.h>
#endif
#include <stdlib.h> #include <stdlib.h>
/* Generalize fd arrays (FD_SET, etc) with a runtime-determined size, /* Generalize fd arrays (FD_SET, etc) with a runtime-determined size,
@ -72,6 +69,11 @@ rktio_poll_set_t *rktio_alloc_fdset_array(int count)
return data; return data;
} }
void rktio_free_fdset_array(rktio_poll_set_t *fds, int count)
{
FIXME;
}
rktio_poll_set_t *rktio_init_fdset_array(rktio_poll_set_t *fdarray, int count) rktio_poll_set_t *rktio_init_fdset_array(rktio_poll_set_t *fdarray, int count)
{ {
((struct rktio_fd_set *)fdarray)->data->count = 0; ((struct rktio_fd_set *)fdarray)->data->count = 0;
@ -185,7 +187,7 @@ static int cmp_fd(const void *_a, const void *_b)
return a->fd - b->fd; return a->fd - b->fd;
} }
rktio_poll_set_t *rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *src_fds) void rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *src_fds)
{ {
struct rktio_fd_set_data_t *data = fds->data; struct rktio_fd_set_data_t *data = fds->data;
struct rktio_fd_set_data_t *src_data = src_fds->data; struct rktio_fd_set_data_t *src_data = src_fds->data;
@ -198,16 +200,14 @@ rktio_poll_set_t *rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *s
c = data->count; c = data->count;
sc = src_data->count; sc = src_data->count;
if (!c)
return src_fds;
if (!sc) if (!sc)
return fds; return;
qsort(data->pfd, c, sizeof(struct pollfd), cmp_fd); qsort(data->pfd, c, sizeof(struct pollfd), cmp_fd);
qsort(src_data->pfd, sc, sizeof(struct pollfd), cmp_fd); qsort(src_data->pfd, sc, sizeof(struct pollfd), cmp_fd);
nc = c + sc; nc = c + sc;
pfds = (struct pollfd *)rktio_malloc_atomic(sizeof(struct pollfd) * (nc + PFD_EXTRA_SPACE)); pfds = malloc(sizeof(struct pollfd) * (nc + PFD_EXTRA_SPACE));
j = 0; j = 0;
for (i = 0, si = 0; (i < c) && (si < sc); ) { for (i = 0, si = 0; (i < c) && (si < sc); ) {
if (data->pfd[i].fd == src_data->pfd[si].fd) { if (data->pfd[i].fd == src_data->pfd[si].fd) {
@ -235,12 +235,15 @@ rktio_poll_set_t *rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *s
pfds[j].events = src_data->pfd[si].events; pfds[j].events = src_data->pfd[si].events;
} }
if (nc > RKTIO_INT_VAL(data->size)) { if (nc > data->size) {
free(data->pfd);
data->pfd = pfds; data->pfd = pfds;
data->size = rktio_make_integer(nc); data->size = nc;
} else } else {
memcpy(data->pfd, pfds, j * sizeof(struct pollfd)); memcpy(data->pfd, pfds, j * sizeof(struct pollfd));
data->count = rktio_make_integer(j); free(pfds);
}
data->count = j;
return fds; return fds;
} }
@ -270,6 +273,16 @@ int rktio_get_fd_limit(rktio_poll_set_t *fds)
return 0; return 0;
} }
int rktio_get_poll_count(rktio_poll_set_t *fds)
{
return fds->data->count;
}
int rktio_get_poll_fd_array(rktio_poll_set_t *fds)
{
return fds->data->pfd;
}
#elif defined(USE_DYNAMIC_FDSET_SIZE) #elif defined(USE_DYNAMIC_FDSET_SIZE)
/************************************************************/ /************************************************************/
@ -304,6 +317,11 @@ rktio_poll_set_t *rktio_alloc_fdset_array(int count)
return malloc(count * (dynamic_fd_size + sizeof(intptr_t))); return malloc(count * (dynamic_fd_size + sizeof(intptr_t)));
} }
void rktio_free_fdset_array(rktio_poll_set_t *fds, int count)
{
free(fds);
}
rktio_poll_set_t *rktio_init_fdset_array(rktio_poll_set_t *fdarray, int count) rktio_poll_set_t *rktio_init_fdset_array(rktio_poll_set_t *fdarray, int count)
{ {
return fdarray; return fdarray;
@ -357,6 +375,11 @@ rktio_poll_set_t *rktio_alloc_fdset_array(int count)
return fdarray; return fdarray;
} }
void rktio_free_fdset_array(rktio_poll_set_t *fds, int count)
{
FIXME;
}
static void reset_wait_array(rktio_poll_set_t *efd) static void reset_wait_array(rktio_poll_set_t *efd)
{ {
/* Allocate an array that may be big enough to hold all events /* Allocate an array that may be big enough to hold all events
@ -455,14 +478,13 @@ int rktio_fdisset(rktio_poll_set_t *fd, int n)
return 0; return 0;
} }
rktio_poll_set_t *rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *src_fds) void rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *src_fds)
{ {
int i; int i;
for (i = src_fd->added; i--; ) { for (i = src_fd->added; i--; ) {
if (stv_fd->sockets[i] != INVALID_SOCKET) if (stv_fd->sockets[i] != INVALID_SOCKET)
rktio_fdset(fds, src_fd->sockets[i]); rktio_fdset(fds, src_fd->sockets[i]);
} }
return fds;
} }
void rktio_clean_fd_set(rktio_poll_set_t *fds) void rktio_clean_fd_set(rktio_poll_set_t *fds)
@ -474,7 +496,7 @@ int rktio_get_fd_limit(rktio_poll_set_t *fds)
return 0; return 0;
} }
void rktio_fdset_add_handle(HANDLE h, rktio_poll_set_t *fds, int repost) void rktio_poll_set_add_handle(HANDLE h, rktio_poll_set_t *fds, int repost)
{ {
rktio_poll_set_t *efd = fds; rktio_poll_set_t *efd = fds;
OS_SEMAPHORE_TYPE *hs; OS_SEMAPHORE_TYPE *hs;
@ -504,12 +526,12 @@ void rktio_fdset_add_handle(HANDLE h, rktio_poll_set_t *fds, int repost)
efd->num_handles++; efd->num_handles++;
} }
void rktio_add_fd_nosleep(rktio_poll_set_t *fds) void rktio_poll_set_add_nosleep(rktio_poll_set_t *fds)
{ {
fds->no_sleep = 1; fds->no_sleep = 1;
} }
void rktio_fdset_add_eventmask(rktio_poll_set_t *fds, int mask) void rktio_poll_set_eventmask(rktio_poll_set_t *fds, int mask)
{ {
fds->wait_event_mask |= mask; fds->wait_event_mask |= mask;
} }
@ -688,6 +710,11 @@ rktio_poll_set_t *rktio_alloc_fdset_array(int count)
return malloc(count * sizeof(fd_set)); return malloc(count * sizeof(fd_set));
} }
void rktio_free_fdset_array(rktio_poll_set_t *fds, int count)
{
free(fds);
}
rktio_poll_set_t *rktio_init_fdset_array(rktio_poll_set_t *fdarray, int count) rktio_poll_set_t *rktio_init_fdset_array(rktio_poll_set_t *fdarray, int count)
{ {
return fdarray; return fdarray;
@ -724,7 +751,7 @@ int rktio_fdisset(rktio_poll_set_t *fd, int n)
return FD_ISSET(n, &(fd)->data); return FD_ISSET(n, &(fd)->data);
} }
rktio_poll_set_t *rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *src_fds) void rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *src_fds)
{ {
int i, j; int i, j;
unsigned char *p, *sp; unsigned char *p, *sp;
@ -746,7 +773,6 @@ rktio_poll_set_t *rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *s
*p |= *sp; *p |= *sp;
} }
} }
return fds;
} }
void rktio_clean_fd_set(rktio_poll_set_t *fds) void rktio_clean_fd_set(rktio_poll_set_t *fds)
@ -919,8 +945,18 @@ void rkio_notify_sleep_progress(void)
/* FIXME: don't forget SIGCHILD_DOESNT_INTERRUPT_SELECT handling in Racket */ /* FIXME: don't forget SIGCHILD_DOESNT_INTERRUPT_SELECT handling in Racket */
void rktio_sleep(rktio_t *rktio, float nsecs, rktio_poll_set_t *fds) void rktio_sleep(rktio_t *rktio, float nsecs, rktio_poll_set_t *fds, rktio_ltps_t *lt)
{ {
if (fds && lt) {
#if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL)
int fd = rktio_ltps_get_fd(lt);
RKTIO_FD_SET(fd, fds);
RKTIO_FD_SET(fd, RKTIO_GET_FDSET(fds, 2));
#else
rktio_merge_fd_sets(fds, rktio_ltps_get_fd_set(lt));
#endif
}
if (!fds) { if (!fds) {
/* Nothing to block on - just sleep for some amount of time. */ /* Nothing to block on - just sleep for some amount of time. */
#ifdef RKTIO_SYSTEM_UNIX #ifdef RKTIO_SYSTEM_UNIX
@ -1052,7 +1088,7 @@ void rktio_sleep(rktio_t *rktio, float nsecs, rktio_poll_set_t *fds)
{ {
intptr_t result; intptr_t result;
HANDLE *array, just_two_array[2], break_sema; HANDLE *array, just_two_array[2];
int count, rcount, *rps; int count, rcount, *rps;
if (fds->no_sleep) if (fds->no_sleep)
@ -1068,8 +1104,7 @@ void rktio_sleep(rktio_t *rktio, float nsecs, rktio_poll_set_t *fds)
/* add break semaphore: */ /* add break semaphore: */
if (!count) if (!count)
array = just_two_array; array = just_two_array;
break_sema = (HANDLE)scheme_break_semaphore; array[count++] = rktio->break_semaphore;
array[count++] = break_sema;
/* Extensions may handle events. /* Extensions may handle events.
If the event queue is empty (as reported by GetQueueStatus), If the event queue is empty (as reported by GetQueueStatus),

View File

@ -31,6 +31,7 @@ struct rktio_t {
struct group_member_cache_entry_t *group_member_cache; struct group_member_cache_entry_t *group_member_cache;
int external_event_fd; int external_event_fd;
int put_external_event_fd; int put_external_event_fd;
int long_term_poll_set_fd;
#endif #endif
#ifdef RKTIO_SYSTEM_WINDOWS #ifdef RKTIO_SYSTEM_WINDOWS
int windows_nt_or_later; int windows_nt_or_later;
@ -93,10 +94,19 @@ struct rktio_poll_set_t { fd_set data; };
#endif #endif
rktio_poll_set_t *rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *src_fds); rktio_poll_set_t *rktio_alloc_fdset_array(int count);
void rktio_free_fdset_array(rktio_poll_set_t *fds, int count);
void rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *src_fds);
void rktio_clean_fd_set(rktio_poll_set_t *fds); void rktio_clean_fd_set(rktio_poll_set_t *fds);
int rktio_get_fd_limit(rktio_poll_set_t *fds); int rktio_get_fd_limit(rktio_poll_set_t *fds);
#if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL)
int rktio_ltps_get_fd(rktio_ltps_t *lt);
#else
rktio_poll_set_t *rktio_ltps_get_fd_set(rktio_ltps_t *lt);
#endif
/************************************************************/ /************************************************************/
/* Misc */ /* Misc */
/************************************************************/ /************************************************************/