diff --git a/src/racket/include/scheme.h b/src/racket/include/scheme.h index 90966a5a4b..9008485e02 100644 --- a/src/racket/include/scheme.h +++ b/src/racket/include/scheme.h @@ -2004,6 +2004,13 @@ extern Scheme_Extension_Table *scheme_extension_table; # define MZ_FD_ISSET(n, p) FD_ISSET(n, p) #endif +/* For scheme_fd_to_semaphore(): */ +#define MZFD_CREATE_READ 1 +#define MZFD_CREATE_WRITE 2 +#define MZFD_CHECK_READ 3 +#define MZFD_CHECK_WRITE 4 +#define MZFD_REMOVE 5 + /*========================================================================*/ #ifdef __cplusplus diff --git a/src/racket/include/schthread.h b/src/racket/include/schthread.h index 8c82bb1bae..4aa7c59b68 100644 --- a/src/racket/include/schthread.h +++ b/src/racket/include/schthread.h @@ -174,6 +174,8 @@ typedef struct Thread_Local_Variables { struct Scheme_Object *scheme_orig_stderr_port_; struct Scheme_Object *scheme_orig_stdin_port_; struct mz_fd_set *scheme_fd_set_; + struct mz_fd_set *scheme_semaphore_fd_set_; + struct Scheme_Hash_Table *scheme_semaphore_fd_mapping_; #ifdef USE_FCNTL_AND_FORK_FOR_FILE_LOCKS struct Scheme_Hash_Table *locked_fd_process_map_; #endif @@ -518,6 +520,8 @@ XFORM_GC_VARIABLE_STACK_THROUGH_THREAD_LOCAL; #define scheme_orig_stderr_port XOA (scheme_get_thread_local_variables()->scheme_orig_stderr_port_) #define scheme_orig_stdin_port XOA (scheme_get_thread_local_variables()->scheme_orig_stdin_port_) #define scheme_fd_set XOA (scheme_get_thread_local_variables()->scheme_fd_set_) +#define scheme_semaphore_fd_set XOA (scheme_get_thread_local_variables()->scheme_semaphore_fd_set_) +#define scheme_semaphore_fd_mapping XOA (scheme_get_thread_local_variables()->scheme_semaphore_fd_mapping_) #define locked_fd_process_map XOA (scheme_get_thread_local_variables()->locked_fd_process_map_) #define new_port_cust XOA (scheme_get_thread_local_variables()->new_port_cust_) #define scheme_break_semaphore XOA (scheme_get_thread_local_variables()->scheme_break_semaphore_) diff --git a/src/racket/src/mzmark_network.inc b/src/racket/src/mzmark_network.inc index 6ccad5e81e..16ec95cfae 100644 --- a/src/racket/src/mzmark_network.inc +++ b/src/racket/src/mzmark_network.inc @@ -12,6 +12,9 @@ static int mark_listener_MARK(void *p, struct NewGC *gc) { gcMARK2(l->mref, gc); +# ifdef HAVE_POLL_SYSCALL + gcMARK2(l->pfd, gc); +# endif return gcBYTES_TO_WORDS(sizeof(listener_t) + ((l->count - mzFLEX_DELTA) * sizeof(tcp_t))); @@ -22,6 +25,9 @@ static int mark_listener_FIXUP(void *p, struct NewGC *gc) { gcFIXUP2(l->mref, gc); +# ifdef HAVE_POLL_SYSCALL + gcFIXUP2(l->pfd, gc); +# endif return gcBYTES_TO_WORDS(sizeof(listener_t) + ((l->count - mzFLEX_DELTA) * sizeof(tcp_t))); diff --git a/src/racket/src/network.c b/src/racket/src/network.c index e071511f0e..09c71f5a93 100644 --- a/src/racket/src/network.c +++ b/src/racket/src/network.c @@ -294,6 +294,22 @@ void scheme_init_network(Scheme_Env *env) #endif } +static int check_fd_sema(tcp_t s, int mode, Scheme_Schedule_Info *sinfo) +{ + Scheme_Object *sema; + + sema = scheme_fd_to_semaphore(s, mode); + + if (sema) { + if (!scheme_wait_sema(sema, 1)) { + if (sinfo) + scheme_set_sync_target(sinfo, sema, NULL, NULL, 0, 0, NULL); + return 0; + } + } + + return 1; +} /*========================================================================*/ /* TCP glue */ @@ -888,13 +904,41 @@ static void TCP_INIT(char *name) /* Forward declaration */ static int stop_listener(Scheme_Object *o); -static int tcp_check_accept(Scheme_Object *_listener) +static Scheme_Object *listener_to_evt(listener_t *listener) +{ + Scheme_Object **a, *sema; + int i; + + a = MALLOC_N(Scheme_Object*, listener->count); + for (i = listener->count; i--; ) { + sema = scheme_fd_to_semaphore(listener->s[i], MZFD_CREATE_READ); + if (!sema) + return NULL; + a[i] = sema; + } + + return scheme_make_evt_set(listener->count, a); +} + +static int tcp_check_accept(Scheme_Object *_listener, Scheme_Schedule_Info *sinfo) { #ifdef USE_SOCKETS_TCP listener_t *listener = (listener_t *)_listener; -# ifdef HAVE_POLL_SYSCALL int sr, i; +# ifndef HAVE_POLL_SYSCALL + tcp_t s, mx; + DECL_OS_FDSET(readfds); + DECL_OS_FDSET(exnfds); + struct timeval time = {0, 0}; +# endif + for (i = listener->count; i--; ) { + if (check_fd_sema(listener->s[i], MZFD_CHECK_READ, NULL)) + break; + } + if (i < 0) return 0; + +# ifdef HAVE_POLL_SYSCALL if (LISTENER_WAS_CLOSED(listener)) return 1; @@ -909,13 +953,9 @@ static int tcp_check_accept(Scheme_Object *_listener) } } - return sr; + if (sr) + return sr; # else - tcp_t s, mx; - DECL_OS_FDSET(readfds); - DECL_OS_FDSET(exnfds); - struct timeval time = {0, 0}; - int sr, i; INIT_DECL_OS_RD_FDSET(readfds); INIT_DECL_OS_ER_FDSET(exnfds); @@ -948,9 +988,24 @@ static int tcp_check_accept(Scheme_Object *_listener) } } - return sr; + if (sr) + return sr; # endif + + if (sinfo) { + Scheme_Object *evt; + evt = listener_to_evt(listener); + if (evt) + scheme_set_sync_target(sinfo, evt, NULL, NULL, 0, 0, NULL); + } else { + for (i = listener->count; i--; ) { + check_fd_sema(listener->s[i], MZFD_CREATE_READ, NULL); + } + } + #endif + + return 0; } static void tcp_accept_needs_wakeup(Scheme_Object *_listener, void *fds) @@ -973,7 +1028,7 @@ static void tcp_accept_needs_wakeup(Scheme_Object *_listener, void *fds) #endif } -static int tcp_check_connect(Scheme_Object *connector_p) +static int tcp_check_connect(Scheme_Object *connector_p, Scheme_Schedule_Info *sinfo) { #ifdef USE_SOCKETS_TCP tcp_t s; @@ -981,6 +1036,9 @@ static int tcp_check_connect(Scheme_Object *connector_p) s = *(tcp_t *)connector_p; + if (!check_fd_sema(s, MZFD_CHECK_WRITE, sinfo)) + return 0; + # ifdef HAVE_POLL_SYSCALL { GC_CAN_IGNORE struct pollfd pfd[1]; @@ -990,9 +1048,9 @@ static int tcp_check_connect(Scheme_Object *connector_p) sr = poll(pfd, 1, 0); } while ((sr == -1) && (errno == EINTR)); - if (!sr) - return 0; - else if (pfd[0].revents & POLLOUT) + if (!sr) { + /* fall through */ + } else if (pfd[0].revents & POLLOUT) return 1; else return -1; @@ -1016,17 +1074,19 @@ static int tcp_check_connect(Scheme_Object *connector_p) sr = select(s + 1, NULL, writefds, exnfds, &time); } while ((sr == -1) && (NOT_WINSOCK(errno) == EINTR)); - if (!sr) - return 0; - else if (FD_ISSET(s, exnfds)) + if (!sr) { + /* fall through */ + } else if (FD_ISSET(s, exnfds)) return -1; else return 1; } # endif -#else - return 0; + + check_fd_sema(s, MZFD_CREATE_WRITE, sinfo); + #endif + return 0; } static void tcp_connect_needs_wakeup(Scheme_Object *connector_p, void *fds) @@ -1043,13 +1103,16 @@ static void tcp_connect_needs_wakeup(Scheme_Object *connector_p, void *fds) #endif } -static int tcp_check_write(Scheme_Object *port) +static int tcp_check_write(Scheme_Object *port, Scheme_Schedule_Info *sinfo) { Scheme_Tcp *data = (Scheme_Tcp *)((Scheme_Output_Port *)port)->port_data; - + if (((Scheme_Output_Port *)port)->closed) return 1; + if (!check_fd_sema(data->tcp, MZFD_CHECK_WRITE, sinfo)) + return 0; + #ifdef USE_SOCKETS_TCP # ifdef HAVE_POLL_SYSCALL { @@ -1062,9 +1125,9 @@ static int tcp_check_write(Scheme_Object *port) sr = poll(pfd, 1, 0); } while ((sr == -1) && (errno == EINTR)); - if (!sr) - return 0; - else if (pfd[0].revents & POLLOUT) + if (!sr) { + /* fall through */ + } else if (pfd[0].revents & POLLOUT) return 1; else return -1; @@ -1091,7 +1154,8 @@ static int tcp_check_write(Scheme_Object *port) sr = select(s + 1, NULL, writefds, exnfds, &time); } while ((sr == -1) && (NOT_WINSOCK(errno) == EINTR)); - return sr; + if (sr) + return sr; } # endif #else @@ -1115,6 +1179,10 @@ static int tcp_check_write(Scheme_Object *port) return !!bytes; } #endif + + check_fd_sema(data->tcp, MZFD_CREATE_WRITE, sinfo); + + return 0; } static void tcp_write_needs_wakeup(Scheme_Object *port, void *fds) @@ -1168,7 +1236,7 @@ static Scheme_Tcp *make_tcp_port_data(MAKE_TCP_ARG int refcount) return data; } -static int tcp_byte_ready (Scheme_Input_Port *port) +static int tcp_byte_ready (Scheme_Input_Port *port, Scheme_Schedule_Info *sinfo) { Scheme_Tcp *data; #ifdef USE_SOCKETS_TCP @@ -1193,6 +1261,9 @@ static int tcp_byte_ready (Scheme_Input_Port *port) if (data->b.bufpos < data->b.bufmax) return 1; + if (!check_fd_sema(data->tcp, MZFD_CHECK_READ, sinfo)) + return 0; + #ifdef USE_SOCKETS_TCP # ifdef HAVE_POLL_SYSCALL { @@ -1204,7 +1275,8 @@ static int tcp_byte_ready (Scheme_Input_Port *port) sr = poll(pfd, 1, 0); } while ((sr == -1) && (errno == EINTR)); - return sr; + if (sr) + return sr; } # else { @@ -1217,11 +1289,14 @@ static int tcp_byte_ready (Scheme_Input_Port *port) sr = select(data->tcp + 1, readfds, NULL, exfds, &time); } while ((sr == -1) && (NOT_WINSOCK(errno) == EINTR)); - return sr; + if (sr) + return sr; } # endif #endif + check_fd_sema(data->tcp, MZFD_CREATE_READ, sinfo); + return 0; } @@ -1257,16 +1332,23 @@ static intptr_t tcp_get_string(Scheme_Input_Port *port, return n; } - while (!tcp_byte_ready(port)) { + while (!tcp_byte_ready(port, NULL)) { if (nonblock > 0) return 0; #ifdef USE_SOCKETS_TCP - scheme_block_until_unless((Scheme_Ready_Fun)tcp_byte_ready, - scheme_need_wakeup, - (Scheme_Object *)port, - 0.0, unless, - nonblock); + { + Scheme_Object *sema; + sema = scheme_fd_to_semaphore(data->tcp, MZFD_CREATE_READ); + if (sema) + scheme_wait_sema(sema, nonblock ? -1 : 0); + else + scheme_block_until_unless((Scheme_Ready_Fun)tcp_byte_ready, + scheme_need_wakeup, + (Scheme_Object *)port, + 0.0, unless, + nonblock); + } #else do { scheme_thread_block_enable_break((float)0.0, nonblock); @@ -1378,6 +1460,7 @@ static void tcp_close_input(Scheme_Input_Port *port) closesocket(data->tcp); #endif + (void)scheme_fd_to_semaphore(data->tcp, MZFD_REMOVE); } static int @@ -1457,8 +1540,17 @@ static intptr_t tcp_do_write_string(Scheme_Output_Port *port, return 0; /* Block for writing: */ - scheme_block_until_enable_break(tcp_check_write, tcp_write_needs_wakeup, (Scheme_Object *)port, - (float)0.0, enable_break); + { + Scheme_Object *sema; + sema = scheme_fd_to_semaphore(data->tcp, MZFD_CREATE_WRITE); + if (sema) + scheme_wait_sema(sema, enable_break ? -1 : 0); + else + scheme_block_until_enable_break((Scheme_Ready_Fun)tcp_check_write, + tcp_write_needs_wakeup, + (Scheme_Object *)port, + (float)0.0, enable_break); + } /* Closed while blocking? */ if (((Scheme_Output_Port *)port)->closed) { @@ -1576,6 +1668,7 @@ static void tcp_close_output(Scheme_Output_Port *port) closesocket(data->tcp); #endif + (void)scheme_fd_to_semaphore(data->tcp, MZFD_REMOVE); } static int @@ -1611,7 +1704,7 @@ make_tcp_input_port_symbol_name(void *data, Scheme_Object *name, Scheme_Object * NULL, scheme_progress_evt_via_get, scheme_peeked_read_via_get, - tcp_byte_ready, + (Scheme_In_Ready_Fun)tcp_byte_ready, tcp_close_input, tcp_need_wakeup, 1); @@ -1673,6 +1766,7 @@ typedef struct Close_Socket_Data { static void closesocket_w_decrement(Close_Socket_Data *csd) { closesocket(csd->s); + (void)scheme_fd_to_semaphore(csd->s, MZFD_REMOVE); if (csd->src_addr) mz_freeaddrinfo(csd->src_addr); mz_freeaddrinfo(csd->dest_addr); @@ -1796,6 +1890,7 @@ static Scheme_Object *tcp_connect(int argc, Scheme_Object *argv[]) if (inprogress) { tcp_t *sptr; Close_Socket_Data *csd; + Scheme_Object *sema; sptr = (tcp_t *)scheme_malloc_atomic(sizeof(tcp_t)); *sptr = s; @@ -1805,8 +1900,16 @@ static Scheme_Object *tcp_connect(int argc, Scheme_Object *argv[]) csd->src_addr = tcp_connect_src; csd->dest_addr = tcp_connect_dest; + sema = scheme_fd_to_semaphore(s, MZFD_CREATE_WRITE); + BEGIN_ESCAPEABLE(closesocket_w_decrement, csd); - scheme_block_until(tcp_check_connect, tcp_connect_needs_wakeup, (void *)sptr, (float)0.0); + if (sema) + scheme_wait_sema(sema, 0); + else + scheme_block_until((Scheme_Ready_Fun)tcp_check_connect, + tcp_connect_needs_wakeup, + (void *)sptr, + (float)0.0); END_ESCAPEABLE(); /* Check whether connect succeeded, or get error: */ @@ -1823,7 +1926,7 @@ static Scheme_Object *tcp_connect(int argc, Scheme_Object *argv[]) /* getsockopt() seems not to work in Windows 95, so use the result from select(), which seems to reliably detect an error condition */ if (!status) { - if (tcp_check_connect((Scheme_Object *)sptr) == -1) { + if (tcp_check_connect((Scheme_Object *)sptr, NULL) == -1) { status = 1; errno = WSAECONNREFUSED; /* guess! */ } @@ -1851,6 +1954,7 @@ static Scheme_Object *tcp_connect(int argc, Scheme_Object *argv[]) } else { errid = errno; closesocket(s); + (void)scheme_fd_to_semaphore(s, MZFD_REMOVE); errpart = 6; } } else { @@ -2055,6 +2159,7 @@ tcp_listen(int argc, Scheme_Object *argv[]) } else { errid = errno; closesocket(s); + (void)scheme_fd_to_semaphore(s, MZFD_REMOVE); errno = errid; s = INVALID_SOCKET; } @@ -2197,6 +2302,7 @@ static int stop_listener(Scheme_Object *o) s = listener->s[i]; UNREGISTER_SOCKET(s); closesocket(s); + (void)scheme_fd_to_semaphore(s, MZFD_REMOVE); listener->s[i] = INVALID_SOCKET; } scheme_remove_managed(((listener_t *)o)->mref, o); @@ -2251,7 +2357,7 @@ tcp_accept_ready(int argc, Scheme_Object *argv[]) return NULL; } - ready = tcp_check_accept(argv[0]); + ready = tcp_check_accept(argv[0], NULL); return (ready ? scheme_true : scheme_false); #else @@ -2283,10 +2389,18 @@ do_tcp_accept(int argc, Scheme_Object *argv[], Scheme_Object *cust, char **_fail was_closed = LISTENER_WAS_CLOSED(listener); if (!was_closed) { - ready_pos = tcp_check_accept(listener); + ready_pos = tcp_check_accept(listener, NULL); if (!ready_pos) { - scheme_block_until(tcp_check_accept, tcp_accept_needs_wakeup, listener, 0.0); - ready_pos = tcp_check_accept(listener); + Scheme_Object *evt; + evt = listener_to_evt((listener_t *)listener); + if (evt) + scheme_sync(1, &evt); + else + scheme_block_until((Scheme_Ready_Fun)tcp_check_accept, + tcp_accept_needs_wakeup, + listener, + 0.0); + ready_pos = tcp_check_accept(listener, NULL); } was_closed = LISTENER_WAS_CLOSED(listener); } else @@ -2369,7 +2483,7 @@ tcp_accept_break(int argc, Scheme_Object *argv[]) void register_network_evts() { #ifdef USE_TCP - scheme_add_evt(scheme_listener_type, tcp_check_accept, tcp_accept_needs_wakeup, NULL, 0); + scheme_add_evt(scheme_listener_type, (Scheme_Ready_Fun)tcp_check_accept, tcp_accept_needs_wakeup, NULL, 0); scheme_add_evt(scheme_tcp_accept_evt_type, (Scheme_Ready_Fun)tcp_check_accept_evt, tcp_accept_evt_needs_wakeup, NULL, 0); # ifdef UDP_IS_SUPPORTED scheme_add_evt(scheme_udp_evt_type, (Scheme_Ready_Fun)udp_evt_check_ready, udp_evt_needs_wakeup, NULL, 0); @@ -2621,7 +2735,7 @@ static Scheme_Object *accept_failed(void *msg, int argc, Scheme_Object **argv) static int tcp_check_accept_evt(Scheme_Object *ae, Scheme_Schedule_Info *sinfo) { - if (tcp_check_accept(SCHEME_PTR1_VAL(ae))) { + if (tcp_check_accept(SCHEME_PTR1_VAL(ae), NULL)) { Scheme_Object *a[2]; char *fail_reason = NULL; a[0] = SCHEME_PTR1_VAL(ae); @@ -2759,6 +2873,7 @@ void scheme_close_socket_fd(intptr_t fd) #ifdef USE_SOCKETS_TCP UNREGISTER_SOCKET(fd); closesocket(fd); + (void)scheme_fd_to_semaphore(fd, MZFD_REMOVE); #endif } @@ -2786,6 +2901,7 @@ static int udp_close_it(Scheme_Object *_udp) if (udp->s != INVALID_SOCKET) { closesocket(udp->s); + (void)scheme_fd_to_semaphore(udp->s, MZFD_REMOVE); udp->s = INVALID_SOCKET; scheme_remove_managed(udp->mref, (Scheme_Object *)udp); @@ -3092,13 +3208,16 @@ static Scheme_Object *udp_connect(int argc, Scheme_Object *argv[]) #ifdef UDP_IS_SUPPORTED -static int udp_check_send(Scheme_Object *_udp) +static int udp_check_send(Scheme_Object *_udp, Scheme_Schedule_Info *sinfo) { Scheme_UDP *udp = (Scheme_UDP *)_udp; if (udp->s == INVALID_SOCKET) return 1; + if (!check_fd_sema(udp->s, MZFD_CHECK_WRITE, sinfo)) + return 0; + # ifdef HAVE_POLL_SYSCALL { GC_CAN_IGNORE struct pollfd pfd[1]; @@ -3110,7 +3229,8 @@ static int udp_check_send(Scheme_Object *_udp) sr = poll(pfd, 1, 0); } while ((sr == -1) && (errno == EINTR)); - return sr; + if (sr) + return sr; } # else { @@ -3130,10 +3250,15 @@ static int udp_check_send(Scheme_Object *_udp) do { sr = select(udp->s + 1, NULL, writefds, exnfds, &time); } while ((sr == -1) && (NOT_WINSOCK(errno) == EINTR)); - - return sr; + + if (sr) + return sr; } #endif + + check_fd_sema(udp->s, MZFD_CREATE_WRITE, sinfo); + + return 0; } static void udp_send_needs_wakeup(Scheme_Object *_udp, void *fds) @@ -3190,7 +3315,15 @@ static Scheme_Object *do_udp_send_it(const char *name, Scheme_UDP *udp, if (WAS_EAGAIN(errid)) { if (can_block) { /* Block and eventually try again. */ - scheme_block_until(udp_check_send, udp_send_needs_wakeup, (Scheme_Object *)udp, 0); + Scheme_Object *sema; + sema = scheme_fd_to_semaphore(udp->s, MZFD_CREATE_WRITE); + if (sema) + scheme_wait_sema(sema, 0); + else + scheme_block_until((Scheme_Ready_Fun)udp_check_send, + udp_send_needs_wakeup, + (Scheme_Object *)udp, + 0); } else return scheme_false; } else if (NOT_WINSOCK(errid) != EINTR) @@ -3339,13 +3472,16 @@ static Scheme_Object *udp_send_enable_break(int argc, Scheme_Object *argv[]) #ifdef UDP_IS_SUPPORTED -static int udp_check_recv(Scheme_Object *_udp) +static int udp_check_recv(Scheme_Object *_udp, Scheme_Schedule_Info *sinfo) { Scheme_UDP *udp = (Scheme_UDP *)_udp; if (udp->s == INVALID_SOCKET) return 1; + if (!check_fd_sema(udp->s, MZFD_CHECK_READ, sinfo)) + return 0; + # ifdef HAVE_POLL_SYSCALL { GC_CAN_IGNORE struct pollfd pfd[1]; @@ -3357,7 +3493,8 @@ static int udp_check_recv(Scheme_Object *_udp) sr = poll(pfd, 1, 0); } while ((sr == -1) && (errno == EINTR)); - return sr; + if (sr) + return sr; } # else { @@ -3378,9 +3515,14 @@ static int udp_check_recv(Scheme_Object *_udp) sr = select(udp->s + 1, readfds, NULL, exnfds, &time); } while ((sr == -1) && (NOT_WINSOCK(errno) == EINTR)); - return sr; + if (sr) + return sr; } # endif + + check_fd_sema(udp->s, MZFD_CREATE_READ, sinfo); + + return 0; } static void udp_recv_needs_wakeup(Scheme_Object *_udp, void *fds) @@ -3438,7 +3580,15 @@ static int do_udp_recv(const char *name, Scheme_UDP *udp, char *bstr, intptr_t s } if (WAS_EAGAIN(errid)) { if (can_block) { /* Block and eventually try again. */ - scheme_block_until(udp_check_recv, udp_recv_needs_wakeup, (Scheme_Object *)udp, 0); + Scheme_Object *sema; + sema = scheme_fd_to_semaphore(udp->s, MZFD_CREATE_READ); + if (sema) + scheme_wait_sema(sema, 0); + else + scheme_block_until((Scheme_Ready_Fun)udp_check_recv, + udp_recv_needs_wakeup, + (Scheme_Object *)udp, + 0); } else { v[0] = scheme_false; v[1] = scheme_false; @@ -3614,7 +3764,7 @@ static int udp_evt_check_ready(Scheme_Object *_uw, Scheme_Schedule_Info *sinfo) } else return 0; } else { - return udp_check_recv((Scheme_Object *)uw->udp); + return udp_check_recv((Scheme_Object *)uw->udp, NULL); } } else { if (uw->str) { @@ -3629,7 +3779,7 @@ static int udp_evt_check_ready(Scheme_Object *_uw, Scheme_Schedule_Info *sinfo) } else return 0; } else - return udp_check_send((Scheme_Object *)uw->udp); + return udp_check_send((Scheme_Object *)uw->udp, NULL); } } diff --git a/src/racket/src/port.c b/src/racket/src/port.c index f5f5063cac..623c7fb555 100644 --- a/src/racket/src/port.c +++ b/src/racket/src/port.c @@ -344,6 +344,8 @@ THREAD_LOCAL_DECL(Scheme_Object *scheme_orig_stderr_port); THREAD_LOCAL_DECL(Scheme_Object *scheme_orig_stdin_port); THREAD_LOCAL_DECL(struct mz_fd_set *scheme_fd_set); +THREAD_LOCAL_DECL(struct mz_fd_set *scheme_semaphore_fd_set); +THREAD_LOCAL_DECL(Scheme_Hash_Table *scheme_semaphore_fd_mapping); #ifdef USE_FCNTL_AND_FORK_FOR_FILE_LOCKS THREAD_LOCAL_DECL(Scheme_Hash_Table *locked_fd_process_map); @@ -781,6 +783,19 @@ void scheme_alloc_global_fdset() { REGISTER_SO(scheme_fd_set); scheme_fd_set = (struct mz_fd_set *)scheme_alloc_fdset_array(3, 0); #endif + + REGISTER_SO(scheme_semaphore_fd_set); +#ifdef USE_FAR_MZ_FDCALLS + scheme_semaphore_fd_set = (struct mz_fd_set *)scheme_alloc_fdset_array(3, 0); +#else + scheme_semaphore_fd_set = (struct mz_fd_set *)scheme_malloc_atomic(sizeof(struct mz_fd_set)); +#endif + scheme_fdzero(MZ_GET_FDSET(scheme_semaphore_fd_set, 0)); + scheme_fdzero(MZ_GET_FDSET(scheme_semaphore_fd_set, 1)); + scheme_fdzero(MZ_GET_FDSET(scheme_semaphore_fd_set, 2)); + + REGISTER_SO(scheme_semaphore_fd_mapping); + scheme_semaphore_fd_mapping = scheme_make_hash_table_eqv(); } #ifdef HAVE_POLL_SYSCALL @@ -815,7 +830,7 @@ void *scheme_alloc_fdset_array(int count, int permanent) data->pfd = pfd; if (permanent) - scheme_dont_gc_ptr(data); + scheme_dont_gc_ptr(r); return r; } @@ -844,18 +859,35 @@ void scheme_fdzero(void *fd) ((struct mz_fd_set *)fd)->data->count = scheme_make_integer(0); } +static int find_fd_pos(struct mz_fd_set_data *data, int n) +{ + intptr_t count = SCHEME_INT_VAL(data->count); + intptr_t i; + Scheme_Object *v; + + /* This linear search probably isn't good enough for hundreds or + thousands of descriptors, but epoll()/kqueue() mode should handle + that case, anyway. */ + for (i = 0; i < count; i++) { + if (data->pfd[i].fd == n) { + return i; + } + } + + return -1; +} + void scheme_fdclr(void *fd, int n) { struct mz_fd_set_data *data = ((struct mz_fd_set *)fd)->data; intptr_t flag = SCHEME_INT_VAL(((struct mz_fd_set *)fd)->flags); - intptr_t count = SCHEME_INT_VAL(data->count); - intptr_t i; + intptr_t pos; - for (i = 0; i < count; i++) { - if (data->pfd[i].fd == n) { - data->pfd[i].events -= (data->pfd[i].events & flag); - return; - } + if (!flag) return; + + pos = find_fd_pos(data, n); + if (pos >= 0) { + data->pfd[pos].events -= (data->pfd[pos].events & flag); } } @@ -863,17 +895,18 @@ void scheme_fdset(void *fd, int n) { struct mz_fd_set_data *data = ((struct mz_fd_set *)fd)->data; intptr_t flag = SCHEME_INT_VAL(((struct mz_fd_set *)fd)->flags); - intptr_t count = SCHEME_INT_VAL(data->count); - intptr_t size, i; + intptr_t count, size, pos; struct pollfd *pfd; - for (i = 0; i < count; i++) { - if (data->pfd[i].fd == n) { - data->pfd[i].events |= flag; - return; - } + if (!flag) return; + + pos = find_fd_pos(data, n); + if (pos >= 0) { + data->pfd[pos].events |= flag; + return; } + count = SCHEME_INT_VAL(data->count); size = SCHEME_INT_VAL(data->size); if (count >= size) { size = size * 2; @@ -893,23 +926,113 @@ int scheme_fdisset(void *fd, int n) { struct mz_fd_set_data *data = ((struct mz_fd_set *)fd)->data; intptr_t flag = SCHEME_INT_VAL(((struct mz_fd_set *)fd)->flags); - intptr_t count = SCHEME_INT_VAL(data->count); - intptr_t i; + intptr_t pos; if (!flag) flag = (POLLERR | POLLHUP); - for (i = 0; i < count; i++) { - if (data->pfd[i].fd == n) { - if (data->pfd[i].revents & flag) - return 1; - else - return 0; - } + pos = find_fd_pos(data, n); + if (pos >= 0) { + if (data->pfd[pos].revents & flag) + return 1; + else + return 0; } return 0; } +static int cmp_fd(const void *_a, const void *_b) +{ + struct pollfd *a = (struct pollfd *)_a; + struct pollfd *b = (struct pollfd *)_b; + return a->fd - b->fd; +} + +void *scheme_merge_fd_sets(void *fds, void *src_fds) +{ + struct mz_fd_set_data *data = ((struct mz_fd_set *)fds)->data; + struct mz_fd_set_data *src_data = ((struct mz_fd_set *)src_fds)->data; + int i, si, c, sc, j, nc; + struct pollfd *pfds; + + scheme_clean_fd_set(fds); + scheme_clean_fd_set(src_fds); + + c = SCHEME_INT_VAL(data->count); + sc = SCHEME_INT_VAL(src_data->count); + + if (!c) + return src_fds; + if (!sc) + return fds; + + qsort(data->pfd, c, sizeof(struct pollfd), cmp_fd); + qsort(src_data->pfd, sc, sizeof(struct pollfd), cmp_fd); + + nc = c + sc; + pfds = (struct pollfd *)scheme_malloc_atomic(sizeof(struct pollfd) * (nc + PFD_EXTRA_SPACE)); + j = 0; + for (i = 0, si = 0; (i < c) && (si < sc); ) { + if (data->pfd[i].fd == src_data->pfd[si].fd) { + pfds[j].fd = data->pfd[i].fd; + pfds[j].events = (data->pfd[i].events | src_data->pfd[si].events); + i++; + si++; + } else if (data->pfd[i].fd < src_data->pfd[si].fd) { + pfds[j].fd = data->pfd[i].fd; + pfds[j].events = data->pfd[i].events; + i++; + } else { + pfds[j].fd = src_data->pfd[si].fd; + pfds[j].events = src_data->pfd[si].events; + si++; + } + j++; + } + for ( ; i < c; i++, j++) { + pfds[j].fd = data->pfd[i].fd; + pfds[j].events = data->pfd[i].events; + } + for ( ; si < sc; si++, j++) { + pfds[j].fd = src_data->pfd[si].fd; + pfds[j].events = src_data->pfd[si].events; + } + + if (nc > SCHEME_INT_VAL(data->size)) { + data->pfd = pfds; + data->size = scheme_make_integer(nc); + } else + memcpy(data->pfd, pfds, j * sizeof(struct pollfd)); + data->count = scheme_make_integer(j); + + return fds; +} + +void scheme_clean_fd_set(void *fds) +{ + struct mz_fd_set_data *data = ((struct mz_fd_set *)fds)->data; + intptr_t count = SCHEME_INT_VAL(data->count); + intptr_t i, j = 0; + + for (i = 0; i < count; i++) { + if (data->pfd[i].events) { + if (j < i) { + data->pfd[j].fd = data->pfd[i].fd; + data->pfd[j].events = data->pfd[i].events; + } + j++; + } + } + + count = j; + data->count = scheme_make_integer(count); +} + +int scheme_get_fd_limit(void *fds) +{ + return 0; +} + #else # if defined(USE_DYNAMIC_FDSET_SIZE) @@ -1115,6 +1238,79 @@ int scheme_fdisset(void *fd, int n) #endif } +void *scheme_merge_fd_sets(void *fds, void *src_fds) +{ +#if defined(WIN32_FD_HANDLES) + win_extended_fd_set *efd = (win_extended_fd_set *)src_fds; + int i; + for (i = SCHEME_INT_VAL(efd->added); i--; ) { + if (efd->sockets[i] != INVALID_SOCKET) + scheme_fdset(fds, efd->sockets[i]); + } + return fds; +#else + int i, j; + GC_CAN_IGNORE unsigned char *p, *sp; + for (j = 0; j < 3; j++) { + p = scheme_get_fdset(fds, j); + sp = scheme_get_fdset(src_fds, j); + if (FDSET_LIMIT(sp) > FDSET_LIMIT(p)) { + i = FDSET_LIMIT(sp); + FDSET_LIMIT(p) = i; + } +# if defined(USE_DYNAMIC_FDSET_SIZE) + i = dynamic_fd_size; +# else + i = sizeof(fd_set); +# endif + for (; i--; p++, sp++) { + *p |= *sp; + } + } + return fds; +#endif +} + +void scheme_clean_fd_set(void *fds) +{ +} + +int scheme_get_fd_limit(void *fds) +{ + int limit, actual_limit; + fd_set *rd, *wr, *ex; + +# ifdef USE_WINSOCK_TCP + limit = 0; +# else +# ifdef USE_ULIMIT + limit = ulimit(4, 0); +# else +# ifdef FIXED_FD_LIMIT + limit = FIXED_FD_LIMIT; +# else + limit = getdtablesize(); +# endif +# endif +# endif + + rd = (fd_set *)fds; + wr = (fd_set *)MZ_GET_FDSET(fds, 1); + ex = (fd_set *)MZ_GET_FDSET(fds, 2); +# ifdef STORED_ACTUAL_FDSET_LIMIT + actual_limit = FDSET_LIMIT(rd); + if (FDSET_LIMIT(wr) > actual_limit) + actual_limit = FDSET_LIMIT(wr); + if (FDSET_LIMIT(ex) > actual_limit) + actual_limit = FDSET_LIMIT(ex); + actual_limit++; +# else + actual_limit = limit; +# endif + + return actual_limit; +} + #endif void scheme_add_fd_handle(void *h, void *fds, int repost) @@ -1691,9 +1887,9 @@ static int output_ready(Scheme_Object *port, Scheme_Schedule_Info *sinfo) } if (op->ready_fun) { - Scheme_Out_Ready_Fun rf; - rf = op->ready_fun; - return rf(op); + Scheme_Out_Ready_Fun_FPC rf; + rf = (Scheme_Out_Ready_Fun_FPC)op->ready_fun; + return rf(op, sinfo); } return 1; @@ -1714,6 +1910,8 @@ static void output_need_wakeup (Scheme_Object *port, void *fds) } } +static int byte_input_ready (Scheme_Object *port, Scheme_Schedule_Info *sinfo); + int scheme_byte_ready_or_user_port_ready(Scheme_Object *p, Scheme_Schedule_Info *sinfo) { Scheme_Input_Port *ip; @@ -1732,14 +1930,14 @@ int scheme_byte_ready_or_user_port_ready(Scheme_Object *p, Scheme_Schedule_Info scheduler isn't requesting the status, we need sinfo. */ return scheme_user_port_byte_probably_ready(ip, sinfo); } else - return scheme_byte_ready(p); + return byte_input_ready(p, sinfo); } static void register_port_wait() { scheme_add_evt(scheme_input_port_type, - (Scheme_Ready_Fun)scheme_byte_ready_or_user_port_ready, scheme_need_wakeup, - evt_input_port_p, 1); + (Scheme_Ready_Fun)scheme_byte_ready_or_user_port_ready, scheme_need_wakeup, + evt_input_port_p, 1); scheme_add_evt(scheme_output_port_type, (Scheme_Ready_Fun)output_ready, output_need_wakeup, evt_output_port_p, 1); @@ -3381,8 +3579,7 @@ scheme_ungetc (int ch, Scheme_Object *port) } } -int -scheme_byte_ready (Scheme_Object *port) +int byte_input_ready (Scheme_Object *port, Scheme_Schedule_Info *sinfo) { Scheme_Input_Port *ip; int retval; @@ -3397,13 +3594,20 @@ scheme_byte_ready (Scheme_Object *port) || pipe_char_count(ip->peeked_read))) retval = 1; else { - Scheme_In_Ready_Fun f = ip->byte_ready_fun; - retval = f(ip); + Scheme_In_Ready_Fun_FPC f; + f = (Scheme_In_Ready_Fun_FPC)ip->byte_ready_fun; + retval = f(ip, NULL); } return retval; } +int +scheme_byte_ready (Scheme_Object *port) +{ + return byte_input_ready(port, NULL); +} + int scheme_char_ready (Scheme_Object *port) { @@ -5760,17 +5964,26 @@ static intptr_t fd_get_string_slow(Scheme_Input_Port *port, int none_avail = 0; int target_size, target_offset, ext_target; char *target; + Scheme_Object *sema; /* If no chars appear to be ready, go to sleep. */ while (!fd_byte_ready(port)) { if (nonblock > 0) return 0; - - scheme_block_until_unless((Scheme_Ready_Fun)fd_byte_ready, - (Scheme_Needs_Wakeup_Fun)fd_need_wakeup, - (Scheme_Object *)port, - 0.0, unless, - nonblock); + +#ifdef WINDOWS_FILE_HANDLES + sema = NULL; +#else + sema = scheme_fd_to_semaphore(fip->fd, MZFD_CREATE_READ); +#endif + if (sema) + scheme_wait_sema(sema, nonblock ? -1 : 0); + else + scheme_block_until_unless((Scheme_Ready_Fun)fd_byte_ready, + (Scheme_Needs_Wakeup_Fun)fd_need_wakeup, + (Scheme_Object *)port, + 0.0, unless, + nonblock); scheme_wait_input_allowed(port, nonblock); @@ -6061,6 +6274,7 @@ fd_close_input(Scheme_Input_Port *port) # ifdef USE_FCNTL_AND_FORK_FOR_FILE_LOCKS release_lockf(fip->fd); # endif + (void)scheme_fd_to_semaphore(fip->fd, MZFD_REMOVE); } } #endif @@ -7181,16 +7395,27 @@ static intptr_t flush_fd(Scheme_Output_Port *op, return wrote; } else if (full_write_buffer) { /* Need to block; remember that we're holding a lock. */ + Scheme_Object *sema; + if (immediate_only == 2) { fop->flushing = 0; return wrote; } +#ifdef WINDOWS_FILE_HANDLES + sema = NULL; +#else + sema = scheme_fd_to_semaphore(fop->fd, MZFD_CREATE_WRITE); +#endif + BEGIN_ESCAPEABLE(release_flushing_lock, fop); - scheme_block_until_enable_break(fd_write_ready, - fd_write_need_wakeup, - (Scheme_Object *)op, 0.0, - enable_break); + if (sema) + scheme_wait_sema(sema, enable_break ? -1 : 0); + else + scheme_block_until_enable_break(fd_write_ready, + fd_write_need_wakeup, + (Scheme_Object *)op, 0.0, + enable_break); END_ESCAPEABLE(); } else { fop->flushing = 0; @@ -7352,6 +7577,7 @@ fd_close_output(Scheme_Output_Port *port) # ifdef USE_FCNTL_AND_FORK_FOR_FILE_LOCKS release_lockf(fop->fd); # endif + (void)scheme_fd_to_semaphore(fop->fd, MZFD_REMOVE); } } #endif @@ -9387,7 +9613,7 @@ static void default_sleep(float v, void *fds) #if defined(FILES_HAVE_FDS) || defined(USE_WINSOCK_TCP) # ifndef HAVE_POLL_SYSCALL - int limit, actual_limit; + int actual_limit; fd_set *rd, *wr, *ex; struct timeval time; # endif @@ -9416,33 +9642,11 @@ static void default_sleep(float v, void *fds) time.tv_usec = usecs; } -# ifdef USE_WINSOCK_TCP - limit = 0; -# else -# ifdef USE_ULIMIT - limit = ulimit(4, 0); -# else -# ifdef FIXED_FD_LIMIT - limit = FIXED_FD_LIMIT; -# else - limit = getdtablesize(); -# endif -# endif -# endif - rd = (fd_set *)fds; wr = (fd_set *)MZ_GET_FDSET(fds, 1); ex = (fd_set *)MZ_GET_FDSET(fds, 2); -# ifdef STORED_ACTUAL_FDSET_LIMIT - actual_limit = FDSET_LIMIT(rd); - if (FDSET_LIMIT(wr) > actual_limit) - actual_limit = FDSET_LIMIT(wr); - if (FDSET_LIMIT(ex) > actual_limit) - actual_limit = FDSET_LIMIT(ex); - actual_limit++; -# else - actual_limit = limit; -# endif + + actual_limit = scheme_get_fd_limit(fds); # endif /******* Start Windows stuff *******/ diff --git a/src/racket/src/schemef.h b/src/racket/src/schemef.h index a902356161..a7c88b5af4 100644 --- a/src/racket/src/schemef.h +++ b/src/racket/src/schemef.h @@ -949,6 +949,8 @@ MZ_EXTERN int scheme_get_port_socket(Scheme_Object *p, intptr_t *_s); MZ_EXTERN void scheme_socket_to_ports(intptr_t s, const char *name, int takeover, Scheme_Object **_inp, Scheme_Object **_outp); +MZ_EXTERN Scheme_Object *scheme_fd_to_semaphore(intptr_t fd, int mode); + MZ_EXTERN void scheme_set_type_printer(Scheme_Type stype, Scheme_Type_Printer printer); MZ_EXTERN void scheme_print_bytes(Scheme_Print_Params *pp, const char *str, int offset, int len); MZ_EXTERN void scheme_print_utf8(Scheme_Print_Params *pp, const char *str, int offset, int len); diff --git a/src/racket/src/schemex.h b/src/racket/src/schemex.h index 74a2d36928..cf081dfd22 100644 --- a/src/racket/src/schemex.h +++ b/src/racket/src/schemex.h @@ -780,6 +780,7 @@ intptr_t (*scheme_get_port_fd)(Scheme_Object *p); int (*scheme_get_port_socket)(Scheme_Object *p, intptr_t *_s); void (*scheme_socket_to_ports)(intptr_t s, const char *name, int takeover, Scheme_Object **_inp, Scheme_Object **_outp); +Scheme_Object *(*scheme_fd_to_semaphore)(intptr_t fd, int mode); void (*scheme_set_type_printer)(Scheme_Type stype, Scheme_Type_Printer printer); void (*scheme_print_bytes)(Scheme_Print_Params *pp, const char *str, int offset, int len); void (*scheme_print_utf8)(Scheme_Print_Params *pp, const char *str, int offset, int len); diff --git a/src/racket/src/schfd.h b/src/racket/src/schfd.h index e0737a62bb..080c8534ad 100644 --- a/src/racket/src/schfd.h +++ b/src/racket/src/schfd.h @@ -31,3 +31,7 @@ struct mz_fd_set { fd_set data; }; # define INIT_DECL_WR_FDSET(r) /* empty */ # define INIT_DECL_ER_FDSET(r) /* empty */ #endif + +void *scheme_merge_fd_sets(void *fds, void *src_fds); +void scheme_clean_fd_set(void *fds); +int scheme_get_fd_limit(void *fds); diff --git a/src/racket/src/schpriv.h b/src/racket/src/schpriv.h index d98655a1e8..345b33ae9d 100644 --- a/src/racket/src/schpriv.h +++ b/src/racket/src/schpriv.h @@ -611,6 +611,8 @@ struct Syncing; void scheme_accept_sync(struct Syncing *syncing, int i); typedef int (*Scheme_Ready_Fun_FPC)(Scheme_Object *o, Scheme_Schedule_Info *sinfo); +typedef int (*Scheme_Out_Ready_Fun_FPC)(Scheme_Output_Port *port, Scheme_Schedule_Info *sinfo); +typedef int (*Scheme_In_Ready_Fun_FPC)(Scheme_Input_Port *port, Scheme_Schedule_Info *sinfo); void scheme_check_break_now(void); diff --git a/src/racket/src/thread.c b/src/racket/src/thread.c index 0194ea5a95..70505747b2 100644 --- a/src/racket/src/thread.c +++ b/src/racket/src/thread.c @@ -56,6 +56,10 @@ # ifdef USE_BEOS_SOCKET_INCLUDE # include # endif +# ifdef HAVE_POLL_SYSCALL +# include +# endif +# include #endif #ifdef USE_WINSOCK_TCP # ifdef USE_TCP @@ -3385,12 +3389,194 @@ static Scheme_Object *call_as_nested_thread(int argc, Scheme_Object *argv[]) /* thread scheduling and termination */ /*========================================================================*/ +Scheme_Object *scheme_fd_to_semaphore(intptr_t fd, int mode) +{ +#ifdef USE_WINSOCK_TCP + return NULL; +#else + Scheme_Object *key, *v, *s; + void *r, *w, *e; + + if (!scheme_semaphore_fd_mapping) + return NULL; + + key = scheme_make_integer_value(fd); + v = scheme_hash_get(scheme_semaphore_fd_mapping, key); + if (!v && ((mode == MZFD_CHECK_READ) + || (mode == MZFD_CHECK_WRITE) + || (mode == MZFD_REMOVE))) + return NULL; + + if (!v) { + v = scheme_make_vector(2, scheme_false); + scheme_hash_set(scheme_semaphore_fd_mapping, key, v); + } + + r = MZ_GET_FDSET(scheme_semaphore_fd_set, 0); + w = MZ_GET_FDSET(scheme_semaphore_fd_set, 1); + e = MZ_GET_FDSET(scheme_semaphore_fd_set, 2); + + if (mode == MZFD_REMOVE) { + scheme_hash_set(scheme_semaphore_fd_mapping, key, NULL); + MZ_FD_CLR(fd, r); + MZ_FD_CLR(fd, w); + MZ_FD_CLR(fd, e); + s = NULL; + } else if ((mode == MZFD_CHECK_READ) + || (mode == MZFD_CREATE_READ)) { + s = SCHEME_VEC_ELS(v)[0]; + if (SCHEME_FALSEP(s)) { + if (mode == MZFD_CREATE_READ) { + s = scheme_make_sema(0); + SCHEME_VEC_ELS(v)[0] = s; + MZ_FD_SET(fd, r); + MZ_FD_SET(fd, e); + } + } + } else { + s = SCHEME_VEC_ELS(v)[1]; + if (SCHEME_FALSEP(s)) { + if (mode == MZFD_CREATE_WRITE) { + s = scheme_make_sema(0); + SCHEME_VEC_ELS(v)[1] = s; + MZ_FD_SET(fd, w); + MZ_FD_SET(fd, e); + } + } + } + + return s; +#endif +} + +static int check_fd_semaphores() +{ +#ifdef USE_WINSOCK_TCP + return 0; +#elif defined(HAVE_POLL_SYSCALL) + struct pollfd *pfd; + intptr_t i, c; + Scheme_Object *v, *s, *key; + int sr, hit = 0; + + if (!scheme_semaphore_fd_mapping || !scheme_semaphore_fd_mapping->count) + return 0; + + scheme_clean_fd_set(scheme_semaphore_fd_set); + c = SCHEME_INT_VAL(scheme_semaphore_fd_set->data->count); + pfd = scheme_semaphore_fd_set->data->pfd; + + do { + sr = poll(pfd, c, 0); + } while ((sr == -1) && (errno == EINTR)); + + if (sr > 0) { + for (i = 0; i < c; i++) { + if (pfd[i].revents) { + key = scheme_make_integer_value(pfd[i].fd); + v = scheme_hash_get(scheme_semaphore_fd_mapping, key); + if (v) { + if (pfd[i].revents & (POLLIN | POLLHUP | POLLERR)) { + s = SCHEME_VEC_ELS(v)[0]; + if (!SCHEME_FALSEP(s)) { + scheme_post_sema_all(s); + hit = 1; + SCHEME_VEC_ELS(v)[0] = scheme_false; + } + pfd[i].revents -= (pfd[i].revents & POLLIN); + } + if (pfd[i].revents & (POLLOUT | POLLHUP | POLLERR)) { + s = SCHEME_VEC_ELS(v)[1]; + if (!SCHEME_FALSEP(s)) { + scheme_post_sema_all(s); + hit = 1; + SCHEME_VEC_ELS(v)[1] = scheme_false; + } + pfd[i].revents -= (pfd[i].revents & POLLOUT); + } + if (SCHEME_FALSEP(SCHEME_VEC_ELS(v)[0]) + && SCHEME_FALSEP(SCHEME_VEC_ELS(v)[1])) + scheme_hash_set(scheme_semaphore_fd_mapping, key, NULL); + } + } + } + } + + return hit; +#else + void *fds; + struct timeval time = {0, 0}; + int i, actual_limit, r, w, e, sr, hit = 0; + Scheme_Object *key, *v, *s; + DECL_FDSET(set, 3); + fd_set *set1, *set2; + + if (!scheme_semaphore_fd_mapping || !scheme_semaphore_fd_mapping->count) + return 0; + + INIT_DECL_FDSET(set, set1, set2); + set1 = (fd_set *) MZ_GET_FDSET(set, 1); + set2 = (fd_set *) MZ_GET_FDSET(set, 2); + + fds = (void *)set; + MZ_FD_ZERO(set); + MZ_FD_ZERO(set1); + MZ_FD_ZERO(set2); + + scheme_merge_fd_sets(fds, scheme_semaphore_fd_set); + + actual_limit = scheme_get_fd_limit(fds); + + do { + sr = select(actual_limit, set, set1, set2, &time); + } while ((sr == -1) && (errno == EINTR)); + + if (sr > 0) { + for (i = 0; i < actual_limit; i++) { + r = MZ_FD_ISSET(i, set); + w = MZ_FD_ISSET(i, set1); + e = MZ_FD_ISSET(i, set2); + if (r || w || e) { + key = scheme_make_integer_value(i); + v = scheme_hash_get(scheme_semaphore_fd_mapping, key); + if (v) { + if (r || e) { + s = SCHEME_VEC_ELS(v)[0]; + if (!SCHEME_FALSEP(s)) { + scheme_post_sema_all(s); + hit = 1; + SCHEME_VEC_ELS(v)[0] = scheme_false; + } + MZ_FD_CLR(i, MZ_GET_FDSET(scheme_semaphore_fd_set, 0)); + } + if (w || e) { + s = SCHEME_VEC_ELS(v)[1]; + if (!SCHEME_FALSEP(s)) { + scheme_post_sema_all(s); + hit = 1; + SCHEME_VEC_ELS(v)[1] = scheme_false; + } + MZ_FD_CLR(i, MZ_GET_FDSET(scheme_semaphore_fd_set, 1)); + } + if (SCHEME_FALSEP(SCHEME_VEC_ELS(v)[0]) + && SCHEME_FALSEP(SCHEME_VEC_ELS(v)[1])) { + MZ_FD_CLR(i, MZ_GET_FDSET(scheme_semaphore_fd_set, 2)); + scheme_hash_set(scheme_semaphore_fd_mapping, key, NULL); + } + } + } + } + } + + return hit; +#endif +} + static int check_sleep(int need_activity, int sleep_now) /* Signals should be suspended */ { Scheme_Thread *p, *p2; int end_with_act; - #if defined(USING_FDS) DECL_FDSET(set, 3); fd_set *set1, *set2; @@ -3505,7 +3691,10 @@ static int check_sleep(int need_activity, int sleep_now) if (post_system_idle()) { return 0; } - + + scheme_clean_fd_set(fds); + fds = scheme_merge_fd_sets(fds, scheme_semaphore_fd_set); + if (sleep_now) { float mst = (float)max_sleep_time; @@ -4066,6 +4255,7 @@ void scheme_thread_block(float sleep_time) double sleep_end; Scheme_Thread *next; Scheme_Thread *p = scheme_current_thread; + int skip_sleep; if (p->return_marks_to) /* just in case we get here */ return; @@ -4136,6 +4326,24 @@ void scheme_thread_block(float sleep_time) scheme_check_foreign_work(); #endif + skip_sleep = 0; + if (check_fd_semaphores()) { + /* double check whether a semaphore for this thread woke up: */ + if (p->block_descriptor == GENERIC_BLOCKED) { + if (p->block_check) { + Scheme_Ready_Fun_FPC f = (Scheme_Ready_Fun_FPC)p->block_check; + Scheme_Schedule_Info sinfo; + init_schedule_info(&sinfo, p, sleep_end); + if (f(p->blocker, &sinfo)) { + sleep_end = 0; + skip_sleep = 1; + } else { + sleep_end = sinfo.sleep_end; + } + } + } + } + if (!do_atomic && (sleep_end >= 0.0)) { find_next_thread(&next); } else @@ -4204,7 +4412,7 @@ void scheme_thread_block(float sleep_time) } } else { /* If all processes are blocked, check for total process sleeping: */ - if (p->block_descriptor != NOT_BLOCKED) { + if ((p->block_descriptor != NOT_BLOCKED) && !skip_sleep) { check_sleep(1, 1); } }