Move engines from mzlib/thread to racket/engine
(they were previously called "coroutines" but the term "engine" is less ambiguous)
This commit is contained in:
parent
100212cd53
commit
2dcf060774
|
@ -1,65 +1,25 @@
|
||||||
#lang scribble/doc
|
#lang scribble/doc
|
||||||
@(require "common.rkt"
|
@(require "common.rkt"
|
||||||
(for-label mzlib/thread
|
(for-label mzlib/thread
|
||||||
|
racket/engine
|
||||||
scheme/contract
|
scheme/contract
|
||||||
scheme/tcp))
|
scheme/tcp))
|
||||||
|
|
||||||
@mzlib[#:mode title thread]
|
@mzlib[#:mode title thread]
|
||||||
|
|
||||||
@defproc[(coroutine [proc ((any/c . -> . void?) . -> . any/c)])
|
@deprecated[@racketmodname[racket/engine]]{}
|
||||||
coroutine?]{
|
|
||||||
|
|
||||||
Returns a coroutine object to encapsulate a thread that runs only when
|
Re-exports the bindings from @racketmodname[racket/engine] under
|
||||||
allowed. The @racket[proc] procedure should accept one argument, and
|
different names and also provides two extra bindings. The renamings
|
||||||
@racket[proc] is run in the coroutine thread when
|
are:
|
||||||
@racket[coroutine-run] is called. If @racket[coroutine-run] returns
|
|
||||||
due to a timeout, then the coroutine thread is suspended until a
|
|
||||||
future call to @racket[coroutine-run]. Thus, @racket[proc] only
|
|
||||||
executes during the dynamic extent of a @racket[coroutine-run] call.
|
|
||||||
|
|
||||||
The argument to @racket[proc] is a procedure that takes a boolean, and
|
|
||||||
it can be used to disable suspends (in case @racket[proc] has critical
|
|
||||||
regions where it should not be suspended). A true value passed to the
|
|
||||||
procedure enables suspends, and @racket[#f] disables
|
|
||||||
suspends. Initially, suspends are allowed.}
|
|
||||||
|
|
||||||
|
|
||||||
@defproc[(coroutine? [v any/c]) any]{
|
|
||||||
|
|
||||||
Returns @racket[#t] if @racket[v] is a coroutine produced by
|
|
||||||
@racket[coroutine], @racket[#f] otherwise.}
|
|
||||||
|
|
||||||
|
|
||||||
@defproc[(coroutine-run [until (or/c evt? real?)][coroutine coroutine?])
|
|
||||||
boolean?]{
|
|
||||||
|
|
||||||
Allows the thread associated with @racket[coroutine] to execute for up
|
|
||||||
as long as @racket[until] milliseconds (of @racket[until] is a real
|
|
||||||
number) or @racket[until] is ready (if @racket[until] is an event). If
|
|
||||||
@racket[coroutine]'s procedure disables suspends, then the coroutine
|
|
||||||
can run arbitrarily long until it re-enables suspends.
|
|
||||||
|
|
||||||
The @racket[coroutine-run] procedure returns @racket[#t] if
|
|
||||||
@racket[coroutine]'s procedure completes (or if it completed earlier),
|
|
||||||
and the result is available via @racket[coroutine-result]. The
|
|
||||||
@racket[coroutine-run] procedure returns @racket[#f] if
|
|
||||||
@racket[coroutine]'s procedure does not complete before it is
|
|
||||||
suspended after @racket[timeout-secs]. If @racket[coroutine]'s
|
|
||||||
procedure raises an exception, then it is re-raised by
|
|
||||||
@racket[coroutine-run].}
|
|
||||||
|
|
||||||
|
|
||||||
@defproc[(coroutine-result [coroutine coroutine]) any]{
|
|
||||||
|
|
||||||
Returns the result for @racket[coroutine] if it has completed with a
|
|
||||||
value (as opposed to an exception), @racket[#f] otherwise.}
|
|
||||||
|
|
||||||
|
|
||||||
@defproc[(coroutine-kill [coroutine coroutine?]) void?]{
|
|
||||||
|
|
||||||
Forcibly terminates the thread associated with @racket[coroutine] if
|
|
||||||
it is still running, leaving the coroutine result unchanged.}
|
|
||||||
|
|
||||||
|
@itemlist[
|
||||||
|
@item{@racket[engine] as @racket[coroutine]}
|
||||||
|
@item{@racket[engine?] as @racket[coroutine?]}
|
||||||
|
@item{@racket[engine-run] as @racket[coroutine-run]}
|
||||||
|
@item{@racket[engine-result] as @racket[coroutine-result]}
|
||||||
|
@item{@racket[engine-kill] as @racket[coroutine-kill]}
|
||||||
|
]
|
||||||
|
|
||||||
@defproc[(consumer-thread [f procedure?][init (-> any) void])
|
@defproc[(consumer-thread [f procedure?][init (-> any) void])
|
||||||
(values thread? procedure?)]{
|
(values thread? procedure?)]{
|
||||||
|
|
|
@ -1,9 +1,15 @@
|
||||||
|
|
||||||
(module thread mzscheme
|
(module thread mzscheme
|
||||||
(require "kw.rkt" "contract.rkt")
|
(require "kw.rkt" "contract.rkt" racket/engine)
|
||||||
|
|
||||||
(provide run-server
|
(provide run-server
|
||||||
consumer-thread)
|
consumer-thread
|
||||||
|
|
||||||
|
(rename engine? coroutine?)
|
||||||
|
(rename engine coroutine)
|
||||||
|
(rename engine-run coroutine-run)
|
||||||
|
(rename engine-result coroutine-result)
|
||||||
|
(rename engine-kill coroutine-kill))
|
||||||
|
|
||||||
#|
|
#|
|
||||||
t accepts a function, f, and creates a thread. It returns the thread and a
|
t accepts a function, f, and creates a thread. It returns the thread and a
|
||||||
|
@ -127,108 +133,4 @@
|
||||||
(sync/timeout connection-timeout t)
|
(sync/timeout connection-timeout t)
|
||||||
(custodian-shutdown-all c)))))))))
|
(custodian-shutdown-all c)))))))))
|
||||||
(loop))))
|
(loop))))
|
||||||
(lambda () (tcp-close l)))))
|
(lambda () (tcp-close l))))))
|
||||||
|
|
||||||
;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
||||||
;; Couroutine
|
|
||||||
;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
||||||
|
|
||||||
;; An X-coroutine-object is
|
|
||||||
;; (make-coroutine-object thread semaphore channel channel X)
|
|
||||||
(define-struct coroutine-object (worker can-stop-lock done-ch ex-ch result))
|
|
||||||
|
|
||||||
;; coroutine : ((bool ->) -> X) -> X-coroutine-object
|
|
||||||
(define (coroutine f)
|
|
||||||
;;(printf "2. new coroutine\n")
|
|
||||||
(let* ([can-stop-lock (make-semaphore 1)]
|
|
||||||
[done-ch (make-channel)]
|
|
||||||
[ex-ch (make-channel)]
|
|
||||||
[proceed-sema (make-semaphore)]
|
|
||||||
[stop-enabled? #t]
|
|
||||||
[enable-stop
|
|
||||||
(lambda (enable?)
|
|
||||||
;;(printf "3. enabling ~a\n" enable?)
|
|
||||||
(cond
|
|
||||||
[(and enable? (not stop-enabled?))
|
|
||||||
(semaphore-post can-stop-lock)
|
|
||||||
(set! stop-enabled? #t)]
|
|
||||||
[(and (not enable?) stop-enabled?)
|
|
||||||
(semaphore-wait can-stop-lock)
|
|
||||||
(set! stop-enabled? #f)])
|
|
||||||
;;(printf "3. finished enabling\n")
|
|
||||||
)]
|
|
||||||
[tid (thread (lambda ()
|
|
||||||
(semaphore-wait proceed-sema)
|
|
||||||
;;(printf "3. creating coroutine thread\n")
|
|
||||||
(with-handlers ([(lambda (exn) #t)
|
|
||||||
(lambda (exn)
|
|
||||||
(enable-stop #t)
|
|
||||||
(channel-put ex-ch exn))])
|
|
||||||
(let ([v (f enable-stop)])
|
|
||||||
(enable-stop #t)
|
|
||||||
(channel-put done-ch v)))))])
|
|
||||||
(begin0 (make-coroutine-object tid can-stop-lock done-ch ex-ch #f)
|
|
||||||
(thread-suspend tid)
|
|
||||||
(semaphore-post proceed-sema))))
|
|
||||||
|
|
||||||
;; coroutine : real-number X-coroutine-object -> bool
|
|
||||||
(define (coroutine-run timeout w)
|
|
||||||
(if (coroutine-object-worker w)
|
|
||||||
(let ([can-stop-lock (coroutine-object-can-stop-lock w)]
|
|
||||||
[worker (coroutine-object-worker w)])
|
|
||||||
#;(printf "2. starting coroutine\n")
|
|
||||||
(thread-resume worker)
|
|
||||||
(dynamic-wind
|
|
||||||
void
|
|
||||||
;; Let the co-routine run...
|
|
||||||
(lambda ()
|
|
||||||
(sync (choice-evt (wrap-evt (if (evt? timeout)
|
|
||||||
timeout
|
|
||||||
(alarm-evt (+ timeout (current-inexact-milliseconds))))
|
|
||||||
(lambda (x)
|
|
||||||
#;(printf "2. alarm-evt\n")
|
|
||||||
(semaphore-wait can-stop-lock)
|
|
||||||
(thread-suspend worker)
|
|
||||||
(semaphore-post can-stop-lock)
|
|
||||||
#f))
|
|
||||||
(wrap-evt (coroutine-object-done-ch w)
|
|
||||||
(lambda (res)
|
|
||||||
#;(printf "2. coroutine-done-evt\n")
|
|
||||||
(set-coroutine-object-result! w res)
|
|
||||||
(coroutine-kill w)
|
|
||||||
#t))
|
|
||||||
(wrap-evt (coroutine-object-ex-ch w)
|
|
||||||
(lambda (exn)
|
|
||||||
#;(printf "2. ex-evt\n")
|
|
||||||
(coroutine-kill w)
|
|
||||||
(raise exn))))))
|
|
||||||
;; In case we escape through a break:
|
|
||||||
(lambda ()
|
|
||||||
(when (thread-running? worker)
|
|
||||||
(semaphore-wait can-stop-lock)
|
|
||||||
(thread-suspend worker)
|
|
||||||
(semaphore-post can-stop-lock)))))
|
|
||||||
#t))
|
|
||||||
|
|
||||||
;; coroutine-result : X-coroutine-object -> X
|
|
||||||
(define (coroutine-result w)
|
|
||||||
(coroutine-object-result w))
|
|
||||||
|
|
||||||
;; coroutine-kill : X-coroutine-object ->
|
|
||||||
(define (coroutine-kill w)
|
|
||||||
(set-coroutine-object-can-stop-lock! w #f)
|
|
||||||
(set-coroutine-object-done-ch! w #f)
|
|
||||||
(set-coroutine-object-ex-ch! w #f)
|
|
||||||
(when (coroutine-object-worker w)
|
|
||||||
(kill-thread (coroutine-object-worker w))
|
|
||||||
(set-coroutine-object-worker! w #f)))
|
|
||||||
|
|
||||||
(define (coroutine? x)
|
|
||||||
(coroutine-object? x))
|
|
||||||
|
|
||||||
(provide coroutine?)
|
|
||||||
(provide/contract
|
|
||||||
(coroutine (((any/c . -> . any) . -> . any) . -> . coroutine?))
|
|
||||||
(coroutine-run ((or/c evt? real?) coroutine? . -> . boolean?))
|
|
||||||
(coroutine-result (coroutine? . -> . any))
|
|
||||||
(coroutine-kill (coroutine? . -> . any))))
|
|
||||||
|
|
106
collects/racket/engine.rkt
Normal file
106
collects/racket/engine.rkt
Normal file
|
@ -0,0 +1,106 @@
|
||||||
|
#lang racket/base
|
||||||
|
|
||||||
|
;; Library for engines: preemptable processes
|
||||||
|
|
||||||
|
(require racket/contract/base)
|
||||||
|
|
||||||
|
(provide
|
||||||
|
engine?
|
||||||
|
(contract-out (engine (((any/c . -> . any) . -> . any) . -> . engine?))
|
||||||
|
(engine-run ((or/c evt? real?) engine? . -> . boolean?))
|
||||||
|
(engine-result (engine? . -> . any))
|
||||||
|
(engine-kill (engine? . -> . any))))
|
||||||
|
|
||||||
|
;; An X-engine-object is
|
||||||
|
;; (make-engine-object thread semaphore channel channel X)
|
||||||
|
(define-struct engine-object (worker can-stop-lock done-ch ex-ch result)
|
||||||
|
#:mutable)
|
||||||
|
|
||||||
|
;; engine : ((bool ->) -> X) -> X-engine-object
|
||||||
|
(define (engine f)
|
||||||
|
;;(printf "2. new engine\n")
|
||||||
|
(let* ([can-stop-lock (make-semaphore 1)]
|
||||||
|
[done-ch (make-channel)]
|
||||||
|
[ex-ch (make-channel)]
|
||||||
|
[proceed-sema (make-semaphore)]
|
||||||
|
[stop-enabled? #t]
|
||||||
|
[enable-stop
|
||||||
|
(lambda (enable?)
|
||||||
|
;;(printf "3. enabling ~a\n" enable?)
|
||||||
|
(cond
|
||||||
|
[(and enable? (not stop-enabled?))
|
||||||
|
(semaphore-post can-stop-lock)
|
||||||
|
(set! stop-enabled? #t)]
|
||||||
|
[(and (not enable?) stop-enabled?)
|
||||||
|
(semaphore-wait can-stop-lock)
|
||||||
|
(set! stop-enabled? #f)])
|
||||||
|
;;(printf "3. finished enabling\n")
|
||||||
|
)]
|
||||||
|
[tid (thread (lambda ()
|
||||||
|
(semaphore-wait proceed-sema)
|
||||||
|
;;(printf "3. creating engine thread\n")
|
||||||
|
(with-handlers ([(lambda (exn) #t)
|
||||||
|
(lambda (exn)
|
||||||
|
(enable-stop #t)
|
||||||
|
(channel-put ex-ch exn))])
|
||||||
|
(let ([v (f enable-stop)])
|
||||||
|
(enable-stop #t)
|
||||||
|
(channel-put done-ch v)))))])
|
||||||
|
(begin0 (make-engine-object tid can-stop-lock done-ch ex-ch #f)
|
||||||
|
(thread-suspend tid)
|
||||||
|
(semaphore-post proceed-sema))))
|
||||||
|
|
||||||
|
;; engine : real-number X-engine-object -> bool
|
||||||
|
(define (engine-run timeout w)
|
||||||
|
(if (engine-object-worker w)
|
||||||
|
(let ([can-stop-lock (engine-object-can-stop-lock w)]
|
||||||
|
[worker (engine-object-worker w)])
|
||||||
|
#;(printf "2. starting engine\n")
|
||||||
|
(thread-resume worker)
|
||||||
|
(dynamic-wind
|
||||||
|
void
|
||||||
|
;; Let the co-routine run...
|
||||||
|
(lambda ()
|
||||||
|
(sync (choice-evt (wrap-evt (if (evt? timeout)
|
||||||
|
timeout
|
||||||
|
(alarm-evt (+ timeout (current-inexact-milliseconds))))
|
||||||
|
(lambda (x)
|
||||||
|
#;(printf "2. alarm-evt\n")
|
||||||
|
(semaphore-wait can-stop-lock)
|
||||||
|
(thread-suspend worker)
|
||||||
|
(semaphore-post can-stop-lock)
|
||||||
|
#f))
|
||||||
|
(wrap-evt (engine-object-done-ch w)
|
||||||
|
(lambda (res)
|
||||||
|
#;(printf "2. engine-done-evt\n")
|
||||||
|
(set-engine-object-result! w res)
|
||||||
|
(engine-kill w)
|
||||||
|
#t))
|
||||||
|
(wrap-evt (engine-object-ex-ch w)
|
||||||
|
(lambda (exn)
|
||||||
|
#;(printf "2. ex-evt\n")
|
||||||
|
(engine-kill w)
|
||||||
|
(raise exn))))))
|
||||||
|
;; In case we escape through a break:
|
||||||
|
(lambda ()
|
||||||
|
(when (thread-running? worker)
|
||||||
|
(semaphore-wait can-stop-lock)
|
||||||
|
(thread-suspend worker)
|
||||||
|
(semaphore-post can-stop-lock)))))
|
||||||
|
#t))
|
||||||
|
|
||||||
|
;; engine-result : X-engine-object -> X
|
||||||
|
(define (engine-result w)
|
||||||
|
(engine-object-result w))
|
||||||
|
|
||||||
|
;; engine-kill : X-engine-object ->
|
||||||
|
(define (engine-kill w)
|
||||||
|
(set-engine-object-can-stop-lock! w #f)
|
||||||
|
(set-engine-object-done-ch! w #f)
|
||||||
|
(set-engine-object-ex-ch! w #f)
|
||||||
|
(when (engine-object-worker w)
|
||||||
|
(kill-thread (engine-object-worker w))
|
||||||
|
(set-engine-object-worker! w #f)))
|
||||||
|
|
||||||
|
(define (engine? x)
|
||||||
|
(engine-object? x))
|
|
@ -21,3 +21,4 @@ support for parallelism to improve performance.
|
||||||
@include-section["futures-trace.scrbl"]
|
@include-section["futures-trace.scrbl"]
|
||||||
@include-section["places.scrbl"]
|
@include-section["places.scrbl"]
|
||||||
@include-section["distributed.scrbl"]
|
@include-section["distributed.scrbl"]
|
||||||
|
@include-section["engine.scrbl"]
|
||||||
|
|
65
collects/scribblings/reference/engine.scrbl
Normal file
65
collects/scribblings/reference/engine.scrbl
Normal file
|
@ -0,0 +1,65 @@
|
||||||
|
#lang scribble/doc
|
||||||
|
@(require "mz.rkt"
|
||||||
|
(for-label racket/engine))
|
||||||
|
|
||||||
|
@title[#:tag "engine"]{Engines}
|
||||||
|
|
||||||
|
@note-lib-only[racket/engine]
|
||||||
|
|
||||||
|
An @deftech{engine} is an abstraction that models processes that
|
||||||
|
can be preempted by a timer or other external trigger. They are
|
||||||
|
inspired by the work of Haynes and Friedman @cite["Haynes84"].
|
||||||
|
|
||||||
|
@defproc[(engine [proc ((any/c . -> . void?) . -> . any/c)])
|
||||||
|
engine?]{
|
||||||
|
|
||||||
|
Returns an engine object to encapsulate a thread that runs only when
|
||||||
|
allowed. The @racket[proc] procedure should accept one argument, and
|
||||||
|
@racket[proc] is run in the engine thread when
|
||||||
|
@racket[engine-run] is called. If @racket[engine-run] returns
|
||||||
|
due to a timeout, then the engine thread is suspended until a
|
||||||
|
future call to @racket[engine-run]. Thus, @racket[proc] only
|
||||||
|
executes during the dynamic extent of a @racket[engine-run] call.
|
||||||
|
|
||||||
|
The argument to @racket[proc] is a procedure that takes a boolean, and
|
||||||
|
it can be used to disable suspends (in case @racket[proc] has critical
|
||||||
|
regions where it should not be suspended). A true value passed to the
|
||||||
|
procedure enables suspends, and @racket[#f] disables
|
||||||
|
suspends. Initially, suspends are allowed.}
|
||||||
|
|
||||||
|
|
||||||
|
@defproc[(engine? [v any/c]) any]{
|
||||||
|
|
||||||
|
Returns @racket[#t] if @racket[v] is an engine produced by
|
||||||
|
@racket[engine], @racket[#f] otherwise.}
|
||||||
|
|
||||||
|
|
||||||
|
@defproc[(engine-run [until (or/c evt? real?)][engine engine?])
|
||||||
|
boolean?]{
|
||||||
|
|
||||||
|
Allows the thread associated with @racket[engine] to execute for up
|
||||||
|
as long as @racket[until] milliseconds (of @racket[until] is a real
|
||||||
|
number) or @racket[until] is ready (if @racket[until] is an event). If
|
||||||
|
@racket[engine]'s procedure disables suspends, then the engine
|
||||||
|
can run arbitrarily long until it re-enables suspends.
|
||||||
|
|
||||||
|
The @racket[engine-run] procedure returns @racket[#t] if
|
||||||
|
@racket[engine]'s procedure completes (or if it completed earlier),
|
||||||
|
and the result is available via @racket[engine-result]. The
|
||||||
|
@racket[engine-run] procedure returns @racket[#f] if
|
||||||
|
@racket[engine]'s procedure does not complete before it is
|
||||||
|
suspended after @racket[timeout-secs]. If @racket[engine]'s
|
||||||
|
procedure raises an exception, then it is re-raised by
|
||||||
|
@racket[engine-run].}
|
||||||
|
|
||||||
|
|
||||||
|
@defproc[(engine-result [engine engine]) any]{
|
||||||
|
|
||||||
|
Returns the result for @racket[engine] if it has completed with a
|
||||||
|
value (as opposed to an exception), @racket[#f] otherwise.}
|
||||||
|
|
||||||
|
|
||||||
|
@defproc[(engine-kill [engine engine?]) void?]{
|
||||||
|
|
||||||
|
Forcibly terminates the thread associated with @racket[engine] if
|
||||||
|
it is still running, leaving the engine result unchanged.}
|
|
@ -116,6 +116,12 @@ The @racketmodname[racket] library combines
|
||||||
#:location "Functional Programming Languages and Computer Architecture"
|
#:location "Functional Programming Languages and Computer Architecture"
|
||||||
#:date "1995")
|
#:date "1995")
|
||||||
|
|
||||||
|
(bib-entry #:key "Haynes84"
|
||||||
|
#:author "Christopher T. Haynes and Daniel P. Friedman"
|
||||||
|
#:title "Engines Build Process Abstractions"
|
||||||
|
#:location "Symposium on LISP and Functional Programming"
|
||||||
|
#:date "1984")
|
||||||
|
|
||||||
(bib-entry #:key "Hayes97"
|
(bib-entry #:key "Hayes97"
|
||||||
#:author "Barry Hayes"
|
#:author "Barry Hayes"
|
||||||
#:title "Ephemerons: a New Finalization Mechanism"
|
#:title "Ephemerons: a New Finalization Mechanism"
|
||||||
|
|
Loading…
Reference in New Issue
Block a user