From 2dcf06077461175936616f21737fbed7b0c27eb0 Mon Sep 17 00:00:00 2001 From: Asumu Takikawa Date: Thu, 26 Jul 2012 01:32:31 -0400 Subject: [PATCH] Move engines from mzlib/thread to racket/engine (they were previously called "coroutines" but the term "engine" is less ambiguous) --- collects/mzlib/scribblings/thread.scrbl | 64 ++-------- collects/mzlib/thread.rkt | 116 ++---------------- collects/racket/engine.rkt | 106 ++++++++++++++++ .../scribblings/reference/concurrency.scrbl | 1 + collects/scribblings/reference/engine.scrbl | 65 ++++++++++ .../scribblings/reference/reference.scrbl | 6 + 6 files changed, 199 insertions(+), 159 deletions(-) create mode 100644 collects/racket/engine.rkt create mode 100644 collects/scribblings/reference/engine.scrbl diff --git a/collects/mzlib/scribblings/thread.scrbl b/collects/mzlib/scribblings/thread.scrbl index 8efa0a9668..8ebd38bee2 100644 --- a/collects/mzlib/scribblings/thread.scrbl +++ b/collects/mzlib/scribblings/thread.scrbl @@ -1,65 +1,25 @@ #lang scribble/doc @(require "common.rkt" (for-label mzlib/thread + racket/engine scheme/contract scheme/tcp)) @mzlib[#:mode title thread] -@defproc[(coroutine [proc ((any/c . -> . void?) . -> . any/c)]) - coroutine?]{ +@deprecated[@racketmodname[racket/engine]]{} -Returns a coroutine object to encapsulate a thread that runs only when -allowed. The @racket[proc] procedure should accept one argument, and -@racket[proc] is run in the coroutine thread when -@racket[coroutine-run] is called. If @racket[coroutine-run] returns -due to a timeout, then the coroutine thread is suspended until a -future call to @racket[coroutine-run]. Thus, @racket[proc] only -executes during the dynamic extent of a @racket[coroutine-run] call. - -The argument to @racket[proc] is a procedure that takes a boolean, and -it can be used to disable suspends (in case @racket[proc] has critical -regions where it should not be suspended). A true value passed to the -procedure enables suspends, and @racket[#f] disables -suspends. Initially, suspends are allowed.} - - -@defproc[(coroutine? [v any/c]) any]{ - -Returns @racket[#t] if @racket[v] is a coroutine produced by -@racket[coroutine], @racket[#f] otherwise.} - - -@defproc[(coroutine-run [until (or/c evt? real?)][coroutine coroutine?]) - boolean?]{ - -Allows the thread associated with @racket[coroutine] to execute for up -as long as @racket[until] milliseconds (of @racket[until] is a real -number) or @racket[until] is ready (if @racket[until] is an event). If -@racket[coroutine]'s procedure disables suspends, then the coroutine -can run arbitrarily long until it re-enables suspends. - -The @racket[coroutine-run] procedure returns @racket[#t] if -@racket[coroutine]'s procedure completes (or if it completed earlier), -and the result is available via @racket[coroutine-result]. The -@racket[coroutine-run] procedure returns @racket[#f] if -@racket[coroutine]'s procedure does not complete before it is -suspended after @racket[timeout-secs]. If @racket[coroutine]'s -procedure raises an exception, then it is re-raised by -@racket[coroutine-run].} - - -@defproc[(coroutine-result [coroutine coroutine]) any]{ - -Returns the result for @racket[coroutine] if it has completed with a -value (as opposed to an exception), @racket[#f] otherwise.} - - -@defproc[(coroutine-kill [coroutine coroutine?]) void?]{ - -Forcibly terminates the thread associated with @racket[coroutine] if -it is still running, leaving the coroutine result unchanged.} +Re-exports the bindings from @racketmodname[racket/engine] under +different names and also provides two extra bindings. The renamings +are: +@itemlist[ + @item{@racket[engine] as @racket[coroutine]} + @item{@racket[engine?] as @racket[coroutine?]} + @item{@racket[engine-run] as @racket[coroutine-run]} + @item{@racket[engine-result] as @racket[coroutine-result]} + @item{@racket[engine-kill] as @racket[coroutine-kill]} +] @defproc[(consumer-thread [f procedure?][init (-> any) void]) (values thread? procedure?)]{ diff --git a/collects/mzlib/thread.rkt b/collects/mzlib/thread.rkt index e024066e14..a87030814a 100644 --- a/collects/mzlib/thread.rkt +++ b/collects/mzlib/thread.rkt @@ -1,9 +1,15 @@ (module thread mzscheme - (require "kw.rkt" "contract.rkt") + (require "kw.rkt" "contract.rkt" racket/engine) (provide run-server - consumer-thread) + consumer-thread + + (rename engine? coroutine?) + (rename engine coroutine) + (rename engine-run coroutine-run) + (rename engine-result coroutine-result) + (rename engine-kill coroutine-kill)) #| t accepts a function, f, and creates a thread. It returns the thread and a @@ -127,108 +133,4 @@ (sync/timeout connection-timeout t) (custodian-shutdown-all c))))))))) (loop)))) - (lambda () (tcp-close l))))) - - ;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - ;; Couroutine - ;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - - ;; An X-coroutine-object is - ;; (make-coroutine-object thread semaphore channel channel X) - (define-struct coroutine-object (worker can-stop-lock done-ch ex-ch result)) - - ;; coroutine : ((bool ->) -> X) -> X-coroutine-object - (define (coroutine f) - ;;(printf "2. new coroutine\n") - (let* ([can-stop-lock (make-semaphore 1)] - [done-ch (make-channel)] - [ex-ch (make-channel)] - [proceed-sema (make-semaphore)] - [stop-enabled? #t] - [enable-stop - (lambda (enable?) - ;;(printf "3. enabling ~a\n" enable?) - (cond - [(and enable? (not stop-enabled?)) - (semaphore-post can-stop-lock) - (set! stop-enabled? #t)] - [(and (not enable?) stop-enabled?) - (semaphore-wait can-stop-lock) - (set! stop-enabled? #f)]) - ;;(printf "3. finished enabling\n") - )] - [tid (thread (lambda () - (semaphore-wait proceed-sema) - ;;(printf "3. creating coroutine thread\n") - (with-handlers ([(lambda (exn) #t) - (lambda (exn) - (enable-stop #t) - (channel-put ex-ch exn))]) - (let ([v (f enable-stop)]) - (enable-stop #t) - (channel-put done-ch v)))))]) - (begin0 (make-coroutine-object tid can-stop-lock done-ch ex-ch #f) - (thread-suspend tid) - (semaphore-post proceed-sema)))) - - ;; coroutine : real-number X-coroutine-object -> bool - (define (coroutine-run timeout w) - (if (coroutine-object-worker w) - (let ([can-stop-lock (coroutine-object-can-stop-lock w)] - [worker (coroutine-object-worker w)]) - #;(printf "2. starting coroutine\n") - (thread-resume worker) - (dynamic-wind - void - ;; Let the co-routine run... - (lambda () - (sync (choice-evt (wrap-evt (if (evt? timeout) - timeout - (alarm-evt (+ timeout (current-inexact-milliseconds)))) - (lambda (x) - #;(printf "2. alarm-evt\n") - (semaphore-wait can-stop-lock) - (thread-suspend worker) - (semaphore-post can-stop-lock) - #f)) - (wrap-evt (coroutine-object-done-ch w) - (lambda (res) - #;(printf "2. coroutine-done-evt\n") - (set-coroutine-object-result! w res) - (coroutine-kill w) - #t)) - (wrap-evt (coroutine-object-ex-ch w) - (lambda (exn) - #;(printf "2. ex-evt\n") - (coroutine-kill w) - (raise exn)))))) - ;; In case we escape through a break: - (lambda () - (when (thread-running? worker) - (semaphore-wait can-stop-lock) - (thread-suspend worker) - (semaphore-post can-stop-lock))))) - #t)) - - ;; coroutine-result : X-coroutine-object -> X - (define (coroutine-result w) - (coroutine-object-result w)) - - ;; coroutine-kill : X-coroutine-object -> - (define (coroutine-kill w) - (set-coroutine-object-can-stop-lock! w #f) - (set-coroutine-object-done-ch! w #f) - (set-coroutine-object-ex-ch! w #f) - (when (coroutine-object-worker w) - (kill-thread (coroutine-object-worker w)) - (set-coroutine-object-worker! w #f))) - - (define (coroutine? x) - (coroutine-object? x)) - - (provide coroutine?) - (provide/contract - (coroutine (((any/c . -> . any) . -> . any) . -> . coroutine?)) - (coroutine-run ((or/c evt? real?) coroutine? . -> . boolean?)) - (coroutine-result (coroutine? . -> . any)) - (coroutine-kill (coroutine? . -> . any)))) + (lambda () (tcp-close l)))))) diff --git a/collects/racket/engine.rkt b/collects/racket/engine.rkt new file mode 100644 index 0000000000..8efdeed095 --- /dev/null +++ b/collects/racket/engine.rkt @@ -0,0 +1,106 @@ +#lang racket/base + +;; Library for engines: preemptable processes + +(require racket/contract/base) + +(provide + engine? + (contract-out (engine (((any/c . -> . any) . -> . any) . -> . engine?)) + (engine-run ((or/c evt? real?) engine? . -> . boolean?)) + (engine-result (engine? . -> . any)) + (engine-kill (engine? . -> . any)))) + +;; An X-engine-object is +;; (make-engine-object thread semaphore channel channel X) +(define-struct engine-object (worker can-stop-lock done-ch ex-ch result) + #:mutable) + +;; engine : ((bool ->) -> X) -> X-engine-object +(define (engine f) + ;;(printf "2. new engine\n") + (let* ([can-stop-lock (make-semaphore 1)] + [done-ch (make-channel)] + [ex-ch (make-channel)] + [proceed-sema (make-semaphore)] + [stop-enabled? #t] + [enable-stop + (lambda (enable?) + ;;(printf "3. enabling ~a\n" enable?) + (cond + [(and enable? (not stop-enabled?)) + (semaphore-post can-stop-lock) + (set! stop-enabled? #t)] + [(and (not enable?) stop-enabled?) + (semaphore-wait can-stop-lock) + (set! stop-enabled? #f)]) + ;;(printf "3. finished enabling\n") + )] + [tid (thread (lambda () + (semaphore-wait proceed-sema) + ;;(printf "3. creating engine thread\n") + (with-handlers ([(lambda (exn) #t) + (lambda (exn) + (enable-stop #t) + (channel-put ex-ch exn))]) + (let ([v (f enable-stop)]) + (enable-stop #t) + (channel-put done-ch v)))))]) + (begin0 (make-engine-object tid can-stop-lock done-ch ex-ch #f) + (thread-suspend tid) + (semaphore-post proceed-sema)))) + +;; engine : real-number X-engine-object -> bool +(define (engine-run timeout w) + (if (engine-object-worker w) + (let ([can-stop-lock (engine-object-can-stop-lock w)] + [worker (engine-object-worker w)]) + #;(printf "2. starting engine\n") + (thread-resume worker) + (dynamic-wind + void + ;; Let the co-routine run... + (lambda () + (sync (choice-evt (wrap-evt (if (evt? timeout) + timeout + (alarm-evt (+ timeout (current-inexact-milliseconds)))) + (lambda (x) + #;(printf "2. alarm-evt\n") + (semaphore-wait can-stop-lock) + (thread-suspend worker) + (semaphore-post can-stop-lock) + #f)) + (wrap-evt (engine-object-done-ch w) + (lambda (res) + #;(printf "2. engine-done-evt\n") + (set-engine-object-result! w res) + (engine-kill w) + #t)) + (wrap-evt (engine-object-ex-ch w) + (lambda (exn) + #;(printf "2. ex-evt\n") + (engine-kill w) + (raise exn)))))) + ;; In case we escape through a break: + (lambda () + (when (thread-running? worker) + (semaphore-wait can-stop-lock) + (thread-suspend worker) + (semaphore-post can-stop-lock))))) + #t)) + +;; engine-result : X-engine-object -> X +(define (engine-result w) + (engine-object-result w)) + +;; engine-kill : X-engine-object -> +(define (engine-kill w) + (set-engine-object-can-stop-lock! w #f) + (set-engine-object-done-ch! w #f) + (set-engine-object-ex-ch! w #f) + (when (engine-object-worker w) + (kill-thread (engine-object-worker w)) + (set-engine-object-worker! w #f))) + +(define (engine? x) + (engine-object? x)) diff --git a/collects/scribblings/reference/concurrency.scrbl b/collects/scribblings/reference/concurrency.scrbl index 5f9824a637..3dc3bca9fa 100644 --- a/collects/scribblings/reference/concurrency.scrbl +++ b/collects/scribblings/reference/concurrency.scrbl @@ -21,3 +21,4 @@ support for parallelism to improve performance. @include-section["futures-trace.scrbl"] @include-section["places.scrbl"] @include-section["distributed.scrbl"] +@include-section["engine.scrbl"] diff --git a/collects/scribblings/reference/engine.scrbl b/collects/scribblings/reference/engine.scrbl new file mode 100644 index 0000000000..a8cdd274d4 --- /dev/null +++ b/collects/scribblings/reference/engine.scrbl @@ -0,0 +1,65 @@ +#lang scribble/doc +@(require "mz.rkt" + (for-label racket/engine)) + +@title[#:tag "engine"]{Engines} + +@note-lib-only[racket/engine] + +An @deftech{engine} is an abstraction that models processes that +can be preempted by a timer or other external trigger. They are +inspired by the work of Haynes and Friedman @cite["Haynes84"]. + +@defproc[(engine [proc ((any/c . -> . void?) . -> . any/c)]) + engine?]{ + +Returns an engine object to encapsulate a thread that runs only when +allowed. The @racket[proc] procedure should accept one argument, and +@racket[proc] is run in the engine thread when +@racket[engine-run] is called. If @racket[engine-run] returns +due to a timeout, then the engine thread is suspended until a +future call to @racket[engine-run]. Thus, @racket[proc] only +executes during the dynamic extent of a @racket[engine-run] call. + +The argument to @racket[proc] is a procedure that takes a boolean, and +it can be used to disable suspends (in case @racket[proc] has critical +regions where it should not be suspended). A true value passed to the +procedure enables suspends, and @racket[#f] disables +suspends. Initially, suspends are allowed.} + + +@defproc[(engine? [v any/c]) any]{ + +Returns @racket[#t] if @racket[v] is an engine produced by +@racket[engine], @racket[#f] otherwise.} + + +@defproc[(engine-run [until (or/c evt? real?)][engine engine?]) + boolean?]{ + +Allows the thread associated with @racket[engine] to execute for up +as long as @racket[until] milliseconds (of @racket[until] is a real +number) or @racket[until] is ready (if @racket[until] is an event). If +@racket[engine]'s procedure disables suspends, then the engine +can run arbitrarily long until it re-enables suspends. + +The @racket[engine-run] procedure returns @racket[#t] if +@racket[engine]'s procedure completes (or if it completed earlier), +and the result is available via @racket[engine-result]. The +@racket[engine-run] procedure returns @racket[#f] if +@racket[engine]'s procedure does not complete before it is +suspended after @racket[timeout-secs]. If @racket[engine]'s +procedure raises an exception, then it is re-raised by +@racket[engine-run].} + + +@defproc[(engine-result [engine engine]) any]{ + +Returns the result for @racket[engine] if it has completed with a +value (as opposed to an exception), @racket[#f] otherwise.} + + +@defproc[(engine-kill [engine engine?]) void?]{ + +Forcibly terminates the thread associated with @racket[engine] if +it is still running, leaving the engine result unchanged.} diff --git a/collects/scribblings/reference/reference.scrbl b/collects/scribblings/reference/reference.scrbl index c2721746f9..828dda739f 100644 --- a/collects/scribblings/reference/reference.scrbl +++ b/collects/scribblings/reference/reference.scrbl @@ -115,6 +115,12 @@ The @racketmodname[racket] library combines #:title "A Generalization of Exceptions and Control in ML-like Languages" #:location "Functional Programming Languages and Computer Architecture" #:date "1995") + + (bib-entry #:key "Haynes84" + #:author "Christopher T. Haynes and Daniel P. Friedman" + #:title "Engines Build Process Abstractions" + #:location "Symposium on LISP and Functional Programming" + #:date "1984") (bib-entry #:key "Hayes97" #:author "Barry Hayes"