[Distributed Places] more docs, removed old functions, RMPI function renames

This commit is contained in:
Kevin Tew 2012-05-04 11:19:42 -06:00
parent 1fab365129
commit c619f7cbbf
4 changed files with 165 additions and 96 deletions

View File

@ -11,6 +11,7 @@
racket/udp
racket/runtime-path
racket/date
racket/contract
syntax/location)
(define-syntax define/provide
@ -48,20 +49,16 @@
spawn-remote-racket-node
node-send-exit
node-get-first-place
dplace-put
dplace-get
;; low-level API
ll-channel-get
ll-channel-put
write-flush
printf/f
displayln/f
log-message
start-spawned-node-router
start-spawned-node-router ;not documented
;; Old Design Pattern 1 API
;; not documented
dcg-get-cg
dcg-send
dcg-send-type
@ -81,7 +78,7 @@
;v3 api
build-distributed-launch-path
build-node-args
#;build-node-args
*channel-get
*channel-put
send-new-place-channel-to-named-dest
@ -99,7 +96,6 @@
node%
socket-connection%
remote-node%
remote-place%
remote-connection%
place%
connection%
@ -110,8 +106,13 @@
;re-provides
quote-module-path
;named-place-typed-channel
named-place-typed-channel%
tc-get
;contracts
*channel?
port-no?
)
(define-runtime-path distributed-launch-path "distributed/launch.rkt")
@ -291,11 +292,8 @@
(define (total-node-count conf) (reduce-sum conf item (node-config-proc-count item)))
;; Contract: start-node-router : VectorOf[ (or/c place-channel socket-connection)] -> (void)
;;
;; Purpose: Forward messages between channels and build new point-to-point subchannels
;;
;; Example:
(define (start-spawned-node-router listener)
(define nc (new node% [listen-port listener]))
(send nc sync-events))
@ -965,16 +963,6 @@
(define (node-send-exit node) (send node send-exit))
(define (node-get-first-place node) (send node get-first-place))
(define (dplace-get dest)
(cond
[(place-channel? dest) (place-channel-get dest)]
[else (send dest get-msg)]))
(define (dplace-put dest msg)
(cond
[(place-channel? dest) (place-channel-put dest msg)]
[else (send dest put-msg msg)]))
(define remote-connection%
(backlink
(class*
@ -1071,8 +1059,6 @@
(super-new)
)))
(define remote-place% remote-connection%)
(define place%
(backlink
(class*
@ -1179,10 +1165,6 @@
(super-new)
)))
(define (ll-channel-put ch msg) (send ch put-msg msg))
(define (ll-channel-get ch) (send ch get-raw-msg))
(define respawn-and-fire%
(backlink
(class*
@ -1341,8 +1323,7 @@
#:listen-port [listen-port DEFAULT-ROUTER-PORT]
#:restart-on-exit [restart-on-exit #f]
. command-line-list)
(void)
)
(new spawned-process% [cmdline-list command-line-list]))
(define (supervise-named-place-thunk-at node name place-path place-func
#:initial-message [initial-message #f]
@ -1565,21 +1546,27 @@
;;; [chan-vec (vector ch)]))
;;; (send mrn sync-events)])))
;;; (place-channel-put mr (list listen-port)))
(define (*channel? ch)
(or (place-channel? ch)
(async-bi-channel? ch)
(channel? ch)
(is-a? ch remote-connection%)))
(define (*channel-put ch msg)
((cond
[(place-channel? ch) place-channel-put]
[(async-bi-channel? ch) async-bi-channel-put]
[(channel? ch) channel-put]
[else (raise (format "unknown channel type ~a" ch))])
ch msg))
(cond
[(place-channel? ch) (place-channel-put ch msg)]
[(async-bi-channel? ch) (async-bi-channel-put ch msg)]
[(channel? ch) (channel-put ch msg)]
[(is-a? ch remote-connection%) (send ch put-msg msg)]
[else (raise (format "unknown channel type ~a" ch))]))
(define (*channel-get ch)
((cond
[(place-channel? ch) place-channel-get]
[(async-bi-channel? ch) async-bi-channel-get]
[(channel? ch) channel-get]
[else (raise (format "unknown channel type ~a" ch))])
ch))
(cond
[(place-channel? ch) (place-channel-get ch)]
[(async-bi-channel? ch) (async-bi-channel-get ch)]
[(channel? ch) (channel-get ch)]
[(is-a? ch remote-connection%) (send ch get-msg)]
[else (raise (format "unknown channel type ~a" ch))]))
(define/provide (mr-spawn-remote-node mrch host #:listen-port [listen-port DEFAULT-ROUTER-PORT]
#:solo [solo #f])
@ -1597,6 +1584,10 @@
(*channel-put mrch (dcgm DCGM-CONTROL-NEW-CONNECTION dest -1 (list name ch2)))
ch1)
(define (port-no? x)
(and (exact-nonnegative-integer? x)
((integer-in 0 65535) x)))
(define/provide (start-message-router/thread #:listen-port [listen-port DEFAULT-ROUTER-PORT]
#:nodes [nodes null])
(define ch (make-channel))
@ -1623,13 +1614,13 @@
#:distributed-launch-path distributedlaunchpath))))
ch)
(define/provide (spawn-nodes/join nodes-desc)
(define/provide (spawn-nodes/join nodes-descs)
(for/list ([x
(for/list ([n nodes-desc])
(for/list ([n nodes-descs])
(apply keyword-apply spawn-node-at n))])
(channel-get x)))
(define build-node-args
#;(define build-node-args
(make-keyword-procedure (lambda (kws kw-args . rest)
(list kws kw-args rest))))

View File

@ -16,8 +16,8 @@
RMPI-id
RMPI-cnt
RMPI-partition
RMPI-BuildDefaultConfig
RMPI-Launch
RMPI-build-default-config
RMPI-launch
RMPI-finish)
(struct RMPI-COMM (id cnt channels) #:transparent)
@ -153,14 +153,14 @@
(define cnt (RMPI-cnt comm))
(partit num cnt id))
(define RMPI-BuildDefaultConfig
(define RMPI-build-default-config
(make-keyword-procedure (lambda (kws kw-args . rest)
(for/hash ([kw kws]
[kwa kw-args])
; (displayln (keyword? kw))
(values kw kwa)))))
(define (RMPI-Launch default config)
(define (RMPI-launch default config)
(define (lookup-config-value rest key-str)
(define key
(string->keyword key-str))

View File

@ -21,8 +21,8 @@
(message-router
node
(after-seconds 2
(dplace-put pl "Hello")
(printf "message-router received: ~a\n" (dplace-get pl)))
(*channel-put pl "Hello")
(printf "message-router received: ~a\n" (*channel-get pl)))
(after-seconds 6
(exit 0))))

View File

@ -91,10 +91,10 @@ The use of Distributed Places is predicated on a couple assumptions:
@defproc[(message-router [ec events-container<%>?] ...+) void?]{
Waits in an endless loop for one of many events to become ready. The
@racket[message-router] procedure constructs a @racket[node%]
instance to serve as the message router for then node. The
instance to serve as the message router for the node. The
@racket[message-router] procedure then adds all the declared
@racket[events-container<%>]s to the @racket[node%] and finally calls
the never ending loop @racket[sync-events] method, which handles the
the never ending loop @racket[sync-events] method, which handles
events for the node.
}
@ -264,6 +264,17 @@ is used to establish later connections to the named place.
@|place-thunk-function|
}
@defproc[(supervise-thread-at
[remote-node remote-node?]
[instance-module-path module-path?]
[instance-thunk-function-name symbol?]
[#:restart-on-exit restart-on-exit any/c #f]) remote-place%?]{
Creates a new threadon the @racket[remote-node] by using
@racket[dynamic-require] to invoke
@racket[instance-place-function-name] from the module
@racket[instance-module-path].
}
@defform[(restart-every [seconds (and/c real? nonnegative?)]
[#:retry retry (or/c nonnegative-integer? #f) #f]
[#:on-final-fail on-final-fail (or/c #f (-> any/c)) #f])]{
@ -447,18 +458,28 @@ node's message router.
}
}
@defclass[remote-place% object% (event-container<%>)]{
@defproc[(node-send-exit [remote-node% node]) void?]{
Sends @racket[node] a message telling it to exit immediately.
}
@defproc[(node-get-first-place [remote-node% node]) remote-place%?]{
Returns the @racket[remote-place%] instance of the first place spawned at this node
}
The @racket[remote-place%] instance provides a remote api to a place
running on a remote distributed places node. It launches a compute
places and routes inter-node place messages to the remote place.
@defclass[remote-connection% object% (event-container<%>)]{
The @racket[remote-connection%]
instance provides a remote api to a place
running on a remote distributed places node. It launches a
places or connects to a named place and routes inter-node place messages to the remote place.
@defconstructor[([node remote-node%?]
[place-exec list?]
[name string?]
[restart-on-exit #f]
[one-sided-place? #f]
[on-channel #f])]{
Constructs a @racket[remote-place%] instance.
Constructs a @racket[remote-connection%] instance.
@|place-exec-note|
@|one-sided-note|
@|restart-on-exit-note|
@ -472,37 +493,6 @@ places and routes inter-node place messages to the remote place.
}
}
@defproc[(dplace-put [pl remote-place%?] [msg any/c]) void?]{
This function is used inside @racket[message-router] callbacks.
It sends messages to remote places.
}
@defproc[(dplace-get [pl remote-place%?]) any/c]{
This function is used inside @racket[message-router] callbacks.
It takes the current delimited continuation and resumes it when a message arrives from @racket[pl].
}
@defclass[remote-connection% object% (event-container<%>)]{
The @racket[remote-connection%] instance provides a remote api to a named place
running on a remote distributed places node. It connects to a named compute
places and routes inter-node place messages to the remote place.
@defconstructor[([node remote-node%?]
[name string?]
[restart-on-exit #f]
[on-channel #f])]{
Constructs a @racket[remote-place%] instance.
@|restart-on-exit-note|
See @racket[set-on-channel!] for description of @racket[on-channel] argument.
}
@defmethod[(set-on-channel! [callback (-> channel msg void?)]) void?]{
Installs a handler function that handles messages from the remote place.
The @racket[setup/distributed-docs] module uses this callback to handle job completion messages.
}
}
@defclass[place% object% (event-container<%>)]{
@ -528,13 +518,6 @@ places and routes inter-node place messages to the remote place.
}
}
@defproc[(node-send-exit [remote-node% node]) void?]{
Sends @racket[node] a message telling it to exit immediately.
}
@defproc[(node-get-first-place [remote-node% node]) remote-place%?]{
Returns the @racket[remote-place%] instance of the first place spawned at this node
}
@defclass[connection% object% (event-container<%>)]{
The @racket[connection%] instance represents a connection to a
@ -685,6 +668,92 @@ Returns the path to the ssh binary on the local system in string form.
Returns the path to the currently executing racket binary on the local system.
}
@defproc[(build-distributed-launch-path [collects-path path-string?]) string?]{
Returns the path to the distributed places launch file.
The function can take an optional argument specifying the path to the collects directory.
}
i
@;{
@defproc[(build-node-args . list?) list?]{
Takes all the positional and keyword arguments pass to it and builds a
@racket[(list (list keywords ...) (list keyword-arguments ...) (list positional-args ...))]
suitable as an argument to @racket[(lambda (x) (apply keyword-apply spawn-node-at x))].}
}
@defproc[(spawn-nodes/join [nodes-descs list?]) void?]{
Spawns a list of nodes by calling @racket[(lambda (x) (apply keyword-apply spawn-node-at x))] for each node description in
@racket[nodes-descs] and then waits for each node to spawn.
}
@defproc[(*channel-put [ch (or/c place-channel? async-bi-channel? channel? remote-connection%?)] [msg any?]) void?]{
Sends @racket[msg] over @racket[ch] channel.
}
@defproc[(*channel-get [ch (or/c place-channel? async-bi-channel? channel? remote-connection%?)]) any?]{
Returns a message received on @racket[ch] channel.
}
@defproc[(*channel? [ch (or/c place-channel? async-bi-channel? channel? remote-connection%?)]) boolean?]{
Returns @racket[#t] if @racket[ch] is one of @racket[place-channel?] @racket[async-bi-channel?]
@racket[channel?] @racket[remote-connection%?].
}
@defproc[(send-new-place-channel-to-named-dest [ch *channel?] [src-id any?]
[dest-list (listOf string? non-negative-integer? string?)])
place-channel?]{
Creates and returns a new place channel connection to a named place at @racket[dest-list].
The @racket[dest-list] argument is a list of a remote-hostname remote-port and named-place name.
The channel @racket[ch] should be a connection to a @racket[message-router].
}
@defproc[(mr-spawn-remote-node [mrch *channel?] [host string?] [#:listen-port listen-port non-negative-integer? DEFAULT-ROUTER-PORT]
[#:solo solo boolean? #f]) void?]{
Sends a message to a message router over @racket[mrch] channel asking the message router to spawn a new node
at @racket[host] listening on port @racket[listen-port]. If the @racket[#:solo] keyword argument is supplied
the new node is not folded into the complete network with other nodes in the distributed system.
}
@defproc[(mr-supervise-named-dynamic-place-at [mrch *channel?] [dest (listOf string? port-no?)] [name string?] [path string?] [func symbol?]) void?]{
Sends a message to a message router over @racket[mrch] channel asking the message router to spawn
a named place at @racket[dest] named @racket[name]. The place is spawned at the remote node by calling
dynamic place with module-path @racket[path] and function @racket[func]. The @racket[dest] parameter should be a
list of remote-hostname and remote-port.
}
@defproc[(mr-connect-to [mrch *channel?] [dest (listOf string? non-negative-integer?)] [name string?]) void?]{
Sends a message to a message router over @racket[mrch] channel asking the message router to create a new
connection to the named place named @racket[name] at @racket[dest].
The @racket[dest] parameter should be a list of remote-hostname and remote-port.
}
@defproc[(start-message-router/thread [#:listen-port listen-port port-no? DEFAULT-ROUTER-PORT]
[#:nodes nodes list? null]) (values thread? channel?)]{
Starts a message router in a racket thread connected to @racket[nodes], listening on port @racket[listen-port], and
returns a @racket[channel?] connection to the message router.
}
@defproc[(port-no? [no (and/c exact-nonnegative-integer? (integer-in 0 65535))]) boolean?]{
Returns @racket[#t] if @racket[no] is a @racket[exact-nonnegative-integer?] between @racket[0] and @racket[65535].
}
@defclass[named-place-typed-channel% object% () ]{
@defconstructor[([ch place-channel?])]{
The @racket[ch] argument is a @racket[place-channel].
}
@defmethod[(get [type symbol?]) any?]{
Returns the first message received on @racket[ch] that has the type @racket[type]. Messages are lists and their type is the first
item of the list which should be a @racket[symbol?]. Messages of other types that are received are queue for later @racket[get] requests.
}
}
@defproc[(tc-get [type symbol?] [ch place-channel?]) void?]{
Gets a message of type @racket[type] from the @racket[named-place-typed-channel%] @racket[ch].
}
@;{@examples[ #:eval evaler
(racket-path)
]}
@ -727,9 +796,18 @@ Returns the length of strings, bytes, and lists.
]
}
@defproc[(write-flush [datum any?] [port port?]) (void)]{
@defproc[(write-flush [datum any?] [port port?]) void?]{
Writes @racket[datum] to @racket[port] and then flushes @racket[port].
}
@defproc[(printf/f [format string?] [args any?] ...) void?]{
Calls @racket[printf] followed by a call to @racket[flush-output].
}
@defproc[(displayln/f [item any?]) void?]{
Calls @racket[displayln] followed by a call to @racket[flush-output].
}
@examples[ #:eval evaler
(write-flush "Hello World" (current-output-port))
]