diff --git a/racket/src/rktio/Makefile.in b/racket/src/rktio/Makefile.in index e7cdb67adf..a6f830ccbc 100644 --- a/racket/src/rktio/Makefile.in +++ b/racket/src/rktio/Makefile.in @@ -9,6 +9,7 @@ RKTIO_HEADERS = $(srcdir)/rktio.h $(srcdir)/rktio_private.h rktio_config.h OBJS = rktio_filesystem.o \ rktio_read_write.o \ rktio_poll_set.o \ + rktio_ltps.o \ rktio_error.o \ rktio_main.o @@ -29,6 +30,9 @@ rktio_read_write.o: $(srcdir)/rktio_read_write.c $(RKTIO_HEADERS) rktio_poll_set.o: $(srcdir)/rktio_poll_set.c $(RKTIO_HEADERS) $(CC) $(CFLAGS) -I$(srcdir) -I. -o rktio_poll_set.o -c $(srcdir)/rktio_poll_set.c +rktio_ltps.o: $(srcdir)/rktio_ltps.c $(RKTIO_HEADERS) + $(CC) $(CFLAGS) -I$(srcdir) -I. -o rktio_ltps.o -c $(srcdir)/rktio_ltps.c + rktio_error.o: $(srcdir)/rktio_error.c $(RKTIO_HEADERS) $(CC) $(CFLAGS) -I$(srcdir) -I. -o rktio_error.o -c $(srcdir)/rktio_error.c diff --git a/racket/src/rktio/rktio.h b/racket/src/rktio/rktio.h index d8bf3c4354..a8b5ca203e 100644 --- a/racket/src/rktio/rktio.h +++ b/racket/src/rktio/rktio.h @@ -47,15 +47,48 @@ int rktio_poll_write_ready(rktio_t *rktio, rktio_fd_t *rfd); int rktio_poll_write_flushed(rktio_t *rktio, rktio_fd_t *rfd); /*************************************************/ -/* File-descriptor sets */ +/* File-descriptor sets for polling */ typedef struct rktio_poll_set_t rktio_poll_set_t; #define RKTIO_POLL_READ RKTIO_OPEN_READ #define RKTIO_POLL_WRITE RKTIO_OPEN_WRITE +rktio_poll_set_t *rktio_make_poll_set(); void rktio_poll_add(rktio_t *rktio, rktio_fd_t *rfd, rktio_poll_set_t *fds, int modes); +#ifdef RKTIO_SYSTEM_WINDOWS +void rktio_poll_set_add_handle(HANDLE h, rktio_poll_set_t *fds, int repost); +void rktio_poll_set_add_eventmask(rktio_poll_set_t *fds, int mask); +void rktio_poll_set_add_nosleep(rktio_poll_set_t *fds); +#endif + +typedef struct rktio_ltps_t rktio_ltps_t; +typedef struct rktio_ltps_handle_t rktio_ltps_handle_t; + +enum { + RKTIO_LTPS_CREATE_READ = 1, + RKTIO_LTPS_CREATE_WRITE, + RKTIO_LTPS_CHECK_READ, + RKTIO_LTPS_CHECK_WRITE, + RKTIO_LTPS_REMOVE, + RKTIO_LTPS_CREATE_VNODE, + RKTIO_LTPS_CHECK_VNODE, + RKTIO_LTPS_REMOVE_VNODE +}; + +rktio_ltps_t *rktio_open_ltps(rktio_t *rktio); +int rktio_ltps_close(rktio_t *rktio, rktio_ltps_t *lt); +rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t *rfd, int mode); + +int rktio_ltps_close(rktio_t *rktio, rktio_ltps_t *lt); +int rktio_ltps_poll(rktio_t *rktio, rktio_ltps_t *lt); + +void rktio_ltps_handle_set_data(rktio_ltps_t *lt, rktio_ltps_handle_t *s, void *data); +void *rktio_ltps_handle_get_data(rktio_ltps_t *lt, rktio_ltps_handle_t *s); + +void rktio_sleep(rktio_t *rktio, float nsecs, rktio_poll_set_t *fds, rktio_ltps_t *lt); + /*************************************************/ /* Files, directories, and links */ @@ -178,7 +211,8 @@ enum { RKTIO_ERROR_NO_TILDE, RKTIO_ERROR_ILL_FORMED_USER, RKTIO_ERROR_UNKNOWN_USER, - RKTIO_ERROR_INIT_FAILED + RKTIO_ERROR_INIT_FAILED, + RKTIO_ERROR_LTPS_NOT_FOUND, }; int rktio_get_last_error(rktio_t *rktio); diff --git a/racket/src/rktio/rktio_ltps.c b/racket/src/rktio/rktio_ltps.c new file mode 100644 index 0000000000..3f839cf62d --- /dev/null +++ b/racket/src/rktio/rktio_ltps.c @@ -0,0 +1,584 @@ +#include "rktio.h" +#include "rktio_private.h" +#ifdef RKTIO_SYSTEM_UNIX +# include +#endif +#include + +struct rktio_ltps_t { +#if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL) + int fd; +#else + rktio_poll_set_t *fd_set; +#endif + struct rktio_ltps_handle_t *signaled; +}; + +struct rktio_ltps_handle_t { + void *data; /* arbitrary data from client */ + struct rktio_ltps_handle_t *next; /* in signaled chain */ +}; + +typedef struct rktio_ltps_handle_pair_t { + rktio_ltps_handle_t *read_handle; + rktio_ltps_handle_t *write_handle; +} rktio_ltps_handle_pair_t; + +static rktio_ltps_handle_pair_t *ltps_hash_get(rktio_ltps_t *lt, int fd); +static void ltps_hash_set(rktio_ltps_t *lt, int fd, rktio_ltps_handle_pair_t *v); +static int ltps_is_hash_empty(rktio_ltps_t *lt); + +/************************************************************/ + +rktio_ltps_handle_pair_t *make_ltps_handle_pair() +{ + rktio_ltps_handle_pair_t *v; + v = malloc(sizeof(rktio_ltps_handle_pair_t)); + v->read_handle = NULL; + v->write_handle = NULL; + return v; +} + +rktio_ltps_handle_t *make_ltps_handle() +{ + rktio_ltps_handle_t *s; + s = malloc(sizeof(rktio_ltps_handle_t)); + s->data = NULL; + return s; +} + +void ltps_signal_handle(rktio_ltps_t *lt, rktio_ltps_handle_t *s) +{ + s->next = lt->signaled; + lt->signaled = s; +} + +void rktio_ltps_handle_set_data(rktio_ltps_t *lt, rktio_ltps_handle_t *s, void *data) +{ + s->data = data; +} + +void *rktio_ltps_handle_get_data(rktio_ltps_t *lt, rktio_ltps_handle_t *s) +{ + return s->data; +} + +/************************************************************/ + +rktio_ltps_t *rktio_open_ltps(rktio_t *rktio) +{ + rktio_ltps_t *lt; + + lt = malloc(sizeof(rktio_ltps_t)); +#if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL) + lt->fd = -1; +#else + lt->fd_set = rktio_alloc_fdset_array(3); +#endif + + lt->signaled = NULL; + + return lt; +} + +#if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL) +int rktio_ltps_get_fd(rktio_ltps_t *lt) +{ + return lt->fd; +} +#else +rktio_poll_set_t *rktio_ltps_get_fd_set(rktio_ltps_t *lt) +{ + return lt->fd_set; +} +#endif + + +int rktio_ltps_close(rktio_t *rktio, rktio_ltps_t *lt) +{ +#if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL) + if (lt->fd >= 0) { + intptr_t rc; + do { + rc = close(lt->fd); + } while ((rc == -1) && (errno == EINTR)); + } + free(lt); +#else + rktio_free_fdset_array(lt->fd_set, 3); +#endif + return 1; +} + +rktio_ltps_handle_t *rktio_ltps_get_signaled_handle(rktio_t *rktio, rktio_ltps_t *lt) +{ + rktio_ltps_handle_t *s; + s = lt->signaled; + if (s) + lt->signaled = s->next; + return s; +} + +#if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL) +static void log_kqueue_error(const char *action, int kr) +{ + if (kr < 0) { + fprintf(stderr, "%d error at %s: %E", +#ifdef HAVE_KQUEUE_SYSCALL + "kqueue", +#else + "epoll", +#endif + action, errno); + } +} + +static void log_kqueue_fd(int fd, int flags) +{ +} +#endif + +rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t *rfd, int mode) +{ +#ifdef RKTIO_SYSTEM_WINDOWS + set_racket_error(RKTIO_ERROR_UNSUPPORTED); + return NULL; +#else +# if !defined(HAVE_KQUEUE_SYSCALL) && !defined(HAVE_EPOLL_SYSCALL) + rktio_poll_set_t *r, *w, *e; +# endif + rktio_ltps_handle_pair_t *v; + rktio_ltps_handle_t *s; + int fd = rktio_fd_system_fd(rktio, rfd); + +# ifdef HAVE_KQUEUE_SYSCALL + if (!is_socket) { + /* kqueue() might not work on devices, such as ttys; also, while + Mac OS X kqueue() claims to work on FIFOs, there are problems: + watching for reads on a FIFO sometimes disables watching for + writes on the same FIFO with a different file descriptor */ + if (!rktio_fd_regular_file(rktio, rfd, 1)) { + set_racket_error(RKTIO_ERROR_UNSUPPORTED); + return NULL; + } + } + if (lt->fd < 0) { + lt->fd = kqueue(); + if (lt->fd < 0) { + set_posix_error(); + log_kqueue_error("create", lt->fd); + return NULL; + } + } +# endif +# ifdef HAVE_EPOLL_SYSCALL + if (lt->fd < 0) { + lt->fd = epoll_create(5); + if (lt->fd < 0) { + set_posix_error(); + log_kqueue_error("create", lt->fd); + return NULL; + } + } +# endif + + v = ltps_hash_get(lt, fd); + if (!v && ((mode == RKTIO_LTPS_CHECK_READ) + || (mode == RKTIO_LTPS_CHECK_WRITE) + || (mode == RKTIO_LTPS_CHECK_VNODE) + || (mode == RKTIO_LTPS_REMOVE) + || (mode == RKTIO_LTPS_REMOVE_VNODE))) { + set_racket_error(RKTIO_ERROR_LTPS_NOT_FOUND); + return NULL; + } + + if (!v) { + v = make_ltps_handle_pair(); + ltps_hash_set(lt, fd, v); + } + +# if !defined(HAVE_KQUEUE_SYSCALL) && !defined(HAVE_EPOLL_SYSCALL) + r = RKTIO_GET_FDSET(lt->fd_set, 0); + w = RKTIO_GET_FDSET(lt->fd_set, 1); + e = RKTIO_GET_FDSET(lt->fd_set, 2); +# endif + + if ((mode == RKTIO_LTPS_REMOVE) || (mode == RKTIO_LTPS_REMOVE_VNODE)) { + s = v->read_handle; + if (s) ltps_signal_handle(lt, s); + s = v->write_handle; + if (s) ltps_signal_handle(lt, s); + ltps_hash_set(lt, fd, NULL); + free(v); + s = NULL; +# ifdef HAVE_KQUEUE_SYSCALL + { + struct kevent kev[2]; + struct timespec timeout = {0, 0}; + int kr, pos = 0; + if (mode == RKTIO_LTPS_REMOVE_VNODE) { + EV_SET(&kev[pos], fd, EVFILT_VNODE, EV_DELETE, 0, 0, NULL); + pos++; + } else { + if (RKTIO_TRUEP(RKTIO_VEC_ELS(v)[0])) { + EV_SET(&kev[pos], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); + pos++; + } + if (RKTIO_TRUEP(RKTIO_VEC_ELS(v)[1])) { + EV_SET(&kev[pos], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); + pos++; + } + } + do { + kr = kevent(lt->fd, kev, pos, NULL, 0, &timeout); + } while ((kr == -1) && (errno == EINTR)); + log_kqueue_error("remove", kr); + } +# elif defined(HAVE_EPOLL_SYSCALL) + { + int kr; + kr = epoll_ctl(lt->fd, EPOLL_CTL_DEL, fd, NULL); + log_kqueue_error("remove", kr); + } +# else + RKTIO_FD_CLR(fd, r); + RKTIO_FD_CLR(fd, w); + RKTIO_FD_CLR(fd, e); +# endif + } else if ((mode == RKTIO_LTPS_CHECK_READ) + || (mode == RKTIO_LTPS_CREATE_READ) + || (mode == RKTIO_LTPS_CHECK_VNODE) + || (mode == RKTIO_LTPS_CREATE_VNODE)) { + s = v->read_handle; + if (!s) { + if ((mode == RKTIO_LTPS_CREATE_READ) + || (mode == RKTIO_LTPS_CREATE_VNODE)) { + s = make_ltps_handle(); + v->read_handle = s; +# ifdef HAVE_KQUEUE_SYSCALL + { + GC_CAN_IGNORE struct kevent kev; + struct timespec timeout = {0, 0}; + int kr; + if (mode == RKTIO_LTPS_CREATE_READ) + EV_SET(&kev, fd, EVFILT_READ, EV_ADD | EV_ONESHOT, 0, 0, NULL); + else + EV_SET(&kev, fd, EVFILT_VNODE, EV_ADD | EV_ONESHOT, + (NOTE_DELETE | NOTE_WRITE | NOTE_EXTEND + | NOTE_RENAME | NOTE_ATTRIB), + 0, NULL); + do { + kr = kevent(lt->fd, &kev, 1, NULL, 0, &timeout); + } while ((kr == -1) && (errno == EINTR)); + log_kqueue_error("read", kr); + } +# elif defined(HAVE_EPOLL_SYSCALL) + { + GC_CAN_IGNORE struct epoll_event ev; + int already = !RKTIO_FALSEP(RKTIO_VEC_ELS(v)[1]), kr; + memset(&ev, 0, sizeof(ev)); + ev.data.fd = fd; + ev.events = EPOLLIN | (already ? EPOLLOUT : 0); + kr = epoll_ctl(lt->fd, + (already ? EPOLL_CTL_MOD : EPOLL_CTL_ADD), fd, &ev); + log_kqueue_error("read", kr); + } +# else + RKTIO_FD_SET(fd, r); + RKTIO_FD_SET(fd, e); +#endif + } else + s = NULL; + } + } else if ((mode == RKTIO_LTPS_CHECK_WRITE) + || (mode == RKTIO_LTPS_CREATE_WRITE)) { + s = v->write_handle; + if (!s) { + if (mode == RKTIO_LTPS_CREATE_WRITE) { + s = make_ltps_handle(); + v->write_handle = s; +# ifdef HAVE_KQUEUE_SYSCALL + { + GC_CAN_IGNORE struct kevent kev; + struct timespec timeout = {0, 0}; + int kr; + EV_SET(&kev, fd, EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, NULL); + do { + kr = kevent(lt->fd, &kev, 1, NULL, 0, &timeout); + } while ((kr == -1) && (errno == EINTR)); + log_kqueue_error("write", kr); + } + +# elif defined(HAVE_EPOLL_SYSCALL) + { + GC_CAN_IGNORE struct epoll_event ev; + int already = !RKTIO_FALSEP(RKTIO_VEC_ELS(v)[0]), kr; + memset(&ev, 0, sizeof(ev)); + ev.data.fd = fd; + ev.events = EPOLLOUT | (already ? EPOLLIN : 0); + kr = epoll_ctl(lt->fd, + (already ? EPOLL_CTL_MOD : EPOLL_CTL_ADD), fd, &ev); + log_kqueue_error("write", kr); + } +# else + RKTIO_FD_SET(fd, w); + RKTIO_FD_SET(fd, e); +#endif + } else + s = NULL; + } + } + + return s; +#endif +} + +int rktio_ltps_poll(rktio_t *rktio, rktio_ltps_t *lt) +{ +#ifdef RKTIO_SYSTEM_WINDOWS + return 0; +#elif defined(HAVE_KQUEUE_SYSCALL) + rktio_ltps_handle_t *s; + rktio_ltps_handle_pair_t *v; + int key; + GC_CAN_IGNORE struct kevent kev; + struct timespec timeout = {0, 0}; + int kr, hit = 0; + + if (lt->fd < 0) + return 0; + + while (1) { + do { + kr = kevent(lt->fd, NULL, 0, &kev, 1, &timeout); + } while ((kr == -1) && (errno == EINTR)); + log_kqueue_error("wait", kr); + + if (kr > 0) { + key = kev.ident; + v = ltps_hash_get(lt, key); + if (v) { + if ((kev.filter == EVFILT_READ) || (kev.filter == EVFILT_VNODE)) { + s = v->read_handle; + if (s) { + ltps_signal_handle(lt, s); + hit = 1; + v->read_handle = NULL; + } + } else if (kev.filter == EVFILT_WRITE) { + s = v->write_handle; + if (s) { + ltps_signal_handle(lt, s); + hit = 1; + v->write_handle = NULL; + } + } + if (!v->read_handle && !v->write_handle) { + ltps_hash_set(lt, key, NULL); + free(v); + } + } else { + log_kqueue_fd(kev.ident, kev.filter); + } + } else + break; + } + + return hit; +#elif defined(HAVE_EPOLL_SYSCALL) + rktio_ltps_handle_t *s; + rktio_ltps_handle_pair_t *v; + int key; + int kr, hit = 0; + GC_CAN_IGNORE struct epoll_event ev; + memset(&ev, 0, sizeof(ev)); + + if (lt->fd < 0) + return 0; + + while (1) { + + do { + kr = epoll_wait(lt->fd, &ev, 1, 0); + } while ((kr == -1) && (errno == EINTR)); + log_kqueue_error("wait", kr); + + if (kr > 0) { + key = ev.data.fd; + v = ltps_hash_get(lt, key); + if (v) { + if (ev.events & (POLLIN | POLLHUP | POLLERR)) { + s = v->read_handle; + if (s) { + ltps_signal_handle(lt, s); + hit = 1; + v->read_handle = NULL; + } + } + if (ev.events & (POLLOUT | POLLHUP | POLLERR)) { + s = v->write_handle; + if (s) { + ltps_signal_handle(lt, s); + hit = 1; + v->write_handle = NULL; + } + } + if (!v->read_handle && !v->write_handle) { + ltps_hash_set(lt, key, NULL); + free(v); + kr = epoll_ctl(lt->fd, EPOLL_CTL_DEL, ev.data.fd, NULL); + log_kqueue_error("remove*", kr); + } else { + ev.events = ((!v->read_handle ? 0 : POLLIN) + | (!v->write_handle ? 0 : POLLOUT)); + kr = epoll_ctl(lt->fd, EPOLL_CTL_MOD, ev.data.fd, &ev); + log_kqueue_error("update", kr); + } + } else { + log_kqueue_fd(ev.data.fd, ev.events); + } + } else + break; + } + + return hit; +#elif defined(HAVE_POLL_SYSCALL) + struct pollfd *pfd; + intptr_t i, c; + rktio_ltps_handle_t *s; + rktio_ltps_handle_pair_t *v; + int key; + int sr, hit = 0; + + if (ltps_is_hash_empty(lt)) + return 0; + + rktio_clean_fd_set(lt->fd_set); + c = rkt_io_poll_count(lt->fd_set); + pfd = rktio_get_poll_fd_array(lt->fd_set); + + do { + sr = poll(pfd, c, 0); + } while ((sr == -1) && (errno == EINTR)); + + if (sr > 0) { + for (i = 0; i < c; i++) { + if (pfd[i].revents) { + key = pfd[i].fd; + v = ltps_hash_get(lt, key); + if (v) { + if (pfd[i].revents & (POLLIN | POLLHUP | POLLERR)) { + s = v->read_handle; + if (s) { + ltps_signal_handle(lt, s); + hit = 1; + v->read_handle = NULL; + } + pfd[i].events -= (pfd[i].events & POLLIN); + } + if (pfd[i].revents & (POLLOUT | POLLHUP | POLLERR)) { + s = v->write_handle; + if (s) { + ltps_signal_handle(lt, s); + hit = 1; + v->write_handle = NULL; + } + pfd[i].events -= (pfd[i].events & POLLOUT); + } + if (!v->read_handle && !v->write_handle) { + rktio_hash_set(rktio_semaphore_fd_mapping, key, NULL); + free(v); + } + } + } + } + } + + return hit; +#else + rktio_poll_set_t *fds; + struct timeval time = {0, 0}; + int i, actual_limit, r, w, e, sr, hit = 0; + rktio_ltps_handle_t *s; + rktio_ltps_handle_pair_t *v; + int key; + + DECL_FDSET(set, 3); + rktio_poll_set_t *set1, *set2; + + INIT_DECL_FDSET(set, set1, set2); + set1 = RKTIO_GET_FDSET(set, 1); + set2 = RKTIO_GET_FDSET(set, 2); + + fds = set; + RKTIO_FD_ZERO(set); + RKTIO_FD_ZERO(set1); + RKTIO_FD_ZERO(set2); + + if (ltps_is_hash_empty(lt)) + return 0; + + rktio_merge_fd_sets(fds, lt->fd_set); + + actual_limit = rktio_get_fd_limit(fds); + + do { + sr = select(actual_limit, RKTIO_FDS(set), RKTIO_FDS(set1), RKTIO_FDS(set2), &time); + } while ((sr == -1) && (errno == EINTR)); + + if (sr > 0) { + for (i = 0; i < actual_limit; i++) { + r = RKTIO_FD_ISSET(i, set); + w = RKTIO_FD_ISSET(i, set1); + e = RKTIO_FD_ISSET(i, set2); + if (r || w || e) { + key = i; + v = ltps_hash_get(lt, key); + if (v) { + if (r || e) { + s = v->read_handle; + if (s) { + ltps_signal_handle(lt, s); + hit = 1; + v->read_handle = NULL; + } + RKTIO_FD_CLR(i, RKTIO_GET_FDSET(lt->fd_set, 0)); + } + if (w || e) { + s = v->write_handle; + if (s) { + ltps_signal_handle(lt, s); + hit = 1; + v->write_handle = NULL; + } + RKTIO_FD_CLR(i, RKTIO_GET_FDSET(lt->fd_set, 1)); + } + if (!v->read_handle && !v->write_handle) { + RKTIO_FD_CLR(i, RKTIO_GET_FDSET(lt->fd_set, 2)); + ltps_hash_set(lt, key, NULL); + free(v); + } + } + } + } + } + + return hit; +#endif +} + +/************************************************************/ + +static rktio_ltps_handle_pair_t *ltps_hash_get(rktio_ltps_t *lt, int fd) +{ + return NULL; +} + +static void ltps_hash_set(rktio_ltps_t *lt, int fd, rktio_ltps_handle_pair_t *v) +{ +} + +static int ltps_is_hash_empty(rktio_ltps_t *lt) +{ + return 1; +} diff --git a/racket/src/rktio/rktio_poll_set.c b/racket/src/rktio/rktio_poll_set.c index 2a93e133c1..7d62c070b3 100644 --- a/racket/src/rktio/rktio_poll_set.c +++ b/racket/src/rktio/rktio_poll_set.c @@ -7,9 +7,6 @@ # include # include #endif -#ifdef RKTIO_SYSTEM_WINDOWS -# include -#endif #include /* Generalize fd arrays (FD_SET, etc) with a runtime-determined size, @@ -72,6 +69,11 @@ rktio_poll_set_t *rktio_alloc_fdset_array(int count) return data; } +void rktio_free_fdset_array(rktio_poll_set_t *fds, int count) +{ + FIXME; +} + rktio_poll_set_t *rktio_init_fdset_array(rktio_poll_set_t *fdarray, int count) { ((struct rktio_fd_set *)fdarray)->data->count = 0; @@ -185,7 +187,7 @@ static int cmp_fd(const void *_a, const void *_b) return a->fd - b->fd; } -rktio_poll_set_t *rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *src_fds) +void rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *src_fds) { struct rktio_fd_set_data_t *data = fds->data; struct rktio_fd_set_data_t *src_data = src_fds->data; @@ -198,16 +200,14 @@ rktio_poll_set_t *rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *s c = data->count; sc = src_data->count; - if (!c) - return src_fds; if (!sc) - return fds; + return; qsort(data->pfd, c, sizeof(struct pollfd), cmp_fd); qsort(src_data->pfd, sc, sizeof(struct pollfd), cmp_fd); nc = c + sc; - pfds = (struct pollfd *)rktio_malloc_atomic(sizeof(struct pollfd) * (nc + PFD_EXTRA_SPACE)); + pfds = malloc(sizeof(struct pollfd) * (nc + PFD_EXTRA_SPACE)); j = 0; for (i = 0, si = 0; (i < c) && (si < sc); ) { if (data->pfd[i].fd == src_data->pfd[si].fd) { @@ -235,12 +235,15 @@ rktio_poll_set_t *rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *s pfds[j].events = src_data->pfd[si].events; } - if (nc > RKTIO_INT_VAL(data->size)) { + if (nc > data->size) { + free(data->pfd); data->pfd = pfds; - data->size = rktio_make_integer(nc); - } else + data->size = nc; + } else { memcpy(data->pfd, pfds, j * sizeof(struct pollfd)); - data->count = rktio_make_integer(j); + free(pfds); + } + data->count = j; return fds; } @@ -270,6 +273,16 @@ int rktio_get_fd_limit(rktio_poll_set_t *fds) return 0; } +int rktio_get_poll_count(rktio_poll_set_t *fds) +{ + return fds->data->count; +} + +int rktio_get_poll_fd_array(rktio_poll_set_t *fds) +{ + return fds->data->pfd; +} + #elif defined(USE_DYNAMIC_FDSET_SIZE) /************************************************************/ @@ -304,6 +317,11 @@ rktio_poll_set_t *rktio_alloc_fdset_array(int count) return malloc(count * (dynamic_fd_size + sizeof(intptr_t))); } +void rktio_free_fdset_array(rktio_poll_set_t *fds, int count) +{ + free(fds); +} + rktio_poll_set_t *rktio_init_fdset_array(rktio_poll_set_t *fdarray, int count) { return fdarray; @@ -357,6 +375,11 @@ rktio_poll_set_t *rktio_alloc_fdset_array(int count) return fdarray; } +void rktio_free_fdset_array(rktio_poll_set_t *fds, int count) +{ + FIXME; +} + static void reset_wait_array(rktio_poll_set_t *efd) { /* Allocate an array that may be big enough to hold all events @@ -455,14 +478,13 @@ int rktio_fdisset(rktio_poll_set_t *fd, int n) return 0; } -rktio_poll_set_t *rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *src_fds) +void rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *src_fds) { int i; for (i = src_fd->added; i--; ) { if (stv_fd->sockets[i] != INVALID_SOCKET) rktio_fdset(fds, src_fd->sockets[i]); } - return fds; } void rktio_clean_fd_set(rktio_poll_set_t *fds) @@ -474,7 +496,7 @@ int rktio_get_fd_limit(rktio_poll_set_t *fds) return 0; } -void rktio_fdset_add_handle(HANDLE h, rktio_poll_set_t *fds, int repost) +void rktio_poll_set_add_handle(HANDLE h, rktio_poll_set_t *fds, int repost) { rktio_poll_set_t *efd = fds; OS_SEMAPHORE_TYPE *hs; @@ -504,12 +526,12 @@ void rktio_fdset_add_handle(HANDLE h, rktio_poll_set_t *fds, int repost) efd->num_handles++; } -void rktio_add_fd_nosleep(rktio_poll_set_t *fds) +void rktio_poll_set_add_nosleep(rktio_poll_set_t *fds) { fds->no_sleep = 1; } -void rktio_fdset_add_eventmask(rktio_poll_set_t *fds, int mask) +void rktio_poll_set_eventmask(rktio_poll_set_t *fds, int mask) { fds->wait_event_mask |= mask; } @@ -688,6 +710,11 @@ rktio_poll_set_t *rktio_alloc_fdset_array(int count) return malloc(count * sizeof(fd_set)); } +void rktio_free_fdset_array(rktio_poll_set_t *fds, int count) +{ + free(fds); +} + rktio_poll_set_t *rktio_init_fdset_array(rktio_poll_set_t *fdarray, int count) { return fdarray; @@ -724,7 +751,7 @@ int rktio_fdisset(rktio_poll_set_t *fd, int n) return FD_ISSET(n, &(fd)->data); } -rktio_poll_set_t *rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *src_fds) +void rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *src_fds) { int i, j; unsigned char *p, *sp; @@ -746,7 +773,6 @@ rktio_poll_set_t *rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *s *p |= *sp; } } - return fds; } void rktio_clean_fd_set(rktio_poll_set_t *fds) @@ -919,8 +945,18 @@ void rkio_notify_sleep_progress(void) /* FIXME: don't forget SIGCHILD_DOESNT_INTERRUPT_SELECT handling in Racket */ -void rktio_sleep(rktio_t *rktio, float nsecs, rktio_poll_set_t *fds) +void rktio_sleep(rktio_t *rktio, float nsecs, rktio_poll_set_t *fds, rktio_ltps_t *lt) { + if (fds && lt) { +#if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL) + int fd = rktio_ltps_get_fd(lt); + RKTIO_FD_SET(fd, fds); + RKTIO_FD_SET(fd, RKTIO_GET_FDSET(fds, 2)); +#else + rktio_merge_fd_sets(fds, rktio_ltps_get_fd_set(lt)); +#endif + } + if (!fds) { /* Nothing to block on - just sleep for some amount of time. */ #ifdef RKTIO_SYSTEM_UNIX @@ -1052,7 +1088,7 @@ void rktio_sleep(rktio_t *rktio, float nsecs, rktio_poll_set_t *fds) { intptr_t result; - HANDLE *array, just_two_array[2], break_sema; + HANDLE *array, just_two_array[2]; int count, rcount, *rps; if (fds->no_sleep) @@ -1068,8 +1104,7 @@ void rktio_sleep(rktio_t *rktio, float nsecs, rktio_poll_set_t *fds) /* add break semaphore: */ if (!count) array = just_two_array; - break_sema = (HANDLE)scheme_break_semaphore; - array[count++] = break_sema; + array[count++] = rktio->break_semaphore; /* Extensions may handle events. If the event queue is empty (as reported by GetQueueStatus), diff --git a/racket/src/rktio/rktio_private.h b/racket/src/rktio/rktio_private.h index 0f2d4526b5..60d8dd967b 100644 --- a/racket/src/rktio/rktio_private.h +++ b/racket/src/rktio/rktio_private.h @@ -31,6 +31,7 @@ struct rktio_t { struct group_member_cache_entry_t *group_member_cache; int external_event_fd; int put_external_event_fd; + int long_term_poll_set_fd; #endif #ifdef RKTIO_SYSTEM_WINDOWS int windows_nt_or_later; @@ -93,10 +94,19 @@ struct rktio_poll_set_t { fd_set data; }; #endif -rktio_poll_set_t *rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *src_fds); +rktio_poll_set_t *rktio_alloc_fdset_array(int count); +void rktio_free_fdset_array(rktio_poll_set_t *fds, int count); + +void rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *src_fds); void rktio_clean_fd_set(rktio_poll_set_t *fds); int rktio_get_fd_limit(rktio_poll_set_t *fds); +#if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL) +int rktio_ltps_get_fd(rktio_ltps_t *lt); +#else +rktio_poll_set_t *rktio_ltps_get_fd_set(rktio_ltps_t *lt); +#endif + /************************************************************/ /* Misc */ /************************************************************/