places: fix handling of OS-level errors during serialization

When a file descriptor cannot be `dup`ed for a place message, the
error message has to be delayed until the partially copied message is
cleaned up. Various problems with the message-serialization code
caused that delay to work incorrectly or incompletely.

Closes #2305
This commit is contained in:
Matthew Flatt 2018-10-09 13:54:55 -06:00
parent fcf080cf93
commit 238e2c7861
2 changed files with 119 additions and 51 deletions

View File

@ -0,0 +1,46 @@
#lang racket
(provide main)
(define (main mode)
(define out
(place in
(for/list ((n (in-naturals)))
(place-channel-get in))))
(define i
(case mode
[("tcp")
(define PORT 12346)
(define listener (tcp-listen PORT 100 #t))
(define-values (i o) (tcp-connect "127.0.0.1" PORT))
i]
[else
(current-input-port)]))
(for ((n (in-naturals)))
(printf "sending port ~a\n" n)
(place-channel-put out i)))
(module+ test
(require racket/system
compiler/find-exe)
(define (check mode rx)
(define s (open-output-bytes))
(define r
(parameterize ([current-error-port s]
[current-output-port (open-output-bytes)])
(system* (find-exe)
"-tm"
(variable-reference->module-source
(#%variable-reference))
mode)))
(when r
(error "expected process to exit with failure"))
(unless (regexp-match? rx (get-output-bytes s))
(error "output did not match expected pattern: "
(get-output-bytes s))))
(check "tcp" "socket dup failed")
(check "stdio" "port dup failed"))

View File

@ -80,7 +80,8 @@ static void bi_channel_set_finalizer(Scheme_Place_Bi_Channel *ch);
static Scheme_Object *places_deep_copy_worker(Scheme_Object *so, Scheme_Hash_Table **ht,
int mode, int gcable, int can_raise_exn,
Scheme_Object **master_chain,
Scheme_Object **invalid_object);
Scheme_Object **invalid_object,
char **delayed_err, intptr_t *delayed_errno, intptr_t *delayed_errkind);
# define mzPDC_CHECK 0
# define mzPDC_COPY 1
# define mzPDC_UNCOPY 2
@ -785,11 +786,13 @@ static Scheme_Object *place_p(int argc, Scheme_Object *args[])
static Scheme_Object *do_places_deep_copy(Scheme_Object *so, int mode, int gcable,
Scheme_Object **master_chain,
Scheme_Object **invalid_object)
Scheme_Object **invalid_object,
char **delayed_err, intptr_t *delayed_errno, intptr_t *delayed_errkind)
{
#if defined(MZ_USE_PLACES) && defined(MZ_PRECISE_GC)
Scheme_Hash_Table *ht = NULL;
return places_deep_copy_worker(so, &ht, mode, gcable, gcable, master_chain, invalid_object);
return places_deep_copy_worker(so, &ht, mode, gcable, gcable, master_chain, invalid_object,
delayed_err, delayed_errno, delayed_errkind);
#else
return so;
#endif
@ -797,19 +800,24 @@ static Scheme_Object *do_places_deep_copy(Scheme_Object *so, int mode, int gcabl
static Scheme_Object *places_prepare_direct(Scheme_Object *so) {
so = strip_chaperones(so);
(void)do_places_deep_copy(so, mzPDC_CHECK, 1, NULL, NULL);
(void)do_places_deep_copy(so, mzPDC_CHECK, 1, NULL, NULL, NULL, NULL, NULL);
return so;
}
static Scheme_Object *places_deep_direct_uncopy(Scheme_Object *so) {
return do_places_deep_copy(so, mzPDC_DIRECT_UNCOPY, 1, NULL, NULL);
return do_places_deep_copy(so, mzPDC_DIRECT_UNCOPY, 1, NULL, NULL, NULL, NULL, NULL);
}
static void bad_place_message(Scheme_Object *so) {
scheme_contract_error("place-channel-put",
"value not allowed in a message",
"value", 1, so,
NULL);
static void bad_place_message(Scheme_Object *so,
char *delayed_err, intptr_t delayed_errno, intptr_t delayed_errkind) {
if (delayed_err) {
rktio_set_last_error(scheme_rktio, delayed_errkind, delayed_errno);
scheme_rktio_error("place-channel-put", delayed_err);
} else
scheme_contract_error("place-channel-put",
"value not allowed in a message",
"value", 1, so,
NULL);
}
static void *box_fd(rktio_fd_transfer_t *fd)
@ -825,7 +833,8 @@ static rktio_fd_transfer_t *unbox_fd(void *p)
return *(rktio_fd_transfer_t **)p;
}
static void bad_place_message2(Scheme_Object *so, Scheme_Object *o, int can_raise_exn) {
static void bad_place_message2(Scheme_Object *so, Scheme_Object *o, int can_raise_exn,
char *delayed_err, intptr_t delayed_errno, intptr_t delayed_errkind) {
Scheme_Object *l;
Scheme_Vector *v = (Scheme_Vector *) o;
if (v) {
@ -847,7 +856,7 @@ static void bad_place_message2(Scheme_Object *so, Scheme_Object *o, int can_rais
}
}
if (can_raise_exn)
bad_place_message(so);
bad_place_message(so, delayed_err, delayed_errno, delayed_errkind);
}
static void push_duped_fd(Scheme_Object **fd_accumulators, intptr_t slot, rktio_fd_transfer_t *dupfdt) {
@ -900,7 +909,7 @@ static Scheme_Object *trivial_copy(Scheme_Object *so, Scheme_Object **master_cha
static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *ht,
Scheme_Object **fd_accumulators,
intptr_t *delayed_errkind, intptr_t *delayed_errno,
char **delayed_err, intptr_t *delayed_errno, intptr_t *delayed_errkind,
int mode, int can_raise_exn,
Scheme_Object **master_chain,
Scheme_Object **invalid_object) {
@ -955,9 +964,9 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h
Scheme_Object *d;
n = scheme_rational_numerator(so);
d = scheme_rational_denominator(so);
n = shallow_types_copy(n, NULL, fd_accumulators, delayed_errno, delayed_errkind,
n = shallow_types_copy(n, NULL, fd_accumulators, delayed_err, delayed_errno, delayed_errkind,
mode, can_raise_exn, master_chain, invalid_object);
d = shallow_types_copy(d, NULL, fd_accumulators, delayed_errno, delayed_errkind,
d = shallow_types_copy(d, NULL, fd_accumulators, delayed_err, delayed_errno, delayed_errkind,
mode, can_raise_exn, master_chain, invalid_object);
new_so = scheme_make_rational(n, d);
}
@ -982,9 +991,9 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h
Scheme_Object *i;
r = scheme_complex_real_part(so);
i = scheme_complex_imaginary_part(so);
r = shallow_types_copy(r, NULL, fd_accumulators, delayed_errno, delayed_errkind,
r = shallow_types_copy(r, NULL, fd_accumulators, delayed_err, delayed_errno, delayed_errkind,
mode, can_raise_exn, master_chain, invalid_object);
i = shallow_types_copy(i, NULL, fd_accumulators, delayed_errno, delayed_errkind,
i = shallow_types_copy(i, NULL, fd_accumulators, delayed_err, delayed_errno, delayed_errkind,
mode, can_raise_exn, master_chain, invalid_object);
new_so = scheme_make_complex(r, i);
}
@ -1142,20 +1151,20 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h
SCHEME_CPTR_VAL(o) = SCHEME_CPTR_VAL(so);
o2 = SCHEME_CPTR_TYPE(so);
if (o2)
o2 = shallow_types_copy(o2, NULL, fd_accumulators, delayed_errno, delayed_errkind,
o2 = shallow_types_copy(o2, NULL, fd_accumulators, delayed_err, delayed_errno, delayed_errkind,
mode, can_raise_exn, master_chain, invalid_object);
SCHEME_CPTR_TYPE(o) = o2;
new_so = o;
} else {
if (SCHEME_CPTR_TYPE(so)) {
(void)shallow_types_copy(SCHEME_CPTR_TYPE(so), NULL, fd_accumulators, delayed_errno, delayed_errkind,
(void)shallow_types_copy(SCHEME_CPTR_TYPE(so), NULL, fd_accumulators, delayed_err, delayed_errno, delayed_errkind,
mode, can_raise_exn, master_chain, invalid_object);
}
}
}
else {
bad_place_message2(so, *fd_accumulators, can_raise_exn);
bad_place_message2(so, *fd_accumulators, can_raise_exn, 0, 0, 0);
if (invalid_object) *invalid_object = so;
return NULL;
}
@ -1175,13 +1184,15 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h
if (!dupfd) {
if (can_raise_exn)
scheme_rktio_error("dynamic-place", "socket dup");
if (delayed_errno) {
if (delayed_err) {
intptr_t tmp;
*delayed_err = "socket dup";
tmp = rktio_get_last_error(scheme_rktio);
*delayed_errno = tmp;
tmp = rktio_get_last_error_kind(scheme_rktio);
*delayed_errkind = tmp;
}
if (invalid_object) *invalid_object = so;
return NULL;
}
dupfdt = rktio_fd_detach(scheme_rktio, dupfd);
@ -1191,7 +1202,7 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h
ssfd->type = so->type;
ssfd->fdt = dupfdt;
portname = scheme_port_name(so);
tmp = shallow_types_copy(portname, ht, fd_accumulators, delayed_errno, delayed_errkind,
tmp = shallow_types_copy(portname, ht, fd_accumulators, delayed_err, delayed_errno, delayed_errkind,
mode, can_raise_exn, master_chain, invalid_object);
ssfd->name = tmp;
return (Scheme_Object *)ssfd;
@ -1207,20 +1218,22 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h
sffd = scheme_malloc_tagged(sizeof(Scheme_Serialized_File_FD));
sffd->so.type = scheme_serialized_file_fd_type;
scheme_get_serialized_fd_flags(so, sffd);
tmp = shallow_types_copy(scheme_port_name(so), ht, fd_accumulators, delayed_errno, delayed_errkind,
tmp = shallow_types_copy(scheme_port_name(so), ht, fd_accumulators, delayed_err, delayed_errno, delayed_errkind,
mode, can_raise_exn, master_chain, invalid_object);
sffd->name = tmp;
dupfd = rktio_dup(scheme_rktio, fd);
if (!dupfd) {
if (can_raise_exn)
scheme_rktio_error("dynamic-place", "port dup");
if (delayed_errno) {
if (delayed_err) {
intptr_t tmp;
*delayed_err = "port dup";
tmp = rktio_get_last_error(scheme_rktio);
*delayed_errno = tmp;
tmp = rktio_get_last_error_kind(scheme_rktio);
*delayed_errkind = tmp;
}
if (invalid_object) *invalid_object = so;
return NULL;
}
dupfdt = rktio_fd_detach(scheme_rktio, dupfd);
@ -1231,13 +1244,13 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h
}
}
else {
bad_place_message2(so, *fd_accumulators, can_raise_exn);
bad_place_message2(so, *fd_accumulators, can_raise_exn, 0, 0, 0);
if (invalid_object) *invalid_object = so;
return NULL;
}
}
else {
bad_place_message2(so, *fd_accumulators, can_raise_exn);
bad_place_message2(so, *fd_accumulators, can_raise_exn, 0, 0, 0);
if (invalid_object) *invalid_object = so;
return NULL;
}
@ -1470,14 +1483,13 @@ static MZ_INLINE Scheme_Object *inf_get(Scheme_Object **instack, int pos, uintpt
static Scheme_Object *places_deep_copy_worker(Scheme_Object *so, Scheme_Hash_Table **ht,
int mode, int gcable, int can_raise_exn,
Scheme_Object **master_chain,
Scheme_Object **invalid_object) {
Scheme_Object **invalid_object,
char **delayed_err, intptr_t *delayed_errno, intptr_t *delayed_errkind) {
Scheme_Object *inf_stack = NULL;
Scheme_Object *reg0 = NULL;
uintptr_t inf_stack_depth = 0, inf_max_depth = 0;
Scheme_Object *fd_accumulators = NULL;
intptr_t delayed_errno = 0;
intptr_t delayed_errkind = 0;
int set_mode = ((mode == mzPDC_COPY)
|| (mode == mzPDC_UNCOPY) || (mode == mzPDC_DIRECT_UNCOPY)
@ -1521,9 +1533,13 @@ static Scheme_Object *places_deep_copy_worker(Scheme_Object *so, Scheme_Hash_Tab
int ctr = 0;
/* First, check for simple values that don't need to be hashed: */
new_so = shallow_types_copy(so, *ht, &fd_accumulators, &delayed_errno, &delayed_errkind,
new_so = shallow_types_copy(so, *ht, &fd_accumulators, delayed_err, delayed_errno, delayed_errkind,
mode, can_raise_exn, master_chain, invalid_object);
if (new_so) return new_so;
if (delayed_err && *delayed_err) {
bad_place_message2(so, fd_accumulators, can_raise_exn, *delayed_err, *delayed_errkind, *delayed_errno);
return NULL;
}
if (*ht) {
Scheme_Object *r;
@ -1559,10 +1575,13 @@ DEEP_DO:
}
}
new_so = shallow_types_copy(so, *ht, &fd_accumulators, &delayed_errno, &delayed_errkind,
new_so = shallow_types_copy(so, *ht, &fd_accumulators, delayed_err, delayed_errno, delayed_errkind,
mode, can_raise_exn, master_chain, invalid_object);
if (new_so) RETURN;
new_so = so;
if (delayed_err && *delayed_err) {
bad_place_message2(so, fd_accumulators, can_raise_exn, *delayed_err, *delayed_errkind, *delayed_errno);
ABORT;
}
if (gcable && (mode == mzPDC_UNCOPY))
SCHEME_USE_FUEL(1);
@ -1667,13 +1686,13 @@ DEEP_VEC2:
size = stype->num_slots;
if (!stype->prefab_key) {
bad_place_message2(so, fd_accumulators, can_raise_exn);
bad_place_message2(so, fd_accumulators, can_raise_exn, 0, 0, 0);
if (invalid_object) *invalid_object = so;
new_so = NULL;
ABORT;
}
if (!(MZ_OPT_HASH_KEY(&stype->iso) & STRUCT_TYPE_ALL_IMMUTABLE)) {
bad_place_message2(so, fd_accumulators, can_raise_exn);
bad_place_message2(so, fd_accumulators, can_raise_exn, 0, 0, 0);
if (invalid_object) *invalid_object = so;
new_so = NULL;
ABORT;
@ -1895,11 +1914,7 @@ DEEP_HT3:
break;
default:
if (delayed_errno) {
rktio_set_last_error(scheme_rktio, delayed_errkind, delayed_errno);
scheme_warning("Error serializing place message: %R");
}
bad_place_message2(so, fd_accumulators, can_raise_exn);
bad_place_message2(so, fd_accumulators, can_raise_exn, 0, 0, 0);
if (invalid_object) *invalid_object = so;
new_so = NULL;
ABORT;
@ -2486,7 +2501,8 @@ static void *place_start_proc_after_stack(void *data_arg, void *stack_base) {
}
static Scheme_Object *places_serialize(Scheme_Object *so, void **msg_memory, Scheme_Object **master_chain,
Scheme_Object **invalid_object) {
Scheme_Object **invalid_object,
char **delayed_err, intptr_t *delayed_errno, intptr_t *delayed_errkind) {
#if defined(MZ_USE_PLACES) && defined(MZ_PRECISE_GC)
Scheme_Object *new_so;
Scheme_Object *tmp;
@ -2496,11 +2512,12 @@ static Scheme_Object *places_serialize(Scheme_Object *so, void **msg_memory, Sch
while (1) {
GC_create_message_allocator();
new_so = do_places_deep_copy(so, mzPDC_COPY, 0, master_chain, invalid_object);
new_so = do_places_deep_copy(so, mzPDC_COPY, 0, master_chain, invalid_object,
delayed_err, delayed_errno, delayed_errkind);
tmp = GC_finish_message_allocator();
(*msg_memory) = tmp;
if (!new_so && SCHEME_CHAPERONEP(*invalid_object)) {
if (!new_so && !*delayed_err && SCHEME_CHAPERONEP(*invalid_object)) {
/* try again after removing chaperones */
so = strip_chaperones(so);
if (!so)
@ -2531,7 +2548,7 @@ static Scheme_Object *places_deserialize(Scheme_Object *so, void *msg_memory, Sc
/* small messages are deemed to be < 1k, this could be tuned in either direction */
if (GC_message_small_objects_size(msg_memory, 1024)) {
new_so = do_places_deep_copy(so, mzPDC_UNCOPY, 1, NULL, NULL);
new_so = do_places_deep_copy(so, mzPDC_UNCOPY, 1, NULL, NULL, NULL, NULL, NULL);
from_p->place_channel_msg_in_flight = NULL;
from_p->place_channel_msg_chain_in_flight = NULL;
GC_dispose_short_message_allocator(msg_memory);
@ -2545,7 +2562,7 @@ static Scheme_Object *places_deserialize(Scheme_Object *so, void *msg_memory, Sc
GC_adopt_message_allocator(msg_memory);
msg_memory = NULL;
#if !defined(SHARED_TABLES)
new_so = do_places_deep_copy(so, mzPDC_DESER, 1, NULL, NULL);
new_so = do_places_deep_copy(so, mzPDC_DESER, 1, NULL, NULL, NULL, NULL, NULL);
#endif
}
return new_so;
@ -2593,12 +2610,12 @@ static Scheme_Object* place_allowed_p(int argc, Scheme_Object *args[])
v = args[0];
if (places_deep_copy_worker(v, &ht, mzPDC_CHECK, 1, 0, NULL, &invalid_object))
if (places_deep_copy_worker(v, &ht, mzPDC_CHECK, 1, 0, NULL, &invalid_object, NULL, NULL, NULL))
return scheme_true;
else {
if (invalid_object && SCHEME_CHAPERONEP(invalid_object)) {
v = strip_chaperones(v);
if (v && places_deep_copy_worker(v, &ht, mzPDC_CHECK, 1, 0, NULL, NULL))
if (v && places_deep_copy_worker(v, &ht, mzPDC_CHECK, 1, 0, NULL, NULL, NULL, NULL, NULL))
return scheme_true;
}
return scheme_false;
@ -2673,7 +2690,7 @@ static void async_channel_finalize(void *p, void* data) {
for (i = 0; i < ch->size ; i++) {
ht = NULL;
if (ch->msgs[i]) {
(void)places_deep_copy_worker(ch->msgs[i], &ht, mzPDC_CLEAN, 0, 0, NULL, NULL);
(void)places_deep_copy_worker(ch->msgs[i], &ht, mzPDC_CLEAN, 0, 0, NULL, NULL, NULL, NULL, NULL);
ch->msgs[i] = NULL;
}
#ifdef MZ_PRECISE_GC
@ -2921,19 +2938,24 @@ static void place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *uo)
void *msg_memory = NULL;
Scheme_Object *o, *master_chain = NULL, *invalid_object = NULL;
intptr_t sz, cnt;
char *delayed_err = NULL;
intptr_t delayed_errno = 0;
intptr_t delayed_errkind = 0;
o = places_serialize(uo, &msg_memory, &master_chain, &invalid_object);
o = places_serialize(uo, &msg_memory, &master_chain, &invalid_object,
&delayed_err, &delayed_errno, &delayed_errkind);
/* uo needs to stay live until `master_chain` is registered in `ch` */
if (!o) {
if (invalid_object) {
if (invalid_object && !delayed_err) {
scheme_contract_error("place-channel-put",
"value not allowed in a message",
"value", 1, invalid_object,
"message", 1, uo,
NULL);
}
else bad_place_message(uo);
} else
bad_place_message(uo, delayed_err, delayed_errno, delayed_errkind);
}
{