original commit: 037b4b4df019e1b3e7a40c5bae722d1c34d1c552
This commit is contained in:
Matthew Flatt 2004-11-30 14:08:07 +00:00
parent 6d0d143b23
commit 2cbc8b98d0
2 changed files with 145 additions and 2 deletions

View File

@ -1,7 +1,8 @@
(module thread mzscheme
(require "spidey.ss"
"etc.ss")
"etc.ss"
"contract.ss")
(provide run-server
consumer-thread)
@ -100,5 +101,104 @@
(sync/timeout connection-timeout t)
(custodian-shutdown-all c)))))))))
(loop)))
(lambda () (tcp-close l)))))))
(lambda () (tcp-close l))))))
;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Couroutine
;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; An X-coroutine-object is
;; (make-coroutine-object thread semaphore channel channel X)
(define-struct coroutine-object (worker can-stop-lock done-ch ex-ch result))
;; coroutine : ((bool ->) -> X) -> X-coroutine-object
(define (coroutine f)
;;(printf "2. new coroutine~n")
(let* ((can-stop-lock (make-semaphore 1))
(done-ch (make-channel))
(ex-ch (make-channel))
(stop-enabled? #t)
(enable-stop
(lambda (enable?)
;;(printf "3. enabling ~a~n" enable?)
(cond
((and enable? (not stop-enabled?))
(semaphore-post can-stop-lock)
(set! stop-enabled? #t))
((and (not enable?) stop-enabled?)
(semaphore-wait can-stop-lock)
(set! stop-enabled? #f)))
;;(printf "3. finished enabling~n")
))
(tid (thread (lambda ()
;;(printf "3. creating coroutine thread~n")
(with-handlers (((lambda (exn) #t)
(lambda (exn)
(channel-put ex-ch exn))))
(let ([v (f enable-stop)])
(enable-stop #t)
(channel-put done-ch v)))))))
(begin0
(make-coroutine-object tid can-stop-lock done-ch ex-ch #f)
(thread-suspend tid))))
;; coroutine : real-number X-coroutine-object -> bool
(define (coroutine-run timeout w)
(if (coroutine-object-worker w)
#;(printf "2. starting coroutine~n")
(let ((can-stop-lock (coroutine-object-can-stop-lock w))
(worker (coroutine-object-worker w)))
(thread-resume worker)
(dynamic-wind
void
;; Let the co-routine run...
(lambda ()
(sync (choice-evt (wrap-evt (alarm-evt (+ timeout (current-inexact-milliseconds)))
(lambda (x)
#;(printf "2. alarm-evt~n")
(semaphore-wait can-stop-lock)
(thread-suspend worker)
(semaphore-post can-stop-lock)
#f))
(wrap-evt (coroutine-object-done-ch w)
(lambda (res)
#;(printf "2. coroutine-done-evt~n")
(set-coroutine-object-result! w res)
(coroutine-kill w)
#t))
(wrap-evt (coroutine-object-ex-ch w)
(lambda (exn)
#;(printf "2. ex-evt~n")
(coroutine-kill w)
(raise exn))))))
;; In case we escape through a break:
(lambda ()
(when (thread-running? worker)
(semaphore-wait can-stop-lock)
(thread-suspend worker)
(semaphore-post can-stop-lock)))))
#t))
;; coroutine-result : X-coroutine-object -> X
(define (coroutine-result w)
(coroutine-object-result w))
;; coroutine-kill : X-coroutine-object ->
(define (coroutine-kill w)
(set-coroutine-object-can-stop-lock! w #f)
(set-coroutine-object-done-ch! w #f)
(set-coroutine-object-ex-ch! w #f)
(when (coroutine-object-worker w)
(kill-thread (coroutine-object-worker w))
(set-coroutine-object-worker! w #f)))
(define (coroutine? x)
(coroutine-object? x))
(provide coroutine?)
(provide/contract
(coroutine (((any/c . -> . any) . -> . any) . -> . coroutine?))
(coroutine-run (real? coroutine? . -> . boolean?))
(coroutine-result (coroutine? . -> . any))
(coroutine-kill (coroutine? . -> . any))))

View File

@ -52,3 +52,46 @@
(arity-test consumer-thread 1 2)
(err/rt-test (consumer-thread 9))
(arity-test g 2 3)
;; coroutines ----------------------------------------
(define cntr 0)
(define w (coroutine (lambda (enable-stop)
(let loop ((i 0))
(enable-stop #f)
(set! cntr i)
(enable-stop #t)
(loop (add1 i))))))
(test #t coroutine? w)
(test #f coroutine-result w)
(test #f coroutine-run 0.1 w)
(test #t positive? cntr)
(test (void) coroutine-kill w)
(test #t coroutine-run 100 w)
(define w2 (coroutine (lambda (enable-stop)
(let loop ((i 100))
(cond
((< i 0) 13)
(else
(enable-stop #f)
(set! cntr i)
(enable-stop #t)
(loop (sub1 i))))))))
(test #t coroutine-run 0.1 w2)
(test 13 coroutine-result w2)
(test #t coroutine-run 100 w2)
(define w3 (coroutine (lambda (enable-stop)
(raise 14))))
(err/rt-test (coroutine-run 0.1 w3) (lambda (x) (eq? x 14)))
(test #f coroutine-result w3)
(test #t coroutine-run 100 w3)
(define w4 (coroutine (lambda (enable-stop)
(enable-stop #f)
(raise 15))))
(test #f coroutine-result w4)
(err/rt-test (coroutine-run 0.1 w4) (lambda (x) (eq? x 15)))
(test #t coroutine-run 100 w4)