[Distributed Places] Docs clean up part 1

This commit is contained in:
Kevin Tew 2012-05-10 15:36:21 -06:00
parent 076e698e8c
commit 154a96ab1f
9 changed files with 387 additions and 239 deletions

View File

@ -152,8 +152,9 @@ x)]))
(define-define-remote-server define-remote-server)
(define-define-remote-server define-named-remote-server)
(provide define-remote-server)
(provide define-named-remote-server)
(provide log-to-parent)
(provide define-remote-server
define-named-remote-server
log-to-parent)

View File

@ -88,6 +88,7 @@
mr-connect-to
start-message-router/thread
spawn-nodes/join
spawn-nodes/join/local
;classes
event-container<%>
@ -113,6 +114,8 @@
;contracts
*channel?
port-no?
place-channel-get
place-channel-put
)
(define-runtime-path distributed-launch-path "distributed/launch.rkt")
@ -713,7 +716,7 @@
[port #f]
[retry-times 30]
[delay 1]
[background-connect #t]
[background-connect? #t]
[in #f]
[out #f]
[remote-node #f])
@ -777,7 +780,7 @@
(raise "Not-implemented/needed")
(cons (wrap-evt in void) nes))
(when (and host port background-connect)
(when (and host port background-connect?)
(set! connecting #t)
(set! ch (make-channel))
(thread
@ -789,7 +792,7 @@
(raise (format "socket error connecting to ~a:~a" host port)))])
(tcp-connect/retry host port #:times retry-times #:delay delay)))
list)))))
(when (and host port (not background-connect))
(when (and host port (not background-connect?))
(tcp-connect/retry host port #:times retry-times #:delay delay))
(super-new)
)))
@ -1016,9 +1019,10 @@
(define/public (place-died)
(cond
[restart-on-exit
(if (equal? restart-on-exit #t)
(restart-place)
(send restart-on-exit restart restart-place))]
(cond
[(equal? restart-on-exit #t) (restart-place)]
[(procedure? restart-on-exit) (restart-on-exit)]
[else (send restart-on-exit restart restart-place)])]
[else
(log-debug (format "No restart condition for ~a:~a"
(send node get-log-prefix)
@ -1042,7 +1046,8 @@
on-channel-event])) es) es)]
[es (send psb register es)]
[es (if (and restart-on-exit
(not (equal? restart-on-exit #t)))
(not (equal? restart-on-exit #t))
(not (procedure? restart-on-exit)))
(send restart-on-exit register es)
es)])
es))
@ -1234,6 +1239,14 @@
(set! fire-time (+ (current-inexact-milliseconds) (* seconds 1000)))]))
))
(define on-exit%
(class*
object% ()
(init-field thunk)
(define/public (restart restart-func)
(thunk))))
(define (startup-config conf conf-idx)
(start-node-router
@ -1460,6 +1473,9 @@
(new restarter% [seconds seconds] [retry retry]
[on-final-fail on-final-fail]))
(define (on-place-exit thunk)
(new (on-exit% [thunk thunk])))
(define (log-message severity msg)
(dcgm DCGM-TYPE-LOG-TO-PARENT -1 -1 (list severity msg)))
@ -1624,6 +1640,33 @@
(make-keyword-procedure (lambda (kws kw-args . rest)
(list kws kw-args rest))))
(define/provide (spawn-nodes/join/local nodes-descs)
(spawn-nodes/join
(for/list ([c nodes-descs])
(match-define (list-rest host port _rest) c)
(define rest
(cond
[(null? _rest)
(list (make-immutable-hash (list (cons "listen-port" port))))]
[else
(list
(hash-set (car _rest) "listen-port" port))]))
(define-values (k v)
(let loop ([keys (list "racket-path" "listen-port" "distributed-launch-path")]
[k null]
[v null])
(cond
[(pair? keys)
(cond
[(hash-ref (car rest) (car keys) #f) => (lambda (x)
(loop (cdr keys)
(cons (string->keyword (car keys)) k)
(cons x v)))]
[else
(loop (cdr keys) k v)])]
[else
(values k v)])))
(list k v (list host)))))
(define named-place-typed-channel%
(class*
@ -1636,7 +1679,6 @@
(cond
[(null? l)
(define nm (place-channel-get ch))
;(printf/f "NM ~a ~a\n" type nm)
(set! msgs (append msgs (list nm)))
(loop msgs null)]
[(equal? type (caaar l))

View File

@ -6,7 +6,8 @@
(define (hello-world)
(place ch
(printf "hello-world received: ~a\n" (place-channel-get ch))
(printf "hello-world received: ~a\n"
(place-channel-get ch))
(define HW "Hello World")
(place-channel-put ch (format "~a\n" HW))
(printf "hello-world sent: ~a\n" HW)))
@ -14,10 +15,12 @@
(module+ main
(define-values (node pl)
(spawn-node-supervise-place-thunk-at "localhost"
(spawn-node-supervise-place-thunk-at
"localhost"
#:listen-port 6344
(quote-module-path "..")
'hello-world))
(message-router
node
(after-seconds 2

View File

@ -2,9 +2,7 @@
(require racket/match
racket/place/define-remote-server)
(define-remote-server
bank
(define-remote-server bank
(define-state accounts (make-hash))
(define-rpc (new-account who)
(match (hash-has-key? accounts who)

View File

@ -2,9 +2,7 @@
(require racket/match
racket/place/define-remote-server)
(define-named-remote-server
tuple-server
(define-named-remote-server tuple-server
(define-state h (make-hash))
(define-rpc (set k v)
(hash-set! h k v)
@ -12,4 +10,5 @@
(define-rpc (get k)
(hash-ref h k #f))
(define-cast (hello)
(printf "Hello from define-cast\n")(flush-output)))
(printf "Hello from define-cast\n")
(flush-output)))

View File

@ -6,7 +6,9 @@
racket/port
racket/contract
racket/runtime-path
(for-label racket/place/distributed))
(for-label racket/place/distributed
racket/match
racket/place/define-remote-server))
@(define (codeblockfromfile filename)
(call-with-input-file
@ -29,27 +31,31 @@ event loop that monitors the remote vm instance.
The example code can also be found in
@filepath{racket/distributed/examples/named/master.rkt}.
@figure["named-example-master" "examples/named/master.rkt"]{
@codeblockfromfile[(path->string master-path)]
}
The @racket[spawn-remote-racket-vm] primitive connects to
@tt{"localhost"} and starts a racloud node there that listens on port
6344 for further instructions. The handle to the new racloud node is
assigned to the @racket[remote-vm] variable. Localhost is used so that
the example can be run using only a single machine. However localhost
can be replaced by any host with ssh publickey access and racket. The
@racket[supervise-named-place-thunk-at] creates a new place on the
@racket[supervise-named-dynamic-place-at] creates a new place on the
@racket[remote-vm]. The new place will be identified in the future by
its name symbol @racket['tuple-server]. A place descriptor is
expected to be returned by dynamically requiring
@racket['make-tuple-server] from the @racket[tuple-path] module and
invoking @racket['make-tuple-server].
expected to be returned by invoking @racket[dynamic-place] with the
@racket[tuple-path] module path and the @racket['make-tuple-server]
symbol.
The code for the tuple-server place exists in the file
@filepath{tuple.rkt}. The @filepath{tuple.rkt} file contains the use of
@racket[define-named-remote-server] form, which defines a RPC server
suitiable for invocation by @racket[supervise-named-place-thunk-at].
suitiable for invocation by @racket[supervise-named-dynamic-place-at].
@ -64,16 +70,16 @@ list of custom expressions as its arguments. From the identifier a
place-thunk function is created by prepending the @tt{make-} prefix.
In this case @racket[make-tuple-server]. The
@racket[make-tuple-server] identifier is the
@racket{compute-instance-place-function-name} given to the
@racket[supervise-named-place-thunk-at] form above. The
@racket[place-function-name] given to the
@racket[supervise-named-dynamic-place-at] form above. The
@racket[define-state] custom form translates into a simple
@racket[define] form, which is closed over by @racket[define-rpc]
forms.
@racket[define] form, which is closed over by the @racket[define-rpc]
form.
The @racket[define-rpc] form is expanded into two parts. The first
part is the client stub that calls the rpc function. The client
part is the client stubs that call the rpc functions. The client
function name is formed by concatenating the
@racket[define-named-remote-server] identifier, @tt{tuple-server}.
@racket[define-named-remote-server] identifier, @tt{tuple-server},
with the RPC function name @tt{set} to form @racket[tuple-server-set].
The RPC client functions take a destination argument which is a
@racket[remote-connection%] descriptor and then the RPC function
@ -91,13 +97,14 @@ with the @racket['set] symbol. The server executes the RPC call with
the communicated arguments and sends the result back to the RPC
client.
The @racket[define-rpc] form is similar to the @racket[define-rpc] form
The @racket[define-cast] form is similar to the @racket[define-rpc] form
except there is no reply message from the server to client
@figure["define-named-remote-server-expansion" "Expansion of define-named-remote-server"]{
@codeblock0{
'(begin
(require racket/place racket/match)
(module tuple racket/base
(require racket/place
racket/match)
(define/provide
(tuple-server-set dest k v)
(named-place-channel-put dest (list 'set k v))
@ -110,15 +117,17 @@ except there is no reply message from the server to client
(tuple-server-hello dest)
(named-place-channel-put dest (list 'hello)))
(define/provide
(make-tuple-server)
(place
ch
(make-tuple-server ch)
(let ()
(define h (make-hash))
(let loop ()
(define msg (place-channel-get ch))
(define (log-to-parent-real msg #:severity (severity 'info))
(place-channel-put ch (log-message severity msg)))
(define (log-to-parent-real
msg
#:severity (severity 'info))
(place-channel-put
ch
(log-message severity msg)))
(syntax-parameterize
((log-to-parent (make-rename-transformer
#'log-to-parent-real)))
@ -139,7 +148,6 @@ except there is no reply message from the server to client
(flush-output)))
(loop))))
loop))))
(void))
}
}

View File

@ -7,13 +7,14 @@
racket/place/distributed/RMPI
racket/sandbox
racket/class)
@(require (for-label racket/place/distributed/RMPI racket/class))
@(require (for-label racket/base
racket/place/distributed/RMPI))
@(define evaler (make-base-eval))
@(interaction-eval #:eval evaler (require racket/place/distributed
racket/class
racket/place/define-remote-server))
#;racket/place/distributed/RMPI))
@(define output-evaluator
(parameterize ([sandbox-output 'string]
@ -27,11 +28,11 @@
(interaction #:eval evaluator e)
(printf "K ~a\n" (get-output evaluator)))]))
@title[#:tag "distributed-places-MPI"]{Racket Distributed Places MPI}
@title[#:tag "distributed-places-MPI"]{Distributed Places MPI}
@defmodule[racket/place/distributed/RMPI]
@defproc[(RMPI-id [comm RMPI-COMM?]) non-negative-integer? ]{
@defproc[(RMPI-id [comm RMPI-COMM?]) exact-nonnegative-integer? ]{
Takes a RMPI communicator structure, @racket[comm], and returns the node id of the RMPI
process.
}
@ -41,28 +42,28 @@
processes in the communicator group.
}
@defproc[(RMPI-send [comm RMPI-COMM?] [dest non-negative-integer?] [val any?]) void?]{
@defproc[(RMPI-send [comm RMPI-COMM?] [dest exact-nonnegative-integer?] [val any]) void?]{
Sends @racket[val] to destination RMPI process number @racket[dest] using the RMPI communicator structure @racket[comm].
}
@defproc[(RMPI-recv [comm RMPI-COMM?] [src non-negative-integer?]) any?]{
@defproc[(RMPI-recv [comm RMPI-COMM?] [src exact-nonnegative-integer?]) any]{
Receives a message from source RMPI process number @racket[src] using the RMPI communicator structure @racket[comm].
}
@defproc[(RMPI-init [ch place-channel?]) (values RMPI-COMM? (listof any?) named-place-type-channel%?)]{
@defproc[(RMPI-init [ch place-channel?]) (values RMPI-COMM? (listof any) (is-a?/c named-place-type-channel%))]{
Creates the @racket[RMPI-COMM] structure instance using the named place's original place-channel @racket[ch].
In addition to the communicator structure, @racket[RMPI-init] returns a list of initial arguments and the original place-channel
@racket[ch] wrapped in a @racket[named-place-type-channel%]. The @racket[named-place-type-channel%] wrapper allows for
the reception of list messages typed by an initial symbol.
}
@defproc*[([(RMPI-BCast [comm RMPI-COMM?] [src non-negative-integer?]) any?]
[(RMPI-BCast [comm RMPI-COMM?] [src non-negative-integer?] [val any?]) any?])]{
@defproc*[([(RMPI-BCast [comm RMPI-COMM?] [src exact-nonnegative-integer?]) any]
[(RMPI-BCast [comm RMPI-COMM?] [src exact-nonnegative-integer?] [val any]) any])]{
Broadcasts @racket[val] from @racket[src] to all RMPI processes in the communication group using a hypercube algorithm.
Receiving processes call @racket[(RMPI-BCast comm src)].
}
@defproc[(RMPI-Reduce [comm RMPI-COMM?] [dest non-negative-integer?] [op procedure?] [val any?]) any?]{
@defproc[(RMPI-Reduce [comm RMPI-COMM?] [dest exact-nonnegative-integer?] [op procedure?] [val any]) any]{
Reduces @racket[val] using the @racket[op] operator to @racket[dest] RMPI node using a hypercube algorithm.
}
@ -70,7 +71,7 @@
Introduces a synchronization barrier for all RMPI processes in the communcication group @racket[comm].
}
@defproc[(RMPI-AllReduce [comm RMPI-COMM?] [op procedure?] [val any?]) any?]{
@defproc[(RMPI-AllReduce [comm RMPI-COMM?] [op procedure?] [val any]) any]{
Reduces @racket[val] using the @racket[op] operator to RMPI node @racket[0] and then broadcasts the reduced value to all nodes in the communication group.
}
@ -84,17 +85,17 @@
[#:distributed-launch-path distributed-launch-path string?]
[#:mpi-module mpi-module string?]
[#:mpi-func mpi-func symbol?]
[#:mpi-args mpi-args (listof any?)]) hash?]{
[#:mpi-args mpi-args (listof any)]) hash?]{
Builds a hash from keywords to keyword arguments for use with the @racket[RMPI-launch function].
}
@defproc[(RMPI-launch [default-node-config hash?] [config (listof (listof string? port-no? symbol? non-negative-integer?))]) void?]{
@defproc[(RMPI-launch [default-node-config hash?] [config (listof (listof string? port-no? symbol? exact-nonnegative-integer?))]) void?]{
Launches distributed places nodes running @racket[#:mpi-func] in @racket[#:mpi-module] with @racket[#:mpi-args].
The config is a list of node configs, where each node config consists of a hostname, port, named place symbol and RMPI id number, followed by optional keyword arguments @racket[#:racket-path], @racket[#:distributed-launch-path], @racket[#:mpi-module], @racket[#:mpi-func], and @racket[#:mpi-args]. Missing optional keyword arguments will be taken from the @racket[default-node-config] hash of keyword arguments.
The config is a list of node configs, where each node config consists of a hostname, port, named place symbol and RMPI id number, followed by and optional hash of keyword @racket[#:racket-path], @racket[#:distributed-launch-path], @racket[#:mpi-module], @racket[#:mpi-func], and @racket[#:mpi-args] to keyword arguments. Missing optional keyword arguments will be taken from the @racket[default-node-config] hash of keyword arguments.
}
@defproc[(RMPI-finish [comm RMPI-COMM?] [tc named-place-type-channel%?]) void?]{
@defproc[(RMPI-finish [comm RMPI-COMM?] [tc (is-a?/c named-place-type-channel%)]) void?]{
Rendezvous with the @racket[RMPI-launch], using the @racket[tc] returned by @racket[RMPI-launch], to indicate that the RMPI module is done executing and that @racket[RMPI-launch] can return control to its caller.
}

View File

@ -19,4 +19,3 @@ support for parallelism to improve performance.
@include-section["futures.scrbl"]
@include-section["places.scrbl"]
@include-section["distributed.scrbl"]
@include-section["RMPI.scrbl"]

View File

@ -5,8 +5,13 @@
racket/contract
racket/place/distributed
racket/sandbox
racket/class)
@(require (for-label racket/place/distributed racket/class))
racket/class
(for-label (except-in racket/base log-message))
(for-label
racket/place
racket/place/distributed
racket/class
racket/contract))
@(define evaler (make-base-eval))
@ -79,8 +84,8 @@ The use of Distributed Places is predicated on a couple assumptions:
(message-router
node
(after-seconds 2
(dplace-put pl "Hello")
(printf "message-router received: ~a\n" (dplace-get pl)))
(*channel-put pl "Hello")
(printf "message-router received: ~a\n" (*channel-get pl)))
(after-seconds 6
(exit 0)))))
@ -119,8 +124,8 @@ parameters. This procedure constructs the new remote-place by calling
[hostname string?]
[instance-module-path module-path?]
[instance-place-function-name symbol?]
[#:listen-port port non-negative-integer? DEFAULT-ROUTER-PORT]
[#:initial-message initial-message any? #f]
[#:listen-port port port-no? DEFAULT-ROUTER-PORT]
[#:initial-message initial-message any #f]
[#:racket-path racketpath string-path? (racket-path)]
[#:ssh-bin-path sshpath string-path? (ssh-bin-path)]
[#:launcher-path launcherpath string-path? (->string distributed-launch-path)]
@ -133,12 +138,12 @@ parameters. This procedure constructs the new remote-place by calling
[hostname string?]
[instance-module-path module-path?]
[instance-place-function-name symbol?]
[#:listen-port port non-negative-integer? DEFAULT-ROUTER-PORT]
[#:initial-message initial-message any? #f]
[#:listen-port port port-no? DEFAULT-ROUTER-PORT]
[#:initial-message initial-message any #f]
[#:racket-path racketpath string-path? (racket-path)]
[#:ssh-bin-path sshpath string-path? (ssh-bin-path)]
[#:launcher-path launcherpath string-path? (->string distributed-launch-path)]
[#:restart-on-exit restart-on-exit any/c #f]) (values remote-node%? remote-place%?)]{
[#:restart-on-exit restart-on-exit any/c #f]) (values (is-a?/c remote-node%) (is-a?/c remote-place%))]{
@|spawn-node-dynamic-note|
The new @racket[remote-node%] and @racket[remote-place%] instances make up the two return values.
}
@ -173,12 +178,12 @@ dynamically requiring the
[hostname string?]
[instance-module-path module-path?]
[instance-thunk-function-name symbol?]
[#:listen-port port non-negative-integer? DEFAULT-ROUTER-PORT]
[#:initial-message initial-message any? #f]
[#:listen-port port port-no? DEFAULT-ROUTER-PORT]
[#:initial-message initial-message any #f]
[#:racket-path racketpath string-path? (racket-path)]
[#:ssh-bin-path sshpath string-path? (ssh-bin-path)]
[#:launcher-path launcherpath string-path? (->string distributed-launch-path)]
[#:restart-on-exit restart-on-exit any/c #f]) remote-place%?]{
[#:restart-on-exit restart-on-exit any/c #f]) (is-a?/c remote-place%)]{
@|spawn-node-thunk-note|
@|place-thunk-function|
@|spawn-node-note|
@ -187,12 +192,12 @@ dynamically requiring the
[hostname string?]
[instance-module-path module-path?]
[instance-thunk-function-name symbol?]
[#:listen-port port non-negative-integer? DEFAULT-ROUTER-PORT]
[#:initial-message initial-message any? #f]
[#:listen-port port port-no? DEFAULT-ROUTER-PORT]
[#:initial-message initial-message any #f]
[#:racket-path racketpath string-path? (racket-path)]
[#:ssh-bin-path sshpath string-path? (ssh-bin-path)]
[#:launcher-path launcherpath string-path? (->string distributed-launch-path)]
[#:restart-on-exit restart-on-exit any/c #f]) (values remote-node%? remote-place%?)]{
[#:restart-on-exit restart-on-exit any/c #f]) (values (is-a?/c remote-node%) (is-a?/c remote-place%))]{
@|spawn-node-thunk-note|
@|place-thunk-function|
The new @racket[remote-node%] and @racket[remote-place%] instances make up the two return values.
@ -200,17 +205,17 @@ The new @racket[remote-node%] and @racket[remote-place%] instances make up the t
@defproc[(spawn-remote-racket-node
[hostname string?]
[#:listen-port port non-negative-integer? DEFAULT-ROUTER-PORT]
[#:listen-port port port-no? DEFAULT-ROUTER-PORT]
[#:racket-path racketpath string-path? (racket-path)]
[#:ssh-bin-path sshpath string-path? (ssh-bin-path)]
[#:launcher-path launcherpath string-path? (->string distributed-launch-path)]) remote-node%?]{
[#:launcher-path launcherpath string-path? (->string distributed-launch-path)]) (is-a?/c remote-node%)]{
Spawns a new remote node at @racket[hostname] and returns a @racket[remote-node%] handle.
}
@defproc[(supervise-dynamic-place-at
[remote-node remote-node?]
[remote-node (is-a?/c remote-node%)]
[instance-module-path module-path?]
[instance-place-function-name symbol?]
[#:restart-on-exit restart-on-exit any/c #f]) remote-place%?]{
[#:restart-on-exit restart-on-exit any/c #f]) (is-a?/c remote-place%)]{
Creates a new place on the @racket[remote-node] by using
@racket[dynamic-place] to invoke
@racket[instance-place-function-name] from the module
@ -218,10 +223,10 @@ Creates a new place on the @racket[remote-node] by using
}
@defproc[(supervise-place-thunk-at
[remote-node remote-node?]
[remote-node (is-a?/c remote-node%)]
[instance-module-path module-path?]
[instance-thunk-function-name symbol?]
[#:restart-on-exit restart-on-exit any/c #f]) remote-place%?]{
[#:restart-on-exit restart-on-exit any/c #f]) (is-a?/c remote-place%)]{
Creates a new place on the @racket[remote-node] by executing the thunk
@racket[instance-thunk-function-name] from the module
@racket[instance-module-path].
@ -232,16 +237,16 @@ Creates a new place on the @racket[remote-node] by executing the thunk
@defproc[(supervise-process-at
[hostname string?]
[commandline-argument string?] ...+
[#:listen-port port non-negative-integer? DEFAULT-ROUTER-PORT]) remote-process%?]{
[#:listen-port port port-no? DEFAULT-ROUTER-PORT]) (is-a?/c remote-process%)]{
Spawns an attached external process at host @racket[hostname].
}
@defproc[(supervise-named-dynamic-place-at
[remote-node remote-node?]
[remote-node (is-a?/c remote-node%)]
[place-name symbol?]
[instance-module-path module-path?]
[instance-place-function-name symbol?]
[#:restart-on-exit restart-on-exit any/c #f]) remote-place%?]{
[#:restart-on-exit restart-on-exit any/c #f]) (is-a?/c remote-place%)]{
Creates a new place on the @racket[remote-node] by using
@racket[dynamic-place] to invoke
@racket[instance-place-function-name] from the module
@ -250,11 +255,11 @@ is used to establish later connections to the named place.
}
@defproc[(supervise-named-place-thunk-at
[remote-node remote-node?]
[remote-node (is-a?/c remote-node%)]
[place-name symbol?]
[instance-module-path module-path?]
[instance-thunk-function-name symbol?]
[#:restart-on-exit restart-on-exit any/c #f]) remote-place%?]{
[#:restart-on-exit restart-on-exit any/c #f]) (is-a?/c remote-place%)]{
Creates a new place on the @racket[remote-node] by executing the thunk
@racket[instance-thunk-function-name] from the module
@racket[instance-module-path]. The @racket[place-name] symbol
@ -265,18 +270,18 @@ is used to establish later connections to the named place.
}
@defproc[(supervise-thread-at
[remote-node remote-node?]
[remote-node (is-a?/c remote-node%)]
[instance-module-path module-path?]
[instance-thunk-function-name symbol?]
[#:restart-on-exit restart-on-exit any/c #f]) remote-place%?]{
[#:restart-on-exit restart-on-exit any/c #f]) (is-a?/c remote-place%)]{
Creates a new threadon the @racket[remote-node] by using
@racket[dynamic-require] to invoke
@racket[instance-place-function-name] from the module
@racket[instance-module-path].
}
@defform[(restart-every [seconds (and/c real? nonnegative?)]
[#:retry retry (or/c nonnegative-integer? #f) #f]
@defform[(restart-every [seconds (number?)]
[#:retry retry (or/c number? #f) #f]
[#:on-final-fail on-final-fail (or/c #f (-> any/c)) #f])]{
Returns a @racket[restarter%] instance that should be supplied to a @racket[#:restart-on-exit] argument.
@ -292,7 +297,7 @@ Returns a @racket[after-seconds%] instance that should be supplied to a @racket[
Executes the body expressions after a delay of @racket[seconds] from the start of the event loop.
}
@defproc[(connect-to-named-place [node remote-node%?] [name symbol?]) remote-connection%?]{
@defproc[(connect-to-named-place [node (is-a?/c remote-node%)] [name symbol?]) (is-a?/c remote-connection%)]{
Connects to a named place on the @racket[node] named @racket[name] and returns a @racket[remote-connection%] object.
}
@ -325,7 +330,7 @@ can be supplied to a @racket[message-router]:
(defmethod (get-pid) exact-positive-integer?) ]{
@defconstructor[([cmdline-list (listof (or/c string? path?))]
[parent remote-node%? #f]
[parent (is-a?/c remote-node%) #f]
)]{
The @racket[cmdline-list] is a list of command line arguments of type @racket[string] and/or @racket[path].
@ -343,17 +348,45 @@ a @racket[(send parent process-died this)] call.
@defclass[place-socket-bridge% object% (event-container<%>)
(defmethod (get-sc-id) exact-positive-integer?) ]{
@defconstructor[([pch place-channel?]
[sch socket-connection%?]
@defconstructor[([pch place-channel?]
[sch (is-a?/c socket-connection%)]
[id exact-positive-integer?]
)]{
The @racket[pch] argument is a @racket[place-channel]. Messages
received on @racket[pch] are forwarded to the socket-connection%
@racket[sch] via a @racket[dcgm] message. e.g.
@racket[(sconn-write-flush sch (dcgm DCGM-TYPE-INTER-DCHANNEL id id msg))]
The @racket[id] is a @racket[exact-positive-integer] that identifies
the socket-connection subchannel for this inter-node place connection.
The @racket[pch] argument is a @racket[place-channel]. Messages
received on @racket[pch] are forwarded to the socket-connection%
@racket[sch] via a @racket[dcgm] message. e.g.
@racket[(sconn-write-flush sch (dcgm DCGM-TYPE-INTER-DCHANNEL id id msg))]
The @racket[id] is a @racket[exact-positive-integer] that identifies
the socket-connection subchannel for this inter-node place connection.
}
}
@defclass[socket-connection% object% (event-container<%>)]{
@defconstructor[([host (or/c string? #f) #f]
[port (or/c port-no? #f) #f]
[retry-times exact-nonnegative-integer? 30]
[delay number? 1]
[background-connect? boolean? #f]
[in (or/c input-port? #f) #f]
[out (or/c output-port #f) #f]
[remote-node (or/c (is-a?/c remote-node%) #f) #f]
)]{
When a @racket[host] and @racket[port] are supplied a new tcp
connection is established. If a @racket[input-port?] and
@racket[output-port?] are supplied as @racket[in] and @racket[out],
the ports are used as a connection to the remote host. The
@racket[retry-times] argument specifies how many times to retry the
connection attempt should it fail to connect and defaults to 30 retry
attempts. Often a remote node is still booting up when a connection
is attempted and the connection needs to be retried several times.
The @racket[delay] argument specifies how many seconds to wait between
retry attempts. The @racket[background-connect] argument defaults to
@racket[#t] and specifies that the constructor should retry
immediately and that connecion establishment should occur in the
background. Finally, the @racket[remote-node] argument specifies the
@racket[remote-node%] instance that should be notified should the
connection fail.
}
}
@defclass[node% object% (event-container<%>)]{
@ -428,7 +461,7 @@ node's message router.
dies.
}
@defmethod[(get-first-place) remote-place%?]{
@defmethod[(get-first-place) (is-a?/c remote-place%)]{
Returns the @racket[remote-place%] object instance for the first place spawned on this node.
}
@defmethod[(get-first-place-channel) place-channel?]{
@ -441,7 +474,7 @@ node's message router.
@defmethod[(launch-place
[place-exec list?]
[#:restart-on-exit restart-on-exit any/c #f]
[#:one-sided-place? one-sided-place? any/c #f]) remote-place%?]{
[#:one-sided-place? one-sided-place? any/c #f]) (is-a?/c remote-place%)]{
Launches a place on the remote node represented by this @racket[remote-node%] instance.
@|place-exec-note|
@|one-sided-note|
@ -461,7 +494,7 @@ node's message router.
@defproc[(node-send-exit [remote-node% node]) void?]{
Sends @racket[node] a message telling it to exit immediately.
}
@defproc[(node-get-first-place [remote-node% node]) remote-place%?]{
@defproc[(node-get-first-place [remote-node% node]) (is-a?/c remote-place%)]{
Returns the @racket[remote-place%] instance of the first place spawned at this node
}
@ -473,7 +506,7 @@ instance provides a remote api to a place
running on a remote distributed places node. It launches a
places or connects to a named place and routes inter-node place messages to the remote place.
@defconstructor[([node remote-node%?]
@defconstructor[([node (is-a?/c remote-node%)]
[place-exec list?]
[name string?]
[restart-on-exit #f]
@ -500,10 +533,10 @@ places or connects to a named place and routes inter-node place messages to the
distributed places node at that node. It launches a compute places and
routes inter-node place messages to the place.
@defconstructor[([node remote-place%?]
@defconstructor[([node (is-a?/c remote-place%)]
[place-exec list?]
[ch-id exact-positive-integer?]
[sc socket-connection%?]
[sc (is-a?/c socket-connection%)]
[on-place-dead (-> event void?) default-on-place-dead])]{
Constructs a @racket[remote-place%] instance.
@|place-exec-note|
@ -524,10 +557,10 @@ The @racket[connection%] instance represents a connection to a
named-place instance running on the current node. It routes inter-node
place messages to the named place.
@defconstructor[([node remote-node%?]
@defconstructor[([node (is-a?/c remote-node%)]
[name string?]
[ch-id exact-positive-integer?]
[sc socket-connection%?])]{
[sc (is-a?/c socket-connection%)])]{
Constructs a @racket[remote-place%] instance.
@|place-exec-note|
The @racket[ch-id] and @racket[sc] arguments are internally used to
@ -565,8 +598,8 @@ place messages to the named place.
The @racket[restarter%] instance represents a restart strategy.
@defconstructor[([seconds (and/c real? (not/c negative?))]
[retry (or/c #f nonnegative-integer?) #f]
@defconstructor[([seconds number?]
[retry (or/c number? #f) #f]
[on-final-fail (or/c #f (-> any/c)) #f])]{
Constructs an @racket[restarter%] instance that when supplied to a
@racket[#:restart-on-exit] argument, attempts to restart the process
@ -578,83 +611,10 @@ place messages to the named place.
}
}
@defform[(define-remote-server name forms ...)]{
Creates a @racket[make-name] function that spawns a place running a instance of the @racket[name]
remote server. The server sits in a loop waiting for rpc requests from the @racket[define-rpc] functions
documented below.
@defform[(define-state id value)]{
Expands to a @@racket[define], which is closed over by the @racket[define-rpc] functions
to form local state.
}
@defform[(define-rpc (id args ...) body ...)]{
Expands to a client rpc function @tt{name-id} which sends @racket[id] and @racket[args ...] to
the rpc server @racket[rpc-place] and waits for a response.
@racket[(define (name-id rpc-place args ...) body)]
}
@defform[(define-cast (id args ...) body ...)]{
Expands to a client rpc function @tt{name-id} which sends @racket[id] and @racket[args ...] to
the rpc server @racket[rpc-place] but does not receive any response. A cast is a one-way communication
technique.
@racket[(define (name-id rpc-place args ...) body)]
}
}
@examples[ #:eval evaler
(module example1 racket
(require racket/place/define-remote-server)
(define-named-remote-server
tuple-server
(define-state h (make-hash))
(define-rpc (set k v)
(hash-set! h k v)
v)
(define-rpc (get k)
(hash-ref h k #f))))]
@examples[ #:eval evaler
(module example2 racket
(require racket/place/define-remote-server)
(define-remote-server
bank
(define-state accounts (make-hash))
(define-rpc (new-account who)
(match (hash-has-key? accounts who)
[#t '(already-exists)]
[else
(hash-set! accounts who 0)
(list 'created who)]))
(define-rpc (removeM who amount)
(cond
[(hash-ref accounts who (lambda () #f)) =>
(lambda (balance)
(cond [(<= amount balance)
(define new-balance (- balance amount))
(hash-set! accounts who new-balance)
(list 'ok new-balance)]
[else
(list 'insufficient-funds balance)]))]
[else
(list 'invalid-account who)]))
(define-rpc (add who amount)
(cond
[(hash-ref accounts who (lambda () #f)) =>
(lambda (balance)
(define new-balance (+ balance amount))
(hash-set! accounts who new-balance)
(list 'ok new-balance))]
[else
(list 'invalid-account who)]))))]
@defthing[distributed-launch-path path?]{
Contains the path to the distributed places launcher.
Contains the local path to the distributed places launcher. The
distributed places launcher is the bootsrap file that launches the
message router on a new node.
}
@defproc[(ssh-bin-path) string?]{
@ -672,7 +632,6 @@ Returns the path to the currently executing racket binary on the local system.
Returns the path to the distributed places launch file.
The function can take an optional argument specifying the path to the collects directory.
}
i
@;{
@defproc[(build-node-args . list?) list?]{
@ -687,43 +646,43 @@ Spawns a list of nodes by calling @racket[(lambda (x) (apply keyword-apply spawn
}
@defproc[(*channel-put [ch (or/c place-channel? async-bi-channel? channel? remote-connection%?)] [msg any?]) void?]{
@defproc[(*channel-put [ch (or/c place-channel? async-bi-channel? channel? (is-a?/c remote-connection%))] [msg any]) void?]{
Sends @racket[msg] over @racket[ch] channel.
}
@defproc[(*channel-get [ch (or/c place-channel? async-bi-channel? channel? remote-connection%?)]) any?]{
@defproc[(*channel-get [ch (or/c place-channel? async-bi-channel? channel? (is-a?/c remote-connection%))]) any]{
Returns a message received on @racket[ch] channel.
}
@defproc[(*channel? [ch (or/c place-channel? async-bi-channel? channel? remote-connection%?)]) boolean?]{
@defproc[(*channel? [ch (or/c place-channel? async-bi-channel? channel? (is-a?/c remote-connection%))]) boolean?]{
Returns @racket[#t] if @racket[ch] is one of @racket[place-channel?] @racket[async-bi-channel?]
@racket[channel?] @racket[remote-connection%?].
@racket[channel?] @racket[(is-a?/c remote-connection%)].
}
@defproc[(send-new-place-channel-to-named-dest [ch *channel?] [src-id any?]
[dest-list (listOf string? non-negative-integer? string?)])
@defproc[(send-new-place-channel-to-named-dest [ch *channel?] [src-id any]
[dest-list (listof string? port-no? string?)])
place-channel?]{
Creates and returns a new place channel connection to a named place at @racket[dest-list].
The @racket[dest-list] argument is a list of a remote-hostname remote-port and named-place name.
The channel @racket[ch] should be a connection to a @racket[message-router].
}
@defproc[(mr-spawn-remote-node [mrch *channel?] [host string?] [#:listen-port listen-port non-negative-integer? DEFAULT-ROUTER-PORT]
@defproc[(mr-spawn-remote-node [mrch *channel?] [host string?] [#:listen-port listen-port port-no? DEFAULT-ROUTER-PORT]
[#:solo solo boolean? #f]) void?]{
Sends a message to a message router over @racket[mrch] channel asking the message router to spawn a new node
at @racket[host] listening on port @racket[listen-port]. If the @racket[#:solo] keyword argument is supplied
the new node is not folded into the complete network with other nodes in the distributed system.
}
@defproc[(mr-supervise-named-dynamic-place-at [mrch *channel?] [dest (listOf string? port-no?)] [name string?] [path string?] [func symbol?]) void?]{
@defproc[(mr-supervise-named-dynamic-place-at [mrch *channel?] [dest (listof string? port-no?)] [name string?] [path string?] [func symbol?]) void?]{
Sends a message to a message router over @racket[mrch] channel asking the message router to spawn
a named place at @racket[dest] named @racket[name]. The place is spawned at the remote node by calling
dynamic place with module-path @racket[path] and function @racket[func]. The @racket[dest] parameter should be a
list of remote-hostname and remote-port.
}
@defproc[(mr-connect-to [mrch *channel?] [dest (listOf string? non-negative-integer?)] [name string?]) void?]{
@defproc[(mr-connect-to [mrch *channel?] [dest (listof string? port-no?)] [name string?]) void?]{
Sends a message to a message router over @racket[mrch] channel asking the message router to create a new
connection to the named place named @racket[name] at @racket[dest].
The @racket[dest] parameter should be a list of remote-hostname and remote-port.
@ -739,14 +698,18 @@ returns a @racket[channel?] connection to the message router.
Returns @racket[#t] if @racket[no] is a @racket[exact-nonnegative-integer?] between @racket[0] and @racket[65535].
}
@defthing[DEFAULT-ROUTER-PORT port-no?]{
The default port for distributed places message router.
}
@defclass[named-place-typed-channel% object% () ]{
@defconstructor[([ch place-channel?])]{
The @racket[ch] argument is a @racket[place-channel].
}
@defmethod[(get [type symbol?]) any?]{
@defmethod[(get [type symbol?]) any]{
Returns the first message received on @racket[ch] that has the type @racket[type]. Messages are lists and their type is the first
item of the list which should be a @racket[symbol?]. Messages of other types that are received are queue for later @racket[get] requests.
item of the list which should be a @racket[symbol?]. Messages of other types that are received are queued for later @racket[get] requests.
}
}
@ -796,15 +759,15 @@ Returns the length of strings, bytes, and lists.
]
}
@defproc[(write-flush [datum any?] [port port?]) void?]{
@defproc[(write-flush [datum any] [port port?]) void?]{
Writes @racket[datum] to @racket[port] and then flushes @racket[port].
}
@defproc[(printf/f [format string?] [args any?] ...) void?]{
@defproc[(printf/f [format string?] [args any] ...) void?]{
Calls @racket[printf] followed by a call to @racket[flush-output].
}
@defproc[(displayln/f [item any?]) void?]{
@defproc[(displayln/f [item any]) void?]{
Calls @racket[displayln] followed by a call to @racket[flush-output].
}
@ -812,4 +775,138 @@ Calls @racket[displayln] followed by a call to @racket[flush-output].
(write-flush "Hello World" (current-output-port))
]
@;@include-section["define-remote-server.scrbl"]
@section{Define Remote Server}
@defmodule[racket/place/define-remote-server]
@deftogether[(@defform[(define-remote-server [name identifier?] rpc-forms ...+)]
@defform[(define-named-remote-server [name identifier?] rpc-forms ...+)])]{
The @racket[define-remote-server] and @racket[define-named-remote-server] forms
are nearly identical. The @racket[define-remote-server] form should be used
with @racket[supervise-dynamic-place-at] to build a private rpc server, while
the @racket[define-named-remote-server] form should be used with
@racket[supervise-named-dynamic-place-at] to build a rpc server inside a named
place.
The @racket[define-named-remote-server] form takes an identifier and a
list of custom expressions as its arguments. From the identifier a
function is created by prepending the @tt{make-} prefix. This
procedure takes a single argument a @racket[place-channel]. In the
example below, the @racket[make-tuple-server] identifier is the
@racket{place-function-name} given to the
@racket[supervise-named-dynamic-place-at] form to spawn an rpc server.
The server created by the @racket[make-tuple-server] procedure sits in
a loop waiting for rpc requests from the @racket[define-rpc] functions
documented below.
@defform[(define-state id value)]{
Expands to a @@racket[define], which is closed over by the @racket[define-rpc] functions
to form local state.
}
@defform[(define-rpc (id args ...) body ...)]{
Expands to a client rpc function @tt{name-id} which sends @racket[id] and @racket[args ...] to
the rpc server @racket[rpc-place] and waits for a response.
@racket[(define (name-id rpc-place args ...) body)]
}
@defform[(define-cast (id args ...) body ...)]{
Expands to a client rpc function @tt{name-id} which sends @racket[id] and @racket[args ...] to
the rpc server @racket[rpc-place] but does not receive any response. A cast is a one-way communication
technique.
@racket[(define (name-id rpc-place args ...) body)]
}
The
@racket[define-state] custom form translates into a simple
@racket[define] form, which is closed over by the @racket[define-rpc]
forms.
The @racket[define-rpc] form is expanded into two parts. The first
part is the client stubs that call the rpc functions. The client
function name is formed by concatenating the
@racket[define-named-remote-server] identifier, @tt{tuple-server},
with the RPC function name @tt{set} to form @racket[tuple-server-set].
The RPC client functions take a destination argument which is a
@racket[remote-connection%] descriptor and then the RPC function
arguments. The RPC client function sends the RPC function name,
@racket[set], and the RPC arguments to the destination by calling an
internal function @racket[named-place-channel-put]. The RPC client
then calls @racket[named-place-channel-get] to wait for the RPC
response.
The second expansion part of @racket[define-rpc] is the server
implementation of the RPC call. The server is implemented by a match
expression inside the @racket[make-tuple-server] function. The match
clause for @racket[tuple-server-set] matches on messages beginning
with the @racket['set] symbol. The server executes the RPC call with
the communicated arguments and sends the result back to the RPC
client.
The @racket[define-cast] form is similar to the @racket[define-rpc] form
except there is no reply message from the server to client
}
@examples[ #:eval evaler
(module tuple-server-example racket/base
(require racket/match
racket/place/define-remote-server)
(define-named-remote-server tuple-server
(define-state h (make-hash))
(define-rpc (set k v)
(hash-set! h k v)
v)
(define-rpc (get k)
(hash-ref h k #f))
(define-cast (hello)
(printf "Hello from define-cast\n")
(flush-output))))
]
@examples[ #:eval evaler
(module bank-server-example racket/base
(require racket/match
racket/place/define-remote-server)
(define-remote-server bank
(define-state accounts (make-hash))
(define-rpc (new-account who)
(match (hash-has-key? accounts who)
[#t '(already-exists)]
[else
(hash-set! accounts who 0)
(list 'created who)]))
(define-rpc (removeM who amount)
(cond
[(hash-ref accounts who (lambda () #f)) =>
(lambda (balance)
(cond [(<= amount balance)
(define new-balance (- balance amount))
(hash-set! accounts who new-balance)
(list 'ok new-balance)]
[else
(list 'insufficient-funds balance)]))]
[else
(list 'invalid-account who)]))
(define-rpc (add who amount)
(cond
[(hash-ref accounts who (lambda () #f)) =>
(lambda (balance)
(define new-balance (+ balance amount))
(hash-set! accounts who new-balance)
(list 'ok new-balance))]
[else
(list 'invalid-account who)]))))
]
@defproc[(log-to-parent [msg string?] [#:severity severity symbol? 'info]) void?]{
The @racket[log-to-parent] procedure can be used inside a
@racket[define-remote-server] or @racket[define-named-remote-server] form to
send a logging message to the remote owner of the rpc server.
}
@(close-eval evaler)
@include-section["RMPI.scrbl"]