diff --git a/collects/tests/racket/place-channel-fd.rkt b/collects/tests/racket/place-channel-fd.rkt index 1218437138..d8a6fa3db4 100644 --- a/collects/tests/racket/place-channel-fd.rkt +++ b/collects/tests/racket/place-channel-fd.rkt @@ -4,6 +4,8 @@ racket/port racket/runtime-path racket/list + racket/tcp + racket/match rackunit (for-syntax racket/base)) @@ -84,4 +86,29 @@ (define i3 (open-input-file "test2")) (check-equal? #t #t "cleanup of unreceived port message") (place-channel-put p i3) + + + (define port-ch (make-channel)) + + (thread + (lambda () + (define p (place ch + (match (place-channel-get ch) + [(list in out) + (define x (read in)) + (printf "IN PLACE ~a\n" x) + (write (string-append "From Place " x) out) + (flush-output out)]))) + (define s (tcp-listen 0)) + (define-values (h1 p1 h2 p2) (tcp-addresses s #t)) + (printf "~a ~a ~a ~a\n" h1 p1 h2 p2) + (channel-put port-ch p1) + (define-values (in out) (tcp-accept s)) + (place-channel-put p (list in out)) + (place-wait p))) + + (define-values (in out) (tcp-connect "localhost" (channel-get port-ch))) + (write "Hello There" out) + (flush-output out) + (displayln (read in)) ) diff --git a/collects/tests/racket/place-channel.rkt b/collects/tests/racket/place-channel.rkt index 14acc72cbc..45844d4751 100644 --- a/collects/tests/racket/place-channel.rkt +++ b/collects/tests/racket/place-channel.rkt @@ -354,29 +354,6 @@ (test-long (lambda (x) (intern-num-sym (modulo x 1000))) "Listof symbols") (test-long (lambda (x) #s(clown "Binky" "pie")) "Listof prefabs") (test-long (lambda (x) (read (open-input-string "#0=(#0# . #0#)"))) "Listof cycles") - - (define port-ch (make-channel)) - - (thread - (lambda () - (define p (place ch - (match (place-channel-get ch) - [(list in out) - (define x (read in)) - (printf "IN PLACE ~a\n" x) - (write (string-append "From Place " x) out) - (flush-output out)]))) - (define s (tcp-listen 0)) - (define-values (h1 p1 h2 p2) (tcp-addresses s #t)) - (printf "~a ~a ~a ~a\n" h1 p1 h2 p2) - (channel-put port-ch p1) - (define-values (in out) (tcp-accept s)) - (place-channel-put p (list in out)) - (place-wait p))) - - (define-values (in out) (tcp-connect "localhost" (channel-get port-ch))) - (write "Hello There" out) - (flush-output out) - (displayln (read in))) +) ;(report-errs) diff --git a/src/racket/src/error.c b/src/racket/src/error.c index f13a9492da..087fb3ebb7 100644 --- a/src/racket/src/error.c +++ b/src/racket/src/error.c @@ -28,6 +28,11 @@ #ifdef DOS_FILE_SYSTEM # include #endif +#ifdef NO_ERRNO_GLOBAL +# define errno -1 +#else +# include +#endif #ifdef USE_C_SYSLOG # include # include @@ -157,6 +162,14 @@ static void default_output(char *s, intptr_t len) fflush(stderr); } +intptr_t scheme_errno() { +#ifdef WINDOWS_FILE_HANDLES + return GetLastError(); +#else + return errno; +#endif +} + Scheme_Config *scheme_init_error_escape_proc(Scheme_Config *config) { if (!def_error_esc_proc) { diff --git a/src/racket/src/mzmark_place.inc b/src/racket/src/mzmark_place.inc index 6d289c240e..fe45335c73 100644 --- a/src/racket/src/mzmark_place.inc +++ b/src/racket/src/mzmark_place.inc @@ -127,3 +127,28 @@ static int serialized_file_fd_val_FIXUP(void *p, struct NewGC *gc) { #define serialized_file_fd_val_IS_CONST_SIZE 1 +static int serialized_socket_fd_val_SIZE(void *p, struct NewGC *gc) { + return + gcBYTES_TO_WORDS(sizeof(Scheme_Serialized_Socket_FD)); +} + +static int serialized_socket_fd_val_MARK(void *p, struct NewGC *gc) { + Scheme_Serialized_Socket_FD *sfd = (Scheme_Serialized_Socket_FD *) p; + gcMARK2(sfd->name, gc); + + return + gcBYTES_TO_WORDS(sizeof(Scheme_Serialized_Socket_FD)); +} + +static int serialized_socket_fd_val_FIXUP(void *p, struct NewGC *gc) { + Scheme_Serialized_Socket_FD *sfd = (Scheme_Serialized_Socket_FD *) p; + gcFIXUP2(sfd->name, gc); + + return + gcBYTES_TO_WORDS(sizeof(Scheme_Serialized_Socket_FD)); +} + +#define serialized_socket_fd_val_IS_ATOMIC 0 +#define serialized_socket_fd_val_IS_CONST_SIZE 1 + + diff --git a/src/racket/src/mzmarksrc.c b/src/racket/src/mzmarksrc.c index 521dc4d6b2..250c6818ed 100644 --- a/src/racket/src/mzmarksrc.c +++ b/src/racket/src/mzmarksrc.c @@ -1478,6 +1478,15 @@ serialized_file_fd_val { gcBYTES_TO_WORDS(sizeof(Scheme_Serialized_File_FD)); } +serialized_socket_fd_val { + mark: + Scheme_Serialized_Socket_FD *sfd = (Scheme_Serialized_Socket_FD *) p; + gcMARK2(sfd->name, gc); + + size: + gcBYTES_TO_WORDS(sizeof(Scheme_Serialized_Socket_FD)); +} + END place; /**********************************************************************/ diff --git a/src/racket/src/network.c b/src/racket/src/network.c index a5233b3e7d..37e7b09c38 100644 --- a/src/racket/src/network.c +++ b/src/racket/src/network.c @@ -92,6 +92,10 @@ struct SOCKADDR_IN { extern int scheme_stupid_windows_machine; #endif +intptr_t scheme_socket_errno() { + return SOCK_ERRNO(); +} + #include "schfd.h" #define TCP_BUFFER_SIZE 4096 @@ -1435,7 +1439,7 @@ tcp_out_buffer_mode(Scheme_Port *p, int mode) } static Scheme_Object * -make_tcp_input_port(void *data, const char *name, Scheme_Object *cust) +make_tcp_input_port_symbol_name(void *data, Scheme_Object *name, Scheme_Object *cust) { Scheme_Input_Port *ip; @@ -1444,7 +1448,7 @@ make_tcp_input_port(void *data, const char *name, Scheme_Object *cust) ip = scheme_make_input_port(scheme_tcp_input_port_type, data, - scheme_intern_symbol(name), + name, tcp_get_string, NULL, scheme_progress_evt_via_get, @@ -1460,7 +1464,13 @@ make_tcp_input_port(void *data, const char *name, Scheme_Object *cust) } static Scheme_Object * -make_tcp_output_port(void *data, const char *name, Scheme_Object *cust) +make_tcp_input_port(void *data, const char *name, Scheme_Object *cust) +{ + return make_tcp_input_port_symbol_name(data, scheme_intern_symbol(name), cust); +} + +static Scheme_Object * +make_tcp_output_port_symbol_name(void *data, Scheme_Object *name, Scheme_Object *cust) { Scheme_Output_Port *op; @@ -1469,7 +1479,7 @@ make_tcp_output_port(void *data, const char *name, Scheme_Object *cust) op = scheme_make_output_port(scheme_tcp_output_port_type, data, - scheme_intern_symbol(name), + name, scheme_write_evt_via_write, tcp_write_string, (Scheme_Out_Ready_Fun)tcp_check_write, @@ -1484,6 +1494,12 @@ make_tcp_output_port(void *data, const char *name, Scheme_Object *cust) return (Scheme_Object *)op; } +static Scheme_Object * +make_tcp_output_port(void *data, const char *name, Scheme_Object *cust) +{ + return make_tcp_output_port_symbol_name(data, scheme_intern_symbol(name), cust); +} + #endif /* USE_TCP */ /*========================================================================*/ @@ -2513,12 +2529,47 @@ void scheme_socket_to_ports(intptr_t s, const char *name, int takeover, } } +void scheme_socket_to_input_port(intptr_t s, Scheme_Object *name, int takeover, + Scheme_Object **_inp) +{ + Scheme_Tcp *tcp; + Scheme_Object *v; + + tcp = make_tcp_port_data(s, takeover ? 1 : 2); + + v = make_tcp_input_port_symbol_name(tcp, name, NULL); + *_inp = v; + + if (takeover) { + REGISTER_SOCKET(s); + } +} + +void scheme_socket_to_output_port(intptr_t s, Scheme_Object *name, int takeover, + Scheme_Object **_outp) +{ + Scheme_Tcp *tcp; + Scheme_Object *v; + + tcp = make_tcp_port_data(s, takeover ? 1 : 2); + + v = make_tcp_output_port_symbol_name(tcp, name, NULL); + *_outp = v; + + if (takeover) { + REGISTER_SOCKET(s); + } +} + intptr_t scheme_dup_socket(intptr_t fd) { #ifdef USE_SOCKETS_TCP # ifdef USE_WINSOCK_TCP intptr_t nsocket; + intptr_t rc; WSAPROTOCOL_INFO protocolInfo; - WSADuplicateSocket(fd, GetCurrentProcessId(), &protocolInfo); + rc = WSADuplicateSocket(fd, GetCurrentProcessId(), &protocolInfo); + if (rc) + return rc; nsocket = WSASocket(FROM_PROTOCOL_INFO, FROM_PROTOCOL_INFO, FROM_PROTOCOL_INFO, &protocolInfo, 0, WSA_FLAG_OVERLAPPED); REGISTER_SOCKET(nsocket); return nsocket; diff --git a/src/racket/src/place.c b/src/racket/src/place.c index ab403b0439..8919bed514 100644 --- a/src/racket/src/place.c +++ b/src/racket/src/place.c @@ -898,18 +898,6 @@ static void push_duped_fd(Scheme_Object **fd_accumulators, intptr_t slot, intptr } } -static Scheme_Object *make_serialized_tcp_fd(intptr_t fd, intptr_t type, Scheme_Object **fd_accumulators) { - Scheme_Simple_Object *so; - int dupfd; - dupfd = scheme_dup_socket(fd); - push_duped_fd(fd_accumulators, 1, dupfd); - so = scheme_malloc_small_atomic_tagged(sizeof(Scheme_Simple_Object)); - so->iso.so.type = scheme_serialized_tcp_fd_type; - so->u.two_int_val.int1 = type; - so->u.two_int_val.int2 = dupfd; - return (Scheme_Object *)so; -} - static Scheme_Object *trivial_copy(Scheme_Object *so) { switch (SCHEME_TYPE(so)) { @@ -938,7 +926,8 @@ static Scheme_Object *trivial_copy(Scheme_Object *so) return NULL; } -static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *ht, Scheme_Object **fd_accumulators,int copy, int can_raise_exn) { +static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *ht, + Scheme_Object **fd_accumulators, intptr_t *delayed_errno, int copy, int can_raise_exn) { Scheme_Object *new_so; new_so = trivial_copy(so); @@ -961,8 +950,8 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h Scheme_Object *d; n = scheme_rational_numerator(so); d = scheme_rational_denominator(so); - n = shallow_types_copy(n, NULL, fd_accumulators, copy, can_raise_exn); - d = shallow_types_copy(d, NULL, fd_accumulators, copy, can_raise_exn); + n = shallow_types_copy(n, NULL, fd_accumulators, delayed_errno, copy, can_raise_exn); + d = shallow_types_copy(d, NULL, fd_accumulators, delayed_errno, copy, can_raise_exn); if (copy) new_so = scheme_make_rational(n, d); } @@ -981,8 +970,8 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h Scheme_Object *i; r = scheme_complex_real_part(so); i = scheme_complex_imaginary_part(so); - r = shallow_types_copy(r, NULL, fd_accumulators, copy, can_raise_exn); - i = shallow_types_copy(i, NULL, fd_accumulators, copy, can_raise_exn); + r = shallow_types_copy(r, NULL, fd_accumulators, delayed_errno, copy, can_raise_exn); + i = shallow_types_copy(i, NULL, fd_accumulators, delayed_errno, copy, can_raise_exn); if (copy) new_so = scheme_make_complex(r, i); } @@ -1065,7 +1054,7 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h o->type = scheme_cpointer_type; SCHEME_CPTR_FLAGS(o) |= 0x1; SCHEME_CPTR_VAL(o) = SCHEME_CPTR_VAL(so); - o2 = shallow_types_copy(SCHEME_CPTR_TYPE(so), NULL, fd_accumulators, copy, can_raise_exn); + o2 = shallow_types_copy(SCHEME_CPTR_TYPE(so), NULL, fd_accumulators, delayed_errno, copy, can_raise_exn); SCHEME_CPTR_TYPE(o) = o2; new_so = o; @@ -1082,7 +1071,30 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h intptr_t fd; if(scheme_get_port_socket(so, &fd)) { if (copy) { - new_so = make_serialized_tcp_fd(fd, so->type, fd_accumulators); + Scheme_Object *tmp; + Scheme_Object *portname; + Scheme_Serialized_Socket_FD *ssfd; + int dupfd; + dupfd = scheme_dup_socket(fd); + if (dupfd == -1) { + if (can_raise_exn) + scheme_raise_exn(MZEXN_FAIL_NETWORK, "dup: error duplicating socket(%e)", scheme_socket_errno()); + if (delayed_errno) { + intptr_t tmp; + tmp = scheme_socket_errno(); + *delayed_errno = tmp; + } + return NULL; + } + push_duped_fd(fd_accumulators, 1, dupfd); + ssfd = scheme_malloc_tagged(sizeof(Scheme_Serialized_Socket_FD)); + ssfd->so.type = scheme_serialized_tcp_fd_type; + ssfd->type = so->type; + ssfd->fd = dupfd; + portname = scheme_port_name(so); + tmp = shallow_types_copy(portname, ht, fd_accumulators, delayed_errno, copy, can_raise_exn); + ssfd->name = tmp; + return (Scheme_Object *)ssfd; } } else if (SCHEME_TRUEP(scheme_file_stream_port_p(1, &so))) { @@ -1093,11 +1105,19 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h sffd = scheme_malloc_tagged(sizeof(Scheme_Serialized_File_FD)); sffd->so.type = scheme_serialized_file_fd_type; scheme_get_serialized_fd_flags(so, sffd); - if (sffd->name) { - tmp = shallow_types_copy(sffd->name, ht, fd_accumulators, copy, can_raise_exn); - sffd->name = tmp; - } + tmp = shallow_types_copy(sffd->name, ht, fd_accumulators, delayed_errno, copy, can_raise_exn); + sffd->name = tmp; dupfd = scheme_dup_file(fd); + if (dupfd == -1) { + if (can_raise_exn) + scheme_raise_exn(MZEXN_FAIL_FILESYSTEM, "dup: error duplicating file descriptor(%e)", scheme_errno()); + if (delayed_errno) { + intptr_t tmp; + tmp = scheme_errno(); + *delayed_errno = tmp; + } + return NULL; + } push_duped_fd(fd_accumulators, 0, dupfd); sffd->fd = dupfd; sffd->type = so->type; @@ -1118,15 +1138,20 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h { Scheme_Object *in; Scheme_Object *out; - int type = ((Scheme_Simple_Object *) so)->u.two_int_val.int1; - int fd = ((Scheme_Simple_Object *) so)->u.two_int_val.int2; - scheme_socket_to_ports(fd, "", 1, &in, &out); + Scheme_Object *name; + int type = ((Scheme_Serialized_Socket_FD *) so)->type; + int fd = ((Scheme_Serialized_Socket_FD *) so)->fd; + name = ((Scheme_Serialized_Socket_FD *) so)->name; + + //scheme_socket_to_ports(fd, "tcp-accepted", 1, &in, &out); if (type == scheme_input_port_type) { - scheme_tcp_abandon_port(out); + scheme_socket_to_input_port(fd, name, 1, &in); + //scheme_tcp_abandon_port(out); new_so = in; } else { - scheme_tcp_abandon_port(in); + scheme_socket_to_output_port(fd, name, 1, &out); + //scheme_tcp_abandon_port(in); new_so = out; } } @@ -1307,6 +1332,7 @@ static Scheme_Object *places_deep_copy_worker(Scheme_Object *so, Scheme_Hash_Tab uintptr_t inf_stack_depth = 0; Scheme_Object *fd_accumulators = NULL; + intptr_t delayed_errno = 0; /* lifted variables for xform*/ Scheme_Object *pair; @@ -1343,7 +1369,7 @@ static Scheme_Object *places_deep_copy_worker(Scheme_Object *so, Scheme_Hash_Tab int ctr = 0; /* First, check for simple values that don't need to be hashed: */ - new_so = shallow_types_copy(so, *ht, &fd_accumulators, copy, can_raise_exn); + new_so = shallow_types_copy(so, *ht, &fd_accumulators, &delayed_errno, copy, can_raise_exn); if (new_so) return new_so; if (*ht) { @@ -1379,7 +1405,7 @@ DEEP_DO: } } - new_so = shallow_types_copy(so, *ht, &fd_accumulators, copy, can_raise_exn); + new_so = shallow_types_copy(so, *ht, &fd_accumulators, &delayed_errno, copy, can_raise_exn); if (new_so) RETURN; new_so = so; @@ -1606,6 +1632,8 @@ DEEP_SST2_L: } break; default: + if (delayed_errno) + scheme_warning("Error serializing place message: %e", delayed_errno); bad_place_message2(so, fd_accumulators, can_raise_exn); new_so = NULL; ABORT; @@ -1946,6 +1974,7 @@ static void places_deserialize_clean_worker(Scheme_Object **pso, Scheme_Hash_Tab case scheme_integer_type: case scheme_place_bi_channel_type: /* allocated in the master and can be passed along as is */ case scheme_char_type: + case scheme_bignum_type: case scheme_rational_type: case scheme_float_type: case scheme_double_type: @@ -1973,7 +2002,7 @@ static void places_deserialize_clean_worker(Scheme_Object **pso, Scheme_Hash_Tab scheme_close_socket_fd(fd); } else { - tmp = shallow_types_copy(so, NULL, NULL, 1, 1); + tmp = shallow_types_copy(so, NULL, NULL, NULL, 1, 1); *pso = tmp; } break; @@ -1984,7 +2013,7 @@ static void places_deserialize_clean_worker(Scheme_Object **pso, Scheme_Hash_Tab scheme_close_file_fd(sffd->fd); } else { - tmp = shallow_types_copy(so, NULL, NULL, 1, 1); + tmp = shallow_types_copy(so, NULL, NULL, NULL, 1, 1); *pso = tmp; } break; @@ -2552,6 +2581,7 @@ static void register_traversers(void) GC_REG_TRAV(scheme_place_async_channel_type, place_async_channel_val); GC_REG_TRAV(scheme_place_bi_channel_type, place_bi_channel_val); GC_REG_TRAV(scheme_serialized_file_fd_type, serialized_file_fd_val); + GC_REG_TRAV(scheme_serialized_tcp_fd_type, serialized_socket_fd_val); } END_XFORM_SKIP; diff --git a/src/racket/src/port.c b/src/racket/src/port.c index ad4d4c7996..46924d65c1 100644 --- a/src/racket/src/port.c +++ b/src/racket/src/port.c @@ -286,6 +286,13 @@ typedef struct Scheme_FD { # endif } Scheme_FD; +Scheme_Object *scheme_port_name(Scheme_Object *p) { + if (p->type == scheme_input_port_type) + return ((Scheme_Input_Port *)p)->name; + else + return ((Scheme_Output_Port *)p)->name; +} + int scheme_get_serialized_fd_flags(Scheme_Object* p, Scheme_Serialized_File_FD *so) { Scheme_FD *fds; if (p->type == scheme_input_port_type) { @@ -294,7 +301,7 @@ int scheme_get_serialized_fd_flags(Scheme_Object* p, Scheme_Serialized_File_FD * } else { fds = (Scheme_FD *) ((Scheme_Output_Port *)p)->port_data; - so->name = ((Scheme_Input_Port *)p)->name; + so->name = ((Scheme_Output_Port *)p)->name; } so->regfile = fds->regfile; so->textmode = fds->textmode; diff --git a/src/racket/src/schpriv.h b/src/racket/src/schpriv.h index 593a36af11..0f6cc2429c 100644 --- a/src/racket/src/schpriv.h +++ b/src/racket/src/schpriv.h @@ -3482,6 +3482,7 @@ int scheme_byte_ready_or_user_port_ready(Scheme_Object *p, Scheme_Schedule_Info int scheme_pipe_char_count(Scheme_Object *p); void scheme_alloc_global_fdset(); +Scheme_Object *scheme_port_name(Scheme_Object *p); #define CURRENT_INPUT_PORT(config) scheme_get_param(config, MZCONFIG_INPUT_PORT) #define CURRENT_OUTPUT_PORT(config) scheme_get_param(config, MZCONFIG_OUTPUT_PORT) @@ -3692,13 +3693,23 @@ typedef struct Scheme_Serialized_File_FD{ char flush_mode; } Scheme_Serialized_File_FD; +typedef struct Scheme_Serialized_Socket_FD{ + Scheme_Object so; + Scheme_Object *name; + intptr_t fd; + intptr_t type; +} Scheme_Serialized_Socket_FD; + int scheme_get_serialized_fd_flags(Scheme_Object* p, Scheme_Serialized_File_FD *so); intptr_t scheme_dup_socket(intptr_t fd); intptr_t scheme_dup_file(intptr_t fd); void scheme_close_socket_fd(intptr_t fd); void scheme_close_file_fd(intptr_t fd); void scheme_tcp_abandon_port(Scheme_Object *port); - +intptr_t scheme_socket_errno(); +intptr_t scheme_errno(); +void scheme_socket_to_input_port(intptr_t s, Scheme_Object *name, int takeover, Scheme_Object **_inp); +void scheme_socket_to_output_port(intptr_t s, Scheme_Object *name, int takeover, Scheme_Object **_outp); #define SCHEME_PLACE_OBJECTP(o) (SCHEME_TYPE(o) == scheme_place_object_type)