diff --git a/collects/mzlib/thread.rkt b/collects/mzlib/thread.rkt index e024066..a870308 100644 --- a/collects/mzlib/thread.rkt +++ b/collects/mzlib/thread.rkt @@ -1,9 +1,15 @@ (module thread mzscheme - (require "kw.rkt" "contract.rkt") + (require "kw.rkt" "contract.rkt" racket/engine) (provide run-server - consumer-thread) + consumer-thread + + (rename engine? coroutine?) + (rename engine coroutine) + (rename engine-run coroutine-run) + (rename engine-result coroutine-result) + (rename engine-kill coroutine-kill)) #| t accepts a function, f, and creates a thread. It returns the thread and a @@ -127,108 +133,4 @@ (sync/timeout connection-timeout t) (custodian-shutdown-all c))))))))) (loop)))) - (lambda () (tcp-close l))))) - - ;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - ;; Couroutine - ;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - - ;; An X-coroutine-object is - ;; (make-coroutine-object thread semaphore channel channel X) - (define-struct coroutine-object (worker can-stop-lock done-ch ex-ch result)) - - ;; coroutine : ((bool ->) -> X) -> X-coroutine-object - (define (coroutine f) - ;;(printf "2. new coroutine\n") - (let* ([can-stop-lock (make-semaphore 1)] - [done-ch (make-channel)] - [ex-ch (make-channel)] - [proceed-sema (make-semaphore)] - [stop-enabled? #t] - [enable-stop - (lambda (enable?) - ;;(printf "3. enabling ~a\n" enable?) - (cond - [(and enable? (not stop-enabled?)) - (semaphore-post can-stop-lock) - (set! stop-enabled? #t)] - [(and (not enable?) stop-enabled?) - (semaphore-wait can-stop-lock) - (set! stop-enabled? #f)]) - ;;(printf "3. finished enabling\n") - )] - [tid (thread (lambda () - (semaphore-wait proceed-sema) - ;;(printf "3. creating coroutine thread\n") - (with-handlers ([(lambda (exn) #t) - (lambda (exn) - (enable-stop #t) - (channel-put ex-ch exn))]) - (let ([v (f enable-stop)]) - (enable-stop #t) - (channel-put done-ch v)))))]) - (begin0 (make-coroutine-object tid can-stop-lock done-ch ex-ch #f) - (thread-suspend tid) - (semaphore-post proceed-sema)))) - - ;; coroutine : real-number X-coroutine-object -> bool - (define (coroutine-run timeout w) - (if (coroutine-object-worker w) - (let ([can-stop-lock (coroutine-object-can-stop-lock w)] - [worker (coroutine-object-worker w)]) - #;(printf "2. starting coroutine\n") - (thread-resume worker) - (dynamic-wind - void - ;; Let the co-routine run... - (lambda () - (sync (choice-evt (wrap-evt (if (evt? timeout) - timeout - (alarm-evt (+ timeout (current-inexact-milliseconds)))) - (lambda (x) - #;(printf "2. alarm-evt\n") - (semaphore-wait can-stop-lock) - (thread-suspend worker) - (semaphore-post can-stop-lock) - #f)) - (wrap-evt (coroutine-object-done-ch w) - (lambda (res) - #;(printf "2. coroutine-done-evt\n") - (set-coroutine-object-result! w res) - (coroutine-kill w) - #t)) - (wrap-evt (coroutine-object-ex-ch w) - (lambda (exn) - #;(printf "2. ex-evt\n") - (coroutine-kill w) - (raise exn)))))) - ;; In case we escape through a break: - (lambda () - (when (thread-running? worker) - (semaphore-wait can-stop-lock) - (thread-suspend worker) - (semaphore-post can-stop-lock))))) - #t)) - - ;; coroutine-result : X-coroutine-object -> X - (define (coroutine-result w) - (coroutine-object-result w)) - - ;; coroutine-kill : X-coroutine-object -> - (define (coroutine-kill w) - (set-coroutine-object-can-stop-lock! w #f) - (set-coroutine-object-done-ch! w #f) - (set-coroutine-object-ex-ch! w #f) - (when (coroutine-object-worker w) - (kill-thread (coroutine-object-worker w)) - (set-coroutine-object-worker! w #f))) - - (define (coroutine? x) - (coroutine-object? x)) - - (provide coroutine?) - (provide/contract - (coroutine (((any/c . -> . any) . -> . any) . -> . coroutine?)) - (coroutine-run ((or/c evt? real?) coroutine? . -> . boolean?)) - (coroutine-result (coroutine? . -> . any)) - (coroutine-kill (coroutine? . -> . any)))) + (lambda () (tcp-close l))))))