Fix *channel-get's use of continuations

Cleanup map-reduce.rkt
Cleanup rmpi.rkt
Added create-place-node and distributed-place-wait
This commit is contained in:
Kevin Tew 2012-06-20 16:35:01 -06:00
parent 1d480a63a2
commit 89c7ba6493
7 changed files with 166 additions and 65 deletions

View File

@ -9,6 +9,8 @@
racket/vector
racket/place/private/th-place
mzlib/private/streams
unstable/lazy-require
(for-syntax racket/base
racket/syntax))
@ -33,6 +35,7 @@
place-dead-evt
)
(lazy-require [racket/place/distributed (supervise-dynamic-place-at)])
(define (place-channel-put/get ch msg)
(place-channel-put ch msg)
@ -59,9 +62,13 @@
(pl-place-pumper-threads p (vector t-in t-out t-err))]
[else (void)]))
(define (dynamic-place module-path function)
(start-place 'dynamic-place module-path function
#f (current-output-port) (current-error-port)))
(define (dynamic-place module-path function #:at [node #f])
(cond
[node
(supervise-dynamic-place-at node module-path function)]
[else
(start-place 'dynamic-place module-path function
#f (current-output-port) (current-error-port))]))
(define (dynamic-place* module-path
function

View File

@ -52,6 +52,10 @@
node-send-exit
node-get-first-place
;; without message router api
create-place-node
distributed-place-wait
;; low-level API
write-flush
printf/f
@ -121,6 +125,11 @@
port-no?
)
(define in-message-router-mark (cons #f #f))
(define (call-in-message-router thunk)
(with-continuation-mark in-message-router-mark #f
(call-with-continuation-prompt thunk)))
(define-runtime-path distributed-launch-path "distributed/launch.rkt")
(define (build-distributed-launch-path [collects-path
(simplify-path (find-executable-path (find-system-path 'exec-file)
@ -340,13 +349,13 @@
object% (event-container<%>)
(init-field cmdline-list)
(init-field [parent #f])
(field [s #f]
[i #f]
[o #f]
[e #f]
[pid #f])
(init-field [s #f]
[i #f]
[o #f]
[e #f])
(field [pid #f])
(let-values ([(_s _o _i _e) (apply subprocess #f #f #f cmdline-list)])
(let-values ([(_s _o _i _e) (apply subprocess o i e cmdline-list)])
(set! pid (subprocess-pid _s))
(set! s _s)
(set! o (box _o))
@ -395,6 +404,11 @@
sch
id
node)
(field [msg-queue null])
(define (queue-had-died-message?)
(for/or ([msg msg-queue])
(= (dcgm-type msg) DCGM-DPLACE-DIED)))
(define/public (register nes)
(cons
@ -414,9 +428,20 @@
(define/public (get-raw-msg)
(let loop ()
(define msg (send sch read-message))
(if (= (dcgm-type msg) DCGM-DPLACE-DIED)
(loop)
(dcgm-msg msg))))
(cond
[(= (dcgm-type msg) DCGM-DPLACE-DIED)
(set! msg-queue (append msg-queue (list msg)))
(loop)]
[else
(dcgm-msg msg)])))
(define/public (wait-to-die)
(or
(queue-had-died-message?)
(let loop ()
(define msg (send sch read-message))
(unless (= (dcgm-type msg) DCGM-DPLACE-DIED)
(loop))))
(void))
(define/public (put-msg msg)
;(printf/f "PSB3 ~a ~a ~a\n" sch id msg)
(sconn-write-flush sch (dcgm DCGM-TYPE-INTER-DCHANNEL id id msg)))
@ -786,7 +811,9 @@
(define/public (read-message)
(when (equal? out #f) (ensure-connected))
(read in))
(define m (read in))
;(printf/f "MESSAGE ~a\n" m)
m)
(define/public (register nes)
(raise "Not-implemented/needed")
(cons (wrap-evt in void) nes))
@ -817,6 +844,7 @@
(init-field [cmdline-list #f])
(init-field [sc #f]) ;socket-connection
(init-field [restart-on-exit #f])
(init-field [use-current-ports #f])
(field [sp #f]) ;spawned-process
(field [id 0])
(field [remote-places null])
@ -831,7 +859,12 @@
(set! remote-places (append remote-places(list rp))))
(define (spawn-node)
(and cmdline-list
(set! sp (new spawned-process% [cmdline-list cmdline-list] [parent this]))))
(set! sp
(if use-current-ports
(new spawned-process% [cmdline-list cmdline-list] [parent this]
[o (current-output-port)]
[e (current-error-port)])
(new spawned-process% [cmdline-list cmdline-list] [parent this])))))
(define (setup-socket-connection)
(set! sc (new socket-connection% [host host-name] [port listen-port] [remote-node this]))
(sconn-write-flush sc (dcgm DCGM-TYPE-SET-OWNER -1 -1 "")))
@ -976,6 +1009,7 @@
(define (node-send-exit node) (send node send-exit))
(define (node-get-first-place node) (send node get-first-place))
(define (distributed-place-wait p) (send p place-wait))
(define remote-connection%
(backlink
@ -1045,7 +1079,7 @@
(cond
[k
(lambda (e)
(call-with-continuation-prompt (lambda ()
(call-in-message-router(lambda ()
(begin0
(k e)
(set! k #f)))))]
@ -1071,6 +1105,8 @@
(set! k _k)
(abort-current-continuation (default-continuation-prompt-tag) void))))
(define/public (put-msg msg) (send psb put-msg msg))
(define/public (place-wait)
(send psb wait-to-die))
(super-new)
)))
@ -1088,7 +1124,7 @@
(field [running #f])
(define (default-on-place-dead e)
(set! pd #f)
(set! psb #f)
;(set! psb #f)
(sconn-write-flush sc (dcgm DCGM-DPLACE-DIED -1 -1 ch-id))
(sconn-remove-subchannel sc ch-id))
@ -1195,7 +1231,7 @@
(wrap-evt (alarm-evt fire-time)
(lambda (x)
(set! fire-time (+ (current-inexact-milliseconds) (* seconds 1000)))
(thunk)))
(call-in-message-router thunk)))
es)
es))
@ -1216,7 +1252,7 @@
(wrap-evt (alarm-evt fire-time)
(lambda (x)
(set! fire-time #f)
(call-with-continuation-prompt thunk)))
(call-in-message-router thunk)))
es)
es))
@ -1455,11 +1491,25 @@
(define (spawn-remote-racket-node host #:listen-port [listen-port DEFAULT-ROUTER-PORT]
#:racket-path [racketpath (racket-path)]
#:ssh-bin-path [sshpath (ssh-bin-path)]
#:distributed-launch-path [distributedlaunchpath (->module-path-bytes distributed-launch-path)])
#:distributed-launch-path [distributedlaunchpath (->module-path-bytes distributed-launch-path)]
#:use-current-ports [use-current-ports #f])
(new remote-node%
[host-name host]
[listen-port listen-port]
[cmdline-list (list sshpath host racketpath "-tm" distributedlaunchpath "spawn" (->string listen-port))]))
[cmdline-list (list sshpath host racketpath "-tm" distributedlaunchpath "spawn" (->string listen-port))]
[use-current-ports use-current-ports]))
(define (create-place-node host #:listen-port [listen-port DEFAULT-ROUTER-PORT]
#:racket-path [racketpath (racket-path)]
#:ssh-bin-path [sshpath (ssh-bin-path)]
#:distributed-launch-path [distributedlaunchpath (->module-path-bytes distributed-launch-path)]
#:use-current-ports [use-current-ports #t])
(spawn-remote-racket-node host
#:listen-port listen-port
#:racket-path racketpath
#:ssh-bin-path sshpath
#:distributed-launch-path distributedlaunchpath
#:use-current-ports use-current-ports))
(define (supervise-dynamic-place-at remote-node place-path place-func)
(send remote-node launch-place (list 'dynamic-place (->module-path-bytes place-path) place-func)))
@ -1592,7 +1642,12 @@
[(place-channel? ch) (place-channel-get ch)]
[(async-bi-channel? ch) (async-bi-channel-get ch)]
[(channel? ch) (channel-get ch)]
[(is-a? ch remote-connection%) (send ch get-msg)]
[(is-a? ch remote-connection%)
(cond
[(continuation-mark-set-first #f in-message-router-mark)
(send ch get-msg)]
[else
(send ch get-raw-msg)])]
[else (raise (format "unknown channel type ~a" ch))]))
(define/provide (mr-spawn-remote-node mrch host #:listen-port [listen-port DEFAULT-ROUTER-PORT]
@ -1688,6 +1743,28 @@
object% ()
(init-field ch)
(field [msgs null])
(define/public (get-evt type)
(cond
[(has-type type) => (lambda (x)
(wrap-evt always-evt (lambda (e) x)))]
[else
(wrap-evt ch (lambda (v)
(set! msgs (append msgs (list v)))
(cond
[(has-type type) => (lambda (x) x)]
[else #f])))]))
(define (has-type type)
(let loop ([l msgs]
[nl null])
(cond
[(null? l) #f]
[(equal? type (caaar l))
(set! msgs (append (reverse nl) (cdr l)))
(car l)]
[else
(loop (cdr l) (cons (car l) nl))])))
(define/public (get type)
(let loop ([l msgs]
[nl null])

View File

@ -10,7 +10,7 @@
(match args
[(list "spawn" node-port)
(define listener (tcp-listen (->number node-port) 4 #t))
(write-flush (list (->number node-port)))
(printf/f "~a\n" (list (->number node-port)))
(start-spawned-node-router listener)]
;; Used to launch Design Pattern 1, MPI style distributed system.

View File

@ -6,18 +6,10 @@
racket/place/distributed)
(provide make-mr-workers
(provide make-map-reduce-workers
map-reduce
map-reduce-worker)
(define-syntax define/provide
(syntax-rules ()
[(_ (name x ...) body ...)
(begin (provide name)
(define (name x ...) body ...))]
[(_ name val)
(begin (provide name)
(define name val))]))
map-reduce-worker
default-sorter)
(define (map-coalesce-values kvl)
(let loop ([sl kvl]
@ -32,7 +24,7 @@
(loop slt curk (cons slhv curv) nl)
(loop sl null null (cons (cons curk (reverse curv)) nl))))]
[(list)
(reverse
(reverse
(if (null? curk)
nl
(cons (cons curk (reverse curv)) nl)))])))
@ -68,11 +60,11 @@
(dynamic-require (->module-path mp) sym))
(define (map-reduce-worker ch)
(let loop ([map-val null])
(let loop ([map-vals null])
(define msg (place-channel-get ch))
(match msg
[(list (list 'map mapper key-less-than t) rch)
(define nmv1 ((apply-dynamic-require mapper) t))
[(list (list 'map mapper key-less-than task) rch)
(define nmv1 ((apply-dynamic-require mapper) task))
(define less-than (apply-dynamic-require key-less-than))
(define nmv2 (sort nmv1 less-than))
(define nmv (map-coalesce-values nmv2))
@ -80,17 +72,17 @@
(loop (cons rch nmv))]
[(list (list 'reduce-to reducer sorter addr) rch)
(define reduce-ch (mr-connect-to ch (take addr 2) (third addr)))
(place-channel-put reduce-ch (list 'reduce reducer sorter (cdr map-val)))
(place-channel-put reduce-ch (list 'reduce reducer sorter (cdr map-vals)))
(place-channel-put rch (list 'reduce-done))
(loop null)]
[(list (list 'reduce reducer sorter kvs) rch)
(define less-than (apply-dynamic-require sorter))
(define nmvc (coalesce-values (cdr map-val) kvs less-than))
(define nmvc (coalesce-values (cdr map-vals) kvs less-than))
(define nmv ((apply-dynamic-require reducer) nmvc))
(place-channel-put (car map-val) (list 'reduce-ready))
(loop (cons (car map-val) nmv))]
(place-channel-put (car map-vals) (list 'reduce-ready))
(loop (cons (car map-vals) nmv))]
[(list (list 'get-results) rch)
(place-channel-put rch (list 'results (cdr map-val)))
(place-channel-put rch (list 'results (cdr map-vals)))
(loop null)]
)))
@ -98,7 +90,7 @@
(string->symbol (string-append "mpw" (number->string i))))
(define (make-mr-workers config)
(define (make-map-reduce-workers config)
(define nodes (spawn-nodes/join/local config))
(for ([n nodes]
[i (in-naturals)])
@ -109,7 +101,7 @@
nodes)
(define/provide (default-sorter a b)
(define (default-sorter a b)
(cond
[(number? a) (< a b)]
[(string? a) (string<? a b)]
@ -135,7 +127,7 @@
(define npch (mr-connect-to ch (list host port) name))
(list c npch)))
(define result
(define result
(let loop ([ts tasks]
[idle-mappers connections]
[mapping null]
@ -150,8 +142,6 @@
(loop tst imt (cons imh mapping) rtr r)]
[(list ts im m (cons rtr1 (cons rtr2 rtrt)) r)
(*channel-put (second rtr1) (list 'reduce-to reducer sorter (first rtr2)))
;
;(*channel-put rtr2 (list 'reduce-from rtr1))
(loop ts im m rtrt (cons rtr1 (cons rtr2 r)))]
[(list (list) im (list) (list rtr) (list))
(*channel-put (second rtr) (list 'get-results))

View File

@ -32,26 +32,24 @@
(define tc (new named-place-typed-channel% [ch ch]))
(match-define (list (list 'rmpi-id id config) return-ch) (tc-get 'rmpi-id tc))
(match-define (list (list 'args args) src-ch) (tc-get 'args tc))
(define mpi-comm-vector
(for/vector #:length (length config) ([c config])
(match-define (list dest dest-port dest-name dest-id) c)
(cond
[(< id dest-id)
;(printf/f "sending connect to dest-id ~a from id ~a over ~a" dest-id id ch)
(send-new-place-channel-to-named-dest ch id (list dest dest-port dest-name))]
[else null])))
(for ([i (length config)])
(cond
[(> id i)
(match-define (list (list 'new-place-channel src-id) src-ch) (tc-get 'new-place-channel tc))
;(printf/f "received connect from id ~a ~a" src-id src-ch)
(vector-set! mpi-comm-vector src-id src-ch)]
[else null]))
(when (> id i)
(match-define (list (list 'new-place-channel src-id) src-ch) (tc-get 'new-place-channel tc))
(vector-set! mpi-comm-vector src-id src-ch)))
(values
(rmpi-comm id (length config) mpi-comm-vector)
args
tc
))
tc))
(define rmpi-broadcast
@ -161,7 +159,7 @@
; (displayln (keyword? kw))
(values kw kwa)))))
(define (rmpi-launch default config)
(define (rmpi-launch default config #:no-wait [no-wait #f])
(define (lookup-config-value rest key-str)
(define key
(string->keyword key-str))
@ -227,12 +225,19 @@
(*channel-put npch (list 'rmpi-id id simple-config))
(*channel-put npch (list 'args (or (lookup-config-value rest "mpi-args") null))))
(for/first ([c config])
(match-define (list-rest host port name id rest) c)
(define npch (mr-connect-to ch (list host port) name))
(*channel-put npch (list 'done?))
;Wait for 'done message from mpi node id 0
(*channel-get npch)))
(cond
[no-wait
(for/first ([c config])
(match-define (list-rest host port name id rest) c)
(define npch (mr-connect-to ch (list host port) name))
(list npch))]
[else
(for/first ([c config])
(match-define (list-rest host port name id rest) c)
(define npch (mr-connect-to ch (list host port) name))
(*channel-put npch (list 'done?))
;Wait for 'done message from mpi node id 0
(*channel-get npch))]))
(define (rmpi-finish comm tc)

View File

@ -18,6 +18,11 @@
[(bytes? x) (bytes->path x)]
[else x]))
(define (path-bytes->string x)
(cond
[(bytes? x) (bytes->string/locale x)]
[else x]))
(define (->path x)
(cond [(path? x) x]
[(string? x) (string->path x)]
@ -31,7 +36,10 @@
(define (->module-path x)
(cond [(path? x) x]
[(list? x) (map path-bytes->path x)]
[(and (list? x) (pair? x))
(cond
[(equal? (car x) 'file) (map path-bytes->string x)]
[else (map path-bytes->path x)])]
[(bytes? x) (bytes->path x)]
[(string? x) (string->path x)]))

View File

@ -201,10 +201,22 @@ values (like @racket[spawn-node-supervise-dynamic-place-at]): the new
[#:listen-port port port-no? DEFAULT-ROUTER-PORT]
[#:racket-path racket-path string-path? (racket-path)]
[#:ssh-bin-path ssh-path string-path? (ssh-bin-path)]
[#:distributed-launch-path launcher-path string-path? (path->string distributed-launch-path)]) (is-a?/c remote-node%)]{
[#:distributed-launch-path launcher-path string-path? (path->string distributed-launch-path)]
[#:use-current-ports use-current-ports #f]) (is-a?/c remote-node%)]{
Spawns a new remote node at @racket[hostname] and returns a @racket[remote-node%] handle.}
@defproc[(create-place-node
[hostname string?]
[#:listen-port port port-no? DEFAULT-ROUTER-PORT]
[#:racket-path racket-path string-path? (racket-path)]
[#:ssh-bin-path ssh-path string-path? (ssh-bin-path)]
[#:distributed-launch-path launcher-path string-path? (path->string distributed-launch-path)]
[#:use-current-ports use-current-ports #t]) (is-a?/c remote-node%)]{
Like @racket[spawn-remote-racket-node], but the @racket[current-output-port] and @racket[current-error-port]
are used as the standard ports for the spawned process instead of new pipe ports.}
@defproc[(supervise-dynamic-place-at
[remote-node (is-a?/c remote-node%)]
[instance-module-path module-path?]
@ -493,6 +505,8 @@ Sends @racket[node] a message telling it to exit immediately.
Returns the @racket[remote-connection%] instance of the first place spawned at this node
}
@defproc[(distributed-place-wait [remote-connection% place]) void?]{
Waits for @racket[place] to terminate.}
@defclass[remote-connection% object% (event-container<%>)]{