Move engines from mzlib/thread to racket/engine
(they were previously called "coroutines" but the term "engine" is less ambiguous) original commit: 2dcf06077461175936616f21737fbed7b0c27eb0
This commit is contained in:
parent
45606c4539
commit
46ebb5dac5
|
@ -1,9 +1,15 @@
|
||||||
|
|
||||||
(module thread mzscheme
|
(module thread mzscheme
|
||||||
(require "kw.rkt" "contract.rkt")
|
(require "kw.rkt" "contract.rkt" racket/engine)
|
||||||
|
|
||||||
(provide run-server
|
(provide run-server
|
||||||
consumer-thread)
|
consumer-thread
|
||||||
|
|
||||||
|
(rename engine? coroutine?)
|
||||||
|
(rename engine coroutine)
|
||||||
|
(rename engine-run coroutine-run)
|
||||||
|
(rename engine-result coroutine-result)
|
||||||
|
(rename engine-kill coroutine-kill))
|
||||||
|
|
||||||
#|
|
#|
|
||||||
t accepts a function, f, and creates a thread. It returns the thread and a
|
t accepts a function, f, and creates a thread. It returns the thread and a
|
||||||
|
@ -127,108 +133,4 @@
|
||||||
(sync/timeout connection-timeout t)
|
(sync/timeout connection-timeout t)
|
||||||
(custodian-shutdown-all c)))))))))
|
(custodian-shutdown-all c)))))))))
|
||||||
(loop))))
|
(loop))))
|
||||||
(lambda () (tcp-close l)))))
|
(lambda () (tcp-close l))))))
|
||||||
|
|
||||||
;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
||||||
;; Couroutine
|
|
||||||
;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
||||||
|
|
||||||
;; An X-coroutine-object is
|
|
||||||
;; (make-coroutine-object thread semaphore channel channel X)
|
|
||||||
(define-struct coroutine-object (worker can-stop-lock done-ch ex-ch result))
|
|
||||||
|
|
||||||
;; coroutine : ((bool ->) -> X) -> X-coroutine-object
|
|
||||||
(define (coroutine f)
|
|
||||||
;;(printf "2. new coroutine\n")
|
|
||||||
(let* ([can-stop-lock (make-semaphore 1)]
|
|
||||||
[done-ch (make-channel)]
|
|
||||||
[ex-ch (make-channel)]
|
|
||||||
[proceed-sema (make-semaphore)]
|
|
||||||
[stop-enabled? #t]
|
|
||||||
[enable-stop
|
|
||||||
(lambda (enable?)
|
|
||||||
;;(printf "3. enabling ~a\n" enable?)
|
|
||||||
(cond
|
|
||||||
[(and enable? (not stop-enabled?))
|
|
||||||
(semaphore-post can-stop-lock)
|
|
||||||
(set! stop-enabled? #t)]
|
|
||||||
[(and (not enable?) stop-enabled?)
|
|
||||||
(semaphore-wait can-stop-lock)
|
|
||||||
(set! stop-enabled? #f)])
|
|
||||||
;;(printf "3. finished enabling\n")
|
|
||||||
)]
|
|
||||||
[tid (thread (lambda ()
|
|
||||||
(semaphore-wait proceed-sema)
|
|
||||||
;;(printf "3. creating coroutine thread\n")
|
|
||||||
(with-handlers ([(lambda (exn) #t)
|
|
||||||
(lambda (exn)
|
|
||||||
(enable-stop #t)
|
|
||||||
(channel-put ex-ch exn))])
|
|
||||||
(let ([v (f enable-stop)])
|
|
||||||
(enable-stop #t)
|
|
||||||
(channel-put done-ch v)))))])
|
|
||||||
(begin0 (make-coroutine-object tid can-stop-lock done-ch ex-ch #f)
|
|
||||||
(thread-suspend tid)
|
|
||||||
(semaphore-post proceed-sema))))
|
|
||||||
|
|
||||||
;; coroutine : real-number X-coroutine-object -> bool
|
|
||||||
(define (coroutine-run timeout w)
|
|
||||||
(if (coroutine-object-worker w)
|
|
||||||
(let ([can-stop-lock (coroutine-object-can-stop-lock w)]
|
|
||||||
[worker (coroutine-object-worker w)])
|
|
||||||
#;(printf "2. starting coroutine\n")
|
|
||||||
(thread-resume worker)
|
|
||||||
(dynamic-wind
|
|
||||||
void
|
|
||||||
;; Let the co-routine run...
|
|
||||||
(lambda ()
|
|
||||||
(sync (choice-evt (wrap-evt (if (evt? timeout)
|
|
||||||
timeout
|
|
||||||
(alarm-evt (+ timeout (current-inexact-milliseconds))))
|
|
||||||
(lambda (x)
|
|
||||||
#;(printf "2. alarm-evt\n")
|
|
||||||
(semaphore-wait can-stop-lock)
|
|
||||||
(thread-suspend worker)
|
|
||||||
(semaphore-post can-stop-lock)
|
|
||||||
#f))
|
|
||||||
(wrap-evt (coroutine-object-done-ch w)
|
|
||||||
(lambda (res)
|
|
||||||
#;(printf "2. coroutine-done-evt\n")
|
|
||||||
(set-coroutine-object-result! w res)
|
|
||||||
(coroutine-kill w)
|
|
||||||
#t))
|
|
||||||
(wrap-evt (coroutine-object-ex-ch w)
|
|
||||||
(lambda (exn)
|
|
||||||
#;(printf "2. ex-evt\n")
|
|
||||||
(coroutine-kill w)
|
|
||||||
(raise exn))))))
|
|
||||||
;; In case we escape through a break:
|
|
||||||
(lambda ()
|
|
||||||
(when (thread-running? worker)
|
|
||||||
(semaphore-wait can-stop-lock)
|
|
||||||
(thread-suspend worker)
|
|
||||||
(semaphore-post can-stop-lock)))))
|
|
||||||
#t))
|
|
||||||
|
|
||||||
;; coroutine-result : X-coroutine-object -> X
|
|
||||||
(define (coroutine-result w)
|
|
||||||
(coroutine-object-result w))
|
|
||||||
|
|
||||||
;; coroutine-kill : X-coroutine-object ->
|
|
||||||
(define (coroutine-kill w)
|
|
||||||
(set-coroutine-object-can-stop-lock! w #f)
|
|
||||||
(set-coroutine-object-done-ch! w #f)
|
|
||||||
(set-coroutine-object-ex-ch! w #f)
|
|
||||||
(when (coroutine-object-worker w)
|
|
||||||
(kill-thread (coroutine-object-worker w))
|
|
||||||
(set-coroutine-object-worker! w #f)))
|
|
||||||
|
|
||||||
(define (coroutine? x)
|
|
||||||
(coroutine-object? x))
|
|
||||||
|
|
||||||
(provide coroutine?)
|
|
||||||
(provide/contract
|
|
||||||
(coroutine (((any/c . -> . any) . -> . any) . -> . coroutine?))
|
|
||||||
(coroutine-run ((or/c evt? real?) coroutine? . -> . boolean?))
|
|
||||||
(coroutine-result (coroutine? . -> . any))
|
|
||||||
(coroutine-kill (coroutine? . -> . any))))
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user