diff --git a/pkgs/typed-racket-pkgs/typed-racket-doc/typed-racket/scribblings/reference/libraries.scrbl b/pkgs/typed-racket-pkgs/typed-racket-doc/typed-racket/scribblings/reference/libraries.scrbl index 2135ac92..1a759e9a 100644 --- a/pkgs/typed-racket-pkgs/typed-racket-doc/typed-racket/scribblings/reference/libraries.scrbl +++ b/pkgs/typed-racket-pkgs/typed-racket-doc/typed-racket/scribblings/reference/libraries.scrbl @@ -71,6 +71,7 @@ The following libraries are included with Typed Racket in the @defmodule/incl[typed/openssl/md5] @defmodule/incl[typed/openssl/sha1] @defmodule/incl[typed/pict] +@defmodule/incl[typed/racket/async-channel] @defmodule/incl[typed/rackunit] @defmodule/incl[typed/srfi/14] @defmodule/incl[typed/syntax/stx] diff --git a/pkgs/typed-racket-pkgs/typed-racket-doc/typed-racket/scribblings/reference/types.scrbl b/pkgs/typed-racket-pkgs/typed-racket-doc/typed-racket/scribblings/reference/types.scrbl index 607ab4ff..58952de3 100644 --- a/pkgs/typed-racket-pkgs/typed-racket-doc/typed-racket/scribblings/reference/types.scrbl +++ b/pkgs/typed-racket-pkgs/typed-racket-doc/typed-racket/scribblings/reference/types.scrbl @@ -4,7 +4,8 @@ "numeric-tower-pict.rkt" scribble/eval racket/sandbox) - (require (for-label (only-meta-in 0 [except-in typed/racket for])))] + (require (for-label (only-meta-in 0 [except-in typed/racket for]) + racket/async-channel))] @(define the-eval (make-base-eval)) @(the-eval '(require (except-in typed/racket #%top-interaction #%module-begin))) @@ -412,6 +413,21 @@ corresponding to @racket[trest], where @racket[bound] @ex[(lambda: ([x : Any]) (if (channel? x) x (error "not a channel!")))] } +@defform[(Async-Channelof t)]{An @rtech{asynchronous channel} on which only @racket[t]s can be sent. +@ex[ +(require typed/racket/async-channel) +(ann (make-async-channel) (Async-Channelof Symbol)) +] +} + +@defidform[Async-ChannelTop]{is the type of an @rtech{asynchronous channel} with unknown + message type and is the supertype of all asynchronous channel types. This type typically + appears in programs via the combination of occurrence typing and + @racket[async-channel?]. +@ex[(require typed/racket/async-channel) + (lambda: ([x : Any]) (if (async-channel? x) x (error "not an async-channel!")))] +} + @defform*[[(Parameterof t) (Parameterof s t)]]{A @rtech{parameter} of @racket[t]. If two type arguments are supplied, the first is the type the parameter accepts, and the second is the type returned. diff --git a/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/base-env/base-types.rkt b/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/base-env/base-types.rkt index 8c413217..cff177c5 100644 --- a/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/base-env/base-types.rkt +++ b/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/base-env/base-types.rkt @@ -115,6 +115,7 @@ [Procedure top-func] [BoxTop -BoxTop] [ChannelTop -ChannelTop] +[Async-ChannelTop -Async-ChannelTop] [VectorTop -VectorTop] [HashTableTop -HashTop] [MPairTop -MPairTop] @@ -168,6 +169,7 @@ [Pair (-poly (a b) (-pair a b))] [Boxof (-poly (a) (make-Box a))] [Channelof (-poly (a) (make-Channel a))] +[Async-Channelof (-poly (a) (make-Async-Channel a))] [Ephemeronof (-poly (a) (make-Ephemeron a))] [Setof (-poly (e) (make-Set e))] [Evtof (-poly (r) (-evt r))] diff --git a/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/infer/infer-unit.rkt b/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/infer/infer-unit.rkt index 7ccdd633..9da19828 100644 --- a/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/infer/infer-unit.rkt +++ b/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/infer/infer-unit.rkt @@ -592,6 +592,8 @@ (% cset-meet (cg/inv s s*) (cg/inv t t*))] [((Channel: e) (Channel: e*)) (cg/inv e e*)] + [((Async-Channel: e) (Async-Channel: e*)) + (cg/inv e e*)] [((ThreadCell: e) (ThreadCell: e*)) (cg/inv e e*)] [((Continuation-Mark-Keyof: e) (Continuation-Mark-Keyof: e*)) @@ -629,6 +631,7 @@ t)] [((CustodianBox: t) (Evt: t*)) (cg S t*)] [((Channel: t) (Evt: t*)) (cg t t*)] + [((Async-Channel: t) (Evt: t*)) (cg t t*)] ;; we assume all HTs are mutable at the moment [((Hashtable: s1 s2) (Hashtable: t1 t2)) ;; for mutable hash tables, both are invariant diff --git a/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/private/type-contract.rkt b/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/private/type-contract.rkt index ff519b24..f0ff5618 100644 --- a/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/private/type-contract.rkt +++ b/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/private/type-contract.rkt @@ -330,6 +330,7 @@ [(VectorTop:) (only-untyped vector?/sc)] [(BoxTop:) (only-untyped box?/sc)] [(ChannelTop:) (only-untyped channel?/sc)] + [(Async-ChannelTop:) (only-untyped async-channel?/sc)] [(HashtableTop:) (only-untyped hash?/sc)] [(MPairTop:) (only-untyped mpair?/sc)] [(ThreadCellTop:) (only-untyped thread-cell?/sc)] diff --git a/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/rep/type-rep.rkt b/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/rep/type-rep.rkt index 75c1378c..dc5f3c21 100644 --- a/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/rep/type-rep.rkt +++ b/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/rep/type-rep.rkt @@ -185,6 +185,11 @@ [#:frees (λ (f) (make-invariant (f elem)))] [#:key 'channel]) +;; elem is a Type +(def-type Async-Channel ([elem Type/c]) + [#:frees (λ (f) (make-invariant (f elem)))] + [#:key 'async-channel]) + ;; elem is a Type (def-type ThreadCell ([elem Type/c]) [#:frees (λ (f) (make-invariant (f elem)))] @@ -397,6 +402,7 @@ ;; the supertype of all of these values (def-type BoxTop () [#:fold-rhs #:base] [#:key 'box]) (def-type ChannelTop () [#:fold-rhs #:base] [#:key 'channel]) +(def-type Async-ChannelTop () [#:fold-rhs #:base] [#:key 'async-channel]) (def-type VectorTop () [#:fold-rhs #:base] [#:key 'vector]) (def-type HashtableTop () [#:fold-rhs #:base] [#:key 'hash]) (def-type MPairTop () [#:fold-rhs #:base] [#:key 'mpair]) diff --git a/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/static-contracts/combinators/derived.rkt b/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/static-contracts/combinators/derived.rkt index 43f5bd31..09832f6a 100644 --- a/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/static-contracts/combinators/derived.rkt +++ b/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/static-contracts/combinators/derived.rkt @@ -6,7 +6,7 @@ (require "simple.rkt" "structural.rkt" (for-template racket/base racket/list racket/set racket/promise racket/mpair - racket/class)) + racket/class racket/async-channel)) (provide (all-defined-out)) (define identifier?/sc (flat/sc #'identifier?)) @@ -28,6 +28,7 @@ (define empty-hash/sc (and/sc hash?/sc (flat/sc #'(λ (h) (zero? (hash-count h)))))) (define channel?/sc (flat/sc #'channel?)) +(define async-channel?/sc (flat/sc #'channel?)) (define thread-cell?/sc (flat/sc #'thread-cell?)) (define prompt-tag?/sc (flat/sc #'continuation-prompt-tag?)) (define continuation-mark-key?/sc (flat/sc #'continuation-mark-key?)) diff --git a/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/types/abbrev.rkt b/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/types/abbrev.rkt index 70e85e7c..18aa0f7a 100644 --- a/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/types/abbrev.rkt +++ b/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/types/abbrev.rkt @@ -54,6 +54,7 @@ (define -Param make-Param) (define -box make-Box) (define -channel make-Channel) +(define -async-channel make-Async-Channel) (define -thread-cell make-ThreadCell) (define -Promise make-Promise) (define -set make-Set) @@ -169,6 +170,7 @@ (define -HT make-Hashtable) (define/decl -BoxTop (make-BoxTop)) (define/decl -ChannelTop (make-ChannelTop)) +(define/decl -Async-ChannelTop (make-Async-ChannelTop)) (define/decl -HashTop (make-HashtableTop)) (define/decl -VectorTop (make-VectorTop)) (define/decl -MPairTop (make-MPairTop)) diff --git a/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/types/printer.rkt b/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/types/printer.rkt index 7f193e33..60921782 100644 --- a/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/types/printer.rkt +++ b/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/types/printer.rkt @@ -424,6 +424,7 @@ [(StructTop: (Struct: nm _ _ _ _ _)) `(Struct ,(syntax-e nm))] [(BoxTop:) 'BoxTop] [(ChannelTop:) 'ChannelTop] + [(Async-ChannelTop:) 'Async-ChannelTop] [(ThreadCellTop:) 'ThreadCellTop] [(VectorTop:) 'VectorTop] [(HashtableTop:) 'HashTableTop] @@ -462,6 +463,7 @@ [(Box: e) `(Boxof ,(t->s e))] [(Future: e) `(Futureof ,(t->s e))] [(Channel: e) `(Channelof ,(t->s e))] + [(Async-Channel: e) `(Async-Channelof ,(t->s e))] [(ThreadCell: e) `(ThreadCellof ,(t->s e))] [(Promise: e) `(Promise ,(t->s e))] [(Ephemeron: e) `(Ephemeronof ,(t->s e))] diff --git a/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/types/structural.rkt b/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/types/structural.rkt index 0b51cd3f..13154f0e 100644 --- a/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/types/structural.rkt +++ b/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/types/structural.rkt @@ -32,6 +32,7 @@ (define-for-syntax structural-reps #'([BoxTop ()] [ChannelTop ()] + [Async-ChannelTop ()] [ClassTop ()] [Continuation-Mark-KeyTop ()] [Error ()] @@ -62,6 +63,7 @@ [Continuation-Mark-Keyof (#:inv)] [Box (#:inv)] [Channel (#:inv)] + [Async-Channel (#:inv)] [ThreadCell (#:inv)] [Vector (#:inv)] [Hashtable (#:inv #:inv)] diff --git a/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/types/subtype.rkt b/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/types/subtype.rkt index 2f9b5276..95480b8c 100644 --- a/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/types/subtype.rkt +++ b/pkgs/typed-racket-pkgs/typed-racket-lib/typed-racket/types/subtype.rkt @@ -510,6 +510,7 @@ ;; compared against t* here (subtype* A0 s t*)] [((Channel: t) (Evt: t*)) (subtype* A0 t t*)] + [((Async-Channel: t) (Evt: t*)) (subtype* A0 t t*)] ;; Invariant types [((Box: s) (Box: t)) (type-equiv? A0 s t)] [((Box: _) (BoxTop:)) A0] @@ -517,6 +518,8 @@ [((ThreadCell: _) (ThreadCellTop:)) A0] [((Channel: s) (Channel: t)) (type-equiv? A0 s t)] [((Channel: _) (ChannelTop:)) A0] + [((Async-Channel: s) (Async-Channel: t)) (type-equiv? A0 s t)] + [((Async-Channel: _) (Async-ChannelTop:)) A0] [((Vector: s) (Vector: t)) (type-equiv? A0 s t)] [((Vector: _) (VectorTop:)) A0] [((HeterogeneousVector: _) (VectorTop:)) A0] diff --git a/pkgs/typed-racket-pkgs/typed-racket-more/typed/racket/async-channel.rkt b/pkgs/typed-racket-pkgs/typed-racket-more/typed/racket/async-channel.rkt new file mode 100644 index 00000000..912613d0 --- /dev/null +++ b/pkgs/typed-racket-pkgs/typed-racket-more/typed/racket/async-channel.rkt @@ -0,0 +1,16 @@ +#lang s-exp typed-racket/base-env/extra-env-lang + +;; This module provides a typed version of racket/async-channel + +(require "private/async-channel-wrapped.rkt" + (for-syntax (only-in (rep type-rep) make-Async-ChannelTop))) + +;; Section 11.2.4 (Buffered Asynchronous Channels) +(type-environment + [make-async-channel (-poly (a) (->opt [(-opt -PosInt)] (-async-channel a)))] + [async-channel? (make-pred-ty (make-Async-ChannelTop))] + [async-channel-get (-poly (a) ((-async-channel a) . -> . a))] + [async-channel-try-get (-poly (a) ((-async-channel a) . -> . (-opt a)))] + [async-channel-put (-poly (a) ((-async-channel a) a . -> . -Void))] + [async-channel-put-evt (-poly (a) (-> (-async-channel a) a (-mu x (-evt x))))]) + diff --git a/pkgs/typed-racket-pkgs/typed-racket-test/tests/typed-racket/fail/async-channel-contract.rkt b/pkgs/typed-racket-pkgs/typed-racket-test/tests/typed-racket/fail/async-channel-contract.rkt new file mode 100644 index 00000000..fb3b2795 --- /dev/null +++ b/pkgs/typed-racket-pkgs/typed-racket-test/tests/typed-racket/fail/async-channel-contract.rkt @@ -0,0 +1,19 @@ +#; +(exn-pred #rx"could not convert type to a contract.*Async-Channelof") +#lang racket/load + +;; Test typed-untyped interaction with channels + +(module typed typed/racket + (require typed/racket/async-channel) + (: ch (Async-Channelof (Boxof Integer))) + (define ch (make-async-channel)) + (: putter (-> Thread)) + (define (putter) + (thread (λ () (async-channel-put ch (box 3))))) + (provide putter ch)) + +(require 'typed) +(putter) +(set-box! (async-channel-get ch) "not an integer") + diff --git a/pkgs/typed-racket-pkgs/typed-racket-test/tests/typed-racket/succeed/events-with-async-channel.rkt b/pkgs/typed-racket-pkgs/typed-racket-test/tests/typed-racket/succeed/events-with-async-channel.rkt new file mode 100644 index 00000000..60c163a2 --- /dev/null +++ b/pkgs/typed-racket-pkgs/typed-racket-test/tests/typed-racket/succeed/events-with-async-channel.rkt @@ -0,0 +1,106 @@ +#lang typed/racket +(require typed/racket/async-channel) + +;; Integration test for synchronizable events, using async-channels +;; +;; example from unstable/logging + +(define-type Log-Receiver-Sync-Result + (Vector Symbol String Any (Option Symbol))) + +(: receiver-thread + (Log-Receiver (Async-Channelof 'stop) + (Log-Receiver-Sync-Result -> Void) + -> Thread)) +(define (receiver-thread receiver stop-chan intercept) + (thread + (lambda () + (: clear-events (-> Void)) + (define (clear-events) + (let ([l : (Option Log-Receiver-Sync-Result) + (sync/timeout 0 receiver)]) + (when l ; still something to read + (intercept l) ; interceptor gets the whole vector + (clear-events)))) + (let loop () + (let ([l : (U Log-Receiver-Sync-Result 'stop) + (sync receiver stop-chan)]) + (cond [(eq? l 'stop) + ;; we received all the events we were supposed + ;; to get, read them all (w/o waiting), then + ;; stop + (clear-events)] + [else ; keep going + (intercept l) + (loop)])))))) + +(struct: listener ([stop-chan : (Async-Channelof 'stop)] + ;; ugly, but the thread and the listener need to know each + ;; other + [thread : (Option Thread)] + [rev-messages : (Listof Log-Receiver-Sync-Result)] + [done? : Any]) + #:mutable) + +(: start-recording (Log-Level -> listener)) +(define (start-recording log-level) + (let* ([receiver (make-log-receiver (current-logger) log-level)] + [stop-chan ((inst make-async-channel 'stop))] + [cur-listener (listener stop-chan #f '() #f)] + [t (receiver-thread + receiver stop-chan + (lambda: ([l : Log-Receiver-Sync-Result]) + (set-listener-rev-messages! + cur-listener + (cons l (listener-rev-messages cur-listener)))))]) + (set-listener-thread! cur-listener t) + cur-listener)) + +(: stop-recording (listener -> (Listof Log-Receiver-Sync-Result))) +(define (stop-recording cur-listener) + (define the-thread (listener-thread cur-listener)) + (unless (or (not the-thread) + (listener-done? cur-listener)) + (async-channel-put (listener-stop-chan cur-listener) + 'stop) ; stop the receiver thread + (thread-wait the-thread) + (set-listener-done?! cur-listener #t)) + (reverse (listener-rev-messages cur-listener))) + +(: with-intercepted-logging + (((Vector Symbol String Any (Option Symbol)) -> Void) + (-> Void) + Log-Level + -> Void)) +(define (with-intercepted-logging interceptor proc log-level) + (let* ([orig-logger (current-logger)] + ;; We use a local logger to avoid getting messages that didn't + ;; originate from proc. Since it's a child of the original logger, + ;; the rest of the program still sees the log entries. + [logger (make-logger #f orig-logger)] + [receiver (make-log-receiver logger log-level)] + [stop-chan ((inst make-async-channel 'stop))] + [t (receiver-thread receiver stop-chan interceptor)]) + (begin0 + (parameterize ([current-logger logger]) + (proc)) + (async-channel-put stop-chan 'stop) ; stop the receiver thread + (thread-wait t)))) + +(require typed/rackunit) + +;; extracted from unstable/logging tests +(let ([l (start-recording 'warning)]) + (log-warning "1") + (log-warning "2") + (log-warning "3") + (log-info "4") + (stop-recording l) ; stopping should be idempotent + (let ([out (stop-recording l)]) + (check-equal? (map (lambda: ([l : Log-Receiver-Sync-Result]) + (vector-ref l 1)) out) + '("1" "2" "3")) + (check-true (andmap (lambda: ([l : Log-Receiver-Sync-Result]) + (eq? (vector-ref l 0) 'warning)) + out)))) + diff --git a/pkgs/typed-racket-pkgs/typed-racket-test/tests/typed-racket/succeed/make-top-predicate.rkt b/pkgs/typed-racket-pkgs/typed-racket-test/tests/typed-racket/succeed/make-top-predicate.rkt index 30322ace..897ea142 100644 --- a/pkgs/typed-racket-pkgs/typed-racket-test/tests/typed-racket/succeed/make-top-predicate.rkt +++ b/pkgs/typed-racket-pkgs/typed-racket-test/tests/typed-racket/succeed/make-top-predicate.rkt @@ -3,6 +3,7 @@ (make-predicate VectorTop) (make-predicate BoxTop) (make-predicate ChannelTop) +(make-predicate Async-ChannelTop) (make-predicate HashTableTop) (make-predicate MPairTop) (make-predicate Thread-CellTop) diff --git a/pkgs/typed-racket-pkgs/typed-racket-test/tests/typed-racket/succeed/threads-and-async-channels.rkt b/pkgs/typed-racket-pkgs/typed-racket-test/tests/typed-racket/succeed/threads-and-async-channels.rkt new file mode 100644 index 00000000..6ca6189e --- /dev/null +++ b/pkgs/typed-racket-pkgs/typed-racket-test/tests/typed-racket/succeed/threads-and-async-channels.rkt @@ -0,0 +1,76 @@ +#lang typed/racket +(require typed/racket/async-channel) + +;; same as threads-and-channels.rkt, but with async-channels + +(: chan (Async-Channelof Symbol)) +(define chan (make-async-channel)) + +(define (reader) + (thread + (lambda () + (let loop : True ((i : Integer 10)) + (if (= i 0) + #t + (begin (async-channel-get chan) + (loop (- i 1)))))))) + +(: writer (Symbol -> Thread)) +(define (writer x) + (thread + (lambda () + (async-channel-put chan x) + (async-channel-put chan x)))) + +(reader) +(writer 'a) +(writer 'b) +(writer 'c) +(writer 'd) +(writer 'e) + + +(define-type JumpingChannel (Rec JumpingChannel (Async-Channelof (Pair JumpingChannel Symbol)))) +(: jump-chan JumpingChannel) +(define jump-chan (make-async-channel)) + +(define (jumping-reader) + (thread + (lambda () + (let loop : True ((i : Integer 3) + (c : JumpingChannel jump-chan)) + (if (= i 0) + #t + (loop (- i 1) + (car (async-channel-get c)))))))) + +(jumping-reader) +(let ((c2 : JumpingChannel (make-async-channel))) + (async-channel-put jump-chan (cons c2 'a)) + (let ((c3 : JumpingChannel (make-async-channel))) + (async-channel-put c2 (cons c3 'b)) + (let ((c4 : JumpingChannel (make-async-channel))) + (async-channel-put c3 (cons c4 'c))))) + + + + +(: tc (Thread-Cellof Integer)) +(define tc (make-thread-cell 0)) + +(thread-cell-set! tc 1) + +(thread-wait (thread (lambda () + (displayln (thread-cell-ref tc)) + (thread-cell-set! tc 2) + (displayln (thread-cell-ref tc))))) + +(thread-cell-ref tc) + +(define blocked-thread + (thread (lambda () + (async-channel-get ((inst make-async-channel 'unused)))))) + + +(thread-suspend blocked-thread) +(kill-thread blocked-thread)