add typed/racket/async-channel

original commit: 500745f41b53690e2011571c80b0e7526e3d1355
This commit is contained in:
Stephen Chang 2014-06-24 18:16:14 -04:00
parent f55296aa2d
commit 334d9975df
16 changed files with 259 additions and 2 deletions

View File

@ -71,6 +71,7 @@ The following libraries are included with Typed Racket in the
@defmodule/incl[typed/openssl/md5]
@defmodule/incl[typed/openssl/sha1]
@defmodule/incl[typed/pict]
@defmodule/incl[typed/racket/async-channel]
@defmodule/incl[typed/rackunit]
@defmodule/incl[typed/srfi/14]
@defmodule/incl[typed/syntax/stx]

View File

@ -4,7 +4,8 @@
"numeric-tower-pict.rkt"
scribble/eval
racket/sandbox)
(require (for-label (only-meta-in 0 [except-in typed/racket for])))]
(require (for-label (only-meta-in 0 [except-in typed/racket for])
racket/async-channel))]
@(define the-eval (make-base-eval))
@(the-eval '(require (except-in typed/racket #%top-interaction #%module-begin)))
@ -412,6 +413,21 @@ corresponding to @racket[trest], where @racket[bound]
@ex[(lambda: ([x : Any]) (if (channel? x) x (error "not a channel!")))]
}
@defform[(Async-Channelof t)]{An @rtech{asynchronous channel} on which only @racket[t]s can be sent.
@ex[
(require typed/racket/async-channel)
(ann (make-async-channel) (Async-Channelof Symbol))
]
}
@defidform[Async-ChannelTop]{is the type of an @rtech{asynchronous channel} with unknown
message type and is the supertype of all asynchronous channel types. This type typically
appears in programs via the combination of occurrence typing and
@racket[async-channel?].
@ex[(require typed/racket/async-channel)
(lambda: ([x : Any]) (if (async-channel? x) x (error "not an async-channel!")))]
}
@defform*[[(Parameterof t)
(Parameterof s t)]]{A @rtech{parameter} of @racket[t]. If two type arguments are supplied,
the first is the type the parameter accepts, and the second is the type returned.

View File

@ -115,6 +115,7 @@
[Procedure top-func]
[BoxTop -BoxTop]
[ChannelTop -ChannelTop]
[Async-ChannelTop -Async-ChannelTop]
[VectorTop -VectorTop]
[HashTableTop -HashTop]
[MPairTop -MPairTop]
@ -168,6 +169,7 @@
[Pair (-poly (a b) (-pair a b))]
[Boxof (-poly (a) (make-Box a))]
[Channelof (-poly (a) (make-Channel a))]
[Async-Channelof (-poly (a) (make-Async-Channel a))]
[Ephemeronof (-poly (a) (make-Ephemeron a))]
[Setof (-poly (e) (make-Set e))]
[Evtof (-poly (r) (-evt r))]

View File

@ -592,6 +592,8 @@
(% cset-meet (cg/inv s s*) (cg/inv t t*))]
[((Channel: e) (Channel: e*))
(cg/inv e e*)]
[((Async-Channel: e) (Async-Channel: e*))
(cg/inv e e*)]
[((ThreadCell: e) (ThreadCell: e*))
(cg/inv e e*)]
[((Continuation-Mark-Keyof: e) (Continuation-Mark-Keyof: e*))
@ -629,6 +631,7 @@
t)]
[((CustodianBox: t) (Evt: t*)) (cg S t*)]
[((Channel: t) (Evt: t*)) (cg t t*)]
[((Async-Channel: t) (Evt: t*)) (cg t t*)]
;; we assume all HTs are mutable at the moment
[((Hashtable: s1 s2) (Hashtable: t1 t2))
;; for mutable hash tables, both are invariant

View File

@ -330,6 +330,7 @@
[(VectorTop:) (only-untyped vector?/sc)]
[(BoxTop:) (only-untyped box?/sc)]
[(ChannelTop:) (only-untyped channel?/sc)]
[(Async-ChannelTop:) (only-untyped async-channel?/sc)]
[(HashtableTop:) (only-untyped hash?/sc)]
[(MPairTop:) (only-untyped mpair?/sc)]
[(ThreadCellTop:) (only-untyped thread-cell?/sc)]

View File

@ -185,6 +185,11 @@
[#:frees (λ (f) (make-invariant (f elem)))]
[#:key 'channel])
;; elem is a Type
(def-type Async-Channel ([elem Type/c])
[#:frees (λ (f) (make-invariant (f elem)))]
[#:key 'async-channel])
;; elem is a Type
(def-type ThreadCell ([elem Type/c])
[#:frees (λ (f) (make-invariant (f elem)))]
@ -397,6 +402,7 @@
;; the supertype of all of these values
(def-type BoxTop () [#:fold-rhs #:base] [#:key 'box])
(def-type ChannelTop () [#:fold-rhs #:base] [#:key 'channel])
(def-type Async-ChannelTop () [#:fold-rhs #:base] [#:key 'async-channel])
(def-type VectorTop () [#:fold-rhs #:base] [#:key 'vector])
(def-type HashtableTop () [#:fold-rhs #:base] [#:key 'hash])
(def-type MPairTop () [#:fold-rhs #:base] [#:key 'mpair])

View File

@ -6,7 +6,7 @@
(require "simple.rkt" "structural.rkt"
(for-template racket/base racket/list racket/set racket/promise racket/mpair
racket/class))
racket/class racket/async-channel))
(provide (all-defined-out))
(define identifier?/sc (flat/sc #'identifier?))
@ -28,6 +28,7 @@
(define empty-hash/sc (and/sc hash?/sc (flat/sc #'(λ (h) (zero? (hash-count h))))))
(define channel?/sc (flat/sc #'channel?))
(define async-channel?/sc (flat/sc #'channel?))
(define thread-cell?/sc (flat/sc #'thread-cell?))
(define prompt-tag?/sc (flat/sc #'continuation-prompt-tag?))
(define continuation-mark-key?/sc (flat/sc #'continuation-mark-key?))

View File

@ -54,6 +54,7 @@
(define -Param make-Param)
(define -box make-Box)
(define -channel make-Channel)
(define -async-channel make-Async-Channel)
(define -thread-cell make-ThreadCell)
(define -Promise make-Promise)
(define -set make-Set)
@ -169,6 +170,7 @@
(define -HT make-Hashtable)
(define/decl -BoxTop (make-BoxTop))
(define/decl -ChannelTop (make-ChannelTop))
(define/decl -Async-ChannelTop (make-Async-ChannelTop))
(define/decl -HashTop (make-HashtableTop))
(define/decl -VectorTop (make-VectorTop))
(define/decl -MPairTop (make-MPairTop))

View File

@ -424,6 +424,7 @@
[(StructTop: (Struct: nm _ _ _ _ _)) `(Struct ,(syntax-e nm))]
[(BoxTop:) 'BoxTop]
[(ChannelTop:) 'ChannelTop]
[(Async-ChannelTop:) 'Async-ChannelTop]
[(ThreadCellTop:) 'ThreadCellTop]
[(VectorTop:) 'VectorTop]
[(HashtableTop:) 'HashTableTop]
@ -462,6 +463,7 @@
[(Box: e) `(Boxof ,(t->s e))]
[(Future: e) `(Futureof ,(t->s e))]
[(Channel: e) `(Channelof ,(t->s e))]
[(Async-Channel: e) `(Async-Channelof ,(t->s e))]
[(ThreadCell: e) `(ThreadCellof ,(t->s e))]
[(Promise: e) `(Promise ,(t->s e))]
[(Ephemeron: e) `(Ephemeronof ,(t->s e))]

View File

@ -32,6 +32,7 @@
(define-for-syntax structural-reps
#'([BoxTop ()]
[ChannelTop ()]
[Async-ChannelTop ()]
[ClassTop ()]
[Continuation-Mark-KeyTop ()]
[Error ()]
@ -62,6 +63,7 @@
[Continuation-Mark-Keyof (#:inv)]
[Box (#:inv)]
[Channel (#:inv)]
[Async-Channel (#:inv)]
[ThreadCell (#:inv)]
[Vector (#:inv)]
[Hashtable (#:inv #:inv)]

View File

@ -510,6 +510,7 @@
;; compared against t* here
(subtype* A0 s t*)]
[((Channel: t) (Evt: t*)) (subtype* A0 t t*)]
[((Async-Channel: t) (Evt: t*)) (subtype* A0 t t*)]
;; Invariant types
[((Box: s) (Box: t)) (type-equiv? A0 s t)]
[((Box: _) (BoxTop:)) A0]
@ -517,6 +518,8 @@
[((ThreadCell: _) (ThreadCellTop:)) A0]
[((Channel: s) (Channel: t)) (type-equiv? A0 s t)]
[((Channel: _) (ChannelTop:)) A0]
[((Async-Channel: s) (Async-Channel: t)) (type-equiv? A0 s t)]
[((Async-Channel: _) (Async-ChannelTop:)) A0]
[((Vector: s) (Vector: t)) (type-equiv? A0 s t)]
[((Vector: _) (VectorTop:)) A0]
[((HeterogeneousVector: _) (VectorTop:)) A0]

View File

@ -0,0 +1,16 @@
#lang s-exp typed-racket/base-env/extra-env-lang
;; This module provides a typed version of racket/async-channel
(require "private/async-channel-wrapped.rkt"
(for-syntax (only-in (rep type-rep) make-Async-ChannelTop)))
;; Section 11.2.4 (Buffered Asynchronous Channels)
(type-environment
[make-async-channel (-poly (a) (->opt [(-opt -PosInt)] (-async-channel a)))]
[async-channel? (make-pred-ty (make-Async-ChannelTop))]
[async-channel-get (-poly (a) ((-async-channel a) . -> . a))]
[async-channel-try-get (-poly (a) ((-async-channel a) . -> . (-opt a)))]
[async-channel-put (-poly (a) ((-async-channel a) a . -> . -Void))]
[async-channel-put-evt (-poly (a) (-> (-async-channel a) a (-mu x (-evt x))))])

View File

@ -0,0 +1,19 @@
#;
(exn-pred #rx"could not convert type to a contract.*Async-Channelof")
#lang racket/load
;; Test typed-untyped interaction with channels
(module typed typed/racket
(require typed/racket/async-channel)
(: ch (Async-Channelof (Boxof Integer)))
(define ch (make-async-channel))
(: putter (-> Thread))
(define (putter)
(thread (λ () (async-channel-put ch (box 3)))))
(provide putter ch))
(require 'typed)
(putter)
(set-box! (async-channel-get ch) "not an integer")

View File

@ -0,0 +1,106 @@
#lang typed/racket
(require typed/racket/async-channel)
;; Integration test for synchronizable events, using async-channels
;;
;; example from unstable/logging
(define-type Log-Receiver-Sync-Result
(Vector Symbol String Any (Option Symbol)))
(: receiver-thread
(Log-Receiver (Async-Channelof 'stop)
(Log-Receiver-Sync-Result -> Void)
-> Thread))
(define (receiver-thread receiver stop-chan intercept)
(thread
(lambda ()
(: clear-events (-> Void))
(define (clear-events)
(let ([l : (Option Log-Receiver-Sync-Result)
(sync/timeout 0 receiver)])
(when l ; still something to read
(intercept l) ; interceptor gets the whole vector
(clear-events))))
(let loop ()
(let ([l : (U Log-Receiver-Sync-Result 'stop)
(sync receiver stop-chan)])
(cond [(eq? l 'stop)
;; we received all the events we were supposed
;; to get, read them all (w/o waiting), then
;; stop
(clear-events)]
[else ; keep going
(intercept l)
(loop)]))))))
(struct: listener ([stop-chan : (Async-Channelof 'stop)]
;; ugly, but the thread and the listener need to know each
;; other
[thread : (Option Thread)]
[rev-messages : (Listof Log-Receiver-Sync-Result)]
[done? : Any])
#:mutable)
(: start-recording (Log-Level -> listener))
(define (start-recording log-level)
(let* ([receiver (make-log-receiver (current-logger) log-level)]
[stop-chan ((inst make-async-channel 'stop))]
[cur-listener (listener stop-chan #f '() #f)]
[t (receiver-thread
receiver stop-chan
(lambda: ([l : Log-Receiver-Sync-Result])
(set-listener-rev-messages!
cur-listener
(cons l (listener-rev-messages cur-listener)))))])
(set-listener-thread! cur-listener t)
cur-listener))
(: stop-recording (listener -> (Listof Log-Receiver-Sync-Result)))
(define (stop-recording cur-listener)
(define the-thread (listener-thread cur-listener))
(unless (or (not the-thread)
(listener-done? cur-listener))
(async-channel-put (listener-stop-chan cur-listener)
'stop) ; stop the receiver thread
(thread-wait the-thread)
(set-listener-done?! cur-listener #t))
(reverse (listener-rev-messages cur-listener)))
(: with-intercepted-logging
(((Vector Symbol String Any (Option Symbol)) -> Void)
(-> Void)
Log-Level
-> Void))
(define (with-intercepted-logging interceptor proc log-level)
(let* ([orig-logger (current-logger)]
;; We use a local logger to avoid getting messages that didn't
;; originate from proc. Since it's a child of the original logger,
;; the rest of the program still sees the log entries.
[logger (make-logger #f orig-logger)]
[receiver (make-log-receiver logger log-level)]
[stop-chan ((inst make-async-channel 'stop))]
[t (receiver-thread receiver stop-chan interceptor)])
(begin0
(parameterize ([current-logger logger])
(proc))
(async-channel-put stop-chan 'stop) ; stop the receiver thread
(thread-wait t))))
(require typed/rackunit)
;; extracted from unstable/logging tests
(let ([l (start-recording 'warning)])
(log-warning "1")
(log-warning "2")
(log-warning "3")
(log-info "4")
(stop-recording l) ; stopping should be idempotent
(let ([out (stop-recording l)])
(check-equal? (map (lambda: ([l : Log-Receiver-Sync-Result])
(vector-ref l 1)) out)
'("1" "2" "3"))
(check-true (andmap (lambda: ([l : Log-Receiver-Sync-Result])
(eq? (vector-ref l 0) 'warning))
out))))

View File

@ -3,6 +3,7 @@
(make-predicate VectorTop)
(make-predicate BoxTop)
(make-predicate ChannelTop)
(make-predicate Async-ChannelTop)
(make-predicate HashTableTop)
(make-predicate MPairTop)
(make-predicate Thread-CellTop)

View File

@ -0,0 +1,76 @@
#lang typed/racket
(require typed/racket/async-channel)
;; same as threads-and-channels.rkt, but with async-channels
(: chan (Async-Channelof Symbol))
(define chan (make-async-channel))
(define (reader)
(thread
(lambda ()
(let loop : True ((i : Integer 10))
(if (= i 0)
#t
(begin (async-channel-get chan)
(loop (- i 1))))))))
(: writer (Symbol -> Thread))
(define (writer x)
(thread
(lambda ()
(async-channel-put chan x)
(async-channel-put chan x))))
(reader)
(writer 'a)
(writer 'b)
(writer 'c)
(writer 'd)
(writer 'e)
(define-type JumpingChannel (Rec JumpingChannel (Async-Channelof (Pair JumpingChannel Symbol))))
(: jump-chan JumpingChannel)
(define jump-chan (make-async-channel))
(define (jumping-reader)
(thread
(lambda ()
(let loop : True ((i : Integer 3)
(c : JumpingChannel jump-chan))
(if (= i 0)
#t
(loop (- i 1)
(car (async-channel-get c))))))))
(jumping-reader)
(let ((c2 : JumpingChannel (make-async-channel)))
(async-channel-put jump-chan (cons c2 'a))
(let ((c3 : JumpingChannel (make-async-channel)))
(async-channel-put c2 (cons c3 'b))
(let ((c4 : JumpingChannel (make-async-channel)))
(async-channel-put c3 (cons c4 'c)))))
(: tc (Thread-Cellof Integer))
(define tc (make-thread-cell 0))
(thread-cell-set! tc 1)
(thread-wait (thread (lambda ()
(displayln (thread-cell-ref tc))
(thread-cell-set! tc 2)
(displayln (thread-cell-ref tc)))))
(thread-cell-ref tc)
(define blocked-thread
(thread (lambda ()
(async-channel-get ((inst make-async-channel 'unused))))))
(thread-suspend blocked-thread)
(kill-thread blocked-thread)