rktio: add networking

This commit is contained in:
Matthew Flatt 2017-06-11 09:40:25 -06:00
parent adedf861b2
commit 3065773b31
11 changed files with 3642 additions and 240 deletions

View File

@ -10,6 +10,7 @@ OBJS = rktio_filesystem.o \
rktio_read_write.o \
rktio_poll_set.o \
rktio_ltps.o \
rktio_network.o \
rktio_error.o \
rktio_main.o
@ -33,6 +34,9 @@ rktio_poll_set.o: $(srcdir)/rktio_poll_set.c $(RKTIO_HEADERS)
rktio_ltps.o: $(srcdir)/rktio_ltps.c $(RKTIO_HEADERS)
$(CC) $(CFLAGS) -I$(srcdir) -I. -o rktio_ltps.o -c $(srcdir)/rktio_ltps.c
rktio_network.o: $(srcdir)/rktio_network.c $(RKTIO_HEADERS)
$(CC) $(CFLAGS) -I$(srcdir) -I. -o rktio_network.o -c $(srcdir)/rktio_network.c
rktio_error.o: $(srcdir)/rktio_error.c $(RKTIO_HEADERS)
$(CC) $(CFLAGS) -I$(srcdir) -I. -o rktio_error.o -c $(srcdir)/rktio_error.c

View File

@ -41,7 +41,7 @@ static void do_check_expected_racket_error(rktio_t *rktio, int err, int what, in
}
}
#define check_valid(e) do_check_valid(rktio, e, __LINE__)
#define check_valid(e) do_check_valid(rktio, ((e)?1:0), __LINE__)
#define check_expected_error(e) do_check_expected_error(rktio, e, __LINE__)
#define check_expected_racket_error(e, what) do_check_expected_racket_error(rktio, e, what, __LINE__)
@ -66,7 +66,7 @@ static rktio_ltps_t *try_check_ltps(rktio_t *rktio,
}
check_expected_racket_error(!h1, RKTIO_ERROR_LTPS_NOT_FOUND);
h1 = rktio_ltps_add(rktio, lt, fd, RKTIO_LTPS_CREATE_READ);
check_valid(!!h1);
check_valid(h1);
hx = rktio_ltps_add(rktio, lt, fd, RKTIO_LTPS_CREATE_READ);
check_valid(hx == h1);
hx = rktio_ltps_add(rktio, lt, fd, RKTIO_LTPS_CHECK_WRITE);
@ -78,9 +78,9 @@ static rktio_ltps_t *try_check_ltps(rktio_t *rktio,
h2 = rktio_ltps_add(rktio, lt, fd2, RKTIO_LTPS_CHECK_WRITE);
check_expected_racket_error(!h2, RKTIO_ERROR_LTPS_NOT_FOUND);
h2 = rktio_ltps_add(rktio, lt, fd2, RKTIO_LTPS_CREATE_WRITE);
check_valid(!!h2);
check_valid(h2);
hx = rktio_ltps_add(rktio, lt, fd2, RKTIO_LTPS_CREATE_READ);
check_valid(!!hx);
check_valid(hx);
/* Removing `fd2` should signal the handles `h2` and `hx` */
hy = rktio_ltps_add(rktio, lt, fd2, RKTIO_LTPS_REMOVE);
@ -95,7 +95,7 @@ static rktio_ltps_t *try_check_ltps(rktio_t *rktio,
check_expected_racket_error(!hy, RKTIO_ERROR_LTPS_NOT_FOUND);
/* Add write handle for fd2 again: */
h2 = rktio_ltps_add(rktio, lt, fd2, RKTIO_LTPS_CREATE_WRITE);
check_valid(!!h2);
check_valid(h2);
*_h1 = h1;
*_h2 = h2;
@ -170,8 +170,8 @@ static void check_hello_content(rktio_t *rktio, char *fn)
char buffer[256], *s;
fd = rktio_open(rktio, fn, RKTIO_OPEN_READ);
check_valid(!!fd);
check_valid(rktio_poll_read_ready(rktio, fd) != RKTIO_POLL_ERROR);
check_valid(fd);
check_valid(rktio_poll_read_ready(rktio, fd) == RKTIO_POLL_READY);
amt = rktio_read(rktio, fd, buffer, sizeof(buffer));
check_valid(amt == 5);
check_valid(!strncmp(buffer, "hello", 5));
@ -180,6 +180,159 @@ static void check_hello_content(rktio_t *rktio, char *fn)
check_valid(rktio_close(rktio, fd));
}
static void wait_read(rktio_t *rktio, rktio_fd_t *fd)
{
rktio_poll_set_t *ps;
ps = rktio_make_poll_set(rktio);
check_valid(ps);
rktio_poll_add(rktio, fd, ps, RKTIO_POLL_READ);
rktio_sleep(rktio, 0, ps, NULL);
rktio_poll_set_close(rktio, ps);
}
static void check_read_write_pair(rktio_t *rktio, rktio_fd_t *fd, rktio_fd_t *fd2)
{
rktio_ltps_t *lt;
rktio_ltps_handle_t *h1, *h2;
intptr_t amt, i;
char buffer[256];
int immediate_available = (!rktio_fd_is_socket(rktio, fd) && !rktio_fd_is_socket(rktio, fd2));
lt = try_check_ltps(rktio, fd, fd2, &h1, &h2);
/* We expect `lt` to work everywhere exception Windows and with kqueue on non-sockets: */
#if !defined(RKTIO_SYSTEM_WINDOWS)
# if !defined(HAVE_KQUEUE_SYSCALL)
check_valid(lt);
# else
if (rktio_fd_is_socket(rktio, fd) && rktio_fd_is_socket(rktio, fd2))
check_valid(lt);
# endif
#endif
/* fd2 can write, fd cannot yet read */
check_valid(!rktio_poll_read_ready(rktio, fd));
if (lt)
check_ltps_write_ready(rktio, lt, h2);
/* Round-trip data through pipe: */
amt = rktio_write(rktio, fd2, "hello", 5);
check_valid(amt == 5);
if (!immediate_available) {
/* Wait for read to be ready; should not block for long */
wait_read(rktio, fd);
}
check_valid(rktio_poll_read_ready(rktio, fd) == RKTIO_POLL_READY);
if (lt) {
check_ltps_read_ready(rktio, lt, h1);
check_valid(rktio_ltps_close(rktio, lt));
}
amt = rktio_read(rktio, fd, buffer, sizeof(buffer));
check_valid(amt == 5);
check_valid(!strncmp(buffer, "hello", 5));
check_valid(!rktio_poll_read_ready(rktio, fd));
/* Close pipe ends: */
check_valid(rktio_close(rktio, fd2));
if (!immediate_available) {
/* Wait for EOF to be ready; should not block for long */
wait_read(rktio, fd);
}
amt = rktio_read(rktio, fd, buffer, sizeof(buffer));
check_valid(amt == RKTIO_READ_EOF);
check_valid(rktio_close(rktio, fd));
/* Open pipe ends again: */
fd2 = rktio_open(rktio, "demo_fifo", RKTIO_OPEN_WRITE | RKTIO_OPEN_CAN_EXIST);
check_valid(fd2);
/* should eventually block: */
for (i = 0; i < 100000; i++) {
amt = rktio_write(rktio, fd2, "hello", 5);
check_valid(amt != RKTIO_WRITE_ERROR);
if (!amt)
break;
}
check_valid(i < 100000);
fd = rktio_open(rktio, "demo_fifo", RKTIO_OPEN_READ);
check_valid(fd);
/* should eventually block: */
for (i = 0; i < 100000; i++) {
amt = rktio_read(rktio, fd2, buffer, sizeof(buffer));
check_valid(amt != RKTIO_READ_ERROR);
check_valid(amt != RKTIO_READ_EOF);
if (!amt)
break;
}
check_valid(i < 100000);
check_valid(rktio_close(rktio, fd));
check_valid(rktio_close(rktio, fd2));
}
rktio_addrinfo_t *lookup_loop(rktio_t *rktio,
const char *hostname, int portno,
int family, int passive, int tcp)
{
rktio_addrinfo_lookup_t *lookup;
rktio_addrinfo_t *addr;
rktio_poll_set_t *ps;
ps = rktio_make_poll_set(rktio);
check_valid(ps);
lookup = rktio_start_addrinfo_lookup(rktio, hostname, portno, family, passive, tcp);
check_valid(lookup);
rktio_poll_add_addrinfo_lookup(rktio, lookup, ps);
rktio_sleep(rktio, 0, ps, NULL);
rktio_poll_set_close(rktio, ps);
check_valid(rktio_poll_addrinfo_lookup_ready(rktio, lookup) == RKTIO_POLL_READY);
addr = rktio_addrinfo_lookup_get(rktio, lookup);
check_valid(addr);
return addr;
}
static rktio_fd_t *connect_loop(rktio_t *rktio, rktio_addrinfo_t *addr, rktio_addrinfo_t *local_addr)
{
rktio_connect_t *conn;
rktio_poll_set_t *ps;
rktio_fd_t *fd;
conn = rktio_start_connect(rktio, addr, local_addr);
check_valid(conn);
while (1) {
ps = rktio_make_poll_set(rktio);
check_valid(ps);
rktio_poll_add_connect(rktio, conn, ps);
rktio_sleep(rktio, 0, ps, NULL);
rktio_poll_set_close(rktio, ps);
check_valid(rktio_poll_connect_ready(rktio, conn) == RKTIO_POLL_READY);
fd = rktio_connect_finish(rktio, conn);
if (!fd) {
if ((rktio_get_last_error_kind(rktio) == RKTIO_ERROR_KIND_RACKET)
&& (rktio_get_last_error(rktio) == RKTIO_ERROR_CONNECT_TRYING_NEXT)) {
/* loop to try again */
} else {
check_valid(fd);
}
} else
break;
}
return fd;
}
int main()
{
rktio_t *rktio;
@ -187,7 +340,7 @@ int main()
rktio_fd_t *fd, *fd2;
intptr_t amt, i, saw_file;
int perms;
char buffer[256], *s, *pwd;
char *s, *pwd;
rktio_directory_list_t *ls;
rktio_file_copy_t *cp;
rktio_timestamp_t *ts1, *ts1a;
@ -199,7 +352,7 @@ int main()
/* Basic file I/O */
fd = rktio_open(rktio, "test1", RKTIO_OPEN_WRITE | RKTIO_OPEN_CAN_EXIST);
check_valid(!!fd);
check_valid(fd);
check_valid(rktio_poll_write_ready(rktio, fd) != RKTIO_POLL_ERROR);
amt = rktio_write(rktio, fd, "hello", 5);
check_valid(amt == 5);
@ -210,7 +363,7 @@ int main()
check_valid(rktio_is_regular_file(rktio, "test1"));
s = rktio_get_current_directory(rktio);
check_valid(!!s);
check_valid(s);
check_valid(rktio_directory_exists(rktio, s));
check_valid(!rktio_file_exists(rktio, s));
check_valid(!rktio_is_regular_file(rktio, s));
@ -219,7 +372,7 @@ int main()
pwd = s;
sz = rktio_file_size(rktio, "test1");
check_valid(!!sz);
check_valid(sz);
check_valid(sz->lo == 5);
check_valid(sz->hi == 0);
free(sz);
@ -259,7 +412,7 @@ int main()
rktio_set_file_or_directory_permissions(rktio, "test1", perms);
cp = rktio_copy_file_start(rktio, "test1a", "test1", 0);
check_valid(!!cp);
check_valid(cp);
while (!rktio_copy_file_is_done(rktio, cp)) {
check_valid(rktio_copy_file_step(rktio, cp));
}
@ -282,7 +435,7 @@ int main()
check_expected_racket_error(!cp, RKTIO_ERROR_EXISTS);
cp = rktio_copy_file_start(rktio, "test1a", "test1", 1);
check_valid(!!cp);
check_valid(cp);
rktio_copy_file_stop(rktio, cp);
check_valid(rktio_rename_file(rktio, "test1b", "test1a", 0));
@ -299,11 +452,11 @@ int main()
/* Listing directory content */
ls = rktio_directory_list_start(rktio, pwd, 0);
check_valid(!!ls);
check_valid(ls);
saw_file = 0;
while (1) {
s = rktio_directory_list_step(rktio, ls);
check_valid(!!s);
check_valid(s);
if (!*s) break;
if (!strcmp(s, "test1"))
saw_file = 1;
@ -314,11 +467,11 @@ int main()
/* We expect `lt` to work on regular files everywhere except Windows: */
#if !defined(RKTIO_SYSTEM_WINDOWS) && !defined(HAVE_KQUEUE_SYSCALL)
fd = rktio_open(rktio, "test1", RKTIO_OPEN_READ);
check_valid(!!fd);
check_valid(fd);
fd2 = rktio_open(rktio, "test1", RKTIO_OPEN_WRITE | RKTIO_OPEN_CAN_EXIST);
check_valid(!!fd2);
check_valid(fd2);
lt = try_check_ltps(rktio, fd, fd2, &h1, &h2);
check_valid(!!lt);
check_valid(lt);
check_ltps_read_and_write_ready(rktio, lt, h1, h2);
check_valid(rktio_ltps_close(rktio, lt));
check_valid(rktio_close(rktio, fd));
@ -328,70 +481,41 @@ int main()
/* Pipes, non-blocking operations, and more long-term poll sets */
fd = rktio_open(rktio, "demo_fifo", RKTIO_OPEN_READ);
check_valid(!!fd);
check_valid(fd);
check_valid(!rktio_poll_read_ready(rktio, fd));
fd2 = rktio_open(rktio, "demo_fifo", RKTIO_OPEN_WRITE | RKTIO_OPEN_CAN_EXIST);
check_valid(!!fd2);
check_valid(fd2);
check_valid(!rktio_poll_read_ready(rktio, fd));
lt = try_check_ltps(rktio, fd, fd2, &h1, &h2);
/* We expect `lt` to work everywhere exception Windows and with kqueue: */
#if !defined(RKTIO_SYSTEM_WINDOWS) && !defined(HAVE_KQUEUE_SYSCALL)
check_valid(!!lt);
#endif
check_read_write_pair(rktio, fd, fd2);
/* fd2 can write, fd cannot yet read */
check_valid(!rktio_poll_read_ready(rktio, fd));
if (lt)
check_ltps_write_ready(rktio, lt, h2);
/* Networking */
{
rktio_addrinfo_t *addr;
rktio_listener_t *lnr;
addr = lookup_loop(rktio, "localhost", 4536, -1, 1, 1);
/* Round-trip data through pipe: */
amt = rktio_write(rktio, fd2, "hello", 5);
check_valid(amt == 5);
check_valid(rktio_poll_read_ready(rktio, fd));
if (lt) {
check_ltps_read_ready(rktio, lt, h1);
check_valid(rktio_ltps_close(rktio, lt));
lnr = rktio_listen(rktio, addr, 5, 1);
check_valid(lnr);
rktio_free_addrinfo(rktio, addr);
check_valid(!rktio_poll_accept_ready(rktio, lnr));
addr = lookup_loop(rktio, "localhost", 4536, -1, 0, 1);
fd = connect_loop(rktio, addr, NULL);
rktio_free_addrinfo(rktio, addr);
check_valid(rktio_poll_accept_ready(rktio, lnr) == RKTIO_POLL_READY);
fd2 = rktio_accept(rktio, lnr);
check_valid(fd2);
check_valid(!rktio_poll_accept_ready(rktio, lnr));
check_read_write_pair(rktio, fd, fd2);
rktio_listen_stop(rktio, lnr);
}
amt = rktio_read(rktio, fd, buffer, sizeof(buffer));
check_valid(amt == 5);
check_valid(!strncmp(buffer, "hello", 5));
check_valid(!rktio_poll_read_ready(rktio, fd));
/* Close pipe ends: */
check_valid(rktio_close(rktio, fd2));
amt = rktio_read(rktio, fd, buffer, sizeof(buffer));
check_valid(amt == RKTIO_READ_EOF);
check_valid(rktio_close(rktio, fd));
/* Open pipe ends again: */
fd2 = rktio_open(rktio, "demo_fifo", RKTIO_OPEN_WRITE | RKTIO_OPEN_CAN_EXIST);
check_valid(!!fd2);
/* should eventually block: */
for (i = 0; i < 100000; i++) {
amt = rktio_write(rktio, fd2, "hello", 5);
check_valid(amt != RKTIO_WRITE_ERROR);
if (!amt)
break;
}
check_valid(i < 100000);
fd = rktio_open(rktio, "demo_fifo", RKTIO_OPEN_READ);
check_valid(!!fd);
/* should eventually block: */
for (i = 0; i < 100000; i++) {
amt = rktio_read(rktio, fd2, buffer, sizeof(buffer));
check_valid(amt != RKTIO_READ_ERROR);
check_valid(amt != RKTIO_READ_EOF);
if (!amt)
break;
}
check_valid(i < 100000);
check_valid(rktio_close(rktio, fd));
check_valid(rktio_close(rktio, fd2));
return 0;
}

View File

@ -27,6 +27,7 @@ typedef struct rktio_fd_t rktio_fd_t;
#define RKTIO_OPEN_REPLACE (1<<4)
#define RKTIO_OPEN_MUST_EXIST (1<<5)
#define RKTIO_OPEN_CAN_EXIST (1<<6)
#define RKTIO_OPEN_SOCKET (1<<7)
rktio_fd_t *rktio_system_fd(rktio_t *rktio, intptr_t system_fd, int modes);
intptr_t rktio_fd_system_fd(rktio_t *rktio, rktio_fd_t *rfd);
@ -41,6 +42,7 @@ int rktio_close(rktio_t *rktio, rktio_fd_t *fd);
#define RKTIO_READ_ERROR (-2)
#define RKTIO_WRITE_ERROR (-2)
#define RKTIO_POLL_ERROR (-2)
#define RKTIO_POLL_READY 1
intptr_t rktio_read(rktio_t *rktio, rktio_fd_t *fd, char *buffer, intptr_t len);
intptr_t rktio_write(rktio_t *rktio, rktio_fd_t *fd, char *buffer, intptr_t len);
@ -49,6 +51,44 @@ int rktio_poll_read_ready(rktio_t *rktio, rktio_fd_t *rfd);
int rktio_poll_write_ready(rktio_t *rktio, rktio_fd_t *rfd);
int rktio_poll_write_flushed(rktio_t *rktio, rktio_fd_t *rfd);
/*************************************************/
/* Network */
typedef struct rktio_addrinfo_lookup_t rktio_addrinfo_lookup_t;
typedef struct rktio_addrinfo_t rktio_addrinfo_t;
int rktio_get_ipv4_family(rktio_t *rktio);
rktio_addrinfo_lookup_t *rktio_start_addrinfo_lookup(rktio_t *rktio,
const char *hostname, int portno,
int family, int passive, int tcp);
int rktio_poll_addrinfo_lookup_ready(rktio_t *rktio, rktio_addrinfo_lookup_t *lookup);
rktio_addrinfo_t *rktio_addrinfo_lookup_get(rktio_t *rktio, rktio_addrinfo_lookup_t *lookup);
void rktio_addrinfo_lookup_stop(rktio_t *rktio, rktio_addrinfo_lookup_t *lookup);
void rktio_free_addrinfo(rktio_t *rktio, struct rktio_addrinfo_t *a);
typedef struct rktio_listener_t rktio_listener_t;
typedef struct rktio_connect_t rktio_connect_t;
#define RKTIO_SHUTDOWN_READ RKTIO_OPEN_READ
#define RKTIO_SHUTDOWN_WRITE RKTIO_OPEN_WRITE
rktio_listener_t *rktio_listen(rktio_t *rktio, rktio_addrinfo_t *local, int backlog, int reuse);
void rktio_listen_stop(rktio_t *rktio, rktio_listener_t *l);
int rktio_poll_accept_ready(rktio_t *rktio, rktio_listener_t *listener);
rktio_fd_t *rktio_accept(rktio_t *rktio, rktio_listener_t *listener);
/* Addreses must not be freed until the connection is complete or stopped: */
rktio_connect_t *rktio_start_connect(rktio_t *rktio, rktio_addrinfo_t *remote, rktio_addrinfo_t *local);
/* A `RKTIO_ERROR_CONNECT_TRYING_NEXT` error effectively means "try again",
and the connection object is still valid: */
rktio_fd_t *rktio_connect_finish(rktio_t *rktio, rktio_connect_t *conn);
void rktio_connect_stop(rktio_t *rktio, rktio_connect_t *conn);
int rktio_poll_connect_ready(rktio_t *rktio, rktio_connect_t *conn);
int rktio_socket_shutdown(rktio_t *rktio, rktio_fd_t *rfd, int mode);
/*************************************************/
/* File-descriptor sets for polling */
@ -57,13 +97,19 @@ typedef struct rktio_poll_set_t rktio_poll_set_t;
#define RKTIO_POLL_READ RKTIO_OPEN_READ
#define RKTIO_POLL_WRITE RKTIO_OPEN_WRITE
rktio_poll_set_t *rktio_make_poll_set();
rktio_poll_set_t *rktio_make_poll_set(rktio_t *rktio);
void rktio_poll_set_close(rktio_t *rktio, rktio_poll_set_t *fds);
void rktio_poll_add(rktio_t *rktio, rktio_fd_t *rfd, rktio_poll_set_t *fds, int modes);
void rktio_poll_add_receive(rktio_t *rktio, rktio_listener_t *listener, rktio_poll_set_t *fds);
void rktio_poll_add_connect(rktio_t *rktio, rktio_connect_t *conn, rktio_poll_set_t *fds);
void rktio_poll_add_addrinfo_lookup(rktio_t *rktio, rktio_addrinfo_lookup_t *lookup, rktio_poll_set_t *fds);
void rktio_poll_set_add_nosleep(rktio_t *rktio, rktio_poll_set_t *fds);
#ifdef RKTIO_SYSTEM_WINDOWS
void rktio_poll_set_add_handle(HANDLE h, rktio_poll_set_t *fds, int repost);
void rktio_poll_set_add_eventmask(rktio_poll_set_t *fds, int mask);
void rktio_poll_set_add_nosleep(rktio_poll_set_t *fds);
#endif
/*************************************************/
@ -205,7 +251,7 @@ void rktio_signal_received(rktio_t *rktio);
enum {
RKTIO_ERROR_KIND_POSIX,
RKTIO_ERROR_KIND_WINDOWS,
RKTIO_ERROR_KIND_GAI,
RKTIO_ERROR_KIND_GAI, /* => error sub-code available */
RKTIO_ERROR_KIND_RACKET
};
@ -224,12 +270,22 @@ enum {
RKTIO_ERROR_INIT_FAILED,
RKTIO_ERROR_LTPS_NOT_FOUND,
RKTIO_ERROR_LTPS_REMOVED, /* indicates success, instead of failure */
RKTIO_ERROR_CONNECT_TRYING_NEXT, /* indicates that failure is not (yet) premanent */
RKTIO_ERROR_ACCEPT_NOT_READY,
RKTIO_ERROR_HOST_AND_PORT_BOTH_UNSPECIFIED,
RKTIO_ERROR_TRY_AGAIN_WITH_IPV4,
};
/* GAI error sub-codes */
enum {
RKTIO_ERROR_REMOTE_HOST_NOT_FOUND,
RKTIO_ERROR_LOCAL_HOST_NOT_FOUND,
};
int rktio_get_last_error(rktio_t *rktio);
int rktio_get_last_error_kind(rktio_t *rktio);
char *rktio_get_error_string(rktio_t *rktio, int kind, int errid);
const char *rktio_get_error_string(rktio_t *rktio, int kind, int errid);
/*************************************************/

View File

@ -36,11 +36,13 @@ int rktio_get_last_error_kind(rktio_t *rktio)
return rktio->errkind;
}
char *rktio_get_error_string(rktio_t *rktio, int kind, int errid)
const char *rktio_get_error_string(rktio_t *rktio, int kind, int errid)
{
char *s = NULL;
const char *s = NULL;
if (kind == RKTIO_ERROR_KIND_POSIX)
s = strerror(errid);
else if (kind == RKTIO_ERROR_KIND_GAI)
s = rktio_gai_strerror(errid);
if (s) return s;
return "???";
}

View File

@ -1276,9 +1276,9 @@ rktio_size_t *rktio_file_size(rktio_t *rktio, char *filename)
#endif
}
/*************************************************************/
/* directory list */
/*************************************************************/
/*========================================================================*/
/* directory list */
/*========================================================================*/
#ifdef USE_FINDFIRST
@ -1457,9 +1457,9 @@ char *rktio_directory_list_step(rktio_t *rktio, rktio_directory_list_t *dl)
#endif
/*************************************************************/
/* copy file */
/*************************************************************/
/*========================================================================*/
/* copy file */
/*========================================================================*/
struct rktio_file_copy_t {
int done;
@ -1584,9 +1584,9 @@ void rktio_copy_file_stop(rktio_t *rktio, rktio_file_copy_t *fc)
free(fc);
}
/*************************************************************/
/* filesystem root list */
/*************************************************************/
/*========================================================================*/
/* filesystem root list */
/*========================================================================*/
char **rktio_filesystem_root_list(rktio_t *rktio)
/* returns a NULL-terminated array of strings */
@ -1647,9 +1647,9 @@ char **rktio_filesystem_root_list(rktio_t *rktio)
#endif
}
/*************************************************************/
/* expand user tilde & system paths */
/*************************************************************/
/*========================================================================*/
/* expand user tilde & system paths */
/*========================================================================*/
char *rktio_expand_user_tilde(rktio_t *rktio, char *filename) {
#ifdef RKTIO_SYSTEM_WINDOWS

View File

@ -47,7 +47,7 @@ static int ltps_is_hash_empty(rktio_ltps_t *lt);
static void ltps_hash_init(rktio_ltps_t *lt);
static void ltps_hash_free(rktio_ltps_t *lt);
/************************************************************/
/*========================================================================*/
rktio_ltps_handle_pair_t *make_ltps_handle_pair()
{
@ -82,7 +82,7 @@ void *rktio_ltps_handle_get_data(rktio_ltps_t *lt, rktio_ltps_handle_t *s)
return s->data;
}
/************************************************************/
/*========================================================================*/
rktio_ltps_t *rktio_open_ltps(rktio_t *rktio)
{
@ -240,7 +240,6 @@ rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t
s = v->write_handle;
if (s) ltps_signal_handle(lt, s);
ltps_hash_remove(lt, fd);
free(v);
s = NULL;
# ifdef HAVE_KQUEUE_SYSCALL
{
@ -276,6 +275,7 @@ rktio_ltps_handle_t *rktio_ltps_add(rktio_t *rktio, rktio_ltps_t *lt, rktio_fd_t
RKTIO_FD_CLR(fd, w);
RKTIO_FD_CLR(fd, e);
# endif
free(v);
set_racket_error(RKTIO_ERROR_LTPS_REMOVED); /* success, not failure */
} else if ((mode == RKTIO_LTPS_CHECK_READ)
|| (mode == RKTIO_LTPS_CREATE_READ)
@ -599,7 +599,7 @@ int rktio_ltps_poll(rktio_t *rktio, rktio_ltps_t *lt)
#endif
}
/************************************************************/
/*========================================================================*/
typedef struct ltps_bucket_t {
/* v is non-NULL => bucket is filled */

View File

@ -21,6 +21,8 @@ rktio_t *rktio_init(void)
void rktio_destroy(rktio_t *rktio)
{
rktio_free_ghbn(rktio);
rktio_free_global_poll_set(rktio);
free(rktio);
}

File diff suppressed because it is too large Load Diff

View File

@ -13,19 +13,9 @@
#include <string.h>
#include <stdlib.h>
/* Generalize fd arrays (FD_SET, etc) with a runtime-determined size,
special hooks for Windows "descriptors" like even queues and
semaphores, etc. */
void rktio_alloc_global_poll_set(rktio_t *rktio) {
#ifdef USE_FAR_RKTIO_FDCALLS
rktio->rktio_global_poll_set = rktio_alloc_fdset_array(3);
#endif
}
/************************************************************/
/* Poll variant */
/************************************************************/
/*========================================================================*/
/* Poll variant */
/*========================================================================*/
#ifdef HAVE_POLL_SYSCALL
@ -41,9 +31,10 @@ struct rktio_poll_set_t {
struct rktio_fd_set_data_t {
struct pollfd *pfd;
intptr_t size, count;
int skip_sleep;
};
rktio_poll_set_t *rktio_alloc_fdset_array(int count)
static rktio_poll_set_t *alloc_fdset_arrays()
{
struct rktio_fd_set_data_t *data;
rktio_poll_set_t *r, *w, *e;
@ -66,6 +57,7 @@ rktio_poll_set_t *rktio_alloc_fdset_array(int count)
data->size = 32;
data->count = 0;
data->skip_sleep = 0;
pfd = malloc(sizeof(struct pollfd) * (32 + PFD_EXTRA_SPACE));
data->pfd = pfd;
@ -73,7 +65,7 @@ rktio_poll_set_t *rktio_alloc_fdset_array(int count)
return r;
}
void rktio_free_fdset_array(rktio_poll_set_t *fds, int count)
static void free_fdset_arrays(rktio_poll_set_t *fds)
{
struct rktio_fd_set_data_t *data = fds->data;
free(fds->w);
@ -83,12 +75,6 @@ void rktio_free_fdset_array(rktio_poll_set_t *fds, int count)
free(data);
}
rktio_poll_set_t *rktio_init_fdset_array(rktio_poll_set_t *fdarray, int count)
{
fdarray->data->count = 0;
return fdarray;
}
rktio_poll_set_t *rktio_get_fdset(rktio_poll_set_t *fdarray, int pos)
{
switch (pos) {
@ -290,11 +276,21 @@ struct pollfd *rktio_get_poll_fd_array(rktio_poll_set_t *fds)
return fds->data->pfd;
}
void rktio_poll_set_add_nosleep(rktio_t *rktio, rktio_poll_set_t *fds)
{
fds->data->skip_sleep = 1;
}
static int fdset_has_nosleep(rktio_poll_set_t *fds)
{
return fds->data->skip_sleep;
}
#elif defined(USE_DYNAMIC_FDSET_SIZE)
/************************************************************/
/* Variant with run-time determined fd_set length */
/************************************************************/
/*========================================================================*/
/* Variant with run-time determined fd_set length */
/*========================================================================*/
struct rktio_poll_set_t {
fd_set data;
@ -306,8 +302,10 @@ static int dynamic_fd_size;
# define STORED_ACTUAL_FDSET_LIMIT
# define FDSET_LIMIT(fd) (*(int *)((char *)fd + dynamic_fd_size))
rktio_poll_set_t *rktio_alloc_fdset_array(int count)
static rktio_poll_set_t *alloc_fdset_arrays()
{
void *p;
if (!dynamic_fd_size) {
# ifdef USE_ULIMIT
dynamic_fd_size = ulimit(4, 0);
@ -321,19 +319,22 @@ rktio_poll_set_t *rktio_alloc_fdset_array(int count)
dynamic_fd_size += sizeof(void*) - (dynamic_fd_size % sizeof(void*));
}
return malloc(count * (dynamic_fd_size + sizeof(intptr_t)));
/* Allocate an array with 1 extra intptr_t in each set to hold a
"max" fd counter, and 1 extra intger used to record "no
sleeping" */
p = malloc((3 * (dynamic_fd_size + sizeof(intptr_t))) + sizeof(int));
*(int *)((char *)p + (3 * (dynamic_fd_size + sizeof(intptr_t)))) = 0;
return p;
}
void rktio_free_fdset_array(rktio_poll_set_t *fds, int count)
static void free_fdset_arrays(rktio_poll_set_t *fds)
{
free(fds);
}
rktio_poll_set_t *rktio_init_fdset_array(rktio_poll_set_t *fdarray, int count)
{
return fdarray;
}
rktio_poll_set_t *rktio_get_fdset(rktio_poll_set_t *fdarray, int pos)
{
return (rktio_poll_set_t *)(((char *)fdarray) + (pos * (dynamic_fd_size + sizeof(intptr_t))));
@ -341,14 +342,27 @@ rktio_poll_set_t *rktio_get_fdset(rktio_poll_set_t *fdarray, int pos)
void rktio_fdzero(rktio_poll_set_t *fd)
{
memset(fd, 0, dynamic_fd_size + sizeof(intptr_t));
memset(fd, 0, dynamic_fd_size + sizeof(intptr_t) + sizeof(int));
}
void rktio_poll_set_add_nosleep(rktio_t *rktio, rktio_poll_set_t *fds)
{
*(int *)((char *)fds + (3 * (dynamic_fd_size + sizeof(intptr_t)))) = 1;
}
static int fdset_has_nosleep(rktio_poll_set_t *fds)
{
return *(int *)((char *)fds + (3 * (dynamic_fd_size + sizeof(intptr_t))));
}
/* Continues below: */
#define USE_PLAIN_FDS_SET_OPS
#elif defined (RKTIO_SYSTEM_WINDOWS)
/************************************************************/
/* Windows variant */
/************************************************************/
/*========================================================================*/
/* Windows variant */
/*========================================================================*/
typedef struct {
SOCKET *sockets;
@ -370,36 +384,7 @@ typedef struct {
intptr_t combined_len;
} rktio_poll_set_t;
rktio_poll_set_t *rktio_alloc_fdset_array(int count)
{
rktio_poll_set_t *fdarray;
if (count) {
fdarray = calloc(count, sizeof(rktio_poll_set_t));
rktio_init_fdset_array(fdarray, count);
} else
fdarray = NULL;
return fdarray;
}
void rktio_free_fdset_array(rktio_poll_set_t *fds, int count)
{
FIXME;
}
static void reset_wait_array(rktio_poll_set_t *efd)
{
/* Allocate an array that may be big enough to hold all events
when we eventually call WaitForMultipleObjects. One of the three
arrays will be big enough. */
int sz = (3 * (efd->alloc + efd->alloc_handles)) + 2;
HANDLE *wa;
if (efd->wait_array) free(efd->wait_array);
wa = calloc(sz, sizeof(HANDLE));
efd->wait_array = wa;
}
rktio_poll_set_t *rktio_init_fdset_array(rktio_poll_set_t *fdarray, int count)
static void init_fdset_array(rktio_poll_set_t *fdarray, int count)
{
if (count) {
int i;
@ -433,6 +418,35 @@ rktio_poll_set_t *rktio_init_fdset_array(rktio_poll_set_t *fdarray, int count)
}
}
static rktio_poll_set_t *alloc_fdset_arrays()
{
rktio_poll_set_t *fdarray;
if (count) {
fdarray = calloc(3, sizeof(rktio_poll_set_t));
init_fdset_array(fdarray, 3);
} else
fdarray = NULL;
return fdarray;
}
static void free_fdset_arrays(rktio_poll_set_t *fds)
{
FIXME;
}
static void reset_wait_array(rktio_poll_set_t *efd)
{
/* Allocate an array that may be big enough to hold all events
when we eventually call WaitForMultipleObjects. One of the three
arrays will be big enough. */
int sz = (3 * (efd->alloc + efd->alloc_handles)) + 2;
HANDLE *wa;
if (efd->wait_array) free(efd->wait_array);
wa = calloc(sz, sizeof(HANDLE));
efd->wait_array = wa;
}
rktio_poll_set_t *rktio_get_fdset(rktio_poll_set_t *fdarray, int pos)
{
return fdarray + pos;
@ -440,7 +454,7 @@ rktio_poll_set_t *rktio_get_fdset(rktio_poll_set_t *fdarray, int pos)
void rktio_fdzero(rktio_poll_set_t *fd)
{
rktio_init_fdset_array(fd, 1);
init_fdset_array(fd, 1);
}
void rktio_fdclr(rktio_poll_set_t *fd, int n)
@ -533,11 +547,16 @@ void rktio_poll_set_add_handle(HANDLE h, rktio_poll_set_t *fds, int repost)
efd->num_handles++;
}
void rktio_poll_set_add_nosleep(rktio_poll_set_t *fds)
void rktio_poll_set_add_nosleep(rktio_t *rktio, rktio_poll_set_t *fds)
{
fds->no_sleep = 1;
}
static int fdset_has_nosleep(rktio_poll_set_t *fds)
{
return fds->no_sleep;
}
void rktio_poll_set_eventmask(rktio_poll_set_t *fds, int mask)
{
fds->wait_event_mask |= mask;
@ -708,25 +727,20 @@ void rktio_collapse_win_fd(rktio_poll_set_t *fds)
#else
/************************************************************/
/* Plain fd_set variant */
/************************************************************/
/*========================================================================*/
/* Plain fd_set variant */
/*========================================================================*/
rktio_poll_set_t *rktio_alloc_fdset_array(int count)
static rktio_poll_set_t *alloc_fdset_arrays()
{
return malloc(count * sizeof(fd_set));
p = malloc((3 * sizeof(fd_set)) + sizeof(int));
}
void rktio_free_fdset_array(rktio_poll_set_t *fds, int count)
static void free_fdset_arrays(rktio_poll_set_t *fds)
{
free(fds);
}
rktio_poll_set_t *rktio_init_fdset_array(rktio_poll_set_t *fdarray, int count)
{
return fdarray;
}
rktio_poll_set_t *rktio_get_fdset(rktio_poll_set_t *fdarray, int pos)
{
return fdarray + pos;
@ -737,6 +751,22 @@ void rktio_fdzero(rktio_poll_set_t *fd)
FD_ZERO(&(fd)->data);
}
void rktio_poll_set_add_nosleep(rktio_t *rktio, rktio_poll_set_t *fds)
{
*(int *)((char *)fds + (3 * sizeof(fd_set))) = 1;
}
static int fdset_has_nosleep(rktio_poll_set_t *fds)
{
return *(int *)((char *)fds + (3 * sizeof(fd_set)));
}
#define USE_PLAIN_FDS_SET_OPS
#endif
#ifdef USE_PLAIN_FDS_SET_OPS
void rktio_fdclr(rktio_poll_set_t *fd, int n)
{
FD_CLR(n, &(fd)->data);
@ -810,9 +840,46 @@ int rktio_get_fd_limit(rktio_poll_set_t *fds)
#endif
/************************************************************/
/* Sleeping as a generalized select() */
/************************************************************/
/*========================================================================*/
/* Shared internal poll set */
/*========================================================================*/
/* Generalize fd arrays (FD_SET, etc) with a runtime-determined size,
special hooks for Windows "descriptors" like even queues and
semaphores, etc. */
void rktio_alloc_global_poll_set(rktio_t *rktio) {
#ifdef USE_FAR_RKTIO_FDCALLS
rktio->rktio_global_poll_set = alloc_fdset_arrays();
#endif
}
void rktio_free_global_poll_set(rktio_t *rktio) {
#ifdef USE_FAR_RKTIO_FDCALLS
free_fdset_arrays(rktio->rktio_global_poll_set);
#endif
}
/*========================================================================*/
/* Create a poll set */
/*========================================================================*/
/* Internally, poll sets are used with macros like DECL_FDSET(), but this
is the API for external use. */
rktio_poll_set_t *rktio_make_poll_set(rktio_t *rktio)
{
return alloc_fdset_arrays();
}
void rktio_poll_set_close(rktio_t *rktio, rktio_poll_set_t *fds)
{
free_fdset_arrays(fds);
}
/*========================================================================*/
/* Sleeping as a generalized select() */
/*========================================================================*/
int rktio_initialize_signal(rktio_t *rktio)
{
@ -954,6 +1021,9 @@ void rkio_notify_sleep_progress(void)
void rktio_sleep(rktio_t *rktio, float nsecs, rktio_poll_set_t *fds, rktio_ltps_t *lt)
{
if (fds && fdset_has_nosleep(fds))
return;
if (fds && lt) {
#if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL)
int fd = rktio_ltps_get_fd(lt);
@ -1098,9 +1168,6 @@ void rktio_sleep(rktio_t *rktio, float nsecs, rktio_poll_set_t *fds, rktio_ltps_
HANDLE *array, just_two_array[2];
int count, rcount, *rps;
if (fds->no_sleep)
return;
scheme_collapse_win_fd(fds); /* merges */
rcount = fds->num_handles;

View File

@ -20,13 +20,14 @@
# define USE_FAR_RKTIO_FDCALLS
#endif
/************************************************************/
/* Globals, as gathered into `rktio_t` */
/************************************************************/
/*========================================================================*/
/* Globals, as gathered into `rktio_t` */
/*========================================================================*/
struct rktio_t {
intptr_t errid;
int errkind;
#ifdef RKTIO_SYSTEM_UNIX
struct group_member_cache_entry_t *group_member_cache;
int external_event_fd;
@ -36,20 +37,38 @@ struct rktio_t {
#ifdef RKTIO_SYSTEM_WINDOWS
int windows_nt_or_later;
HANDLE break_semaphore;
int wsr_size = 0;
struct rktio_socket_t *wsr_array;
#endif
#ifdef USE_FAR_RKTIO_FDCALLS
/* A single fdset that can be reused for immediate actions: */
struct rktio_poll_set_t *rktio_global_poll_set;
#endif
#if defined(RKTIO_SYSTEM_WINDOWS) || defined(RKTIO_USE_PTHREADS)
int ghbn_started, ghbn_run;
struct rktio_addr_lookup_t *ghbn_requests;
# ifdef RKTIO_USE_PTHREADS
HANDLE ghbn_th;
pthread_mutex_t ghbn_lock;
pthread_cond_t ghbn_start;
# endif
# ifdef RKTIO_SYSTEM_WINDOWS
pthread_t ghbn_th;
HANDLE ghbn_lock;
HANDLE ghbn_start;
# endif
#endif
};
/************************************************************/
/* Poll sets */
/************************************************************/
/*========================================================================*/
/* Poll sets */
/*========================================================================*/
typedef struct rktio_poll_set_t rktio_poll_set_t;
void rktio_alloc_global_poll_set(rktio_t *rktio);
void rktio_free_global_poll_set(rktio_t *rktio);
int rktio_initialize_signal(rktio_t *rktio);
#ifdef USE_FAR_RKTIO_FDCALLS
@ -101,9 +120,6 @@ struct rktio_poll_set_t { fd_set data; };
#endif
rktio_poll_set_t *rktio_alloc_fdset_array(int count);
void rktio_free_fdset_array(rktio_poll_set_t *fds, int count);
void rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *src_fds);
void rktio_clean_fd_set(rktio_poll_set_t *fds);
int rktio_get_fd_limit(rktio_poll_set_t *fds);
@ -118,10 +134,26 @@ rktio_poll_set_t *rktio_ltps_get_fd_set(rktio_ltps_t *lt);
int rktio_get_poll_count(rktio_poll_set_t *fds);
struct pollfd *rktio_get_poll_fd_array(rktio_poll_set_t *fds);
#endif
/*========================================================================*/
/* Network */
/*========================================================================*/
int rktio_socket_close(rktio_t *rktio, rktio_fd_t *rfd);
int rktio_socket_poll_write_ready(rktio_t *rktio, rktio_fd_t *rfd);
int rktio_socket_poll_read_ready(rktio_t *rktio, rktio_fd_t *rfd);
intptr_t rktio_socket_write(rktio_t *rktio, rktio_fd_t *rfd, char *buffer, intptr_t len);
intptr_t rktio_socket_read(rktio_t *rktio, rktio_fd_t *rfd, char *buffer, intptr_t len);
/************************************************************/
/* Misc */
/************************************************************/
void rktio_free_ghbn(rktio_t *rktio);
const char *rktio_gai_strerror(int errnum);
/*========================================================================*/
/* Misc */
/*========================================================================*/
#ifdef RKTIO_SYSTEM_WINDOWS
# define MSC_IZE(n) _ ## n

View File

@ -31,7 +31,10 @@ struct rktio_fd_t {
#endif
#ifdef RKTIO_SYSTEM_WINDOWS
HANDLE fd;
union {
HANDLE fd;
int sock; /* when `modes & RKTIO_OPEN_SOCKET` */
};
Win_FD_Input_Thread *th; /* input mode */
Win_FD_Output_Thread *oth; /* output mode */
#endif
@ -99,8 +102,7 @@ rktio_fd_t *rktio_system_fd(rktio_t *rktio, intptr_t system_fd, int modes)
rktio_fd_t *rfd;
rfd = malloc(sizeof(rktio_fd_t));
rfd->modes = modes;
rfd->modes = modes;
#ifdef RKTIO_SYSTEM_UNIX
rfd->fd = system_fd;
@ -115,7 +117,10 @@ rktio_fd_t *rktio_system_fd(rktio_t *rktio, intptr_t system_fd, int modes)
#endif
#ifdef RKTIO_SYSTEM_WINDOWS
rfd->fd = (HANDLE)system_fd;
if (modes & RKTIO_OPEN_SOCKET)
rfd->s = system_fd;
else
rfd->fd = (HANDLE)system_fd;
rfd->regfile = (GetFileType(rfd->fd) == FILE_TYPE_DISK);
#endif
@ -127,7 +132,15 @@ rktio_fd_t *rktio_system_fd(rktio_t *rktio, intptr_t system_fd, int modes)
intptr_t rktio_fd_system_fd(rktio_t *rktio, rktio_fd_t *rfd)
{
#ifdef RKTIO_SYSTEM_UNIX
return rfd->fd;
#endif
#ifdef RKTIO_SYSTEM_WINDOWS
if (rfd->modes & RKTIO_OPEN_SOCKET)
return rfd->sock;
else
return (intptr_t)rfd->fd;
#endif
}
int rktio_fd_is_regular_file(rktio_t *rktio, rktio_fd_t *rfd)
@ -137,7 +150,7 @@ int rktio_fd_is_regular_file(rktio_t *rktio, rktio_fd_t *rfd)
int rktio_fd_is_socket(rktio_t *rktio, rktio_fd_t *rfd)
{
return 0;
return (rfd->modes & RKTIO_OPEN_SOCKET);
}
/*************************************************************/
@ -394,7 +407,8 @@ int rktio_close(rktio_t *rktio, rktio_fd_t *rfd)
int cr;
# ifdef USE_FCNTL_AND_FORK_FOR_FILE_LOCKS
release_lockf(rfd->fd);
if (!(rfd->modes & RKTIO_OPEN_SOCKET))
release_lockf(rfd->fd);
# endif
do {
@ -402,6 +416,9 @@ int rktio_close(rktio_t *rktio, rktio_fd_t *rfd)
} while ((cr == -1) && (errno == EINTR));
#endif
#ifdef RKTIO_SYSTEM_WINDOWS
if (rfd->modes & RKTIO_OPEN_SOCKET)
return rktio_socket_close(rktio, rfd);
if (rfd->th) {
CSI_proc csi;
@ -493,7 +510,7 @@ static int try_get_fd_char(int fd, int *ready)
int rktio_poll_read_ready(rktio_t *rktio, rktio_fd_t *rfd)
{
if (rfd->regfile)
return 1;
return RKTIO_POLL_READY;
#ifdef RKTIO_SYSTEM_UNIX
{
@ -501,7 +518,7 @@ int rktio_poll_read_ready(rktio_t *rktio, rktio_fd_t *rfd)
# ifdef SOME_FDS_ARE_NOT_SELECTABLE
if (rfd->bufcount)
return 1;
return RKTIO_POLL_READY;
# endif
# ifdef HAVE_POLL_SYSCALL
@ -531,7 +548,7 @@ int rktio_poll_read_ready(rktio_t *rktio, rktio_fd_t *rfd)
# ifdef SOME_FDS_ARE_NOT_SELECTABLE
/* Try a non-blocking read: */
if (!r && !rfd->textmode) {
if (!r && !(rfd->modes & RKTIO_OPEN_SOCKET) && !rfd->textmode) {
int c, ready;
c = try_get_fd_char(rfd->fd, &ready);
@ -550,13 +567,16 @@ int rktio_poll_read_ready(rktio_t *rktio, rktio_fd_t *rfd)
}
#endif
#ifdef RKTIO_SYSTEM_WINDOWS
if (rfd->modes & RKTIO_OPEN_SOCKET)
return rktio_socket_poll_read_ready(rktio, rfd->sock);
if (!rfd->th) {
/* No thread -- so wait works. This case isn't actually used
right now, because wait doesn't seem to work reliably for
anything that we can recognize other than regfiles, which are
handled above. */
if (WaitForSingleObject(rfd->fd, 0) == WAIT_OBJECT_0)
return 1;
return RKTIO_POLL_READY;
} else {
/* Has the reader thread pulled in data? */
if (rfd->th->checking) {
@ -564,10 +584,10 @@ int rktio_poll_read_ready(rktio_t *rktio, rktio_fd_t *rfd)
data-is-ready sema: */
if (WaitForSingleObject(rfd->th->ready_sema, 0) == WAIT_OBJECT_0) {
rfd->th->checking = 0;
return 1;
return RKTIO_POLL_READY;
}
} else if (rfd->th->avail || rfd->th->err || rfd->th->eof)
return 1; /* other thread found data */
return RKTIO_POLL_READY; /* other thread found data */
else {
/* Doesn't have anything, and it's not even looking. Tell it
to look: */
@ -584,7 +604,7 @@ int poll_write_ready_or_flushed(rktio_t *rktio, rktio_fd_t *rfd, int check_flush
{
#ifdef RKTIO_SYSTEM_UNIX
if (check_flushed)
return 1;
return RKTIO_POLL_READY;
else {
int sr;
# ifdef HAVE_POLL_SYSCALL
@ -610,8 +630,8 @@ int poll_write_ready_or_flushed(rktio_t *rktio, rktio_fd_t *rfd, int check_flush
do {
/* Mac OS X 10.8 and 10.9: select() seems to claim that a pipe
is always ready for output. To work around that problem,
kqueue() support is enabled for pipes, so we shouldn't get
here much for pipes. */
kqueue() support might be used for pipes, but that has different
problems. The poll() code above should be used, instead. */
sr = select(rfd->fd + 1, NULL, RKTIO_FDS(writefds), RKTIO_FDS(exnfds), &time);
} while ((sr == -1) && (errno == EINTR));
# endif
@ -624,6 +644,9 @@ int poll_write_ready_or_flushed(rktio_t *rktio, rktio_fd_t *rfd, int check_flush
}
#endif
#ifdef RKTIO_SYSTEM_WINDOWS
if (rfd->modes & RKTIO_OPEN_SOCKET)
return rktio_socket_poll_write_ready(rktio, rfd->sock);
if (rfd->oth) {
/* Pipe output that can block... */
int retval;
@ -648,7 +671,7 @@ int poll_write_ready_or_flushed(rktio_t *rktio, rktio_fd_t *rfd, int check_flush
return retval;
} else
return 1; /* non-blocking output, such as a console, or haven't written yet */
return RKTIO_POLL_READY; /* non-blocking output, such as a console, or haven't written yet */
#endif
}
@ -678,35 +701,50 @@ void rktio_poll_add(rktio_t *rktio, rktio_fd_t *rfd, rktio_poll_set_t *fds, int
RKTIO_FD_SET(rfd->fd, fds2);
#endif
#ifdef RKTIO_SYSTEM_WINDOWS
if (modes & RKTIO_POLL_READ) {
if (rfd->th) {
/* See fd_byte_ready */
if (!rfd->th->checking) {
if (rfd->th->avail || rfd->th->err || rfd->th->eof) {
/* Data is ready. We shouldn't be trying to sleep, so force an
immediate wake-up: */
rktio_fdset_add_nosleep(fds);
} else {
rfd->th->checking = 1;
ReleaseSemaphore(rfd->th->checking_sema, 1, NULL);
rktio_fdset_add_handle(rfd->th->ready_sema, fds, 1);
}
} else
rktio_fdset_add_handle(rfd->th->ready_sema, fds, 1);
} else if (rfd->regfile) {
/* regular files never block */
rktio_fdset_add_nosleep(fds);
} else {
/* This case is not currently used. See fd_byte_ready. */
rktio_fdset_add_handle(rfd->fd, fds, 0);
}
}
if (rfd->modes & RKTIO_OPEN_SOCKET) {
/* RKTIO_FD_SET(), etc., for Windows expects sockets */
rktio_poll_set_t *fds2;
if (modes & RKTIO_POLL_WRITE) {
if (rfp->oth && !fd_write_ready(port))
rktio_fdset_add_handle(rfp->oth->ready_sema, fds, 1);
else
rktio_fdset_nosleep(fds);
if (modes & RKTIO_POLL_READ) {
RKTIO_FD_SET(rfd->sock, fds);
}
if (modes & RKTIO_POLL_WRITE) {
fds2 = RKTIO_GET_FDSET(fds, 1);
RKTIO_FD_SET(rfd->sock, fds2);
}
fds2 = RKTIO_GET_FDSET(fds, 2);
RKTIO_FD_SET(rfd->sock, fds2);
} else {
if (modes & RKTIO_POLL_READ) {
if (rfd->th) {
/* See fd_byte_ready */
if (!rfd->th->checking) {
if (rfd->th->avail || rfd->th->err || rfd->th->eof) {
/* Data is ready. We shouldn't be trying to sleep, so force an
immediate wake-up: */
rktio_fdset_add_nosleep(fds);
} else {
rfd->th->checking = 1;
ReleaseSemaphore(rfd->th->checking_sema, 1, NULL);
rktio_fdset_add_handle(rfd->th->ready_sema, fds, 1);
}
} else
rktio_fdset_add_handle(rfd->th->ready_sema, fds, 1);
} else if (rfd->regfile) {
/* regular files never block */
rktio_fdset_add_nosleep(fds);
} else {
/* This case is not currently used. See fd_byte_ready. */
rktio_fdset_add_handle(rfd->fd, fds, 0);
}
}
if (modes & RKTIO_POLL_WRITE) {
if (rfp->oth && !fd_write_ready(port))
rktio_fdset_add_handle(rfp->oth->ready_sema, fds, 1);
else
rktio_fdset_nosleep(fds);
}
}
#endif
}
@ -719,7 +757,10 @@ intptr_t rktio_read(rktio_t *rktio, rktio_fd_t *rfd, char *buffer, intptr_t len)
{
#ifdef RKTIO_SYSTEM_UNIX
intptr_t bc;
if (rfd->modes & RKTIO_OPEN_SOCKET)
return rktio_socket_read(rktio, rfd, buffer, len);
if (rfd->regfile) {
/* Reading regular file never blocks */
do {
@ -765,6 +806,9 @@ intptr_t rktio_read(rktio_t *rktio, rktio_fd_t *rfd, char *buffer, intptr_t len)
}
#endif
#ifdef RKTIO_SYSTEM_WINDOWS
if (rfd->modes & RKTIO_OPEN_SOCKET)
return rktio_socket_read(rktio, rfd, buffer, len);
if (!rfd->th) {
/* We can read directly. This must be a regular file, where
reading never blocks. */
@ -887,6 +931,9 @@ intptr_t rktio_write(rktio_t *rktio, rktio_fd_t *rfd, char *buffer, intptr_t len
#ifdef RKTIO_SYSTEM_UNIX
int flags, errsaved;
intptr_t amt;
if (rfd->modes & RKTIO_OPEN_SOCKET)
return rktio_socket_write(rktio, rfd, buffer, len);
flags = fcntl(rfd->fd, F_GETFL, 0);
if (!(flags & RKTIO_NONBLOCKING))
@ -923,7 +970,10 @@ intptr_t rktio_write(rktio_t *rktio, rktio_fd_t *rfd, char *buffer, intptr_t len
#endif
#ifdef RKTIO_SYSTEM_WINDOWS
DWORD winwrote;
if (rfd->modes & RKTIO_OPEN_SOCKET)
return rktio_socket_write(rktio, rfd, buffer, len);
if (rfd->regfile) {
/* Regular files never block, so this code looks like the Unix
code. We've cheated in the make_fd proc and called