Fix rmpi argument names, simple map-reduce impl

This commit is contained in:
Kevin Tew 2012-05-29 15:55:07 -06:00
parent 3afba2d833
commit 1d480a63a2
3 changed files with 195 additions and 11 deletions

View File

@ -406,6 +406,8 @@
(send node log-from-child #:severity severity msg)]
[(dcgm #;51 (== DCGM-NEW-PLACE-CHANNEL) _ _ _)
(send node forward-mesg e pch)]
[(dcgm #;102 (== DCGM-CONTROL-NEW-CONNECTION) dest -1 (list name ch))
(send node forward-mesg e pch)]
[else (put-msg e)])))
nes))
(define/public (get-sc-id) id)
@ -553,12 +555,14 @@
(log-debug (format "RECV DCGM-TYPE-SET-OWNER ~a" src-channel))
(set! owner src-channel)]
[(dcgm #;50 (== DCGM-NEW-NODE-CONNECT) -1 -1 (list node-name node-port))
(add-spawned-node (list node-name node-port) (new remote-node% [host-name node-name] [listen-port node-port]))]
(define node (find-spawned-node (list node-name node-port)))
(unless node
(add-spawned-node (list node-name node-port) (new remote-node% [host-name node-name] [listen-port node-port])))]
[(dcgm #;51 (== DCGM-NEW-PLACE-CHANNEL) src-id (and dest (list host port name)) pch)
;(printf/f "DCGM-NEW-PLACE-CHANNEL ~a ~a\n" src-id dest)
(define node (find-spawned-node (list host port)))
(unless node (raise (format "DCGM-CONTROL-NEW-CONNECTION Node ~a not found in ~a" dest spawned-nodes)))
(unless node (raise (format "1DCGM-CONTROL-NEW-CONNECTION Node ~a not found in ~a" dest spawned-nodes)))
(send node connect-channel src-id name #:one-sided pch)]
[(dcgm #;100 (== DCGM-CONTROL-NEW-NODE) -1 solo (list node-name node-port))
@ -572,11 +576,15 @@
(add-spawned-node (list node-name node-port) node)])]
[(dcgm #;101 (== DCGM-CONTROL-NEW-PLACE) dest -1 place-exec)
(define node (find-spawned-node dest))
(unless node (raise (format "DCGM-CONTROL-NEW-PLACE Node ~a not found in ~a" dest spawned-nodes)))
(unless node (raise (format "2DCGM-CONTROL-NEW-PLACE Node ~a not found in ~a" dest spawned-nodes)))
(send node launch-place place-exec)]
[(dcgm #;102 (== DCGM-CONTROL-NEW-CONNECTION) dest -1 (list name ch))
(define node (find-spawned-node dest))
(unless node (raise (format "DCGM-CONTROL-NEW-CONNECTION Node ~a not found in ~a" dest spawned-nodes)))
(define node
(or (find-spawned-node dest)
(let ()
(define node (new remote-node% [host-name (first dest)] [listen-port (second dest)]))
(add-spawned-node dest node)
node)))
(send node remote-connect name #:one-sided ch)]
[(dcgm mtype srcs dest msg)

View File

@ -0,0 +1,177 @@
#lang racket/base
(require racket/place
racket/match
racket/list
racket/pretty
racket/place/distributed)
(provide make-mr-workers
map-reduce
map-reduce-worker)
(define-syntax define/provide
(syntax-rules ()
[(_ (name x ...) body ...)
(begin (provide name)
(define (name x ...) body ...))]
[(_ name val)
(begin (provide name)
(define name val))]))
(define (map-coalesce-values kvl)
(let loop ([sl kvl]
[curk null]
[curv null]
[nl null])
(match sl
[(cons (cons slhk slhv) slt)
(if (null? curk)
(loop slt slhk (list slhv) nl)
(if (equal? slhk curk)
(loop slt curk (cons slhv curv) nl)
(loop sl null null (cons (cons curk (reverse curv)) nl))))]
[(list)
(reverse
(if (null? curk)
nl
(cons (cons curk (reverse curv)) nl)))])))
(define (coalesce-values kvl kvl2 less-than)
(let loop ([sl (sort (append kvl kvl2) less-than)]
[curk null]
[curv null]
[nl null])
(match sl
[(cons (cons slhk slhv) slt)
(if (null? curk)
(loop slt slhk slhv nl)
(if (equal? slhk curk)
(loop slt curk (append curv slhv) nl)
(loop sl null null (cons (cons curk curv) nl))))]
[(list)
(reverse
(if (null? curk)
nl
(cons (cons curk curv) nl)))])))
(define (->module-path x)
(match x
[(list 'file fn) (list 'file (bytes->string/locale fn))]
[(cons h t) (cons (->module-path h) (->module-path t))]
[(? bytes?) (bytes->path x)]
[else x]))
(define (apply-dynamic-require lst)
(match-define (list mp sym) lst)
(dynamic-require (->module-path mp) sym))
(define (map-reduce-worker ch)
(let loop ([map-val null])
(define msg (place-channel-get ch))
(match msg
[(list (list 'map mapper key-less-than t) rch)
(define nmv1 ((apply-dynamic-require mapper) t))
(define less-than (apply-dynamic-require key-less-than))
(define nmv2 (sort nmv1 less-than))
(define nmv (map-coalesce-values nmv2))
(place-channel-put rch (list 'reduce-ready))
(loop (cons rch nmv))]
[(list (list 'reduce-to reducer sorter addr) rch)
(define reduce-ch (mr-connect-to ch (take addr 2) (third addr)))
(place-channel-put reduce-ch (list 'reduce reducer sorter (cdr map-val)))
(place-channel-put rch (list 'reduce-done))
(loop null)]
[(list (list 'reduce reducer sorter kvs) rch)
(define less-than (apply-dynamic-require sorter))
(define nmvc (coalesce-values (cdr map-val) kvs less-than))
(define nmv ((apply-dynamic-require reducer) nmvc))
(place-channel-put (car map-val) (list 'reduce-ready))
(loop (cons (car map-val) nmv))]
[(list (list 'get-results) rch)
(place-channel-put rch (list 'results (cdr map-val)))
(loop null)]
)))
(define (i->place-name i)
(string->symbol (string-append "mpw" (number->string i))))
(define (make-mr-workers config)
(define nodes (spawn-nodes/join/local config))
(for ([n nodes]
[i (in-naturals)])
(supervise-named-dynamic-place-at n
(i->place-name i)
(->module-path (quote-module-path))
'map-reduce-worker))
nodes)
(define/provide (default-sorter a b)
(cond
[(number? a) (< a b)]
[(string? a) (string<? a b)]
[(pair? a) (default-sorter (car a) (car b))]
[else (raise (format "Cannot sort ~a" a))]))
(define (map-reduce nodes config tasks mapper reducer
#:sorter [sorter (list (quote-module-path) 'default-sorter)]
#:combiner [combiner #f]
#:outputer [outputer #f])
(define-values (mrth ch)
(start-message-router/thread
#:nodes nodes))
(define simple-config
(for/list ([c config]
[i (in-naturals)])
(append c (list (i->place-name i)))))
(define connections
(for/list ([c simple-config])
(match-define (list-rest host port name rest) c)
(define npch (mr-connect-to ch (list host port) name))
(list c npch)))
(define result
(let loop ([ts tasks]
[idle-mappers connections]
[mapping null]
[ready-to-reduce null]
[reducing null])
;(printf "STATE\n")
;(pretty-print (list ts idle-mappers mapping ready-to-reduce reducing))
;(flush-output)
(match (list ts idle-mappers mapping ready-to-reduce reducing)
[(list (cons tsh tst) (cons imh imt) mapping rtr r)
(*channel-put (second imh) (list 'map mapper sorter (list tsh)))
(loop tst imt (cons imh mapping) rtr r)]
[(list ts im m (cons rtr1 (cons rtr2 rtrt)) r)
(*channel-put (second rtr1) (list 'reduce-to reducer sorter (first rtr2)))
;
;(*channel-put rtr2 (list 'reduce-from rtr1))
(loop ts im m rtrt (cons rtr1 (cons rtr2 r)))]
[(list (list) im (list) (list rtr) (list))
(*channel-put (second rtr) (list 'get-results))
(second (*channel-get (second rtr)))]
[else ; wait
(apply sync/enable-break (for/list ([m (append mapping reducing)])
(wrap-evt (second m)
(lambda (e)
(match e
[(list 'reduce-ready)
(loop ts idle-mappers (remove m mapping) (cons m ready-to-reduce) (remove m reducing))]
[(list 'reduce-done)
(loop ts (cons m idle-mappers) mapping ready-to-reduce (remove m reducing))]
[else
(raise (format "Unknown response message ~a" e))])))))])))
(or (and outputer ((apply-dynamic-require outputer) result))
result))

View File

@ -187,8 +187,7 @@
(hash-set (car _rest) (string->keyword "listen-port") port))]))
; (printf/f "~a\n" rest)
(define-values (k v)
(let loop ([keys (map string->keyword
(sort (list "racket-path" "listen-port" "distributed-launch-path")))]
(let loop ([keys (list "racket-path" "listen-port" "distributed-launch-path")]
[k null]
[v null])
(match keys
@ -196,7 +195,7 @@
(cond
[(lookup-config-value rest head) => (lambda (x)
(loop tail
(cons head k)
(cons (string->keyword head) k)
(cons x v)))]
[else
(loop (cdr keys) k v)])]
@ -210,8 +209,8 @@
(match-define (list-rest host port name id rest) c)
(supervise-named-dynamic-place-at n
name
(lookup-config-value rest "rmpi-module")
(lookup-config-value rest "rmpi-func")))
(lookup-config-value rest "mpi-module")
(lookup-config-value rest "mpi-func")))
(define-values (mrth ch)
(start-message-router/thread
@ -226,7 +225,7 @@
(match-define (list-rest host port name id rest) c)
(define npch (mr-connect-to ch (list host port) name))
(*channel-put npch (list 'rmpi-id id simple-config))
(*channel-put npch (list 'args (or (lookup-config-value rest "rmpi-args") null))))
(*channel-put npch (list 'args (or (lookup-config-value rest "mpi-args") null))))
(for/first ([c config])
(match-define (list-rest host port name id rest) c)