new kinds of promises

svn: r16807
This commit is contained in:
Eli Barzilay 2009-11-16 11:06:47 +00:00
parent 709b588410
commit b1f5b0652c
3 changed files with 355 additions and 12 deletions

View File

@ -253,6 +253,181 @@
(running? (pref promise))
(raise-type-error 'promise-running? "promise" promise)))
;; ----------------------------------------------------------------------------
;; More delay-like values, with different ways of deferring computations
(define-struct (promise/name promise) ()
#:property prop:force (lambda (p) ((pref p))))
(#%provide (rename delay/name* delay/name))
(define delay/name make-promise/name)
(define-syntax (delay/name* stx) (make-delayer stx #'delay/name '()))
;; utility struct
(define-struct (running-thread running) (thread))
;; used in promise/sync until it's forced
(define-struct syncinfo ([thunk #:mutable] done-evt done-sema access-sema))
(define-struct (promise/sync promise) ()
#:property prop:custom-write
(lambda (p port write?)
(promise-printer
(let ([v (pref p)])
(if (syncinfo? v) (make-promise (syncinfo-thunk v)) p))
port write?))
#:property prop:force
(lambda (p)
(reify-result
(let ([v (pref p)])
(cond
;; already forced
[(not (syncinfo? v)) v]
;; being forced...
[(running-thread? (syncinfo-thunk v))
(let ([r (syncinfo-thunk v)])
(if (eq? (running-thread-thread r) (current-thread))
;; ... by the current thread => throw the usual reentrant error
(r)
;; ... by a different thread => just wait for it
(begin (sync (syncinfo-done-evt v)) (pref p))))]
[else
;; wasn't forced yet: try to do it now
(call-with-semaphore (syncinfo-access-sema v)
(lambda ()
(let ([thunk (syncinfo-thunk v)] [done (syncinfo-done-sema v)])
;; set the thread last
(set-syncinfo-thunk!
v (make-running-thread (object-name thunk) (current-thread)))
(call-with-exception-handler
(lambda (e)
(pset! p (make-reraise e))
(semaphore-post done)
e)
(lambda ()
(pset! p (call-with-values thunk list))
(semaphore-post done))))))
;; whether it was this thread that forced it or not, the results are
;; now in
(pref p)]))))
#:property prop:evt
(lambda (p)
(let ([v (pref p)])
(handle-evt (if (syncinfo? v) (syncinfo-done-evt v) always-evt) void))))
(#%provide (rename delay/sync* delay/sync))
(define (delay/sync thunk)
(let ([done-sema (make-semaphore 0)])
(make-promise/sync (make-syncinfo thunk
(semaphore-peek-evt done-sema) done-sema
(make-semaphore 1)))))
(define-syntax (delay/sync* stx) (make-delayer stx #'delay/sync '()))
;; threaded promises
(define-struct (promise/thread promise) ()
#:property prop:force
(lambda (p)
(reify-result (let ([v (pref p)])
(if (running-thread? v)
(begin (thread-wait (running-thread-thread v))
(pref p))
v))))
#:property prop:evt
(lambda (p)
(let ([v (pref p)])
(handle-evt (if (running? v) (running-thread-thread v) always-evt)
void))))
(#%provide (rename delay/thread* delay/thread))
(define (delay/thread thunk group)
(define (run)
(call-with-exception-handler
(lambda (e) (pset! p (make-reraise e)) (kill-thread (current-thread)))
(lambda () (pset! p (call-with-values thunk list)))))
(define p
(make-promise/thread
(make-running-thread
(object-name thunk)
(if group
(parameterize ([current-thread-group (make-thread-group)]) (thread run))
(thread run)))))
p)
(define-syntax delay/thread*
(let-values ([(kwds) (list (cons '#:group #'#t))])
(lambda (stx) (make-delayer stx #'delay/thread kwds))))
(define-struct (promise/idle promise/thread) ()
#:property prop:force
(lambda (p)
(reify-result (let ([v (pref p)])
(if (procedure? v)
;; either running-thread, or returns the controller
(let ([controller (if (running-thread? v)
(running-thread-thread v)
(v))])
(thread-send controller 'force!)
(thread-wait controller)
(pref p))
v)))))
(#%provide (rename delay/idle* delay/idle))
(define (delay/idle thunk wait-for work-while tick use*)
(define use (cond [(use* . <= . 0) 0] [(use* . >= . 1) 1] [else use*]))
(define work-time (* tick use))
(define rest-time (- tick work-time))
(define (work)
(call-with-exception-handler
(lambda (e) (pset! p (make-reraise e)) (kill-thread (current-thread)))
(lambda () (pset! p (call-with-values thunk list)))))
(define (run)
;; this thread is dedicated to controlling the worker thread, so it's
;; possible to dedicate messages to signaling a `force'.
(define force-evt (thread-receive-evt))
(sync wait-for force-evt)
(pset! p (make-running-thread (object-name thunk) controller-thread))
(let ([worker (parameterize ([current-thread-group (make-thread-group)])
(thread work))])
(cond
[(and (use . >= . 1) (equal? work-while always-evt))
;; as if it was pre-forced
(thread-wait worker)]
[(use . <= . 0)
;; work only when explicitly forced
(thread-suspend worker)
(sync force-evt)
(thread-wait worker)]
[else
(thread-suspend worker)
(let loop ()
;; rest, then wait for idle time, then resume working
(if (eq? (begin0 (or (sync/timeout rest-time force-evt)
(sync work-while force-evt))
(thread-resume worker))
force-evt)
;; forced during one of these => let it run to completion
(thread-wait worker)
;; not forced
(unless (sync/timeout work-time worker)
(thread-suspend worker)
(loop))))])))
;; I don't think that a thread-group here is needed, but it doesn't hurt
(define controller-thread
(parameterize ([current-thread-group (make-thread-group)])
(thread run)))
;; the thunk is not really used in the above, make it a function that returns
;; the controller thread so it can be forced (used in the `prop:force')
(define p (make-promise/idle
(procedure-rename (lambda () controller-thread)
(or (object-name thunk) 'idle-thread))))
p)
(define-syntax delay/idle*
(let-values ([(kwds) (list (cons '#:wait-for #'(system-idle-evt))
(cons '#:work-while #'(system-idle-evt))
(cons '#:tick #'0.2)
(cons '#:use #'0.12))])
(lambda (stx) (make-delayer stx #'delay/idle kwds))))
)
#|

View File

@ -10,6 +10,9 @@ A @deftech{promise} encapsulates an expression to be evaluated on
demand via @scheme[force]. After a promise has been @scheme[force]d,
every later @scheme[force] of the promise produces the same result.
This module provides this functionality, and extends it to additional
kinds of promises with various evaluation strategies.
@defproc[(promise? [v any/c]) boolean?]{
@ -20,17 +23,26 @@ otherwise.}
@defform[(delay body ...+)]{
Creates a promise that, when @scheme[force]d, evaluates the
@scheme[body]s to produce its value.}
@scheme[body]s to produce its value. The result is then cached, so
further uses of @scheme[force] produce the cached value immediately.
This includes multiple values and exceptions.}
@defform[(lazy body ...+)]{
Like @scheme[delay], except that if the last @scheme[body] produces a
promise, then this promise is @scheme[force]d to obtain a value. In
other words, this form creates a kind of a composable promise, which
is mostly useful for implementing lazy libraries and languages. Also
note that the last @scheme[body] in this case is restricted to one
that produces a single value.}
Like @scheme[delay], if the last @scheme[body] produces a promise when
forced, then this promise is @scheme[force]d too to obtain a value.
In other words, this form creates a composable promise, where the
computation of its body is ``attached'' to the computation of the
following promise and a single @scheme[force] iterates through the
whole chain, tail-calling each step.
Note that the last @scheme[body] of this form must produce a single
value --- but this value can itself be a @scheme[delay] promise that
returns multiple values.
This form useful for implementing lazy libraries and languages, where
tail-calls can be wrapped in a promise.}
@defproc[(force [v any/c]) any]{
@ -45,6 +57,9 @@ the promise will raise the same exception every time.
If @scheme[v] is @scheme[force]d again before the original call to
@scheme[force] returns, then the @exnraise[exn:fail].
Additional kinds of promises are also forced via @scheme[force]. See
below for further details.
If @scheme[v] is not a promise, then it is returned as the result.}
@ -57,3 +72,77 @@ Returns @scheme[#t] if @scheme[promise] has been forced.}
Returns @scheme[#t] if @scheme[promise] is currently being forced.
(Note that a promise can be either running or forced but not both.)}
@section{Additional Promise Kinds}
@defform[(delay/name body ...+)]{
Creates a ``call by name'' promise, that is similar to
@scheme[delay]-promises, except that the resulting value is not
cached. It is essentially a thunk, wrapped in a way that
@scheme[force] recognizes. Note that if a @scheme[delay/name] promise
forces itself, no exception is raised.
@; TODO: clarify that the point is that code that is written using
@; `force', can be used with these promises too.
Note that this promise is never considered ``running'' or ``forced''
in the sense of @scheme[promise-running?] and
@scheme[promise-forced?].}
@defform[(delay/sync body ...+)]{
Conventional promises are not useful when multiple threads attempt to
force them: when a promise is running, any additional threads that
@scheme[force] it will get an exception. @scheme[delay/sync] is
useful for such cases: if a second thread attempts to @scheme[force]
such a promise, it will get blocked until the computation is done and
an answer is available. If @scheme[force] is used with the promise as
it is forced from the same thread, an exception is raised.
In addition, these promises can be used with @scheme[sync], which
blocks until it has been forced. Note that using @scheme[sync] this
way is passive in the sense that it does not trigger evaluation of the
promise.}
@defform[(delay/thread body ...+)]{
@; TODO: document #:group keyword
This kind of promise begins the computation immediately, but this
happens on a separate thread. When the computation is done, the result
is cached as usual. Note that exceptions are caught as usual, and will
only be raised when @scheme[force]d. If such a promise is
@scheme[force]d before a value is ready, the calling thread will be
blocked until the computation terminates. These promises can also be
used with @scheme[sync].}
@defform[(delay/idle body ...+)]{
@; TODO: document #:wait-for, #:work-while, #:tick, #:use keywords
Similar to @scheme[delay/thread], but the computation thread gets to
work only when the process is otherwise idle, as determined by
@scheme[system-idle-evt], and the work is done in small runtime
fragements, making it overall not raise total CPU use or hurt
responsiveness. If the promise is @scheme[forced] before the
computation is done, it will run the rest of the computation immediately
without slicing the runtime. Using @scheme[sync] on these promises
blocks as is the case with @scheme[delay/sync], and this happens in a
passive way too, so the computation continues to work in low-priority.
@;{
TODO: Say something on:
* `use' = 0 --> similar to a plain `delay' which is evaluated only when
forced (or delay/sync, since it's still sync-able), except that the
evaluation is still happening on a new thread.
* `use' = 1 --> given cpu time as usual, but still polls the idle event
every `tick' seconds
* `use' = 1 and both `wait-for' and `work-while' are `always-evt' -->
similar to `delay/thread'.
* can use `wait-for' to delay evaluation start until some event is
ready. Specifically, this can be done to chain a few of these
promises sequentially.
* same goes for `work-while'. For example, you can use that with a
`semaphore-peek-evt' to be able to pause/resume the computation on
demand.
;}
}

View File

@ -9,8 +9,17 @@
(for ([v (list (delay 1) (lazy 1) (delay (delay 1)) (lazy (lazy 1)))])
(test (promise? v) => #t)))
(define (test-syntax)
(test (delay) =error> "bad syntax"
(lazy) =error> "bad syntax"
(delay #:foo 1 2) =error> "bad syntax"
(force (delay/thread #:group #f)) =error> "bad syntax"
(force (delay/thread #:group #f 1)) => 1
(force (delay/thread 1 #:group #f 2)) => 2
(force (delay/thread #:groupie #f 1)) =error> "bad syntax"))
;; basic delay/lazy/force tests
(define (basic-promise-tests)
(define (test-basic-promises)
(define thunk1 (lambda () 1))
(define promise1 (delay 1))
(define ? #f)
@ -68,7 +77,7 @@
(t* (force (lazy (lazy (lazy (force (delay (delay ?))))))))
(t* (force (lazy (lazy (delay (force (lazy (delay ?)))))))))
(define (basic-promise-behavior-tests)
(define (test-basic-promise-behavior)
(define (force+catch p) (with-handlers ([exn? values]) (force p)))
;; results are cached
(let* ([c 0] [p (delay (set! c (add1 c)) c)])
@ -97,8 +106,78 @@
(force p) => '(#f #t)
(forced+running? p) => '(#t #f))))
(define (test-printout)
(letrec ([foo (delay (set! s (format "~a" foo)) 3)] [s #f])
(test (format "~a" foo) => "#<promise:foo>"
(force foo) => 3
s => "#<promise:!running!foo>"
(format "~a" foo) => "#<promise!3>"))
(let ([foo (delay (values 1 2 3))])
(test (format "~a" foo) => "#<promise:foo>"
(force foo) => (values 1 2 3)
(format "~a" foo) => "#<promise!(values 1 2 3)>"))
(let ([foo (delay (error "boom"))])
(test (format "~a" foo) => "#<promise:foo>"
(force foo) => (error "boom")
(format "~a" foo) => "#<promise!exn!boom>"
(format "~s" foo) => "#<promise!exn!\"boom\">"))
(let ([foo (delay (raise 3))])
(test (format "~a" foo) => "#<promise:foo>"
(force foo) => (raise 3)
(format "~a" foo) => "#<promise!raise!3>")))
(define (test-delay/name)
(let* ([x 1] [p (delay/name (set! x (add1 x)) x)])
(test (promise? p)
x => 1
(force p) => 2
x => 2
(format "~a" p) => "#<promise:p>"
(force p) => 3
x => 3)))
(define (test-delay/sync)
(letrec ([p (delay/sync (force p))])
(test (force p) =error> "reentrant"))
(let* ([ch (make-channel)]
[p (delay/sync (channel-get ch) (channel-get ch) 99)])
(test (format "~a" p) => "#<promise:p>")
(thread (lambda () (force p) (channel-get ch)))
(channel-put ch 'x)
(test (format "~a" p) => "#<promise:!running!p>")
(channel-put ch 'x)
(channel-put ch 'x)
(test (format "~a" p) => "#<promise!99>"
(force p) => 99)))
(define (test-delay/thread)
(define-syntax-rule (t delayer)
(begin (let* ([ch (make-channel)]
[p (delayer (channel-get ch) 99)])
(thread (lambda () (channel-put ch 'x)))
(test (force p) => 99))
(test (force (delayer (+ 1 "2"))) =error> "expects type")))
(t delay/sync)
(t delay/idle)
(let* ([ch (make-channel)] [p (delay/idle #:wait-for ch 99)])
(test (format "~a" p) => "#<promise:p>"
(force p) => 99
(format "~a" p) => "#<promise!99>"))
(let* ([ch (make-channel)]
[p (delay/idle #:wait-for ch (channel-get ch) 99)])
(channel-put ch 'x)
(test (format "~a" p) => "#<promise:!running!p>"
(channel-put ch 'x)
(force p) => 99
(format "~a" p) => "#<promise!99>")))
(provide promise-tests)
(define (promise-tests)
(test do (test-types)
do (basic-promise-tests)
do (basic-promise-behavior-tests)))
(test do (test-syntax)
do (test-types)
do (test-basic-promises)
do (test-basic-promise-behavior)
do (test-printout)
do (test-delay/name)
do (test-delay/sync)
do (test-delay/thread)))