diff --git a/collects/racket/place.rkt b/collects/racket/place.rkt index 5c7b07df3c..745d450945 100644 --- a/collects/racket/place.rkt +++ b/collects/racket/place.rkt @@ -13,5 +13,4 @@ place-channel-recv place-channel? place? - place-channel-send/recv - place-channel->receiver-channel) + place-channel-send/recv) diff --git a/collects/scribblings/places/places.scrbl b/collects/scribblings/places/places.scrbl index ef8ffdd247..15a3282857 100644 --- a/collects/scribblings/places/places.scrbl +++ b/collects/scribblings/places/places.scrbl @@ -7,10 +7,10 @@ @(require scribble/manual scribble/urls scribble/struct - (for-label scheme - scheme/base - scheme/contract - scheme/place)) + (for-label racket + racket/base + racket/contract + racket/place)) @; ---------------------------------------------------------------------- @@ -18,41 +18,51 @@ The PLT futures API enables the development of parallel programs which take advantage of machines with multiple processors, cores, or hardware threads. -@defmodule[scheme/place]{} +@defmodule[racket/place]{} @defproc[(place [module-path module-path?] [start-proc proc?] [place-channel place-channel?]) place?]{ - Starts running @scheme[start-proc] in parallel. scheme[start-proc] must - be a function defined in @scheme[module-path]. Each place is created with a scheme[place-channel] + Starts running @racket[start-proc] in parallel. @racket[start-proc] must + be a function defined in @racket[module-path]. Each place is created with a racket[place-channel] that permits communication with the place originator. This initial channel can be overridden with - an optional scheme[place-channel] argument. The @scheme[place] + an optional @racket[place-channel] argument. The @racket[place] procedure returns immediately with a place descriptor value representing the newly constructed place. } @defproc[(place-wait [p place?]) exact-integer?]{ - Returns the return value of a completed place @scheme[p], blocking until + Returns the return value of a completed place @racket[p], blocking until the place completes (if it has not already completed). } @defproc[(place? [x any/c]) boolean?]{ - Returns @scheme[#t] if @scheme[x] is a place object. + Returns @racket[#t] if @racket[x] is a place object. +} + +@defproc[(place-channel) (values place-channel? place-channel?)]{ + Returns two @racket[place-channel] objects. + + One @racket[place-channel] should be used by the current @racket[place] to send + messages to a destination @racket[place]. + + The other @racket[place-channel] should be sent to a destination @racket[place] over + an existing @racket[place-channel]. } @defproc[(place-channel-send [ch place-channel?] [x any/c]) void]{ - Sends an immutable message @scheme[x] on channel @scheme[ch]. + Sends an immutable message @racket[x] on channel @racket[ch]. } @defproc[(place-channel-recv [p place-channel?]) any/c]{ - Returns an immutable message received on channel @scheme[ch]. + Returns an immutable message received on channel @racket[ch]. } @defproc[(place-channel? [x any/c]) boolean?]{ - Returns @scheme[#t] if @scheme[x] is a place-channel object. + Returns @racket[#t] if @racket[x] is a place-channel object. } @defproc[(place-channel-send/recv [ch place-channel?] [x any/c]) void]{ - Sends an immutable message @scheme[x] on channel @scheme[ch] and then + Sends an immutable message @racket[x] on channel @racket[ch] and then waits for a repy message. - Returns an immutable message received on channel @scheme[ch]. + Returns an immutable message received on channel @racket[ch]. } @section[#:tag "example"]{How Do I Keep Those Cores Busy?} @@ -60,7 +70,7 @@ hardware threads. This code launches two places passing 1 and 2 as the initial channels and then waits for the places to complete and return. -@schemeblock[ +@racketblock[ (let ((pls (map (lambda (x) (place "place-worker.ss" 'place-main x)) (list 1 2)))) (map place-wait pls)) @@ -68,8 +78,8 @@ and then waits for the places to complete and return. This is the code for the place-worker.ss module that each place will execute. -@schemeblock[ -(module place-worker scheme +@racketblock[ +(module place-worker racket (provide place-main) (define (place-main x) @@ -77,13 +87,10 @@ This is the code for the place-worker.ss module that each place will execute. ] @section[#:tag "place-channels"]{Place Channels} -@;@defproc[(make-place-channel) channel?]{ -@;Creates and returns a new channel. - -Place channels can be used with @scheme[place-channel-recv], or as a +Place channels can be used with @racket[place-channel-recv], or as a @tech[#:doc '(lib "scribblings/reference/reference.scrbl")]{synchronizable event} (see @secref[#:doc '(lib "scribblings/reference/reference.scrbl") "sync"]) to receive a value -through the channel. The channel can be used with @scheme[place-channel-send] +through the channel. The channel can be used with @racket[place-channel-send] to send a value through the channel. @section[#:tag "messagepassingparallelism"]{Message Passing Parallelism} @@ -91,18 +98,18 @@ to send a value through the channel. Places can only communicate by passing immutable messages on place-channels. Only immutable pairs, vectors, and structs can be communicated across places channels. -@section[#:tag "logging"]{Architecture and Garbage Collection} +@section[#:tag "places-architecture"]{Architecture and Garbage Collection} Immutable messages communicated on place-channels are first copied to a shared garbage collector called the master. Places are allowed to garbage collect independently of one another. The master collector, however, has to pause all mutators before it can collect garbage. -@section[#:tag "compiling"]{Enabling Places in Racket Builds} +@section[#:tag "enabling-places"]{Enabling Places in Racket Builds} PLT's parallel-places support is only enabled if you pass @DFlag{enable-places} to @exec{configure} when you build PLT (and -that build currently only works with @exec{mzscheme}, not with -@exec{mred}). When parallel-future support is not enabled, -@scheme[place] usage is a syntax error. +that build currently only works with @exec{racket}, not with +@exec{gracket}). When parallel-places support is not enabled, +@racket[place] usage is a syntax error. @; @FIXME{use threads to emulate places maybe?} diff --git a/collects/tests/racket/place-channel.rktl b/collects/tests/racket/place-channel.rktl index 12d9722168..374c21e8ac 100644 --- a/collects/tests/racket/place-channel.rktl +++ b/collects/tests/racket/place-channel.rktl @@ -31,7 +31,7 @@ (list (car x) 'b (cadr x)) (vector (vector-ref x 0) 'b (vector-ref x 1)) #s((abuilding 1 building 2) 6 'utah 'no)) - (define pc1 (place-channel->receiver-channel (place-channel-recv ch))) + (define pc1 (place-channel-recv ch)) (pcrss pc1 (string-append x "-ok"))) ) END @@ -54,8 +54,8 @@ END ((list 'a 'a) (list 'a 'b 'a)) (#(a a) #(a b a)) (h1 #s((abuilding 1 building 2) 6 'utah 'no))) - (define pc1 (place-channel)) - (place-channel-send pl pc1) + (define-values (pc1 pc2) (place-channel)) + (place-channel-send pl pc2) (test "Testing-ok" place-channel-send/recv pc1 "Testing") (place-wait pl) ) diff --git a/src/racket/src/places.c b/src/racket/src/places.c index 4ac4d8f1ac..aabaa59d97 100644 --- a/src/racket/src/places.c +++ b/src/racket/src/places.c @@ -24,12 +24,10 @@ static Scheme_Object *scheme_place_recv(int argc, Scheme_Object *args[]); static Scheme_Object *scheme_place_channel_p(int argc, Scheme_Object *args[]); static Scheme_Object *def_place_exit_handler_proc(int argc, Scheme_Object *args[]); static Scheme_Object *scheme_place_channel(int argc, Scheme_Object *args[]); -static Scheme_Object *scheme_place_channel_receiver_channel(int argc, Scheme_Object *args[]); -static Scheme_Object *scheme_place_async_channel_create(); -static Scheme_Object *scheme_place_bi_channel_create(); -static Scheme_Object *scheme_place_bi_peer_channel_create(Scheme_Object *orig); -static void scheme_place_bi_channel_set_signal(Scheme_Object *cho); +static Scheme_Place_Async_Channel *scheme_place_async_channel_create(); +static Scheme_Place_Bi_Channel *scheme_place_bi_channel_create(); +static Scheme_Place_Bi_Channel *scheme_place_bi_peer_channel_create(Scheme_Place_Bi_Channel *orig); static int scheme_place_channel_ready(Scheme_Object *so); @@ -80,7 +78,6 @@ void scheme_init_place(Scheme_Env *env) PLACE_PRIM_W_ARITY("place-wait", scheme_place_wait, 1, 1, plenv); PLACE_PRIM_W_ARITY("place?", scheme_place_p, 1, 1, plenv); PLACE_PRIM_W_ARITY("place-channel", scheme_place_channel, 0, 0, plenv); - PLACE_PRIM_W_ARITY("place-channel->receiver-channel", scheme_place_channel_receiver_channel, 1, 1, plenv); PLACE_PRIM_W_ARITY("place-channel-send", scheme_place_send, 1, 2, plenv); PLACE_PRIM_W_ARITY("place-channel-recv", scheme_place_recv, 1, 1, plenv); PLACE_PRIM_W_ARITY("place-channel?", scheme_place_channel_p, 1, 1, plenv); @@ -172,12 +169,11 @@ Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) { place_data->function = so; place_data->ready = ready; if (argc == 2) { - Scheme_Object *channel; + Scheme_Place_Bi_Channel *channel; channel = scheme_place_bi_channel_create(); - place->channel = channel; - scheme_place_bi_channel_set_signal(channel); + place->channel = (Scheme_Object *) channel; channel = scheme_place_bi_peer_channel_create(channel); - place_data->channel = channel; + place_data->channel = (Scheme_Object *) channel; } else { Scheme_Object *channel; @@ -934,7 +930,6 @@ static void *place_start_proc_after_stack(void *data_arg, void *stack_base) { } else { channel = place_data->channel; - scheme_place_bi_channel_set_signal(channel); } mzrt_sema_post(place_data->ready); @@ -1176,7 +1171,7 @@ static void* GC_master_malloc_tagged(size_t size) { return ptr; } -Scheme_Object *scheme_place_async_channel_create() { +Scheme_Place_Async_Channel *scheme_place_async_channel_create() { Scheme_Object **msgs; Scheme_Place_Async_Channel *ch; @@ -1191,11 +1186,11 @@ Scheme_Object *scheme_place_async_channel_create() { mzrt_mutex_create(&ch->lock); ch->msgs = msgs; ch->wakeup_signal = NULL; - return (Scheme_Object *)ch; + return ch; } -Scheme_Object *scheme_place_bi_channel_create() { - Scheme_Object *tmp; +Scheme_Place_Bi_Channel *scheme_place_bi_channel_create() { + Scheme_Place_Async_Channel *tmp; Scheme_Place_Bi_Channel *ch; ch = GC_master_malloc_tagged(sizeof(Scheme_Place_Bi_Channel)); @@ -1205,26 +1200,28 @@ Scheme_Object *scheme_place_bi_channel_create() { ch->sendch = tmp; tmp = scheme_place_async_channel_create(); ch->recvch = tmp; - return (Scheme_Object *)ch; + return ch; } -Scheme_Object *scheme_place_bi_peer_channel_create(Scheme_Object *orig) { +Scheme_Place_Bi_Channel *scheme_place_bi_peer_channel_create(Scheme_Place_Bi_Channel *orig) { Scheme_Place_Bi_Channel *ch; ch = GC_master_malloc_tagged(sizeof(Scheme_Place_Bi_Channel)); ch->so.type = scheme_place_bi_channel_type; - ch->sendch = ((Scheme_Place_Bi_Channel *)orig)->recvch; - ch->recvch = ((Scheme_Place_Bi_Channel *)orig)->sendch; - return (Scheme_Object *)ch; + ch->sendch = orig->recvch; + ch->recvch = orig->sendch; + return ch; } static Scheme_Object *scheme_place_channel(int argc, Scheme_Object *args[]) { if (argc == 0) { Scheme_Place_Bi_Channel *ch; + Scheme_Object *a[2]; ch = scheme_place_bi_channel_create(); - scheme_place_bi_channel_set_signal((Scheme_Object *) ch); - return ch; + a[0] = (Scheme_Object *) ch; + a[1] = (Scheme_Object *) scheme_place_bi_peer_channel_create(ch); + return scheme_values(2, a); } else { scheme_wrong_count_m("place-channel", 0, 0, argc, args, 0); @@ -1232,39 +1229,11 @@ static Scheme_Object *scheme_place_channel(int argc, Scheme_Object *args[]) { return scheme_true; } -static Scheme_Object *scheme_place_channel_receiver_channel(int argc, Scheme_Object *args[]) { - if (argc == 1) { - if (SAME_TYPE(SCHEME_TYPE(args[0]), scheme_place_bi_channel_type)) { - Scheme_Place_Bi_Channel *ch; - ch = scheme_place_bi_peer_channel_create(args[0]); - scheme_place_bi_channel_set_signal((Scheme_Object *) ch); - return ch; - } - else { - scheme_wrong_type("place-channel->receiver-channel", "place-channel?", 0, argc, args); - } - } - else { - scheme_wrong_count_m("place-channel->receiver-channel", 1, 1, argc, args, 0); - } - return scheme_true; -} - - -static void scheme_place_bi_channel_set_signal(Scheme_Object *cho) { - Scheme_Place_Async_Channel *ch; - void *signaldescr; - signaldescr = scheme_get_signal_handle(); - ch = (Scheme_Place_Async_Channel *) ((Scheme_Place_Bi_Channel *)cho)->recvch; - ch->wakeup_signal = signaldescr; -} - static Scheme_Object *scheme_place_channel_p(int argc, Scheme_Object *args[]) { return SAME_TYPE(SCHEME_TYPE(args[0]), scheme_place_bi_channel_type) ? scheme_true : scheme_false; } - void scheme_place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *o) { int cnt; mzrt_mutex_lock(ch->lock); @@ -1338,8 +1307,14 @@ Scheme_Object *scheme_place_async_recv(Scheme_Place_Async_Channel *ch) { } mzrt_mutex_unlock(ch->lock); if(msg) break; - scheme_thread_block(0); - scheme_block_until((Scheme_Ready_Fun) scheme_place_async_ch_ready, NULL, (Scheme_Object *) ch, 0); + else { + void *signaldescr; + signaldescr = scheme_get_signal_handle(); + ch->wakeup_signal = signaldescr; + + scheme_thread_block(0); + scheme_block_until((Scheme_Ready_Fun) scheme_place_async_ch_ready, NULL, (Scheme_Object *) ch, 0); + } } return msg; } diff --git a/src/racket/src/schpriv.h b/src/racket/src/schpriv.h index 5e91e0c4a1..014e23c79f 100644 --- a/src/racket/src/schpriv.h +++ b/src/racket/src/schpriv.h @@ -3499,18 +3499,6 @@ void scheme_done_with_process_id(int pid, int is_group); # endif #endif -typedef struct Scheme_Place_Bi_Channel { - Scheme_Object so; - Scheme_Object *sendch; - Scheme_Object *recvch; -} Scheme_Place_Bi_Channel; - -typedef struct Scheme_Place { - Scheme_Object so; - void *proc_thread; - Scheme_Object *channel; -} Scheme_Place; - typedef struct Scheme_Place_Async_Channel { Scheme_Object so; int in; @@ -3524,6 +3512,18 @@ typedef struct Scheme_Place_Async_Channel { void *wakeup_signal; } Scheme_Place_Async_Channel; +typedef struct Scheme_Place_Bi_Channel { + Scheme_Object so; + Scheme_Place_Async_Channel *sendch; + Scheme_Place_Async_Channel *recvch; +} Scheme_Place_Bi_Channel; + +typedef struct Scheme_Place { + Scheme_Object so; + void *proc_thread; + Scheme_Object *channel; +} Scheme_Place; + Scheme_Env *scheme_place_instance_init(); void scheme_place_instance_destroy(); void scheme_kill_green_thread_timer();