From b1f5b0652c44a1722a5944da6c7c0cd27f4ed16d Mon Sep 17 00:00:00 2001 From: Eli Barzilay Date: Mon, 16 Nov 2009 11:06:47 +0000 Subject: [PATCH] new kinds of promises svn: r16807 --- collects/scheme/promise.ss | 175 +++++++++++++++++++ collects/scribblings/reference/promise.scrbl | 103 ++++++++++- collects/tests/lazy/promise.ss | 89 +++++++++- 3 files changed, 355 insertions(+), 12 deletions(-) diff --git a/collects/scheme/promise.ss b/collects/scheme/promise.ss index 4e437130f8..755ef5e516 100644 --- a/collects/scheme/promise.ss +++ b/collects/scheme/promise.ss @@ -253,6 +253,181 @@ (running? (pref promise)) (raise-type-error 'promise-running? "promise" promise))) +;; ---------------------------------------------------------------------------- +;; More delay-like values, with different ways of deferring computations + +(define-struct (promise/name promise) () + #:property prop:force (lambda (p) ((pref p)))) + +(#%provide (rename delay/name* delay/name)) +(define delay/name make-promise/name) +(define-syntax (delay/name* stx) (make-delayer stx #'delay/name '())) + +;; utility struct +(define-struct (running-thread running) (thread)) + +;; used in promise/sync until it's forced +(define-struct syncinfo ([thunk #:mutable] done-evt done-sema access-sema)) + +(define-struct (promise/sync promise) () + #:property prop:custom-write + (lambda (p port write?) + (promise-printer + (let ([v (pref p)]) + (if (syncinfo? v) (make-promise (syncinfo-thunk v)) p)) + port write?)) + #:property prop:force + (lambda (p) + (reify-result + (let ([v (pref p)]) + (cond + ;; already forced + [(not (syncinfo? v)) v] + ;; being forced... + [(running-thread? (syncinfo-thunk v)) + (let ([r (syncinfo-thunk v)]) + (if (eq? (running-thread-thread r) (current-thread)) + ;; ... by the current thread => throw the usual reentrant error + (r) + ;; ... by a different thread => just wait for it + (begin (sync (syncinfo-done-evt v)) (pref p))))] + [else + ;; wasn't forced yet: try to do it now + (call-with-semaphore (syncinfo-access-sema v) + (lambda () + (let ([thunk (syncinfo-thunk v)] [done (syncinfo-done-sema v)]) + ;; set the thread last + (set-syncinfo-thunk! + v (make-running-thread (object-name thunk) (current-thread))) + (call-with-exception-handler + (lambda (e) + (pset! p (make-reraise e)) + (semaphore-post done) + e) + (lambda () + (pset! p (call-with-values thunk list)) + (semaphore-post done)))))) + ;; whether it was this thread that forced it or not, the results are + ;; now in + (pref p)])))) + #:property prop:evt + (lambda (p) + (let ([v (pref p)]) + (handle-evt (if (syncinfo? v) (syncinfo-done-evt v) always-evt) void)))) + +(#%provide (rename delay/sync* delay/sync)) +(define (delay/sync thunk) + (let ([done-sema (make-semaphore 0)]) + (make-promise/sync (make-syncinfo thunk + (semaphore-peek-evt done-sema) done-sema + (make-semaphore 1))))) +(define-syntax (delay/sync* stx) (make-delayer stx #'delay/sync '())) + +;; threaded promises + +(define-struct (promise/thread promise) () + #:property prop:force + (lambda (p) + (reify-result (let ([v (pref p)]) + (if (running-thread? v) + (begin (thread-wait (running-thread-thread v)) + (pref p)) + v)))) + #:property prop:evt + (lambda (p) + (let ([v (pref p)]) + (handle-evt (if (running? v) (running-thread-thread v) always-evt) + void)))) + +(#%provide (rename delay/thread* delay/thread)) +(define (delay/thread thunk group) + (define (run) + (call-with-exception-handler + (lambda (e) (pset! p (make-reraise e)) (kill-thread (current-thread))) + (lambda () (pset! p (call-with-values thunk list))))) + (define p + (make-promise/thread + (make-running-thread + (object-name thunk) + (if group + (parameterize ([current-thread-group (make-thread-group)]) (thread run)) + (thread run))))) + p) +(define-syntax delay/thread* + (let-values ([(kwds) (list (cons '#:group #'#t))]) + (lambda (stx) (make-delayer stx #'delay/thread kwds)))) + +(define-struct (promise/idle promise/thread) () + #:property prop:force + (lambda (p) + (reify-result (let ([v (pref p)]) + (if (procedure? v) + ;; either running-thread, or returns the controller + (let ([controller (if (running-thread? v) + (running-thread-thread v) + (v))]) + (thread-send controller 'force!) + (thread-wait controller) + (pref p)) + v))))) + +(#%provide (rename delay/idle* delay/idle)) +(define (delay/idle thunk wait-for work-while tick use*) + (define use (cond [(use* . <= . 0) 0] [(use* . >= . 1) 1] [else use*])) + (define work-time (* tick use)) + (define rest-time (- tick work-time)) + (define (work) + (call-with-exception-handler + (lambda (e) (pset! p (make-reraise e)) (kill-thread (current-thread))) + (lambda () (pset! p (call-with-values thunk list))))) + (define (run) + ;; this thread is dedicated to controlling the worker thread, so it's + ;; possible to dedicate messages to signaling a `force'. + (define force-evt (thread-receive-evt)) + (sync wait-for force-evt) + (pset! p (make-running-thread (object-name thunk) controller-thread)) + (let ([worker (parameterize ([current-thread-group (make-thread-group)]) + (thread work))]) + (cond + [(and (use . >= . 1) (equal? work-while always-evt)) + ;; as if it was pre-forced + (thread-wait worker)] + [(use . <= . 0) + ;; work only when explicitly forced + (thread-suspend worker) + (sync force-evt) + (thread-wait worker)] + [else + (thread-suspend worker) + (let loop () + ;; rest, then wait for idle time, then resume working + (if (eq? (begin0 (or (sync/timeout rest-time force-evt) + (sync work-while force-evt)) + (thread-resume worker)) + force-evt) + ;; forced during one of these => let it run to completion + (thread-wait worker) + ;; not forced + (unless (sync/timeout work-time worker) + (thread-suspend worker) + (loop))))]))) + ;; I don't think that a thread-group here is needed, but it doesn't hurt + (define controller-thread + (parameterize ([current-thread-group (make-thread-group)]) + (thread run))) + ;; the thunk is not really used in the above, make it a function that returns + ;; the controller thread so it can be forced (used in the `prop:force') + (define p (make-promise/idle + (procedure-rename (lambda () controller-thread) + (or (object-name thunk) 'idle-thread)))) + p) +(define-syntax delay/idle* + (let-values ([(kwds) (list (cons '#:wait-for #'(system-idle-evt)) + (cons '#:work-while #'(system-idle-evt)) + (cons '#:tick #'0.2) + (cons '#:use #'0.12))]) + (lambda (stx) (make-delayer stx #'delay/idle kwds)))) + ) #| diff --git a/collects/scribblings/reference/promise.scrbl b/collects/scribblings/reference/promise.scrbl index d6ddd2dbf3..9864531d29 100644 --- a/collects/scribblings/reference/promise.scrbl +++ b/collects/scribblings/reference/promise.scrbl @@ -10,6 +10,9 @@ A @deftech{promise} encapsulates an expression to be evaluated on demand via @scheme[force]. After a promise has been @scheme[force]d, every later @scheme[force] of the promise produces the same result. +This module provides this functionality, and extends it to additional +kinds of promises with various evaluation strategies. + @defproc[(promise? [v any/c]) boolean?]{ @@ -20,17 +23,26 @@ otherwise.} @defform[(delay body ...+)]{ Creates a promise that, when @scheme[force]d, evaluates the -@scheme[body]s to produce its value.} +@scheme[body]s to produce its value. The result is then cached, so +further uses of @scheme[force] produce the cached value immediately. +This includes multiple values and exceptions.} @defform[(lazy body ...+)]{ -Like @scheme[delay], except that if the last @scheme[body] produces a -promise, then this promise is @scheme[force]d to obtain a value. In -other words, this form creates a kind of a composable promise, which -is mostly useful for implementing lazy libraries and languages. Also -note that the last @scheme[body] in this case is restricted to one -that produces a single value.} +Like @scheme[delay], if the last @scheme[body] produces a promise when +forced, then this promise is @scheme[force]d too to obtain a value. +In other words, this form creates a composable promise, where the +computation of its body is ``attached'' to the computation of the +following promise and a single @scheme[force] iterates through the +whole chain, tail-calling each step. + +Note that the last @scheme[body] of this form must produce a single +value --- but this value can itself be a @scheme[delay] promise that +returns multiple values. + +This form useful for implementing lazy libraries and languages, where +tail-calls can be wrapped in a promise.} @defproc[(force [v any/c]) any]{ @@ -45,6 +57,9 @@ the promise will raise the same exception every time. If @scheme[v] is @scheme[force]d again before the original call to @scheme[force] returns, then the @exnraise[exn:fail]. +Additional kinds of promises are also forced via @scheme[force]. See +below for further details. + If @scheme[v] is not a promise, then it is returned as the result.} @@ -57,3 +72,77 @@ Returns @scheme[#t] if @scheme[promise] has been forced.} Returns @scheme[#t] if @scheme[promise] is currently being forced. (Note that a promise can be either running or forced but not both.)} + + +@section{Additional Promise Kinds} + +@defform[(delay/name body ...+)]{ + +Creates a ``call by name'' promise, that is similar to +@scheme[delay]-promises, except that the resulting value is not +cached. It is essentially a thunk, wrapped in a way that +@scheme[force] recognizes. Note that if a @scheme[delay/name] promise +forces itself, no exception is raised. +@; TODO: clarify that the point is that code that is written using +@; `force', can be used with these promises too. + +Note that this promise is never considered ``running'' or ``forced'' +in the sense of @scheme[promise-running?] and +@scheme[promise-forced?].} + +@defform[(delay/sync body ...+)]{ + +Conventional promises are not useful when multiple threads attempt to +force them: when a promise is running, any additional threads that +@scheme[force] it will get an exception. @scheme[delay/sync] is +useful for such cases: if a second thread attempts to @scheme[force] +such a promise, it will get blocked until the computation is done and +an answer is available. If @scheme[force] is used with the promise as +it is forced from the same thread, an exception is raised. + +In addition, these promises can be used with @scheme[sync], which +blocks until it has been forced. Note that using @scheme[sync] this +way is passive in the sense that it does not trigger evaluation of the +promise.} + +@defform[(delay/thread body ...+)]{ +@; TODO: document #:group keyword + +This kind of promise begins the computation immediately, but this +happens on a separate thread. When the computation is done, the result +is cached as usual. Note that exceptions are caught as usual, and will +only be raised when @scheme[force]d. If such a promise is +@scheme[force]d before a value is ready, the calling thread will be +blocked until the computation terminates. These promises can also be +used with @scheme[sync].} + +@defform[(delay/idle body ...+)]{ +@; TODO: document #:wait-for, #:work-while, #:tick, #:use keywords + +Similar to @scheme[delay/thread], but the computation thread gets to +work only when the process is otherwise idle, as determined by +@scheme[system-idle-evt], and the work is done in small runtime +fragements, making it overall not raise total CPU use or hurt +responsiveness. If the promise is @scheme[forced] before the +computation is done, it will run the rest of the computation immediately +without slicing the runtime. Using @scheme[sync] on these promises +blocks as is the case with @scheme[delay/sync], and this happens in a +passive way too, so the computation continues to work in low-priority. + +@;{ +TODO: Say something on: +* `use' = 0 --> similar to a plain `delay' which is evaluated only when + forced (or delay/sync, since it's still sync-able), except that the + evaluation is still happening on a new thread. +* `use' = 1 --> given cpu time as usual, but still polls the idle event + every `tick' seconds +* `use' = 1 and both `wait-for' and `work-while' are `always-evt' --> + similar to `delay/thread'. +* can use `wait-for' to delay evaluation start until some event is + ready. Specifically, this can be done to chain a few of these + promises sequentially. +* same goes for `work-while'. For example, you can use that with a + `semaphore-peek-evt' to be able to pause/resume the computation on + demand. +;} +} diff --git a/collects/tests/lazy/promise.ss b/collects/tests/lazy/promise.ss index c5b75d0ef8..f0c2c903d3 100644 --- a/collects/tests/lazy/promise.ss +++ b/collects/tests/lazy/promise.ss @@ -9,8 +9,17 @@ (for ([v (list (delay 1) (lazy 1) (delay (delay 1)) (lazy (lazy 1)))]) (test (promise? v) => #t))) +(define (test-syntax) + (test (delay) =error> "bad syntax" + (lazy) =error> "bad syntax" + (delay #:foo 1 2) =error> "bad syntax" + (force (delay/thread #:group #f)) =error> "bad syntax" + (force (delay/thread #:group #f 1)) => 1 + (force (delay/thread 1 #:group #f 2)) => 2 + (force (delay/thread #:groupie #f 1)) =error> "bad syntax")) + ;; basic delay/lazy/force tests -(define (basic-promise-tests) +(define (test-basic-promises) (define thunk1 (lambda () 1)) (define promise1 (delay 1)) (define ? #f) @@ -68,7 +77,7 @@ (t* (force (lazy (lazy (lazy (force (delay (delay ?)))))))) (t* (force (lazy (lazy (delay (force (lazy (delay ?))))))))) -(define (basic-promise-behavior-tests) +(define (test-basic-promise-behavior) (define (force+catch p) (with-handlers ([exn? values]) (force p))) ;; results are cached (let* ([c 0] [p (delay (set! c (add1 c)) c)]) @@ -97,8 +106,78 @@ (force p) => '(#f #t) (forced+running? p) => '(#t #f)))) +(define (test-printout) + (letrec ([foo (delay (set! s (format "~a" foo)) 3)] [s #f]) + (test (format "~a" foo) => "#" + (force foo) => 3 + s => "#" + (format "~a" foo) => "#")) + (let ([foo (delay (values 1 2 3))]) + (test (format "~a" foo) => "#" + (force foo) => (values 1 2 3) + (format "~a" foo) => "#")) + (let ([foo (delay (error "boom"))]) + (test (format "~a" foo) => "#" + (force foo) => (error "boom") + (format "~a" foo) => "#" + (format "~s" foo) => "#")) + (let ([foo (delay (raise 3))]) + (test (format "~a" foo) => "#" + (force foo) => (raise 3) + (format "~a" foo) => "#"))) + +(define (test-delay/name) + (let* ([x 1] [p (delay/name (set! x (add1 x)) x)]) + (test (promise? p) + x => 1 + (force p) => 2 + x => 2 + (format "~a" p) => "#" + (force p) => 3 + x => 3))) + +(define (test-delay/sync) + (letrec ([p (delay/sync (force p))]) + (test (force p) =error> "reentrant")) + (let* ([ch (make-channel)] + [p (delay/sync (channel-get ch) (channel-get ch) 99)]) + (test (format "~a" p) => "#") + (thread (lambda () (force p) (channel-get ch))) + (channel-put ch 'x) + (test (format "~a" p) => "#") + (channel-put ch 'x) + (channel-put ch 'x) + (test (format "~a" p) => "#" + (force p) => 99))) + +(define (test-delay/thread) + (define-syntax-rule (t delayer) + (begin (let* ([ch (make-channel)] + [p (delayer (channel-get ch) 99)]) + (thread (lambda () (channel-put ch 'x))) + (test (force p) => 99)) + (test (force (delayer (+ 1 "2"))) =error> "expects type"))) + (t delay/sync) + (t delay/idle) + (let* ([ch (make-channel)] [p (delay/idle #:wait-for ch 99)]) + (test (format "~a" p) => "#" + (force p) => 99 + (format "~a" p) => "#")) + (let* ([ch (make-channel)] + [p (delay/idle #:wait-for ch (channel-get ch) 99)]) + (channel-put ch 'x) + (test (format "~a" p) => "#" + (channel-put ch 'x) + (force p) => 99 + (format "~a" p) => "#"))) + (provide promise-tests) (define (promise-tests) - (test do (test-types) - do (basic-promise-tests) - do (basic-promise-behavior-tests))) + (test do (test-syntax) + do (test-types) + do (test-basic-promises) + do (test-basic-promise-behavior) + do (test-printout) + do (test-delay/name) + do (test-delay/sync) + do (test-delay/thread)))