From c619f7cbbf4222065700d7435f1c4d628dcddd47 Mon Sep 17 00:00:00 2001 From: Kevin Tew Date: Fri, 4 May 2012 11:19:42 -0600 Subject: [PATCH] [Distributed Places] more docs, removed old functions, RMPI function renames --- collects/racket/place/distributed.rkt | 79 ++++---- collects/racket/place/distributed/RMPI.rkt | 8 +- .../distributed/examples/hello-world.rkt | 4 +- .../scribblings/reference/distributed.scrbl | 170 +++++++++++++----- 4 files changed, 165 insertions(+), 96 deletions(-) diff --git a/collects/racket/place/distributed.rkt b/collects/racket/place/distributed.rkt index 18b514f41e..2f553aa3a3 100644 --- a/collects/racket/place/distributed.rkt +++ b/collects/racket/place/distributed.rkt @@ -11,6 +11,7 @@ racket/udp racket/runtime-path racket/date + racket/contract syntax/location) (define-syntax define/provide @@ -48,20 +49,16 @@ spawn-remote-racket-node node-send-exit node-get-first-place - dplace-put - dplace-get ;; low-level API - - ll-channel-get - ll-channel-put write-flush printf/f displayln/f log-message - start-spawned-node-router + start-spawned-node-router ;not documented ;; Old Design Pattern 1 API + ;; not documented dcg-get-cg dcg-send dcg-send-type @@ -81,7 +78,7 @@ ;v3 api build-distributed-launch-path - build-node-args + #;build-node-args *channel-get *channel-put send-new-place-channel-to-named-dest @@ -99,7 +96,6 @@ node% socket-connection% remote-node% - remote-place% remote-connection% place% connection% @@ -110,8 +106,13 @@ ;re-provides quote-module-path + ;named-place-typed-channel named-place-typed-channel% tc-get + + ;contracts + *channel? + port-no? ) (define-runtime-path distributed-launch-path "distributed/launch.rkt") @@ -291,11 +292,8 @@ (define (total-node-count conf) (reduce-sum conf item (node-config-proc-count item))) ;; Contract: start-node-router : VectorOf[ (or/c place-channel socket-connection)] -> (void) -;; ;; Purpose: Forward messages between channels and build new point-to-point subchannels -;; ;; Example: - (define (start-spawned-node-router listener) (define nc (new node% [listen-port listener])) (send nc sync-events)) @@ -965,16 +963,6 @@ (define (node-send-exit node) (send node send-exit)) (define (node-get-first-place node) (send node get-first-place)) -(define (dplace-get dest) - (cond - [(place-channel? dest) (place-channel-get dest)] - [else (send dest get-msg)])) - -(define (dplace-put dest msg) - (cond - [(place-channel? dest) (place-channel-put dest msg)] - [else (send dest put-msg msg)])) - (define remote-connection% (backlink (class* @@ -1071,8 +1059,6 @@ (super-new) ))) -(define remote-place% remote-connection%) - (define place% (backlink (class* @@ -1179,10 +1165,6 @@ (super-new) ))) - -(define (ll-channel-put ch msg) (send ch put-msg msg)) -(define (ll-channel-get ch) (send ch get-raw-msg)) - (define respawn-and-fire% (backlink (class* @@ -1341,8 +1323,7 @@ #:listen-port [listen-port DEFAULT-ROUTER-PORT] #:restart-on-exit [restart-on-exit #f] . command-line-list) - (void) - ) + (new spawned-process% [cmdline-list command-line-list])) (define (supervise-named-place-thunk-at node name place-path place-func #:initial-message [initial-message #f] @@ -1565,21 +1546,27 @@ ;;; [chan-vec (vector ch)])) ;;; (send mrn sync-events)]))) ;;; (place-channel-put mr (list listen-port))) +(define (*channel? ch) + (or (place-channel? ch) + (async-bi-channel? ch) + (channel? ch) + (is-a? ch remote-connection%))) + (define (*channel-put ch msg) - ((cond - [(place-channel? ch) place-channel-put] - [(async-bi-channel? ch) async-bi-channel-put] - [(channel? ch) channel-put] - [else (raise (format "unknown channel type ~a" ch))]) - ch msg)) + (cond + [(place-channel? ch) (place-channel-put ch msg)] + [(async-bi-channel? ch) (async-bi-channel-put ch msg)] + [(channel? ch) (channel-put ch msg)] + [(is-a? ch remote-connection%) (send ch put-msg msg)] + [else (raise (format "unknown channel type ~a" ch))])) (define (*channel-get ch) - ((cond - [(place-channel? ch) place-channel-get] - [(async-bi-channel? ch) async-bi-channel-get] - [(channel? ch) channel-get] - [else (raise (format "unknown channel type ~a" ch))]) - ch)) + (cond + [(place-channel? ch) (place-channel-get ch)] + [(async-bi-channel? ch) (async-bi-channel-get ch)] + [(channel? ch) (channel-get ch)] + [(is-a? ch remote-connection%) (send ch get-msg)] + [else (raise (format "unknown channel type ~a" ch))])) (define/provide (mr-spawn-remote-node mrch host #:listen-port [listen-port DEFAULT-ROUTER-PORT] #:solo [solo #f]) @@ -1597,6 +1584,10 @@ (*channel-put mrch (dcgm DCGM-CONTROL-NEW-CONNECTION dest -1 (list name ch2))) ch1) +(define (port-no? x) + (and (exact-nonnegative-integer? x) + ((integer-in 0 65535) x))) + (define/provide (start-message-router/thread #:listen-port [listen-port DEFAULT-ROUTER-PORT] #:nodes [nodes null]) (define ch (make-channel)) @@ -1623,13 +1614,13 @@ #:distributed-launch-path distributedlaunchpath)))) ch) -(define/provide (spawn-nodes/join nodes-desc) +(define/provide (spawn-nodes/join nodes-descs) (for/list ([x - (for/list ([n nodes-desc]) + (for/list ([n nodes-descs]) (apply keyword-apply spawn-node-at n))]) (channel-get x))) -(define build-node-args +#;(define build-node-args (make-keyword-procedure (lambda (kws kw-args . rest) (list kws kw-args rest)))) diff --git a/collects/racket/place/distributed/RMPI.rkt b/collects/racket/place/distributed/RMPI.rkt index 920144851e..5df30bc679 100644 --- a/collects/racket/place/distributed/RMPI.rkt +++ b/collects/racket/place/distributed/RMPI.rkt @@ -16,8 +16,8 @@ RMPI-id RMPI-cnt RMPI-partition - RMPI-BuildDefaultConfig - RMPI-Launch + RMPI-build-default-config + RMPI-launch RMPI-finish) (struct RMPI-COMM (id cnt channels) #:transparent) @@ -153,14 +153,14 @@ (define cnt (RMPI-cnt comm)) (partit num cnt id)) -(define RMPI-BuildDefaultConfig +(define RMPI-build-default-config (make-keyword-procedure (lambda (kws kw-args . rest) (for/hash ([kw kws] [kwa kw-args]) ; (displayln (keyword? kw)) (values kw kwa))))) -(define (RMPI-Launch default config) +(define (RMPI-launch default config) (define (lookup-config-value rest key-str) (define key (string->keyword key-str)) diff --git a/collects/racket/place/distributed/examples/hello-world.rkt b/collects/racket/place/distributed/examples/hello-world.rkt index 6236927960..2590231cba 100644 --- a/collects/racket/place/distributed/examples/hello-world.rkt +++ b/collects/racket/place/distributed/examples/hello-world.rkt @@ -21,8 +21,8 @@ (message-router node (after-seconds 2 - (dplace-put pl "Hello") - (printf "message-router received: ~a\n" (dplace-get pl))) + (*channel-put pl "Hello") + (printf "message-router received: ~a\n" (*channel-get pl))) (after-seconds 6 (exit 0)))) diff --git a/collects/scribblings/reference/distributed.scrbl b/collects/scribblings/reference/distributed.scrbl index 4d73fcbf65..f8d067dfbb 100644 --- a/collects/scribblings/reference/distributed.scrbl +++ b/collects/scribblings/reference/distributed.scrbl @@ -91,10 +91,10 @@ The use of Distributed Places is predicated on a couple assumptions: @defproc[(message-router [ec events-container<%>?] ...+) void?]{ Waits in an endless loop for one of many events to become ready. The @racket[message-router] procedure constructs a @racket[node%] - instance to serve as the message router for then node. The + instance to serve as the message router for the node. The @racket[message-router] procedure then adds all the declared @racket[events-container<%>]s to the @racket[node%] and finally calls - the never ending loop @racket[sync-events] method, which handles the + the never ending loop @racket[sync-events] method, which handles events for the node. } @@ -264,6 +264,17 @@ is used to establish later connections to the named place. @|place-thunk-function| } +@defproc[(supervise-thread-at + [remote-node remote-node?] + [instance-module-path module-path?] + [instance-thunk-function-name symbol?] + [#:restart-on-exit restart-on-exit any/c #f]) remote-place%?]{ +Creates a new threadon the @racket[remote-node] by using +@racket[dynamic-require] to invoke +@racket[instance-place-function-name] from the module +@racket[instance-module-path]. +} + @defform[(restart-every [seconds (and/c real? nonnegative?)] [#:retry retry (or/c nonnegative-integer? #f) #f] [#:on-final-fail on-final-fail (or/c #f (-> any/c)) #f])]{ @@ -447,18 +458,28 @@ node's message router. } } -@defclass[remote-place% object% (event-container<%>)]{ +@defproc[(node-send-exit [remote-node% node]) void?]{ +Sends @racket[node] a message telling it to exit immediately. +} +@defproc[(node-get-first-place [remote-node% node]) remote-place%?]{ +Returns the @racket[remote-place%] instance of the first place spawned at this node +} -The @racket[remote-place%] instance provides a remote api to a place -running on a remote distributed places node. It launches a compute -places and routes inter-node place messages to the remote place. + +@defclass[remote-connection% object% (event-container<%>)]{ + +The @racket[remote-connection%] +instance provides a remote api to a place +running on a remote distributed places node. It launches a +places or connects to a named place and routes inter-node place messages to the remote place. @defconstructor[([node remote-node%?] [place-exec list?] + [name string?] [restart-on-exit #f] [one-sided-place? #f] [on-channel #f])]{ - Constructs a @racket[remote-place%] instance. + Constructs a @racket[remote-connection%] instance. @|place-exec-note| @|one-sided-note| @|restart-on-exit-note| @@ -472,37 +493,6 @@ places and routes inter-node place messages to the remote place. } } -@defproc[(dplace-put [pl remote-place%?] [msg any/c]) void?]{ - This function is used inside @racket[message-router] callbacks. - It sends messages to remote places. -} - -@defproc[(dplace-get [pl remote-place%?]) any/c]{ - This function is used inside @racket[message-router] callbacks. - It takes the current delimited continuation and resumes it when a message arrives from @racket[pl]. -} - -@defclass[remote-connection% object% (event-container<%>)]{ - -The @racket[remote-connection%] instance provides a remote api to a named place -running on a remote distributed places node. It connects to a named compute -places and routes inter-node place messages to the remote place. - -@defconstructor[([node remote-node%?] - [name string?] - [restart-on-exit #f] - [on-channel #f])]{ - Constructs a @racket[remote-place%] instance. - @|restart-on-exit-note| - - See @racket[set-on-channel!] for description of @racket[on-channel] argument. -} - -@defmethod[(set-on-channel! [callback (-> channel msg void?)]) void?]{ - Installs a handler function that handles messages from the remote place. - The @racket[setup/distributed-docs] module uses this callback to handle job completion messages. -} -} @defclass[place% object% (event-container<%>)]{ @@ -528,13 +518,6 @@ places and routes inter-node place messages to the remote place. } } -@defproc[(node-send-exit [remote-node% node]) void?]{ -Sends @racket[node] a message telling it to exit immediately. -} -@defproc[(node-get-first-place [remote-node% node]) remote-place%?]{ -Returns the @racket[remote-place%] instance of the first place spawned at this node -} - @defclass[connection% object% (event-container<%>)]{ The @racket[connection%] instance represents a connection to a @@ -685,6 +668,92 @@ Returns the path to the ssh binary on the local system in string form. Returns the path to the currently executing racket binary on the local system. } +@defproc[(build-distributed-launch-path [collects-path path-string?]) string?]{ +Returns the path to the distributed places launch file. +The function can take an optional argument specifying the path to the collects directory. +} +i + +@;{ +@defproc[(build-node-args . list?) list?]{ +Takes all the positional and keyword arguments pass to it and builds a +@racket[(list (list keywords ...) (list keyword-arguments ...) (list positional-args ...))] +suitable as an argument to @racket[(lambda (x) (apply keyword-apply spawn-node-at x))].} +} + +@defproc[(spawn-nodes/join [nodes-descs list?]) void?]{ +Spawns a list of nodes by calling @racket[(lambda (x) (apply keyword-apply spawn-node-at x))] for each node description in +@racket[nodes-descs] and then waits for each node to spawn. +} + + +@defproc[(*channel-put [ch (or/c place-channel? async-bi-channel? channel? remote-connection%?)] [msg any?]) void?]{ +Sends @racket[msg] over @racket[ch] channel. +} + +@defproc[(*channel-get [ch (or/c place-channel? async-bi-channel? channel? remote-connection%?)]) any?]{ +Returns a message received on @racket[ch] channel. +} + +@defproc[(*channel? [ch (or/c place-channel? async-bi-channel? channel? remote-connection%?)]) boolean?]{ +Returns @racket[#t] if @racket[ch] is one of @racket[place-channel?] @racket[async-bi-channel?] +@racket[channel?] @racket[remote-connection%?]. +} + + +@defproc[(send-new-place-channel-to-named-dest [ch *channel?] [src-id any?] + [dest-list (listOf string? non-negative-integer? string?)]) + place-channel?]{ +Creates and returns a new place channel connection to a named place at @racket[dest-list]. +The @racket[dest-list] argument is a list of a remote-hostname remote-port and named-place name. +The channel @racket[ch] should be a connection to a @racket[message-router]. +} + +@defproc[(mr-spawn-remote-node [mrch *channel?] [host string?] [#:listen-port listen-port non-negative-integer? DEFAULT-ROUTER-PORT] + [#:solo solo boolean? #f]) void?]{ +Sends a message to a message router over @racket[mrch] channel asking the message router to spawn a new node +at @racket[host] listening on port @racket[listen-port]. If the @racket[#:solo] keyword argument is supplied +the new node is not folded into the complete network with other nodes in the distributed system. +} + +@defproc[(mr-supervise-named-dynamic-place-at [mrch *channel?] [dest (listOf string? port-no?)] [name string?] [path string?] [func symbol?]) void?]{ +Sends a message to a message router over @racket[mrch] channel asking the message router to spawn +a named place at @racket[dest] named @racket[name]. The place is spawned at the remote node by calling +dynamic place with module-path @racket[path] and function @racket[func]. The @racket[dest] parameter should be a +list of remote-hostname and remote-port. +} + +@defproc[(mr-connect-to [mrch *channel?] [dest (listOf string? non-negative-integer?)] [name string?]) void?]{ +Sends a message to a message router over @racket[mrch] channel asking the message router to create a new +connection to the named place named @racket[name] at @racket[dest]. +The @racket[dest] parameter should be a list of remote-hostname and remote-port. +} + +@defproc[(start-message-router/thread [#:listen-port listen-port port-no? DEFAULT-ROUTER-PORT] + [#:nodes nodes list? null]) (values thread? channel?)]{ +Starts a message router in a racket thread connected to @racket[nodes], listening on port @racket[listen-port], and +returns a @racket[channel?] connection to the message router. +} + +@defproc[(port-no? [no (and/c exact-nonnegative-integer? (integer-in 0 65535))]) boolean?]{ +Returns @racket[#t] if @racket[no] is a @racket[exact-nonnegative-integer?] between @racket[0] and @racket[65535]. +} + +@defclass[named-place-typed-channel% object% () ]{ + + @defconstructor[([ch place-channel?])]{ + The @racket[ch] argument is a @racket[place-channel]. + } + @defmethod[(get [type symbol?]) any?]{ + Returns the first message received on @racket[ch] that has the type @racket[type]. Messages are lists and their type is the first +item of the list which should be a @racket[symbol?]. Messages of other types that are received are queue for later @racket[get] requests. + } +} + +@defproc[(tc-get [type symbol?] [ch place-channel?]) void?]{ +Gets a message of type @racket[type] from the @racket[named-place-typed-channel%] @racket[ch]. +} + @;{@examples[ #:eval evaler (racket-path) ]} @@ -727,9 +796,18 @@ Returns the length of strings, bytes, and lists. ] } -@defproc[(write-flush [datum any?] [port port?]) (void)]{ +@defproc[(write-flush [datum any?] [port port?]) void?]{ Writes @racket[datum] to @racket[port] and then flushes @racket[port]. } + +@defproc[(printf/f [format string?] [args any?] ...) void?]{ +Calls @racket[printf] followed by a call to @racket[flush-output]. +} + +@defproc[(displayln/f [item any?]) void?]{ +Calls @racket[displayln] followed by a call to @racket[flush-output]. +} + @examples[ #:eval evaler (write-flush "Hello World" (current-output-port)) ]