add `place-message-allowed?' and fix related problems

Closes PR 11983
This commit is contained in:
Matthew Flatt 2011-06-17 10:33:27 -06:00
parent c7d86ed3a6
commit b1e47eba45
6 changed files with 213 additions and 72 deletions

View File

@ -21,6 +21,7 @@
place-channel-get
place-channel?
place?
place-message-allowed?
place-channel-put/get
processor-count
place
@ -118,6 +119,9 @@
(or (TH-place? pl)
(TH-place-channel? pl)))
(define (th-place-message-allowed? pl)
#t)
(define-syntax-rule (define-pl x p t) (define x (if (pl-place-enabled?) p t)))
(define-pl dynamic-place pl-dynamic-place th-dynamic-place)
@ -130,6 +134,7 @@
(define-pl place-channel-get pl-place-channel-get th-place-channel-get)
(define-pl place-channel? pl-place-channel? th-place-channel?)
(define-pl place? pl-place? TH-place?)
(define-pl place-message-allowed? pl-place-message-allowed? th-place-message-allowed?)
(define-syntax-rule (define-syntax-case (N a ...) b ...)
(define-syntax (N stx)

View File

@ -18,11 +18,12 @@
@; ----------------------------------------------------------------------
@margin-note{Currently, parallel support for @racket[place] is enabled
@margin-note{Currently, parallel support for places is enabled
only for Racket 3m (which is the main variant of Racket), and only
by default for Windows, Linux x86/x86_64, and Mac OS X x86/x86_64. To
enable support for other platforms, use @DFlag{enable-places} with
@exec{configure} when building Racket.}
@exec{configure} when building Racket. The @racket[place-enabled?]
function reports whether places run in parallel.}
@note-lib[racket/place]
@ -35,9 +36,8 @@ instance of the Racket virtual machine. Places communicate through
@deftech{place channels}, which are endpoints for a two-way buffered
communication.
To a first approximation, place channels allow only immutable values
as messages over the channel: numbers, characters, booleans, immutable
pairs, immutable vectors, and immutable structures. In addition, place
To a first approximation, place channels support only immutable,
transparent values as messages. In addition, place
channels themselves can be sent across channels to establish new
(possibly more direct) lines of communication in addition to any
existing lines. Finally, mutable values produced by
@ -46,7 +46,7 @@ existing lines. Finally, mutable values produced by
@racket[shared-bytes], and @racket[make-shared-bytes] can be sent
across place channels; mutation of such values is visible to all
places that share the value, because they are allowed in a
@deftech{shared memory space}.
@deftech{shared memory space}. See @racket[place-message-allowed?].
A @tech{place channel} can be used as a @tech{synchronizable event}
(see @secref["sync"]) to receive a value through the channel. A place
@ -87,6 +87,14 @@ racket
]
@defproc[(place-enabled?) boolean?]{
Returns @racket[#t] if Racket is configured so that
@racket[dynamic-place] and @racket[place] create places that can run
in parallel, @racket[#f] if @racket[dynamic-place] and @racket[place]
are simulated using @racket[thread].}
@defproc[(place? [v any/c]) boolean?]{
Returns @racket[#t] if @racket[v] is a @deftech{place descriptor}
value, @racket[#f] otherwise. Every @tech{place descriptor}
@ -98,6 +106,7 @@ racket
@racket[#f] otherwise.
}
@defproc[(dynamic-place [module-path module-path?] [start-proc symbol?]) place?]{
Creates a @tech{place} to run the procedure that is identified by
@ -113,6 +122,16 @@ racket
other end of communication for the @tech{place descriptor} returned
by @racket[place].}
@defform[(place id body ...+)]{
Creates a place that evaluates @racket[body]
expressions with @racket[id] bound to a place channel. The
@racket[body]s close only over @racket[id] plus the top-level
bindings of the enclosing module, because the
@racket[body]s are lifted to a function that is exported by
the module. The result of @racket[place] is a place descriptor,
like the result of @racket[dynamic-place].
}
@defproc[(place-wait [p place?]) exact-integer?]{
Returns the completion value of the place indicated by @racket[p],
@ -127,12 +146,6 @@ racket
Terminates the place indicated by @racket[p],
}
@defform[(place place-channel? body ...)]{
In-line definition of a place worker body, which is lifted up to module scope.
@racket[place] closes over only module scope variables.
Returns the place descriptor for the newly constructed place.
}
@defproc[(place-channel) (values place-channel? place-channel?)]{
Returns two @tech{place channels}. Data sent through the first
@ -145,15 +158,57 @@ racket
channel}).
}
@defproc[(place-channel-put [pch place-channel?] [v any/c]) void]{
Sends an immutable message @racket[v] on channel @racket[pch].
@defproc[(place-channel-put [pch place-channel?] [v place-message-allowed?]) void]{
Sends a message @racket[v] on channel @racket[pch].
See @racket[place-message-allowed?] form information on automatic
coercions in @racket[v], such as converting a mutable string to an
immutable string.
}
@defproc[(place-channel-get [pch place-channel?]) any/c]{
Returns an immutable message received on channel @racket[pch].
@defproc[(place-channel-get [pch place-channel?]) place-message-allowed?]{
Returns a message received on channel @racket[pch].
}
@defproc[(place-channel-put/get [pch place-channel?] [v any/c]) void]{
Sends an immutable message @racket[v] on channel @racket[pch] and then
waits for a reply message on the same channel.
}
@defproc[(place-message-allowed? [v any/c]) boolean?]{
Returns @racket[#t] if @racket[v] is allowed as a message on a place channel,
@racket[#f] otherwise.
If @racket[(place-enabled?)] returns @racket[#f], then the result is
always @racket[#t] and no conversions are performed on @racket[v] as a
message. Otherwise, the following kinds of data are allowed as
messages:
@itemlist[
@item{@tech{numbers}, @tech{characters}, @tech{booleans}, and
@|void-const|;}
@item{@tech{symbols} that are @tech{interned};}
@item{@tech{strings} and @tech{byte strings}, where mutable strings
and byte strings are automatically replaced by immutable
variants;}
@item{@tech{pairs}, @tech{lists}, @tech{vectors}, and immutable
@tech{prefab} structures containing message-allowed values,
where a mutable vector is automatically replaced by an
immutable vector;}
@item{@tech{place channels}, where a @tech{place descriptor} is
automatically replaced by a plain place channel; and}
@item{values produced by @racket[shared-flvector],
@racket[make-shared-flvector], @racket[shared-fxvector],
@racket[make-shared-fxvector], @racket[shared-bytes], and
@racket[make-shared-bytes].}
]}

View File

@ -35,4 +35,19 @@
(sync never-evt))])
(place-kill p))
(for ([v (list #t #f null 'a #\a 1 1/2 1.0 (expt 2 100)
"apple" (make-string 10) #"apple" (make-bytes 10)
(void))])
(test #t place-message-allowed? v)
(test #t place-message-allowed? (list v))
(test #t place-message-allowed? (vector v)))
(for ([v (list (gensym) (string->uninterned-symbol "apple")
(lambda () 10)
add1)])
(test (not (place-enabled?)) place-message-allowed? v)
(test (not (place-enabled?)) place-message-allowed? (list v))
(test (not (place-enabled?)) place-message-allowed? (cons 1 v))
(test (not (place-enabled?)) place-message-allowed? (cons v 1))
(test (not (place-enabled?)) place-message-allowed? (vector v)))
(report-errs)

View File

@ -646,6 +646,35 @@ static bigdig* allocate_bigdig_array(intptr_t length)
return res;
}
Scheme_Object *scheme_bignum_copy(const Scheme_Object *n)
{
Scheme_Object *o;
intptr_t len;
bigdig* digs;
len = SCHEME_BIGLEN(n);
if (SCHEME_BIGDIG(n) == ((Small_Bignum *) mzALIAS n)->v) {
o = (Scheme_Object *)scheme_malloc_tagged(sizeof(Small_Bignum));
#if MZ_PRECISE_GC
SCHEME_SET_BIGINLINE(o);
#endif
((Small_Bignum *)o)->v[0] = SCHEME_BIGDIG(n)[0];
SCHEME_BIGDIG(o) = ((Small_Bignum *) mzALIAS o)->v;
} else {
o = (Scheme_Object *)MALLOC_ONE_TAGGED(Scheme_Bignum);
digs = allocate_bigdig_array(len);
memcpy(digs, SCHEME_BIGDIG(n), len * sizeof(bigdig));
SCHEME_BIGDIG(o) = digs;
}
o->type = scheme_bignum_type;
SCHEME_SET_BIGPOS(o, SCHEME_BIGPOS(n));
SCHEME_BIGLEN(o) = len;
return o;
}
/* We don't want to count leading digits of 0 in the bignum's length */
XFORM_NONGCING static intptr_t bigdig_length(bigdig* array, intptr_t alloced)
{

View File

@ -33,6 +33,7 @@ static Scheme_Object *scheme_place_receive(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_place_channel_p(int argc, Scheme_Object *args[]);
static Scheme_Object *def_place_exit_handler_proc(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_place_channel(int argc, Scheme_Object *args[]);
static Scheme_Object* scheme_place_allowed_p(int argc, Scheme_Object *args[]);
static int cust_kill_place(Scheme_Object *pl, void *notused);
static Scheme_Place_Async_Channel *scheme_place_async_channel_create();
@ -44,7 +45,8 @@ static Scheme_Object *scheme_place_async_receive(Scheme_Place_Async_Channel *ch)
static Scheme_Object *scheme_places_deep_copy_to_master(Scheme_Object *so);
#if defined(MZ_USE_PLACES) && defined(MZ_PRECISE_GC)
static Scheme_Object *places_deep_copy_worker(Scheme_Object *so, Scheme_Hash_Table **ht, int copy, int gcable);
static Scheme_Object *places_deep_copy_worker(Scheme_Object *so, Scheme_Hash_Table **ht,
int copy, int gcable, int can_raise_exn);
#endif
# ifdef MZ_PRECISE_GC
@ -96,9 +98,10 @@ void scheme_init_place(Scheme_Env *env)
PLACE_PRIM_W_ARITY("place-break", scheme_place_break, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place?", scheme_place_p, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place-channel", scheme_place_channel, 0, 0, plenv);
PLACE_PRIM_W_ARITY("place-channel-put", scheme_place_send, 1, 2, plenv);
PLACE_PRIM_W_ARITY("place-channel-put", scheme_place_send, 2, 2, plenv);
PLACE_PRIM_W_ARITY("place-channel-get", scheme_place_receive, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place-channel?", scheme_place_channel_p, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place-message-allowed?", scheme_place_allowed_p, 1, 1, plenv);
#ifdef MZ_USE_PLACES
REGISTER_SO(scheme_def_place_exit_proc);
@ -827,7 +830,7 @@ static Scheme_Object *scheme_place_p(int argc, Scheme_Object *args[])
static Scheme_Object *do_places_deep_copy(Scheme_Object *so, int gcable) {
#if defined(MZ_USE_PLACES) && defined(MZ_PRECISE_GC)
Scheme_Hash_Table *ht = NULL;
return places_deep_copy_worker(so, &ht, 1, gcable);
return places_deep_copy_worker(so, &ht, 1, gcable, gcable);
#else
return so;
#endif
@ -839,7 +842,7 @@ Scheme_Object *scheme_places_deep_copy(Scheme_Object *so) {
static void bad_place_message(Scheme_Object *so) {
scheme_arg_mismatch("place-channel-put",
"cannot transmit a message containing value: ",
"value not allowed in a message: ",
so);
}
@ -866,7 +869,7 @@ static Scheme_Object *trivial_copy(Scheme_Object *so)
return NULL;
}
static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *ht, int copy) {
static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *ht, int copy, int can_raise_exn) {
Scheme_Object *new_so;
new_so = trivial_copy(so);
@ -879,14 +882,18 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h
if (copy)
new_so = scheme_make_char(SCHEME_CHAR_VAL(so));
break;
case scheme_bignum_type:
if (copy)
new_so = scheme_bignum_copy(so);
break;
case scheme_rational_type:
{
Scheme_Object *n;
Scheme_Object *d;
n = scheme_rational_numerator(so);
d = scheme_rational_denominator(so);
n = shallow_types_copy(n, NULL, copy);
d = shallow_types_copy(d, NULL, copy);
n = shallow_types_copy(n, NULL, copy, can_raise_exn);
d = shallow_types_copy(d, NULL, copy, can_raise_exn);
if (copy)
new_so = scheme_make_rational(n, d);
}
@ -905,20 +912,24 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h
Scheme_Object *i;
r = scheme_complex_real_part(so);
i = scheme_complex_imaginary_part(so);
r = shallow_types_copy(r, NULL, copy);
i = shallow_types_copy(i, NULL, copy);
r = shallow_types_copy(r, NULL, copy, can_raise_exn);
i = shallow_types_copy(i, NULL, copy, can_raise_exn);
if (copy)
new_so = scheme_make_complex(r, i);
}
break;
case scheme_char_string_type:
if (copy)
if (copy) {
new_so = scheme_make_sized_offset_char_string(SCHEME_CHAR_STR_VAL(so), 0, SCHEME_CHAR_STRLEN_VAL(so), 1);
SCHEME_SET_IMMUTABLE(new_so);
}
break;
case scheme_byte_string_type:
/* not allocated as shared, since that's covered above */
if (copy)
if (copy) {
new_so = scheme_make_sized_offset_byte_string(SCHEME_BYTE_STR_VAL(so), 0, SCHEME_BYTE_STRLEN_VAL(so), 1);
SCHEME_SET_IMMUTABLE(new_so);
}
break;
case scheme_unix_path_type:
case scheme_windows_path_type:
@ -928,7 +939,10 @@ static Scheme_Object *shallow_types_copy(Scheme_Object *so, Scheme_Hash_Table *h
break;
case scheme_symbol_type:
if (SCHEME_SYM_UNINTERNEDP(so)) {
bad_place_message(so);
if (can_raise_exn)
bad_place_message(so);
else
return NULL;
} else {
if (copy) {
new_so = scheme_make_sized_offset_byte_string((char *)so, SCHEME_SYMSTR_OFFSET(so), SCHEME_SYM_LEN(so), 1);
@ -1113,7 +1127,8 @@ static MZ_INLINE Scheme_Object *inf_get(Scheme_Object **instack, int pos, uintpt
/* This code often executes with the master GC switched on */
/* It cannot use the usual stack overflow mechanism */
/* Therefore it must use its own stack implementation for recursion */
static Scheme_Object *places_deep_copy_worker(Scheme_Object *so, Scheme_Hash_Table **ht, int copy, int gcable) {
static Scheme_Object *places_deep_copy_worker(Scheme_Object *so, Scheme_Hash_Table **ht,
int copy, int gcable, int can_raise_exn) {
Scheme_Object *inf_stack = NULL;
Scheme_Object *reg0 = NULL;
uintptr_t inf_stack_depth = 0;
@ -1139,6 +1154,7 @@ static Scheme_Object *places_deep_copy_worker(Scheme_Object *so, Scheme_Hash_Tab
#define DEEP_RETURN 8
#define DEEP_DONE 9
#define RETURN do { goto DEEP_RETURN_L; } while(0);
#define ABORT do { goto DEEP_DONE_L; } while(0);
#define IFS_PUSH(x) inf_push(&inf_stack, x, &inf_stack_depth, gcable)
#define IFS_POP inf_pop(&inf_stack, &inf_stack_depth, gcable)
#define IFS_POPN(n) do { int N = (n); while (N > 0) { IFS_POP; N--;} } while(0);
@ -1152,7 +1168,7 @@ static Scheme_Object *places_deep_copy_worker(Scheme_Object *so, Scheme_Hash_Tab
int ctr = 0;
/* First, check for simple values that don't need to be hashed: */
new_so = shallow_types_copy(so, *ht, copy);
new_so = shallow_types_copy(so, *ht, copy, can_raise_exn);
if (new_so) return new_so;
if (*ht) {
@ -1188,7 +1204,7 @@ DEEP_DO:
}
}
new_so = shallow_types_copy(so, *ht, copy);
new_so = shallow_types_copy(so, *ht, copy, can_raise_exn);
if (new_so) RETURN;
new_so = so;
@ -1236,11 +1252,13 @@ DEEP_DO_FIN_PAIR_L:
/* handle cycles: */
scheme_hash_set(*ht, so, vec);
i = 0;
IFS_PUSH(vec);
IFS_PUSH(so);
IFS_PUSH(scheme_make_integer(size));
IFS_PUSH(scheme_make_integer(i));
if (i < size) {
IFS_PUSH(vec);
IFS_PUSH(so);
IFS_PUSH(scheme_make_integer(size));
IFS_PUSH(scheme_make_integer(i));
SET_R0(SCHEME_VEC_ELS(so)[i]);
GOTO_NEXT_CONT(DEEP_DO, DEEP_VEC1);
}
@ -1286,11 +1304,22 @@ DEEP_VEC2:
size = stype->num_slots;
local_slots = stype->num_slots - (ptype ? ptype->num_slots : 0);
if (!stype->prefab_key)
bad_place_message(so);
if (!stype->prefab_key) {
if (can_raise_exn)
bad_place_message(so);
else {
new_so = NULL;
ABORT;
}
}
for (i = 0; i < local_slots; i++) {
if (!stype->immutables || stype->immutables[i] != 1) {
bad_place_message(so);
if (can_raise_exn)
bad_place_message(so);
else {
new_so = NULL;
ABORT;
}
}
}
@ -1408,7 +1437,12 @@ DEEP_SST2_L:
}
break;
default:
bad_place_message(so);
if (can_raise_exn)
bad_place_message(so);
else {
new_so = NULL;
ABORT;
}
break;
}
@ -1666,19 +1700,19 @@ Scheme_Object *scheme_places_deep_copy_to_master(Scheme_Object *so) {
void *original_gc;
/* forces hash codes: */
(void)places_deep_copy_worker(so, &ht, 0, 1);
(void)places_deep_copy_worker(so, &ht, 0, 1, 1);
ht = NULL;
original_gc = GC_switch_to_master_gc();
scheme_start_atomic();
o = places_deep_copy_worker(so, &ht, 1, 1);
o = places_deep_copy_worker(so, &ht, 1, 1, 0);
scheme_end_atomic_no_swap();
GC_switch_back_from_master(original_gc);
return o;
#else
return places_deep_copy_worker(so, &ht, 1, 1);
return places_deep_copy_worker(so, &ht, 1, 1, 1);
#endif
}
@ -1882,46 +1916,46 @@ Scheme_Object *scheme_places_deserialize(Scheme_Object *so, void *msg_memory) {
#endif
}
Scheme_Object *scheme_place_send(int argc, Scheme_Object *args[]) {
if (argc == 2) {
Scheme_Place_Bi_Channel *ch;
if (SAME_TYPE(SCHEME_TYPE(args[0]), scheme_place_type)) {
ch = (Scheme_Place_Bi_Channel *) ((Scheme_Place *) args[0])->channel;
}
else if (SAME_TYPE(SCHEME_TYPE(args[0]), scheme_place_bi_channel_type)) {
ch = (Scheme_Place_Bi_Channel *) args[0];
}
else {
ch = NULL;
scheme_wrong_type("place-channel-put", "place-channel", 0, argc, args);
}
scheme_place_async_send((Scheme_Place_Async_Channel *) ch->sendch, args[1]);
Scheme_Object *scheme_place_send(int argc, Scheme_Object *args[])
{
Scheme_Place_Bi_Channel *ch;
if (SAME_TYPE(SCHEME_TYPE(args[0]), scheme_place_type)) {
ch = (Scheme_Place_Bi_Channel *) ((Scheme_Place *) args[0])->channel;
}
else if (SAME_TYPE(SCHEME_TYPE(args[0]), scheme_place_bi_channel_type)) {
ch = (Scheme_Place_Bi_Channel *) args[0];
}
else {
scheme_wrong_count_m("place-channel-put", 2, 2, argc, args, 0);
ch = NULL;
scheme_wrong_type("place-channel-put", "place-channel", 0, argc, args);
}
scheme_place_async_send((Scheme_Place_Async_Channel *) ch->sendch, args[1]);
return scheme_void;
}
Scheme_Object *scheme_place_receive(int argc, Scheme_Object *args[]) {
if (argc == 1) {
Scheme_Place_Bi_Channel *ch;
if (SAME_TYPE(SCHEME_TYPE(args[0]), scheme_place_type)) {
ch = (Scheme_Place_Bi_Channel *) ((Scheme_Place *) args[0])->channel;
}
else if (SAME_TYPE(SCHEME_TYPE(args[0]), scheme_place_bi_channel_type)) {
ch = (Scheme_Place_Bi_Channel *) args[0];
}
else {
ch = NULL;
scheme_wrong_type("place-channel-get", "place-channel", 0, argc, args);
}
return scheme_place_async_receive((Scheme_Place_Async_Channel *) ch->recvch);
Scheme_Place_Bi_Channel *ch;
if (SAME_TYPE(SCHEME_TYPE(args[0]), scheme_place_type)) {
ch = (Scheme_Place_Bi_Channel *) ((Scheme_Place *) args[0])->channel;
}
else if (SAME_TYPE(SCHEME_TYPE(args[0]), scheme_place_bi_channel_type)) {
ch = (Scheme_Place_Bi_Channel *) args[0];
}
else {
scheme_wrong_count_m("place-channel-get", 1, 1, argc, args, 0);
ch = NULL;
scheme_wrong_type("place-channel-get", "place-channel", 0, argc, args);
}
ESCAPED_BEFORE_HERE;
return scheme_place_async_receive((Scheme_Place_Async_Channel *) ch->recvch);
}
static Scheme_Object* scheme_place_allowed_p(int argc, Scheme_Object *args[])
{
Scheme_Hash_Table *ht = NULL;
if (places_deep_copy_worker(args[0], &ht, 0, 1, 0))
return scheme_true;
else
return scheme_false;
}
# ifdef MZ_PRECISE_GC
@ -2068,6 +2102,7 @@ static void scheme_place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Objec
int cnt;
o = scheme_places_serialize(uo, &msg_memory);
if (!o) bad_place_message(uo);
mzrt_mutex_lock(ch->lock);
{

View File

@ -1700,6 +1700,8 @@ typedef struct {
XFORM_NONGCING Scheme_Object *scheme_make_small_bignum(intptr_t v, Small_Bignum *s);
char *scheme_number_to_string(int radix, Scheme_Object *obj);
Scheme_Object *scheme_bignum_copy(const Scheme_Object *n);
XFORM_NONGCING int scheme_bignum_get_int_val(const Scheme_Object *o, intptr_t *v);
XFORM_NONGCING int scheme_bignum_get_unsigned_int_val(const Scheme_Object *o, uintptr_t *v);
XFORM_NONGCING int scheme_bignum_get_long_long_val(const Scheme_Object *o, mzlonglong *v);