racket/stream: add more control over laziness

Add `#:eager` options to `stream-cons` to control whether the head
and/or tail expression is delayed. Also add `stream-lazy` to
explicitly construct a lazy stream and `stream-force` (normally not
needed) to force a stream.

Delayed expressions in `stream-cons` are now non-reentrant, like
promises created by `delay`. This is a change in behavior, but I'm
hoping that no one relied on the old behavior, which has the bad
property that the delayed expression is retained until its result is
received.

For consistency, this commit also changes `stream*` to delay the
stream-construction expression when its the only one expression that
is given.

Internally, changes to the stream implementation reduce allocation.

The `stream-lazy` form would be more natrally called `stream-delay`,
but that creates a conflict with existing packages.
This commit is contained in:
Matthew Flatt 2021-03-26 10:37:01 -06:00
parent 4704dc6962
commit a5615a7bae
4 changed files with 220 additions and 80 deletions

View File

@ -1072,18 +1072,71 @@ stream, but plain lists can be used as streams, and functions such as
element.
}
@defform[(stream-cons first-expr rest-expr)]{
@defform*[[(stream-cons first-expr rest-expr)
(stream-cons #:eager first-expr rest-expr)
(stream-cons first-expr #:eager rest-expr)
(stream-cons #:eager first-expr #:eager rest-expr)]]{
Produces a lazy stream for which @racket[stream-first] forces the
evaluation of @racket[first-expr] to produce the first element of
the stream, and @racket[stream-rest] forces the evaluation of
@racket[rest-expr] to produce a stream for the rest of the returned
stream.
Produces a stream whose first element is determined by
@racket[first-expr] and whose rest is determined by
@racket[rest-expr].
If @racket[first-expr] is not preceded by @racket[#:eager], then
@racket[first-expr] is not evaluated immediately. Instead,
@racket[stream-first] on the result stream forces the evaluation of
@racket[first-expr] (once) to produce the first element of the
stream. If evaluating @racket[first-expr] raises an exception or
tries to force itself, then an @exnraise[exn:fail:contract], and
future attempts to force evaluation will trigger another exception.
If @racket[rest-expr] is not preceded by @racket[#:eager], then
@racket[rest-expr] is not evaluated immediately. Instead,
@racket[stream-rest] on the result stream produces another stream
that is like the one produced by @racket[(stream-lazy rest-expr)].
The first element of the stream as produced by @racket[first-expr]
must be a single value. The @racket[rest-expr] must produce a stream
when it is evaluated, otherwise the @exnraise[exn:fail:contract?].
}
@history[#:changed "8.0.0.12" @elem{Added @racket[#:eager] options.}]}
@defform*[[(stream-lazy stream-expr)
(stream-lazy #:who who-expr stream-expr)]]{
Similar to @racket[(delay stream-expr)], but the result is a stream
instead of a @tech{promise}, and @racket[stream-expr] must produce a
stream when it is eventually forced. The stream produced by
@racket[stream-lazy] has the same content as the stream produced by
@racket[stream-expr]; that is, operations like @racket[stream-first]
on the result stream will force @racket[stream-expr] and retry on its
result.
If evaluating @racket[stream-expr] raises an exception or tries to
force itself, then an @exnraise[exn:fail:contract], and future
attempts to force evaluation will trigger another exception.
If @racket[who-expr] is provided, it is evaluated when constructing
the delayed stream. If @racket[stream-expr] later produces a value
that is not a stream, and if @racket[who-expr] produced a symbol
value, then the symbol is used for the error message.
@history[#:added "8.0.0.12"]}
@defproc[(stream-force [s stream?]) stream?]{
Forces the evaluation of a delayed stream from @racket[stream-lazy],
from the @racket[stream-rest] of a @racket[stream-cons], etc.,
returning the forced stream. If @racket[s] is not a delayed stream,
then @racket[s] is returned.
Normally, @racket[stream-force] is not needed, because operations
like @racket[stream-first], @racket[stream-rest], and
@racket[stream-empty?] force a delayed stream as needed. In rare
cases, @racket[stream-force] can be useful to reveal the underlying
implementation of a stream (e.g., a stream that is an instance of a
structure type that has the @racket[prop:stream] property).
@history[#:added "8.0.0.12"]}
@defform[(stream expr ...)]{
A shorthand for nested @racket[stream-cons]es ending with
@ -1092,10 +1145,12 @@ stream, but plain lists can be used as streams, and functions such as
@defform[(stream* expr ... rest-expr)]{
A shorthand for nested @racket[stream-cons]es, but the @racket[rest-expr]
must be a stream, and it is used as the rest of the stream instead of
must produce a stream when it is forced, and that stream is used as the rest of the stream instead of
@racket[empty-stream]. Similar to @racket[list*] but for streams.
@history[#:added "6.3"]}
@history[#:added "6.3"
#:changed "8.0.0.12" @elem{Changed to delay @racket[rest-expr] even
if zero @racket[expr]s are provided.}]}
@defproc[(in-stream [s stream?]) sequence?]{
Returns a sequence that is equivalent to @racket[s].

View File

@ -41,7 +41,7 @@
(test 'a 'stream* (stream-first (stream* 'a (stream (/ 0)))))
(test 4 'stream* (stream-length (stream* 'a 'b 'c (stream (/ 0)))))
(test 'c 'stream* (stream-first (stream-rest (stream-rest (stream* 'a 'b 'c (stream (/ 0)))))))
(err/rt-test (stream* 2) exn:fail:contract? "stream*")
(err/rt-test (stream-force (stream* 2)) exn:fail:contract? "stream*")
(test #true 'stream* (stream? (stream* 1 0)))
(err/rt-test (stream-length (stream* 1 2)) exn:fail:contract? "stream*")
@ -134,4 +134,48 @@
(in-parallel '(1 3) '(2 4))))))
list)
;; stream-rest doesn't force rest expr
(test #t stream? (stream-rest (stream-cons 1 'oops)))
;; stream-force does force
(err/rt-test (stream-force (stream-rest (stream-cons 1 'oops))))
(err/rt-test (stream-empty? (stream-rest (stream-cons 1 'oops))))
(err/rt-test (stream-first (stream-rest (stream-cons 1 'oops))))
(err/rt-test (stream-rest (stream-rest (stream-cons 1 'oops))))
(test #t stream? (stream-lazy 'oops))
(err/rt-test (stream-force (stream-lazy 'oops)))
(err/rt-test (stream-empty? (stream-lazy 'oops)))
(err/rt-test (stream-first (stream-lazy 'oops)))
(err/rt-test (stream-rest (stream-lazy 'oops)))
(test #t stream? (stream* 'oops))
(err/rt-test (stream-force (stream* 'oops)))
(err/rt-test (stream-empty? (stream* 'oops)))
(err/rt-test (stream-first (stream* 'oops)))
(err/rt-test (stream-rest (stream* 'oops)))
(err/rt-test (stream-force (stream-lazy #:who 'alice 'oops))
exn:fail:contract?
#rx"^alice: ")
(test #f null? (stream-lazy '()))
(test #t null? (stream-force (stream-lazy '())))
(test #t stream-empty? (stream-lazy '()))
;; lazy forcing errors => stays erroring
(let ([s (stream-cons (error "oops") null)])
(err/rt-test/once (stream-first s) exn:fail?)
(err/rt-test (stream-first s) exn:fail:contract? #rx"reentrant or broken"))
(let ([s (stream-cons 0 (error "oops"))])
(test #t stream? (stream-rest s))
(err/rt-test/once (stream-empty? (stream-rest s)) exn:fail?)
(err/rt-test (stream-empty? (stream-rest s)) exn:fail:contract? #rx"reentrant or broken"))
;; lazy forcing is non-reentrant
(letrec ([s (stream-cons (stream-first s) null)])
(err/rt-test (stream-first s) exn:fail:contract? #rx"reentrant or broken"))
(letrec ([s (stream-cons 1 (stream-force (stream-rest s)))])
(err/rt-test (stream-empty? (stream-rest s)) exn:fail:contract? #rx"reentrant or broken"))
(report-errs)

View File

@ -22,92 +22,132 @@
(require (prefix-in for: racket/private/for))
(provide stream-null stream-cons stream? stream-null? stream-pair?
stream-car stream-cdr stream-lambda stream-lazy)
stream-car stream-cdr stream-lambda stream-lazy stream-force)
;; An eagerly constructed stream has a lazy first element, and
;; normaly its rest is a lazily constructed stream.
(define-struct eagerly-created-stream ([first-forced? #:mutable] [first #:mutable] rest)
#:reflection-name 'stream
#:property for:prop:stream (vector
(lambda (p) #f)
(lambda (p) (stream-force-first p))
(lambda (p) (eagerly-created-stream-rest p))))
;; A lazily constructed stream uses an mpair redirection to facilitate
;; flattening chains of lazily constructed streams. The pair starts with
;; #f if the stream is forced, a symbol for the constructing form otherwise
(define-struct lazily-created-stream (mpair)
#:mutable
#:reflection-name 'stream
#:property for:prop:stream (vector
(lambda (p) (stream-null? p))
(lambda (p) (stream-car p))
(lambda (p) (stream-cdr p))))
;; Recognize just the streams created by this layer:
(define (stream? p)
(or (eagerly-created-stream? p)
(lazily-created-stream? p)))
(define-syntax stream-lazy
(syntax-rules ()
((stream-lazy expr)
(make-stream
(mcons 'lazy (lambda () expr))))))
[(stream-lazy expr)
(make-lazily-created-stream (mcons 'stream-lazy (lambda () expr)))]
[(stream-lazy #:who who-expr expr)
(make-lazily-created-stream (mcons (or who-expr 'stream-lazy) (lambda () expr)))]))
(define (stream-eager expr)
(make-stream
(mcons 'eager expr)))
(define reentrant-error
(lambda () (raise-arguments-error 'stream "reentrant or broken delay")))
(define-syntax stream-delay
(syntax-rules ()
((stream-delay expr)
(stream-lazy (stream-eager expr)))))
;; Forces a lazily constructed stream to a stream of any other kind
(define (stream-force s)
(cond
[(lazily-created-stream? s)
(define p (lazily-created-stream-mpair s))
(cond
[(not (mcar p)) (mcdr p)]
[else
(define thunk (mcdr p))
(set-mcdr! p reentrant-error)
(define v (thunk))
(cond
[(lazily-created-stream? v)
;; flatten the result lazy stream and try again
(set-lazily-created-stream-mpair! s (lazily-created-stream-mpair v))
(stream-force v)]
[(for:stream? v)
;; any other kind of stream is success
(set-mcar! p #f)
(set-mcdr! p v)
v]
[else
(define who (mcar p))
(if (symbol? who)
(raise-arguments-error
who
"delayed expression produced a non-stream"
"result" v)
(raise-arguments-error
'stream-cons
"rest expression produced a non-stream"
"rest result" v))])])]
[(for:stream? s) s]
[else (raise-argument-error 'stream-force "stream?" s)]))
(define (stream-force promise)
(let ((content (stream-promise promise)))
(case (mcar content)
((eager) (mcdr content))
((lazy) (let* ((promise* ((mcdr content))))
;; check mcar again, it can it was set in the
;; process of evaluating `(mcdr content)':
(if (eq? (mcar content) 'eager)
;; yes, it was set
(mcdr content)
;; normal case: no, it wasn't set:
(if (stream? promise*)
;; Flatten the result lazy stream and try again:
(let ([new-content (stream-promise promise*)])
(set-mcar! content (mcar new-content))
(set-mcdr! content (mcdr new-content))
(set-stream-promise! promise* content)
(stream-force promise))
;; Forced result is not a lazy stream:
(begin
(unless (for:stream? promise*)
(raise-mismatch-error
'stream-cons
"rest expression produced a non-stream: "
promise*))
(set-mcdr! content promise*)
(set-mcar! content 'eager)
promise*))))))))
;; Forces the first element of an eagerly consttructed stream
(define (stream-force-first p)
(cond
[(eagerly-created-stream-first-forced? p)
(eagerly-created-stream-first p)]
[else
(define thunk (eagerly-created-stream-first p))
(set-eagerly-created-stream-first! p reentrant-error)
(define v (thunk))
(set-eagerly-created-stream-first! p v)
(set-eagerly-created-stream-first-forced?! p #t)
v]))
(define-syntax stream-lambda
(syntax-rules ()
((stream-lambda formals body0 body1 ...)
(lambda formals (stream-lazy (let () body0 body1 ...))))))
(define-struct stream-pare (kar kdr))
(define (stream-null? obj)
(let ([v (stream-force obj)])
(if (stream-pare? v)
#f
(or (eqv? v (stream-force stream-null))
(for:stream-empty? v)))))
(for:stream-empty? (stream-force obj)))
(define (stream-pair? obj)
(and (stream? obj) (stream-pare? (stream-force obj))))
(eagerly-created-stream? (stream-force obj)))
(define-syntax stream-cons
(syntax-rules ()
((stream-cons obj strm)
(stream-eager (make-stream-pare (stream-delay obj) (stream-lazy strm))))))
(syntax-rules ()
((stream-cons obj strm)
(eagerly-created-stream #f (lambda () obj)
(lazily-created-stream (mcons #t (lambda () strm)))))
((stream-cons #:eager obj strm)
(eagerly-created-stream #t obj
(lazily-created-stream (mcons #t (lambda () strm)))))
((stream-cons obj #:eager strm)
(eagerly-created-stream #f (lambda () obj)
(stream-assert strm)))
((stream-cons #:eager obj #:eager strm)
(eagerly-created-stream #t obj
(stream-assert strm)))))
(define (stream-assert v)
(if (for:stream? v)
v
(raise-argument-error 'stream-cons "stream?" v)))
(define (stream-car strm)
(let ([v (stream-force strm)])
(if (stream-pare? v)
(stream-force (stream-pare-kar v))
(if (eagerly-created-stream? v) ; shortcut
(stream-force-first v)
(for:stream-first v))))
(define (stream-cdr strm)
(let ([v (stream-force strm)])
(if (stream-pare? v)
(stream-pare-kdr v)
(if (eagerly-created-stream? v) ; shortcut
(eagerly-created-stream-rest v)
(for:stream-rest v))))
(define-struct stream (promise)
#:mutable
#:property for:prop:stream (vector
stream-null?
stream-car
stream-cdr))
(define stream-null (stream-delay (cons 'stream 'null)))
(define stream-null (lazily-created-stream (mcons #f '())))

View File

@ -11,7 +11,8 @@
"private/sequence.rkt"
(only-in "private/stream-cons.rkt"
stream-cons
stream-lazy)
stream-lazy
stream-force)
"private/generic-methods.rkt"
(for-syntax racket/base))
@ -28,6 +29,8 @@
stream-rest
prop:stream
in-stream
stream-lazy
stream-force
stream
stream*
@ -67,21 +70,19 @@
(syntax-rules ()
((_)
empty-stream)
((_ tl)
;; shortcut:
(stream-cons tl #:eager empty-stream))
((_ hd tl ...)
(stream-cons hd (stream tl ...)))))
(define-syntax stream*
(syntax-rules ()
[(_ tl)
(assert-stream? 'stream* tl)]
(stream-lazy #:who 'stream* tl)]
[(_ hd tl ...)
(stream-cons hd (stream* tl ...))]))
(define (assert-stream? who st)
(if (stream? st)
st
(raise-argument-error who "stream?" st)))
(define (stream->list s)
(for/list ([v (in-stream s)]) v))