rktio: repairs

All tests now pass for MacOS.
This commit is contained in:
Matthew Flatt 2017-06-16 13:16:01 -06:00
parent 463862d793
commit c4b3b19c1e
13 changed files with 63 additions and 82 deletions

View File

@ -785,7 +785,15 @@ If @var{read_too} is non-zero, the function produces multiple values
Creates Racket input and output ports for a TCP socket @var{s}. The
@var{name} argument supplies the name for the ports. If @var{close}
is non-zero, then the ports assume responsibility for closing the
socket. The resulting ports are written to @var{inp} and @var{outp}.}
socket. The resulting ports are written to @var{inp} and @var{outp}.
Whether @var{close} is zero or not, closing the resulting ports
unregisters the file descriptor with @cpp{scheme_fd_to_semaphore}.
So, passing zero for @var{close} and also using the file descriptor
with other ports or with @cpp{scheme_fd_to_semaphore} will not work right.
@history["6.9.0.6" @elem{Changed ports to always unregister with @cpp{scheme_fd_to_semaphore},
since it's not safe to skip that step.}]}
@function[(Scheme_Object* scheme_fd_to_semaphore

View File

@ -4975,8 +4975,12 @@
(set! s? f) ; break the JIT's optimistic assumption
(define (go)
(define init-size
(let ([vec (make-vector 6)])
(vector-set-performance-stats! vec (current-thread))
(vector-ref vec 3)))
(define size (f 500000)) ; make sure that this still leads to a tail loop
(size . < . 80000)))
((- size init-size) . < . 20000)))
(test #t (dynamic-require ''check-tail-call-by-jit-for-struct-predicate 'go))

View File

@ -625,7 +625,7 @@
(let ([t (thread
(lambda ()
(sync s-t)))]
[portnum (listen-port l)] ; so parallel tests work ok
[portnum (listen-port l)] ; so parallel tests work ok
[orig-thread (current-thread)])
(let-values ([(r w) (make-pipe)])
@ -728,6 +728,7 @@
(loop))))
(close-output-port sw)
(test cr sync cr)
(test cr sync s t l sr cr)
(test cr sync s t l sr cr)

View File

@ -317,7 +317,7 @@
((b) (make-bytes 8 0)))
(test (void) udp-multicast-set-interface! s2 "localhost")
(test (void) udp-send-to s2 "233.252.0.0" lp #"hi")
(sleep 0.05)
(sleep 0.05) ; (sync (udp-receive-ready-evt s))
(let-values (((packet-length ra1 rp1) (udp-receive!* s b)))
(test 2 values packet-length)
(test #"hi\0\0\0\0\0\0" values b))
@ -334,8 +334,7 @@
(test (void) udp-close s)
;; It's closed
(err/rt-test (udp-multicast-loopback? s) exn:fail:network?)
(err/rt-test (udp-multicast-set-loopback! s #t) exn:fail:network?)
)
(err/rt-test (udp-multicast-set-loopback! s #t) exn:fail:network?))
;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

View File

@ -551,7 +551,6 @@ static Scheme_Env *place_instance_init(void *stack_base, int initial_main_os_thr
scheme_init_sema_places();
scheme_init_gmp_places();
scheme_init_fd_semaphores();
scheme_alloc_global_fdset();
#ifndef DONT_USE_FOREIGN
scheme_init_foreign_places();
#endif
@ -676,7 +675,6 @@ void scheme_place_instance_destroy(int force)
GC_destruct_child_gc();
#endif
scheme_free_all_code();
scheme_free_global_fdset();
rktio_destroy(scheme_rktio);
}

View File

@ -1,6 +1,5 @@
/* >>>> Generated by mkmark.rkt from mzmarksrc.c <<<< */
#ifdef USE_TCP
static int mark_listener_SIZE(void *p, struct NewGC *gc) {
#ifndef GC_NO_SIZE_NEEDED_FROM_PROCS
listener_t *l = (listener_t *)p;
@ -91,7 +90,6 @@ static int mark_tcp_FIXUP(void *p, struct NewGC *gc) {
#define mark_tcp_IS_CONST_SIZE 1
# ifdef UDP_IS_SUPPORTED
static int mark_udp_SIZE(void *p, struct NewGC *gc) {
#ifndef GC_NO_SIZE_NEEDED_FROM_PROCS
gcBYTES_TO_WORDS(sizeof(Scheme_UDP));
@ -179,6 +177,4 @@ static int mark_udp_evt_FIXUP(void *p, struct NewGC *gc) {
#define mark_udp_evt_IS_ATOMIC 0
#define mark_udp_evt_IS_CONST_SIZE 1
# endif
#endif

View File

@ -1818,7 +1818,6 @@ END print;
START network;
#ifdef USE_TCP
mark_listener {
listener_t *l = (listener_t *)p;
@ -1841,7 +1840,6 @@ mark_tcp {
gcBYTES_TO_WORDS(sizeof(Scheme_Tcp));
}
# ifdef UDP_IS_SUPPORTED
mark_udp {
mark:
Scheme_UDP *udp = (Scheme_UDP *)p;
@ -1863,8 +1861,6 @@ mark_udp_evt {
size:
gcBYTES_TO_WORDS(sizeof(Scheme_UDP_Evt));
}
# endif
#endif
END network;

View File

@ -188,6 +188,14 @@ static int check_fd_sema(rktio_fd_t *s, int mode, Scheme_Schedule_Info *sinfo, S
sema = scheme_rktio_fd_to_semaphore(s, mode);
if (sema) {
/* It would make sense to force a poll via
scheme_check_fd_semaphores() here, although we'd only want to
to that once per scheduler cycle. That would more reliably poll
at the OS level, since we otherwise wait on the scheduler to
check semaphores. It's not clear that the OS supports more
precise reasoning about readiness, though, and Racket has
traditonally not done that, so we're still skipping
it. */
if (!scheme_wait_sema(sema, 1)) {
if (sinfo && !sinfo->no_redirect)
scheme_set_sync_target(sinfo, sema, orig, NULL, 0, 0, NULL);
@ -392,7 +400,7 @@ static intptr_t tcp_get_string(Scheme_Input_Port *port,
rn = rktio_read(scheme_rktio, data->tcp, data->b.buffer, read_amt);
data->b.bufmax = rn; /* could be count, error, or EOF */
}
if (data->b.bufmax) {
/* got data, error, or EOF */
break;
@ -424,7 +432,7 @@ static intptr_t tcp_get_string(Scheme_Input_Port *port,
"tcp-read: error reading\n"
" system error: %R");
return 0;
} else if (data->b.bufmax == RKTIO_READ_ERROR) {
} else if (data->b.bufmax == RKTIO_READ_EOF) {
data->b.bufmax = 0;
data->b.hiteof = 1;
return EOF;
@ -809,7 +817,7 @@ static void wait_until_lookup(Connect_Progress_Data *pd)
}
}
static rktio_addrinfo_t *do_resolve_address(const char *who, char *address, int id, int family, int show_id_on_error)
static rktio_addrinfo_t *do_resolve_address(const char *who, char *address, int id, int family, int passive, int show_id_on_error)
{
Connect_Progress_Data *pd;
rktio_addrinfo_lookup_t *lookup;
@ -817,7 +825,7 @@ static rktio_addrinfo_t *do_resolve_address(const char *who, char *address, int
pd = make_connect_progress_data();
lookup = rktio_start_addrinfo_lookup(scheme_rktio, address, id, family, 0, 0);
lookup = rktio_start_addrinfo_lookup(scheme_rktio, address, id, family, passive, 0);
if (!lookup) {
addr = NULL;
} else {
@ -1758,7 +1766,7 @@ static Scheme_Object *make_udp(int argc, Scheme_Object *argv[])
if (!id)
id = 1025;
addr = do_resolve_address("upd-open-socket", address, id, RKTIO_FAMILY_ANY, show_id_on_error);
addr = do_resolve_address("upd-open-socket", address, id, RKTIO_FAMILY_ANY, 0, show_id_on_error);
} else
addr = NULL;
@ -1894,10 +1902,11 @@ static Scheme_Object *udp_bind_or_connect(const char *name, int argc, Scheme_Obj
" system error: %R",
name);
}
udp->connected = 0;
return scheme_void;
}
addr = do_resolve_address(name, address, port, RKTIO_FAMILY_ANY, 1);
addr = do_resolve_address(name, address, port, RKTIO_FAMILY_ANY, do_bind, 1);
if (!do_bind) {
/* CONNECT CASE */
@ -1918,6 +1927,8 @@ static Scheme_Object *udp_bind_or_connect(const char *name, int argc, Scheme_Obj
return NULL;
}
udp->connected = 1;
} else {
/* BIND CASE */
int ok;
@ -1938,6 +1949,8 @@ static Scheme_Object *udp_bind_or_connect(const char *name, int argc, Scheme_Obj
port);
return NULL;
}
udp->bound = 1;
}
}
@ -2101,7 +2114,7 @@ static Scheme_Object *udp_send_it(const char *name, int argc, Scheme_Object *arg
scheme_security_check_network(name, address, id, 1);
dest_addr = do_resolve_address(name, address, id, RKTIO_FAMILY_ANY, 1);
dest_addr = do_resolve_address(name, address, id, RKTIO_FAMILY_ANY, 0, 1);
} else {
dest_addr = NULL;
}
@ -2322,23 +2335,17 @@ static Scheme_Object *udp_receive_enable_break(int argc, Scheme_Object *argv[])
static Scheme_Object *make_udp_evt(const char *name, int argc, Scheme_Object **argv, int for_read)
{
#ifdef UDP_IS_SUPPORTED
Scheme_UDP_Evt *uw;
#endif
if (!SCHEME_UDPP(argv[0]))
scheme_wrong_contract(name, "udp?", 0, argc, argv);
#ifdef UDP_IS_SUPPORTED
uw = MALLOC_ONE_TAGGED(Scheme_UDP_Evt);
uw->so.type = scheme_udp_evt_type;
uw->udp = (Scheme_UDP *)argv[0];
uw->for_read = for_read;
return (Scheme_Object *)uw;
#else
return scheme_void;
#endif
}
static Scheme_Object *udp_read_ready_evt(int argc, Scheme_Object *argv[])
@ -2552,7 +2559,7 @@ udp_multicast_set_interface(int argc, Scheme_Object *argv[])
if (SCHEME_CHAR_STRINGP(argv[1])) {
bs = scheme_char_string_to_byte_string(argv[1]);
addr = do_resolve_address("udp-multicast-set-interface!", SCHEME_BYTE_STR_VAL(bs), -1, udp_default_family(), 0);
addr = do_resolve_address("udp-multicast-set-interface!", SCHEME_BYTE_STR_VAL(bs), -1, udp_default_family(), 0, 0);
} else
addr = NULL;
@ -2581,7 +2588,6 @@ do_udp_multicast_join_or_leave_group(char const *name, int optname, Scheme_UDP *
pd = make_connect_progress_data();
bs = scheme_char_string_to_byte_string(multiaddrname);
address = SCHEME_BYTE_STR_VAL(bs);
@ -2639,7 +2645,6 @@ do_udp_multicast_join_or_leave_group(char const *name, int optname, Scheme_UDP *
rktio_addrinfo_free(scheme_rktio, multi_addr);
if (intf_addr) rktio_addrinfo_free(scheme_rktio, intf_addr);
if (!r)
scheme_raise_exn(MZEXN_FAIL_NETWORK,
@ -2704,14 +2709,10 @@ START_XFORM_SKIP;
static void register_traversers(void)
{
#ifdef USE_TCP
GC_REG_TRAV(scheme_listener_type, mark_listener);
GC_REG_TRAV(scheme_rt_tcp, mark_tcp);
# ifdef UDP_IS_SUPPORTED
GC_REG_TRAV(scheme_udp_type, mark_udp);
GC_REG_TRAV(scheme_udp_evt_type, mark_udp_evt);
# endif
#endif
}
END_XFORM_SKIP;

View File

@ -484,14 +484,6 @@ void scheme_set_stdio_makers(Scheme_Stdio_Maker_Proc in,
/* fd arrays */
/*========================================================================*/
void scheme_alloc_global_fdset() {
scheme_semaphore_fd_set = rktio_ltps_open(scheme_rktio);
}
void scheme_free_global_fdset(void) {
rktio_ltps_close(scheme_rktio, scheme_semaphore_fd_set);
}
void scheme_add_fd_handle(void *h, void *fds, int repost)
{
#if defined(WIN32_FD_HANDLES)
@ -3596,7 +3588,7 @@ scheme_do_open_input_file(char *name, int offset, int argc, Scheme_Object *argv[
| (text_mode ? RKTIO_OPEN_TEXT : 0)));
if (!fd) {
filename_exn(name, "cannot open input file", filename, RKTIO_ERROR_DOES_NOT_EXIST);
filename_exn(name, "cannot open input file", filename, (for_module ? RKTIO_ERROR_DOES_NOT_EXIST : 0));
return NULL;
}
@ -4355,8 +4347,10 @@ void scheme_filesystem_change_evt_cancel(Scheme_Object *evt, void *ignored_data)
{
Scheme_Filesystem_Change_Evt *fc = (Scheme_Filesystem_Change_Evt *)evt;
rktio_fs_change_forget(scheme_rktio, fc->rfc);
fc->rfc = NULL;
if (fc->rfc) {
rktio_fs_change_forget(scheme_rktio, fc->rfc);
fc->rfc = NULL;
}
}
static int filesystem_change_evt_ready(Scheme_Object *evt, Scheme_Schedule_Info *sinfo)
@ -5881,7 +5875,7 @@ static Scheme_Object *subprocess(int c, Scheme_Object *args[])
rktio_fd_t *stdout_fd = NULL;
rktio_fd_t *stdin_fd = NULL;
rktio_fd_t *stderr_fd = NULL;
int need_forget_out = 0, need_forget_in = 0, need_forget_err = 0, new_process_group = 0;
int need_forget_out = 0, need_forget_in = 0, need_forget_err = 0;
rktio_envvars_t *envvars;
rktio_process_result_t *result;
Scheme_Config *config;
@ -6023,9 +6017,10 @@ static Scheme_Object *subprocess(int c, Scheme_Object *args[])
config = scheme_current_config();
cust_mode = scheme_get_param(config, MZCONFIG_SUBPROC_GROUP_ENABLED);
new_process_group = SCHEME_TRUEP(cust_mode);
cust_mode = scheme_get_param(config, MZCONFIG_SUBPROC_CUSTODIAN_MODE);
if (SCHEME_TRUEP(cust_mode))
flags |= RKTIO_PROCESS_NEW_GROUP;
cust_mode = scheme_get_param(config, MZCONFIG_SUBPROC_CUSTODIAN_MODE);
if (SCHEME_SYMBOLP(cust_mode)
&& !strcmp(SCHEME_SYM_VAL(cust_mode), "kill")
&& (rktio_process_allowed_flags(scheme_rktio) & RKTIO_PROCESS_WINDOWS_CHAIN_TERMINATION))

View File

@ -435,7 +435,6 @@ void scheme_register_network_evts();
void scheme_free_dynamic_extensions(void);
void scheme_free_all_code(void);
void scheme_free_global_fdset(void);
XFORM_NONGCING int scheme_is_multithreaded(int now);
@ -4405,7 +4404,6 @@ int scheme_is_user_port(Scheme_Object *port);
int scheme_byte_ready_or_user_port_ready(Scheme_Object *p, Scheme_Schedule_Info *sinfo);
int scheme_pipe_char_count(Scheme_Object *p);
void scheme_alloc_global_fdset();
Scheme_Object *scheme_port_name(Scheme_Object *p);
intptr_t scheme_port_closed_p (Scheme_Object *port);

View File

@ -73,6 +73,7 @@ rktio_ltps_handle_t *make_ltps_handle()
rktio_ltps_handle_t *s;
s = malloc(sizeof(rktio_ltps_handle_t));
s->data = NULL;
s->next = NULL;
return s;
}

View File

@ -927,7 +927,7 @@ int rktio_socket_shutdown(rktio_t *rktio, rktio_fd_t *rfd, int mode)
{
rktio_socket_t s = rktio_fd_system_fd(rktio, rfd);
if (!shutdown(s, ((mode == RKTIO_SHUTDOWN_READ) ? SHUT_RD : SHUT_RDWR))) {
if (shutdown(s, ((mode == RKTIO_SHUTDOWN_READ) ? SHUT_RD : SHUT_WR))) {
get_socket_error();
return 0;
}
@ -1129,10 +1129,6 @@ rktio_connect_t *rktio_start_connect(rktio_t *rktio, rktio_addrinfo_t *dest, rkt
{
rktio_connect_t *conn;
#ifdef USE_TCP
TCP_INIT("tcp-connect");
#endif
conn = malloc(sizeof(rktio_connect_t));
conn->dest = dest;
conn->src = src;
@ -1162,7 +1158,9 @@ static rktio_connect_t *try_connect(rktio_t *rktio, rktio_connect_t *conn)
fcntl(s, F_SETFL, RKTIO_NONBLOCKING);
RKTIO_WHEN_SET_SOCKBUF_SIZE(setsockopt(s, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(int)));
#endif
status = connect(s, RKTIO_AS_ADDRINFO(addr)->ai_addr, RKTIO_AS_ADDRINFO(addr)->ai_addrlen);
#ifdef RKTIO_SYSTEM_UNIX
if (status)
status = errno;
@ -1192,15 +1190,18 @@ static rktio_connect_t *try_connect(rktio_t *rktio, rktio_connect_t *conn)
int rktio_poll_connect_ready(rktio_t *rktio, rktio_connect_t *conn)
{
if (!conn->inprogress) {
if (conn->inprogress)
return rktio_socket_poll_write_ready(rktio, conn->trying_fd);
} else
else
return RKTIO_POLL_READY;
}
void rktio_poll_add_connect(rktio_t *rktio, rktio_connect_t *conn, rktio_poll_set_t *fds)
{
rktio_poll_add(rktio, conn->trying_fd, fds, RKTIO_POLL_WRITE);
if (conn->inprogress)
rktio_poll_add(rktio, conn->trying_fd, fds, RKTIO_POLL_WRITE);
else
rktio_poll_set_add_nosleep(rktio, fds);
}
static void conn_free(rktio_connect_t *conn)
@ -1709,24 +1710,7 @@ rktio_fd_t *rktio_udp_open(rktio_t *rktio, rktio_addrinfo_t *addr, int family)
return NULL;
}
#ifdef RKTIO_SYSTEM_WINDOWS
{
unsigned long ioarg = 1;
BOOL bc = 1;
ioctlsocket(s, FIONBIO, &ioarg);
setsockopt(s, SOL_SOCKET, SO_BROADCAST, (char *)(&bc), sizeof(BOOL));
}
#else
fcntl(s, F_SETFL, RKTIO_NONBLOCKING);
# ifdef SO_BROADCAST
{
int bc = 1;
setsockopt(s, SOL_SOCKET, SO_BROADCAST, &bc, sizeof(bc));
}
# endif
#endif
return rktio_system_fd(rktio, s, RKTIO_OPEN_SOCKET | RKTIO_OPEN_UDP);
return rktio_system_fd(rktio, s, RKTIO_OPEN_SOCKET | RKTIO_OPEN_UDP | RKTIO_OPEN_INIT);
}
#ifdef UDP_DISCONNECT_EADRNOTAVAIL_OK
@ -1935,6 +1919,7 @@ char *rktio_udp_multicast_interface(rktio_t *rktio, rktio_fd_t *rfd)
int status;
status = getsockopt(s, IPPROTO_IP, IP_MULTICAST_IF, (void *)&intf, &intf_len);
if (status) {
get_socket_error();
return NULL;
@ -1960,6 +1945,7 @@ int rktio_udp_set_multicast_interface(rktio_t *rktio, rktio_fd_t *rfd, rktio_add
}
status = setsockopt(s, IPPROTO_IP, IP_MULTICAST_IF, (void *)&intf, intf_len);
if (status) {
get_socket_error();
return 0;
@ -1988,10 +1974,8 @@ int rktio_udp_change_multicast_group(rktio_t *rktio, rktio_fd_t *rfd,
if (action == RKTIO_ADD_MEMBERSHIP)
optname = IP_ADD_MEMBERSHIP;
else if (action == RKTIO_DROP_MEMBERSHIP)
optname = IP_DROP_MEMBERSHIP;
else
optname = 0;
optname = IP_DROP_MEMBERSHIP;
status = setsockopt(s, IPPROTO_IP, optname, (void *) &mreq, mreq_len);

View File

@ -87,7 +87,7 @@ rktio_fd_t **rktio_make_pipe(rktio_t *rktio, int flags)
intptr_t a[2];
rktio_fd_t **rfds;
if (!rktio_make_os_pipe(rktio, a, flags))
if (rktio_make_os_pipe(rktio, a, flags))
return NULL;
rfds = malloc(sizeof(rktio_fd_t*) * 2);