diff --git a/collects/racket/place/distributed.rkt b/collects/racket/place/distributed.rkt index 576fe54467..a2d9c7360b 100644 --- a/collects/racket/place/distributed.rkt +++ b/collects/racket/place/distributed.rkt @@ -406,6 +406,8 @@ (send node log-from-child #:severity severity msg)] [(dcgm #;51 (== DCGM-NEW-PLACE-CHANNEL) _ _ _) (send node forward-mesg e pch)] + [(dcgm #;102 (== DCGM-CONTROL-NEW-CONNECTION) dest -1 (list name ch)) + (send node forward-mesg e pch)] [else (put-msg e)]))) nes)) (define/public (get-sc-id) id) @@ -553,12 +555,14 @@ (log-debug (format "RECV DCGM-TYPE-SET-OWNER ~a" src-channel)) (set! owner src-channel)] [(dcgm #;50 (== DCGM-NEW-NODE-CONNECT) -1 -1 (list node-name node-port)) - (add-spawned-node (list node-name node-port) (new remote-node% [host-name node-name] [listen-port node-port]))] + (define node (find-spawned-node (list node-name node-port))) + (unless node + (add-spawned-node (list node-name node-port) (new remote-node% [host-name node-name] [listen-port node-port])))] [(dcgm #;51 (== DCGM-NEW-PLACE-CHANNEL) src-id (and dest (list host port name)) pch) ;(printf/f "DCGM-NEW-PLACE-CHANNEL ~a ~a\n" src-id dest) (define node (find-spawned-node (list host port))) - (unless node (raise (format "DCGM-CONTROL-NEW-CONNECTION Node ~a not found in ~a" dest spawned-nodes))) + (unless node (raise (format "1DCGM-CONTROL-NEW-CONNECTION Node ~a not found in ~a" dest spawned-nodes))) (send node connect-channel src-id name #:one-sided pch)] [(dcgm #;100 (== DCGM-CONTROL-NEW-NODE) -1 solo (list node-name node-port)) @@ -572,11 +576,15 @@ (add-spawned-node (list node-name node-port) node)])] [(dcgm #;101 (== DCGM-CONTROL-NEW-PLACE) dest -1 place-exec) (define node (find-spawned-node dest)) - (unless node (raise (format "DCGM-CONTROL-NEW-PLACE Node ~a not found in ~a" dest spawned-nodes))) + (unless node (raise (format "2DCGM-CONTROL-NEW-PLACE Node ~a not found in ~a" dest spawned-nodes))) (send node launch-place place-exec)] [(dcgm #;102 (== DCGM-CONTROL-NEW-CONNECTION) dest -1 (list name ch)) - (define node (find-spawned-node dest)) - (unless node (raise (format "DCGM-CONTROL-NEW-CONNECTION Node ~a not found in ~a" dest spawned-nodes))) + (define node + (or (find-spawned-node dest) + (let () + (define node (new remote-node% [host-name (first dest)] [listen-port (second dest)])) + (add-spawned-node dest node) + node))) (send node remote-connect name #:one-sided ch)] [(dcgm mtype srcs dest msg) diff --git a/collects/racket/place/distributed/map-reduce.rkt b/collects/racket/place/distributed/map-reduce.rkt new file mode 100644 index 0000000000..51dbdd348f --- /dev/null +++ b/collects/racket/place/distributed/map-reduce.rkt @@ -0,0 +1,177 @@ +#lang racket/base +(require racket/place + racket/match + racket/list + racket/pretty + racket/place/distributed) + + +(provide make-mr-workers + map-reduce + map-reduce-worker) + +(define-syntax define/provide + (syntax-rules () + [(_ (name x ...) body ...) + (begin (provide name) + (define (name x ...) body ...))] + [(_ name val) + (begin (provide name) + (define name val))])) + +(define (map-coalesce-values kvl) + (let loop ([sl kvl] + [curk null] + [curv null] + [nl null]) + (match sl + [(cons (cons slhk slhv) slt) + (if (null? curk) + (loop slt slhk (list slhv) nl) + (if (equal? slhk curk) + (loop slt curk (cons slhv curv) nl) + (loop sl null null (cons (cons curk (reverse curv)) nl))))] + [(list) + (reverse + (if (null? curk) + nl + (cons (cons curk (reverse curv)) nl)))]))) + +(define (coalesce-values kvl kvl2 less-than) + (let loop ([sl (sort (append kvl kvl2) less-than)] + [curk null] + [curv null] + [nl null]) + (match sl + [(cons (cons slhk slhv) slt) + (if (null? curk) + (loop slt slhk slhv nl) + (if (equal? slhk curk) + (loop slt curk (append curv slhv) nl) + (loop sl null null (cons (cons curk curv) nl))))] + [(list) + (reverse + (if (null? curk) + nl + (cons (cons curk curv) nl)))]))) + + +(define (->module-path x) + (match x + [(list 'file fn) (list 'file (bytes->string/locale fn))] + [(cons h t) (cons (->module-path h) (->module-path t))] + [(? bytes?) (bytes->path x)] + [else x])) + +(define (apply-dynamic-require lst) + (match-define (list mp sym) lst) + (dynamic-require (->module-path mp) sym)) + +(define (map-reduce-worker ch) + (let loop ([map-val null]) + (define msg (place-channel-get ch)) + (match msg + [(list (list 'map mapper key-less-than t) rch) + (define nmv1 ((apply-dynamic-require mapper) t)) + (define less-than (apply-dynamic-require key-less-than)) + (define nmv2 (sort nmv1 less-than)) + (define nmv (map-coalesce-values nmv2)) + (place-channel-put rch (list 'reduce-ready)) + (loop (cons rch nmv))] + [(list (list 'reduce-to reducer sorter addr) rch) + (define reduce-ch (mr-connect-to ch (take addr 2) (third addr))) + (place-channel-put reduce-ch (list 'reduce reducer sorter (cdr map-val))) + (place-channel-put rch (list 'reduce-done)) + (loop null)] + [(list (list 'reduce reducer sorter kvs) rch) + (define less-than (apply-dynamic-require sorter)) + (define nmvc (coalesce-values (cdr map-val) kvs less-than)) + (define nmv ((apply-dynamic-require reducer) nmvc)) + (place-channel-put (car map-val) (list 'reduce-ready)) + (loop (cons (car map-val) nmv))] + [(list (list 'get-results) rch) + (place-channel-put rch (list 'results (cdr map-val))) + (loop null)] + ))) + +(define (i->place-name i) + (string->symbol (string-append "mpw" (number->string i)))) + + +(define (make-mr-workers config) + (define nodes (spawn-nodes/join/local config)) + (for ([n nodes] + [i (in-naturals)]) + (supervise-named-dynamic-place-at n + (i->place-name i) + (->module-path (quote-module-path)) + 'map-reduce-worker)) + + nodes) + +(define/provide (default-sorter a b) + (cond + [(number? a) (< a b)] + [(string? a) (stringplace-name i))))) + + (define connections + (for/list ([c simple-config]) + (match-define (list-rest host port name rest) c) + (define npch (mr-connect-to ch (list host port) name)) + (list c npch))) + + (define result + (let loop ([ts tasks] + [idle-mappers connections] + [mapping null] + [ready-to-reduce null] + [reducing null]) + ;(printf "STATE\n") + ;(pretty-print (list ts idle-mappers mapping ready-to-reduce reducing)) + ;(flush-output) + (match (list ts idle-mappers mapping ready-to-reduce reducing) + [(list (cons tsh tst) (cons imh imt) mapping rtr r) + (*channel-put (second imh) (list 'map mapper sorter (list tsh))) + (loop tst imt (cons imh mapping) rtr r)] + [(list ts im m (cons rtr1 (cons rtr2 rtrt)) r) + (*channel-put (second rtr1) (list 'reduce-to reducer sorter (first rtr2))) + ; + ;(*channel-put rtr2 (list 'reduce-from rtr1)) + (loop ts im m rtrt (cons rtr1 (cons rtr2 r)))] + [(list (list) im (list) (list rtr) (list)) + (*channel-put (second rtr) (list 'get-results)) + (second (*channel-get (second rtr)))] + [else ; wait + (apply sync/enable-break (for/list ([m (append mapping reducing)]) + (wrap-evt (second m) + (lambda (e) + (match e + [(list 'reduce-ready) + (loop ts idle-mappers (remove m mapping) (cons m ready-to-reduce) (remove m reducing))] + [(list 'reduce-done) + (loop ts (cons m idle-mappers) mapping ready-to-reduce (remove m reducing))] + [else + (raise (format "Unknown response message ~a" e))])))))]))) + + (or (and outputer ((apply-dynamic-require outputer) result)) + result)) + + + + + diff --git a/collects/racket/place/distributed/rmpi.rkt b/collects/racket/place/distributed/rmpi.rkt index ac8c0173b6..a7bf1557fc 100644 --- a/collects/racket/place/distributed/rmpi.rkt +++ b/collects/racket/place/distributed/rmpi.rkt @@ -187,8 +187,7 @@ (hash-set (car _rest) (string->keyword "listen-port") port))])) ; (printf/f "~a\n" rest) (define-values (k v) - (let loop ([keys (map string->keyword - (sort (list "racket-path" "listen-port" "distributed-launch-path")))] + (let loop ([keys (list "racket-path" "listen-port" "distributed-launch-path")] [k null] [v null]) (match keys @@ -196,7 +195,7 @@ (cond [(lookup-config-value rest head) => (lambda (x) (loop tail - (cons head k) + (cons (string->keyword head) k) (cons x v)))] [else (loop (cdr keys) k v)])] @@ -210,8 +209,8 @@ (match-define (list-rest host port name id rest) c) (supervise-named-dynamic-place-at n name - (lookup-config-value rest "rmpi-module") - (lookup-config-value rest "rmpi-func"))) + (lookup-config-value rest "mpi-module") + (lookup-config-value rest "mpi-func"))) (define-values (mrth ch) (start-message-router/thread @@ -226,7 +225,7 @@ (match-define (list-rest host port name id rest) c) (define npch (mr-connect-to ch (list host port) name)) (*channel-put npch (list 'rmpi-id id simple-config)) - (*channel-put npch (list 'args (or (lookup-config-value rest "rmpi-args") null)))) + (*channel-put npch (list 'args (or (lookup-config-value rest "mpi-args") null)))) (for/first ([c config]) (match-define (list-rest host port name id rest) c)