[Distributed Places] Merge place and connection

This commit is contained in:
Kevin Tew 2012-03-21 11:17:32 -06:00
parent 4f8a13062f
commit 394373ab2d

View File

@ -102,7 +102,7 @@
[else (list 'submod qmn path ...)])))]))
; returns the path to the racket executable on the current machine.
; returns the path to the current racket executable on the current machine.
(define (racket-path)
(parameterize ([current-directory (find-system-path 'orig-dir)])
(find-executable-path (find-system-path 'exec-file) #f)))
@ -568,10 +568,10 @@
;socket channel
(define (sconn-add-subchannel s ch-id ch) (send s add-subchannel ch-id ch))
(define (sconn-lookup-subchannel s ch-id) (send s lookup-subchannel ch-id))
(define (sconn-write-flush s x) (send s _write-flush x))
(define (sconn-remove-subchannel s scid) (send s remove-subchannel scid))
(define (sconn-add-subchannel s ch-id ch) (send s add-subchannel ch-id ch))
(define (sconn-lookup-subchannel s ch-id) (send s lookup-subchannel ch-id))
(define (sconn-write-flush s x) (send s _write-flush x))
(define (sconn-remove-subchannel s scid) (send s remove-subchannel scid))
(define (sconn-get-forward-event s forwarder) (send s get-forward-event forwarder))
(define socket-connection%
@ -791,89 +791,6 @@
(define (node-send-exit node) (send node send-exit))
(define (node-get-first-place node) (send node get-first-place))
(define remote-place%
(backlink
(class*
object% (event-container<%>)
(init-field node)
(init-field [place-exec #f])
(init-field [restart-on-exit #f])
(init-field [one-sided-place? #f])
(init-field [on-channel #f])
(field [psb #f])
(field [pc #f])
(field [rpc #f])
(field [running #f])
(field [k #f])
(field [handle-channel #t])
(cond
[one-sided-place?
(set! rpc one-sided-place?)]
[else
(define-values (pch1 pch2) (place-channel))
(set! rpc pch1)
(set! pc pch2)])
(set! psb (send node spawn-remote-place place-exec rpc))
(define (restart-place)
(send node drop-sc-id (send psb get-sc-id))
(set! psb (send node spawn-remote-place place-exec rpc)))
(define/public (stop) (void))
(define/public (get-channel) pc)
(define/public (set-on-channel! proc) (set! on-channel proc))
(define/public (get-sc-id) (send psb get-sc-id))
(define/public (set-handle-channel! x) (set! handle-channel x))
(define/public (place-died)
(cond
[restart-on-exit
(if (equal? restart-on-exit #t)
(restart-place)
(send restart-on-exit restart restart-place))]
[else
(log-debug (format "No restart condition for ~a:~a"
(send node get-log-prefix)
(send psb get-sc-id)))]))
(define (on-channel-event e)
(log-debug (format "~a ~a" (send node get-log-prefix) e)))
(define/public (register es)
(let* ([es (if (and handle-channel pc)
(cons (wrap-evt pc
(cond
[k
(lambda (e)
(call-with-continuation-prompt (lambda ()
(begin0
(k e)
(set! k #f)))))]
[on-channel
(lambda (e)
(on-channel pc e))]
[else
on-channel-event])) es)
es)]
[es (send psb register es)]
[es (if (and restart-on-exit
(not (equal? restart-on-exit #t)))
(send restart-on-exit register es)
es)])
es))
(define/public (set-continuation _k) (set! k _k))
(define/public (get-raw-msg) (send psb get-raw-msg))
(define/public (get-msg)
(call-with-composable-continuation
(lambda (_k)
(set! k _k)
(abort-current-continuation (default-continuation-prompt-tag) void))))
(define/public (put-msg msg) (send psb put-msg msg))
(super-new)
)))
(define (dplace-get dest)
(cond
[(place-channel? dest) (place-channel-get dest)]
@ -889,26 +806,57 @@
(class*
object% (event-container<%>)
(init-field node)
(init-field name)
(init-field [place-exec #f])
(init-field [name #f])
(init-field [one-sided-place #f])
(init-field [restart-on-exit #f])
(init-field [on-channel #f])
(init-field [on-channel/2 #f])
(field [psb #f])
(field [pc #f])
(field [rpc #f])
(field [running #f])
(field [k #f])
(define-values (pch1 pch2) (place-channel))
(set! psb (send node spawn-remote-connection name pch1))
(set! pc pch2)
(unless (or place-exec name)
(raise "for new places place-exec must be set, for named-place connections the named argument must be supplied"))
(when (and place-exec name)
(raise "only one of the place-exec or the named arguements can be set at a time."))
(when (and name restart-on-exit)
(raise "named place connections that restart on exit are not possible"))
(cond
[one-sided-place
(set! rpc one-sided-place)
(set! psb (send vm spawn-remote-place place-exec rpc))]
[else
(define-values (pch1 pch2) (place-channel))
(set! rpc pch1)
(set! pc pch2)
(set! psb
(if place-exec
(send vm spawn-remote-place place-exec rpc)
(send vm spawn-remote-connection name rpc)))])
(define (restart-place)
(send node drop-sc-id (send psb get-sc-id))
(set! psb (send node spawn-remote-place place-exec rpc)))
(define/public (stop) (void))
(define/public (get-channel) pc)
(define/public (set-on-channel! proc) (set! on-channel proc))
(define/public (get-sc-id) (send psb get-sc-id))
(define/public (set-handle-channel! x) (set! on-channel x))
(define/public (place-died)
(cond
[restart-on-exit
(if (equal? restart-on-exit #t)
(restart-place)
(send restart-on-exit restart restart-place))]
[else
(log-debug (format "No restart condition for ~a:~a"
(send node get-log-prefix)
(send psb get-sc-id))))
(send psb get-sc-id)))]))
(define (on-channel-event e)
(log-debug (format "~a ~a" (send node get-log-prefix) e)))
(define/public (register es)
@ -920,12 +868,17 @@
(begin0
(k e)
(set! k #f)))))]
[on-channel
[on-channel/2
(lambda (e)
(on-channel pc e))]
(on-channel/2 pc e))]
[on-channel on-channel]
[else
on-channel-event])) es) es)]
[es (send psb register es)])
[es (send psb register es)]
[es (if (and restart-on-exit
(not (equal? restart-on-exit #t)))
(send restart-on-exit register es)
es)])
es))
(define/public (set-continuation _k) (set! k _k))
@ -940,6 +893,8 @@
(super-new)
)))
(define remote-place% remote-connection%)
(define place%
(backlink
(class*