Places: refactor place-channel use

This commit is contained in:
Kevin Tew 2010-08-05 11:03:17 -06:00
parent 500685c0f2
commit 35a65f90c2
5 changed files with 78 additions and 97 deletions

View File

@ -13,5 +13,4 @@
place-channel-recv place-channel-recv
place-channel? place-channel?
place? place?
place-channel-send/recv place-channel-send/recv)
place-channel->receiver-channel)

View File

@ -7,10 +7,10 @@
@(require scribble/manual @(require scribble/manual
scribble/urls scribble/urls
scribble/struct scribble/struct
(for-label scheme (for-label racket
scheme/base racket/base
scheme/contract racket/contract
scheme/place)) racket/place))
@; ---------------------------------------------------------------------- @; ----------------------------------------------------------------------
@ -18,41 +18,51 @@ The PLT futures API enables the development of parallel programs which
take advantage of machines with multiple processors, cores, or take advantage of machines with multiple processors, cores, or
hardware threads. hardware threads.
@defmodule[scheme/place]{} @defmodule[racket/place]{}
@defproc[(place [module-path module-path?] [start-proc proc?] [place-channel place-channel?]) place?]{ @defproc[(place [module-path module-path?] [start-proc proc?] [place-channel place-channel?]) place?]{
Starts running @scheme[start-proc] in parallel. scheme[start-proc] must Starts running @racket[start-proc] in parallel. @racket[start-proc] must
be a function defined in @scheme[module-path]. Each place is created with a scheme[place-channel] be a function defined in @racket[module-path]. Each place is created with a racket[place-channel]
that permits communication with the place originator. This initial channel can be overridden with that permits communication with the place originator. This initial channel can be overridden with
an optional scheme[place-channel] argument. The @scheme[place] an optional @racket[place-channel] argument. The @racket[place]
procedure returns immediately with a place descriptor value representing the newly constructed place. procedure returns immediately with a place descriptor value representing the newly constructed place.
} }
@defproc[(place-wait [p place?]) exact-integer?]{ @defproc[(place-wait [p place?]) exact-integer?]{
Returns the return value of a completed place @scheme[p], blocking until Returns the return value of a completed place @racket[p], blocking until
the place completes (if it has not already completed). the place completes (if it has not already completed).
} }
@defproc[(place? [x any/c]) boolean?]{ @defproc[(place? [x any/c]) boolean?]{
Returns @scheme[#t] if @scheme[x] is a place object. Returns @racket[#t] if @racket[x] is a place object.
}
@defproc[(place-channel) (values place-channel? place-channel?)]{
Returns two @racket[place-channel] objects.
One @racket[place-channel] should be used by the current @racket[place] to send
messages to a destination @racket[place].
The other @racket[place-channel] should be sent to a destination @racket[place] over
an existing @racket[place-channel].
} }
@defproc[(place-channel-send [ch place-channel?] [x any/c]) void]{ @defproc[(place-channel-send [ch place-channel?] [x any/c]) void]{
Sends an immutable message @scheme[x] on channel @scheme[ch]. Sends an immutable message @racket[x] on channel @racket[ch].
} }
@defproc[(place-channel-recv [p place-channel?]) any/c]{ @defproc[(place-channel-recv [p place-channel?]) any/c]{
Returns an immutable message received on channel @scheme[ch]. Returns an immutable message received on channel @racket[ch].
} }
@defproc[(place-channel? [x any/c]) boolean?]{ @defproc[(place-channel? [x any/c]) boolean?]{
Returns @scheme[#t] if @scheme[x] is a place-channel object. Returns @racket[#t] if @racket[x] is a place-channel object.
} }
@defproc[(place-channel-send/recv [ch place-channel?] [x any/c]) void]{ @defproc[(place-channel-send/recv [ch place-channel?] [x any/c]) void]{
Sends an immutable message @scheme[x] on channel @scheme[ch] and then Sends an immutable message @racket[x] on channel @racket[ch] and then
waits for a repy message. waits for a repy message.
Returns an immutable message received on channel @scheme[ch]. Returns an immutable message received on channel @racket[ch].
} }
@section[#:tag "example"]{How Do I Keep Those Cores Busy?} @section[#:tag "example"]{How Do I Keep Those Cores Busy?}
@ -60,7 +70,7 @@ hardware threads.
This code launches two places passing 1 and 2 as the initial channels This code launches two places passing 1 and 2 as the initial channels
and then waits for the places to complete and return. and then waits for the places to complete and return.
@schemeblock[ @racketblock[
(let ((pls (map (lambda (x) (place "place-worker.ss" 'place-main x)) (let ((pls (map (lambda (x) (place "place-worker.ss" 'place-main x))
(list 1 2)))) (list 1 2))))
(map place-wait pls)) (map place-wait pls))
@ -68,8 +78,8 @@ and then waits for the places to complete and return.
This is the code for the place-worker.ss module that each place will execute. This is the code for the place-worker.ss module that each place will execute.
@schemeblock[ @racketblock[
(module place-worker scheme (module place-worker racket
(provide place-main) (provide place-main)
(define (place-main x) (define (place-main x)
@ -77,13 +87,10 @@ This is the code for the place-worker.ss module that each place will execute.
] ]
@section[#:tag "place-channels"]{Place Channels} @section[#:tag "place-channels"]{Place Channels}
@;@defproc[(make-place-channel) channel?]{ Place channels can be used with @racket[place-channel-recv], or as a
@;Creates and returns a new channel.
Place channels can be used with @scheme[place-channel-recv], or as a
@tech[#:doc '(lib "scribblings/reference/reference.scrbl")]{synchronizable event} @tech[#:doc '(lib "scribblings/reference/reference.scrbl")]{synchronizable event}
(see @secref[#:doc '(lib "scribblings/reference/reference.scrbl") "sync"]) to receive a value (see @secref[#:doc '(lib "scribblings/reference/reference.scrbl") "sync"]) to receive a value
through the channel. The channel can be used with @scheme[place-channel-send] through the channel. The channel can be used with @racket[place-channel-send]
to send a value through the channel. to send a value through the channel.
@section[#:tag "messagepassingparallelism"]{Message Passing Parallelism} @section[#:tag "messagepassingparallelism"]{Message Passing Parallelism}
@ -91,18 +98,18 @@ to send a value through the channel.
Places can only communicate by passing immutable messages on place-channels. Places can only communicate by passing immutable messages on place-channels.
Only immutable pairs, vectors, and structs can be communicated across places channels. Only immutable pairs, vectors, and structs can be communicated across places channels.
@section[#:tag "logging"]{Architecture and Garbage Collection} @section[#:tag "places-architecture"]{Architecture and Garbage Collection}
Immutable messages communicated on place-channels are first copied to a shared Immutable messages communicated on place-channels are first copied to a shared
garbage collector called the master. Places are allowed to garbage collect garbage collector called the master. Places are allowed to garbage collect
independently of one another. The master collector, however, has to pause all independently of one another. The master collector, however, has to pause all
mutators before it can collect garbage. mutators before it can collect garbage.
@section[#:tag "compiling"]{Enabling Places in Racket Builds} @section[#:tag "enabling-places"]{Enabling Places in Racket Builds}
PLT's parallel-places support is only enabled if you pass PLT's parallel-places support is only enabled if you pass
@DFlag{enable-places} to @exec{configure} when you build PLT (and @DFlag{enable-places} to @exec{configure} when you build PLT (and
that build currently only works with @exec{mzscheme}, not with that build currently only works with @exec{racket}, not with
@exec{mred}). When parallel-future support is not enabled, @exec{gracket}). When parallel-places support is not enabled,
@scheme[place] usage is a syntax error. @racket[place] usage is a syntax error.
@; @FIXME{use threads to emulate places maybe?} @; @FIXME{use threads to emulate places maybe?}

View File

@ -31,7 +31,7 @@
(list (car x) 'b (cadr x)) (list (car x) 'b (cadr x))
(vector (vector-ref x 0) 'b (vector-ref x 1)) (vector (vector-ref x 0) 'b (vector-ref x 1))
#s((abuilding 1 building 2) 6 'utah 'no)) #s((abuilding 1 building 2) 6 'utah 'no))
(define pc1 (place-channel->receiver-channel (place-channel-recv ch))) (define pc1 (place-channel-recv ch))
(pcrss pc1 (string-append x "-ok"))) (pcrss pc1 (string-append x "-ok")))
) )
END END
@ -54,8 +54,8 @@ END
((list 'a 'a) (list 'a 'b 'a)) ((list 'a 'a) (list 'a 'b 'a))
(#(a a) #(a b a)) (#(a a) #(a b a))
(h1 #s((abuilding 1 building 2) 6 'utah 'no))) (h1 #s((abuilding 1 building 2) 6 'utah 'no)))
(define pc1 (place-channel)) (define-values (pc1 pc2) (place-channel))
(place-channel-send pl pc1) (place-channel-send pl pc2)
(test "Testing-ok" place-channel-send/recv pc1 "Testing") (test "Testing-ok" place-channel-send/recv pc1 "Testing")
(place-wait pl) (place-wait pl)
) )

View File

@ -24,12 +24,10 @@ static Scheme_Object *scheme_place_recv(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_place_channel_p(int argc, Scheme_Object *args[]); static Scheme_Object *scheme_place_channel_p(int argc, Scheme_Object *args[]);
static Scheme_Object *def_place_exit_handler_proc(int argc, Scheme_Object *args[]); static Scheme_Object *def_place_exit_handler_proc(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_place_channel(int argc, Scheme_Object *args[]); static Scheme_Object *scheme_place_channel(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_place_channel_receiver_channel(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_place_async_channel_create(); static Scheme_Place_Async_Channel *scheme_place_async_channel_create();
static Scheme_Object *scheme_place_bi_channel_create(); static Scheme_Place_Bi_Channel *scheme_place_bi_channel_create();
static Scheme_Object *scheme_place_bi_peer_channel_create(Scheme_Object *orig); static Scheme_Place_Bi_Channel *scheme_place_bi_peer_channel_create(Scheme_Place_Bi_Channel *orig);
static void scheme_place_bi_channel_set_signal(Scheme_Object *cho);
static int scheme_place_channel_ready(Scheme_Object *so); static int scheme_place_channel_ready(Scheme_Object *so);
@ -80,7 +78,6 @@ void scheme_init_place(Scheme_Env *env)
PLACE_PRIM_W_ARITY("place-wait", scheme_place_wait, 1, 1, plenv); PLACE_PRIM_W_ARITY("place-wait", scheme_place_wait, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place?", scheme_place_p, 1, 1, plenv); PLACE_PRIM_W_ARITY("place?", scheme_place_p, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place-channel", scheme_place_channel, 0, 0, plenv); PLACE_PRIM_W_ARITY("place-channel", scheme_place_channel, 0, 0, plenv);
PLACE_PRIM_W_ARITY("place-channel->receiver-channel", scheme_place_channel_receiver_channel, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place-channel-send", scheme_place_send, 1, 2, plenv); PLACE_PRIM_W_ARITY("place-channel-send", scheme_place_send, 1, 2, plenv);
PLACE_PRIM_W_ARITY("place-channel-recv", scheme_place_recv, 1, 1, plenv); PLACE_PRIM_W_ARITY("place-channel-recv", scheme_place_recv, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place-channel?", scheme_place_channel_p, 1, 1, plenv); PLACE_PRIM_W_ARITY("place-channel?", scheme_place_channel_p, 1, 1, plenv);
@ -172,12 +169,11 @@ Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) {
place_data->function = so; place_data->function = so;
place_data->ready = ready; place_data->ready = ready;
if (argc == 2) { if (argc == 2) {
Scheme_Object *channel; Scheme_Place_Bi_Channel *channel;
channel = scheme_place_bi_channel_create(); channel = scheme_place_bi_channel_create();
place->channel = channel; place->channel = (Scheme_Object *) channel;
scheme_place_bi_channel_set_signal(channel);
channel = scheme_place_bi_peer_channel_create(channel); channel = scheme_place_bi_peer_channel_create(channel);
place_data->channel = channel; place_data->channel = (Scheme_Object *) channel;
} }
else { else {
Scheme_Object *channel; Scheme_Object *channel;
@ -934,7 +930,6 @@ static void *place_start_proc_after_stack(void *data_arg, void *stack_base) {
} }
else { else {
channel = place_data->channel; channel = place_data->channel;
scheme_place_bi_channel_set_signal(channel);
} }
mzrt_sema_post(place_data->ready); mzrt_sema_post(place_data->ready);
@ -1176,7 +1171,7 @@ static void* GC_master_malloc_tagged(size_t size) {
return ptr; return ptr;
} }
Scheme_Object *scheme_place_async_channel_create() { Scheme_Place_Async_Channel *scheme_place_async_channel_create() {
Scheme_Object **msgs; Scheme_Object **msgs;
Scheme_Place_Async_Channel *ch; Scheme_Place_Async_Channel *ch;
@ -1191,11 +1186,11 @@ Scheme_Object *scheme_place_async_channel_create() {
mzrt_mutex_create(&ch->lock); mzrt_mutex_create(&ch->lock);
ch->msgs = msgs; ch->msgs = msgs;
ch->wakeup_signal = NULL; ch->wakeup_signal = NULL;
return (Scheme_Object *)ch; return ch;
} }
Scheme_Object *scheme_place_bi_channel_create() { Scheme_Place_Bi_Channel *scheme_place_bi_channel_create() {
Scheme_Object *tmp; Scheme_Place_Async_Channel *tmp;
Scheme_Place_Bi_Channel *ch; Scheme_Place_Bi_Channel *ch;
ch = GC_master_malloc_tagged(sizeof(Scheme_Place_Bi_Channel)); ch = GC_master_malloc_tagged(sizeof(Scheme_Place_Bi_Channel));
@ -1205,26 +1200,28 @@ Scheme_Object *scheme_place_bi_channel_create() {
ch->sendch = tmp; ch->sendch = tmp;
tmp = scheme_place_async_channel_create(); tmp = scheme_place_async_channel_create();
ch->recvch = tmp; ch->recvch = tmp;
return (Scheme_Object *)ch; return ch;
} }
Scheme_Object *scheme_place_bi_peer_channel_create(Scheme_Object *orig) { Scheme_Place_Bi_Channel *scheme_place_bi_peer_channel_create(Scheme_Place_Bi_Channel *orig) {
Scheme_Place_Bi_Channel *ch; Scheme_Place_Bi_Channel *ch;
ch = GC_master_malloc_tagged(sizeof(Scheme_Place_Bi_Channel)); ch = GC_master_malloc_tagged(sizeof(Scheme_Place_Bi_Channel));
ch->so.type = scheme_place_bi_channel_type; ch->so.type = scheme_place_bi_channel_type;
ch->sendch = ((Scheme_Place_Bi_Channel *)orig)->recvch; ch->sendch = orig->recvch;
ch->recvch = ((Scheme_Place_Bi_Channel *)orig)->sendch; ch->recvch = orig->sendch;
return (Scheme_Object *)ch; return ch;
} }
static Scheme_Object *scheme_place_channel(int argc, Scheme_Object *args[]) { static Scheme_Object *scheme_place_channel(int argc, Scheme_Object *args[]) {
if (argc == 0) { if (argc == 0) {
Scheme_Place_Bi_Channel *ch; Scheme_Place_Bi_Channel *ch;
Scheme_Object *a[2];
ch = scheme_place_bi_channel_create(); ch = scheme_place_bi_channel_create();
scheme_place_bi_channel_set_signal((Scheme_Object *) ch); a[0] = (Scheme_Object *) ch;
return ch; a[1] = (Scheme_Object *) scheme_place_bi_peer_channel_create(ch);
return scheme_values(2, a);
} }
else { else {
scheme_wrong_count_m("place-channel", 0, 0, argc, args, 0); scheme_wrong_count_m("place-channel", 0, 0, argc, args, 0);
@ -1232,39 +1229,11 @@ static Scheme_Object *scheme_place_channel(int argc, Scheme_Object *args[]) {
return scheme_true; return scheme_true;
} }
static Scheme_Object *scheme_place_channel_receiver_channel(int argc, Scheme_Object *args[]) {
if (argc == 1) {
if (SAME_TYPE(SCHEME_TYPE(args[0]), scheme_place_bi_channel_type)) {
Scheme_Place_Bi_Channel *ch;
ch = scheme_place_bi_peer_channel_create(args[0]);
scheme_place_bi_channel_set_signal((Scheme_Object *) ch);
return ch;
}
else {
scheme_wrong_type("place-channel->receiver-channel", "place-channel?", 0, argc, args);
}
}
else {
scheme_wrong_count_m("place-channel->receiver-channel", 1, 1, argc, args, 0);
}
return scheme_true;
}
static void scheme_place_bi_channel_set_signal(Scheme_Object *cho) {
Scheme_Place_Async_Channel *ch;
void *signaldescr;
signaldescr = scheme_get_signal_handle();
ch = (Scheme_Place_Async_Channel *) ((Scheme_Place_Bi_Channel *)cho)->recvch;
ch->wakeup_signal = signaldescr;
}
static Scheme_Object *scheme_place_channel_p(int argc, Scheme_Object *args[]) static Scheme_Object *scheme_place_channel_p(int argc, Scheme_Object *args[])
{ {
return SAME_TYPE(SCHEME_TYPE(args[0]), scheme_place_bi_channel_type) ? scheme_true : scheme_false; return SAME_TYPE(SCHEME_TYPE(args[0]), scheme_place_bi_channel_type) ? scheme_true : scheme_false;
} }
void scheme_place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *o) { void scheme_place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *o) {
int cnt; int cnt;
mzrt_mutex_lock(ch->lock); mzrt_mutex_lock(ch->lock);
@ -1338,9 +1307,15 @@ Scheme_Object *scheme_place_async_recv(Scheme_Place_Async_Channel *ch) {
} }
mzrt_mutex_unlock(ch->lock); mzrt_mutex_unlock(ch->lock);
if(msg) break; if(msg) break;
else {
void *signaldescr;
signaldescr = scheme_get_signal_handle();
ch->wakeup_signal = signaldescr;
scheme_thread_block(0); scheme_thread_block(0);
scheme_block_until((Scheme_Ready_Fun) scheme_place_async_ch_ready, NULL, (Scheme_Object *) ch, 0); scheme_block_until((Scheme_Ready_Fun) scheme_place_async_ch_ready, NULL, (Scheme_Object *) ch, 0);
} }
}
return msg; return msg;
} }

View File

@ -3499,18 +3499,6 @@ void scheme_done_with_process_id(int pid, int is_group);
# endif # endif
#endif #endif
typedef struct Scheme_Place_Bi_Channel {
Scheme_Object so;
Scheme_Object *sendch;
Scheme_Object *recvch;
} Scheme_Place_Bi_Channel;
typedef struct Scheme_Place {
Scheme_Object so;
void *proc_thread;
Scheme_Object *channel;
} Scheme_Place;
typedef struct Scheme_Place_Async_Channel { typedef struct Scheme_Place_Async_Channel {
Scheme_Object so; Scheme_Object so;
int in; int in;
@ -3524,6 +3512,18 @@ typedef struct Scheme_Place_Async_Channel {
void *wakeup_signal; void *wakeup_signal;
} Scheme_Place_Async_Channel; } Scheme_Place_Async_Channel;
typedef struct Scheme_Place_Bi_Channel {
Scheme_Object so;
Scheme_Place_Async_Channel *sendch;
Scheme_Place_Async_Channel *recvch;
} Scheme_Place_Bi_Channel;
typedef struct Scheme_Place {
Scheme_Object so;
void *proc_thread;
Scheme_Object *channel;
} Scheme_Place;
Scheme_Env *scheme_place_instance_init(); Scheme_Env *scheme_place_instance_init();
void scheme_place_instance_destroy(); void scheme_place_instance_destroy();
void scheme_kill_green_thread_timer(); void scheme_kill_green_thread_timer();