[Distributed Places] place functions now work with distributed places

This commit is contained in:
Kevin Tew 2012-07-12 11:40:14 -06:00
parent 1a7c9844ec
commit d8861a6947
4 changed files with 83 additions and 50 deletions

View File

@ -8,6 +8,7 @@
racket/flonum
racket/vector
racket/place/private/th-place
racket/place/private/prop
racket/private/streams
unstable/lazy-require
@ -35,25 +36,36 @@
place-dead-evt
)
(define-syntax (define-pl-func stx)
(syntax-case stx ()
[(_ func p args ...)
(with-syntax [(func-sym #'(quote func))
(pl-func (string->symbol (string-append "pl-" (symbol->string (syntax->datum #'func)))))
(th-func (string->symbol (string-append "th-" (symbol->string (syntax->datum #'func)))))]
#'(define (func p args ...)
(cond
[(prop:place? p) ((prop:place-ref p) func-sym p args ...)]
[(pl-place-enabled?) (pl-func p args ...)]
[else (th-func p args ...)])))]))
(lazy-require [racket/place/distributed (supervise-dynamic-place-at)])
(define (place-channel-put/get ch msg)
(place-channel-put ch msg)
(place-channel-get ch))
(define-syntax-rule (define-pl x p t) (define x (if (pl-place-enabled?) p t)))
(define place-channel (if (pl-place-enabled?) pl-place-channel th-place-channel))
(define-pl place-sleep pl-place-sleep th-place-sleep)
(define-pl place-wait pl-place-wait th-place-wait)
(define-pl place-kill pl-place-kill th-place-kill)
(define-pl place-break pl-place-break th-place-break)
(define-pl place-channel pl-place-channel th-place-channel)
(define-pl place-channel-put pl-place-channel-put th-place-channel-put)
(define-pl place-channel-get pl-place-channel-get th-place-channel-get)
(define-pl place-channel? pl-place-channel? th-place-channel?)
(define-pl place? pl-place? th-place?)
(define-pl place-message-allowed? pl-place-message-allowed? th-place-message-allowed?)
(define-pl place-dead-evt pl-place-dead-evt th-place-dead-evt)
(define-pl-func place-sleep p)
(define-pl-func place-wait p)
(define-pl-func place-kill p)
(define-pl-func place-break p)
(define-pl-func place-channel-put p msg)
(define-pl-func place-channel-get p)
(define-pl-func place-channel? p)
(define-pl-func place? p)
(define-pl-func place-message-allowed? p)
(define-pl-func place-dead-evt p)
(define (pump-place p pin pout perr in out err)
(cond

View File

@ -3,6 +3,7 @@
racket/match
racket/tcp
racket/place
racket/place/private/prop
racket/place/private/th-place
racket/place/private/coercion
racket/place/private/async-bi-channel
@ -138,18 +139,6 @@
(define DEFAULT-ROUTER-PORT 6340)
(define-syntax quote-module-path-bytes
(syntax-rules ()
[(_)
(->module-path (quote-module-name))]
[(_ path ... )
(->module-path
(let ([qmn (quote-module-name)])
(cond
[(list? qmn) (append (list 'submod) qmn (list path ...))]
[else (list 'submod qmn path ...)])))]))
; returns the path to the current racket executable on the current machine.
(define (racket-path)
(parameterize ([current-directory (find-system-path 'orig-dir)])
@ -334,6 +323,30 @@
register
))
(define place<%>
(interface* ()
([prop:place
(lambda (method obj . args)
(case method
[(place-channel-get)
(*channel-get obj)]
[(place-channel-put)
(apply *channel-put obj args)]
[(place-wait)
(distributed-place-wait obj)]
[(place-channel?) #t]
[(place?) #t]
[else
(raise (format "Error in place<%> ~v ~v ~v" obj method args))]))])
))
(define event<%>
(interface* ()
([prop:evt
(lambda (x)
(apply choice-evt (send x register null)))])))
(define-syntax-rule (for/filter/fold/cons tail xs body ...)
(for/fold ([n tail]) xs
(define r (let () body ...))
@ -399,7 +412,7 @@
(define place-socket-bridge%
(backlink
(class*
object% (event-container<%>)
object% (event-container<%> event<%>)
(init-field pch
sch
id
@ -838,7 +851,7 @@
(define remote-node%
(backlink
(class*
object% (event-container<%>)
object% (event-container<%> event<%>)
(init-field host-name)
(init-field listen-port)
(init-field [cmdline-list #f])
@ -1014,7 +1027,7 @@
(define remote-connection%
(backlink
(class*
object% (event-container<%>)
object% (event-container<%> place<%> event<%>)
(init-field node)
(init-field [place-exec #f])
(init-field [name #f])
@ -1389,7 +1402,7 @@
#:initial-message [initial-message #f]
#:restart-on-exit [restart-on-exit #f])
(send node launch-place
(list 'place (->module-path-bytes place-path) place-func (->string name))
(list 'place (->writeable-module-path place-path) place-func (->string name))
;#:initial-message initial-message
#:restart-on-exit restart-on-exit
))
@ -1398,7 +1411,7 @@
#:initial-message [initial-message #f]
#:restart-on-exit [restart-on-exit #f])
(send node launch-place
(list 'dynamic-place (->module-path-bytes place-path) place-func (->string name))
(list 'dynamic-place (->writeable-module-path place-path) place-func (->string name))
;#:initial-message initial-message
#:restart-on-exit restart-on-exit
))
@ -1407,10 +1420,10 @@
#:initial-message [initial-message #f]
#:racket-path [racketpath (racket-path)]
#:ssh-bin-path [sshpath (ssh-bin-path)]
#:distributed-launch-path [distributedlaunchpath (->module-path-bytes distributed-launch-path)]
#:distributed-launch-path [distributedlaunchpath (->writeable-module-path distributed-launch-path)]
#:restart-on-exit [restart-on-exit #f])
(define-values (node pl)
(spawn-node-supervise-place-at/exec host (list 'dynamic-place (->module-path-bytes place-path) place-func) #:listen-port listen-port
(spawn-node-supervise-place-at/exec host (list 'dynamic-place (->writeable-module-path place-path) place-func) #:listen-port listen-port
#:initial-message initial-message
#:racket-path racketpath
#:ssh-bin-path sshpath
@ -1422,10 +1435,10 @@
#:initial-message [initial-message #f]
#:racket-path [racketpath (racket-path)]
#:ssh-bin-path [sshpath (ssh-bin-path)]
#:distributed-launch-path [distributedlaunchpath (->module-path-bytes distributed-launch-path)]
#:distributed-launch-path [distributedlaunchpath (->writeable-module-path distributed-launch-path)]
#:restart-on-exit [restart-on-exit #f])
(define-values (node pl)
(spawn-node-supervise-place-at/exec host (list 'place (->module-path-bytes place-path) place-func) #:listen-port listen-port
(spawn-node-supervise-place-at/exec host (list 'place (->writeable-module-path place-path) place-func) #:listen-port listen-port
#:initial-message initial-message
#:racket-path racketpath
#:ssh-bin-path sshpath
@ -1437,9 +1450,9 @@
#:initial-message [initial-message #f]
#:racket-path [racketpath (racket-path)]
#:ssh-bin-path [sshpath (ssh-bin-path)]
#:distributed-launch-path [distributedlaunchpath (->module-path-bytes distributed-launch-path)]
#:distributed-launch-path [distributedlaunchpath (->writeable-module-path distributed-launch-path)]
#:restart-on-exit [restart-on-exit #f])
(spawn-node-supervise-place-at/exec host (list 'dynamic-place (->module-path-bytes place-path) place-func) #:listen-port listen-port
(spawn-node-supervise-place-at/exec host (list 'dynamic-place (->writeable-module-path place-path) place-func) #:listen-port listen-port
#:initial-message initial-message
#:racket-path racketpath
#:ssh-bin-path sshpath
@ -1450,9 +1463,9 @@
#:initial-message [initial-message #f]
#:racket-path [racketpath (racket-path)]
#:ssh-bin-path [sshpath (ssh-bin-path)]
#:distributed-launch-path [distributedlaunchpath (->module-path-bytes distributed-launch-path)]
#:distributed-launch-path [distributedlaunchpath (->writeable-module-path distributed-launch-path)]
#:restart-on-exit [restart-on-exit #f])
(spawn-node-supervise-place-at/exec host (list 'place (->module-path-bytes place-path) place-func) #:listen-port listen-port
(spawn-node-supervise-place-at/exec host (list 'place (->writeable-module-path place-path) place-func) #:listen-port listen-port
#:initial-message initial-message
#:racket-path racketpath
#:ssh-bin-path sshpath
@ -1463,7 +1476,7 @@
#:initial-message [initial-message #f]
#:racket-path [racketpath (racket-path)]
#:ssh-bin-path [sshpath (ssh-bin-path)]
#:distributed-launch-path [distributedlaunchpath (->module-path-bytes distributed-launch-path)]
#:distributed-launch-path [distributedlaunchpath (->writeable-module-path distributed-launch-path)]
#:restart-on-exit [restart-on-exit #f])
(define node (spawn-remote-racket-node host
#:listen-port listen-port
@ -1491,7 +1504,7 @@
(define (spawn-remote-racket-node host #:listen-port [listen-port DEFAULT-ROUTER-PORT]
#:racket-path [racketpath (racket-path)]
#:ssh-bin-path [sshpath (ssh-bin-path)]
#:distributed-launch-path [distributedlaunchpath (->module-path-bytes distributed-launch-path)]
#:distributed-launch-path [distributedlaunchpath (->writeable-module-path distributed-launch-path)]
#:use-current-ports [use-current-ports #f])
(new remote-node%
[host-name host]
@ -1502,7 +1515,7 @@
(define (create-place-node host #:listen-port [listen-port DEFAULT-ROUTER-PORT]
#:racket-path [racketpath (racket-path)]
#:ssh-bin-path [sshpath (ssh-bin-path)]
#:distributed-launch-path [distributedlaunchpath (->module-path-bytes distributed-launch-path)]
#:distributed-launch-path [distributedlaunchpath (->writeable-module-path distributed-launch-path)]
#:use-current-ports [use-current-ports #t])
(spawn-remote-racket-node host
#:listen-port listen-port
@ -1512,13 +1525,13 @@
#:use-current-ports use-current-ports))
(define (supervise-dynamic-place-at remote-node place-path place-func)
(send remote-node launch-place (list 'dynamic-place (->module-path-bytes place-path) place-func)))
(send remote-node launch-place (list 'dynamic-place (->writeable-module-path place-path) place-func)))
(define (supervise-place-thunk-at remote-node place-path place-func)
(send remote-node launch-place (list 'place (->module-path-bytes place-path) place-func)))
(send remote-node launch-place (list 'place (->writeable-module-path place-path) place-func)))
(define (supervise-thread-at remote-node place-path place-func)
(send remote-node launch-place (list 'thread (->module-path-bytes place-path) place-func)))
(send remote-node launch-place (list 'thread (->writeable-module-path place-path) place-func)))
(define-syntax-rule (every-seconds _seconds _body ...)
(new respawn-and-fire% [seconds _seconds] [thunk (lambda () _body ...)]))
@ -1631,23 +1644,23 @@
(define (*channel-put ch msg)
(cond
[(is-a? ch remote-connection%) (send ch put-msg msg)]
[(place-channel? ch) (place-channel-put ch msg)]
[(async-bi-channel? ch) (async-bi-channel-put ch msg)]
[(channel? ch) (channel-put ch msg)]
[(is-a? ch remote-connection%) (send ch put-msg msg)]
[else (raise (format "unknown channel type ~a" ch))]))
(define (*channel-get ch)
(cond
[(place-channel? ch) (place-channel-get ch)]
[(async-bi-channel? ch) (async-bi-channel-get ch)]
[(channel? ch) (channel-get ch)]
[(is-a? ch remote-connection%)
(cond
[(continuation-mark-set-first #f in-message-router-mark)
(send ch get-msg)]
[else
(send ch get-raw-msg)])]
[(place-channel? ch) (place-channel-get ch)]
[(async-bi-channel? ch) (async-bi-channel-get ch)]
[(channel? ch) (channel-get ch)]
[else (raise (format "unknown channel type ~a" ch))]))
(define/provide (mr-spawn-remote-node mrch host #:listen-port [listen-port DEFAULT-ROUTER-PORT]
@ -1686,7 +1699,7 @@
(define (spawn-node-at host #:listen-port [listen-port DEFAULT-ROUTER-PORT]
#:racket-path [racketpath (racket-path)]
#:ssh-bin-path [sshpath (ssh-bin-path)]
#:distributed-launch-path [distributedlaunchpath (->module-path-bytes distributed-launch-path)])
#:distributed-launch-path [distributedlaunchpath (->writeable-module-path distributed-launch-path)])
(define ch (make-channel))
(thread

View File

@ -1,7 +1,7 @@
#lang racket/base
(provide ->path
->module-path-bytes
->writeable-module-path
->module-path
->number
->string
@ -28,7 +28,7 @@
[(string? x) (string->path x)]
[(bytes? x) (bytes->path x)]))
(define (->module-path-bytes x)
(define (->writeable-module-path x)
(cond [(path? x) (path->bytes x)]
[(list? x) (map ->path-bytes x)]
[(string? x) (string->bytes/locale x)]

View File

@ -0,0 +1,8 @@
#lang racket/base
(provide prop:place
prop:place?
prop:place-ref)
(define-values (prop:place prop:place? prop:place-ref)
(make-struct-type-property 'place))