From 6c2f71bf808da919a591823bb813c6018fdebbd0 Mon Sep 17 00:00:00 2001 From: Matthew Flatt Date: Sun, 11 Jun 2017 18:38:42 -0600 Subject: [PATCH] rktio: UDP --- racket/src/rktio/demo.c | 155 ++- racket/src/rktio/rktio.h | 42 +- racket/src/rktio/rktio_network.c | 1564 +++++---------------------- racket/src/rktio/rktio_read_write.c | 7 +- 4 files changed, 429 insertions(+), 1339 deletions(-) diff --git a/racket/src/rktio/demo.c b/racket/src/rktio/demo.c index c7c477ed44..48dfc455ea 100644 --- a/racket/src/rktio/demo.c +++ b/racket/src/rktio/demo.c @@ -215,7 +215,10 @@ static void check_read_write_pair(rktio_t *rktio, rktio_fd_t *fd, rktio_fd_t *fd check_ltps_write_ready(rktio, lt, h2); /* Round-trip data through pipe: */ - amt = rktio_write(rktio, fd2, "hello", 5); + if (rktio_fd_is_udp(rktio, fd2)) { + amt = rktio_udp_sendto(rktio, fd2, NULL, "hello", 5); + } else + amt = rktio_write(rktio, fd2, "hello", 5); check_valid(amt == 5); if (!immediate_available) { @@ -228,8 +231,20 @@ static void check_read_write_pair(rktio_t *rktio, rktio_fd_t *fd, rktio_fd_t *fd check_ltps_read_ready(rktio, lt, h1); check_valid(rktio_ltps_close(rktio, lt)); } - - amt = rktio_read(rktio, fd, buffer, sizeof(buffer)); + + if (rktio_fd_is_udp(rktio, fd)) { + rktio_length_and_addrinfo_t *r; + do { + r = rktio_udp_recvfrom(rktio, fd, buffer, sizeof(buffer)); + } while (!r + && (rktio_get_last_error_kind(rktio) == RKTIO_ERROR_KIND_RACKET) + && (rktio_get_last_error(rktio) == RKTIO_ERROR_INFO_TRY_AGAIN)); + check_valid(r); + amt = r->len; + free(r->addr); + free(r); + } else + amt = rktio_read(rktio, fd, buffer, sizeof(buffer)); check_valid(amt == 5); check_valid(!strncmp(buffer, "hello", 5)); check_valid(!rktio_poll_read_ready(rktio, fd)); @@ -237,41 +252,62 @@ static void check_read_write_pair(rktio_t *rktio, rktio_fd_t *fd, rktio_fd_t *fd /* Close pipe ends: */ check_valid(rktio_close(rktio, fd2)); - if (!immediate_available) { - /* Wait for EOF to be ready; should not block for long */ - wait_read(rktio, fd); + if (!rktio_fd_is_udp(rktio, fd)) { + if (!immediate_available) { + /* Wait for EOF to be ready; should not block for long */ + wait_read(rktio, fd); + } + + amt = rktio_read(rktio, fd, buffer, sizeof(buffer)); + check_valid(amt == RKTIO_READ_EOF); } - amt = rktio_read(rktio, fd, buffer, sizeof(buffer)); - check_valid(amt == RKTIO_READ_EOF); check_valid(rktio_close(rktio, fd)); +} - /* Open pipe ends again: */ - fd2 = rktio_open(rktio, "demo_fifo", RKTIO_OPEN_WRITE | RKTIO_OPEN_CAN_EXIST); - check_valid(fd2); - /* should eventually block: */ - for (i = 0; i < 100000; i++) { - amt = rktio_write(rktio, fd2, "hello", 5); +#define AMOUNT_TO_WRITE_AND_BLOCK 1000000 +#define AMOUNT_FOR_UDP 1000 + +static void check_fill_write(rktio_t *rktio, rktio_fd_t *fd2, rktio_addrinfo_t *dest_addr, intptr_t limit) +{ + intptr_t i, amt; + + if (!limit) + limit = AMOUNT_TO_WRITE_AND_BLOCK; + + /* should eventually block, unless UDP: */ + for (i = 0; i < limit; i++) { + if (dest_addr) { + amt = rktio_udp_sendto(rktio, fd2, dest_addr, "hello", 5); + } else + amt = rktio_write(rktio, fd2, "hello", 5); check_valid(amt != RKTIO_WRITE_ERROR); if (!amt) break; } - check_valid(i < 100000); - - fd = rktio_open(rktio, "demo_fifo", RKTIO_OPEN_READ); - check_valid(fd); + check_valid(i > 0); + if (!rktio_fd_is_udp(rktio, fd2)) + check_valid(i < limit); +} + +static void check_drain_read(rktio_t *rktio, rktio_fd_t *fd2, intptr_t limit) +{ + intptr_t i, amt; + char buffer[256]; + + if (!limit) + limit = AMOUNT_TO_WRITE_AND_BLOCK; + /* should eventually block: */ - for (i = 0; i < 100000; i++) { + for (i = 0; i < limit; i++) { amt = rktio_read(rktio, fd2, buffer, sizeof(buffer)); check_valid(amt != RKTIO_READ_ERROR); check_valid(amt != RKTIO_READ_EOF); if (!amt) break; } - check_valid(i < 100000); - - check_valid(rktio_close(rktio, fd)); - check_valid(rktio_close(rktio, fd2)); + check_valid(i > 0); + check_valid(i < limit); } void check_many_lookup(rktio_t *rktio) @@ -532,6 +568,18 @@ int main() check_read_write_pair(rktio, fd, fd2); + /* Open pipe ends again: */ + fd2 = rktio_open(rktio, "demo_fifo", RKTIO_OPEN_WRITE | RKTIO_OPEN_CAN_EXIST); + check_valid(fd2); + check_fill_write(rktio, fd2, NULL, 0); + + fd = rktio_open(rktio, "demo_fifo", RKTIO_OPEN_READ); + check_valid(fd); + check_drain_read(rktio, fd2, 0); + + check_valid(rktio_close(rktio, fd)); + check_valid(rktio_close(rktio, fd2)); + /* Networking */ { rktio_addrinfo_t *addr; @@ -549,7 +597,6 @@ int main() addr = lookup_loop(rktio, "localhost", 4536, -1, 0, 1); fd = connect_loop(rktio, addr, NULL); - rktio_free_addrinfo(rktio, addr); check_valid(rktio_poll_accept_ready(rktio, lnr) == RKTIO_POLL_READY); @@ -591,9 +638,69 @@ int main() } check_read_write_pair(rktio, fd, fd2); + + fd = connect_loop(rktio, addr, NULL); + rktio_free_addrinfo(rktio, addr); + + fd2 = rktio_accept(rktio, lnr); + + check_fill_write(rktio, fd2, NULL, 0); + check_drain_read(rktio, fd, 0); + + check_valid(rktio_close(rktio, fd)); + check_valid(rktio_close(rktio, fd2)); rktio_listen_stop(rktio, lnr); } + + /* UDP */ + { + rktio_addrinfo_t *intf_addr, *addr; + + intf_addr = lookup_loop(rktio, "localhost", 0, -1, 1, 0); + check_valid(intf_addr); + + fd = rktio_udp_open(rktio, intf_addr); + check_valid(fd); + + addr = lookup_loop(rktio, NULL, 4536, -1, 1, 0); + check_valid(addr); + check_valid(rktio_udp_bind(rktio, fd, addr)); + rktio_free_addrinfo(rktio, addr); + + fd2 = rktio_udp_open(rktio, intf_addr); + check_valid(fd2); + + addr = lookup_loop(rktio, "localhost", 4536, -1, 0, 0); + check_valid(addr); + check_valid(rktio_udp_connect(rktio, fd2, addr)); + + check_read_write_pair(rktio, fd, fd2); + + /* Again, this time to fill & drain: */ + + fd = rktio_udp_open(rktio, intf_addr); + + fd2 = rktio_udp_open(rktio, intf_addr); + check_valid(fd2); + addr = lookup_loop(rktio, NULL, 4536, -1, 1, 0); + check_valid(addr); + check_valid(rktio_udp_bind(rktio, fd, addr)); + rktio_free_addrinfo(rktio, addr); + + addr = lookup_loop(rktio, "localhost", 4536, -1, 0, 0); + check_valid(addr); + + printf("udp\n"); + check_fill_write(rktio, fd2, addr, AMOUNT_FOR_UDP); + check_drain_read(rktio, fd, AMOUNT_FOR_UDP+1); + + rktio_free_addrinfo(rktio, addr); + rktio_free_addrinfo(rktio, intf_addr); + + check_valid(rktio_close(rktio, fd)); + check_valid(rktio_close(rktio, fd2)); + } return 0; } diff --git a/racket/src/rktio/rktio.h b/racket/src/rktio/rktio.h index bd745acadd..e562d1366b 100644 --- a/racket/src/rktio/rktio.h +++ b/racket/src/rktio/rktio.h @@ -28,12 +28,14 @@ typedef struct rktio_fd_t rktio_fd_t; #define RKTIO_OPEN_MUST_EXIST (1<<5) #define RKTIO_OPEN_CAN_EXIST (1<<6) #define RKTIO_OPEN_SOCKET (1<<7) +#define RKTIO_OPEN_UDP (1<<8) rktio_fd_t *rktio_system_fd(rktio_t *rktio, intptr_t system_fd, int modes); intptr_t rktio_fd_system_fd(rktio_t *rktio, rktio_fd_t *rfd); int rktio_fd_is_regular_file(rktio_t *rktio, rktio_fd_t *rfd); int rktio_fd_is_socket(rktio_t *rktio, rktio_fd_t *rfd); +int rktio_fd_is_udp(rktio_t *rktio, rktio_fd_t *rfd); rktio_fd_t *rktio_open(rktio_t *rktio, char *src, int modes); int rktio_close(rktio_t *rktio, rktio_fd_t *fd); @@ -46,6 +48,7 @@ void rktio_forget(rktio_t *rktio, rktio_fd_t *fd); #define RKTIO_WRITE_ERROR (-2) #define RKTIO_POLL_ERROR (-2) #define RKTIO_POLL_READY 1 +#define RKTIO_PROP_ERROR (-2) intptr_t rktio_read(rktio_t *rktio, rktio_fd_t *fd, char *buffer, intptr_t len); intptr_t rktio_write(rktio_t *rktio, rktio_fd_t *fd, char *buffer, intptr_t len); @@ -92,9 +95,44 @@ int rktio_poll_connect_ready(rktio_t *rktio, rktio_connect_t *conn); int rktio_socket_shutdown(rktio_t *rktio, rktio_fd_t *rfd, int mode); +rktio_fd_t *rktio_udp_open(rktio_t *rktio, rktio_addrinfo_t *addr); +int rktio_udp_disconnect(rktio_t *rktio, rktio_fd_t *rfd); +int rktio_udp_bind(rktio_t *rktio, rktio_fd_t *rfd, rktio_addrinfo_t *addr); +int rktio_udp_connect(rktio_t *rktio, rktio_fd_t *rfd, rktio_addrinfo_t *addr); + +intptr_t rktio_udp_sendto(rktio_t *rktio, rktio_fd_t *rfd, rktio_addrinfo_t *addr, + char *buffer, intptr_t len); + +typedef struct rktio_length_and_addrinfo_t { + intptr_t len; + rktio_addrinfo_t *addr; +} rktio_length_and_addrinfo_t; + +rktio_length_and_addrinfo_t *rktio_udp_recvfrom(rktio_t *rktio, rktio_fd_t *rfd, char *buffer, intptr_t len); + +/* The following accessors return RKTIO_PROP_ERROR on failure */ +int rktio_udp_get_multicast_loopback(rktio_t *rktio, rktio_fd_t *rfd); +int rktio_udp_set_multicast_loopback(rktio_t *rktio, rktio_fd_t *rfd, int on); +int rktio_udp_get_multicast_ttl(rktio_t *rktio, rktio_fd_t *rfd); +int rktio_udp_set_multicast_ttl(rktio_t *rktio, rktio_fd_t *rfd, int ttl_val); + + char **rktio_socket_address(rktio_t *rktio, rktio_fd_t *rfd); char **rktio_socket_peer_address(rktio_t *rktio, rktio_fd_t *rfd); +char *rktio_udp_multicast_interface(rktio_t *rktio, rktio_fd_t *rfd); +int rktio_udp_set_multicast_interface(rktio_t *rktio, rktio_fd_t *rfd, rktio_addrinfo_t *addr); + +enum { + RKTIO_ADD_MEMBERSHIP, + RKTIO_DROP_MEMBERSHIP +}; + +int rktio_udp_change_multicast_group(rktio_t *rktio, rktio_fd_t *rfd, + rktio_addrinfo_t *group_addr, + rktio_addrinfo_t *intf_addr, + int action); + /*************************************************/ /* File-descriptor sets for polling */ @@ -282,7 +320,9 @@ enum { RKTIO_ERROR_CONNECT_TRYING_NEXT, /* indicates that failure is not (yet) premanent */ RKTIO_ERROR_ACCEPT_NOT_READY, RKTIO_ERROR_HOST_AND_PORT_BOTH_UNSPECIFIED, - RKTIO_ERROR_TRY_AGAIN_WITH_IPV4, + RKTIO_ERROR_INFO_TRY_AGAIN, /* for UDP */ + RKTIO_ERROR_TRY_AGAIN, /* for UDP */ + RKTIO_ERROR_TRY_AGAIN_WITH_IPV4, /* for TCP listen */ }; /* GAI error sub-codes */ diff --git a/racket/src/rktio/rktio_network.c b/racket/src/rktio/rktio_network.c index dd7c0b3e00..d3bf969416 100644 --- a/racket/src/rktio/rktio_network.c +++ b/racket/src/rktio/rktio_network.c @@ -56,6 +56,12 @@ typedef struct sockaddr_in rktio_unspec_address; # define RKTIO_TCP_LISTEN_IPV6_ONLY_SOCKOPT # endif +#if defined(sun) +/* USE_NULL_TO_DISCONNECT_UDP calls connect() with NULL instead of + an AF_UNSPEC address to disconnect a UDP socket. */ +# define USE_NULL_TO_DISCONNECT_UDP +#endif + #endif #ifdef CANT_SET_SOCKET_BUFSIZE @@ -1019,19 +1025,24 @@ intptr_t rktio_socket_read(rktio_t *rktio, rktio_fd_t *rfd, char *buffer, intptr return rn; else if (rn == 0) return RKTIO_READ_EOF; - else if (WAS_EAGAIN(rn)) - return 0; else { - get_socket_error(); - return RKTIO_READ_ERROR; + int err = SOCK_ERRNO(); + if (WAS_EAGAIN(err)) + return 0; + else { + get_socket_error(); + return RKTIO_READ_ERROR; + } } } -intptr_t rktio_socket_write(rktio_t *rktio, rktio_fd_t *rfd, char *buffer, intptr_t len) +static intptr_t do_socket_write(rktio_t *rktio, rktio_fd_t *rfd, char *buffer, intptr_t len, + /* for UDP sendto: */ + rktio_addrinfo_t *addr) { rktio_socket_t s = rktio_fd_system_fd(rktio, rfd); intptr_t sent; - int errid; + int errid = 0; #ifdef RKTIO_SYSTEM_WINDOWS # define SEND_BAD_MSG_SIZE(e) (e == WSAEMSGSIZE) @@ -1044,15 +1055,31 @@ intptr_t rktio_socket_write(rktio_t *rktio, rktio_fd_t *rfd, char *buffer, intpt #endif while (1) { - do { - sent = send(s, buffer, len, 0); - } while ((sent == -1) && (NOT_WINSOCK(errno) == EINTR)); + if (addr) { + /* Use first address that isn't a bad address: */ + for (; addr; addr = (rktio_addrinfo_t *)RKTIO_AS_ADDRINFO(addr)->ai_next) { + do { + sent = sendto(s, buffer, len, 0, + RKTIO_AS_ADDRINFO(addr)->ai_addr, RKTIO_AS_ADDRINFO(addr)->ai_addrlen); + } while ((sent == -1) && (NOT_WINSOCK(errno) == EINTR)); + if (sent >= 0) + break; + errid = SOCK_ERRNO(); + if (!WAS_EBADADDRESS(errid)) + break; + } + } else { + do { + sent = send(s, buffer, len, 0); + } while ((sent == -1) && (NOT_WINSOCK(errno) == EINTR)); + + if (sent == -1) + errid = SOCK_ERRNO(); + } if (sent >= 0) return sent; - errid = SOCK_ERRNO(); - if (WAS_EAGAIN(errid)) return 0; else if (SEND_BAD_MSG_SIZE(errid) && (len > 1)) { @@ -1065,6 +1092,11 @@ intptr_t rktio_socket_write(rktio_t *rktio, rktio_fd_t *rfd, char *buffer, intpt } } +intptr_t rktio_socket_write(rktio_t *rktio, rktio_fd_t *rfd, char *buffer, intptr_t len) +{ + return do_socket_write(rktio, rfd, buffer, len, NULL); +} + /*========================================================================*/ /* TCP connect */ /*========================================================================*/ @@ -1629,120 +1661,27 @@ char **rktio_socket_peer_address(rktio_t *rktio, rktio_fd_t *rfd) return get_numeric_strings(rktio, name, name_len); } -#if 0 - /*========================================================================*/ /* UDP */ /*========================================================================*/ -/* Based on a design and implemenation by Eduardo Cavazos. */ - -#ifdef UDP_IS_SUPPORTED - -typedef struct Scheme_UDP_Evt { - Scheme_Object so; /* scheme_udp_evt_type */ - Scheme_UDP *udp; - short for_read, with_addr; - int offset, len; - char *str; - int dest_addr_count; - char **dest_addrs; - int *dest_addr_lens; -} Scheme_UDP_Evt; - -static int udp_close_it(Scheme_Object *_udp) +rktio_fd_t *rktio_udp_open(rktio_t *rktio, rktio_addrinfo_t *addr) { - Scheme_UDP *udp = (Scheme_UDP *)_udp; - - if (udp->s != INVALID_SOCKET) { - closesocket(udp->s); - (void)scheme_fd_to_semaphore(udp->s, MZFD_REMOVE, 1); - udp->s = INVALID_SOCKET; - - scheme_remove_managed(udp->mref, (Scheme_Object *)udp); - - return 0; - } - - return 1; -} - -#else - -typedef struct Scheme_UDP_Evt { } Scheme_UDP_Evt; -typedef Scheme_Object Scheme_UDP; - -#endif - -static Scheme_Object *make_udp(int argc, Scheme_Object *argv[]) -{ -#ifdef UDP_IS_SUPPORTED - Scheme_UDP *udp; rktio_socket_t s; - char *address = ""; - unsigned short origid, id; + + if (addr) + s = socket(RKTIO_AS_ADDRINFO(addr)->ai_family, + RKTIO_AS_ADDRINFO(addr)->ai_socktype, + RKTIO_AS_ADDRINFO(addr)->ai_protocol); - TCP_INIT("udp-open-socket"); - - if ((argc > 0) && !SCHEME_FALSEP(argv[0]) && !SCHEME_CHAR_STRINGP(argv[0])) - scheme_wrong_contract("udp-open-socket", "(or/c string? #f)", 0, argc, argv); - if ((argc > 1) && !SCHEME_FALSEP(argv[1]) && !CHECK_PORT_ID(argv[1])) - scheme_wrong_contract("udp-open-socket", "(or/c " PORT_ID_TYPE " #f)", 1, argc, argv); - - if ((argc > 0) && SCHEME_TRUEP(argv[0])) { - Scheme_Object *bs; - bs = scheme_char_string_to_byte_string(argv[0]); - address = SCHEME_BYTE_STR_VAL(bs); - } else - address = NULL; - if ((argc > 1) && SCHEME_TRUEP(argv[1])) - origid = (unsigned short)SCHEME_INT_VAL(argv[1]); else - origid = 0; - - scheme_security_check_network("udp-open-socket", address, origid, 0); - scheme_custodian_check_available(NULL, "udp-open-socket", "network"); - - if (address || origid) { - int err; - GC_CAN_IGNORE struct rktio_addrinfo_t *udp_bind_addr = NULL; - if (!origid) - origid = 1025; - id = origid; - udp_bind_addr = scheme_get_host_address(address, id, &err, -1, 1, 0); - if (!udp_bind_addr) { - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "udp-open-socket: can't resolve address\n" - " address: %s\n" - " system error: %N", - address ? address : "", 1, err); - return NULL; - } - s = socket(udp_bind_addr->ai_family, - udp_bind_addr->ai_socktype, - udp_bind_addr->ai_protocol); - mz_freeaddrinfo(udp_bind_addr); - } else { s = socket(RKTIO_PF_INET, SOCK_DGRAM, 0); - } if (s == INVALID_SOCKET) { - int errid; - errid = SOCK_ERRNO(); - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "udp-open-socket: creation failed\n" - " system error: %E", - errid); + get_socket_error(); return NULL; } - udp = MALLOC_ONE_TAGGED(Scheme_UDP); - udp->so.type = scheme_udp_type; - udp->s = s; - udp->bound = 0; - udp->connected = 0; - udp->previous_from_addr = NULL; - #ifdef RKTIO_SYSTEM_WINDOWS { unsigned long ioarg = 1; @@ -1760,71 +1699,7 @@ static Scheme_Object *make_udp(int argc, Scheme_Object *argv[]) # endif #endif - { - Scheme_Custodian_Reference *mref; - mref = scheme_add_managed(NULL, - (Scheme_Object *)udp, - (Scheme_Close_Custodian_Client *)udp_close_it, - NULL, - 1); - udp->mref = mref; - } - - return (Scheme_Object *)udp; -#else - scheme_raise_exn(MZEXN_FAIL_UNSUPPORTED, - "udp-open-socket: " NOT_SUPPORTED_STR); - return NULL; -#endif -} - -static Scheme_Object * -udp_close(int argc, Scheme_Object *argv[]) -{ - if (!SCHEME_UDPP(argv[0])) - scheme_wrong_contract("udp-close", "udp?", 0, argc, argv); - -#ifdef UDP_IS_SUPPORTED - if (udp_close_it(argv[0])) { - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "udp-close: udp socket was already closed"); - return NULL; - } -#endif - - return scheme_void; -} - -static Scheme_Object * -udp_p(int argc, Scheme_Object *argv[]) -{ - return (SCHEME_UDPP(argv[0]) ? scheme_true : scheme_false); -} - -static Scheme_Object * -udp_bound_p(int argc, Scheme_Object *argv[]) -{ - if (!SCHEME_UDPP(argv[0])) - scheme_wrong_contract("udp-bound?", "udp?", 0, argc, argv); - -#ifdef UDP_IS_SUPPORTED - return (((Scheme_UDP *)argv[0])->bound ? scheme_true : scheme_false); -#else - return scheme_void; -#endif -} - -static Scheme_Object * -udp_connected_p(int argc, Scheme_Object *argv[]) -{ - if (!SCHEME_UDPP(argv[0])) - scheme_wrong_contract("udp-connected?", "udp?", 0, argc, argv); - -#ifdef UDP_IS_SUPPORTED - return (((Scheme_UDP *)argv[0])->connected ? scheme_true : scheme_false); -#else - return scheme_void; -#endif + return rktio_system_fd(rktio, s, RKTIO_OPEN_SOCKET | RKTIO_OPEN_UDP); } #ifdef UDP_DISCONNECT_EADRNOTAVAIL_OK @@ -1833,1198 +1708,261 @@ udp_connected_p(int argc, Scheme_Object *argv[]) # define OK_DISCONNECT_ERROR(e) ((e) == RKTIO_AFNOSUPPORT) #endif -static Scheme_Object *udp_bind_or_connect(const char *name, int argc, Scheme_Object *argv[], int do_bind) +int rktio_udp_disconnect(rktio_t *rktio, rktio_fd_t *rfd) { - if (!SCHEME_UDPP(argv[0])) - scheme_wrong_contract(name, "udp?", 0, argc, argv); + rktio_socket_t s = rktio_fd_system_fd(rktio, rfd); + int err; -#ifdef UDP_IS_SUPPORTED - { - Scheme_UDP *udp; - char *address = NULL; - unsigned short port = 0; - GC_CAN_IGNORE struct rktio_addrinfo_t *udp_bind_addr = NULL, *addr; - - udp = (Scheme_UDP *)argv[0]; - - if (!SCHEME_FALSEP(argv[1]) && !SCHEME_CHAR_STRINGP(argv[1])) - scheme_wrong_contract(name, "(or/c string? #f)", 1, argc, argv); - if (do_bind && !CHECK_LISTEN_PORT_ID(argv[2])) - scheme_wrong_contract(name, LISTEN_PORT_ID_TYPE, 2, argc, argv); - if (!do_bind && !SCHEME_FALSEP(argv[2]) && !CHECK_PORT_ID(argv[2])) - scheme_wrong_contract(name, "(or/c " PORT_ID_TYPE " #f)", 2, argc, argv); - - if (SCHEME_TRUEP(argv[1])) { - Scheme_Object *bs; - bs = scheme_char_string_to_byte_string(argv[1]); - address = SCHEME_BYTE_STR_VAL(bs); - } - if (SCHEME_TRUEP(argv[2])) - port = (unsigned short)SCHEME_INT_VAL(argv[2]); - - if (!do_bind && (SCHEME_TRUEP(argv[1]) != SCHEME_TRUEP(argv[2]))) { - scheme_contract_error(name, - "last second and third arguments must be both #f or both non-#f", - "second argument", 1, argv[1], - "third argument", 1, argv[2], - NULL); - } - - scheme_security_check_network(name, address, port, !do_bind); - - if (udp->s == INVALID_SOCKET) { - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "%s: udp socket was already closed\n" - " socket: %V", - name, udp); - return NULL; - } - if (do_bind && udp->bound) { - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "%s: udp socket is already bound\n" - " socket: %V", - name, udp); - return NULL; - } - - if (SCHEME_FALSEP(argv[1]) && SCHEME_FALSEP(argv[2])) { - /* DISCONNECT */ - int errid = 0; - if (udp->connected) { - int ok; #ifdef USE_NULL_TO_DISCONNECT_UDP - ok = !connect(udp->s, NULL, 0); + err = connect(s, NULL, 0); #else - GC_CAN_IGNORE rktio_unspec_address ua; - ua.sin_family = AF_UNSPEC; - ua.sin_port = 0; - memset(&(ua.sin_addr), 0, sizeof(ua.sin_addr)); - memset(&(ua.sin_zero), 0, sizeof(ua.sin_zero)); - ok = !connect(udp->s, (struct sockaddr *)&ua, sizeof(ua)); -#endif - if (!ok) errid = SOCK_ERRNO(); - if (ok || OK_DISCONNECT_ERROR(errid)) { - udp->connected = 0; - return scheme_void; - } - else { - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "%s: can't disconnect\n" - " address: %s\n" - " port number: %d\n" - " system error: %E", - name, - address ? address : "#f", port, - errid); - } - } - return scheme_void; - } - - { - /* RESOLVE ADDRESS */ - int err; - udp_bind_addr = scheme_get_host_address(address, port, &err, -1, do_bind, 0); - if (!udp_bind_addr) { - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "%s: can't resolve address\n" - " address: %s\n" - " system error: %N", - name, - address, - 1, err); - return NULL; - } - } - - if (!do_bind) { - /* CONNECT CASE */ - int ok, errid = -1; - - /* connect using first address that works: */ - for (addr = udp_bind_addr; addr; addr = addr->ai_next) { - ok = !connect(udp->s, addr->ai_addr, addr->ai_addrlen); - if (ok) { - udp->connected = 1; - mz_freeaddrinfo(udp_bind_addr); - return scheme_void; - } else - errid = SOCK_ERRNO(); - } - - mz_freeaddrinfo(udp_bind_addr); - - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "%s: can't connect\n" - " address: %s\n" - " port number: %d\n" - " system error: %E", - name, - address ? address : "#f", - port, - errid); - - return NULL; - } else { - /* BIND CASE */ - int ok, errid = -1; - - if ((argc > 3) && SCHEME_TRUEP(argv[3])) { - int one = 1; - if (setsockopt(udp->s, SOL_SOCKET, SO_REUSEADDR, (void *) &one, sizeof(one))) { - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "%s: can't set SO_REUSEADDR\n" - " system error: %E", - name, - SOCK_ERRNO()); - return NULL; - } - } - - /* bind using first address that works: */ - for (addr = udp_bind_addr; addr; addr = addr->ai_next) { - ok = !bind(udp->s, addr->ai_addr, addr->ai_addrlen); - if (ok) { - udp->bound = 1; - mz_freeaddrinfo(udp_bind_addr); - return scheme_void; - } else - errid = SOCK_ERRNO(); - } - - mz_freeaddrinfo(udp_bind_addr); - - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "%s: can't bind\n" - " address: %s\n" - " port number: %d\n" - " system error: %E", - name, - address ? address : "#f", - port, - errid); - return NULL; - } - } -#else - return scheme_void; -#endif -} - -static Scheme_Object *udp_bind(int argc, Scheme_Object *argv[]) -{ - return udp_bind_or_connect("udp-bind!", argc, argv, 1); -} - -static Scheme_Object *udp_connect(int argc, Scheme_Object *argv[]) -{ - return udp_bind_or_connect("udp-connect!", argc, argv, 0); -} - -#ifdef UDP_IS_SUPPORTED - -static int udp_check_send(Scheme_Object *_udp, Scheme_Schedule_Info *sinfo) -{ - Scheme_UDP *udp = (Scheme_UDP *)_udp; - - if (udp->s == INVALID_SOCKET) - return 1; - - if (!sinfo || !sinfo->is_poll) { - if (!check_fd_sema(udp->s, MZFD_CHECK_WRITE, sinfo, NULL)) - return 0; - } - -# ifdef HAVE_POLL_SYSCALL - { - GC_CAN_IGNORE struct pollfd pfd[1]; - int sr; - - pfd[0].fd = udp->s; - pfd[0].events = POLLOUT; - do { - sr = poll(pfd, 1, 0); - } while ((sr == -1) && (errno == EINTR)); - - if (sr) - return sr; - } -# else - { - DECL_SOCK_FDSET(writefds); - DECL_SOCK_FDSET(exnfds); - struct timeval time = {0, 0}; - int sr; - - INIT_DECL_SOCK_WR_FDSET(writefds); - INIT_DECL_SOCK_ER_FDSET(exnfds); - - RKTIO_SOCK_FD_ZERO(writefds); - RKTIO_SOCK_FD_SET(udp->s, writefds); - RKTIO_SOCK_FD_ZERO(exnfds); - RKTIO_SOCK_FD_SET(udp->s, exnfds); - - do { - sr = select(udp->s + 1, NULL, writefds, exnfds, &time); - } while ((sr == -1) && (NOT_WINSOCK(errno) == EINTR)); - - if (sr) - return sr; - } + rktio_unspec_address ua; + ua.sin_family = AF_UNSPEC; + ua.sin_port = 0; + memset(&(ua.sin_addr), 0, sizeof(ua.sin_addr)); + memset(&(ua.sin_zero), 0, sizeof(ua.sin_zero)); + err = connect(s, (struct sockaddr *)&ua, sizeof(ua)); #endif - check_fd_sema(udp->s, MZFD_CREATE_WRITE, sinfo, NULL); - + if (err) { + err = SOCK_ERRNO(); + if (OK_DISCONNECT_ERROR(err)) + err = 0; + } + + if (err) { + get_socket_error(); + return 0; + } + + return 1; +} + +int rktio_udp_bind(rktio_t *rktio, rktio_fd_t *rfd, rktio_addrinfo_t *addr) +{ + rktio_socket_t s = rktio_fd_system_fd(rktio, rfd); + int err; + + /* bind using first address that works: */ + for (; addr; addr = (rktio_addrinfo_t *)RKTIO_AS_ADDRINFO(addr)->ai_next) { + err = bind(s, RKTIO_AS_ADDRINFO(addr)->ai_addr, RKTIO_AS_ADDRINFO(addr)->ai_addrlen); + if (!err) { + return 1; + } + } + + get_socket_error(); return 0; } -static void udp_send_needs_wakeup(Scheme_Object *_udp, void *fds) +int rktio_udp_connect(rktio_t *rktio, rktio_fd_t *rfd, rktio_addrinfo_t *addr) { - Scheme_UDP *udp = (Scheme_UDP *)_udp; - void *fds1, *fds2; - rktio_socket_t s = udp->s; - - fds1 = RKTIO_GET_FDSET(fds, 1); - fds2 = RKTIO_GET_FDSET(fds, 2); - - RKTIO_FD_SET(s, (fd_set *)fds1); - RKTIO_FD_SET(s, (fd_set *)fds2); -} + rktio_socket_t s = rktio_fd_system_fd(rktio, rfd); + int err; -#endif - -#ifdef UDP_IS_SUPPORTED -static Scheme_Object *do_udp_send_it(const char *name, Scheme_UDP *udp, - char *bstr, intptr_t start, intptr_t end, - char *dest_addr, int dest_addr_len, int can_block, - int ignore_addr_failure) -{ - intptr_t x; - int errid = 0; - - while (1) { - if (udp->s == INVALID_SOCKET) { - /* socket was closed, maybe while we slept */ - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "%s: udp socket is closed\n" - " socket: %V", - name, udp); - return NULL; + /* connect using first address that works: */ + for (; addr; addr = (rktio_addrinfo_t *)RKTIO_AS_ADDRINFO(addr)->ai_next) { + err = connect(s, RKTIO_AS_ADDRINFO(addr)->ai_addr, RKTIO_AS_ADDRINFO(addr)->ai_addrlen); + if (!err) { + return 1; } - if ((!dest_addr && !udp->connected) - || (dest_addr && udp->connected)) { - /* socket is unconnected, maybe disconnected while we slept */ - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "%s: udp socket is%s connected\n" - " socket: %V", - name, - dest_addr ? "" : " not", - udp); - return NULL; - } - - udp->bound = 1; /* in case it's not bound already, send[to] binds it */ - - if (dest_addr) - x = sendto(udp->s, bstr XFORM_OK_PLUS start, end - start, - 0, (struct sockaddr *)dest_addr, dest_addr_len); - else - x = send(udp->s, bstr XFORM_OK_PLUS start, end - start, 0); - - if (x == -1) { - errid = SOCK_ERRNO(); - if (ignore_addr_failure && WAS_EBADADDRESS(errid)) { - return NULL; - } else if (WAS_EAGAIN(errid)) { - if (can_block) { - /* Block and eventually try again. */ - Scheme_Object *sema; - sema = scheme_fd_to_semaphore(udp->s, MZFD_CREATE_WRITE, 1); - if (sema) - scheme_wait_sema(sema, 0); - else - scheme_block_until((Scheme_Ready_Fun)udp_check_send, - udp_send_needs_wakeup, - (Scheme_Object *)udp, - 0); - } else - return scheme_false; - } else if (NOT_WINSOCK(errid) != EINTR) - break; - } else if (x != (end - start)) { - /* this isn't supposed to happen: */ - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "%s: didn't send enough (%d != %d)", - name, - x, end - start); - return NULL; - } else - break; - } - - if (x > -1) { - return (can_block ? scheme_void : scheme_true); - } else { - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "%s: send failed\n" - " system error: %E", - name, - errid); - return NULL; - } -} -#endif - -static Scheme_Object *udp_send_it(const char *name, int argc, Scheme_Object *argv[], - int with_addr, int can_block, Scheme_UDP_Evt *fill_evt) -{ -#ifdef UDP_IS_SUPPORTED - Scheme_UDP *udp; - char *address = ""; - intptr_t start, end; - int delta, err; - unsigned short origid, id; - GC_CAN_IGNORE struct rktio_addrinfo_t *udp_dest_addr; - - udp = (Scheme_UDP *)argv[0]; -#endif - - if (!SCHEME_UDPP(argv[0])) - scheme_wrong_contract(name, "udp?", 0, argc, argv); - -#ifdef UDP_IS_SUPPORTED - if (with_addr) { - if (!SCHEME_CHAR_STRINGP(argv[1])) - scheme_wrong_contract(name, "string?", 1, argc, argv); - if (!CHECK_PORT_ID(argv[2])) - scheme_wrong_contract(name, PORT_ID_TYPE, 2, argc, argv); - delta = 0; - } else - delta = -2; - - if (!SCHEME_BYTE_STRINGP(argv[3 + delta])) - scheme_wrong_contract(name, "bytes?", 3 + delta, argc, argv); - - scheme_get_substring_indices(name, argv[3 + delta], - argc, argv, - 4 + delta, 5 + delta, &start, &end); - - if (with_addr) { - Scheme_Object *bs; - bs = scheme_char_string_to_byte_string(argv[1]); - address = SCHEME_BYTE_STR_VAL(bs); - origid = (unsigned short)SCHEME_INT_VAL(argv[2]); - - scheme_security_check_network(name, address, origid, 1); - - id = origid; - } else { - address = NULL; - id = origid = 0; } - if (with_addr) - udp_dest_addr = scheme_get_host_address(address, id, &err, -1, 0, 0); - else - udp_dest_addr = NULL; - - if (!with_addr || udp_dest_addr) { - if (fill_evt) { - char *s; - fill_evt->str = SCHEME_BYTE_STR_VAL(argv[3+delta]); - fill_evt->offset = start; - fill_evt->len = end - start; - if (udp_dest_addr) { - GC_CAN_IGNORE struct rktio_addrinfo_t *addr; - int j, *lens; - char **addrs; - for (j = 0, addr = udp_dest_addr; addr; addr = addr->ai_next) { - j++; - } - fill_evt->dest_addr_count = j; - addrs = MALLOC_N(char*, j); - fill_evt->dest_addrs = addrs; - lens = MALLOC_N_ATOMIC(int, j); - fill_evt->dest_addr_lens = lens; - for (j = 0, addr = udp_dest_addr; addr; addr = addr->ai_next, j++) { - s = (char *)scheme_malloc_atomic(addr->ai_addrlen); - memcpy(s, addr->ai_addr, addr->ai_addrlen); - fill_evt->dest_addrs[j] = s; - fill_evt->dest_addr_lens[j] = addr->ai_addrlen; - } - mz_freeaddrinfo(udp_dest_addr); - } - return scheme_void; - } else { - Scheme_Object *r; - if (udp_dest_addr) { - GC_CAN_IGNORE struct rktio_addrinfo_t *addr; - r = NULL; - for (addr = udp_dest_addr; !r && addr; addr = addr->ai_next) { - r = do_udp_send_it(name, udp, - SCHEME_BYTE_STR_VAL(argv[3+delta]), start, end, - (char *)addr->ai_addr, - addr->ai_addrlen, - can_block, - !!addr->ai_next); - } - mz_freeaddrinfo(udp_dest_addr); - } else { - r = do_udp_send_it(name, udp, - SCHEME_BYTE_STR_VAL(argv[3+delta]), start, end, - NULL, 0, can_block, 1); - } - return r; - } - } else { - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "%s: can't resolve address\n" - " address: %s\n" - " system error: %N", - name, - address, 1, err); - return NULL; - } -#else - return scheme_void; -#endif -} - -static Scheme_Object *udp_send_to(int argc, Scheme_Object *argv[]) -{ - return udp_send_it("udp-send-to", argc, argv, 1, 1, NULL); -} - -static Scheme_Object *udp_send(int argc, Scheme_Object *argv[]) -{ - return udp_send_it("udp-send", argc, argv, 0, 1, NULL); -} - -static Scheme_Object *udp_send_to_star(int argc, Scheme_Object *argv[]) -{ - return udp_send_it("udp-send-to*", argc, argv, 1, 0, NULL); -} - -static Scheme_Object *udp_send_star(int argc, Scheme_Object *argv[]) -{ - return udp_send_it("udp-send*", argc, argv, 0, 0, NULL); -} - -static Scheme_Object *udp_send_to_enable_break(int argc, Scheme_Object *argv[]) -{ - return scheme_call_enable_break(udp_send_to, argc, argv); -} - -static Scheme_Object *udp_send_enable_break(int argc, Scheme_Object *argv[]) -{ - return scheme_call_enable_break(udp_send, argc, argv); -} - -#ifdef UDP_IS_SUPPORTED - -static int udp_check_recv(Scheme_Object *_udp, Scheme_Schedule_Info *sinfo) -{ - Scheme_UDP *udp = (Scheme_UDP *)_udp; - - if (udp->s == INVALID_SOCKET) - return 1; - - if (!sinfo || !sinfo->is_poll) { - if (!check_fd_sema(udp->s, MZFD_CHECK_READ, sinfo, NULL)) - return 0; - } - -# ifdef HAVE_POLL_SYSCALL - { - GC_CAN_IGNORE struct pollfd pfd[1]; - int sr; - - pfd[0].fd = udp->s; - pfd[0].events = POLLIN; - do { - sr = poll(pfd, 1, 0); - } while ((sr == -1) && (errno == EINTR)); - - if (sr) - return sr; - } -# else - { - DECL_SOCK_FDSET(readfds); - DECL_SOCK_FDSET(exnfds); - struct timeval time = {0, 0}; - int sr; - - INIT_DECL_SOCK_RD_FDSET(readfds); - INIT_DECL_SOCK_ER_FDSET(exnfds); - - RKTIO_SOCK_FD_ZERO(readfds); - RKTIO_SOCK_FD_SET(udp->s, readfds); - RKTIO_SOCK_FD_ZERO(exnfds); - RKTIO_SOCK_FD_SET(udp->s, exnfds); - - do { - sr = select(udp->s + 1, readfds, NULL, exnfds, &time); - } while ((sr == -1) && (NOT_WINSOCK(errno) == EINTR)); - - if (sr) - return sr; - } -# endif - - check_fd_sema(udp->s, MZFD_CREATE_READ, sinfo, NULL); - + get_socket_error(); return 0; } -static void udp_recv_needs_wakeup(Scheme_Object *_udp, void *fds) +intptr_t rktio_udp_sendto(rktio_t *rktio, rktio_fd_t *rfd, rktio_addrinfo_t *addr, char *buffer, intptr_t len) { - Scheme_UDP *udp = (Scheme_UDP *)_udp; - void *fds1, *fds2; - - rktio_socket_t s = udp->s; - - fds1 = RKTIO_GET_FDSET(fds, 0); - fds2 = RKTIO_GET_FDSET(fds, 2); - - RKTIO_FD_SET(s, (fd_set *)fds1); - RKTIO_FD_SET(s, (fd_set *)fds2); + return do_socket_write(rktio, rfd, buffer, len, addr); } -#endif - -static int do_udp_recv(const char *name, Scheme_UDP *udp, char *bstr, intptr_t start, intptr_t end, - int can_block, Scheme_Object **v) +rktio_length_and_addrinfo_t *rktio_udp_recvfrom(rktio_t *rktio, rktio_fd_t *rfd, char *buffer, intptr_t len) { -#ifdef UDP_IS_SUPPORTED - intptr_t x; - int errid = 0; + rktio_socket_t s = rktio_fd_system_fd(rktio, rfd); + rktio_length_and_addrinfo_t *r; + int rn, errid; char src_addr[RKTIO_SOCK_NAME_MAX_LEN]; unsigned int asize = sizeof(src_addr); - - if (!udp->bound) { - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "%s: udp socket is not bound\n" - " socket: %V", - name, - udp); - return 0; - } - + while (1) { - if (udp->s == INVALID_SOCKET) { - /* socket was closed, maybe while we slept */ - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "%s: udp socket is closed\n" - " socket: %V", - name, udp); - return 0; - } + if (!len) { + /* recvfrom() doesn't necessarily wait if you pass a buffer size of 0; + to be consistent with accepting a message but discarding bytes that + don't fit, accept at least one byte and turn a `1` result into `0` */ + char buf[1]; + rn = recvfrom(s, buf, 1, 0, (struct sockaddr *)src_addr, &asize); + if (rn == 1) + rn = 0; + } else + rn = recvfrom(s, buffer, len, 0, (struct sockaddr *)src_addr, &asize); - { - if (end == start) { - /* recvfrom() doesn't necessarily wait if you pass a buffer size of 0; - to be consistent with accepting a message but discarding bytes that - don't fit, accept at least one byte and turn a `1` result into `0` */ - char buf[1]; - x = recvfrom(udp->s, buf, 1, 0, (struct sockaddr *)src_addr, &asize); - if (x == 1) - x = 0; - } else { - x = recvfrom(udp->s, bstr XFORM_OK_PLUS start, end - start, 0, - (struct sockaddr *)src_addr, &asize); - } - } - - if (x == -1) { + if (rn < 0) { errid = SOCK_ERRNO(); if (WAS_ECONNREFUSED(errid)) { - /* Delayed ICMP error. Ignore it and try again. */ - errid = 0; + /* Delayed ICMP error. Client should ignore it and try again. */ + set_racket_error(RKTIO_ERROR_INFO_TRY_AGAIN); + return NULL; } else if (WAS_WSAEMSGSIZE(errid)) { - x = end - start; - errid = 0; - break; - } else if (WAS_EAGAIN(errid)) { - if (can_block) { - /* Block and eventually try again. */ - Scheme_Object *sema; - sema = scheme_fd_to_semaphore(udp->s, MZFD_CREATE_READ, 1); - if (sema) - scheme_wait_sema(sema, 0); - else - scheme_block_until((Scheme_Ready_Fun)udp_check_recv, - udp_recv_needs_wakeup, - (Scheme_Object *)udp, - 0); - } else { - v[0] = scheme_false; - v[1] = scheme_false; - v[2] = scheme_false; - return 0; - } - } else if (NOT_WINSOCK(errid) != EINTR) - break; + /* => data truncated on Windows, which counts as success on Unix */ + rn = len; + break; + } else if (NOT_WINSOCK(errno) == EINTR) { + /* try again */ + } else if (WAS_EAGAIN(errno)) { + /* no data available */ + set_racket_error(RKTIO_ERROR_TRY_AGAIN); + return NULL; + } else { + get_socket_error(); + return NULL; + } } else break; } + + r = malloc(sizeof(rktio_length_and_addrinfo_t)); + r->len = rn; + + r->addr = malloc(asize); + memcpy(r->addr, src_addr, asize); + + return r; +} + +int rktio_udp_get_multicast_loopback(rktio_t *rktio, rktio_fd_t *rfd) +{ + rktio_socket_t s = rktio_fd_system_fd(rktio, rfd); + u_char loop; + unsigned int loop_len = sizeof(loop); + int status; - if (x > -1) { - char host_buf[RKTIO_SOCK_HOST_NAME_MAX_LEN]; - char prev_buf[RKTIO_SOCK_HOST_NAME_MAX_LEN]; - char svc_buf[RKTIO_SOCK_SVC_NAME_MAX_LEN]; - int j, id; + status = getsockopt(s, IPPROTO_IP, IP_MULTICAST_LOOP, (void *)&loop, &loop_len); + + if (status) { + get_socket_error(); + return RKTIO_PROP_ERROR; + } else + return (loop ? 1 : 0); +} - v[0] = scheme_make_integer(x); - - scheme_getnameinfo((struct sockaddr *)src_addr, asize, - host_buf, sizeof(host_buf), - svc_buf, sizeof(svc_buf)); - - if (udp->previous_from_addr) { - mzchar *s; - s = SCHEME_CHAR_STR_VAL(udp->previous_from_addr); - for (j = 0; s[j]; j++) { - prev_buf[j] = (char)s[j]; - } - prev_buf[j] = 0; - } - - if (udp->previous_from_addr && !strcmp(prev_buf, host_buf)) { - v[1] = udp->previous_from_addr; - } else { - Scheme_Object *vv; - vv = scheme_make_immutable_sized_utf8_string(host_buf, -1); - v[1] = vv; - udp->previous_from_addr = v[1]; - } - - id = extract_svc_value(svc_buf); - - v[2] = scheme_make_integer(id); - - return 1; - } else { - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "%s: receive failed\n" - " system error: %E", - name, - errid); +int rktio_udp_set_multicast_loopback(rktio_t *rktio, rktio_fd_t *rfd, int on) +{ + rktio_socket_t s = rktio_fd_system_fd(rktio, rfd); + u_char loop = (on ? 1 : 0); + unsigned int loop_len = sizeof(loop); + int status; + + status = setsockopt(s, IPPROTO_IP, IP_MULTICAST_LOOP, (void *)&loop, loop_len); + + if (status) { + get_socket_error(); return 0; - } -#else - return 0; -#endif -} - -static Scheme_Object *udp_recv(const char *name, int argc, Scheme_Object *argv[], - int can_block, Scheme_UDP_Evt *fill_evt) -{ - Scheme_UDP *udp; - intptr_t start, end; - Scheme_Object *v[3]; - - udp = (Scheme_UDP *)argv[0]; - - if (!SCHEME_UDPP(argv[0])) - scheme_wrong_contract(name, "udp?", 0, argc, argv); - if (!SCHEME_BYTE_STRINGP(argv[1]) || !SCHEME_MUTABLEP(argv[1])) - scheme_wrong_contract(name, "(or/c bytes? (not/c immutable?))", 1, argc, argv); - -#ifdef UDP_IS_SUPPORTED - scheme_get_substring_indices(name, argv[1], - argc, argv, - 2, 3, &start, &end); - if (fill_evt) { - fill_evt->str = SCHEME_BYTE_STR_VAL(argv[1]); - fill_evt->offset = start; - fill_evt->len = end - start; - return scheme_void; - } else { - do_udp_recv(name, udp, SCHEME_BYTE_STR_VAL(argv[1]), start, end, can_block, v); - - return scheme_values(3,v); - } -#else - return NULL; -#endif -} - -static Scheme_Object *udp_receive(int argc, Scheme_Object *argv[]) -{ - return udp_recv("udp-receive!", argc, argv, 1, NULL); -} - -static Scheme_Object *udp_receive_star(int argc, Scheme_Object *argv[]) -{ - return udp_recv("udp-receive!*", argc, argv, 0, NULL); -} - -static Scheme_Object *udp_receive_enable_break(int argc, Scheme_Object *argv[]) -{ - return scheme_call_enable_break(udp_receive, argc, argv); -} - -static Scheme_Object *make_udp_evt(const char *name, int argc, Scheme_Object **argv, int for_read) -{ -#ifdef UDP_IS_SUPPORTED - Scheme_UDP_Evt *uw; -#endif - - if (!SCHEME_UDPP(argv[0])) - scheme_wrong_contract(name, "udp?", 0, argc, argv); - -#ifdef UDP_IS_SUPPORTED - uw = MALLOC_ONE_TAGGED(Scheme_UDP_Evt); - uw->so.type = scheme_udp_evt_type; - uw->udp = (Scheme_UDP *)argv[0]; - uw->for_read = for_read; - - return (Scheme_Object *)uw; -#else - return scheme_void; -#endif -} - -static Scheme_Object *udp_read_ready_evt(int argc, Scheme_Object *argv[]) -{ - return make_udp_evt("udp-receive-ready-evt", argc, argv, 1); -} - -static Scheme_Object *udp_write_ready_evt(int argc, Scheme_Object *argv[]) -{ - return make_udp_evt("udp-send-ready-evt", argc, argv, 0); -} - -static Scheme_Object *udp_read_evt(int argc, Scheme_Object *argv[]) -{ - Scheme_Object *evt; - evt = make_udp_evt("udp-receive!-evt", argc, argv, 1); - udp_recv("udp-receive!-evt", argc, argv, 0, (Scheme_UDP_Evt *)evt); - return evt; -} - -static Scheme_Object *udp_write_evt(int argc, Scheme_Object *argv[]) -{ - Scheme_Object *evt; - evt = make_udp_evt("udp-send-evt", argc, argv, 0); - udp_send_it("udp-send-evt", argc, argv, 0, 0, (Scheme_UDP_Evt *)evt); - return evt; -} - -static Scheme_Object *udp_write_to_evt(int argc, Scheme_Object *argv[]) -{ - Scheme_Object *evt; - evt = make_udp_evt("udp-send-to-evt", argc, argv, 0); - udp_send_it("udp-send-to-evt", argc, argv, 1, 0, (Scheme_UDP_Evt *)evt); -#ifdef UDP_IS_SUPPORTED - ((Scheme_UDP_Evt *)evt)->with_addr = 1; -#endif - return evt; -} - -#ifdef UDP_IS_SUPPORTED -static int udp_evt_check_ready(Scheme_Object *_uw, Scheme_Schedule_Info *sinfo) -{ - Scheme_UDP_Evt *uw = (Scheme_UDP_Evt *)_uw; - - if (uw->for_read) { - if (uw->str) { - Scheme_Object *v[3]; - if (do_udp_recv("udp-receive!-evt", uw->udp, - uw->str, uw->offset, uw->offset + uw->len, - 0, v)) { - scheme_set_sync_target(sinfo, scheme_build_list(3, v), NULL, NULL, 0, 0, NULL); - return 1; - } else - return 0; - } else { - return udp_check_recv((Scheme_Object *)uw->udp, NULL); - } - } else { - if (uw->str) { - Scheme_Object *r = NULL; - int j; - for (j = 0; !r && (j < (uw->dest_addrs ? uw->dest_addr_count : 1)); j++) { - r = do_udp_send_it("udp-send-evt", uw->udp, - uw->str, uw->offset, uw->offset + uw->len, - uw->dest_addrs ? uw->dest_addrs[j] : NULL, - uw->dest_addrs ? uw->dest_addr_lens[j] : 0, - 0, - j+1 < uw->dest_addr_count); - } - if (SCHEME_TRUEP(r)) { - scheme_set_sync_target(sinfo, scheme_void, NULL, NULL, 0, 0, NULL); - return 1; - } else - return 0; - } else - return udp_check_send((Scheme_Object *)uw->udp, NULL); - } -} - -static void udp_evt_needs_wakeup(Scheme_Object *_uw, void *fds) -{ - Scheme_UDP_Evt *uw = (Scheme_UDP_Evt *)_uw; - - if (uw->for_read) - udp_recv_needs_wakeup((Scheme_Object *)uw->udp, fds); - else - udp_send_needs_wakeup((Scheme_Object *)uw->udp, fds); -} -#endif - -static int udp_check_open(char const *name, int argc, Scheme_Object *argv[]) { - if (!SCHEME_UDPP(argv[0])) { - scheme_wrong_contract(name, "udp?", 0, argc, argv); - return 0; /* Why does no-one else expect control back after scheme_wrong_contract? */ - /* Or, conversely, why does everyone expect control back after scheme_raise_exn? */ - } - -#ifdef UDP_IS_SUPPORTED - { - Scheme_UDP *udp = (Scheme_UDP *) argv[0]; - - if (udp->s == INVALID_SOCKET) { - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "%s: udp socket was already closed\n" - " socket: %V", - name, udp); - return 0; - } - + } else return 1; - } -#else - return 0; -#endif } -static Scheme_Object * -udp_multicast_loopback_p(int argc, Scheme_Object *argv[]) +int rktio_udp_get_multicast_ttl(rktio_t *rktio, rktio_fd_t *rfd) { - if (!udp_check_open("udp-multicast-loopback?", argc, argv)) - return NULL; - -#ifdef UDP_IS_SUPPORTED - { - Scheme_UDP *udp = (Scheme_UDP *) argv[0]; - u_char loop; - unsigned int loop_len = sizeof(loop); - int status; - status = getsockopt(udp->s, IPPROTO_IP, IP_MULTICAST_LOOP, (void *) &loop, &loop_len); - if (status) - status = SOCK_ERRNO(); - if (status) { - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "udp-multicast-loopback?: getsockopt failed\n" - " system error: %N", - 0, status); - return NULL; - } else { - return (loop ? scheme_true : scheme_false); - } - } -#else - return scheme_void; -#endif + rktio_socket_t s = rktio_fd_system_fd(rktio, rfd); + u_char ttl; + unsigned int ttl_len = sizeof(ttl); + int status; + + status = getsockopt(s, IPPROTO_IP, IP_MULTICAST_TTL, (void *)&ttl, &ttl_len); + + if (status) { + get_socket_error(); + return RKTIO_PROP_ERROR; + } else + return ttl; } -static Scheme_Object * -udp_multicast_set_loopback(int argc, Scheme_Object *argv[]) +int rktio_udp_set_multicast_ttl(rktio_t *rktio, rktio_fd_t *rfd, int ttl_val) { - if (!udp_check_open("udp-multicast-set-loopback!", argc, argv)) - return NULL; - -#ifdef UDP_IS_SUPPORTED - { - Scheme_UDP *udp = (Scheme_UDP *) argv[0]; - u_char loop = SCHEME_TRUEP(argv[1]) ? 1 : 0; - unsigned int loop_len = sizeof(loop); - int status; - status = setsockopt(udp->s, IPPROTO_IP, IP_MULTICAST_LOOP, (void *) &loop, loop_len); - if (status) - status = SOCK_ERRNO(); - if (status) { - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "udp-multicast-set-loopback!: setsockopt failed\n" - " system error: %N", - 0, status); - return NULL; - } else { - return scheme_void; - } - } -#else - return scheme_void; -#endif + rktio_socket_t s = rktio_fd_system_fd(rktio, rfd); + u_char ttl = ttl_val; + unsigned int ttl_len = sizeof(ttl); + int status; + + status = setsockopt(s, IPPROTO_IP, IP_MULTICAST_TTL, (void *)&ttl, ttl_len); + + if (status) { + get_socket_error(); + return 0; + } else + return 1; } -static Scheme_Object * -udp_multicast_ttl(int argc, Scheme_Object *argv[]) +char *rktio_udp_multicast_interface(rktio_t *rktio, rktio_fd_t *rfd) { - if (!udp_check_open("udp-multicast-ttl", argc, argv)) + rktio_socket_t s = rktio_fd_system_fd(rktio, rfd); + struct in_addr intf; + unsigned int intf_len = sizeof(intf); + int status; + + status = getsockopt(s, IPPROTO_IP, IP_MULTICAST_IF, (void *)&intf, &intf_len); + if (status) { + get_socket_error(); return NULL; - -#ifdef UDP_IS_SUPPORTED - { - Scheme_UDP *udp = (Scheme_UDP *) argv[0]; - u_char ttl; - unsigned int ttl_len = sizeof(ttl); - int status; - status = getsockopt(udp->s, IPPROTO_IP, IP_MULTICAST_TTL, (void *) &ttl, &ttl_len); - if (status) - status = SOCK_ERRNO(); - if (status) { - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "udp-multicast-ttl: getsockopt failed\n" - " system error: %N", - 0, status); - return NULL; - } else { - return scheme_make_integer(ttl); - } + } else { + char host_buf[RKTIO_SOCK_HOST_NAME_MAX_LEN]; + unsigned char *b = (unsigned char *) &intf; /* yes, this is in network order */ + sprintf(host_buf, "%d.%d.%d.%d", b[0], b[1], b[2], b[3]); + return strdup(host_buf); } -#else - return scheme_void; -#endif } -static Scheme_Object * -udp_multicast_set_ttl(int argc, Scheme_Object *argv[]) +int rktio_udp_set_multicast_interface(rktio_t *rktio, rktio_fd_t *rfd, rktio_addrinfo_t *intf_addr) { - if (!udp_check_open("udp-multicast-set-ttl!", argc, argv)) - return NULL; - if (!SCHEME_INTP(argv[1]) || (SCHEME_INT_VAL(argv[1]) < 0) || (SCHEME_INT_VAL(argv[1]) >= 256)) { - scheme_wrong_contract("udp-multicast-set-ttl!", "byte?", 1, argc, argv); - return NULL; - } - -#ifdef UDP_IS_SUPPORTED - { - Scheme_UDP *udp = (Scheme_UDP *) argv[0]; - u_char ttl = (u_char) SCHEME_INT_VAL(argv[1]); - unsigned int ttl_len = sizeof(ttl); - int status; - status = setsockopt(udp->s, IPPROTO_IP, IP_MULTICAST_TTL, (void *) &ttl, ttl_len); - if (status) - status = SOCK_ERRNO(); - if (status) { - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "udp-multicast-set-ttl!: setsockopt failed\n" - " system error: %N", - 0, status); - return NULL; - } else { - return scheme_void; - } - } -#else - return scheme_void; -#endif -} - -static Scheme_Object * -udp_multicast_interface(int argc, Scheme_Object *argv[]) -{ - if (!udp_check_open("udp-multicast-interface", argc, argv)) - return NULL; - -#ifdef UDP_IS_SUPPORTED - { - Scheme_UDP *udp = (Scheme_UDP *) argv[0]; - GC_CAN_IGNORE struct in_addr intf; - unsigned int intf_len = sizeof(intf); - int status; - status = getsockopt(udp->s, IPPROTO_IP, IP_MULTICAST_IF, (void *) &intf, &intf_len); - if (status) - status = SOCK_ERRNO(); - if (status) { - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "udp-multicast-interface: getsockopt failed\n" - " system error: %N", - 0, status); - return NULL; - } else { - char host_buf[RKTIO_SOCK_HOST_NAME_MAX_LEN]; - unsigned char *b = (unsigned char *) &intf; /* yes, this is in network order */ - sprintf(host_buf, "%d.%d.%d.%d", b[0], b[1], b[2], b[3]); - return scheme_make_utf8_string(host_buf); - } - } -#else - return scheme_void; -#endif -} - -static Scheme_Object * -udp_multicast_set_interface(int argc, Scheme_Object *argv[]) -{ - if (!udp_check_open("udp-multicast-set-interface!", argc, argv)) - return NULL; - if (!SCHEME_CHAR_STRINGP(argv[1]) && !SCHEME_FALSEP(argv[1])) { - scheme_wrong_contract("udp-multicast-set-interface!", "(or/c string? #f)", 1, argc, argv); - return NULL; - } - -#ifdef UDP_IS_SUPPORTED - { - Scheme_UDP *udp = (Scheme_UDP *) argv[0]; - GC_CAN_IGNORE struct in_addr intf; - unsigned int intf_len = sizeof(intf); - int status; - - if (SCHEME_FALSEP(argv[1])) { - intf.s_addr = INADDR_ANY; - } else { - Scheme_Object *bs; - char *address = ""; - GC_CAN_IGNORE struct rktio_addrinfo_t *if_addr = NULL; - int err; - bs = scheme_char_string_to_byte_string(argv[1]); - address = SCHEME_BYTE_STR_VAL(bs); - if_addr = scheme_get_host_address(address, -1, &err, RKTIO_PF_INET, 0, 0); - if (!if_addr) { - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "udp-multicast-set-interface!: can't resolve interface address\n" - " address: %s\n" - " system error: %N", - address ? address : "", 1, err); - return NULL; - } - intf = ((struct sockaddr_in *)if_addr->ai_addr)->sin_addr; - mz_freeaddrinfo(if_addr); - } - - status = setsockopt(udp->s, IPPROTO_IP, IP_MULTICAST_IF, (void *) &intf, intf_len); - if (status) - status = SOCK_ERRNO(); - if (status) { - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "udp-multicast-set-interface!: setsockopt failed\n" - " system error: %N", - 0, status); - return NULL; - } else { - return scheme_void; - } - } -#else - return scheme_void; -#endif -} - -#ifdef UDP_IS_SUPPORTED - -static Scheme_Object * -do_udp_multicast_join_or_leave_group(char const *name, int optname, Scheme_UDP *udp, Scheme_Object *multiaddrname, Scheme_Object *ifaddrname) -{ - GC_CAN_IGNORE struct ip_mreq mreq; - unsigned int mreq_len = sizeof(mreq); + rktio_socket_t s = rktio_fd_system_fd(rktio, rfd); + struct in_addr intf; + unsigned int intf_len = sizeof(intf); int status; - if (SCHEME_FALSEP(ifaddrname)) { + if (!intf_addr) { + intf.s_addr = INADDR_ANY; + } else { + intf = ((struct sockaddr_in *)RKTIO_AS_ADDRINFO(intf_addr)->ai_addr)->sin_addr; + } + + status = setsockopt(s, IPPROTO_IP, IP_MULTICAST_IF, (void *)&intf, intf_len); + if (status) { + get_socket_error(); + return 0; + } else + return 1; +} + +int rktio_udp_change_multicast_group(rktio_t *rktio, rktio_fd_t *rfd, + rktio_addrinfo_t *group_addr, + rktio_addrinfo_t *intf_addr, + int action) +{ + rktio_socket_t s = rktio_fd_system_fd(rktio, rfd); + struct ip_mreq mreq; + unsigned int mreq_len = sizeof(mreq); + int status; + int optname; + + if (!intf_addr) { mreq.imr_interface.s_addr = INADDR_ANY; } else { - Scheme_Object *bs; - char *address = ""; - GC_CAN_IGNORE struct rktio_addrinfo_t *if_addr = NULL; - int err; - bs = scheme_char_string_to_byte_string(ifaddrname); - address = SCHEME_BYTE_STR_VAL(bs); - if_addr = scheme_get_host_address(address, -1, &err, RKTIO_PF_INET, 0, 0); - if (!if_addr) { - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "%s: can't resolve interface address\n" - " address: %s\n" - " system error: %N", - name, address ? address : "", 1, err); - return NULL; - } - mreq.imr_interface = ((struct sockaddr_in *)if_addr->ai_addr)->sin_addr; - mz_freeaddrinfo(if_addr); + mreq.imr_interface = ((struct sockaddr_in *)RKTIO_AS_ADDRINFO(intf_addr)->ai_addr)->sin_addr; } - { - Scheme_Object *bs; - char *address = ""; - GC_CAN_IGNORE struct rktio_addrinfo_t *group_addr = NULL; - int err; - bs = scheme_char_string_to_byte_string(multiaddrname); - address = SCHEME_BYTE_STR_VAL(bs); - group_addr = scheme_get_host_address(address, -1, &err, RKTIO_PF_INET, 0, 0); - if (!group_addr) { - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "%s: can't resolve group address\n" - " address: %s\n" - " system error: %N", - name, address ? address : "", 1, err); - return NULL; - } - mreq.imr_multiaddr = ((struct sockaddr_in *)group_addr->ai_addr)->sin_addr; - mz_freeaddrinfo(group_addr); - } + mreq.imr_multiaddr = ((struct sockaddr_in *)RKTIO_AS_ADDRINFO(group_addr)->ai_addr)->sin_addr; - status = setsockopt(udp->s, IPPROTO_IP, optname, (void *) &mreq, mreq_len); - if (status) - status = SOCK_ERRNO(); + if (action == RKTIO_ADD_MEMBERSHIP) + optname = IP_ADD_MEMBERSHIP; + else if (action == RKTIO_DROP_MEMBERSHIP) + optname = IP_DROP_MEMBERSHIP; + + status = setsockopt(s, IPPROTO_IP, optname, (void *) &mreq, mreq_len); + if (status) { - scheme_raise_exn(MZEXN_FAIL_NETWORK, - "%s: setsockopt failed\n" - " system error: %N", - name, 0, status); - return NULL; - } else { - return scheme_void; - } + get_socket_error(); + return 0; + } else + return 1; } - -#endif - -static Scheme_Object * -udp_multicast_join_or_leave_group(char const *name, int optname, int argc, Scheme_Object *argv[]) -{ - if (!udp_check_open(name, argc, argv)) - return NULL; - if (!SCHEME_CHAR_STRINGP(argv[1])) { - scheme_wrong_contract(name, "string?", 1, argc, argv); - return NULL; - } - if (!SCHEME_CHAR_STRINGP(argv[2]) && !SCHEME_FALSEP(argv[2])) { - scheme_wrong_contract(name, "(or/c string? #f)", 2, argc, argv); - return NULL; - } - -#ifdef UDP_IS_SUPPORTED - return do_udp_multicast_join_or_leave_group(name, optname, - (Scheme_UDP *) argv[0], argv[1], argv[2]); -#else - return scheme_void; -#endif -} - -#ifdef UDP_IS_SUPPORTED -# define WHEN_UDP_IS_SUPPORTED(x) x -#else -# define WHEN_UDP_IS_SUPPORTED(x) 0 -#endif - -static Scheme_Object * -udp_multicast_join_group(int argc, Scheme_Object *argv[]) -{ - return udp_multicast_join_or_leave_group("udp-multicast-join-group!", - WHEN_UDP_IS_SUPPORTED(IP_ADD_MEMBERSHIP), - argc, - argv); -} - -static Scheme_Object * -udp_multicast_leave_group(int argc, Scheme_Object *argv[]) -{ - return udp_multicast_join_or_leave_group("udp-multicast-leave-group!", - WHEN_UDP_IS_SUPPORTED(IP_DROP_MEMBERSHIP), - argc, - argv); -} - -#endif diff --git a/racket/src/rktio/rktio_read_write.c b/racket/src/rktio/rktio_read_write.c index c27945fdc8..1a2f814e84 100644 --- a/racket/src/rktio/rktio_read_write.c +++ b/racket/src/rktio/rktio_read_write.c @@ -83,7 +83,7 @@ static void init_read_fd(rktio_fd_t *rfd) th->checking = 0; sm = CreateSemaphore(NULL, 0, 1, NULL); -v th->checking_sema = sm; + th->checking_sema = sm; sm = CreateSemaphore(NULL, 0, 1, NULL); th->ready_sema = sm; sm = CreateSemaphore(NULL, 1, 1, NULL); @@ -153,6 +153,11 @@ int rktio_fd_is_socket(rktio_t *rktio, rktio_fd_t *rfd) return (rfd->modes & RKTIO_OPEN_SOCKET); } +int rktio_fd_is_udp(rktio_t *rktio, rktio_fd_t *rfd) +{ + return (rfd->modes & RKTIO_OPEN_UDP); +} + rktio_fd_t *rktio_dup(rktio_t *rktio, rktio_fd_t *rfd) { #ifdef RKTIO_SYSTEM_UNIX intptr_t nfd;