[Distributed Places] added #:thunk and #:named keyword arguments to distributed place creation functions

This commit is contained in:
Kevin Tew 2012-07-31 11:51:55 -06:00
parent aa5b1e192c
commit 6b48d34867
9 changed files with 94 additions and 178 deletions

View File

@ -48,7 +48,7 @@
[(pl-place-enabled?) (pl-func p args ...)]
[else (th-func p args ...)])))]))
(lazy-require [racket/place/distributed (supervise-dynamic-place-at)])
(lazy-require [racket/place/distributed (supervise-place-at)])
(define (place-channel-put/get ch msg)
(place-channel-put ch msg)
@ -74,10 +74,10 @@
(pl-place-pumper-threads p (vector t-in t-out t-err))]
[else (void)]))
(define (dynamic-place module-path function #:at [node #f])
(define (dynamic-place module-path function #:at [node #f] #:named [named #f])
(cond
[node
(supervise-dynamic-place-at node module-path function)]
(supervise-place-at node module-path function #:named named)]
[else
(start-place 'dynamic-place module-path function
#f (current-output-port) (current-error-port))]))

View File

@ -32,14 +32,9 @@
;; New Design Pattern 2 API
message-router
spawn-node-supervise-dynamic-place-at
spawn-node-supervise-place-thunk-at
spawn-node-with-dynamic-place-at
spawn-node-with-place-thunk-at
supervise-named-dynamic-place-at
supervise-named-place-thunk-at
supervise-place-thunk-at
supervise-dynamic-place-at
spawn-node-supervise-place-at
spawn-node-with-place-at
supervise-place-at
supervise-thread-at
supervise-process-at
@ -1398,32 +1393,24 @@
. command-line-list)
(new spawned-process% [cmdline-list command-line-list]))
(define (supervise-named-place-thunk-at node name place-path place-func
#:initial-message [initial-message #f]
#:restart-on-exit [restart-on-exit #f])
(send node launch-place
(list 'place (->writeable-module-path place-path) place-func (->string name))
;#:initial-message initial-message
#:restart-on-exit restart-on-exit
))
(define (mk-place-creation-addr place-path place-func name thunk)
(list* (if thunk 'place 'dynamic-place)
(->writeable-module-path place-path)
place-func
(if name (list (->string name)) null)))
(define (supervise-named-dynamic-place-at node name place-path place-func
#:initial-message [initial-message #f]
#:restart-on-exit [restart-on-exit #f])
(send node launch-place
(list 'dynamic-place (->writeable-module-path place-path) place-func (->string name))
;#:initial-message initial-message
#:restart-on-exit restart-on-exit
))
(define (spawn-node-with-dynamic-place-at host place-path place-func #:listen-port [listen-port DEFAULT-ROUTER-PORT]
(define (spawn-node-with-place-at host place-path place-func #:listen-port [listen-port DEFAULT-ROUTER-PORT]
#:initial-message [initial-message #f]
#:racket-path [racketpath (racket-path)]
#:ssh-bin-path [sshpath (ssh-bin-path)]
#:distributed-launch-path [distributedlaunchpath (->writeable-module-path distributed-launch-path)]
#:restart-on-exit [restart-on-exit #f])
#:restart-on-exit [restart-on-exit #f]
#:named [named #f]
#:thunk [thunk #f])
(define-values (node pl)
(spawn-node-supervise-place-at/exec host (list 'dynamic-place (->writeable-module-path place-path) place-func) #:listen-port listen-port
(spawn-node-supervise-place-at/exec host
(mk-place-creation-addr (->writeable-module-path place-path) place-func thunk)
#:listen-port listen-port
#:initial-message initial-message
#:racket-path racketpath
#:ssh-bin-path sshpath
@ -1431,41 +1418,17 @@
#:restart-on-exit restart-on-exit))
node)
(define (spawn-node-with-place-thunk-at host place-path place-func #:listen-port [listen-port DEFAULT-ROUTER-PORT]
(define (spawn-node-supervise-place-at host place-path place-func #:listen-port [listen-port DEFAULT-ROUTER-PORT]
#:initial-message [initial-message #f]
#:racket-path [racketpath (racket-path)]
#:ssh-bin-path [sshpath (ssh-bin-path)]
#:distributed-launch-path [distributedlaunchpath (->writeable-module-path distributed-launch-path)]
#:restart-on-exit [restart-on-exit #f])
(define-values (node pl)
(spawn-node-supervise-place-at/exec host (list 'place (->writeable-module-path place-path) place-func) #:listen-port listen-port
#:initial-message initial-message
#:racket-path racketpath
#:ssh-bin-path sshpath
#:distributed-launch-path distributedlaunchpath
#:restart-on-exit restart-on-exit))
node)
(define (spawn-node-supervise-dynamic-place-at host place-path place-func #:listen-port [listen-port DEFAULT-ROUTER-PORT]
#:initial-message [initial-message #f]
#:racket-path [racketpath (racket-path)]
#:ssh-bin-path [sshpath (ssh-bin-path)]
#:distributed-launch-path [distributedlaunchpath (->writeable-module-path distributed-launch-path)]
#:restart-on-exit [restart-on-exit #f])
(spawn-node-supervise-place-at/exec host (list 'dynamic-place (->writeable-module-path place-path) place-func) #:listen-port listen-port
#:initial-message initial-message
#:racket-path racketpath
#:ssh-bin-path sshpath
#:distributed-launch-path distributedlaunchpath
#:restart-on-exit restart-on-exit))
(define (spawn-node-supervise-place-thunk-at host place-path place-func #:listen-port [listen-port DEFAULT-ROUTER-PORT]
#:initial-message [initial-message #f]
#:racket-path [racketpath (racket-path)]
#:ssh-bin-path [sshpath (ssh-bin-path)]
#:distributed-launch-path [distributedlaunchpath (->writeable-module-path distributed-launch-path)]
#:restart-on-exit [restart-on-exit #f])
(spawn-node-supervise-place-at/exec host (list 'place (->writeable-module-path place-path) place-func) #:listen-port listen-port
#:restart-on-exit [restart-on-exit #f]
#:named [named #f]
#:thunk [thunk #f])
(spawn-node-supervise-place-at/exec host
(mk-place-creation-addr (->writeable-module-path place-path) place-func thunk)
#:listen-port listen-port
#:initial-message initial-message
#:racket-path racketpath
#:ssh-bin-path sshpath
@ -1524,11 +1487,17 @@
#:distributed-launch-path distributedlaunchpath
#:use-current-ports use-current-ports))
(define (supervise-dynamic-place-at remote-node place-path place-func)
(send remote-node launch-place (list 'dynamic-place (->writeable-module-path place-path) place-func)))
(define (supervise-place-at remote-node place-path place-func
;#:initial-message [initial-message #f]
#:restart-on-exit [restart-on-exit #f]
#:named [named #f]
#:thunk [thunk #f])
(define (supervise-place-thunk-at remote-node place-path place-func)
(send remote-node launch-place (list 'place (->writeable-module-path place-path) place-func)))
(send remote-node launch-place
(mk-place-creation-addr place-path place-func named thunk)
#:restart-on-exit restart-on-exit
;#:initial-message initial-message
))
(define (supervise-thread-at remote-node place-path place-func)
(send remote-node launch-place (list 'thread (->writeable-module-path place-path) place-func)))

View File

@ -15,11 +15,12 @@
(module+ main
(define-values (node pl)
(spawn-node-supervise-place-thunk-at
(spawn-node-supervise-place-at
"localhost"
#:listen-port 6344
(quote-module-path "..")
'hello-world))
'hello-world
#:thunk #t))
(message-router
node

View File

@ -12,8 +12,8 @@
(define (main)
(define remote-node (spawn-remote-racket-node "localhost" #:listen-port 6344))
(define tuple-place (supervise-named-dynamic-place-at remote-node 'tuple-server tuple-path 'make-tuple-server))
(define bank-place (supervise-dynamic-place-at remote-node bank-path 'make-bank))
(define tuple-place (supervise-place-at remote-node tuple-path 'make-tuple-server #:named 'tuple-server))
(define bank-place (supervise-place-at remote-node bank-path 'make-bank))
(message-router
remote-node

View File

@ -15,7 +15,7 @@
wait-place-thunk)
(define (spawn-place-worker-at port message)
(spawn-node-with-dynamic-place-at "localhost" #:listen-port port place-worker-path 'place-worker #:initial-message message #:restart-on-exit #f))
(spawn-node-with-place-at "localhost" #:listen-port port place-worker-path 'place-worker #:initial-message message #:restart-on-exit #f))
(define (wait-place-thunk)
(place ch
@ -25,14 +25,14 @@
(define (main)
(define bank-node (spawn-node-with-dynamic-place-at "localhost" #:listen-port 6344 bank-path 'make-bank))
(define bank-node (spawn-node-with--place-at "localhost" #:listen-port 6344 bank-path 'make-bank))
(define bank-place (send bank-node get-first-place))
(message-router
(spawn-place-worker-at 6341 "ONE")
(spawn-place-worker-at 6342 "TWO")
(spawn-place-worker-at 6343 "THREE")
bank-node
(spawn-node-with-place-thunk-at "localhost" #:listen-port 6345 (quote-module-name) 'wait-place-thunk #:restart-on-exit #t)
(spawn-node-with-place-at "localhost" #:listen-port 6345 #:thunk #t (quote-module-name) 'wait-place-thunk #:restart-on-exit #t)
(every-seconds 3.3 (printf "Hello from every-seconds\n") (flush-output))
(after-seconds 2
(displayln (bank-new-account bank-place 'user0))

View File

@ -14,12 +14,12 @@
(define remote-node (spawn-remote-racket-node
"localhost"
#:listen-port 6344))
(define tuple-place (supervise-named-dynamic-place-at
(define tuple-place (supervise-place-at
remote-node
'tuple-server
#:named 'tuple-server
tuple-path
'make-tuple-server))
(define bank-place (supervise-dynamic-place-at
(define bank-place (supervise-place-at
remote-node bank-path
'make-bank))

View File

@ -94,11 +94,10 @@
(define nodes (spawn-nodes/join/local config))
(for ([n nodes]
[i (in-naturals)])
(supervise-named-dynamic-place-at n
(i->place-name i)
(->module-path (quote-module-path))
'map-reduce-worker))
(supervise-place-at n
(->module-path (quote-module-path))
'map-reduce-worker
#:named (i->place-name)))
nodes)
(define (default-sorter a b)

View File

@ -205,10 +205,10 @@
(for ([n nodes]
[c config])
(match-define (list-rest host port name id rest) c)
(supervise-named-dynamic-place-at n
name
(lookup-config-value rest "mpi-module")
(lookup-config-value rest "mpi-func")))
(supervise-place-at n
(lookup-config-value rest "mpi-module")
(lookup-config-value rest "mpi-func")
#:named name))
(define-values (mrth ch)
(start-message-router/thread

View File

@ -84,10 +84,11 @@ passes it a @racket["Hello World"] string:
;; remote-connection% instance (pl) for communicating with the
;; new node and place
(define-values (node pl)
(spawn-node-supervise-place-thunk-at "localhost"
#:listen-port 6344
(quote-module-path "..")
'hello-world))
(spawn-node-supervise-place-at "localhost"
#:listen-port 6344
#:thunk #t
(quote-module-path "..")
'hello-world))
;; starts a message router which adds three event-container<%>s to
;; its list of events to handle: the node and two after-seconds
@ -115,7 +116,7 @@ passes it a @racket["Hello World"] string:
events for the node.
}
@defproc[(spawn-node-with-dynamic-place-at
@defproc[(spawn-node-with-place-at
[hostname string?]
[instance-module-path module-path?]
[instance-place-function-name symbol?]
@ -124,13 +125,25 @@ passes it a @racket["Hello World"] string:
[#:racket-path racket-path string-path? (racket-path)]
[#:ssh-bin-path ssh-path string-path? (ssh-bin-path)]
[#:distributed-launch-path launcher-path string-path? (path->string distributed-launch-path)]
[#:restart-on-exit restart-on-exit any/c #f]) (is-a?/c remote-connection%)]{
[#:restart-on-exit restart-on-exit any/c #f]
[#:named place-name (or/c #f symbol?) #f]
[#:thunk thunk (or/c #f #t) #f]) (is-a?/c remote-connection%)]{
Spawns a new remote node at @racket[hostname] with one instance place specified by
the @racket[instance-module-path] and @racket[instance-place-function-name]; that
is, the result of
@racket[(dynamic-place instance-module-path instance-place-function-name)].
is called in the new place of the new node.
the @racket[instance-module-path] and @racket[instance-place-function-name].
When @racket[thunk] is @racket[#f], the place is created as the result of the framework
calling @racket[(dynamic-place instance-module-path instance-place-function-name)].
in the new node.
When @racket[thunk] is @racket[#t] the
@racket[instance-place-function-name] function should use
@racket[dynamic-place] or @racket[place] to create and return an
initial place in the new node.
When the @racket[place-name] symbol is present a named place is
created. The @racket[place-name] symbol is used to establish later
connections to the named place.}
The result is a @racket[remote-node%] instance, not a
@racket[remote-connection%]. Use @method[remote-node%
@ -143,9 +156,7 @@ procedure of zero arguments to implement the restart procedure, or it
can be an object that support a @racket[restart] method that takes a
@tech{place} argument.}
@defproc[(spawn-node-supervise-dynamic-place-at
@defproc[(spawn-node-supervise-place-at
[hostname string?]
[instance-module-path module-path?]
[instance-place-function-name symbol?]
@ -154,48 +165,13 @@ can be an object that support a @racket[restart] method that takes a
[#:racket-path racket-path string-path? (racket-path)]
[#:ssh-bin-path ssh-path string-path? (ssh-bin-path)]
[#:distributed-launch-path launcher-path string-path? (path->string distributed-launch-path)]
[#:restart-on-exit restart-on-exit any/c #f]) (values (is-a?/c remote-node%) (is-a?/c remote-connection%))]{
[#:restart-on-exit restart-on-exit any/c #f]
[#:named named (or/c #f string?) #f]
[#:thunk thunk (or/c #f #t) #f]) (values (is-a?/c remote-node%) (is-a?/c remote-connection%))]{
Like @racket[spawn-node-with-dynamic-place-at], but the result is two values: the
new @racket[remote-node%] and its @racket[remote-connection%] instance.}
@defproc[(spawn-node-with-place-thunk-at
[hostname string?]
[instance-module-path module-path?]
[instance-thunk-function-name symbol?]
[#:listen-port port port-no? DEFAULT-ROUTER-PORT]
[#:initial-message initial-message any #f]
[#:racket-path racket-path string-path? (racket-path)]
[#:ssh-bin-path ssh-path string-path? (ssh-bin-path)]
[#:distributed-launch-path launcher-path string-path? (path->string distributed-launch-path)]
[#:restart-on-exit restart-on-exit any/c #f]) (is-a?/c remote-connection%)]{
Like @racket[spawn-node-with-dynamic-place-at], but in the new node,
@racket[(dynamic-place instance-module-path instance-thunk-function-name)]
is called to create a place and return the newly constructed
the place descriptor. That is, the
@racket[instance-thunk-function-name] function should
use @racket[dynamic-place] or @racket[place] to create an initial
place in the new node.}
@defproc[(spawn-node-supervise-place-thunk-at
[hostname string?]
[instance-module-path module-path?]
[instance-thunk-function-name symbol?]
[#:listen-port port port-no? DEFAULT-ROUTER-PORT]
[#:initial-message initial-message any #f]
[#:racket-path racket-path string-path? (racket-path)]
[#:ssh-bin-path ssh-path string-path? (ssh-bin-path)]
[#:distributed-launch-path launcher-path string-path? (path->string distributed-launch-path)]
[#:restart-on-exit restart-on-exit any/c #f]) (values (is-a?/c remote-node%) (is-a?/c remote-connection%))]{
Like @racket[spawn-node-with-place-thunk-at], but the result is two
values (like @racket[spawn-node-supervise-dynamic-place-at]): the new
@racket[remote-node%] and its @racket[remote-connection%] instance.}
@defproc[(spawn-remote-racket-node
[hostname string?]
[#:listen-port port port-no? DEFAULT-ROUTER-PORT]
@ -217,30 +193,28 @@ Spawns a new remote node at @racket[hostname] and returns a @racket[remote-node%
Like @racket[spawn-remote-racket-node], but the @racket[current-output-port] and @racket[current-error-port]
are used as the standard ports for the spawned process instead of new pipe ports.}
@defproc[(supervise-dynamic-place-at
@defproc[(supervise-place-at
[remote-node (is-a?/c remote-node%)]
[instance-module-path module-path?]
[instance-place-function-name symbol?]
[#:restart-on-exit restart-on-exit any/c #f]) (is-a?/c remote-connection%)]{
[#:restart-on-exit restart-on-exit any/c #f]
[#:named named (or/c #f symbol?) #f]
[#:thunk thunk (or/c #f #t) #f]) (is-a?/c remote-connection%)]{
Creates a new place on @racket[remote-node] by using
When @racket[thunk] is @racket[#f], creates a new place on @racket[remote-node] by using
@racket[dynamic-place] to invoke
@racket[instance-place-function-name] from the module
@racket[instance-module-path].}
@racket[instance-module-path].
@defproc[(supervise-place-thunk-at
[remote-node (is-a?/c remote-node%)]
[instance-module-path module-path?]
[instance-thunk-function-name symbol?]
[#:restart-on-exit restart-on-exit any/c #f]) (is-a?/c remote-connection%)]{
Creates a new place at @racket[remote-node] by executing the thunk
exported as @racket[instance-thunk-function-name] from the module
When @racket[thunk] is @racket[#t], creates a new place at @racket[remote-node] by executing the thunk
exported as @racket[instance-place-function-name] from the module
@racket[instance-module-path]. The function should use
@racket[dynamic-place] or @racket[place] to create a place in the new
node.}
@racket[dynamic-place] or @racket[place] to create and return a place in the new
node.
When the @racket[place-name] symbol is present a named place is
created. The @racket[place-name] symbol is used to establish later
connections to the named place.}
@defproc[(supervise-process-at
[hostname string?]
@ -249,33 +223,6 @@ node.}
Spawns an attached external process at host @racket[hostname].
}
@defproc[(supervise-named-dynamic-place-at
[remote-node (is-a?/c remote-node%)]
[place-name symbol?]
[instance-module-path module-path?]
[instance-place-function-name symbol?]
[#:restart-on-exit restart-on-exit any/c #f]) (is-a?/c remote-connection%)]{
Creates a new place on the @racket[remote-node] by using
@racket[dynamic-place] to invoke
@racket[instance-place-function-name] from the module
@racket[instance-module-path]. The @racket[place-name] symbol
is used to establish later connections to the named place.}
@defproc[(supervise-named-place-thunk-at
[remote-node (is-a?/c remote-node%)]
[place-name symbol?]
[instance-module-path module-path?]
[instance-thunk-function-name symbol?]
[#:restart-on-exit restart-on-exit any/c #f]) (is-a?/c remote-connection%)]{
Creates a new place on the @racket[remote-node] by executing the thunk
@racket[instance-thunk-function-name] from the module
@racket[instance-module-path]; that is, the function should use
@racket[dynamic-place] or @racket[place] to create a place.
The @racket[place-name] symbol
is used to establish later connections to the named place.}
@defproc[(supervise-thread-at
[remote-node (is-a?/c remote-node%)]
[instance-module-path module-path?]