Rename place-channel-recv place-channel-receive

This commit is contained in:
Kevin Tew 2011-03-31 13:35:48 -06:00
parent f7ab3fe2b3
commit 26d7768d3d
12 changed files with 79 additions and 71 deletions

View File

@ -16,18 +16,18 @@
place-kill
place-channel
place-channel-send
place-channel-recv
place-channel-receive
place-channel?
place?
place-channel-send/recv
place-channel-send/receive
processor-count
(rename-out [pl-place-enabled? place-enabled?]))
(define-struct TH-place (th ch) #:property prop:evt (lambda (x) (TH-place-channel-out (TH-place-ch x))))
(define (place-channel-send/recv ch msg)
(define (place-channel-send/receive ch msg)
(place-channel-send ch msg)
(place-channel-recv ch))
(place-channel-receive ch))
(define (make-th-async-channel)
(define ch (make-channel))
@ -98,12 +98,12 @@
(thread-send th
(deep-copy msg)))
(define (th-place-channel-recv pl)
(define (th-place-channel-receive pl)
(channel-get
(cond
[(TH-place? pl) (TH-place-channel-in (TH-place-ch pl))]
[(TH-place-channel? pl) (TH-place-channel-in pl)]
[else (raise-type-error 'place-channel-recv "expect a place? or place-channel?" pl)])))
[else (raise-type-error 'place-channel-receive "expect a place? or place-channel?" pl)])))
(define (th-place-channel? pl)
(or (TH-place? pl)
@ -117,6 +117,6 @@
(define-pl place-kill pl-place-kill th-place-kill)
(define-pl place-channel pl-place-channel th-place-channel)
(define-pl place-channel-send pl-place-channel-send th-place-channel-send)
(define-pl place-channel-recv pl-place-channel-recv th-place-channel-recv)
(define-pl place-channel-receive pl-place-channel-receive th-place-channel-receive)
(define-pl place-channel? pl-place-channel? th-place-channel?)
(define-pl place? pl-place? TH-place?)

View File

@ -184,6 +184,14 @@ generate events (see @racket[prop:evt]).
or blocked on events with timeouts that have not yet expired. The
event's result is @|void-const|.}
@item{@racket[place-channel] --- a place-channel is ready when
@racket[place-channel-receive] would not block. The channel's result as an
event is the same as the @racket[place-channel-receive] result.}
@item{@racket[place] --- a place is ready when @racket[place-channel-receive]
would not block. The result as an event is the same as the
@racket[place-channel-receive] result.}
]
@;------------------------------------------------------------------------

View File

@ -48,7 +48,7 @@ places that share the value, because they are allowed in a
A @tech{place channel} can be used as a @tech{synchronizable event}
(see @secref["sync"]) to receive a value through the channel. A place
can also receive messages with @racket[place-channel-recv], and
can also receive messages with @racket[place-channel-receive], and
messages can be sent with @racket[place-channel-send].
Constraints on messages across a place channel---and therefore on the
@ -66,7 +66,7 @@ message to each, and then waits for the places to complete and return:
(for ([i (in-range 2)]
[p pls])
(place-channel-send p i)
(printf "~a\n" (place-channel-recv p)))
(printf "~a\n" (place-channel-receive p)))
(map place-wait pls))
]
@ -81,7 +81,7 @@ racket
(define (place-main pch)
(place-channel-send pch (format "Hello from place ~a"
(place-channel-recv pch))))
(place-channel-receive pch))))
]
@ -134,11 +134,11 @@ racket
Sends an immutable message @racket[v] on channel @racket[pch].
}
@defproc[(place-channel-recv [pch place-channel?]) any/c]{
@defproc[(place-channel-receive [pch place-channel?]) any/c]{
Returns an immutable message received on channel @racket[pch].
}
@defproc[(place-channel-send/recv [pch place-channel?] [v any/c]) void]{
@defproc[(place-channel-send/receive [pch place-channel?] [v any/c]) void]{
Sends an immutable message @racket[v] on channel @racket[pch] and then
waits for a reply message on the same channel.
}

View File

@ -216,7 +216,7 @@
[pl null])
(define/public (send/msg msg) (place-channel-send pl msg))
(define/public (recv/msg) (place-channel-recv pl))
(define/public (recv/msg) (place-channel-receive pl))
(define/public (get-id) id)
(define/public (get-out) pl)
(define/public (kill) #f)
@ -239,7 +239,7 @@
(place/anon (ch)
(let ([cmc ((dynamic-require 'compiler/cm 'make-caching-managed-compile-zo))])
(let loop ()
(match (place-channel-recv ch)
(match (place-channel-receive ch)
[(list 'DIE) void]
[(list name dir file)
(let ([dir (bytes->path dir)]
@ -254,7 +254,7 @@
(match cmd
['lock
(send/msg (list (list 'LOCK (path->bytes fn)) "" ""))
(match (place-channel-recv ch)
(match (place-channel-receive ch)
[(list 'locked) #t]
[(list 'compiled) #f])]
['unlock (send/msg (list (list 'UNLOCK (path->bytes fn)) "" ""))]))

View File

@ -20,7 +20,7 @@
(define count 10)
(define fourk-b-message (make-bytes message-size 66))
(for ([i (in-range count)])
(place-channel-recv ch)
(place-channel-receive ch)
(place-channel-send ch fourk-b-message)))])
(define message-size (* 4024 1024))
@ -30,7 +30,7 @@
(time-apply (lambda ()
(for ([i (in-range count)])
(pp:place-channel-send pl four-k-message)
(pp:place-channel-recv pl))) null))
(pp:place-channel-receive pl))) null))
(print-out "processes" (/ (* 2 count message-size) (/ t2 1000)))
@ -49,7 +49,7 @@
(define count 50)
(define fourk-b-message (make-bytes message-size 66))
(for ([i (in-range count)])
(place-channel-recv ch)
(place-channel-receive ch)
(place-channel-send ch fourk-b-message)))
)
END
@ -63,7 +63,7 @@ END
(time-apply (lambda ()
(for ([i (in-range count)])
(place-channel-send pl four-k-message)
(place-channel-recv pl))) null))
(place-channel-receive pl))) null))
(print-out "places" (/ (* 2 count message-size) (/ t2 1000)))
@ -79,7 +79,7 @@ END
(define (place-main ch)
(define count 500)
(for ([i (in-range count)])
(place-channel-send ch (place-channel-recv ch))))
(place-channel-send ch (place-channel-receive ch))))
)
END
"pct1.ss")
@ -94,7 +94,7 @@ END
(time-apply (lambda ()
(for ([i (in-range count)])
(place-channel-send pl tree)
(place-channel-recv pl))) null))
(place-channel-receive pl))) null))
(printf "~a ~a ~a ~a\n" r t1 t2 t3)

View File

@ -13,7 +13,7 @@
(define (barrier ch)
(place-channel-send ch 0)
(place-channel-recv ch))
(place-channel-receive ch))
(define (place-main ch)
(place-channel-send ch 2)
@ -29,7 +29,7 @@ END
(define (barrier ch)
(place-channel-send ch 0)
(place-channel-recv ch))
(place-channel-receive ch))
(define (place-main ch)
(place-channel-send ch 2)
@ -49,7 +49,7 @@ END
(let ([pls (time-n msg 0
(for/list ([i (in-range plcnt)])
(let ([p (place module-path 'place-main)])
(place-channel-recv p)
(place-channel-receive p)
p)))])
(barrier-m pls)
(places-wait pls))
@ -58,7 +58,7 @@ END
(let ([pls (time-n msg 1
(let ([pls (for/list ([i (in-range plcnt)])
(place module-path 'place-main))])
(map place-channel-recv pls) pls))])
(map place-channel-receive pls) pls))])
(barrier-m pls)
(places-wait pls)))

View File

@ -15,9 +15,9 @@
place
place-wait
place-kill
place-channel-recv
place-channel-receive
place-channel-send
place-channel-send/recv
place-channel-send/receive
place-child-channel
place/base
map-reduce/lambda
@ -41,7 +41,7 @@
(flush-output out))
;; receives msg on channel ch
(define (place-channel-recv ch)
(define (place-channel-receive ch)
(deserialize (fasl->s-exp (read (place-channel-s-in (resolve->channel ch))))))
;; create a place given a module file path and a func-name to invoke
@ -78,9 +78,9 @@
(subprocess-wait spo)
(subprocess-status spo)))
(define (place-channel-send/recv ch x)
(define (place-channel-send/receive ch x)
(place-channel-send ch x)
(place-channel-recv ch))
(place-channel-receive ch))
;; splits lst into n equal pieces
(define (split-n n lst)
@ -166,12 +166,12 @@
#'(begin
(define places (for/list ([i (in-range (processor-count))])
(place/lambda (name ch)
(place-channel-send ch ((lambda (listvar) body ...) (place-channel-recv ch))))))
(place-channel-send ch ((lambda (listvar) body ...) (place-channel-receive ch))))))
(for ([p places]
[item (split-n (processor-count) lst)])
(place-channel-send p item))
(define result ((lambda (listvar) body ...) (map place-channel-recv places)))
(define result ((lambda (listvar) body ...) (map place-channel-receive places)))
(map place-wait places)
(map place-kill places)
result)]))

View File

@ -24,12 +24,12 @@
fn)
(define (barrier-m pls)
(for ([ch pls]) (place-channel-recv ch))
(for ([ch pls]) (place-channel-receive ch))
(for ([ch pls]) (place-channel-send ch 1)))
(define (barrier ch)
(place-channel-send ch 0)
(place-channel-recv ch))
(place-channel-receive ch))
(define (places-wait pls)
(for ([p pls]) (place-wait p)))

View File

@ -14,7 +14,7 @@
(provide place-main)
(define (place-main ch)
(match (place-channel-recv ch)
(match (place-channel-receive ch)
[(list id reps cnt)
(define ids (number->string id))
(for ([j (in-range reps)])

View File

@ -22,7 +22,7 @@
(with-syntax
[(x (syntax-local-introduce #'x))]
#'(place-channel-send ch
(let ([x (place-channel-recv ch)])
(let ([x (place-channel-receive ch)])
body)))]))
(define-syntax-rule (pcrss ch body ...) (begin (pcrs ch body) ...))
@ -37,10 +37,10 @@
#s((abuilding 1 building 2) 6 'utah 'no)
`(,x))
(define pc1 (place-channel-recv ch))
(define pc1 (place-channel-receive ch))
(pcrss pc1 (string-append x "-ok"))
(define pc3 (first (place-channel-recv ch)))
(define pc3 (first (place-channel-receive ch)))
(pcrss pc3 (string-append x "-ok3"))
(pcrss ch (begin (flvector-set! x 2 5.0) "Ready1"))
@ -48,15 +48,15 @@
(pcrss ch (begin (bytes-set! x 2 67) "Ready3"))
(pcrss ch (begin (bytes-set! x 2 67) "Ready4"))
(define pc5 (place-channel-recv ch))
(define pc5 (place-channel-receive ch))
(place-channel-send pc5 "Ready5")
)
)
END
"pct1.ss")
(define-syntax-rule (pc-send-recv-test ch (send expect) ...)
(begin (test expect place-channel-send/recv ch send) ...))
(define-syntax-rule (pc-send-receive-test ch (send expect) ...)
(begin (test expect place-channel-send/receive ch send) ...))
(define-struct building (rooms location) #:prefab)
@ -70,7 +70,7 @@ END
(define b2 (make-shared-bytes 4 65))
(let ([pl (place "pct1.ss" 'place-main)])
(pc-send-recv-test pl
(pc-send-receive-test pl
(1 2 )
("Hello" "Hello-ok")
((cons 'a 'a) (cons 'a 'b))
@ -81,22 +81,22 @@ END
(define-values (pc1 pc2) (place-channel))
(place-channel-send pl pc2)
(test "Testing-ok" place-channel-send/recv pc1 "Testing")
(test "Testing-ok" place-channel-send/receive pc1 "Testing")
(define-values (pc3 pc4) (place-channel))
(place-channel-send pl (list pc4))
(test "Testing-ok3" place-channel-send/recv pc3 "Testing")
(test "Testing-ok3" place-channel-send/receive pc3 "Testing")
(test "Ready1" place-channel-send/recv pl flv1)
(test "Ready1" place-channel-send/receive pl flv1)
(test 5.0 flvector-ref flv1 2)
(test "Ready2" place-channel-send/recv pl flv2)
(test "Ready2" place-channel-send/receive pl flv2)
(test 6.0 flvector-ref flv2 2)
(test "Ready3" place-channel-send/recv pl b1)
(test "Ready3" place-channel-send/receive pl b1)
(test 67 bytes-ref b1 2)
(test "Ready4" place-channel-send/recv pl b2)
(test "Ready4" place-channel-send/receive pl b2)
(test 67 bytes-ref b2 2)
(define-values (pc5 pc6) (place-channel))

View File

@ -20,10 +20,10 @@
(arity-test place-wait 1 1)
(arity-test place-channel 0 0)
(arity-test place-channel-send 2 2)
(arity-test place-channel-recv 1 1)
(arity-test place-channel-receive 1 1)
(arity-test place-channel? 1 1)
(arity-test place? 1 1)
(arity-test place-channel-send/recv 2 2)
(arity-test place-channel-send/receive 2 2)
(arity-test processor-count 0 0)
(err/rt-test (place "foo.rkt"))

View File

@ -22,7 +22,7 @@ static Scheme_Object *scheme_place_kill(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_place_sleep(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_place_p(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_place_send(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_place_recv(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_place_receive(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_place_channel_p(int argc, Scheme_Object *args[]);
static Scheme_Object *def_place_exit_handler_proc(int argc, Scheme_Object *args[]);
static Scheme_Object *scheme_place_channel(int argc, Scheme_Object *args[]);
@ -33,7 +33,7 @@ static Scheme_Place_Bi_Channel *scheme_place_bi_channel_create();
static Scheme_Place_Bi_Channel *scheme_place_bi_peer_channel_create(Scheme_Place_Bi_Channel *orig);
static int scheme_place_channel_ready(Scheme_Object *so, Scheme_Schedule_Info *sinfo);
static void scheme_place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Object *o);
static Scheme_Object *scheme_place_async_recv(Scheme_Place_Async_Channel *ch);
static Scheme_Object *scheme_place_async_receive(Scheme_Place_Async_Channel *ch);
static Scheme_Object *scheme_places_deep_copy_to_master(Scheme_Object *so);
/* Scheme_Object *scheme_places_deep_copy(Scheme_Object *so); */
@ -81,17 +81,17 @@ void scheme_init_place(Scheme_Env *env)
plenv = scheme_primitive_module(scheme_intern_symbol("#%place"), env);
GLOBAL_PRIM_W_ARITY("place-enabled?", scheme_place_enabled, 0, 0, plenv);
GLOBAL_PRIM_W_ARITY("place-shared?", scheme_place_shared, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place", scheme_place, 2, 2, plenv);
PLACE_PRIM_W_ARITY("place-sleep", scheme_place_sleep, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place-wait", scheme_place_wait, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place-kill", scheme_place_kill, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place?", scheme_place_p, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place-channel", scheme_place_channel, 0, 0, plenv);
PLACE_PRIM_W_ARITY("place-channel-send", scheme_place_send, 1, 2, plenv);
PLACE_PRIM_W_ARITY("place-channel-recv", scheme_place_recv, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place-channel?", scheme_place_channel_p, 1, 1, plenv);
GLOBAL_PRIM_W_ARITY("place-enabled?", scheme_place_enabled, 0, 0, plenv);
GLOBAL_PRIM_W_ARITY("place-shared?", scheme_place_shared, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place", scheme_place, 2, 2, plenv);
PLACE_PRIM_W_ARITY("place-sleep", scheme_place_sleep, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place-wait", scheme_place_wait, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place-kill", scheme_place_kill, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place?", scheme_place_p, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place-channel", scheme_place_channel, 0, 0, plenv);
PLACE_PRIM_W_ARITY("place-channel-send", scheme_place_send, 1, 2, plenv);
PLACE_PRIM_W_ARITY("place-channel-receive", scheme_place_receive, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place-channel?", scheme_place_channel_p, 1, 1, plenv);
#ifdef MZ_USE_PLACES
REGISTER_SO(scheme_def_place_exit_proc);
@ -1298,7 +1298,7 @@ Scheme_Object *scheme_place_send(int argc, Scheme_Object *args[]) {
return scheme_true;
}
Scheme_Object *scheme_place_recv(int argc, Scheme_Object *args[]) {
Scheme_Object *scheme_place_receive(int argc, Scheme_Object *args[]) {
if (argc == 1) {
Scheme_Place_Bi_Channel *ch;
if (SAME_TYPE(SCHEME_TYPE(args[0]), scheme_place_type)) {
@ -1309,12 +1309,12 @@ Scheme_Object *scheme_place_recv(int argc, Scheme_Object *args[]) {
}
else {
ch = NULL;
scheme_wrong_type("place-channel-recv", "place-channel", 0, argc, args);
scheme_wrong_type("place-channel-receive", "place-channel", 0, argc, args);
}
return scheme_place_async_recv((Scheme_Place_Async_Channel *) ch->recvch);
return scheme_place_async_receive((Scheme_Place_Async_Channel *) ch->recvch);
}
else {
scheme_wrong_count_m("place-channel-recv", 1, 1, argc, args, 0);
scheme_wrong_count_m("place-channel-receive", 1, 1, argc, args, 0);
}
return scheme_true;
}
@ -1570,7 +1570,7 @@ static void scheme_place_async_send(Scheme_Place_Async_Channel *ch, Scheme_Objec
}
}
static Scheme_Object *scheme_place_async_try_recv(Scheme_Place_Async_Channel *ch) {
static Scheme_Object *scheme_place_async_try_receive(Scheme_Place_Async_Channel *ch) {
Scheme_Object *msg = NULL;
void *msg_memory = NULL;
@ -1621,7 +1621,7 @@ static int scheme_place_channel_ready(Scheme_Object *so, Scheme_Schedule_Info *s
ch = (Scheme_Place_Bi_Channel *)so;
}
msg = scheme_place_async_try_recv((Scheme_Place_Async_Channel *) ch->recvch);
msg = scheme_place_async_try_receive((Scheme_Place_Async_Channel *) ch->recvch);
if (msg != NULL) {
scheme_set_sync_target(sinfo, msg, NULL, NULL, 0, 0, NULL);
return 1;
@ -1629,10 +1629,10 @@ static int scheme_place_channel_ready(Scheme_Object *so, Scheme_Schedule_Info *s
return 0;
}
static Scheme_Object *scheme_place_async_recv(Scheme_Place_Async_Channel *ch) {
static Scheme_Object *scheme_place_async_receive(Scheme_Place_Async_Channel *ch) {
Scheme_Object *msg = NULL;
while(1) {
msg = scheme_place_async_try_recv(ch);
msg = scheme_place_async_try_receive(ch);
if(msg) break;
else {
void *signaldescr;