fix bug in thread'-based implementation of
place'
which is used when parallel places are unavailable
This commit is contained in:
parent
bce90e2a71
commit
2fa35a2a5c
|
@ -21,7 +21,8 @@
|
||||||
processor-count
|
processor-count
|
||||||
(rename-out [pl-place-enabled? place-enabled?]))
|
(rename-out [pl-place-enabled? place-enabled?]))
|
||||||
|
|
||||||
(define-struct TH-place (th ch) #:property prop:evt (lambda (x) (TH-place-channel-out (TH-place-ch x))))
|
(define-struct TH-place (th ch cust)
|
||||||
|
#:property prop:evt (lambda (x) (TH-place-channel-in (TH-place-ch x))))
|
||||||
|
|
||||||
(define (place-channel-send/receive ch msg)
|
(define (place-channel-send/receive ch msg)
|
||||||
(place-channel-send ch msg)
|
(place-channel-send ch msg)
|
||||||
|
@ -33,8 +34,9 @@
|
||||||
(thread
|
(thread
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(let loop ()
|
(let loop ()
|
||||||
(channel-put ch (thread-receive))
|
(let ([v (thread-receive)])
|
||||||
(loop))))
|
(channel-put ch v)
|
||||||
|
(loop)))))
|
||||||
ch))
|
ch))
|
||||||
|
|
||||||
(define (th-place mod funcname)
|
(define (th-place mod funcname)
|
||||||
|
@ -43,18 +45,20 @@
|
||||||
(unless (symbol? funcname)
|
(unless (symbol? funcname)
|
||||||
(raise-type-error 'place "symbol?" 1 mod funcname))
|
(raise-type-error 'place "symbol?" 1 mod funcname))
|
||||||
(define-values (pch cch) (th-place-channel))
|
(define-values (pch cch) (th-place-channel))
|
||||||
(define th (thread (lambda ()
|
(define cust (make-custodian-from-main))
|
||||||
(with-continuation-mark
|
(define th (thread
|
||||||
parameterization-key
|
(lambda ()
|
||||||
orig-paramz
|
(with-continuation-mark
|
||||||
(parameterize ([current-namespace (make-base-namespace)]
|
parameterization-key
|
||||||
[current-custodian (make-custodian-from-main)])
|
orig-paramz
|
||||||
((dynamic-require mod funcname) cch))))))
|
(parameterize ([current-namespace (make-base-namespace)]
|
||||||
(TH-place th pch))
|
[current-custodian cust])
|
||||||
|
((dynamic-require mod funcname) cch))))))
|
||||||
|
(TH-place th pch cust))
|
||||||
|
|
||||||
(define (th-place-sleep n) (sleep n))
|
(define (th-place-sleep n) (sleep n))
|
||||||
(define (th-place-wait pl) (thread-wait (TH-place-th pl)) 0)
|
(define (th-place-wait pl) (thread-wait (TH-place-th pl)) 0)
|
||||||
(define (th-place-kill pl) (kill-thread (TH-place-th pl)))
|
(define (th-place-kill pl) (custodian-shutdown-all (TH-place-cust pl)))
|
||||||
(define (th-place-channel)
|
(define (th-place-channel)
|
||||||
(define-values (as ar) (make-th-async-channel))
|
(define-values (as ar) (make-th-async-channel))
|
||||||
(define-values (bs br) (make-th-async-channel))
|
(define-values (bs br) (make-th-async-channel))
|
||||||
|
@ -94,9 +98,7 @@
|
||||||
[(TH-place? pl) (TH-place-channel-out (TH-place-ch pl))]
|
[(TH-place? pl) (TH-place-channel-out (TH-place-ch pl))]
|
||||||
[(TH-place-channel? pl) (TH-place-channel-out pl)]
|
[(TH-place-channel? pl) (TH-place-channel-out pl)]
|
||||||
[else (raise-type-error 'place-channel-send "expect a place? or place-channel?" pl)]))
|
[else (raise-type-error 'place-channel-send "expect a place? or place-channel?" pl)]))
|
||||||
(sync (thread-resume-evt th))
|
(void (thread-send th (deep-copy msg) #f)))
|
||||||
(thread-send th
|
|
||||||
(deep-copy msg)))
|
|
||||||
|
|
||||||
(define (th-place-channel-receive pl)
|
(define (th-place-channel-receive pl)
|
||||||
(channel-get
|
(channel-get
|
||||||
|
|
|
@ -495,9 +495,9 @@
|
||||||
|
|
||||||
|
|
||||||
;; ----------------------------------------
|
;; ----------------------------------------
|
||||||
;; A module that collects all the built-in modules,
|
;; When places are implemented by plain old threads,
|
||||||
;; so that it's easier to keep them attached in new
|
;; place channels need to be shared across namespaces,
|
||||||
;; namespaces.
|
;; so `#%place-struct' is included in builtins
|
||||||
|
|
||||||
(module #%place-struct '#%kernel
|
(module #%place-struct '#%kernel
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user