From 2cbc8b98d04298fdf2da81142c62dde9c3a6e2f9 Mon Sep 17 00:00:00 2001 From: Matthew Flatt Date: Tue, 30 Nov 2004 14:08:07 +0000 Subject: [PATCH] . original commit: 037b4b4df019e1b3e7a40c5bae722d1c34d1c552 --- collects/mzlib/thread.ss | 104 ++++++++++++++++++++++++++- collects/tests/mzscheme/threadlib.ss | 43 +++++++++++ 2 files changed, 145 insertions(+), 2 deletions(-) diff --git a/collects/mzlib/thread.ss b/collects/mzlib/thread.ss index 9755992..27566bb 100644 --- a/collects/mzlib/thread.ss +++ b/collects/mzlib/thread.ss @@ -1,7 +1,8 @@ (module thread mzscheme (require "spidey.ss" - "etc.ss") + "etc.ss" + "contract.ss") (provide run-server consumer-thread) @@ -100,5 +101,104 @@ (sync/timeout connection-timeout t) (custodian-shutdown-all c))))))))) (loop))) - (lambda () (tcp-close l))))))) + (lambda () (tcp-close l)))))) + + ;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + ;; Couroutine + ;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + + ;; An X-coroutine-object is + ;; (make-coroutine-object thread semaphore channel channel X) + (define-struct coroutine-object (worker can-stop-lock done-ch ex-ch result)) + + ;; coroutine : ((bool ->) -> X) -> X-coroutine-object + (define (coroutine f) + ;;(printf "2. new coroutine~n") + (let* ((can-stop-lock (make-semaphore 1)) + (done-ch (make-channel)) + (ex-ch (make-channel)) + (stop-enabled? #t) + (enable-stop + (lambda (enable?) + ;;(printf "3. enabling ~a~n" enable?) + (cond + ((and enable? (not stop-enabled?)) + (semaphore-post can-stop-lock) + (set! stop-enabled? #t)) + ((and (not enable?) stop-enabled?) + (semaphore-wait can-stop-lock) + (set! stop-enabled? #f))) + ;;(printf "3. finished enabling~n") + )) + (tid (thread (lambda () + ;;(printf "3. creating coroutine thread~n") + (with-handlers (((lambda (exn) #t) + (lambda (exn) + (channel-put ex-ch exn)))) + (let ([v (f enable-stop)]) + (enable-stop #t) + (channel-put done-ch v))))))) + (begin0 + (make-coroutine-object tid can-stop-lock done-ch ex-ch #f) + (thread-suspend tid)))) + + ;; coroutine : real-number X-coroutine-object -> bool + (define (coroutine-run timeout w) + (if (coroutine-object-worker w) + #;(printf "2. starting coroutine~n") + (let ((can-stop-lock (coroutine-object-can-stop-lock w)) + (worker (coroutine-object-worker w))) + (thread-resume worker) + (dynamic-wind + void + ;; Let the co-routine run... + (lambda () + (sync (choice-evt (wrap-evt (alarm-evt (+ timeout (current-inexact-milliseconds))) + (lambda (x) + #;(printf "2. alarm-evt~n") + (semaphore-wait can-stop-lock) + (thread-suspend worker) + (semaphore-post can-stop-lock) + #f)) + (wrap-evt (coroutine-object-done-ch w) + (lambda (res) + #;(printf "2. coroutine-done-evt~n") + (set-coroutine-object-result! w res) + (coroutine-kill w) + #t)) + (wrap-evt (coroutine-object-ex-ch w) + (lambda (exn) + #;(printf "2. ex-evt~n") + (coroutine-kill w) + (raise exn)))))) + ;; In case we escape through a break: + (lambda () + (when (thread-running? worker) + (semaphore-wait can-stop-lock) + (thread-suspend worker) + (semaphore-post can-stop-lock))))) + #t)) + + ;; coroutine-result : X-coroutine-object -> X + (define (coroutine-result w) + (coroutine-object-result w)) + + ;; coroutine-kill : X-coroutine-object -> + (define (coroutine-kill w) + (set-coroutine-object-can-stop-lock! w #f) + (set-coroutine-object-done-ch! w #f) + (set-coroutine-object-ex-ch! w #f) + (when (coroutine-object-worker w) + (kill-thread (coroutine-object-worker w)) + (set-coroutine-object-worker! w #f))) + + (define (coroutine? x) + (coroutine-object? x)) + + (provide coroutine?) + (provide/contract + (coroutine (((any/c . -> . any) . -> . any) . -> . coroutine?)) + (coroutine-run (real? coroutine? . -> . boolean?)) + (coroutine-result (coroutine? . -> . any)) + (coroutine-kill (coroutine? . -> . any)))) diff --git a/collects/tests/mzscheme/threadlib.ss b/collects/tests/mzscheme/threadlib.ss index 8a4fdf7..055dec4 100644 --- a/collects/tests/mzscheme/threadlib.ss +++ b/collects/tests/mzscheme/threadlib.ss @@ -52,3 +52,46 @@ (arity-test consumer-thread 1 2) (err/rt-test (consumer-thread 9)) (arity-test g 2 3) + + +;; coroutines ---------------------------------------- + +(define cntr 0) +(define w (coroutine (lambda (enable-stop) + (let loop ((i 0)) + (enable-stop #f) + (set! cntr i) + (enable-stop #t) + (loop (add1 i)))))) +(test #t coroutine? w) +(test #f coroutine-result w) +(test #f coroutine-run 0.1 w) +(test #t positive? cntr) +(test (void) coroutine-kill w) +(test #t coroutine-run 100 w) + +(define w2 (coroutine (lambda (enable-stop) + (let loop ((i 100)) + (cond + ((< i 0) 13) + (else + (enable-stop #f) + (set! cntr i) + (enable-stop #t) + (loop (sub1 i)))))))) +(test #t coroutine-run 0.1 w2) +(test 13 coroutine-result w2) +(test #t coroutine-run 100 w2) + +(define w3 (coroutine (lambda (enable-stop) + (raise 14)))) +(err/rt-test (coroutine-run 0.1 w3) (lambda (x) (eq? x 14))) +(test #f coroutine-result w3) +(test #t coroutine-run 100 w3) + +(define w4 (coroutine (lambda (enable-stop) + (enable-stop #f) + (raise 15)))) +(test #f coroutine-result w4) +(err/rt-test (coroutine-run 0.1 w4) (lambda (x) (eq? x 15))) +(test #t coroutine-run 100 w4)