diff --git a/collects/racket/place/distributed/RMPI.rkt b/collects/racket/place/distributed/rmpi.rkt similarity index 80% rename from collects/racket/place/distributed/RMPI.rkt rename to collects/racket/place/distributed/rmpi.rkt index 5df30bc679..4c7c021112 100644 --- a/collects/racket/place/distributed/RMPI.rkt +++ b/collects/racket/place/distributed/rmpi.rkt @@ -6,28 +6,28 @@ racket/place racket/class) -(provide RMPI-init - RMPI-send - RMPI-recv - RMPI-BCast - RMPI-Reduce - RMPI-AllReduce - RMPI-Barrier - RMPI-id - RMPI-cnt - RMPI-partition - RMPI-build-default-config - RMPI-launch - RMPI-finish) +(provide rmpi-init + rmpi-send + rmpi-recv + rmpi-broadcast + rmpi-reduce + rmpi-allreduce + rmpi-barrier + rmpi-id + rmpi-cnt + rmpi-partition + rmpi-build-default-config + rmpi-launch + rmpi-finish) -(struct RMPI-COMM (id cnt channels) #:transparent) +(struct rmpi-comm (id cnt channels) #:transparent) -(define (RMPI-id comm) (RMPI-COMM-id comm)) -(define (RMPI-cnt comm) (RMPI-COMM-cnt comm)) -(define (RMPI-send comm dest val) (place-channel-put (vector-ref (RMPI-COMM-channels comm) dest) val)) -(define (RMPI-recv comm src) (place-channel-get (vector-ref (RMPI-COMM-channels comm) src))) +(define (rmpi-id comm) (rmpi-comm-id comm)) +(define (rmpi-cnt comm) (rmpi-comm-cnt comm)) +(define (rmpi-send comm dest val) (place-channel-put (vector-ref (rmpi-comm-channels comm) dest) val)) +(define (rmpi-recv comm src) (place-channel-get (vector-ref (rmpi-comm-channels comm) src))) -(define (RMPI-init ch) +(define (rmpi-init ch) (define tc (new named-place-typed-channel% [ch ch])) (match-define (list (list 'mpi-id id config) return-ch) (tc-get 'mpi-id tc)) (match-define (list (list 'args args) src-ch) (tc-get 'args tc)) @@ -47,18 +47,18 @@ (vector-set! mpi-comm-vector src-id src-ch)] [else null])) (values - (RMPI-COMM id (length config) mpi-comm-vector) + (rmpi-comm id (length config) mpi-comm-vector) args tc )) -(define RMPI-BCast +(define rmpi-broadcast (case-lambda [(comm src) - (RMPI-BCast comm src (void))] + (rmpi-broadcast comm src (void))] [(comm src val) - (match-define (RMPI-COMM real-id cnt chs) comm) + (match-define (rmpi-comm real-id cnt chs) comm) (define offset (- cnt src)) (define id (modulo (+ real-id (- cnt src)) cnt)) (let loop ([i 0] @@ -97,8 +97,8 @@ (fancy-reducer op a b))] [else (raise (format "fancy-reducer error on ~a ~a ~a" op recv-val val))])) -(define (RMPI-Reduce comm dest op val) - (match-define (RMPI-COMM real-id cnt chs) comm) +(define (rmpi-reduce comm dest op val) + (match-define (rmpi-comm real-id cnt chs) comm) (define i (let loop ([i 0]) (if (>= (arithmetic-shift 1 i) cnt) @@ -135,32 +135,32 @@ (fancy-reducer op recv-val val)])]))] [else val]))) -(define (RMPI-Barrier comm) - (RMPI-Reduce comm 0 + 1) - (RMPI-BCast comm 0 1)) +(define (rmpi-barrier comm) + (rmpi-reduce comm 0 + 1) + (rmpi-broadcast comm 0 1)) -(define (RMPI-AllReduce comm op val) - (define rv (RMPI-Reduce comm 0 op val)) - (RMPI-BCast comm 0 rv)) +(define (rmpi-allreduce comm op val) + (define rv (rmpi-reduce comm 0 op val)) + (rmpi-broadcast comm 0 rv)) (define (partit num cnt id) (define-values (quo rem) (quotient/remainder num cnt)) (values (+ (* id quo) (if (< id rem) id 0)) (+ quo (if (< id rem) 1 0)))) -(define (RMPI-partition comm num) - (define id (RMPI-id comm)) - (define cnt (RMPI-cnt comm)) +(define (rmpi-partition comm num) + (define id (rmpi-id comm)) + (define cnt (rmpi-cnt comm)) (partit num cnt id)) -(define RMPI-build-default-config +(define rmpi-build-default-config (make-keyword-procedure (lambda (kws kw-args . rest) (for/hash ([kw kws] [kwa kw-args]) ; (displayln (keyword? kw)) (values kw kwa))))) -(define (RMPI-launch default config) +(define (rmpi-launch default config) (define (lookup-config-value rest key-str) (define key (string->keyword key-str)) @@ -229,20 +229,20 @@ (*channel-get npch))) -(define (RMPI-finish comm tc) - (when (= (RMPI-id comm) 0) +(define (rmpi-finish comm tc) + (when (= (rmpi-id comm) 0) (place-channel-put (second (tc-get 'done? tc)) 'done))) (module+ bcast-print-test - (RMPI-BCast (RMPI-COMM 0 8 (vector 0 1 2 3 4 5 6 7)) 0 "Hi") - (RMPI-BCast (RMPI-COMM 3 8 (vector 0 1 2 3 4 5 6 7)) 0) - (RMPI-BCast (RMPI-COMM 0 8 (vector 0 1 2 3 4 5 6 7)) 3) + (rmpi-broadcast (rmpi-comm 0 8 (vector 0 1 2 3 4 5 6 7)) 0 "Hi") + (rmpi-broadcast (rmpi-comm 3 8 (vector 0 1 2 3 4 5 6 7)) 0) + (rmpi-broadcast (rmpi-comm 0 8 (vector 0 1 2 3 4 5 6 7)) 3) ) (module+ reduce-print-test - (RMPI-Reduce (RMPI-COMM 0 8 (vector 0 1 2 3 4 5 6 7)) 0 + 7) - (RMPI-Reduce (RMPI-COMM 3 8 (vector 0 1 2 3 4 5 6 7)) 0 + 7) - (RMPI-Reduce (RMPI-COMM 0 8 (vector 0 1 2 3 4 5 6 7)) 3 + 7) + (rmpi-reduce (rmpi-comm 0 8 (vector 0 1 2 3 4 5 6 7)) 0 + 7) + (rmpi-reduce (rmpi-comm 3 8 (vector 0 1 2 3 4 5 6 7)) 0 + 7) + (rmpi-reduce (rmpi-comm 0 8 (vector 0 1 2 3 4 5 6 7)) 3 + 7) ) (module+ test diff --git a/collects/scribblings/reference/RMPI.scrbl b/collects/scribblings/reference/RMPI.scrbl deleted file mode 100644 index e1e30dfceb..0000000000 --- a/collects/scribblings/reference/RMPI.scrbl +++ /dev/null @@ -1,118 +0,0 @@ -#lang scribble/manual -@(require scribble/eval - scribble/struct - scribble/decode - racket/contract - racket/place/distributed - racket/place/distributed/RMPI - racket/sandbox - racket/class) -@(require (for-label racket/base - racket/place/distributed/RMPI)) - - -@(define evaler (make-base-eval)) -@(interaction-eval #:eval evaler (require racket/place/distributed - racket/class - #;racket/place/distributed/RMPI)) - -@(define output-evaluator - (parameterize ([sandbox-output 'string] - [sandbox-error-output 'string]) - (make-evaluator 'racket/base))) - -@(define-syntax interaction-output - (syntax-rules () - [(_ #:eval evaluator e) - (begin - (interaction #:eval evaluator e) - (printf "K ~a\n" (get-output evaluator)))])) - -@title[#:tag "distributed-places-MPI"]{Distributed Places MPI} - -@defmodule[racket/place/distributed/RMPI] - -@defproc[(RMPI-id [comm RMPI-COMM?]) exact-nonnegative-integer? ]{ - Takes a RMPI communicator structure, @racket[comm], and returns the node id of the RMPI - process. -} - -@defproc[(RMPI-cnt [comm RMPI-COMM?]) positive-integer? ]{ - Takes a RMPI communicator structure, @racket[comm], and returns the count of the RMPI - processes in the communicator group. -} - -@defproc[(RMPI-send [comm RMPI-COMM?] [dest exact-nonnegative-integer?] [val any]) void?]{ - Sends @racket[val] to destination RMPI process number @racket[dest] using the RMPI communicator structure @racket[comm]. -} - -@defproc[(RMPI-recv [comm RMPI-COMM?] [src exact-nonnegative-integer?]) any]{ - Receives a message from source RMPI process number @racket[src] using the RMPI communicator structure @racket[comm]. -} - -@defproc[(RMPI-init [ch place-channel?]) (values RMPI-COMM? (listof any) (is-a?/c named-place-type-channel%))]{ - Creates the @racket[RMPI-COMM] structure instance using the named place's original place-channel @racket[ch]. - In addition to the communicator structure, @racket[RMPI-init] returns a list of initial arguments and the original place-channel - @racket[ch] wrapped in a @racket[named-place-type-channel%]. The @racket[named-place-type-channel%] wrapper allows for - the reception of list messages typed by an initial symbol. -} - -@defproc*[([(RMPI-BCast [comm RMPI-COMM?] [src exact-nonnegative-integer?]) any] - [(RMPI-BCast [comm RMPI-COMM?] [src exact-nonnegative-integer?] [val any]) any])]{ - Broadcasts @racket[val] from @racket[src] to all RMPI processes in the communication group using a hypercube algorithm. - Receiving processes call @racket[(RMPI-BCast comm src)]. -} - -@defproc[(RMPI-Reduce [comm RMPI-COMM?] [dest exact-nonnegative-integer?] [op procedure?] [val any]) any]{ - Reduces @racket[val] using the @racket[op] operator to @racket[dest] RMPI node using a hypercube algorithm. -} - -@defproc[(RMPI-Barrier [comm RMPI-COMM?]) void?]{ - Introduces a synchronization barrier for all RMPI processes in the communcication group @racket[comm]. -} - -@defproc[(RMPI-AllReduce [comm RMPI-COMM?] [op procedure?] [val any]) any]{ - Reduces @racket[val] using the @racket[op] operator to RMPI node @racket[0] and then broadcasts the reduced value to all nodes in the communication group. -} - -@defproc[(RMPI-partition [comm RMPI-COMM?] [num positive-integer?]) (values positive-integer? positive-integer?)]{ - Partitions @racket[num] into @racket[RMPI-cnt] equal pieces and returns the offset and length for the @racket[RMPI-id]th - piece. -} - -@defproc[(RMPI-build-default-config - [#:racket-path racket-path string?] - [#:distributed-launch-path distributed-launch-path string?] - [#:mpi-module mpi-module string?] - [#:mpi-func mpi-func symbol?] - [#:mpi-args mpi-args (listof any)]) hash?]{ - - Builds a hash from keywords to keyword arguments for use with the @racket[RMPI-launch function]. -} - -@defproc[(RMPI-launch [default-node-config hash?] [config (listof (listof string? port-no? symbol? exact-nonnegative-integer?))]) void?]{ - Launches distributed places nodes running @racket[#:mpi-func] in @racket[#:mpi-module] with @racket[#:mpi-args]. - The config is a list of node configs, where each node config consists of a hostname, port, named place symbol and RMPI id number, followed by and optional hash of keyword @racket[#:racket-path], @racket[#:distributed-launch-path], @racket[#:mpi-module], @racket[#:mpi-func], and @racket[#:mpi-args] to keyword arguments. Missing optional keyword arguments will be taken from the @racket[default-node-config] hash of keyword arguments. -} - -@defproc[(RMPI-finish [comm RMPI-COMM?] [tc (is-a?/c named-place-type-channel%)]) void?]{ - Rendezvous with the @racket[RMPI-launch], using the @racket[tc] returned by @racket[RMPI-launch], to indicate that the RMPI module is done executing and that @racket[RMPI-launch] can return control to its caller. -} - - -@examples[ #:eval evaler -(RMPI-launch - (RMPI-build-default-config - #:racket-path "/tmp/mplt/bin/racket" - #:distributed-launch-path (build-distributed-launch-path "/tmp/mplt/collects") - #:mpi-module "/tmp/mplt/kmeans.rkt" - #:mpi-func 'kmeans-place - #:mpi-args (list "/tmp/mplt/color100.bin" #t 100 9 10 0.0000001)) - - (list (list "nodea.example.com" 6340 'kmeans_0 0) - (list "nodeb.example.com" 6340 'kmeans_1 1) - (list "nodec.example.com" 6340 'kmeans_2 2) - (list "noded.example.com" 6340 'kmeans_3 3))) -] - -@(close-eval evaler) diff --git a/collects/scribblings/reference/distributed.scrbl b/collects/scribblings/reference/distributed.scrbl index 006b4e901e..acd4b8d23f 100644 --- a/collects/scribblings/reference/distributed.scrbl +++ b/collects/scribblings/reference/distributed.scrbl @@ -909,4 +909,4 @@ except there is no reply message from the server to client } @(close-eval evaler) -@include-section["RMPI.scrbl"] +@include-section["rmpi.scrbl"] diff --git a/collects/scribblings/reference/rmpi.scrbl b/collects/scribblings/reference/rmpi.scrbl new file mode 100644 index 0000000000..d8b0fb7eb8 --- /dev/null +++ b/collects/scribblings/reference/rmpi.scrbl @@ -0,0 +1,140 @@ +#lang scribble/manual +@(require scribble/eval + scribble/struct + scribble/decode + racket/contract + racket/place/distributed + racket/place/distributed/rmpi + racket/sandbox + racket/class) +@(require (for-label racket/base + racket/place/distributed/rmpi)) + + +@(define evaler (make-base-eval)) +@(interaction-eval #:eval evaler (require racket/place/distributed + racket/class + #;racket/place/distributed/rmpi)) + +@(define output-evaluator + (parameterize ([sandbox-output 'string] + [sandbox-error-output 'string]) + (make-evaluator 'racket/base))) + +@(define-syntax interaction-output + (syntax-rules () + [(_ #:eval evaluator e) + (begin + (interaction #:eval evaluator e) + (printf "K ~a\n" (get-output evaluator)))])) + +@title[#:tag "distributed-places-MPI"]{Distributed Places MPI} + +@defmodule[racket/place/distributed/rmpi] + +@defproc[(rmpi-id [comm RMPI-comm?]) exact-nonnegative-integer? ]{ + Takes a rmpi communicator structure, @racket[comm], and returns the node id of the RMPI + process. +} + +@defproc[(rmpi-cnt [comm RMPI-comm?]) positive-integer? ]{ + Takes a rmpi communicator structure, @racket[comm], and returns the count of the RMPI + processes in the communicator group. +} + +@defproc[(rmpi-send [comm RMPI-comm?] [dest exact-nonnegative-integer?] [val any]) void?]{ + Sends @racket[val] to destination rmpi process number @racket[dest] + using the RMPI communicator structure @racket[comm]. +} + +@defproc[(rmpi-recv [comm RMPI-comm?] [src exact-nonnegative-integer?]) any]{ + Receives a message from source rmpi process number @racket[src] + using the RMPI communicator structure @racket[comm]. +} + +@defproc[(rmpi-init [ch place-channel?]) (values RMPI-comm? (listof any) (is-a?/c named-place-type-channel%))]{ + Creates the @racket[rmpi-comm] structure instance using the named + place's original place-channel @racket[ch]. In addition to the + communicator structure, @racket[rmpi-init] returns a list of initial + arguments and the original place-channel @racket[ch] wrapped in a + @racket[named-place-type-channel%]. The + @racket[named-place-type-channel%] wrapper allows for the reception + of list messages typed by an initial symbol. +} + +@defproc*[([(rmpi-broadcast [comm RMPI-comm?] [src exact-nonnegative-integer?]) any] + [(rmpi-broadcast [comm RMPI-comm?] [src exact-nonnegative-integer?] [val any]) any])]{ + Broadcasts @racket[val] from @racket[src] to all rmpi processes in + the communication group using a hypercube algorithm. Receiving + processes call @racket[(rmpi-broadcast comm src)]. +} + +@defproc[(rmpi-reduce [comm RMPI-comm?] [dest exact-nonnegative-integer?] [op procedure?] [val any]) any]{ + Reduces @racket[val] using the @racket[op] operator to @racket[dest] + rmpi node using a hypercube algorithm. +} + +@defproc[(rmpi-barrier [comm RMPI-comm?]) void?]{ + Introduces a synchronization barrier for all rmpi processes in the + communcication group @racket[comm]. +} + +@defproc[(rmpi-allreduce [comm RMPI-comm?] [op procedure?] [val any]) any]{ + Reduces @racket[val] using the @racket[op] operator to rmpi node + @racket[0] and then broadcasts the reduced value to all nodes in the + communication group. +} + +@defproc[(rmpi-partition [comm RMPI-comm?] [num positive-integer?]) (values positive-integer? positive-integer?)]{ + Partitions @racket[num] into @racket[rmpi-cnt] equal pieces and + returns the offset and length for the @racket[RMPI-id]th piece. +} + +@defproc[(rmpi-build-default-config + [#:racket-path racket-path string?] + [#:distributed-launch-path distributed-launch-path string?] + [#:mpi-module mpi-module string?] + [#:mpi-func mpi-func symbol?] + [#:mpi-args mpi-args (listof any)]) hash?]{ + + Builds a hash from keywords to keyword arguments for use with the + @racket[rmpi-launch function]. +} + +@defproc[(rmpi-launch [default-node-config hash?] + [config (listof (listof string? port-no? symbol? exact-nonnegative-integer?))]) void?]{ + Launches distributed places nodes running @racket[#:mpi-func] in + @racket[#:mpi-module] with @racket[#:mpi-args]. The config is a + list of node configs, where each node config consists of a hostname, + port, named place symbol and rmpi id number, followed by and + optional hash of keyword @racket[#:racket-path], + @racket[#:distributed-launch-path], @racket[#:mpi-module], + @racket[#:mpi-func], and @racket[#:mpi-args] to keyword arguments. + Missing optional keyword arguments will be taken from the + @racket[default-node-config] hash of keyword arguments. +} + +@defproc[(rmpi-finish [comm RMPI-comm?] [tc (is-a?/c named-place-type-channel%)]) void?]{ + Rendezvous with the @racket[rmpi-launch], using the @racket[tc] + returned by @racket[RMPI-launch], to indicate that the RMPI module + is done executing and that @racket[RMPI-launch] can return control + to its caller. +} + + +@examples[ #:eval evaler +(rmpi-launch + (rmpi-build-default-config + #:racket-path "/tmp/mplt/bin/racket" + #:distributed-launch-path (build-distributed-launch-path "/tmp/mplt/collects") + #:mpi-module "/tmp/mplt/kmeans.rkt" + #:mpi-func 'kmeans-place + #:mpi-args (list "/tmp/mplt/color100.bin" #t 100 9 10 0.0000001)) + + (list (list "nodea.example.com" 6340 'kmeans_0 0) + (list "nodeb.example.com" 6340 'kmeans_1 1) + (list "nodec.example.com" 6340 'kmeans_2 2) + (list "noded.example.com" 6340 'kmeans_3 3))) +] + +@(close-eval evaler)