[Distributed Places] Remove caps and camelcase

This commit is contained in:
Kevin Tew 2012-05-10 19:39:27 -06:00
parent 154a96ab1f
commit 4c59943b0d
4 changed files with 185 additions and 163 deletions

View File

@ -6,28 +6,28 @@
racket/place
racket/class)
(provide RMPI-init
RMPI-send
RMPI-recv
RMPI-BCast
RMPI-Reduce
RMPI-AllReduce
RMPI-Barrier
RMPI-id
RMPI-cnt
RMPI-partition
RMPI-build-default-config
RMPI-launch
RMPI-finish)
(provide rmpi-init
rmpi-send
rmpi-recv
rmpi-broadcast
rmpi-reduce
rmpi-allreduce
rmpi-barrier
rmpi-id
rmpi-cnt
rmpi-partition
rmpi-build-default-config
rmpi-launch
rmpi-finish)
(struct RMPI-COMM (id cnt channels) #:transparent)
(struct rmpi-comm (id cnt channels) #:transparent)
(define (RMPI-id comm) (RMPI-COMM-id comm))
(define (RMPI-cnt comm) (RMPI-COMM-cnt comm))
(define (RMPI-send comm dest val) (place-channel-put (vector-ref (RMPI-COMM-channels comm) dest) val))
(define (RMPI-recv comm src) (place-channel-get (vector-ref (RMPI-COMM-channels comm) src)))
(define (rmpi-id comm) (rmpi-comm-id comm))
(define (rmpi-cnt comm) (rmpi-comm-cnt comm))
(define (rmpi-send comm dest val) (place-channel-put (vector-ref (rmpi-comm-channels comm) dest) val))
(define (rmpi-recv comm src) (place-channel-get (vector-ref (rmpi-comm-channels comm) src)))
(define (RMPI-init ch)
(define (rmpi-init ch)
(define tc (new named-place-typed-channel% [ch ch]))
(match-define (list (list 'mpi-id id config) return-ch) (tc-get 'mpi-id tc))
(match-define (list (list 'args args) src-ch) (tc-get 'args tc))
@ -47,18 +47,18 @@
(vector-set! mpi-comm-vector src-id src-ch)]
[else null]))
(values
(RMPI-COMM id (length config) mpi-comm-vector)
(rmpi-comm id (length config) mpi-comm-vector)
args
tc
))
(define RMPI-BCast
(define rmpi-broadcast
(case-lambda
[(comm src)
(RMPI-BCast comm src (void))]
(rmpi-broadcast comm src (void))]
[(comm src val)
(match-define (RMPI-COMM real-id cnt chs) comm)
(match-define (rmpi-comm real-id cnt chs) comm)
(define offset (- cnt src))
(define id (modulo (+ real-id (- cnt src)) cnt))
(let loop ([i 0]
@ -97,8 +97,8 @@
(fancy-reducer op a b))]
[else (raise (format "fancy-reducer error on ~a ~a ~a" op recv-val val))]))
(define (RMPI-Reduce comm dest op val)
(match-define (RMPI-COMM real-id cnt chs) comm)
(define (rmpi-reduce comm dest op val)
(match-define (rmpi-comm real-id cnt chs) comm)
(define i
(let loop ([i 0])
(if (>= (arithmetic-shift 1 i) cnt)
@ -135,32 +135,32 @@
(fancy-reducer op recv-val val)])]))]
[else val])))
(define (RMPI-Barrier comm)
(RMPI-Reduce comm 0 + 1)
(RMPI-BCast comm 0 1))
(define (rmpi-barrier comm)
(rmpi-reduce comm 0 + 1)
(rmpi-broadcast comm 0 1))
(define (RMPI-AllReduce comm op val)
(define rv (RMPI-Reduce comm 0 op val))
(RMPI-BCast comm 0 rv))
(define (rmpi-allreduce comm op val)
(define rv (rmpi-reduce comm 0 op val))
(rmpi-broadcast comm 0 rv))
(define (partit num cnt id)
(define-values (quo rem) (quotient/remainder num cnt))
(values (+ (* id quo) (if (< id rem) id 0))
(+ quo (if (< id rem) 1 0))))
(define (RMPI-partition comm num)
(define id (RMPI-id comm))
(define cnt (RMPI-cnt comm))
(define (rmpi-partition comm num)
(define id (rmpi-id comm))
(define cnt (rmpi-cnt comm))
(partit num cnt id))
(define RMPI-build-default-config
(define rmpi-build-default-config
(make-keyword-procedure (lambda (kws kw-args . rest)
(for/hash ([kw kws]
[kwa kw-args])
; (displayln (keyword? kw))
(values kw kwa)))))
(define (RMPI-launch default config)
(define (rmpi-launch default config)
(define (lookup-config-value rest key-str)
(define key
(string->keyword key-str))
@ -229,20 +229,20 @@
(*channel-get npch)))
(define (RMPI-finish comm tc)
(when (= (RMPI-id comm) 0)
(define (rmpi-finish comm tc)
(when (= (rmpi-id comm) 0)
(place-channel-put (second (tc-get 'done? tc)) 'done)))
(module+ bcast-print-test
(RMPI-BCast (RMPI-COMM 0 8 (vector 0 1 2 3 4 5 6 7)) 0 "Hi")
(RMPI-BCast (RMPI-COMM 3 8 (vector 0 1 2 3 4 5 6 7)) 0)
(RMPI-BCast (RMPI-COMM 0 8 (vector 0 1 2 3 4 5 6 7)) 3)
(rmpi-broadcast (rmpi-comm 0 8 (vector 0 1 2 3 4 5 6 7)) 0 "Hi")
(rmpi-broadcast (rmpi-comm 3 8 (vector 0 1 2 3 4 5 6 7)) 0)
(rmpi-broadcast (rmpi-comm 0 8 (vector 0 1 2 3 4 5 6 7)) 3)
)
(module+ reduce-print-test
(RMPI-Reduce (RMPI-COMM 0 8 (vector 0 1 2 3 4 5 6 7)) 0 + 7)
(RMPI-Reduce (RMPI-COMM 3 8 (vector 0 1 2 3 4 5 6 7)) 0 + 7)
(RMPI-Reduce (RMPI-COMM 0 8 (vector 0 1 2 3 4 5 6 7)) 3 + 7)
(rmpi-reduce (rmpi-comm 0 8 (vector 0 1 2 3 4 5 6 7)) 0 + 7)
(rmpi-reduce (rmpi-comm 3 8 (vector 0 1 2 3 4 5 6 7)) 0 + 7)
(rmpi-reduce (rmpi-comm 0 8 (vector 0 1 2 3 4 5 6 7)) 3 + 7)
)
(module+ test

View File

@ -1,118 +0,0 @@
#lang scribble/manual
@(require scribble/eval
scribble/struct
scribble/decode
racket/contract
racket/place/distributed
racket/place/distributed/RMPI
racket/sandbox
racket/class)
@(require (for-label racket/base
racket/place/distributed/RMPI))
@(define evaler (make-base-eval))
@(interaction-eval #:eval evaler (require racket/place/distributed
racket/class
#;racket/place/distributed/RMPI))
@(define output-evaluator
(parameterize ([sandbox-output 'string]
[sandbox-error-output 'string])
(make-evaluator 'racket/base)))
@(define-syntax interaction-output
(syntax-rules ()
[(_ #:eval evaluator e)
(begin
(interaction #:eval evaluator e)
(printf "K ~a\n" (get-output evaluator)))]))
@title[#:tag "distributed-places-MPI"]{Distributed Places MPI}
@defmodule[racket/place/distributed/RMPI]
@defproc[(RMPI-id [comm RMPI-COMM?]) exact-nonnegative-integer? ]{
Takes a RMPI communicator structure, @racket[comm], and returns the node id of the RMPI
process.
}
@defproc[(RMPI-cnt [comm RMPI-COMM?]) positive-integer? ]{
Takes a RMPI communicator structure, @racket[comm], and returns the count of the RMPI
processes in the communicator group.
}
@defproc[(RMPI-send [comm RMPI-COMM?] [dest exact-nonnegative-integer?] [val any]) void?]{
Sends @racket[val] to destination RMPI process number @racket[dest] using the RMPI communicator structure @racket[comm].
}
@defproc[(RMPI-recv [comm RMPI-COMM?] [src exact-nonnegative-integer?]) any]{
Receives a message from source RMPI process number @racket[src] using the RMPI communicator structure @racket[comm].
}
@defproc[(RMPI-init [ch place-channel?]) (values RMPI-COMM? (listof any) (is-a?/c named-place-type-channel%))]{
Creates the @racket[RMPI-COMM] structure instance using the named place's original place-channel @racket[ch].
In addition to the communicator structure, @racket[RMPI-init] returns a list of initial arguments and the original place-channel
@racket[ch] wrapped in a @racket[named-place-type-channel%]. The @racket[named-place-type-channel%] wrapper allows for
the reception of list messages typed by an initial symbol.
}
@defproc*[([(RMPI-BCast [comm RMPI-COMM?] [src exact-nonnegative-integer?]) any]
[(RMPI-BCast [comm RMPI-COMM?] [src exact-nonnegative-integer?] [val any]) any])]{
Broadcasts @racket[val] from @racket[src] to all RMPI processes in the communication group using a hypercube algorithm.
Receiving processes call @racket[(RMPI-BCast comm src)].
}
@defproc[(RMPI-Reduce [comm RMPI-COMM?] [dest exact-nonnegative-integer?] [op procedure?] [val any]) any]{
Reduces @racket[val] using the @racket[op] operator to @racket[dest] RMPI node using a hypercube algorithm.
}
@defproc[(RMPI-Barrier [comm RMPI-COMM?]) void?]{
Introduces a synchronization barrier for all RMPI processes in the communcication group @racket[comm].
}
@defproc[(RMPI-AllReduce [comm RMPI-COMM?] [op procedure?] [val any]) any]{
Reduces @racket[val] using the @racket[op] operator to RMPI node @racket[0] and then broadcasts the reduced value to all nodes in the communication group.
}
@defproc[(RMPI-partition [comm RMPI-COMM?] [num positive-integer?]) (values positive-integer? positive-integer?)]{
Partitions @racket[num] into @racket[RMPI-cnt] equal pieces and returns the offset and length for the @racket[RMPI-id]th
piece.
}
@defproc[(RMPI-build-default-config
[#:racket-path racket-path string?]
[#:distributed-launch-path distributed-launch-path string?]
[#:mpi-module mpi-module string?]
[#:mpi-func mpi-func symbol?]
[#:mpi-args mpi-args (listof any)]) hash?]{
Builds a hash from keywords to keyword arguments for use with the @racket[RMPI-launch function].
}
@defproc[(RMPI-launch [default-node-config hash?] [config (listof (listof string? port-no? symbol? exact-nonnegative-integer?))]) void?]{
Launches distributed places nodes running @racket[#:mpi-func] in @racket[#:mpi-module] with @racket[#:mpi-args].
The config is a list of node configs, where each node config consists of a hostname, port, named place symbol and RMPI id number, followed by and optional hash of keyword @racket[#:racket-path], @racket[#:distributed-launch-path], @racket[#:mpi-module], @racket[#:mpi-func], and @racket[#:mpi-args] to keyword arguments. Missing optional keyword arguments will be taken from the @racket[default-node-config] hash of keyword arguments.
}
@defproc[(RMPI-finish [comm RMPI-COMM?] [tc (is-a?/c named-place-type-channel%)]) void?]{
Rendezvous with the @racket[RMPI-launch], using the @racket[tc] returned by @racket[RMPI-launch], to indicate that the RMPI module is done executing and that @racket[RMPI-launch] can return control to its caller.
}
@examples[ #:eval evaler
(RMPI-launch
(RMPI-build-default-config
#:racket-path "/tmp/mplt/bin/racket"
#:distributed-launch-path (build-distributed-launch-path "/tmp/mplt/collects")
#:mpi-module "/tmp/mplt/kmeans.rkt"
#:mpi-func 'kmeans-place
#:mpi-args (list "/tmp/mplt/color100.bin" #t 100 9 10 0.0000001))
(list (list "nodea.example.com" 6340 'kmeans_0 0)
(list "nodeb.example.com" 6340 'kmeans_1 1)
(list "nodec.example.com" 6340 'kmeans_2 2)
(list "noded.example.com" 6340 'kmeans_3 3)))
]
@(close-eval evaler)

View File

@ -909,4 +909,4 @@ except there is no reply message from the server to client
}
@(close-eval evaler)
@include-section["RMPI.scrbl"]
@include-section["rmpi.scrbl"]

View File

@ -0,0 +1,140 @@
#lang scribble/manual
@(require scribble/eval
scribble/struct
scribble/decode
racket/contract
racket/place/distributed
racket/place/distributed/rmpi
racket/sandbox
racket/class)
@(require (for-label racket/base
racket/place/distributed/rmpi))
@(define evaler (make-base-eval))
@(interaction-eval #:eval evaler (require racket/place/distributed
racket/class
#;racket/place/distributed/rmpi))
@(define output-evaluator
(parameterize ([sandbox-output 'string]
[sandbox-error-output 'string])
(make-evaluator 'racket/base)))
@(define-syntax interaction-output
(syntax-rules ()
[(_ #:eval evaluator e)
(begin
(interaction #:eval evaluator e)
(printf "K ~a\n" (get-output evaluator)))]))
@title[#:tag "distributed-places-MPI"]{Distributed Places MPI}
@defmodule[racket/place/distributed/rmpi]
@defproc[(rmpi-id [comm RMPI-comm?]) exact-nonnegative-integer? ]{
Takes a rmpi communicator structure, @racket[comm], and returns the node id of the RMPI
process.
}
@defproc[(rmpi-cnt [comm RMPI-comm?]) positive-integer? ]{
Takes a rmpi communicator structure, @racket[comm], and returns the count of the RMPI
processes in the communicator group.
}
@defproc[(rmpi-send [comm RMPI-comm?] [dest exact-nonnegative-integer?] [val any]) void?]{
Sends @racket[val] to destination rmpi process number @racket[dest]
using the RMPI communicator structure @racket[comm].
}
@defproc[(rmpi-recv [comm RMPI-comm?] [src exact-nonnegative-integer?]) any]{
Receives a message from source rmpi process number @racket[src]
using the RMPI communicator structure @racket[comm].
}
@defproc[(rmpi-init [ch place-channel?]) (values RMPI-comm? (listof any) (is-a?/c named-place-type-channel%))]{
Creates the @racket[rmpi-comm] structure instance using the named
place's original place-channel @racket[ch]. In addition to the
communicator structure, @racket[rmpi-init] returns a list of initial
arguments and the original place-channel @racket[ch] wrapped in a
@racket[named-place-type-channel%]. The
@racket[named-place-type-channel%] wrapper allows for the reception
of list messages typed by an initial symbol.
}
@defproc*[([(rmpi-broadcast [comm RMPI-comm?] [src exact-nonnegative-integer?]) any]
[(rmpi-broadcast [comm RMPI-comm?] [src exact-nonnegative-integer?] [val any]) any])]{
Broadcasts @racket[val] from @racket[src] to all rmpi processes in
the communication group using a hypercube algorithm. Receiving
processes call @racket[(rmpi-broadcast comm src)].
}
@defproc[(rmpi-reduce [comm RMPI-comm?] [dest exact-nonnegative-integer?] [op procedure?] [val any]) any]{
Reduces @racket[val] using the @racket[op] operator to @racket[dest]
rmpi node using a hypercube algorithm.
}
@defproc[(rmpi-barrier [comm RMPI-comm?]) void?]{
Introduces a synchronization barrier for all rmpi processes in the
communcication group @racket[comm].
}
@defproc[(rmpi-allreduce [comm RMPI-comm?] [op procedure?] [val any]) any]{
Reduces @racket[val] using the @racket[op] operator to rmpi node
@racket[0] and then broadcasts the reduced value to all nodes in the
communication group.
}
@defproc[(rmpi-partition [comm RMPI-comm?] [num positive-integer?]) (values positive-integer? positive-integer?)]{
Partitions @racket[num] into @racket[rmpi-cnt] equal pieces and
returns the offset and length for the @racket[RMPI-id]th piece.
}
@defproc[(rmpi-build-default-config
[#:racket-path racket-path string?]
[#:distributed-launch-path distributed-launch-path string?]
[#:mpi-module mpi-module string?]
[#:mpi-func mpi-func symbol?]
[#:mpi-args mpi-args (listof any)]) hash?]{
Builds a hash from keywords to keyword arguments for use with the
@racket[rmpi-launch function].
}
@defproc[(rmpi-launch [default-node-config hash?]
[config (listof (listof string? port-no? symbol? exact-nonnegative-integer?))]) void?]{
Launches distributed places nodes running @racket[#:mpi-func] in
@racket[#:mpi-module] with @racket[#:mpi-args]. The config is a
list of node configs, where each node config consists of a hostname,
port, named place symbol and rmpi id number, followed by and
optional hash of keyword @racket[#:racket-path],
@racket[#:distributed-launch-path], @racket[#:mpi-module],
@racket[#:mpi-func], and @racket[#:mpi-args] to keyword arguments.
Missing optional keyword arguments will be taken from the
@racket[default-node-config] hash of keyword arguments.
}
@defproc[(rmpi-finish [comm RMPI-comm?] [tc (is-a?/c named-place-type-channel%)]) void?]{
Rendezvous with the @racket[rmpi-launch], using the @racket[tc]
returned by @racket[RMPI-launch], to indicate that the RMPI module
is done executing and that @racket[RMPI-launch] can return control
to its caller.
}
@examples[ #:eval evaler
(rmpi-launch
(rmpi-build-default-config
#:racket-path "/tmp/mplt/bin/racket"
#:distributed-launch-path (build-distributed-launch-path "/tmp/mplt/collects")
#:mpi-module "/tmp/mplt/kmeans.rkt"
#:mpi-func 'kmeans-place
#:mpi-args (list "/tmp/mplt/color100.bin" #t 100 9 10 0.0000001))
(list (list "nodea.example.com" 6340 'kmeans_0 0)
(list "nodeb.example.com" 6340 'kmeans_1 1)
(list "nodec.example.com" 6340 'kmeans_2 2)
(list "noded.example.com" 6340 'kmeans_3 3)))
]
@(close-eval evaler)