diff --git a/collects/racket/place.rkt b/collects/racket/place.rkt index 6a4e539933..6714e55ef1 100644 --- a/collects/racket/place.rkt +++ b/collects/racket/place.rkt @@ -9,6 +9,8 @@ racket/vector racket/place/private/th-place mzlib/private/streams + unstable/lazy-require + (for-syntax racket/base racket/syntax)) @@ -33,6 +35,7 @@ place-dead-evt ) +(lazy-require [racket/place/distributed (supervise-dynamic-place-at)]) (define (place-channel-put/get ch msg) (place-channel-put ch msg) @@ -59,9 +62,13 @@ (pl-place-pumper-threads p (vector t-in t-out t-err))] [else (void)])) -(define (dynamic-place module-path function) - (start-place 'dynamic-place module-path function - #f (current-output-port) (current-error-port))) +(define (dynamic-place module-path function #:at [node #f]) + (cond + [node + (supervise-dynamic-place-at node module-path function)] + [else + (start-place 'dynamic-place module-path function + #f (current-output-port) (current-error-port))])) (define (dynamic-place* module-path function diff --git a/collects/racket/place/distributed.rkt b/collects/racket/place/distributed.rkt index a2d9c7360b..1559754601 100644 --- a/collects/racket/place/distributed.rkt +++ b/collects/racket/place/distributed.rkt @@ -52,6 +52,10 @@ node-send-exit node-get-first-place + ;; without message router api + create-place-node + distributed-place-wait + ;; low-level API write-flush printf/f @@ -121,6 +125,11 @@ port-no? ) +(define in-message-router-mark (cons #f #f)) +(define (call-in-message-router thunk) + (with-continuation-mark in-message-router-mark #f + (call-with-continuation-prompt thunk))) + (define-runtime-path distributed-launch-path "distributed/launch.rkt") (define (build-distributed-launch-path [collects-path (simplify-path (find-executable-path (find-system-path 'exec-file) @@ -340,13 +349,13 @@ object% (event-container<%>) (init-field cmdline-list) (init-field [parent #f]) - (field [s #f] - [i #f] - [o #f] - [e #f] - [pid #f]) + (init-field [s #f] + [i #f] + [o #f] + [e #f]) + (field [pid #f]) - (let-values ([(_s _o _i _e) (apply subprocess #f #f #f cmdline-list)]) + (let-values ([(_s _o _i _e) (apply subprocess o i e cmdline-list)]) (set! pid (subprocess-pid _s)) (set! s _s) (set! o (box _o)) @@ -395,6 +404,11 @@ sch id node) + (field [msg-queue null]) + + (define (queue-had-died-message?) + (for/or ([msg msg-queue]) + (= (dcgm-type msg) DCGM-DPLACE-DIED))) (define/public (register nes) (cons @@ -414,9 +428,20 @@ (define/public (get-raw-msg) (let loop () (define msg (send sch read-message)) - (if (= (dcgm-type msg) DCGM-DPLACE-DIED) - (loop) - (dcgm-msg msg)))) + (cond + [(= (dcgm-type msg) DCGM-DPLACE-DIED) + (set! msg-queue (append msg-queue (list msg))) + (loop)] + [else + (dcgm-msg msg)]))) + (define/public (wait-to-die) + (or + (queue-had-died-message?) + (let loop () + (define msg (send sch read-message)) + (unless (= (dcgm-type msg) DCGM-DPLACE-DIED) + (loop)))) + (void)) (define/public (put-msg msg) ;(printf/f "PSB3 ~a ~a ~a\n" sch id msg) (sconn-write-flush sch (dcgm DCGM-TYPE-INTER-DCHANNEL id id msg))) @@ -786,7 +811,9 @@ (define/public (read-message) (when (equal? out #f) (ensure-connected)) - (read in)) + (define m (read in)) + ;(printf/f "MESSAGE ~a\n" m) + m) (define/public (register nes) (raise "Not-implemented/needed") (cons (wrap-evt in void) nes)) @@ -817,6 +844,7 @@ (init-field [cmdline-list #f]) (init-field [sc #f]) ;socket-connection (init-field [restart-on-exit #f]) + (init-field [use-current-ports #f]) (field [sp #f]) ;spawned-process (field [id 0]) (field [remote-places null]) @@ -831,7 +859,12 @@ (set! remote-places (append remote-places(list rp)))) (define (spawn-node) (and cmdline-list - (set! sp (new spawned-process% [cmdline-list cmdline-list] [parent this])))) + (set! sp + (if use-current-ports + (new spawned-process% [cmdline-list cmdline-list] [parent this] + [o (current-output-port)] + [e (current-error-port)]) + (new spawned-process% [cmdline-list cmdline-list] [parent this]))))) (define (setup-socket-connection) (set! sc (new socket-connection% [host host-name] [port listen-port] [remote-node this])) (sconn-write-flush sc (dcgm DCGM-TYPE-SET-OWNER -1 -1 ""))) @@ -976,6 +1009,7 @@ (define (node-send-exit node) (send node send-exit)) (define (node-get-first-place node) (send node get-first-place)) +(define (distributed-place-wait p) (send p place-wait)) (define remote-connection% (backlink @@ -1045,7 +1079,7 @@ (cond [k (lambda (e) - (call-with-continuation-prompt (lambda () + (call-in-message-router(lambda () (begin0 (k e) (set! k #f)))))] @@ -1071,6 +1105,8 @@ (set! k _k) (abort-current-continuation (default-continuation-prompt-tag) void)))) (define/public (put-msg msg) (send psb put-msg msg)) + (define/public (place-wait) + (send psb wait-to-die)) (super-new) ))) @@ -1088,7 +1124,7 @@ (field [running #f]) (define (default-on-place-dead e) (set! pd #f) - (set! psb #f) + ;(set! psb #f) (sconn-write-flush sc (dcgm DCGM-DPLACE-DIED -1 -1 ch-id)) (sconn-remove-subchannel sc ch-id)) @@ -1195,7 +1231,7 @@ (wrap-evt (alarm-evt fire-time) (lambda (x) (set! fire-time (+ (current-inexact-milliseconds) (* seconds 1000))) - (thunk))) + (call-in-message-router thunk))) es) es)) @@ -1216,7 +1252,7 @@ (wrap-evt (alarm-evt fire-time) (lambda (x) (set! fire-time #f) - (call-with-continuation-prompt thunk))) + (call-in-message-router thunk))) es) es)) @@ -1455,11 +1491,25 @@ (define (spawn-remote-racket-node host #:listen-port [listen-port DEFAULT-ROUTER-PORT] #:racket-path [racketpath (racket-path)] #:ssh-bin-path [sshpath (ssh-bin-path)] - #:distributed-launch-path [distributedlaunchpath (->module-path-bytes distributed-launch-path)]) + #:distributed-launch-path [distributedlaunchpath (->module-path-bytes distributed-launch-path)] + #:use-current-ports [use-current-ports #f]) (new remote-node% [host-name host] [listen-port listen-port] - [cmdline-list (list sshpath host racketpath "-tm" distributedlaunchpath "spawn" (->string listen-port))])) + [cmdline-list (list sshpath host racketpath "-tm" distributedlaunchpath "spawn" (->string listen-port))] + [use-current-ports use-current-ports])) + +(define (create-place-node host #:listen-port [listen-port DEFAULT-ROUTER-PORT] + #:racket-path [racketpath (racket-path)] + #:ssh-bin-path [sshpath (ssh-bin-path)] + #:distributed-launch-path [distributedlaunchpath (->module-path-bytes distributed-launch-path)] + #:use-current-ports [use-current-ports #t]) + (spawn-remote-racket-node host + #:listen-port listen-port + #:racket-path racketpath + #:ssh-bin-path sshpath + #:distributed-launch-path distributedlaunchpath + #:use-current-ports use-current-ports)) (define (supervise-dynamic-place-at remote-node place-path place-func) (send remote-node launch-place (list 'dynamic-place (->module-path-bytes place-path) place-func))) @@ -1592,7 +1642,12 @@ [(place-channel? ch) (place-channel-get ch)] [(async-bi-channel? ch) (async-bi-channel-get ch)] [(channel? ch) (channel-get ch)] - [(is-a? ch remote-connection%) (send ch get-msg)] + [(is-a? ch remote-connection%) + (cond + [(continuation-mark-set-first #f in-message-router-mark) + (send ch get-msg)] + [else + (send ch get-raw-msg)])] [else (raise (format "unknown channel type ~a" ch))])) (define/provide (mr-spawn-remote-node mrch host #:listen-port [listen-port DEFAULT-ROUTER-PORT] @@ -1688,6 +1743,28 @@ object% () (init-field ch) (field [msgs null]) + (define/public (get-evt type) + (cond + [(has-type type) => (lambda (x) + (wrap-evt always-evt (lambda (e) x)))] + [else + (wrap-evt ch (lambda (v) + (set! msgs (append msgs (list v))) + (cond + [(has-type type) => (lambda (x) x)] + [else #f])))])) + + (define (has-type type) + (let loop ([l msgs] + [nl null]) + (cond + [(null? l) #f] + [(equal? type (caaar l)) + (set! msgs (append (reverse nl) (cdr l))) + (car l)] + [else + (loop (cdr l) (cons (car l) nl))]))) + (define/public (get type) (let loop ([l msgs] [nl null]) diff --git a/collects/racket/place/distributed/launch.rkt b/collects/racket/place/distributed/launch.rkt index b29672ccda..5a4ef41acd 100644 --- a/collects/racket/place/distributed/launch.rkt +++ b/collects/racket/place/distributed/launch.rkt @@ -10,7 +10,7 @@ (match args [(list "spawn" node-port) (define listener (tcp-listen (->number node-port) 4 #t)) - (write-flush (list (->number node-port))) + (printf/f "~a\n" (list (->number node-port))) (start-spawned-node-router listener)] ;; Used to launch Design Pattern 1, MPI style distributed system. diff --git a/collects/racket/place/distributed/map-reduce.rkt b/collects/racket/place/distributed/map-reduce.rkt index 51dbdd348f..163bbac759 100644 --- a/collects/racket/place/distributed/map-reduce.rkt +++ b/collects/racket/place/distributed/map-reduce.rkt @@ -6,18 +6,10 @@ racket/place/distributed) -(provide make-mr-workers +(provide make-map-reduce-workers map-reduce - map-reduce-worker) - -(define-syntax define/provide - (syntax-rules () - [(_ (name x ...) body ...) - (begin (provide name) - (define (name x ...) body ...))] - [(_ name val) - (begin (provide name) - (define name val))])) + map-reduce-worker + default-sorter) (define (map-coalesce-values kvl) (let loop ([sl kvl] @@ -32,7 +24,7 @@ (loop slt curk (cons slhv curv) nl) (loop sl null null (cons (cons curk (reverse curv)) nl))))] [(list) - (reverse + (reverse (if (null? curk) nl (cons (cons curk (reverse curv)) nl)))]))) @@ -68,11 +60,11 @@ (dynamic-require (->module-path mp) sym)) (define (map-reduce-worker ch) - (let loop ([map-val null]) + (let loop ([map-vals null]) (define msg (place-channel-get ch)) (match msg - [(list (list 'map mapper key-less-than t) rch) - (define nmv1 ((apply-dynamic-require mapper) t)) + [(list (list 'map mapper key-less-than task) rch) + (define nmv1 ((apply-dynamic-require mapper) task)) (define less-than (apply-dynamic-require key-less-than)) (define nmv2 (sort nmv1 less-than)) (define nmv (map-coalesce-values nmv2)) @@ -80,17 +72,17 @@ (loop (cons rch nmv))] [(list (list 'reduce-to reducer sorter addr) rch) (define reduce-ch (mr-connect-to ch (take addr 2) (third addr))) - (place-channel-put reduce-ch (list 'reduce reducer sorter (cdr map-val))) + (place-channel-put reduce-ch (list 'reduce reducer sorter (cdr map-vals))) (place-channel-put rch (list 'reduce-done)) (loop null)] [(list (list 'reduce reducer sorter kvs) rch) (define less-than (apply-dynamic-require sorter)) - (define nmvc (coalesce-values (cdr map-val) kvs less-than)) + (define nmvc (coalesce-values (cdr map-vals) kvs less-than)) (define nmv ((apply-dynamic-require reducer) nmvc)) - (place-channel-put (car map-val) (list 'reduce-ready)) - (loop (cons (car map-val) nmv))] + (place-channel-put (car map-vals) (list 'reduce-ready)) + (loop (cons (car map-vals) nmv))] [(list (list 'get-results) rch) - (place-channel-put rch (list 'results (cdr map-val))) + (place-channel-put rch (list 'results (cdr map-vals))) (loop null)] ))) @@ -98,7 +90,7 @@ (string->symbol (string-append "mpw" (number->string i)))) -(define (make-mr-workers config) +(define (make-map-reduce-workers config) (define nodes (spawn-nodes/join/local config)) (for ([n nodes] [i (in-naturals)]) @@ -109,7 +101,7 @@ nodes) -(define/provide (default-sorter a b) +(define (default-sorter a b) (cond [(number? a) (< a b)] [(string? a) (string id i) - (match-define (list (list 'new-place-channel src-id) src-ch) (tc-get 'new-place-channel tc)) - ;(printf/f "received connect from id ~a ~a" src-id src-ch) - (vector-set! mpi-comm-vector src-id src-ch)] - [else null])) + (when (> id i) + (match-define (list (list 'new-place-channel src-id) src-ch) (tc-get 'new-place-channel tc)) + (vector-set! mpi-comm-vector src-id src-ch))) + (values (rmpi-comm id (length config) mpi-comm-vector) args - tc - )) + tc)) (define rmpi-broadcast @@ -161,7 +159,7 @@ ; (displayln (keyword? kw)) (values kw kwa))))) -(define (rmpi-launch default config) +(define (rmpi-launch default config #:no-wait [no-wait #f]) (define (lookup-config-value rest key-str) (define key (string->keyword key-str)) @@ -227,12 +225,19 @@ (*channel-put npch (list 'rmpi-id id simple-config)) (*channel-put npch (list 'args (or (lookup-config-value rest "mpi-args") null)))) - (for/first ([c config]) - (match-define (list-rest host port name id rest) c) - (define npch (mr-connect-to ch (list host port) name)) - (*channel-put npch (list 'done?)) - ;Wait for 'done message from mpi node id 0 - (*channel-get npch))) + (cond + [no-wait + (for/first ([c config]) + (match-define (list-rest host port name id rest) c) + (define npch (mr-connect-to ch (list host port) name)) + (list npch))] + [else + (for/first ([c config]) + (match-define (list-rest host port name id rest) c) + (define npch (mr-connect-to ch (list host port) name)) + (*channel-put npch (list 'done?)) + ;Wait for 'done message from mpi node id 0 + (*channel-get npch))])) (define (rmpi-finish comm tc) diff --git a/collects/racket/place/private/coercion.rkt b/collects/racket/place/private/coercion.rkt index 93b66b1367..eb759b1c59 100644 --- a/collects/racket/place/private/coercion.rkt +++ b/collects/racket/place/private/coercion.rkt @@ -18,6 +18,11 @@ [(bytes? x) (bytes->path x)] [else x])) +(define (path-bytes->string x) + (cond + [(bytes? x) (bytes->string/locale x)] + [else x])) + (define (->path x) (cond [(path? x) x] [(string? x) (string->path x)] @@ -31,7 +36,10 @@ (define (->module-path x) (cond [(path? x) x] - [(list? x) (map path-bytes->path x)] + [(and (list? x) (pair? x)) + (cond + [(equal? (car x) 'file) (map path-bytes->string x)] + [else (map path-bytes->path x)])] [(bytes? x) (bytes->path x)] [(string? x) (string->path x)])) diff --git a/collects/scribblings/reference/distributed.scrbl b/collects/scribblings/reference/distributed.scrbl index e3e9157137..77005725e5 100644 --- a/collects/scribblings/reference/distributed.scrbl +++ b/collects/scribblings/reference/distributed.scrbl @@ -201,10 +201,22 @@ values (like @racket[spawn-node-supervise-dynamic-place-at]): the new [#:listen-port port port-no? DEFAULT-ROUTER-PORT] [#:racket-path racket-path string-path? (racket-path)] [#:ssh-bin-path ssh-path string-path? (ssh-bin-path)] - [#:distributed-launch-path launcher-path string-path? (path->string distributed-launch-path)]) (is-a?/c remote-node%)]{ + [#:distributed-launch-path launcher-path string-path? (path->string distributed-launch-path)] + [#:use-current-ports use-current-ports #f]) (is-a?/c remote-node%)]{ Spawns a new remote node at @racket[hostname] and returns a @racket[remote-node%] handle.} +@defproc[(create-place-node + [hostname string?] + [#:listen-port port port-no? DEFAULT-ROUTER-PORT] + [#:racket-path racket-path string-path? (racket-path)] + [#:ssh-bin-path ssh-path string-path? (ssh-bin-path)] + [#:distributed-launch-path launcher-path string-path? (path->string distributed-launch-path)] + [#:use-current-ports use-current-ports #t]) (is-a?/c remote-node%)]{ + +Like @racket[spawn-remote-racket-node], but the @racket[current-output-port] and @racket[current-error-port] +are used as the standard ports for the spawned process instead of new pipe ports.} + @defproc[(supervise-dynamic-place-at [remote-node (is-a?/c remote-node%)] [instance-module-path module-path?] @@ -493,6 +505,8 @@ Sends @racket[node] a message telling it to exit immediately. Returns the @racket[remote-connection%] instance of the first place spawned at this node } +@defproc[(distributed-place-wait [remote-connection% place]) void?]{ +Waits for @racket[place] to terminate.} @defclass[remote-connection% object% (event-container<%>)]{