[Distributed Places] Docs clean up part 2

This commit is contained in:
Kevin Tew 2012-05-14 14:58:40 -06:00
parent 29a401b692
commit fbbf8579e0
5 changed files with 142 additions and 91 deletions

View File

@ -1624,16 +1624,20 @@
(define ch (make-channel)) (define ch (make-channel))
(thread (thread
(lambda () (channel-put ch (spawn-remote-racket-node host #:listen-port listen-port (lambda ()
(with-handlers ([exn:fail? (lambda (e) (channel-put #f))])
(channel-put ch (spawn-remote-racket-node host #:listen-port listen-port
#:racket-path racketpath #:racket-path racketpath
#:ssh-bin-path sshpath #:ssh-bin-path sshpath
#:distributed-launch-path distributedlaunchpath)))) #:distributed-launch-path distributedlaunchpath)))))
ch) ch)
(define/provide (spawn-nodes/join nodes-descs) (define/provide (spawn-nodes/join nodes-descs)
(for/list ([x (for/list ([x
(for/list ([n nodes-descs]) (for/list ([n nodes-descs])
(apply keyword-apply spawn-node-at n))]) (apply keyword-apply spawn-node-at n))])
(unless x
(raise "Failed to connect to a remotely spawned node"))
(channel-get x))) (channel-get x)))
#;(define build-node-args #;(define build-node-args

View File

@ -18,7 +18,8 @@
rmpi-partition rmpi-partition
rmpi-build-default-config rmpi-build-default-config
rmpi-launch rmpi-launch
rmpi-finish) rmpi-finish
(struct-out rmpi-comm))
(struct rmpi-comm (id cnt channels) #:transparent) (struct rmpi-comm (id cnt channels) #:transparent)
@ -29,7 +30,7 @@
(define (rmpi-init ch) (define (rmpi-init ch)
(define tc (new named-place-typed-channel% [ch ch])) (define tc (new named-place-typed-channel% [ch ch]))
(match-define (list (list 'mpi-id id config) return-ch) (tc-get 'mpi-id tc)) (match-define (list (list 'rmpi-id id config) return-ch) (tc-get 'rmpi-id tc))
(match-define (list (list 'args args) src-ch) (tc-get 'args tc)) (match-define (list (list 'args args) src-ch) (tc-get 'args tc))
(define mpi-comm-vector (define mpi-comm-vector
(for/vector #:length (length config) ([c config]) (for/vector #:length (length config) ([c config])
@ -186,19 +187,20 @@
(hash-set (car _rest) (string->keyword "listen-port") port))])) (hash-set (car _rest) (string->keyword "listen-port") port))]))
; (printf/f "~a\n" rest) ; (printf/f "~a\n" rest)
(define-values (k v) (define-values (k v)
(let loop ([keys (list "racket-path" "listen-port" "distributed-launch-path")] (let loop ([keys (map string->keyword
(sort (list "racket-path" "listen-port" "distributed-launch-path")))]
[k null] [k null]
[v null]) [v null])
(match keys
[(cons head tail)
(cond (cond
[(pair? keys) [(lookup-config-value rest head) => (lambda (x)
(cond (loop tail
[(lookup-config-value rest (car keys)) => (lambda (x) (cons head k)
(loop (cdr keys)
(cons (string->keyword (car keys)) k)
(cons x v)))] (cons x v)))]
[else [else
(loop (cdr keys) k v)])] (loop (cdr keys) k v)])]
[else [(list)
(values k v)]))) (values k v)])))
; (printf/f "~a\n" (list k v (list host))) ; (printf/f "~a\n" (list k v (list host)))
(list k v (list host))))) (list k v (list host)))))
@ -208,18 +210,23 @@
(match-define (list-rest host port name id rest) c) (match-define (list-rest host port name id rest) c)
(supervise-named-dynamic-place-at n (supervise-named-dynamic-place-at n
name name
(lookup-config-value rest "mpi-module") (lookup-config-value rest "rmpi-module")
(lookup-config-value rest "mpi-func"))) (lookup-config-value rest "rmpi-func")))
(define-values (mrth ch) (define-values (mrth ch)
(start-message-router/thread (start-message-router/thread
#:nodes nodes)) #:nodes nodes))
(define simple-config
(for/list ([c config])
(match-define (list-rest host port name id rest) c)
(list host port name id)))
(for ([c config]) (for ([c config])
(match-define (list-rest host port name id rest) c) (match-define (list-rest host port name id rest) c)
(define npch (mr-connect-to ch (list host port) name)) (define npch (mr-connect-to ch (list host port) name))
(*channel-put npch (list 'mpi-id id config)) (*channel-put npch (list 'rmpi-id id simple-config))
(*channel-put npch (list 'args (or (lookup-config-value rest "mpi-args") null)))) (*channel-put npch (list 'args (or (lookup-config-value rest "rmpi-args") null))))
(for/first ([c config]) (for/first ([c config])
(match-define (list-rest host port name id rest) c) (match-define (list-rest host port name id rest) c)

View File

@ -24,9 +24,9 @@
The @racketmodname[racket/place/distributed] library provides support for The @racketmodname[racket/place/distributed] library provides support for
distributed programming. distributed programming.
The example bellow demonstrates how to launch a remote racket vm instance, The example bellow demonstrates how to launch a remote racket node instance,
launch remote places on the new remote vm instance, and start an launch remote places on the new remote node instance, and start an
event loop that monitors the remote vm instance. event loop that monitors the remote node instance.
The example code can also be found in The example code can also be found in
@filepath{racket/distributed/examples/named/master.rkt}. @filepath{racket/distributed/examples/named/master.rkt}.
@ -39,14 +39,14 @@ The example code can also be found in
The @racket[spawn-remote-racket-vm] primitive connects to The @racket[spawn-remote-racket-node] primitive connects to
@tt{"localhost"} and starts a racloud node there that listens on port @tt{"localhost"} and starts a racloud node there that listens on port
6344 for further instructions. The handle to the new racloud node is 6344 for further instructions. The handle to the new racloud node is
assigned to the @racket[remote-vm] variable. Localhost is used so that assigned to the @racket[remote-node] variable. Localhost is used so that
the example can be run using only a single machine. However localhost the example can be run using only a single machine. However localhost
can be replaced by any host with ssh publickey access and racket. The can be replaced by any host with ssh publickey access and racket. The
@racket[supervise-named-dynamic-place-at] creates a new place on the @racket[supervise-named-dynamic-place-at] creates a new place on the
@racket[remote-vm]. The new place will be identified in the future by @racket[remote-node]. The new place will be identified in the future by
its name symbol @racket['tuple-server]. A place descriptor is its name symbol @racket['tuple-server]. A place descriptor is
expected to be returned by invoking @racket[dynamic-place] with the expected to be returned by invoking @racket[dynamic-place] with the
@racket[tuple-path] module path and the @racket['make-tuple-server] @racket[tuple-path] module path and the @racket['make-tuple-server]

View File

@ -8,10 +8,12 @@
racket/class racket/class
(for-label (except-in racket/base log-message)) (for-label (except-in racket/base log-message))
(for-label (for-label
racket/place racket/place/define-remote-server
racket/place/distributed racket/place/distributed
racket/class racket/class
racket/contract)) racket/contract
racket/place
racket/place/private/async-bi-channel))
@(define evaler (make-base-eval)) @(define evaler (make-base-eval))
@ -93,13 +95,21 @@ The use of Distributed Places is predicated on a couple assumptions:
(require 'hello-world-example) (require 'hello-world-example)
] ]
@defproc[(message-router [ec events-container<%>?] ...+) void?]{ @definterface[events-container<%> ()]{
Every event container must implement the @racket[register] method.
@defmethod[(register [rl (listof evt?)]) (listof evt?)]{
Event containers should @racket[cons] on their events to the supplied list of events, @racket[rl],
and return the resulting list.
}
}
@defproc[(message-router [ec (is-a?/c events-container<%>)] ...+) void?]{
Waits in an endless loop for one of many events to become ready. The Waits in an endless loop for one of many events to become ready. The
@racket[message-router] procedure constructs a @racket[node%] @racket[message-router] procedure constructs a @racket[node%]
instance to serve as the message router for the node. The instance to serve as the message router for the node. The
@racket[message-router] procedure then adds all the declared @racket[message-router] procedure then adds all the declared
@racket[events-container<%>]s to the @racket[node%] and finally calls @racket[events-container<%>]s to the @racket[node%] and finally calls
the never ending loop @racket[sync-events] method, which handles the never ending loop @method[node% sync-events] method, which handles
events for the node. events for the node.
} }
@ -107,8 +117,8 @@ The use of Distributed Places is predicated on a couple assumptions:
@(define spawn-node-note @(define spawn-node-note
(make-splice (make-splice
(list (list
@p{This function returns a @racket[remote-node%] instance not a @racket[remote-place%] @p{This function returns a @racket[remote-node%] instance not a @racket[remote-connection%]
Call @racket[(send node get-first-place)] to obtain the @racket[remote-place%] instance.})) ) Call @racket[(send node get-first-place)] to obtain the @racket[remote-connection%] instance.})) )
@(define spawn-node-dynamic-note @(define spawn-node-dynamic-note
(make-splice (make-splice
@ -116,7 +126,7 @@ The use of Distributed Places is predicated on a couple assumptions:
@p{ @p{
Spawns a new remote node at @racket[hostname] with one instance place specified by Spawns a new remote node at @racket[hostname] with one instance place specified by
the @racket[instance-module-path] and @racket[instance-place-function-name] the @racket[instance-module-path] and @racket[instance-place-function-name]
parameters. This procedure constructs the new remote-place by calling parameters. This procedure constructs the new remote-connection by calling
@racket[(dynamic-place instance-module-path instance-place-function-name)]. @racket[(dynamic-place instance-module-path instance-place-function-name)].
}))) })))
@ -128,8 +138,8 @@ parameters. This procedure constructs the new remote-place by calling
[#:initial-message initial-message any #f] [#:initial-message initial-message any #f]
[#:racket-path racketpath string-path? (racket-path)] [#:racket-path racketpath string-path? (racket-path)]
[#:ssh-bin-path sshpath string-path? (ssh-bin-path)] [#:ssh-bin-path sshpath string-path? (ssh-bin-path)]
[#:launcher-path launcherpath string-path? (->string distributed-launch-path)] [#:distributed-launch-path launcherpath string-path? (path->string distributed-launch-path)]
[#:restart-on-exit restart-on-exit any/c #f]) remote-place?]{ [#:restart-on-exit restart-on-exit any/c #f]) (is-a?/c remote-connection%)]{
@|spawn-node-dynamic-note| @|spawn-node-dynamic-note|
@|spawn-node-note| @|spawn-node-note|
} }
@ -142,10 +152,10 @@ parameters. This procedure constructs the new remote-place by calling
[#:initial-message initial-message any #f] [#:initial-message initial-message any #f]
[#:racket-path racketpath string-path? (racket-path)] [#:racket-path racketpath string-path? (racket-path)]
[#:ssh-bin-path sshpath string-path? (ssh-bin-path)] [#:ssh-bin-path sshpath string-path? (ssh-bin-path)]
[#:launcher-path launcherpath string-path? (->string distributed-launch-path)] [#:distributed-launch-path launcherpath string-path? (path->string distributed-launch-path)]
[#:restart-on-exit restart-on-exit any/c #f]) (values (is-a?/c remote-node%) (is-a?/c remote-place%))]{ [#:restart-on-exit restart-on-exit any/c #f]) (values (is-a?/c remote-node%) (is-a?/c remote-connection%))]{
@|spawn-node-dynamic-note| @|spawn-node-dynamic-note|
The new @racket[remote-node%] and @racket[remote-place%] instances make up the two return values. The new @racket[remote-node%] and @racket[remote-connection%] instances make up the two return values.
} }
@(define place-thunk-function @(define place-thunk-function
@ -166,7 +176,7 @@ accomplish this by calling either @racket[dynamic-place] or
@p{ @p{
Spawns a new remote node at @racket[hostname] with one instance place. Spawns a new remote node at @racket[hostname] with one instance place.
the @racket[instance-module-path] and @racket[instance-thunk-function-name] the @racket[instance-module-path] and @racket[instance-thunk-function-name]
parameters. This procedure constructs the new remote-place by calling parameters. This procedure constructs the new remote-connection by calling
dynamically requiring the dynamically requiring the
@racket[instance-thunk-function-name] and invoking the @racket[instance-thunk-function-name] and invoking the
@racket[instance-thunk-function-name]. @racket[instance-thunk-function-name].
@ -182,8 +192,8 @@ dynamically requiring the
[#:initial-message initial-message any #f] [#:initial-message initial-message any #f]
[#:racket-path racketpath string-path? (racket-path)] [#:racket-path racketpath string-path? (racket-path)]
[#:ssh-bin-path sshpath string-path? (ssh-bin-path)] [#:ssh-bin-path sshpath string-path? (ssh-bin-path)]
[#:launcher-path launcherpath string-path? (->string distributed-launch-path)] [#:distributed-launch-path launcherpath string-path? (path->string distributed-launch-path)]
[#:restart-on-exit restart-on-exit any/c #f]) (is-a?/c remote-place%)]{ [#:restart-on-exit restart-on-exit any/c #f]) (is-a?/c remote-connection%)]{
@|spawn-node-thunk-note| @|spawn-node-thunk-note|
@|place-thunk-function| @|place-thunk-function|
@|spawn-node-note| @|spawn-node-note|
@ -196,11 +206,11 @@ dynamically requiring the
[#:initial-message initial-message any #f] [#:initial-message initial-message any #f]
[#:racket-path racketpath string-path? (racket-path)] [#:racket-path racketpath string-path? (racket-path)]
[#:ssh-bin-path sshpath string-path? (ssh-bin-path)] [#:ssh-bin-path sshpath string-path? (ssh-bin-path)]
[#:launcher-path launcherpath string-path? (->string distributed-launch-path)] [#:distributed-launch-path launcherpath string-path? (path->string distributed-launch-path)]
[#:restart-on-exit restart-on-exit any/c #f]) (values (is-a?/c remote-node%) (is-a?/c remote-place%))]{ [#:restart-on-exit restart-on-exit any/c #f]) (values (is-a?/c remote-node%) (is-a?/c remote-connection%))]{
@|spawn-node-thunk-note| @|spawn-node-thunk-note|
@|place-thunk-function| @|place-thunk-function|
The new @racket[remote-node%] and @racket[remote-place%] instances make up the two return values. The new @racket[remote-node%] and @racket[remote-connection%] instances make up the two return values.
} }
@defproc[(spawn-remote-racket-node @defproc[(spawn-remote-racket-node
@ -208,14 +218,14 @@ The new @racket[remote-node%] and @racket[remote-place%] instances make up the t
[#:listen-port port port-no? DEFAULT-ROUTER-PORT] [#:listen-port port port-no? DEFAULT-ROUTER-PORT]
[#:racket-path racketpath string-path? (racket-path)] [#:racket-path racketpath string-path? (racket-path)]
[#:ssh-bin-path sshpath string-path? (ssh-bin-path)] [#:ssh-bin-path sshpath string-path? (ssh-bin-path)]
[#:launcher-path launcherpath string-path? (->string distributed-launch-path)]) (is-a?/c remote-node%)]{ [#:distributed-launch-path launcherpath string-path? (path->string distributed-launch-path)]) (is-a?/c remote-node%)]{
Spawns a new remote node at @racket[hostname] and returns a @racket[remote-node%] handle. Spawns a new remote node at @racket[hostname] and returns a @racket[remote-node%] handle.
} }
@defproc[(supervise-dynamic-place-at @defproc[(supervise-dynamic-place-at
[remote-node (is-a?/c remote-node%)] [remote-node (is-a?/c remote-node%)]
[instance-module-path module-path?] [instance-module-path module-path?]
[instance-place-function-name symbol?] [instance-place-function-name symbol?]
[#:restart-on-exit restart-on-exit any/c #f]) (is-a?/c remote-place%)]{ [#:restart-on-exit restart-on-exit any/c #f]) (is-a?/c remote-connection%)]{
Creates a new place on the @racket[remote-node] by using Creates a new place on the @racket[remote-node] by using
@racket[dynamic-place] to invoke @racket[dynamic-place] to invoke
@racket[instance-place-function-name] from the module @racket[instance-place-function-name] from the module
@ -226,7 +236,7 @@ Creates a new place on the @racket[remote-node] by using
[remote-node (is-a?/c remote-node%)] [remote-node (is-a?/c remote-node%)]
[instance-module-path module-path?] [instance-module-path module-path?]
[instance-thunk-function-name symbol?] [instance-thunk-function-name symbol?]
[#:restart-on-exit restart-on-exit any/c #f]) (is-a?/c remote-place%)]{ [#:restart-on-exit restart-on-exit any/c #f]) (is-a?/c remote-connection%)]{
Creates a new place on the @racket[remote-node] by executing the thunk Creates a new place on the @racket[remote-node] by executing the thunk
@racket[instance-thunk-function-name] from the module @racket[instance-thunk-function-name] from the module
@racket[instance-module-path]. @racket[instance-module-path].
@ -246,7 +256,7 @@ Spawns an attached external process at host @racket[hostname].
[place-name symbol?] [place-name symbol?]
[instance-module-path module-path?] [instance-module-path module-path?]
[instance-place-function-name symbol?] [instance-place-function-name symbol?]
[#:restart-on-exit restart-on-exit any/c #f]) (is-a?/c remote-place%)]{ [#:restart-on-exit restart-on-exit any/c #f]) (is-a?/c remote-connection%)]{
Creates a new place on the @racket[remote-node] by using Creates a new place on the @racket[remote-node] by using
@racket[dynamic-place] to invoke @racket[dynamic-place] to invoke
@racket[instance-place-function-name] from the module @racket[instance-place-function-name] from the module
@ -259,7 +269,7 @@ is used to establish later connections to the named place.
[place-name symbol?] [place-name symbol?]
[instance-module-path module-path?] [instance-module-path module-path?]
[instance-thunk-function-name symbol?] [instance-thunk-function-name symbol?]
[#:restart-on-exit restart-on-exit any/c #f]) (is-a?/c remote-place%)]{ [#:restart-on-exit restart-on-exit any/c #f]) (is-a?/c remote-connection%)]{
Creates a new place on the @racket[remote-node] by executing the thunk Creates a new place on the @racket[remote-node] by executing the thunk
@racket[instance-thunk-function-name] from the module @racket[instance-thunk-function-name] from the module
@racket[instance-module-path]. The @racket[place-name] symbol @racket[instance-module-path]. The @racket[place-name] symbol
@ -273,7 +283,7 @@ is used to establish later connections to the named place.
[remote-node (is-a?/c remote-node%)] [remote-node (is-a?/c remote-node%)]
[instance-module-path module-path?] [instance-module-path module-path?]
[instance-thunk-function-name symbol?] [instance-thunk-function-name symbol?]
[#:restart-on-exit restart-on-exit any/c #f]) (is-a?/c remote-place%)]{ [#:restart-on-exit restart-on-exit any/c #f]) (is-a?/c remote-connection%)]{
Creates a new threadon the @racket[remote-node] by using Creates a new threadon the @racket[remote-node] by using
@racket[dynamic-require] to invoke @racket[dynamic-require] to invoke
@racket[instance-place-function-name] from the module @racket[instance-place-function-name] from the module
@ -320,7 +330,7 @@ Connects to a named place on the @racket[node] named @racket[name] and returns a
The following classes all implement @racket[event-container<%>] and The following classes all implement @racket[event-container<%>] and
can be supplied to a @racket[message-router]: can be supplied to a @racket[message-router]:
@racket[spawned-process%], @racket[place-socket-bridge%], @racket[spawned-process%], @racket[place-socket-bridge%],
@racket[node%], @racket[remote-node%], @racket[remote-place%], @racket[node%], @racket[remote-node%], @racket[remote-connection%],
@racket[place%] @racket[connection%], @racket[respawn-and-fire%], and @racket[place%] @racket[connection%], @racket[respawn-and-fire%], and
@racket[after-seconds%]. @racket[after-seconds%].
@ -436,7 +446,7 @@ node's message router.
(make-splice (make-splice
(list (list
@p{The @racket[#:restart-on-exit] boolean argument instructs the @p{The @racket[#:restart-on-exit] boolean argument instructs the
remote-place% instance to respawn the place on the remote node remote-connection% instance to respawn the place on the remote node
should it exit or terminate at any time. This boolean needs to should it exit or terminate at any time. This boolean needs to
be expanded to a restart criteria object in the future.}))) be expanded to a restart criteria object in the future.})))
@ -461,8 +471,8 @@ node's message router.
dies. dies.
} }
@defmethod[(get-first-place) (is-a?/c remote-place%)]{ @defmethod[(get-first-place) (is-a?/c remote-connection%)]{
Returns the @racket[remote-place%] object instance for the first place spawned on this node. Returns the @racket[remote-connection%] object instance for the first place spawned on this node.
} }
@defmethod[(get-first-place-channel) place-channel?]{ @defmethod[(get-first-place-channel) place-channel?]{
Returns the communication channel for the first place spawned on this node. Returns the communication channel for the first place spawned on this node.
@ -474,7 +484,7 @@ node's message router.
@defmethod[(launch-place @defmethod[(launch-place
[place-exec list?] [place-exec list?]
[#:restart-on-exit restart-on-exit any/c #f] [#:restart-on-exit restart-on-exit any/c #f]
[#:one-sided-place? one-sided-place? any/c #f]) (is-a?/c remote-place%)]{ [#:one-sided-place? one-sided-place? any/c #f]) (is-a?/c remote-connection%)]{
Launches a place on the remote node represented by this @racket[remote-node%] instance. Launches a place on the remote node represented by this @racket[remote-node%] instance.
@|place-exec-note| @|place-exec-note|
@|one-sided-note| @|one-sided-note|
@ -494,8 +504,8 @@ node's message router.
@defproc[(node-send-exit [remote-node% node]) void?]{ @defproc[(node-send-exit [remote-node% node]) void?]{
Sends @racket[node] a message telling it to exit immediately. Sends @racket[node] a message telling it to exit immediately.
} }
@defproc[(node-get-first-place [remote-node% node]) (is-a?/c remote-place%)]{ @defproc[(node-get-first-place [remote-node% node]) (is-a?/c remote-connection%)]{
Returns the @racket[remote-place%] instance of the first place spawned at this node Returns the @racket[remote-connection%] instance of the first place spawned at this node
} }
@ -533,12 +543,12 @@ places or connects to a named place and routes inter-node place messages to the
distributed places node at that node. It launches a compute places and distributed places node at that node. It launches a compute places and
routes inter-node place messages to the place. routes inter-node place messages to the place.
@defconstructor[([node (is-a?/c remote-place%)] @defconstructor[([node (is-a?/c remote-connection%)]
[place-exec list?] [place-exec list?]
[ch-id exact-positive-integer?] [ch-id exact-positive-integer?]
[sc (is-a?/c socket-connection%)] [sc (is-a?/c socket-connection%)]
[on-place-dead (-> event void?) default-on-place-dead])]{ [on-place-dead (-> event void?) default-on-place-dead])]{
Constructs a @racket[remote-place%] instance. Constructs a @racket[remote-connection%] instance.
@|place-exec-note| @|place-exec-note|
The @racket[ch-id] and @racket[sc] arguments are internally used to The @racket[ch-id] and @racket[sc] arguments are internally used to
establish routing between the remote node spawning this place and the establish routing between the remote node spawning this place and the
@ -561,7 +571,7 @@ place messages to the named place.
[name string?] [name string?]
[ch-id exact-positive-integer?] [ch-id exact-positive-integer?]
[sc (is-a?/c socket-connection%)])]{ [sc (is-a?/c socket-connection%)])]{
Constructs a @racket[remote-place%] instance. Constructs a @racket[remote-connection%] instance.
@|place-exec-note| @|place-exec-note|
The @racket[ch-id] and @racket[sc] arguments are internally used to The @racket[ch-id] and @racket[sc] arguments are internally used to
establish routing between the remote node and this named-place. establish routing between the remote node and this named-place.
@ -640,6 +650,15 @@ Takes all the positional and keyword arguments pass to it and builds a
suitable as an argument to @racket[(lambda (x) (apply keyword-apply spawn-node-at x))].} suitable as an argument to @racket[(lambda (x) (apply keyword-apply spawn-node-at x))].}
} }
@defproc[(spawn-node-at [hostname string?]
[#:listen-port port port-no? DEFAULT-ROUTER-PORT]
[#:racket-path racketpath string-path? (racket-path)]
[#:ssh-bin-path sshpath string-path? (ssh-bin-path)]
[#:distributed-launch-path launcherpath string-path? (path->string distributed-launch-path)]) channel?]{
Spawns a node in the background using a Racket thread and returns a channel that becomes ready with a @racket[remote-node%]
once the node has spawned successfully
}
@defproc[(spawn-nodes/join [nodes-descs list?]) void?]{ @defproc[(spawn-nodes/join [nodes-descs list?]) void?]{
Spawns a list of nodes by calling @racket[(lambda (x) (apply keyword-apply spawn-node-at x))] for each node description in Spawns a list of nodes by calling @racket[(lambda (x) (apply keyword-apply spawn-node-at x))] for each node description in
@racket[nodes-descs] and then waits for each node to spawn. @racket[nodes-descs] and then waits for each node to spawn.
@ -794,7 +813,7 @@ list of custom expressions as its arguments. From the identifier a
function is created by prepending the @tt{make-} prefix. This function is created by prepending the @tt{make-} prefix. This
procedure takes a single argument a @racket[place-channel]. In the procedure takes a single argument a @racket[place-channel]. In the
example below, the @racket[make-tuple-server] identifier is the example below, the @racket[make-tuple-server] identifier is the
@racket{place-function-name} given to the @racket[place-function-name] given to the
@racket[supervise-named-dynamic-place-at] form to spawn an rpc server. @racket[supervise-named-dynamic-place-at] form to spawn an rpc server.
The server created by the @racket[make-tuple-server] procedure sits in The server created by the @racket[make-tuple-server] procedure sits in
a loop waiting for rpc requests from the @racket[define-rpc] functions a loop waiting for rpc requests from the @racket[define-rpc] functions
@ -878,7 +897,7 @@ except there is no reply message from the server to client
[else [else
(hash-set! accounts who 0) (hash-set! accounts who 0)
(list 'created who)])) (list 'created who)]))
(define-rpc (removeM who amount) (define-rpc (remove who amount)
(cond (cond
[(hash-ref accounts who (lambda () #f)) => [(hash-ref accounts who (lambda () #f)) =>
(lambda (balance) (lambda (balance)
@ -907,6 +926,25 @@ except there is no reply message from the server to client
@racket[define-remote-server] or @racket[define-named-remote-server] form to @racket[define-remote-server] or @racket[define-named-remote-server] form to
send a logging message to the remote owner of the rpc server. send a logging message to the remote owner of the rpc server.
} }
@section{Async Bidirectional Channels}
@defmodule[racket/place/private/async-bi-channel]
@defproc[(make-async-bi-channel) async-bi-channel?]{
Creates and returns a opaque structure, which is the async bidirectional channel.
}
@defproc[(async-bi-channel? [ch any]) boolean?]{
A predicate that returns @racket[#t] for async bidirectional channels.
}
@defproc[(async-bi-channel-get [ch async-bi-channel?]) any]{
Returns the next available message from the async bidirectional channel @racket[ch].
}
@defproc[(async-bi-channel-put [ch async-bi-channel?] [msg any]) void?]{
Sends message @racket[msg] to the remote end of the async bidirectional channel @racket[ch].
}
@(close-eval evaler) @(close-eval evaler)
@include-section["rmpi.scrbl"] @include-section["rmpi.scrbl"]

View File

@ -7,87 +7,87 @@
racket/place/distributed/rmpi racket/place/distributed/rmpi
racket/sandbox racket/sandbox
racket/class) racket/class)
@(require (for-label racket/base @(require (for-label (except-in racket/base log-message)
racket/place/distributed/rmpi)) racket/contract
racket/class
racket/place
racket/place/distributed
racket/place/distributed/rmpi
racket/place/private/async-bi-channel))
@(define evaler (make-base-eval)) @(define evaler (make-base-eval))
@(interaction-eval #:eval evaler (require racket/place/distributed @(interaction-eval #:eval evaler (require racket/place/distributed
racket/class racket/class))
#;racket/place/distributed/rmpi))
@(define output-evaluator
(parameterize ([sandbox-output 'string]
[sandbox-error-output 'string])
(make-evaluator 'racket/base)))
@(define-syntax interaction-output
(syntax-rules ()
[(_ #:eval evaluator e)
(begin
(interaction #:eval evaluator e)
(printf "K ~a\n" (get-output evaluator)))]))
@title[#:tag "distributed-places-MPI"]{Distributed Places MPI} @title[#:tag "distributed-places-MPI"]{Distributed Places MPI}
@defmodule[racket/place/distributed/rmpi] @defmodule[racket/place/distributed/rmpi]
@defproc[(rmpi-id [comm RMPI-comm?]) exact-nonnegative-integer? ]{ @defstruct*[rmpi-comm ([id exact-nonnegative-integer?] [cnt exact-positive-integer?] [channels vector?])]{
The communicator struct @racket[rmpi-comm] contains the rmpi process
rank @racket[id], the quantity of processes in the communicator group,
@racket[cnt], and a vector of place-channels, one for each process in
the group.
}
@defproc[(rmpi-id [comm rmpi-comm?]) exact-nonnegative-integer? ]{
Takes a rmpi communicator structure, @racket[comm], and returns the node id of the RMPI Takes a rmpi communicator structure, @racket[comm], and returns the node id of the RMPI
process. process.
} }
@defproc[(rmpi-cnt [comm RMPI-comm?]) positive-integer? ]{ @defproc[(rmpi-cnt [comm rmpi-comm?]) exact-positive-integer? ]{
Takes a rmpi communicator structure, @racket[comm], and returns the count of the RMPI Takes a rmpi communicator structure, @racket[comm], and returns the count of the RMPI
processes in the communicator group. processes in the communicator group.
} }
@defproc[(rmpi-send [comm RMPI-comm?] [dest exact-nonnegative-integer?] [val any]) void?]{ @defproc[(rmpi-send [comm rmpi-comm?] [dest exact-nonnegative-integer?] [val any]) void?]{
Sends @racket[val] to destination rmpi process number @racket[dest] Sends @racket[val] to destination rmpi process number @racket[dest]
using the RMPI communicator structure @racket[comm]. using the RMPI communicator structure @racket[comm].
} }
@defproc[(rmpi-recv [comm RMPI-comm?] [src exact-nonnegative-integer?]) any]{ @defproc[(rmpi-recv [comm rmpi-comm?] [src exact-nonnegative-integer?]) any]{
Receives a message from source rmpi process number @racket[src] Receives a message from source rmpi process number @racket[src]
using the RMPI communicator structure @racket[comm]. using the RMPI communicator structure @racket[comm].
} }
@defproc[(rmpi-init [ch place-channel?]) (values RMPI-comm? (listof any) (is-a?/c named-place-type-channel%))]{ @defproc[(rmpi-init [ch place-channel?]) (values rmpi-comm? (listof any) (is-a?/c named-place-typed-channel%))]{
Creates the @racket[rmpi-comm] structure instance using the named Creates the @racket[rmpi-comm] structure instance using the named
place's original place-channel @racket[ch]. In addition to the place's original place-channel @racket[ch]. In addition to the
communicator structure, @racket[rmpi-init] returns a list of initial communicator structure, @racket[rmpi-init] returns a list of initial
arguments and the original place-channel @racket[ch] wrapped in a arguments and the original place-channel @racket[ch] wrapped in a
@racket[named-place-type-channel%]. The @racket[named-place-typed-channel%]. The
@racket[named-place-type-channel%] wrapper allows for the reception @racket[named-place-typed-channel%] wrapper allows for the reception
of list messages typed by an initial symbol. of list messages typed by an initial symbol.
} }
@defproc*[([(rmpi-broadcast [comm RMPI-comm?] [src exact-nonnegative-integer?]) any] @defproc*[([(rmpi-broadcast [comm rmpi-comm?] [src exact-nonnegative-integer?]) any]
[(rmpi-broadcast [comm RMPI-comm?] [src exact-nonnegative-integer?] [val any]) any])]{ [(rmpi-broadcast [comm rmpi-comm?] [src exact-nonnegative-integer?] [val any]) any])]{
Broadcasts @racket[val] from @racket[src] to all rmpi processes in Broadcasts @racket[val] from @racket[src] to all rmpi processes in
the communication group using a hypercube algorithm. Receiving the communication group using a hypercube algorithm. Receiving
processes call @racket[(rmpi-broadcast comm src)]. processes call @racket[(rmpi-broadcast comm src)].
} }
@defproc[(rmpi-reduce [comm RMPI-comm?] [dest exact-nonnegative-integer?] [op procedure?] [val any]) any]{ @defproc[(rmpi-reduce [comm rmpi-comm?] [dest exact-nonnegative-integer?] [op procedure?] [val any]) any]{
Reduces @racket[val] using the @racket[op] operator to @racket[dest] Reduces @racket[val] using the @racket[op] operator to @racket[dest]
rmpi node using a hypercube algorithm. rmpi node using a hypercube algorithm.
} }
@defproc[(rmpi-barrier [comm RMPI-comm?]) void?]{ @defproc[(rmpi-barrier [comm rmpi-comm?]) void?]{
Introduces a synchronization barrier for all rmpi processes in the Introduces a synchronization barrier for all rmpi processes in the
communcication group @racket[comm]. communcication group @racket[comm].
} }
@defproc[(rmpi-allreduce [comm RMPI-comm?] [op procedure?] [val any]) any]{ @defproc[(rmpi-allreduce [comm rmpi-comm?] [op procedure?] [val any]) any]{
Reduces @racket[val] using the @racket[op] operator to rmpi node Reduces @racket[val] using the @racket[op] operator to rmpi node
@racket[0] and then broadcasts the reduced value to all nodes in the @racket[0] and then broadcasts the reduced value to all nodes in the
communication group. communication group.
} }
@defproc[(rmpi-partition [comm RMPI-comm?] [num positive-integer?]) (values positive-integer? positive-integer?)]{ @defproc[(rmpi-partition [comm rmpi-comm?] [num exact-nonnegative-integer?]) (values exact-nonnegative-integer? exact-nonnegative-integer?)]{
Partitions @racket[num] into @racket[rmpi-cnt] equal pieces and Partitions @racket[num] into @racket[rmpi-cnt] equal pieces and
returns the offset and length for the @racket[RMPI-id]th piece. returns the offset and length for the @racket[rmpi-id]th piece.
} }
@defproc[(rmpi-build-default-config @defproc[(rmpi-build-default-config
@ -114,15 +114,16 @@
@racket[default-node-config] hash of keyword arguments. @racket[default-node-config] hash of keyword arguments.
} }
@defproc[(rmpi-finish [comm RMPI-comm?] [tc (is-a?/c named-place-type-channel%)]) void?]{ @defproc[(rmpi-finish [comm rmpi-comm?] [tc (is-a?/c named-place-typed-channel%)]) void?]{
Rendezvous with the @racket[rmpi-launch], using the @racket[tc] Rendezvous with the @racket[rmpi-launch], using the @racket[tc]
returned by @racket[RMPI-launch], to indicate that the RMPI module returned by @racket[rmpi-launch], to indicate that the RMPI module
is done executing and that @racket[RMPI-launch] can return control is done executing and that @racket[rmpi-launch] can return control
to its caller. to its caller.
} }
@examples[ #:eval evaler @examples[ #:eval evaler
(eval:alts
(rmpi-launch (rmpi-launch
(rmpi-build-default-config (rmpi-build-default-config
#:racket-path "/tmp/mplt/bin/racket" #:racket-path "/tmp/mplt/bin/racket"
@ -135,6 +136,7 @@
(list "nodeb.example.com" 6340 'kmeans_1 1) (list "nodeb.example.com" 6340 'kmeans_1 1)
(list "nodec.example.com" 6340 'kmeans_2 2) (list "nodec.example.com" 6340 'kmeans_2 2)
(list "noded.example.com" 6340 'kmeans_3 3))) (list "noded.example.com" 6340 'kmeans_3 3)))
(void))
] ]
@(close-eval evaler) @(close-eval evaler)