rktio: UDP

This commit is contained in:
Matthew Flatt 2017-06-11 18:38:42 -06:00
parent e9d5260295
commit 6c2f71bf80
4 changed files with 429 additions and 1339 deletions

View File

@ -215,7 +215,10 @@ static void check_read_write_pair(rktio_t *rktio, rktio_fd_t *fd, rktio_fd_t *fd
check_ltps_write_ready(rktio, lt, h2);
/* Round-trip data through pipe: */
amt = rktio_write(rktio, fd2, "hello", 5);
if (rktio_fd_is_udp(rktio, fd2)) {
amt = rktio_udp_sendto(rktio, fd2, NULL, "hello", 5);
} else
amt = rktio_write(rktio, fd2, "hello", 5);
check_valid(amt == 5);
if (!immediate_available) {
@ -229,7 +232,19 @@ static void check_read_write_pair(rktio_t *rktio, rktio_fd_t *fd, rktio_fd_t *fd
check_valid(rktio_ltps_close(rktio, lt));
}
amt = rktio_read(rktio, fd, buffer, sizeof(buffer));
if (rktio_fd_is_udp(rktio, fd)) {
rktio_length_and_addrinfo_t *r;
do {
r = rktio_udp_recvfrom(rktio, fd, buffer, sizeof(buffer));
} while (!r
&& (rktio_get_last_error_kind(rktio) == RKTIO_ERROR_KIND_RACKET)
&& (rktio_get_last_error(rktio) == RKTIO_ERROR_INFO_TRY_AGAIN));
check_valid(r);
amt = r->len;
free(r->addr);
free(r);
} else
amt = rktio_read(rktio, fd, buffer, sizeof(buffer));
check_valid(amt == 5);
check_valid(!strncmp(buffer, "hello", 5));
check_valid(!rktio_poll_read_ready(rktio, fd));
@ -237,41 +252,62 @@ static void check_read_write_pair(rktio_t *rktio, rktio_fd_t *fd, rktio_fd_t *fd
/* Close pipe ends: */
check_valid(rktio_close(rktio, fd2));
if (!immediate_available) {
/* Wait for EOF to be ready; should not block for long */
wait_read(rktio, fd);
if (!rktio_fd_is_udp(rktio, fd)) {
if (!immediate_available) {
/* Wait for EOF to be ready; should not block for long */
wait_read(rktio, fd);
}
amt = rktio_read(rktio, fd, buffer, sizeof(buffer));
check_valid(amt == RKTIO_READ_EOF);
}
amt = rktio_read(rktio, fd, buffer, sizeof(buffer));
check_valid(amt == RKTIO_READ_EOF);
check_valid(rktio_close(rktio, fd));
}
/* Open pipe ends again: */
fd2 = rktio_open(rktio, "demo_fifo", RKTIO_OPEN_WRITE | RKTIO_OPEN_CAN_EXIST);
check_valid(fd2);
/* should eventually block: */
for (i = 0; i < 100000; i++) {
amt = rktio_write(rktio, fd2, "hello", 5);
#define AMOUNT_TO_WRITE_AND_BLOCK 1000000
#define AMOUNT_FOR_UDP 1000
static void check_fill_write(rktio_t *rktio, rktio_fd_t *fd2, rktio_addrinfo_t *dest_addr, intptr_t limit)
{
intptr_t i, amt;
if (!limit)
limit = AMOUNT_TO_WRITE_AND_BLOCK;
/* should eventually block, unless UDP: */
for (i = 0; i < limit; i++) {
if (dest_addr) {
amt = rktio_udp_sendto(rktio, fd2, dest_addr, "hello", 5);
} else
amt = rktio_write(rktio, fd2, "hello", 5);
check_valid(amt != RKTIO_WRITE_ERROR);
if (!amt)
break;
}
check_valid(i < 100000);
check_valid(i > 0);
if (!rktio_fd_is_udp(rktio, fd2))
check_valid(i < limit);
}
static void check_drain_read(rktio_t *rktio, rktio_fd_t *fd2, intptr_t limit)
{
intptr_t i, amt;
char buffer[256];
if (!limit)
limit = AMOUNT_TO_WRITE_AND_BLOCK;
fd = rktio_open(rktio, "demo_fifo", RKTIO_OPEN_READ);
check_valid(fd);
/* should eventually block: */
for (i = 0; i < 100000; i++) {
for (i = 0; i < limit; i++) {
amt = rktio_read(rktio, fd2, buffer, sizeof(buffer));
check_valid(amt != RKTIO_READ_ERROR);
check_valid(amt != RKTIO_READ_EOF);
if (!amt)
break;
}
check_valid(i < 100000);
check_valid(rktio_close(rktio, fd));
check_valid(rktio_close(rktio, fd2));
check_valid(i > 0);
check_valid(i < limit);
}
void check_many_lookup(rktio_t *rktio)
@ -532,6 +568,18 @@ int main()
check_read_write_pair(rktio, fd, fd2);
/* Open pipe ends again: */
fd2 = rktio_open(rktio, "demo_fifo", RKTIO_OPEN_WRITE | RKTIO_OPEN_CAN_EXIST);
check_valid(fd2);
check_fill_write(rktio, fd2, NULL, 0);
fd = rktio_open(rktio, "demo_fifo", RKTIO_OPEN_READ);
check_valid(fd);
check_drain_read(rktio, fd2, 0);
check_valid(rktio_close(rktio, fd));
check_valid(rktio_close(rktio, fd2));
/* Networking */
{
rktio_addrinfo_t *addr;
@ -549,7 +597,6 @@ int main()
addr = lookup_loop(rktio, "localhost", 4536, -1, 0, 1);
fd = connect_loop(rktio, addr, NULL);
rktio_free_addrinfo(rktio, addr);
check_valid(rktio_poll_accept_ready(rktio, lnr) == RKTIO_POLL_READY);
@ -592,8 +639,68 @@ int main()
check_read_write_pair(rktio, fd, fd2);
fd = connect_loop(rktio, addr, NULL);
rktio_free_addrinfo(rktio, addr);
fd2 = rktio_accept(rktio, lnr);
check_fill_write(rktio, fd2, NULL, 0);
check_drain_read(rktio, fd, 0);
check_valid(rktio_close(rktio, fd));
check_valid(rktio_close(rktio, fd2));
rktio_listen_stop(rktio, lnr);
}
/* UDP */
{
rktio_addrinfo_t *intf_addr, *addr;
intf_addr = lookup_loop(rktio, "localhost", 0, -1, 1, 0);
check_valid(intf_addr);
fd = rktio_udp_open(rktio, intf_addr);
check_valid(fd);
addr = lookup_loop(rktio, NULL, 4536, -1, 1, 0);
check_valid(addr);
check_valid(rktio_udp_bind(rktio, fd, addr));
rktio_free_addrinfo(rktio, addr);
fd2 = rktio_udp_open(rktio, intf_addr);
check_valid(fd2);
addr = lookup_loop(rktio, "localhost", 4536, -1, 0, 0);
check_valid(addr);
check_valid(rktio_udp_connect(rktio, fd2, addr));
check_read_write_pair(rktio, fd, fd2);
/* Again, this time to fill & drain: */
fd = rktio_udp_open(rktio, intf_addr);
fd2 = rktio_udp_open(rktio, intf_addr);
check_valid(fd2);
addr = lookup_loop(rktio, NULL, 4536, -1, 1, 0);
check_valid(addr);
check_valid(rktio_udp_bind(rktio, fd, addr));
rktio_free_addrinfo(rktio, addr);
addr = lookup_loop(rktio, "localhost", 4536, -1, 0, 0);
check_valid(addr);
printf("udp\n");
check_fill_write(rktio, fd2, addr, AMOUNT_FOR_UDP);
check_drain_read(rktio, fd, AMOUNT_FOR_UDP+1);
rktio_free_addrinfo(rktio, addr);
rktio_free_addrinfo(rktio, intf_addr);
check_valid(rktio_close(rktio, fd));
check_valid(rktio_close(rktio, fd2));
}
return 0;
}

View File

@ -28,12 +28,14 @@ typedef struct rktio_fd_t rktio_fd_t;
#define RKTIO_OPEN_MUST_EXIST (1<<5)
#define RKTIO_OPEN_CAN_EXIST (1<<6)
#define RKTIO_OPEN_SOCKET (1<<7)
#define RKTIO_OPEN_UDP (1<<8)
rktio_fd_t *rktio_system_fd(rktio_t *rktio, intptr_t system_fd, int modes);
intptr_t rktio_fd_system_fd(rktio_t *rktio, rktio_fd_t *rfd);
int rktio_fd_is_regular_file(rktio_t *rktio, rktio_fd_t *rfd);
int rktio_fd_is_socket(rktio_t *rktio, rktio_fd_t *rfd);
int rktio_fd_is_udp(rktio_t *rktio, rktio_fd_t *rfd);
rktio_fd_t *rktio_open(rktio_t *rktio, char *src, int modes);
int rktio_close(rktio_t *rktio, rktio_fd_t *fd);
@ -46,6 +48,7 @@ void rktio_forget(rktio_t *rktio, rktio_fd_t *fd);
#define RKTIO_WRITE_ERROR (-2)
#define RKTIO_POLL_ERROR (-2)
#define RKTIO_POLL_READY 1
#define RKTIO_PROP_ERROR (-2)
intptr_t rktio_read(rktio_t *rktio, rktio_fd_t *fd, char *buffer, intptr_t len);
intptr_t rktio_write(rktio_t *rktio, rktio_fd_t *fd, char *buffer, intptr_t len);
@ -92,9 +95,44 @@ int rktio_poll_connect_ready(rktio_t *rktio, rktio_connect_t *conn);
int rktio_socket_shutdown(rktio_t *rktio, rktio_fd_t *rfd, int mode);
rktio_fd_t *rktio_udp_open(rktio_t *rktio, rktio_addrinfo_t *addr);
int rktio_udp_disconnect(rktio_t *rktio, rktio_fd_t *rfd);
int rktio_udp_bind(rktio_t *rktio, rktio_fd_t *rfd, rktio_addrinfo_t *addr);
int rktio_udp_connect(rktio_t *rktio, rktio_fd_t *rfd, rktio_addrinfo_t *addr);
intptr_t rktio_udp_sendto(rktio_t *rktio, rktio_fd_t *rfd, rktio_addrinfo_t *addr,
char *buffer, intptr_t len);
typedef struct rktio_length_and_addrinfo_t {
intptr_t len;
rktio_addrinfo_t *addr;
} rktio_length_and_addrinfo_t;
rktio_length_and_addrinfo_t *rktio_udp_recvfrom(rktio_t *rktio, rktio_fd_t *rfd, char *buffer, intptr_t len);
/* The following accessors return RKTIO_PROP_ERROR on failure */
int rktio_udp_get_multicast_loopback(rktio_t *rktio, rktio_fd_t *rfd);
int rktio_udp_set_multicast_loopback(rktio_t *rktio, rktio_fd_t *rfd, int on);
int rktio_udp_get_multicast_ttl(rktio_t *rktio, rktio_fd_t *rfd);
int rktio_udp_set_multicast_ttl(rktio_t *rktio, rktio_fd_t *rfd, int ttl_val);
char **rktio_socket_address(rktio_t *rktio, rktio_fd_t *rfd);
char **rktio_socket_peer_address(rktio_t *rktio, rktio_fd_t *rfd);
char *rktio_udp_multicast_interface(rktio_t *rktio, rktio_fd_t *rfd);
int rktio_udp_set_multicast_interface(rktio_t *rktio, rktio_fd_t *rfd, rktio_addrinfo_t *addr);
enum {
RKTIO_ADD_MEMBERSHIP,
RKTIO_DROP_MEMBERSHIP
};
int rktio_udp_change_multicast_group(rktio_t *rktio, rktio_fd_t *rfd,
rktio_addrinfo_t *group_addr,
rktio_addrinfo_t *intf_addr,
int action);
/*************************************************/
/* File-descriptor sets for polling */
@ -282,7 +320,9 @@ enum {
RKTIO_ERROR_CONNECT_TRYING_NEXT, /* indicates that failure is not (yet) premanent */
RKTIO_ERROR_ACCEPT_NOT_READY,
RKTIO_ERROR_HOST_AND_PORT_BOTH_UNSPECIFIED,
RKTIO_ERROR_TRY_AGAIN_WITH_IPV4,
RKTIO_ERROR_INFO_TRY_AGAIN, /* for UDP */
RKTIO_ERROR_TRY_AGAIN, /* for UDP */
RKTIO_ERROR_TRY_AGAIN_WITH_IPV4, /* for TCP listen */
};
/* GAI error sub-codes */

File diff suppressed because it is too large Load Diff

View File

@ -83,7 +83,7 @@ static void init_read_fd(rktio_fd_t *rfd)
th->checking = 0;
sm = CreateSemaphore(NULL, 0, 1, NULL);
v th->checking_sema = sm;
th->checking_sema = sm;
sm = CreateSemaphore(NULL, 0, 1, NULL);
th->ready_sema = sm;
sm = CreateSemaphore(NULL, 1, 1, NULL);
@ -153,6 +153,11 @@ int rktio_fd_is_socket(rktio_t *rktio, rktio_fd_t *rfd)
return (rfd->modes & RKTIO_OPEN_SOCKET);
}
int rktio_fd_is_udp(rktio_t *rktio, rktio_fd_t *rfd)
{
return (rfd->modes & RKTIO_OPEN_UDP);
}
rktio_fd_t *rktio_dup(rktio_t *rktio, rktio_fd_t *rfd) {
#ifdef RKTIO_SYSTEM_UNIX
intptr_t nfd;