Initial Distributed Places commit
Distributed Places allows the spawning of remote racket processes and places. Distributed Places communicate transparently over TCP sockets.
This commit is contained in:
parent
922857489a
commit
09bed0d61e
|
@ -1177,6 +1177,7 @@ path/s is either such a string or a list of them.
|
|||
"collects/racket/gui.rkt" drdr:command-line (gracket "-t" *)
|
||||
"collects/racket/match" responsible (samth)
|
||||
"collects/racket/match.rkt" responsible (samth)
|
||||
"collects/racket/place" responsible (tewk)
|
||||
"collects/racklog" responsible (jay)
|
||||
"collects/rackunit" responsible (jay noel ryanc)
|
||||
"collects/rackunit/gui.rkt" responsible (ryanc) drdr:command-line (gracket-text "-t" *)
|
||||
|
@ -1279,6 +1280,7 @@ path/s is either such a string or a list of them.
|
|||
"collects/scribblings/guide/contracts-examples/6.rkt" drdr:command-line (racket "-f" *)
|
||||
"collects/scribblings/guide/read.scrbl" drdr:command-line #f
|
||||
"collects/scribblings/raco" responsible (jay mflatt)
|
||||
"collects/scribblings/reference/distributed.scrbl" responsible (tewk)
|
||||
"collects/scriblib" responsible (mflatt)
|
||||
"collects/setup" responsible (mflatt)
|
||||
"collects/setup/main.rkt" drdr:command-line #f
|
||||
|
@ -1858,6 +1860,9 @@ path/s is either such a string or a list of them.
|
|||
"collects/tests/racket/path.rktl" drdr:command-line #f
|
||||
"collects/tests/racket/pathlib.rktl" drdr:command-line (racket "-f" *)
|
||||
"collects/tests/racket/pconvert.rktl" drdr:command-line #f
|
||||
"collects/tests/racket/place" responsible (tewk)
|
||||
"collects/tests/racket/place/distributed/distributed.rkt" drdr:command-line (racket "-tm" *)
|
||||
"collects/tests/racket/place/distributed/restarter.rkt" drdr:command-line (racket "-tm" *)
|
||||
"collects/tests/racket/place-chan-rand-help.rkt" responsible (tewk)
|
||||
"collects/tests/racket/place-chan-rand.rkt" responsible (tewk) drdr:random #t
|
||||
"collects/tests/racket/place-channel-fd.rkt" responsible (tewk) drdr:command-line (racket "-tm" *)
|
||||
|
|
150
collects/racket/place/define-remote-server.rkt
Normal file
150
collects/racket/place/define-remote-server.rkt
Normal file
|
@ -0,0 +1,150 @@
|
|||
#lang racket/base
|
||||
(require (for-syntax racket/base)
|
||||
(for-syntax syntax/stx)
|
||||
racket/place
|
||||
racket/match
|
||||
racket/class
|
||||
racket/stxparam
|
||||
(for-syntax racket/pretty)
|
||||
racket/place/distributed)
|
||||
|
||||
(define-syntax define/provide
|
||||
(syntax-rules ()
|
||||
[(_ (name x ...) body ...)
|
||||
(begin (provide name)
|
||||
(define (name x ...) body ...))]
|
||||
[(_ name val)
|
||||
(begin (provide name)
|
||||
(define name val))]))
|
||||
|
||||
(define (dplace/place-channel-get dest)
|
||||
(cond
|
||||
[(place-channel? dest) (place-channel-get dest)]
|
||||
[else (send dest get-msg)]))
|
||||
|
||||
(define (dplace/place-channel-put dest msg)
|
||||
(cond
|
||||
[(place-channel? dest) (place-channel-put dest msg)]
|
||||
[else (send dest put-msg msg)]))
|
||||
|
||||
|
||||
(define-syntax-rule (define-syntax-parameter-error x)
|
||||
(define-syntax-parameter x (lambda (stx) (raise-syntax-error 'x "only allowed inside define-*-remote-server definition" stx))))
|
||||
|
||||
(define-syntax-parameter-error log-to-parent)
|
||||
|
||||
(define-syntax (define-define-remote-server stx)
|
||||
(syntax-case stx ()
|
||||
[(_ form-name)
|
||||
#;(printf "FORM_NAME ~a ~a ~a\n" #'form-name (syntax->datum #'form-name)
|
||||
(equal? (syntax->datum #'form-name) 'define-named-remote-server))
|
||||
(with-syntax ([receive-line
|
||||
(cond
|
||||
[(eq? (syntax->datum #'form-name) 'define-named-remote-server)
|
||||
#'(list (list fname-symbol args (... ...)) src)]
|
||||
[else
|
||||
#'(list fname-symbol args (... ...))])]
|
||||
[send-dest
|
||||
(cond
|
||||
[(eq? (syntax->datum #'form-name) 'define-named-remote-server)
|
||||
#'src]
|
||||
[else
|
||||
#'ch])])
|
||||
(define x
|
||||
#'(define-syntax (form-name stx)
|
||||
(syntax-case stx ()
|
||||
[(_ name forms (... ...))
|
||||
(let ()
|
||||
|
||||
(define (is-id? id stx)
|
||||
(equal? (syntax-e stx) id))
|
||||
(define (define? stx) (is-id? 'define-state (stx-car stx)))
|
||||
|
||||
(define-values (states rpcs)
|
||||
(for/fold ([states null]
|
||||
[rpcs null]) ([f (syntax->list #'(forms (... ...)))])
|
||||
(cond
|
||||
[(define? f)
|
||||
(values (append states (list f)) rpcs)]
|
||||
[else
|
||||
(values states (append rpcs (list f)))]
|
||||
)))
|
||||
|
||||
(define (id->string x)
|
||||
(symbol->string (syntax->datum x)))
|
||||
(define (string->id stx x)
|
||||
(datum->syntax stx (string->symbol x)))
|
||||
|
||||
|
||||
(define trans-rpcs
|
||||
(for/list ([f rpcs])
|
||||
(syntax-case f ()
|
||||
[(define-type (fname args (... ...)) body (... ...))
|
||||
(with-syntax ([fname-symbol (string->id stx (format "~a-~a" (id->string #'name) (id->string #'fname)))]
|
||||
[(receive (... ...))
|
||||
(cond
|
||||
[(is-id? 'define-rpc #'define-type) #'((dplace/place-channel-get dest))]
|
||||
[(is-id? 'define-cast #'define-type) #'()]
|
||||
[else (raise "Bad define in define-remote-server")])])
|
||||
|
||||
#'(define/provide (fname-symbol dest args (... ...))
|
||||
(dplace/place-channel-put dest (list (quote fname) args (... ...)))
|
||||
receive (... ...)))])))
|
||||
|
||||
(define trans-place
|
||||
(with-syntax ([(states2 (... ...))
|
||||
(for/list ([s states])
|
||||
(syntax-case s ()
|
||||
[(_ rest (... ...))
|
||||
#'(define rest (... ...))]))]
|
||||
[(cases (... ...))
|
||||
(for/list ([r rpcs])
|
||||
(syntax-case r ()
|
||||
[(define-type (fname args (... ...)) body (... ...))
|
||||
(let ()
|
||||
(with-syntax ([fname-symbol #'(quote fname)]
|
||||
[(send-line (... ...))
|
||||
(cond
|
||||
[(is-id? 'define-rpc #'define-type) #'((place-channel-put send-dest result))]
|
||||
[(is-id? 'define-cast #'define-type) #'()]
|
||||
[else (raise "Bad define in define-remote-server")])])
|
||||
#'[receive-line
|
||||
(define result
|
||||
(let ()
|
||||
body (... ...)))
|
||||
send-line (... ...)
|
||||
(loop)]))]))])
|
||||
#`(place ch
|
||||
(let ()
|
||||
states2 (... ...)
|
||||
(let loop ()
|
||||
(define msg (place-channel-get ch))
|
||||
(define (log-to-parent-real msg #:severity [severity 'info])
|
||||
(place-channel-put ch (log-message severity msg)))
|
||||
(syntax-parameterize ([log-to-parent (make-rename-transformer #'log-to-parent-real)])
|
||||
(match msg
|
||||
cases (... ...)
|
||||
))
|
||||
loop)
|
||||
))))
|
||||
(with-syntax ([mkname (string->id stx (format "make-~a" (id->string #'name)))])
|
||||
(define x
|
||||
#`(begin
|
||||
(require racket/place
|
||||
racket/match)
|
||||
#,@trans-rpcs
|
||||
(define/provide (mkname) #,trans-place)
|
||||
(void)))
|
||||
;(pretty-print (syntax->datum x))
|
||||
x))]))
|
||||
)
|
||||
;(pretty-print (syntax->datum x))
|
||||
x)]))
|
||||
|
||||
(define-define-remote-server define-remote-server)
|
||||
(define-define-remote-server define-named-remote-server)
|
||||
(provide define-remote-server)
|
||||
(provide define-named-remote-server)
|
||||
(provide log-to-parent)
|
||||
|
||||
|
1407
collects/racket/place/distributed.rkt
Normal file
1407
collects/racket/place/distributed.rkt
Normal file
File diff suppressed because it is too large
Load Diff
30
collects/racket/place/distributed/examples/hello-world.rkt
Normal file
30
collects/racket/place/distributed/examples/hello-world.rkt
Normal file
|
@ -0,0 +1,30 @@
|
|||
#lang racket/base
|
||||
(require racket/place/distributed
|
||||
racket/place)
|
||||
|
||||
(provide main
|
||||
hello-world)
|
||||
|
||||
(define (hello-world)
|
||||
(place ch
|
||||
(printf "hello-world received: ~a\n" (place-channel-get ch))
|
||||
(define HW "Hello World")
|
||||
(place-channel-put ch (format "~a\n" HW))
|
||||
(printf "hello-world sent: ~a\n" HW)))
|
||||
|
||||
|
||||
(define (main)
|
||||
(define-values (vm pl)
|
||||
(spawn-vm-supervise-place-thunk-at/2 "localhost"
|
||||
#:listen-port 6344
|
||||
(get-current-module-path)
|
||||
'hello-world))
|
||||
(master-event-loop
|
||||
vm
|
||||
(after-seconds 2
|
||||
(dplace-put pl "Hello")
|
||||
(printf "master-event-loop received: ~a\n" (dplace-get pl)))
|
||||
|
||||
(after-seconds 6
|
||||
(exit 0))
|
||||
))
|
36
collects/racket/place/distributed/examples/logging/bank.rkt
Normal file
36
collects/racket/place/distributed/examples/logging/bank.rkt
Normal file
|
@ -0,0 +1,36 @@
|
|||
#lang racket/base
|
||||
(require racket/match
|
||||
racket/place/define-remote-server)
|
||||
|
||||
(define-remote-server
|
||||
bank
|
||||
|
||||
(define-state accounts (make-hash))
|
||||
(define-rpc (new-account who)
|
||||
(match (hash-has-key? accounts who)
|
||||
[#t '(already-exists)]
|
||||
[else
|
||||
(hash-set! accounts who 0)
|
||||
(log-to-parent #:severity 'debug (format "Logging new account for ~a" who))
|
||||
(list 'created who)]))
|
||||
(define-rpc (removeM who amount)
|
||||
(cond
|
||||
[(hash-ref accounts who (lambda () #f)) =>
|
||||
(lambda (balance)
|
||||
(cond [(<= amount balance)
|
||||
(define new-balance (- balance amount))
|
||||
(hash-set! accounts who new-balance)
|
||||
(list 'ok new-balance)]
|
||||
[else
|
||||
(list 'insufficient-funds balance)]))]
|
||||
[else
|
||||
(list 'invalid-account who)]))
|
||||
(define-rpc (add who amount)
|
||||
(cond
|
||||
[(hash-ref accounts who (lambda () #f)) =>
|
||||
(lambda (balance)
|
||||
(define new-balance (+ balance amount))
|
||||
(hash-set! accounts who new-balance)
|
||||
(list 'ok new-balance))]
|
||||
[else
|
||||
(list 'invalid-account who)])))
|
|
@ -0,0 +1,37 @@
|
|||
#lang racket/base
|
||||
(require racket/place/distributed
|
||||
racket/class
|
||||
racket/place
|
||||
racket/runtime-path
|
||||
"bank.rkt"
|
||||
"tuple.rkt")
|
||||
(define-runtime-path bank-path "bank.rkt")
|
||||
(define-runtime-path tuple-path "tuple.rkt")
|
||||
|
||||
(provide main)
|
||||
|
||||
(define (main)
|
||||
(define remote-vm (spawn-remote-racket-vm "localhost" #:listen-port 6344))
|
||||
(define tuple-place (supervise-named-place-thunk-at remote-vm 'tuple-server tuple-path 'make-tuple-server))
|
||||
(define bank-place (supervise-place-thunk-at remote-vm bank-path 'make-bank))
|
||||
|
||||
(master-event-loop
|
||||
remote-vm
|
||||
(after-seconds 4
|
||||
(displayln (bank-new-account bank-place 'user1))
|
||||
(displayln (bank-add bank-place 'user1 10))
|
||||
(displayln (bank-removeM bank-place 'user1 5)))
|
||||
|
||||
(after-seconds 2
|
||||
(define c (connect-to-named-place remote-vm 'tuple-server))
|
||||
(define d (connect-to-named-place remote-vm 'tuple-server))
|
||||
(displayln (tuple-server-set c "user0" 100))
|
||||
(displayln (tuple-server-set d "user2" 200))
|
||||
(displayln (tuple-server-get c "user0"))
|
||||
(displayln (tuple-server-get d "user2"))
|
||||
(displayln (tuple-server-get d "user0"))
|
||||
(displayln (tuple-server-get c "user2")))
|
||||
(after-seconds 6
|
||||
(node-send-exit remote-vm))
|
||||
(after-seconds 8
|
||||
(exit 0))))
|
14
collects/racket/place/distributed/examples/logging/tuple.rkt
Normal file
14
collects/racket/place/distributed/examples/logging/tuple.rkt
Normal file
|
@ -0,0 +1,14 @@
|
|||
#lang racket/base
|
||||
(require racket/match
|
||||
racket/place/define-remote-server)
|
||||
|
||||
(define-named-remote-server
|
||||
tuple-server
|
||||
|
||||
(define-state h (make-hash))
|
||||
(define-rpc (set k v)
|
||||
(hash-set! h k v)
|
||||
(log-to-parent #:severity 'debug (format "~a set to ~a" k v))
|
||||
v)
|
||||
(define-rpc (get k)
|
||||
(hash-ref h k #f)))
|
35
collects/racket/place/distributed/examples/multiple/bank.rkt
Normal file
35
collects/racket/place/distributed/examples/multiple/bank.rkt
Normal file
|
@ -0,0 +1,35 @@
|
|||
#lang racket/base
|
||||
(require racket/match
|
||||
racket/place/define-remote-server)
|
||||
|
||||
(define-remote-server
|
||||
bank
|
||||
|
||||
(define-state accounts (make-hash))
|
||||
(define-rpc (new-account who)
|
||||
(match (hash-has-key? accounts who)
|
||||
[#t '(already-exists)]
|
||||
[else
|
||||
(hash-set! accounts who 0)
|
||||
(list 'created who)]))
|
||||
(define-rpc (removeM who amount)
|
||||
(cond
|
||||
[(hash-ref accounts who (lambda () #f)) =>
|
||||
(lambda (balance)
|
||||
(cond [(<= amount balance)
|
||||
(define new-balance (- balance amount))
|
||||
(hash-set! accounts who new-balance)
|
||||
(list 'ok new-balance)]
|
||||
[else
|
||||
(list 'insufficient-funds balance)]))]
|
||||
[else
|
||||
(list 'invalid-account who)]))
|
||||
(define-rpc (add who amount)
|
||||
(cond
|
||||
[(hash-ref accounts who (lambda () #f)) =>
|
||||
(lambda (balance)
|
||||
(define new-balance (+ balance amount))
|
||||
(hash-set! accounts who new-balance)
|
||||
(list 'ok new-balance))]
|
||||
[else
|
||||
(list 'invalid-account who)])))
|
|
@ -0,0 +1,39 @@
|
|||
#lang racket/base
|
||||
(require racket/place/distributed
|
||||
racket/class
|
||||
racket/place
|
||||
racket/runtime-path
|
||||
"bank.rkt")
|
||||
|
||||
(define-runtime-path bank-path "bank.rkt")
|
||||
(define-runtime-path place-worker-path "place-worker.rkt")
|
||||
(define-runtime-path process-worker-path "process-worker.rkt")
|
||||
|
||||
(provide main
|
||||
wait-place-thunk)
|
||||
|
||||
(define (spawn-place-worker-at port message)
|
||||
(spawn-vm-supervise-dynamic-place-at "localhost" #:listen-port port place-worker-path 'place-worker #:initial-message message #:restart-on-exit #f))
|
||||
|
||||
(define (wait-place-thunk)
|
||||
(place ch
|
||||
(printf "BEGINING SLEEP\n")
|
||||
(sleep 5)
|
||||
(printf "SLEEP DONE\n")))
|
||||
|
||||
|
||||
(define (main)
|
||||
(define bank-vm (spawn-vm-supervise-place-thunk-at "localhost" #:listen-port 6344 bank-path 'make-bank))
|
||||
(define bank-place (send bank-vm get-first-place))
|
||||
(master-event-loop
|
||||
(spawn-place-worker-at 6341 "ONE")
|
||||
(spawn-place-worker-at 6342 "TWO")
|
||||
(spawn-place-worker-at 6343 "THREE")
|
||||
bank-vm
|
||||
(spawn-vm-supervise-place-thunk-at "localhost" #:listen-port 6345 (get-current-module-path) 'wait-place-thunk #:restart-on-exit #t)
|
||||
(every-seconds 3.3 (printf "Hello from every-seconds\n") (flush-output))
|
||||
(after-seconds 2
|
||||
(displayln (bank-new-account bank-place 'user0))
|
||||
(displayln (bank-add bank-place 'user0 10))
|
||||
(displayln (bank-removeM bank-place 'user0 5)))
|
||||
))
|
|
@ -0,0 +1,30 @@
|
|||
#lang racket/base
|
||||
(require racket/place)
|
||||
|
||||
(provide place-worker
|
||||
main)
|
||||
|
||||
(define (place-worker ch)
|
||||
(random-seed (current-seconds))
|
||||
;(define id (place-channel-get ch))
|
||||
(define id "HI")
|
||||
(for ([i (in-range (+ 5 (random 5)))])
|
||||
(displayln (list (current-seconds) id i))
|
||||
(flush-output)
|
||||
(place-channel-put ch (list (current-seconds) id i))
|
||||
(sleep 3)))
|
||||
|
||||
;(define-values (p1 p2) (place-channel))
|
||||
;(place-worker p1)
|
||||
|
||||
(define (main . argv)
|
||||
(define p (place ch
|
||||
(random-seed (current-seconds))
|
||||
;(define id (place-channel-get ch))
|
||||
(define id "HI")
|
||||
(for ([i (in-range (+ 5 (random 5)))])
|
||||
(displayln (list (current-seconds) id i))
|
||||
(flush-output)
|
||||
;(place-channel-put ch (list (current-seconds) id i))
|
||||
#;(sleep 3))))
|
||||
(sync (handle-evt (place-dead-evt p) (lambda (e) (printf "DEAD\n")))))
|
35
collects/racket/place/distributed/examples/named/bank.rkt
Normal file
35
collects/racket/place/distributed/examples/named/bank.rkt
Normal file
|
@ -0,0 +1,35 @@
|
|||
#lang racket/base
|
||||
(require racket/match
|
||||
racket/place/define-remote-server)
|
||||
|
||||
(define-remote-server
|
||||
bank
|
||||
|
||||
(define-state accounts (make-hash))
|
||||
(define-rpc (new-account who)
|
||||
(match (hash-has-key? accounts who)
|
||||
[#t '(already-exists)]
|
||||
[else
|
||||
(hash-set! accounts who 0)
|
||||
(list 'created who)]))
|
||||
(define-rpc (removeM who amount)
|
||||
(cond
|
||||
[(hash-ref accounts who (lambda () #f)) =>
|
||||
(lambda (balance)
|
||||
(cond [(<= amount balance)
|
||||
(define new-balance (- balance amount))
|
||||
(hash-set! accounts who new-balance)
|
||||
(list 'ok new-balance)]
|
||||
[else
|
||||
(list 'insufficient-funds balance)]))]
|
||||
[else
|
||||
(list 'invalid-account who)]))
|
||||
(define-rpc (add who amount)
|
||||
(cond
|
||||
[(hash-ref accounts who (lambda () #f)) =>
|
||||
(lambda (balance)
|
||||
(define new-balance (+ balance amount))
|
||||
(hash-set! accounts who new-balance)
|
||||
(list 'ok new-balance))]
|
||||
[else
|
||||
(list 'invalid-account who)])))
|
40
collects/racket/place/distributed/examples/named/master.rkt
Normal file
40
collects/racket/place/distributed/examples/named/master.rkt
Normal file
|
@ -0,0 +1,40 @@
|
|||
#lang racket/base
|
||||
(require racket/place/distributed
|
||||
racket/class
|
||||
racket/place
|
||||
racket/runtime-path
|
||||
"bank.rkt"
|
||||
"tuple.rkt")
|
||||
(define-runtime-path bank-path "bank.rkt")
|
||||
(define-runtime-path tuple-path "tuple.rkt")
|
||||
|
||||
(provide main)
|
||||
|
||||
(define (main)
|
||||
(define remote-vm (spawn-remote-racket-vm "localhost" #:listen-port 6344))
|
||||
(define tuple-place (supervise-named-place-thunk-at remote-vm 'tuple-server tuple-path 'make-tuple-server))
|
||||
(define bank-place (supervise-place-thunk-at remote-vm bank-path 'make-bank))
|
||||
|
||||
(master-event-loop
|
||||
remote-vm
|
||||
(after-seconds 4
|
||||
(displayln (bank-new-account bank-place 'user0))
|
||||
(displayln (bank-add bank-place 'user0 10))
|
||||
(displayln (bank-removeM bank-place 'user0 5)))
|
||||
|
||||
(after-seconds 2
|
||||
(define c (connect-to-named-place remote-vm 'tuple-server))
|
||||
(define d (connect-to-named-place remote-vm 'tuple-server))
|
||||
(tuple-server-hello c)
|
||||
(tuple-server-hello d)
|
||||
(displayln (tuple-server-set c "user0" 100))
|
||||
(displayln (tuple-server-set d "user2" 200))
|
||||
(displayln (tuple-server-get c "user0"))
|
||||
(displayln (tuple-server-get d "user2"))
|
||||
(displayln (tuple-server-get d "user0"))
|
||||
(displayln (tuple-server-get c "user2"))
|
||||
)
|
||||
(after-seconds 8
|
||||
(node-send-exit remote-vm))
|
||||
(after-seconds 10
|
||||
(exit 0))))
|
15
collects/racket/place/distributed/examples/named/tuple.rkt
Normal file
15
collects/racket/place/distributed/examples/named/tuple.rkt
Normal file
|
@ -0,0 +1,15 @@
|
|||
#lang racket/base
|
||||
(require racket/match
|
||||
racket/place/define-remote-server)
|
||||
|
||||
(define-named-remote-server
|
||||
tuple-server
|
||||
|
||||
(define-state h (make-hash))
|
||||
(define-rpc (set k v)
|
||||
(hash-set! h k v)
|
||||
v)
|
||||
(define-rpc (get k)
|
||||
(hash-ref h k #f))
|
||||
(define-cast (hello)
|
||||
(printf "Hello from define-cast\n")(flush-output)))
|
|
@ -0,0 +1,19 @@
|
|||
#lang racket/base
|
||||
(require racket/place/distributed
|
||||
racket/class
|
||||
racket/place)
|
||||
|
||||
(provide wait-place-thunk)
|
||||
(provide main)
|
||||
|
||||
(define (wait-place-thunk)
|
||||
(place ch
|
||||
(printf "BEGINING SLEEP\n")
|
||||
(sleep 5)
|
||||
(printf "SLEEP DONE\n")))
|
||||
|
||||
(define (main)
|
||||
(master-event-loop
|
||||
(spawn-vm-supervise-place-thunk-at "localhost" #:listen-port 6345 (get-current-module-path) 'wait-place-thunk #:restart-on-exit #t)
|
||||
(after-seconds 50
|
||||
(exit 0))))
|
|
@ -0,0 +1,18 @@
|
|||
#lang racket/base
|
||||
(require racket/place/distributed
|
||||
racket/class
|
||||
racket/place)
|
||||
|
||||
(provide wait-place-thunk)
|
||||
(provide main)
|
||||
|
||||
(define (wait-place-thunk)
|
||||
(place ch
|
||||
(printf "BEGINING SLEEP\n")
|
||||
(sleep 1)
|
||||
(printf "SLEEP DONE\n")))
|
||||
|
||||
(define (main)
|
||||
(master-event-loop
|
||||
(spawn-vm-supervise-place-thunk-at "localhost" #:listen-port 6345 (get-current-module-path) 'wait-place-thunk
|
||||
#:restart-on-exit (restart-every 5 #:retry 3))))
|
3
collects/racket/place/distributed/info.rkt
Normal file
3
collects/racket/place/distributed/info.rkt
Normal file
|
@ -0,0 +1,3 @@
|
|||
#lang setup/infotab
|
||||
|
||||
(define compile-omit-paths '("examples"))
|
18
collects/racket/place/distributed/launch.rkt
Normal file
18
collects/racket/place/distributed/launch.rkt
Normal file
|
@ -0,0 +1,18 @@
|
|||
#lang racket/base
|
||||
(require racket/match
|
||||
racket/tcp
|
||||
racket/place/distributed)
|
||||
|
||||
(provide main)
|
||||
|
||||
(define (main . args)
|
||||
(match args
|
||||
[(list "spawn" node-port)
|
||||
(define listener (tcp-listen (->number node-port) 4 #t))
|
||||
(write-flush (list (->number node-port)))
|
||||
(start-spawned-node-router listener)]
|
||||
|
||||
;; Used to launch Design Pattern 1, MPI style distributed system.
|
||||
[(list "launch" mod-path conf-name i)
|
||||
(startup-config (dynamic-require (->path mod-path) (string->symbol conf-name)) (->number i))]))
|
||||
|
140
collects/scribblings/guide/distributed.scrbl
Normal file
140
collects/scribblings/guide/distributed.scrbl
Normal file
|
@ -0,0 +1,140 @@
|
|||
#lang scribble/doc
|
||||
@(require scribble/manual
|
||||
(except-in "guide-utils.rkt" log-message)
|
||||
scribble/eval
|
||||
scriblib/figure
|
||||
racket/port
|
||||
racket/contract
|
||||
(for-label racket/place/distributed))
|
||||
|
||||
@(define (codeblockfromfile filename)
|
||||
(call-with-input-file
|
||||
filename
|
||||
(lambda (i)
|
||||
(codeblock (port->string i)))))
|
||||
|
||||
@title[#:tag "distributed-places"]{Distributed Places}
|
||||
|
||||
The @racketmodname[racket/place/distributed] library provides support for
|
||||
distributed programming.
|
||||
|
||||
The example bellow demonstrates how to launch a remote racket vm instance,
|
||||
launch remote places on the new remote vm instance, and start an
|
||||
event loop that monitors the remote vm instance.
|
||||
|
||||
The example code can also be found in
|
||||
@filepath{racket/distributed/examples/named/master.rkt}.
|
||||
|
||||
@figure["named-example-master" "examples/named/master.rkt"]{
|
||||
@codeblockfromfile["../../racket/place/distributed/examples/named/master.rkt"]}
|
||||
|
||||
The @racket[spawn-remote-racket-vm] primitive connects to
|
||||
@tt{"localhost"} and starts a racloud node there that listens on port
|
||||
6344 for further instructions. The handle to the new racloud node is
|
||||
assigned to the @racket[remote-vm] variable. Localhost is used so that
|
||||
the example can be run using only a single machine. However localhost
|
||||
can be replaced by any host with ssh publickey access and racket. The
|
||||
@racket[supervise-named-place-thunk-at] creates a new place on the
|
||||
@racket[remote-vm]. The new place will be identified in the future by
|
||||
its name symbol @racket['tuple-server]. A place descriptor is
|
||||
expected to be returned by dynamically requiring
|
||||
@racket['make-tuple-server] from the @racket[tuple-path] module and
|
||||
invoking @racket['make-tuple-server].
|
||||
|
||||
The code for the tuple-server place exists in the file
|
||||
@filepath{tuple.rkt}. The @filepath{tuple.rkt} file contains the use of
|
||||
@racket[define-named-remote-server] form, which defines a RPC server
|
||||
suitiable for invocation by @racket[supervise-named-place-thunk-at].
|
||||
|
||||
|
||||
|
||||
@figure["named-example" "examples/named/tuple.rkt"]{
|
||||
@codeblockfromfile["../../racket/place/distributed/examples/named/tuple.rkt"]}
|
||||
|
||||
|
||||
|
||||
The @racket[define-named-remote-server] form takes an identifier and a
|
||||
list of custom expressions as its arguments. From the identifier a
|
||||
place-thunk function is created by prepending the @tt{make-} prefix.
|
||||
In this case @racket[make-tuple-server]. The
|
||||
@racket[make-tuple-server] identifier is the
|
||||
@racket{compute-instance-place-function-name} given to the
|
||||
@racket[supervise-named-place-thunk-at] form above. The
|
||||
@racket[define-state] custom form translates into a simple
|
||||
@racket[define] form, which is closed over by @racket[define-rpc]
|
||||
forms.
|
||||
|
||||
The @racket[define-rpc] form is expanded into two parts. The first
|
||||
part is the client stub that calls the rpc function. The client
|
||||
function name is formed by concatenating the
|
||||
@racket[define-named-remote-server] identifier, @tt{tuple-server}.
|
||||
with the RPC function name @tt{set} to form @racket[tuple-server-set].
|
||||
The RPC client functions take a destination argument which is a
|
||||
@racket[remote-connection%] descriptor and then the RPC function
|
||||
arguments. The RPC client function sends the RPC function name,
|
||||
@racket[set], and the RPC arguments to the destination by calling an
|
||||
internal function @racket[named-place-channel-put]. The RPC client
|
||||
then calls @racket[named-place-channel-get] to wait for the RPC
|
||||
response.
|
||||
|
||||
The second expansion part of @racket[define-rpc] is the server
|
||||
implementation of the RPC call. The server is implemented by a match
|
||||
expression inside the @racket[make-tuple-server] function. The match
|
||||
clause for @racket[tuple-server-set] matches on messages beginning
|
||||
with the @racket['set] symbol. The server executes the RPC call with
|
||||
the communicated arguments and sends the result back to the RPC
|
||||
client.
|
||||
|
||||
The @racket[define-rpc] form is similar to the @racket[define-rpc] form
|
||||
except there is no reply message from the server to client
|
||||
|
||||
@figure["define-named-remote-server-expansion" "Expansion of define-named-remote-server"]{
|
||||
@codeblock{
|
||||
'(begin
|
||||
(require racket/place racket/match)
|
||||
(define/provide
|
||||
(tuple-server-set dest k v)
|
||||
(named-place-channel-put dest (list 'set k v))
|
||||
(named-place-channel-get dest))
|
||||
(define/provide
|
||||
(tuple-server-get dest k)
|
||||
(named-place-channel-put dest (list 'get k))
|
||||
(named-place-channel-get dest))
|
||||
(define/provide
|
||||
(tuple-server-hello dest)
|
||||
(named-place-channel-put dest (list 'hello)))
|
||||
(define/provide
|
||||
(make-tuple-server)
|
||||
(place
|
||||
ch
|
||||
(let ()
|
||||
(define h (make-hash))
|
||||
(let loop ()
|
||||
(define msg (place-channel-get ch))
|
||||
(define (log-to-parent-real msg #:severity (severity 'info))
|
||||
(place-channel-put ch (log-message severity msg)))
|
||||
(syntax-parameterize
|
||||
((log-to-parent (make-rename-transformer #'log-to-parent-real)))
|
||||
(match
|
||||
msg
|
||||
((list (list 'set k v) src)
|
||||
(define result (let () (hash-set! h k v) v))
|
||||
(place-channel-put src result)
|
||||
(loop))
|
||||
((list (list 'get k) src)
|
||||
(define result (let () (hash-ref h k #f)))
|
||||
(place-channel-put src result)
|
||||
(loop))
|
||||
((list (list 'hello) src)
|
||||
(define result
|
||||
(let () (printf "Hello from define-cast\n") (flush-output)))
|
||||
(loop))))
|
||||
loop))))
|
||||
(void))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
@ -418,3 +418,4 @@ argument instead.
|
|||
|
||||
@include-section["futures.scrbl"]
|
||||
@include-section["places.scrbl"]
|
||||
@include-section["distributed.scrbl"]
|
||||
|
|
|
@ -18,3 +18,4 @@ support for parallelism to improve performance.
|
|||
@include-section["thread-local.scrbl"]
|
||||
@include-section["futures.scrbl"]
|
||||
@include-section["places.scrbl"]
|
||||
@include-section["distributed.scrbl"]
|
||||
|
|
746
collects/scribblings/reference/distributed.scrbl
Normal file
746
collects/scribblings/reference/distributed.scrbl
Normal file
|
@ -0,0 +1,746 @@
|
|||
#lang scribble/manual
|
||||
@(require scribble/eval
|
||||
scribble/struct
|
||||
scribble/decode
|
||||
racket/contract
|
||||
racket/place/distributed
|
||||
racket/sandbox
|
||||
racket/class)
|
||||
@(require (for-label racket/place/distributed racket/class))
|
||||
|
||||
|
||||
@(define evaler (make-base-eval))
|
||||
@(interaction-eval #:eval evaler (require racket/place/distributed
|
||||
racket/class
|
||||
racket/place/define-remote-server))
|
||||
|
||||
@(define output-evaluator
|
||||
(parameterize ([sandbox-output 'string]
|
||||
[sandbox-error-output 'string])
|
||||
(make-evaluator 'racket/base)))
|
||||
|
||||
@(define-syntax interaction-output
|
||||
(syntax-rules ()
|
||||
[(_ #:eval evaluator e)
|
||||
(begin
|
||||
(interaction #:eval evaluator e)
|
||||
(printf "K ~a\n" (get-output evaluator)))]))
|
||||
|
||||
@title[#:tag "distributed-places"]{Distributed Places}
|
||||
|
||||
@defmodule[racket/place/distributed]
|
||||
|
||||
Distributed Places is a prototype of a distributed computing framework for
|
||||
Racket.
|
||||
|
||||
Distributed Places' distributed computing design is centered around
|
||||
machine nodes that do computation. The user/programmer configures a
|
||||
new distributed system using a declarative syntax and callbacks. A
|
||||
node begins life with one initial place, the message router. @;{See
|
||||
@figure-ref["node-places"]}. Once the node has been configured the
|
||||
message router is activated by calling the @racket[master-event-loop]
|
||||
function. The message router listens on a TCP port for incoming
|
||||
connections from other nodes in the distributed system. Compute places
|
||||
can be spawned within the node by sending place-spawn request messages
|
||||
to the node's message router.
|
||||
|
||||
The use of Distributed Places is predicated on a couple assumptions:
|
||||
|
||||
@itemlist[
|
||||
@item{ .ssh/config and authorized_keys are configured correctly to
|
||||
allow passwordless connection to remote hosts using public key
|
||||
authentication.}
|
||||
@item{The same user account is used across all nodes in the
|
||||
distributed network.}
|
||||
@item{All machines run the same version of Racket.}
|
||||
]
|
||||
|
||||
@examples[
|
||||
(module hello-world-example racket/base
|
||||
(require racket/place/distributed
|
||||
racket/place)
|
||||
|
||||
(provide main
|
||||
hello-world)
|
||||
|
||||
(define (hello-world)
|
||||
(place ch
|
||||
(printf "hello-world received: ~a\n" (place-channel-get ch))
|
||||
(define HW "Hello World")
|
||||
(place-channel-put ch (format "~a\n" HW))
|
||||
(printf "hello-world sent: ~a\n" HW)))
|
||||
|
||||
|
||||
(define (main)
|
||||
(define-values (vm pl)
|
||||
(spawn-vm-supervise-place-thunk-at/2 "localhost"
|
||||
#:listen-port 6344
|
||||
(get-current-module-path)
|
||||
'hello-world))
|
||||
(master-event-loop
|
||||
vm
|
||||
(after-seconds 2
|
||||
(dplace-put pl "Hello")
|
||||
(printf "master-event-loop received: ~a\n" (dplace-get pl)))
|
||||
|
||||
(after-seconds 6
|
||||
(exit 0))
|
||||
)))
|
||||
|
||||
(require 'hello-world-example)
|
||||
]
|
||||
|
||||
@defproc[(master-event-loop [ec events-container<%>?] ...+) void?]{
|
||||
Waits in an endless loop for one of many events to become ready. The
|
||||
@racket[master-event-loop] procedure constructs a @racket[node%]
|
||||
instance to serve as the message router for then node. The
|
||||
@racket[master-event-loop] procedure then adds all the declared
|
||||
@racket[events-container<%>]s to the @racket[node%] and finally calls
|
||||
the never ending loop @racket[sync-events] method, which handles the
|
||||
events for the node.
|
||||
}
|
||||
|
||||
@(define (p . l) (decode-paragraph l))
|
||||
@(define spawn-vm-note
|
||||
(make-splice
|
||||
(list
|
||||
@p{This function returns a @racket[remote-node%] instance not a @racket[remote-place%]
|
||||
Call @racket[(send vm get-first-place)] to obtain the @racket[remote-place%] instance.})) )
|
||||
|
||||
@defproc[(spawn-vm-supervise-dynamic-place-at
|
||||
[hostname string?]
|
||||
[compute-instance-module-path module-path?]
|
||||
[compute-instance-place-function-name symbol?]
|
||||
[#:listen-port port non-negative-integer? DEFAULT-ROUTER-PORT]
|
||||
[#:initial-message initial-message any? #f]
|
||||
[#:racket-path racketpath string-path? (racket-path)]
|
||||
[#:ssh-bin-path sshpath string-path? (ssh-bin-path)]
|
||||
[#:launcher-path launcherpath string-path? (->string distributed-launch-path)]
|
||||
[#:restart-on-exit restart-on-exit boolean? #f]) remote-place?]{
|
||||
Spawns a new remote vm node at @racket[hostname] with one compute instance place specified by
|
||||
the @racket[compute-instance-module-path] and @racket[compute-instance-place-function-name]
|
||||
parameters. This procedure constructs the new remote-place by calling
|
||||
@racket[(dynamic-place compute-instance-module-path compute-instance-place-function-name)].
|
||||
@|spawn-vm-note|
|
||||
}
|
||||
|
||||
@defproc[(spawn-vm-supervise-dynamic-place-at/2
|
||||
[hostname string?]
|
||||
[compute-instance-module-path module-path?]
|
||||
[compute-instance-place-function-name symbol?]
|
||||
[#:listen-port port non-negative-integer? DEFAULT-ROUTER-PORT]
|
||||
[#:initial-message initial-message any? #f]
|
||||
[#:racket-path racketpath string-path? (racket-path)]
|
||||
[#:ssh-bin-path sshpath string-path? (ssh-bin-path)]
|
||||
[#:launcher-path launcherpath string-path? (->string distributed-launch-path)]
|
||||
[#:restart-on-exit restart-on-exit boolean? #f]) (values remote-node%? remote-place%?)]{
|
||||
Spawns a new remote vm node at @racket[hostname] with one compute instance place specified by
|
||||
the @racket[compute-instance-module-path] and @racket[compute-instance-place-function-name]
|
||||
parameters. This procedure constructs the new remote-place by calling
|
||||
@racket[(dynamic-place compute-instance-module-path compute-instance-place-function-name)].
|
||||
The new @racket[remote-vm%] and @racket[remote-place%] instances make up the two return values.
|
||||
}
|
||||
|
||||
@(define place-thunk-function
|
||||
(make-splice
|
||||
(list
|
||||
@p{
|
||||
The @racket[compute-instance-thunk-function-name] procedure is
|
||||
responsible for creating the place and returning the newly constructed
|
||||
the place descriptor. The
|
||||
@racket[compute-instance-thunk-function-name] procedure should
|
||||
accomplish this by calling either @racket[dynamic-place] or
|
||||
@racket[place] inside the thunk.
|
||||
})) )
|
||||
@defproc[(spawn-vm-supervise-place-thunk-at
|
||||
[hostname string?]
|
||||
[compute-instance-module-path module-path?]
|
||||
[compute-instance-thunk-function-name symbol?]
|
||||
[#:listen-port port non-negative-integer? DEFAULT-ROUTER-PORT]
|
||||
[#:initial-message initial-message any? #f]
|
||||
[#:racket-path racketpath string-path? (racket-path)]
|
||||
[#:ssh-bin-path sshpath string-path? (ssh-bin-path)]
|
||||
[#:launcher-path launcherpath string-path? (->string distributed-launch-path)]
|
||||
[#:restart-on-exit restart-on-exit boolean? #f]) remote-place%?]{
|
||||
Spawns a new remote vm node at @racket[hostname] with one compute instance place.
|
||||
the @racket[compute-instance-module-path] and @racket[compute-instance-thunk-function-name]
|
||||
parameters. This procedure constructs the new remote-place by calling
|
||||
dynamically requiring the
|
||||
@racket[compute-instance-thunk-function-name] and invoking the
|
||||
@racket[compute-instance-thunk-function-name].
|
||||
|
||||
@racket[((dynamic-require compute-instance-module-path compute-instance-thunk-function-name))]
|
||||
|
||||
@|place-thunk-function|
|
||||
@|spawn-vm-note|
|
||||
}
|
||||
@defproc[(spawn-vm-supervise-place-thunk-at/2
|
||||
[hostname string?]
|
||||
[compute-instance-module-path module-path?]
|
||||
[compute-instance-thunk-function-name symbol?]
|
||||
[#:listen-port port non-negative-integer? DEFAULT-ROUTER-PORT]
|
||||
[#:initial-message initial-message any? #f]
|
||||
[#:racket-path racketpath string-path? (racket-path)]
|
||||
[#:ssh-bin-path sshpath string-path? (ssh-bin-path)]
|
||||
[#:launcher-path launcherpath string-path? (->string distributed-launch-path)]
|
||||
[#:restart-on-exit restart-on-exit boolean? #f]) (values remote-vm%? remote-place%?)]{
|
||||
Spawns a new remote vm node at @racket[hostname] with one compute instance place.
|
||||
the @racket[compute-instance-module-path] and @racket[compute-instance-thunk-function-name]
|
||||
parameters. This procedure constructs the new remote-place by calling
|
||||
dynamically requiring the
|
||||
@racket[compute-instance-thunk-function-name] and invoking the
|
||||
@racket[compute-instance-thunk-function-name].
|
||||
|
||||
@racket[((dynamic-require compute-instance-module-path compute-instance-thunk-function-name))]
|
||||
|
||||
@|place-thunk-function|
|
||||
The new @racket[remote-vm%] and @racket[remote-place%] instances make up the two return values.
|
||||
}
|
||||
|
||||
@defproc[(spawn-remote-racket-vm
|
||||
[hostname string?]
|
||||
[#:listen-port port non-negative-integer? DEFAULT-ROUTER-PORT]
|
||||
[#:racket-path racketpath string-path? (racket-path)]
|
||||
[#:ssh-bin-path sshpath string-path? (ssh-bin-path)]
|
||||
[#:launcher-path launcherpath string-path? (->string distributed-launch-path)]) remote-node%?]{
|
||||
Spawns a new remote vm node at @racket[hostname] and returns a @racket[remote-node%] handle.
|
||||
}
|
||||
@defproc[(supervise-dynamic-place-at
|
||||
[remote-vm remote-vm?]
|
||||
[compute-instance-module-path module-path?]
|
||||
[compute-instance-place-function-name symbol?]
|
||||
[#:restart-on-exit restart-on-exit boolean? #f]) remote-place%?]{
|
||||
Creates a new place on the @racket[remote-vm] by using
|
||||
@racket[dynamic-place] to invoke
|
||||
@racket[compute-instance-place-function-name] from the module
|
||||
@racket[compute-instance-module-path].
|
||||
}
|
||||
|
||||
@defproc[(supervise-place-thunk-at
|
||||
[remote-vm remote-vm?]
|
||||
[compute-instance-module-path module-path?]
|
||||
[compute-instance-thunk-function-name symbol?]
|
||||
[#:restart-on-exit restart-on-exit boolean? #f]) remote-place%?]{
|
||||
Creates a new place on the @racket[remote-vm] by executing the thunk
|
||||
@racket[compute-instance-thunk-function-name] from the module
|
||||
@racket[compute-instance-module-path].
|
||||
|
||||
@|place-thunk-function|
|
||||
}
|
||||
|
||||
@defproc[(supervise-process-at
|
||||
[hostname string?]
|
||||
[commandline-argument string?] ...+
|
||||
[#:listen-port port non-negative-integer? DEFAULT-ROUTER-PORT]) remote-process%?]{
|
||||
Spawns an attached external process at host @racket[hostname].
|
||||
}
|
||||
|
||||
@defproc[(supervise-named-dynamic-place-at
|
||||
[remote-vm remote-vm?]
|
||||
[place-name symbol?]
|
||||
[compute-instance-module-path module-path?]
|
||||
[compute-instance-place-function-name symbol?]
|
||||
[#:restart-on-exit restart-on-exit boolean? #f]) remote-place%?]{
|
||||
Creates a new place on the @racket[remote-vm] by using
|
||||
@racket[dynamic-place] to invoke
|
||||
@racket[compute-instance-place-function-name] from the module
|
||||
@racket[compute-instance-module-path]. The @racket[place-name] symbol
|
||||
is used to establish later connections to the named place.
|
||||
}
|
||||
|
||||
@defproc[(supervise-named-place-thunk-at
|
||||
[remote-vm remote-vm?]
|
||||
[place-name symbol?]
|
||||
[compute-instance-module-path module-path?]
|
||||
[compute-instance-thunk-function-name symbol?]
|
||||
[#:restart-on-exit restart-on-exit boolean? #f]) remote-place%?]{
|
||||
Creates a new place on the @racket[remote-vm] by executing the thunk
|
||||
@racket[compute-instance-thunk-function-name] from the module
|
||||
@racket[compute-instance-module-path]. The @racket[place-name] symbol
|
||||
is used to establish later connections to the named place.
|
||||
|
||||
|
||||
@|place-thunk-function|
|
||||
}
|
||||
|
||||
@defform[(restart-every [seconds (and/c real? nonnegative?)]
|
||||
[#:retry retry (or/c nonnegative-integer? #f) #f]
|
||||
[#:on-final-fail on-final-fail (or/c #f (-> any/c)) #f])]{
|
||||
|
||||
Returns a @racket[restarter%] instance that should be supplied to a @racket[#:restart-on-exit] argument.
|
||||
}
|
||||
|
||||
@defform[(every-seconds seconds body ....)]{
|
||||
Returns a @racket[respawn-and-fire%] instance that should be supplied to a @racket[master-event-loop].
|
||||
The @racket[respawn-and-fire%] instance executes the body expressions every @racket[seconds].
|
||||
}
|
||||
|
||||
@defform[(after-seconds seconds body ....)]{
|
||||
Returns a @racket[after-seconds%] instance that should be supplied to a @racket[master-event-loop].
|
||||
Executes the body expressions after a delay of @racket[seconds] from the start of the event loop.
|
||||
}
|
||||
|
||||
@defproc[(connect-to-named-place [vm remote-node%?] [name symbol?]) remote-connection%?]{
|
||||
Connects to a named place on the @racket[vm] named @racket[name] and returns a @racket[remote-connection%] object.
|
||||
}
|
||||
|
||||
@defproc[(log-message [severity symbol?] [msg string?]) void?]{
|
||||
Logs a message, which traversed the process tree until it reaches the root, where it is printed to the console.
|
||||
}
|
||||
|
||||
@definterface[event-container<%> ()]{
|
||||
All objects that are supplied to the @racket[master-event-loop] must
|
||||
implement the @racket[event-container<%>] interface. The
|
||||
@racket[master-event-loop] calls the @racket[register] method on each
|
||||
supplied @racket[event-container<%>] to obtain a list of events the
|
||||
event loop should wait for.
|
||||
|
||||
@defmethod[(register [events (listof events?)]) (listof events?)]{
|
||||
Returns the list of events inside the @racket[event-container<%>] that
|
||||
should be waited on by the @racket[master-event-loop].
|
||||
}
|
||||
|
||||
The following classes all implement @racket[event-container<%>] and
|
||||
can be supplied to a @racket[master-event-loop]:
|
||||
@racket[spawned-process%], @racket[place-socket-bridge%],
|
||||
@racket[node%], @racket[remote-node%], @racket[remote-place%],
|
||||
@racket[place%] @racket[connection%], @racket[respawn-and-fire%], and
|
||||
@racket[after-seconds%].
|
||||
|
||||
}
|
||||
|
||||
@defclass[spawned-process% object% (event-container<%>)
|
||||
(defmethod (get-pid) exact-positive-integer?) ]{
|
||||
|
||||
@defconstructor[([cmdline-list (listof (or/c string? path?))]
|
||||
[parent remote-node%? #f]
|
||||
)]{
|
||||
The @racket[cmdline-list] is a list of command line arguments of type @racket[string] and/or @racket[path].
|
||||
|
||||
The @racket[parent] argument is a @racket[remote-node%] instance that will be notified when the process dies via
|
||||
a @racket[(send parent process-died this)] call.
|
||||
}
|
||||
}
|
||||
|
||||
@examples[ #:eval evaler
|
||||
(new spawned-process% [cmdline-list
|
||||
(list (ssh-bin-path) "localhost" (racket-path) "-tm" distributed-launch-path "spawn" (->string 6340))])
|
||||
]
|
||||
|
||||
@defclass[place-socket-bridge% object% (event-container<%>)
|
||||
(defmethod (get-sc-id) exact-positive-integer?) ]{
|
||||
|
||||
@defconstructor[([pch place-channel?]
|
||||
[sch socket-connection%?]
|
||||
[id exact-positive-integer?]
|
||||
)]{
|
||||
The @racket[pch] argument is a @racket[place-channel]. Messages
|
||||
received on @racket[pch] are forwarded to the socket-connection%
|
||||
@racket[sch] via a @racket[dcgm] message. e.g.
|
||||
@racket[(sconn-write-flush sch (dcgm DCGM-TYPE-INTER-DCHANNEL id id msg))]
|
||||
The @racket[id] is a @racket[exact-positive-integer] that identifies
|
||||
the socket-connection subchannel for this inter-node place connection.
|
||||
}
|
||||
}
|
||||
|
||||
@defclass[node% object% (event-container<%>)]{
|
||||
|
||||
The @racket[node%] instance controls a distributed places node. It
|
||||
launches compute places and routes inter-node place messages in the
|
||||
distributed system. The @racket[master-event-loop] form constructs a
|
||||
@racket[node%] instance under the hood. Newly spawned nodes also have
|
||||
a @racket[node%] instance in their initial place that serves as the
|
||||
node's message router.
|
||||
|
||||
@defconstructor[([listen-port tcp-listen-port? #f])]{
|
||||
Constructs a @racket[node%] that will listen on @racket[listen-port] for inter-node connections.}
|
||||
|
||||
@defmethod[(sync-events) void?]{
|
||||
Starts the never ending event loop for this distributed places node.
|
||||
}
|
||||
}
|
||||
|
||||
@(define place-exec-note
|
||||
(make-splice
|
||||
(list
|
||||
@p{The @racket[place-exec] argument describes how the remote place should be launched.}
|
||||
@itemize[@item{@racket[(list 'place place-module-path place-thunk)]}
|
||||
@item{@racket[(list 'dynamic-place place-module-path place-func)]}]
|
||||
@p{The difference between these two launching methods is that
|
||||
the @racket['place] version of @racket[place-exec] expects a
|
||||
thunk, zero argument function, to be exported by the module
|
||||
@racket[place-module-path]. Executing the thunk is expected to
|
||||
create a new place and return a place descriptor to the newly
|
||||
created place. The @racket['dynamic-place] version of
|
||||
@racket[place-exec] expects place-func to be a function taking a
|
||||
single argument, the initial channel argument, and calls
|
||||
@racket[dynamic-place] on behalf of the user and creates the new
|
||||
place from the @racket[place-module-path] and
|
||||
@racket[place-func].}
|
||||
)))
|
||||
|
||||
@(define one-sided-note
|
||||
(make-splice
|
||||
(list
|
||||
@p{The @racket[#:one-sided-place] argument is an internal use
|
||||
argument for launching remote places from within a place using
|
||||
the old design pattern 1.})))
|
||||
|
||||
@(define restart-on-exit-note
|
||||
(make-splice
|
||||
(list
|
||||
@p{The @racket[#:restart-on-exit] boolean argument instructs the
|
||||
remote-place% instance to respawn the place on the remote node
|
||||
should it exit or terminate at any time. This boolean needs to
|
||||
be expanded to a restart criteria object in the future.})))
|
||||
|
||||
|
||||
@defclass[remote-node% object% (event-container<%>)]{
|
||||
|
||||
The @racket[node%] instance controls a distributed places node. It
|
||||
launches compute places and routes inter-node place messages in the
|
||||
distributed system. This is the remote api to a distributed places
|
||||
node. Instances of @racket[remote-node%] are returned by
|
||||
@racket[spawn-remote-racket-vm],
|
||||
@racket[spawn-vm-supervise-dynamic-place-at], and
|
||||
@racket[spawn-vm-supervise-place-thunk-at].
|
||||
|
||||
@defconstructor[([listen-port tcp-listen-port? #f]
|
||||
[restart-on-exit boolean? #f])]{
|
||||
Constructs a @racket[node%] that will listen on
|
||||
@racket[listen-port] for inter-node connections.
|
||||
|
||||
When set to true the @racket[restart-on-exit] parameter causes the
|
||||
specified node to be restarted when the ssh session spawning the node
|
||||
dies.
|
||||
}
|
||||
|
||||
@defmethod[(get-first-place) remote-place%?]{
|
||||
Returns the @racket[remote-place%] object instance for the first place spawned on this node.
|
||||
}
|
||||
@defmethod[(get-first-place-channel) place-channel?]{
|
||||
Returns the communication channel for the first place spawned on this node.
|
||||
}
|
||||
@defmethod[(get-log-prefix) string?]{
|
||||
Returns @racket[(format "PLACE ~a:~a" host-name listen-port)]
|
||||
}
|
||||
|
||||
@defmethod[(launch-place
|
||||
[place-exec list?]
|
||||
[#:restart-on-exit restart-on-exit boolean? #f]
|
||||
[#:one-sided-place one-sided-place boolean? #f]) remote-place%?]{
|
||||
Launches a place on the remote node represented by this @racket[remote-node%] instance.
|
||||
@|place-exec-note|
|
||||
@|one-sided-note|
|
||||
@|restart-on-exit-note|
|
||||
}
|
||||
|
||||
@defmethod[(remote-connect [name string?]) remote-connection%]{
|
||||
Connects to a named place on the remote node represented by this @racket[remote-node%] instance.
|
||||
}
|
||||
|
||||
@defmethod[(send-exit) void?]{
|
||||
Sends a message instructing the remote node represented by this
|
||||
@racket[remote-node%] instance to exit immediately
|
||||
}
|
||||
}
|
||||
|
||||
@defclass[remote-place% object% (event-container<%>)]{
|
||||
|
||||
The @racket[remote-place%] instance provides a remote api to a place
|
||||
running on a remote distributed places node. It launches a compute
|
||||
places and routes inter-node place messages to the remote place.
|
||||
|
||||
@defconstructor[([vm remote-node%?]
|
||||
[place-exec list?]
|
||||
[restart-on-exit #f]
|
||||
[one-sided-place #f]
|
||||
[on-channel/2 #f])]{
|
||||
Constructs a @racket[remote-place%] instance.
|
||||
@|place-exec-note|
|
||||
@|one-sided-note|
|
||||
@|restart-on-exit-note|
|
||||
|
||||
See @racket[set-on-channel/2!] for description of @racket[on-channel/2] argument.
|
||||
}
|
||||
|
||||
@defmethod[(set-on-channel/2! [callback (-> channel msg void?)]) void?]{
|
||||
Installs a handler function that handles messages from the remote place.
|
||||
The @racket[setup/distributed-docs] module uses this callback to handle job completion messages.
|
||||
}
|
||||
}
|
||||
|
||||
@defproc[(dplace-put [pl remote-place%?] [msg any/c]) void?]{
|
||||
This function is used inside @racket[master-event-loop] callbacks.
|
||||
It sends messages to remote places.
|
||||
}
|
||||
|
||||
@defproc[(dplace-get [pl remote-place%?]) any/c]{
|
||||
This function is used inside @racket[master-event-loop] callbacks.
|
||||
It takes the current delimited continuation and resumes it when a message arrives from @racket[pl].
|
||||
}
|
||||
|
||||
@defclass[remote-connection% object% (event-container<%>)]{
|
||||
|
||||
The @racket[remote-connection%] instance provides a remote api to a named place
|
||||
running on a remote distributed places node. It connects to a named compute
|
||||
places and routes inter-node place messages to the remote place.
|
||||
|
||||
@defconstructor[([vm remote-node%?]
|
||||
[name string?]
|
||||
[restart-on-exit #f]
|
||||
[on-channel/2 #f])]{
|
||||
Constructs a @racket[remote-place%] instance.
|
||||
@|restart-on-exit-note|
|
||||
|
||||
See @racket[set-on-channel/2!] for description of @racket[on-channel/2] argument.
|
||||
}
|
||||
|
||||
@defmethod[(set-on-channel/2! [callback (-> channel msg void?)]) void?]{
|
||||
Installs a handler function that handles messages from the remote place.
|
||||
The @racket[setup/distributed-docs] module uses this callback to handle job completion messages.
|
||||
}
|
||||
}
|
||||
|
||||
@defclass[place% object% (event-container<%>)]{
|
||||
|
||||
The @racket[place%] instance represents a place launched on a
|
||||
distributed places node at that node. It launches a compute places and
|
||||
routes inter-node place messages to the place.
|
||||
|
||||
@defconstructor[([vm remote-place%?]
|
||||
[place-exec list?]
|
||||
[ch-id exact-positive-integer?]
|
||||
[sc socket-connection%?]
|
||||
[on-place-dead (-> event void?) default-on-place-dead])]{
|
||||
Constructs a @racket[remote-place%] instance.
|
||||
@|place-exec-note|
|
||||
The @racket[ch-id] and @racket[sc] arguments are internally used to
|
||||
establish routing between the remote node spawning this place and the
|
||||
place itself. The @racket[on-place-dead] callback handles the event
|
||||
when the newly spawned place terminates.
|
||||
}
|
||||
|
||||
@defmethod[(wait-for-die) void?]{
|
||||
Blocks and waits for the subprocess representing the @racket[remote-node%] to exit.
|
||||
}
|
||||
}
|
||||
|
||||
@defproc[(node-send-exit [remote-node% node]) void?]{
|
||||
Sends @racket[node] a message telling it to exit immediately.
|
||||
}
|
||||
@defproc[(node-get-first-place [remote-node% node]) remote-place%?]{
|
||||
Returns the @racket[remote-place%] instance of the first place spawned at this node
|
||||
}
|
||||
|
||||
@defclass[connection% object% (event-container<%>)]{
|
||||
|
||||
The @racket[connection%] instance represents a connection to a
|
||||
named-place instance running on the current node. It routes inter-node
|
||||
place messages to the named place.
|
||||
|
||||
@defconstructor[([vm remote-node%?]
|
||||
[name string?]
|
||||
[ch-id exact-positive-integer?]
|
||||
[sc socket-connection%?])]{
|
||||
Constructs a @racket[remote-place%] instance.
|
||||
@|place-exec-note|
|
||||
The @racket[ch-id] and @racket[sc] arguments are internally used to
|
||||
establish routing between the remote node and this named-place.
|
||||
}
|
||||
}
|
||||
|
||||
@defclass[respawn-and-fire% object% (event-container<%>)]{
|
||||
|
||||
The @racket[respawn-and-fire%] instance represents a thunk that should
|
||||
execute every @racket[n] seconds.
|
||||
|
||||
@defconstructor[([seconds (and/c real? (not/c negative?))]
|
||||
[thunk (-> void?)])]{
|
||||
Constructs a @racket[respawn-and-fire%] instance that when placed
|
||||
inside a @racket[master-event-loop] construct causes the supplied
|
||||
thunk to execute every @racket[n] seconds.
|
||||
}
|
||||
}
|
||||
|
||||
@defclass[after-seconds% object% (event-container<%>)]{
|
||||
|
||||
The @racket[after-seconds%] instance represents a thunk that should
|
||||
execute after @racket[n] seconds.
|
||||
|
||||
@defconstructor[([seconds (and/c real? (not/c negative?))]
|
||||
[thunk (-> void?)])]{
|
||||
Constructs an @racket[after-seconds%] instance that when placed
|
||||
inside a @racket[master-event-loop] construct causes the supplied
|
||||
thunk to execute after @racket[n] seconds.
|
||||
}
|
||||
}
|
||||
|
||||
@defclass[restarter% after-seconds% (event-container<%>)]{
|
||||
|
||||
The @racket[restarter%] instance represents a restart strategy.
|
||||
|
||||
@defconstructor[([seconds (and/c real? (not/c negative?))]
|
||||
[retry (or/c #f nonnegative-integer?) #f]
|
||||
[on-final-fail (or/c #f (-> any/c)) #f])]{
|
||||
Constructs an @racket[restarter%] instance that when supplied to a
|
||||
@racket[#:restart-on-exit] argument, attempts to restart the process
|
||||
every @racket[seconds]. The @racket[retry] argument specifies how
|
||||
many time to attempt to restart the process before giving up. If the
|
||||
process stays alive for @racket[(* 2 seconds)] the attempted retries
|
||||
count is reset to @racket[0]. The @racket[on-final-fail] thunk is
|
||||
called when the number of retries is exceeded
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@defform[(define-remote-server name forms ...)]{
|
||||
|
||||
Creates a @racket[make-name] function that spawns a place running a instance of the @racket[name]
|
||||
remote server. The server sits in a loop waiting for rpc requests from the @racket[define-rpc] functions
|
||||
documented below.
|
||||
|
||||
@defform[(define-state id value)]{
|
||||
Expands to a @@racket[define], which is closed over by the @racket[define-rpc] functions
|
||||
to form local state.
|
||||
}
|
||||
|
||||
@defform[(define-rpc (id args ...) body ...)]{
|
||||
Expands to a client rpc function @tt{name-id} which sends @racket[id] and @racket[args ...] to
|
||||
the rpc server @racket[rpc-place] and waits for a response.
|
||||
@racket[(define (name-id rpc-place args ...) body)]
|
||||
}
|
||||
|
||||
@defform[(define-cast (id args ...) body ...)]{
|
||||
Expands to a client rpc function @tt{name-id} which sends @racket[id] and @racket[args ...] to
|
||||
the rpc server @racket[rpc-place] but does not receive any response. A cast is a one-way communication
|
||||
technique.
|
||||
@racket[(define (name-id rpc-place args ...) body)]
|
||||
}
|
||||
}
|
||||
|
||||
@examples[ #:eval evaler
|
||||
(define-named-remote-server
|
||||
tuple-server
|
||||
|
||||
(define-state h (make-hash))
|
||||
(define-rpc (set k v)
|
||||
(hash-set! h k v)
|
||||
v)
|
||||
(define-rpc (get k)
|
||||
(hash-ref h k #f)))]
|
||||
|
||||
@examples[ #:eval evaler
|
||||
(define-remote-server
|
||||
bank
|
||||
|
||||
(define-state accounts (make-hash))
|
||||
(define-rpc (new-account who)
|
||||
(match (hash-has-key? accounts who)
|
||||
[#t '(already-exists)]
|
||||
[else
|
||||
(hash-set! accounts who 0)
|
||||
(list 'created who)]))
|
||||
(define-rpc (removeM who amount)
|
||||
(cond
|
||||
[(hash-ref accounts who (lambda () #f)) =>
|
||||
(lambda (balance)
|
||||
(cond [(<= amount balance)
|
||||
(define new-balance (- balance amount))
|
||||
(hash-set! accounts who new-balance)
|
||||
(list 'ok new-balance)]
|
||||
[else
|
||||
(list 'insufficient-funds balance)]))]
|
||||
[else
|
||||
(list 'invalid-account who)]))
|
||||
(define-rpc (add who amount)
|
||||
(cond
|
||||
[(hash-ref accounts who (lambda () #f)) =>
|
||||
(lambda (balance)
|
||||
(define new-balance (+ balance amount))
|
||||
(hash-set! accounts who new-balance)
|
||||
(list 'ok new-balance))]
|
||||
[else
|
||||
(list 'invalid-account who)])))]
|
||||
|
||||
@defthing[distributed-launch-path path?]{
|
||||
Contains the path to the distributed places launcher.
|
||||
}
|
||||
|
||||
@defproc[(ssh-bin-path) string?]{
|
||||
Returns the path to the ssh binary on the local system in string form.
|
||||
}
|
||||
@examples[ #:eval evaler
|
||||
(ssh-bin-path)
|
||||
]
|
||||
|
||||
@defproc[(racket-path) path?]{
|
||||
Returns the path to the currently executing racket binary on the local system.
|
||||
}
|
||||
@examples[ #:eval evaler
|
||||
(racket-path)
|
||||
]
|
||||
|
||||
@defproc[(distributed-places-path) string?]{
|
||||
Returns the path to the distributed-places module on the local system.
|
||||
}
|
||||
@examples[ #:eval evaler
|
||||
(distributed-places-path)
|
||||
]
|
||||
|
||||
@defform[(get-current-module-path)]{
|
||||
Returns the path to the current module.
|
||||
}
|
||||
@examples[ #:eval evaler
|
||||
(begin
|
||||
(module my-module racket/base
|
||||
(require racket/place/distributed)
|
||||
(get-current-module-path))
|
||||
(require 'my-module))
|
||||
]
|
||||
|
||||
@defproc[(->string) string?]{
|
||||
Coerces strings, numbers, symbols, and paths to a string.
|
||||
}
|
||||
@examples[ #:eval evaler
|
||||
(->string "hello")
|
||||
(->string 1)
|
||||
(->string 'bye)
|
||||
(->string (build-path "ridge"))
|
||||
(->string #"bytes")
|
||||
]
|
||||
|
||||
@defproc[(->number) number?]{
|
||||
Coerces strings, numbers, to a number.
|
||||
}
|
||||
@examples[ #:eval evaler
|
||||
(->number "100")
|
||||
(->number 1)
|
||||
]
|
||||
|
||||
@defproc[(->path) path?]{
|
||||
Coerces paths and strings to a path.
|
||||
}
|
||||
@examples[ #:eval evaler
|
||||
(->path "/usr/bin")
|
||||
(->path (build-path "ridge"))
|
||||
]
|
||||
|
||||
@defproc[(->length) path?]{
|
||||
Returns the length of strings, bytes, and lists.
|
||||
}
|
||||
@examples[ #:eval evaler
|
||||
(->length "Boo")
|
||||
(->length #"Woo")
|
||||
(->length (list 1 2 3 4))
|
||||
]
|
||||
|
||||
@defproc[(write-flush [datum any?] [port port?]) (void)]{
|
||||
Writes @racket[datum] to @racket[port] and then flushes @racket[port].
|
||||
}
|
||||
@examples[ #:eval evaler
|
||||
(write-flush "Hello World" (current-output-port))
|
||||
]
|
||||
|
||||
@(close-eval evaler)
|
|
@ -43,7 +43,8 @@
|
|||
(printf "~s\n" res)
|
||||
(let ([ok? (equal? expect res)])
|
||||
(unless ok?
|
||||
(printf " BUT EXPECTED ~s\n" expect))
|
||||
(printf " BUT EXPECTED ~s\n" expect)
|
||||
(eprintf "ERROR\n"))
|
||||
ok?)))
|
||||
|
||||
(define (echo ch) (place-channel-put ch (place-channel-get ch)))
|
||||
|
|
35
collects/tests/racket/place/distributed/bank.rkt
Normal file
35
collects/tests/racket/place/distributed/bank.rkt
Normal file
|
@ -0,0 +1,35 @@
|
|||
#lang racket/base
|
||||
(require racket/match
|
||||
racket/place/define-remote-server)
|
||||
|
||||
(define-remote-server
|
||||
bank
|
||||
|
||||
(define-state accounts (make-hash))
|
||||
(define-rpc (new-account who)
|
||||
(match (hash-has-key? accounts who)
|
||||
[#t '(already-exists)]
|
||||
[else
|
||||
(hash-set! accounts who 0)
|
||||
(list 'created who)]))
|
||||
(define-rpc (removeM who amount)
|
||||
(cond
|
||||
[(hash-ref accounts who (lambda () #f)) =>
|
||||
(lambda (balance)
|
||||
(cond [(<= amount balance)
|
||||
(define new-balance (- balance amount))
|
||||
(hash-set! accounts who new-balance)
|
||||
(list 'ok new-balance)]
|
||||
[else
|
||||
(list 'insufficient-funds balance)]))]
|
||||
[else
|
||||
(list 'invalid-account who)]))
|
||||
(define-rpc (add who amount)
|
||||
(cond
|
||||
[(hash-ref accounts who (lambda () #f)) =>
|
||||
(lambda (balance)
|
||||
(define new-balance (+ balance amount))
|
||||
(hash-set! accounts who new-balance)
|
||||
(list 'ok new-balance))]
|
||||
[else
|
||||
(list 'invalid-account who)])))
|
54
collects/tests/racket/place/distributed/distributed.rkt
Normal file
54
collects/tests/racket/place/distributed/distributed.rkt
Normal file
|
@ -0,0 +1,54 @@
|
|||
#lang racket/base
|
||||
(require racket/place/distributed
|
||||
racket/class
|
||||
racket/place
|
||||
racket/runtime-path
|
||||
"bank.rkt"
|
||||
"tuple.rkt")
|
||||
(define-runtime-path bank-path "bank.rkt")
|
||||
(define-runtime-path tuple-path "tuple.rkt")
|
||||
|
||||
(provide main)
|
||||
|
||||
(define (main)
|
||||
(define (test expect fun . args)
|
||||
(printf "~s ==> " (cons fun args))
|
||||
(flush-output)
|
||||
(let ([res (if (procedure? fun)
|
||||
(apply fun args)
|
||||
(car args))])
|
||||
(printf "~s\n" res)
|
||||
(let ([ok? (equal? expect res)])
|
||||
(unless ok?
|
||||
(printf " BUT EXPECTED ~s\n" expect)
|
||||
(eprintf "ERROR\n"))
|
||||
ok?)))
|
||||
|
||||
(define remote-vm (spawn-remote-racket-vm "localhost" #:listen-port 6344))
|
||||
(define tuple-place (supervise-named-place-thunk-at remote-vm 'tuple-server tuple-path 'make-tuple-server))
|
||||
(define bank-place (supervise-place-thunk-at remote-vm bank-path 'make-bank))
|
||||
|
||||
(master-event-loop
|
||||
remote-vm
|
||||
(after-seconds 2
|
||||
(define c (connect-to-named-place remote-vm 'tuple-server))
|
||||
(define d (connect-to-named-place remote-vm 'tuple-server))
|
||||
(tuple-server-hello c)
|
||||
(tuple-server-hello d)
|
||||
(test 100 tuple-server-set c "user0" 100)
|
||||
(test 200 tuple-server-set d "user2" 200)
|
||||
(test 100 tuple-server-get c "user0")
|
||||
(test 200 tuple-server-get d "user2")
|
||||
(test 100 tuple-server-get d "user0")
|
||||
(test 200 tuple-server-get c "user2")
|
||||
)
|
||||
(after-seconds 4
|
||||
(test '(created user1) bank-new-account bank-place 'user1)
|
||||
(test '(ok 10) bank-add bank-place 'user1 10)
|
||||
(test '(ok 5) bank-removeM bank-place 'user1 5)
|
||||
)
|
||||
|
||||
(after-seconds 15
|
||||
(node-send-exit remote-vm))
|
||||
(after-seconds 20
|
||||
(exit 0))))
|
21
collects/tests/racket/place/distributed/restarter.rkt
Normal file
21
collects/tests/racket/place/distributed/restarter.rkt
Normal file
|
@ -0,0 +1,21 @@
|
|||
#lang racket/base
|
||||
(require racket/place/distributed
|
||||
racket/class
|
||||
racket/place)
|
||||
|
||||
(provide wait-place-thunk)
|
||||
(provide main)
|
||||
|
||||
(define (wait-place-thunk)
|
||||
(place ch
|
||||
(printf "BEGINING SLEEP\n")
|
||||
(sleep 1)
|
||||
(printf "SLEEP DONE\n")))
|
||||
|
||||
(define (main)
|
||||
(master-event-loop
|
||||
(spawn-vm-supervise-place-thunk-at "localhost" #:listen-port 6345 (get-current-module-path) 'wait-place-thunk
|
||||
#:restart-on-exit (restart-every 5 #:retry 3
|
||||
#:on-final-fail (lambda ()
|
||||
(printf "Failed 3 times exititing\n")
|
||||
(exit 1))))))
|
15
collects/tests/racket/place/distributed/tuple.rkt
Normal file
15
collects/tests/racket/place/distributed/tuple.rkt
Normal file
|
@ -0,0 +1,15 @@
|
|||
#lang racket/base
|
||||
(require racket/match
|
||||
racket/place/define-remote-server)
|
||||
|
||||
(define-named-remote-server
|
||||
tuple-server
|
||||
|
||||
(define-state h (make-hash))
|
||||
(define-rpc (set k v)
|
||||
(hash-set! h k v)
|
||||
v)
|
||||
(define-rpc (get k)
|
||||
(hash-ref h k #f))
|
||||
(define-cast (hello)
|
||||
(printf "Hello from define-cast\n")(flush-output)))
|
Loading…
Reference in New Issue
Block a user