From d8861a69471690c508471a01696e821c7ecf909f Mon Sep 17 00:00:00 2001 From: Kevin Tew Date: Thu, 12 Jul 2012 11:40:14 -0600 Subject: [PATCH] [Distributed Places] place functions now work with distributed places --- collects/racket/place.rkt | 36 ++++++--- collects/racket/place/distributed.rkt | 85 +++++++++++++--------- collects/racket/place/private/coercion.rkt | 4 +- collects/racket/place/private/prop.rkt | 8 ++ 4 files changed, 83 insertions(+), 50 deletions(-) create mode 100644 collects/racket/place/private/prop.rkt diff --git a/collects/racket/place.rkt b/collects/racket/place.rkt index 95e5ddd517..3bd93d60ce 100644 --- a/collects/racket/place.rkt +++ b/collects/racket/place.rkt @@ -8,6 +8,7 @@ racket/flonum racket/vector racket/place/private/th-place + racket/place/private/prop racket/private/streams unstable/lazy-require @@ -35,25 +36,36 @@ place-dead-evt ) +(define-syntax (define-pl-func stx) + (syntax-case stx () + [(_ func p args ...) + (with-syntax [(func-sym #'(quote func)) + (pl-func (string->symbol (string-append "pl-" (symbol->string (syntax->datum #'func))))) + (th-func (string->symbol (string-append "th-" (symbol->string (syntax->datum #'func)))))] + #'(define (func p args ...) + (cond + [(prop:place? p) ((prop:place-ref p) func-sym p args ...)] + [(pl-place-enabled?) (pl-func p args ...)] + [else (th-func p args ...)])))])) + (lazy-require [racket/place/distributed (supervise-dynamic-place-at)]) (define (place-channel-put/get ch msg) (place-channel-put ch msg) (place-channel-get ch)) -(define-syntax-rule (define-pl x p t) (define x (if (pl-place-enabled?) p t))) +(define place-channel (if (pl-place-enabled?) pl-place-channel th-place-channel)) -(define-pl place-sleep pl-place-sleep th-place-sleep) -(define-pl place-wait pl-place-wait th-place-wait) -(define-pl place-kill pl-place-kill th-place-kill) -(define-pl place-break pl-place-break th-place-break) -(define-pl place-channel pl-place-channel th-place-channel) -(define-pl place-channel-put pl-place-channel-put th-place-channel-put) -(define-pl place-channel-get pl-place-channel-get th-place-channel-get) -(define-pl place-channel? pl-place-channel? th-place-channel?) -(define-pl place? pl-place? th-place?) -(define-pl place-message-allowed? pl-place-message-allowed? th-place-message-allowed?) -(define-pl place-dead-evt pl-place-dead-evt th-place-dead-evt) +(define-pl-func place-sleep p) +(define-pl-func place-wait p) +(define-pl-func place-kill p) +(define-pl-func place-break p) +(define-pl-func place-channel-put p msg) +(define-pl-func place-channel-get p) +(define-pl-func place-channel? p) +(define-pl-func place? p) +(define-pl-func place-message-allowed? p) +(define-pl-func place-dead-evt p) (define (pump-place p pin pout perr in out err) (cond diff --git a/collects/racket/place/distributed.rkt b/collects/racket/place/distributed.rkt index 1559754601..47d9cb05cc 100644 --- a/collects/racket/place/distributed.rkt +++ b/collects/racket/place/distributed.rkt @@ -3,6 +3,7 @@ racket/match racket/tcp racket/place + racket/place/private/prop racket/place/private/th-place racket/place/private/coercion racket/place/private/async-bi-channel @@ -138,18 +139,6 @@ (define DEFAULT-ROUTER-PORT 6340) -(define-syntax quote-module-path-bytes - (syntax-rules () - [(_) - (->module-path (quote-module-name))] - [(_ path ... ) - (->module-path - (let ([qmn (quote-module-name)]) - (cond - [(list? qmn) (append (list 'submod) qmn (list path ...))] - [else (list 'submod qmn path ...)])))])) - - ; returns the path to the current racket executable on the current machine. (define (racket-path) (parameterize ([current-directory (find-system-path 'orig-dir)]) @@ -334,6 +323,30 @@ register )) +(define place<%> + (interface* () + ([prop:place + (lambda (method obj . args) + (case method + [(place-channel-get) + (*channel-get obj)] + [(place-channel-put) + (apply *channel-put obj args)] + [(place-wait) + (distributed-place-wait obj)] + [(place-channel?) #t] + [(place?) #t] + [else + (raise (format "Error in place<%> ~v ~v ~v" obj method args))]))]) + )) + +(define event<%> + (interface* () + ([prop:evt + (lambda (x) + (apply choice-evt (send x register null)))]))) + + (define-syntax-rule (for/filter/fold/cons tail xs body ...) (for/fold ([n tail]) xs (define r (let () body ...)) @@ -399,7 +412,7 @@ (define place-socket-bridge% (backlink (class* - object% (event-container<%>) + object% (event-container<%> event<%>) (init-field pch sch id @@ -838,7 +851,7 @@ (define remote-node% (backlink (class* - object% (event-container<%>) + object% (event-container<%> event<%>) (init-field host-name) (init-field listen-port) (init-field [cmdline-list #f]) @@ -1014,7 +1027,7 @@ (define remote-connection% (backlink (class* - object% (event-container<%>) + object% (event-container<%> place<%> event<%>) (init-field node) (init-field [place-exec #f]) (init-field [name #f]) @@ -1389,7 +1402,7 @@ #:initial-message [initial-message #f] #:restart-on-exit [restart-on-exit #f]) (send node launch-place - (list 'place (->module-path-bytes place-path) place-func (->string name)) + (list 'place (->writeable-module-path place-path) place-func (->string name)) ;#:initial-message initial-message #:restart-on-exit restart-on-exit )) @@ -1398,7 +1411,7 @@ #:initial-message [initial-message #f] #:restart-on-exit [restart-on-exit #f]) (send node launch-place - (list 'dynamic-place (->module-path-bytes place-path) place-func (->string name)) + (list 'dynamic-place (->writeable-module-path place-path) place-func (->string name)) ;#:initial-message initial-message #:restart-on-exit restart-on-exit )) @@ -1407,10 +1420,10 @@ #:initial-message [initial-message #f] #:racket-path [racketpath (racket-path)] #:ssh-bin-path [sshpath (ssh-bin-path)] - #:distributed-launch-path [distributedlaunchpath (->module-path-bytes distributed-launch-path)] + #:distributed-launch-path [distributedlaunchpath (->writeable-module-path distributed-launch-path)] #:restart-on-exit [restart-on-exit #f]) (define-values (node pl) - (spawn-node-supervise-place-at/exec host (list 'dynamic-place (->module-path-bytes place-path) place-func) #:listen-port listen-port + (spawn-node-supervise-place-at/exec host (list 'dynamic-place (->writeable-module-path place-path) place-func) #:listen-port listen-port #:initial-message initial-message #:racket-path racketpath #:ssh-bin-path sshpath @@ -1422,10 +1435,10 @@ #:initial-message [initial-message #f] #:racket-path [racketpath (racket-path)] #:ssh-bin-path [sshpath (ssh-bin-path)] - #:distributed-launch-path [distributedlaunchpath (->module-path-bytes distributed-launch-path)] + #:distributed-launch-path [distributedlaunchpath (->writeable-module-path distributed-launch-path)] #:restart-on-exit [restart-on-exit #f]) (define-values (node pl) - (spawn-node-supervise-place-at/exec host (list 'place (->module-path-bytes place-path) place-func) #:listen-port listen-port + (spawn-node-supervise-place-at/exec host (list 'place (->writeable-module-path place-path) place-func) #:listen-port listen-port #:initial-message initial-message #:racket-path racketpath #:ssh-bin-path sshpath @@ -1437,9 +1450,9 @@ #:initial-message [initial-message #f] #:racket-path [racketpath (racket-path)] #:ssh-bin-path [sshpath (ssh-bin-path)] - #:distributed-launch-path [distributedlaunchpath (->module-path-bytes distributed-launch-path)] + #:distributed-launch-path [distributedlaunchpath (->writeable-module-path distributed-launch-path)] #:restart-on-exit [restart-on-exit #f]) - (spawn-node-supervise-place-at/exec host (list 'dynamic-place (->module-path-bytes place-path) place-func) #:listen-port listen-port + (spawn-node-supervise-place-at/exec host (list 'dynamic-place (->writeable-module-path place-path) place-func) #:listen-port listen-port #:initial-message initial-message #:racket-path racketpath #:ssh-bin-path sshpath @@ -1450,9 +1463,9 @@ #:initial-message [initial-message #f] #:racket-path [racketpath (racket-path)] #:ssh-bin-path [sshpath (ssh-bin-path)] - #:distributed-launch-path [distributedlaunchpath (->module-path-bytes distributed-launch-path)] + #:distributed-launch-path [distributedlaunchpath (->writeable-module-path distributed-launch-path)] #:restart-on-exit [restart-on-exit #f]) - (spawn-node-supervise-place-at/exec host (list 'place (->module-path-bytes place-path) place-func) #:listen-port listen-port + (spawn-node-supervise-place-at/exec host (list 'place (->writeable-module-path place-path) place-func) #:listen-port listen-port #:initial-message initial-message #:racket-path racketpath #:ssh-bin-path sshpath @@ -1463,7 +1476,7 @@ #:initial-message [initial-message #f] #:racket-path [racketpath (racket-path)] #:ssh-bin-path [sshpath (ssh-bin-path)] - #:distributed-launch-path [distributedlaunchpath (->module-path-bytes distributed-launch-path)] + #:distributed-launch-path [distributedlaunchpath (->writeable-module-path distributed-launch-path)] #:restart-on-exit [restart-on-exit #f]) (define node (spawn-remote-racket-node host #:listen-port listen-port @@ -1491,7 +1504,7 @@ (define (spawn-remote-racket-node host #:listen-port [listen-port DEFAULT-ROUTER-PORT] #:racket-path [racketpath (racket-path)] #:ssh-bin-path [sshpath (ssh-bin-path)] - #:distributed-launch-path [distributedlaunchpath (->module-path-bytes distributed-launch-path)] + #:distributed-launch-path [distributedlaunchpath (->writeable-module-path distributed-launch-path)] #:use-current-ports [use-current-ports #f]) (new remote-node% [host-name host] @@ -1502,7 +1515,7 @@ (define (create-place-node host #:listen-port [listen-port DEFAULT-ROUTER-PORT] #:racket-path [racketpath (racket-path)] #:ssh-bin-path [sshpath (ssh-bin-path)] - #:distributed-launch-path [distributedlaunchpath (->module-path-bytes distributed-launch-path)] + #:distributed-launch-path [distributedlaunchpath (->writeable-module-path distributed-launch-path)] #:use-current-ports [use-current-ports #t]) (spawn-remote-racket-node host #:listen-port listen-port @@ -1512,13 +1525,13 @@ #:use-current-ports use-current-ports)) (define (supervise-dynamic-place-at remote-node place-path place-func) - (send remote-node launch-place (list 'dynamic-place (->module-path-bytes place-path) place-func))) + (send remote-node launch-place (list 'dynamic-place (->writeable-module-path place-path) place-func))) (define (supervise-place-thunk-at remote-node place-path place-func) - (send remote-node launch-place (list 'place (->module-path-bytes place-path) place-func))) + (send remote-node launch-place (list 'place (->writeable-module-path place-path) place-func))) (define (supervise-thread-at remote-node place-path place-func) - (send remote-node launch-place (list 'thread (->module-path-bytes place-path) place-func))) + (send remote-node launch-place (list 'thread (->writeable-module-path place-path) place-func))) (define-syntax-rule (every-seconds _seconds _body ...) (new respawn-and-fire% [seconds _seconds] [thunk (lambda () _body ...)])) @@ -1631,23 +1644,23 @@ (define (*channel-put ch msg) (cond + [(is-a? ch remote-connection%) (send ch put-msg msg)] [(place-channel? ch) (place-channel-put ch msg)] [(async-bi-channel? ch) (async-bi-channel-put ch msg)] [(channel? ch) (channel-put ch msg)] - [(is-a? ch remote-connection%) (send ch put-msg msg)] [else (raise (format "unknown channel type ~a" ch))])) (define (*channel-get ch) (cond - [(place-channel? ch) (place-channel-get ch)] - [(async-bi-channel? ch) (async-bi-channel-get ch)] - [(channel? ch) (channel-get ch)] [(is-a? ch remote-connection%) (cond [(continuation-mark-set-first #f in-message-router-mark) (send ch get-msg)] [else (send ch get-raw-msg)])] + [(place-channel? ch) (place-channel-get ch)] + [(async-bi-channel? ch) (async-bi-channel-get ch)] + [(channel? ch) (channel-get ch)] [else (raise (format "unknown channel type ~a" ch))])) (define/provide (mr-spawn-remote-node mrch host #:listen-port [listen-port DEFAULT-ROUTER-PORT] @@ -1686,7 +1699,7 @@ (define (spawn-node-at host #:listen-port [listen-port DEFAULT-ROUTER-PORT] #:racket-path [racketpath (racket-path)] #:ssh-bin-path [sshpath (ssh-bin-path)] - #:distributed-launch-path [distributedlaunchpath (->module-path-bytes distributed-launch-path)]) + #:distributed-launch-path [distributedlaunchpath (->writeable-module-path distributed-launch-path)]) (define ch (make-channel)) (thread diff --git a/collects/racket/place/private/coercion.rkt b/collects/racket/place/private/coercion.rkt index eb759b1c59..5ce6bd3089 100644 --- a/collects/racket/place/private/coercion.rkt +++ b/collects/racket/place/private/coercion.rkt @@ -1,7 +1,7 @@ #lang racket/base (provide ->path - ->module-path-bytes + ->writeable-module-path ->module-path ->number ->string @@ -28,7 +28,7 @@ [(string? x) (string->path x)] [(bytes? x) (bytes->path x)])) -(define (->module-path-bytes x) +(define (->writeable-module-path x) (cond [(path? x) (path->bytes x)] [(list? x) (map ->path-bytes x)] [(string? x) (string->bytes/locale x)] diff --git a/collects/racket/place/private/prop.rkt b/collects/racket/place/private/prop.rkt new file mode 100644 index 0000000000..924483aedb --- /dev/null +++ b/collects/racket/place/private/prop.rkt @@ -0,0 +1,8 @@ +#lang racket/base + +(provide prop:place + prop:place? + prop:place-ref) + +(define-values (prop:place prop:place? prop:place-ref) + (make-struct-type-property 'place))