diff --git a/collects/frtime/core/contract.ss b/collects/frtime/core/contract.ss new file mode 100644 index 0000000000..bd1905cf70 --- /dev/null +++ b/collects/frtime/core/contract.ss @@ -0,0 +1,8 @@ +#lang scheme + +(define-syntax-rule (provide/contract* [id ctrct] ...) + #;(provide/contract [id ctrct] ...) + (provide id ...)) + +(provide + provide/contract*) \ No newline at end of file diff --git a/collects/frtime/core/dv.ss b/collects/frtime/core/dv.ss index f21f60a191..c351ad2b32 100644 --- a/collects/frtime/core/dv.ss +++ b/collects/frtime/core/dv.ss @@ -1,6 +1,7 @@ #lang scheme +(require "contract.ss") -(define-struct dv (real used vec) #:mutable #:transparent) +(define-struct dv (real used vec) #:mutable) (define (dv:make size) (make-dv size 0 (make-vector size))) @@ -11,7 +12,7 @@ (match a-dv [(struct dv (_ used vec)) (set-dv-used! a-dv (sub1 used)) - (vector-set! vec (sub1 used) 0)])) + (vector-set! vec used 0)])) (define (dv:ref a-dv pos) (match a-dv @@ -42,7 +43,7 @@ (set-dv-used! a-dv (add1 used)) (vector-set! new-vec used item)))])) -(provide/contract +(provide/contract* [dv:make (exact-nonnegative-integer? . -> . dv?)] [dv:length (dv? . -> . exact-nonnegative-integer?)] [dv:remove-last (dv? . -> . void)] diff --git a/collects/frtime/core/erl.ss b/collects/frtime/core/erl.ss index cda1ddb621..f9b56b90e5 100644 --- a/collects/frtime/core/erl.ss +++ b/collects/frtime/core/erl.ss @@ -1,5 +1,7 @@ #lang scheme -(require "mailbox.ss") +(require "match.ss" + "contract.ss" + "mailbox.ss") (define-struct tid (lid) #:prefab) (define (create-tid thr) (make-tid thr)) @@ -12,11 +14,7 @@ (define (do-receive timeout timeout-thunk matcher) (define mb (hash-ref mailboxes (tid-lid (self)))) - (define timeout-evt - (if timeout - (alarm-evt (+ (current-inexact-milliseconds) timeout)) - never-evt)) - (define val-thunk (mailbox-receive mb timeout-evt timeout-thunk matcher)) + (define val-thunk (mailbox-receive mb timeout timeout-thunk matcher)) (val-thunk)) (define-syntax receive @@ -25,7 +23,9 @@ (do-receive timeout (lambda () to-expr ...) - (match-lambda (pat (lambda () expr ...)) ...))] + (match-lambda + (pat (lambda () expr ...)) ... + [_ match-fail]))] [(_ clause ...) (receive (after false (void)) clause ...)])) ; must ensure name not already taken @@ -84,6 +84,6 @@ (provide spawn/name receive) -(provide/contract +(provide/contract* [! (tid? any/c . -> . void)] [self (-> tid?)]) \ No newline at end of file diff --git a/collects/frtime/core/frp.ss b/collects/frtime/core/frp.ss index c778f2e0fa..a877af0331 100644 --- a/collects/frtime/core/frp.ss +++ b/collects/frtime/core/frp.ss @@ -1,6 +1,7 @@ #lang scheme (require (only-in mzlib/etc identity nor) + "contract.ss" "erl.ss" "heap.ss") @@ -676,7 +677,7 @@ (define switching-trigger/c any/c) -(provide/contract +(provide/contract* ; Event Sets [make-events-now ((listof any/c) . -> . event-set?)] ; XXX Ugly contract [event-set? (any/c . -> . boolean?)] diff --git a/collects/frtime/core/heap.ss b/collects/frtime/core/heap.ss index 777b67c962..8f533ec686 100644 --- a/collects/frtime/core/heap.ss +++ b/collects/frtime/core/heap.ss @@ -1,7 +1,8 @@ #lang scheme -(require "dv.ss") +(require "dv.ss" + "contract.ss") -(define-struct t (sorter equality data) #:transparent) +(define-struct t (sorter equality data)) ;; sorter: elements which have the most trueness according to ;; the sorter pop out first @@ -100,7 +101,7 @@ (and (heap? heap) (not (= (heap-size heap) 0)))) -(provide/contract +(provide/contract* [heap? (any/c . -> . boolean?)] [non-empty-heap? (any/c . -> . boolean?)] [make-heap (sorter/c equality/c . -> . heap?)] diff --git a/collects/frtime/core/mailbox.ss b/collects/frtime/core/mailbox.ss index 3405fd5228..ef679f6b3f 100644 --- a/collects/frtime/core/mailbox.ss +++ b/collects/frtime/core/mailbox.ss @@ -1,5 +1,7 @@ #lang scheme -(require scheme/async-channel) +(require "contract.ss" + "match.ss" + scheme/async-channel) ; XXX More efficient structure (define (snoc x l) (append l (list x))) @@ -13,11 +15,13 @@ (define (try-to-match req msg) (match req [(struct receive (reply-ch _ _ matcher)) - (with-handlers ([exn:misc:match? (lambda (x) #f)]) - (define the-match-thunk (matcher msg)) - ; XXX Handle partner's death - (channel-put reply-ch the-match-thunk) - #t)])) + (define the-match-thunk (matcher msg)) + (if (eq? the-match-thunk match-fail) + #f + (begin + ; XXX Handle partner's death + (channel-put reply-ch the-match-thunk) + #t))])) ; Try to match a list of messages (define (try-to-match* req msgs) (match msgs @@ -33,23 +37,28 @@ (not-on-receive (snoc new-msg msgs)))) (handle-evt control-ch (lambda (req) - (with-handlers ([exn? (lambda (x) (waiting-for-matching req msgs))]) + (with-handlers ([exn? (lambda (x) (waiting-for-matching (current-inexact-milliseconds) req msgs))]) (define new-msgs (try-to-match* req msgs)) ; One worked (not-on-receive new-msgs)))))) ; Waiting for a message that will match - (define (waiting-for-matching req msgs) + (define (waiting-for-matching start-time req msgs) (match req - [(struct receive (reply-ch timeout-evt timeout-thunk _)) - (sync (handle-evt timeout-evt - (lambda (_) - (channel-put reply-ch timeout-thunk) - (not-on-receive msgs))) - (handle-evt msgs-ch - (lambda (new-msg) - (if (try-to-match req new-msg) - (not-on-receive msgs) - (waiting-for-matching req (snoc new-msg msgs))))))])) + [(struct receive (reply-ch timeout timeout-thunk _)) + (define elapsed (- (current-inexact-milliseconds) start-time)) + (define wait-time + (cond + [(not timeout) false] + [(> elapsed timeout) 0] + [else (/ (- timeout elapsed) 1000.0)])) + (define new-msg (sync/timeout wait-time msgs-ch)) + (if new-msg + (if (try-to-match req new-msg) + (not-on-receive msgs) + (waiting-for-matching start-time req (snoc new-msg msgs))) + (begin + (channel-put reply-ch timeout-thunk) + (not-on-receive msgs)))])) (define manager (thread (lambda () @@ -62,16 +71,16 @@ [(struct mailbox (thd _ msgs)) (thread-resume thd) (async-channel-put msgs msg)])) -(define (mailbox-receive mb timeout-evt timeout-thunk matcher) +(define (mailbox-receive mb timeout timeout-thunk matcher) (match mb [(struct mailbox (thd control _)) (define reply-ch (make-channel)) (thread-resume thd) - (channel-put control (make-receive reply-ch timeout-evt timeout-thunk matcher)) + (channel-put control (make-receive reply-ch timeout timeout-thunk matcher)) (channel-get reply-ch)])) -(provide/contract +(provide/contract* [mailbox? (any/c . -> . boolean?)] [new-mailbox (-> mailbox?)] [mailbox-send! (mailbox? any/c . -> . void)] - [mailbox-receive (mailbox? evt? (-> any) (any/c . -> . (-> any)) . -> . (-> any))]) \ No newline at end of file + [mailbox-receive (mailbox? (or/c false/c number?) (-> any) (any/c . -> . (-> any)) . -> . (-> any))]) \ No newline at end of file diff --git a/collects/frtime/core/match.ss b/collects/frtime/core/match.ss new file mode 100644 index 0000000000..d80a62ea0b --- /dev/null +++ b/collects/frtime/core/match.ss @@ -0,0 +1,6 @@ +#lang scheme + +(define-struct a-match-fail ()) +(define match-fail (make-a-match-fail)) + +(provide match-fail) \ No newline at end of file diff --git a/collects/frtime/core/sema-mailbox.ss b/collects/frtime/core/sema-mailbox.ss new file mode 100644 index 0000000000..b54d76205a --- /dev/null +++ b/collects/frtime/core/sema-mailbox.ss @@ -0,0 +1,86 @@ +#lang scheme +(require "match.ss" + "contract.ss") + +(define (call-with-semaphore s thunk) + (semaphore-wait s) + (let ([result (thunk)]) + (semaphore-post s) + result)) + +(define Mcons mcons) +(define release void) + +(define-struct mailbox (old-head old-last head tail sem-count sem-space lock-enqueue) #:mutable) + +(define (new-mailbox) + (let* ([sentinel (Mcons empty empty)] + [old-sentinel (Mcons empty empty)] + [old-head (Mcons empty old-sentinel)]) + (make-mailbox old-head + old-sentinel + sentinel + sentinel + (make-semaphore) + (make-semaphore 1000) + (make-semaphore 1)))) + +(define (try-extract m l) + (let loop ([prev l] [cur (mcdr l)]) + (if (empty? (mcdr cur)) + #f + (let ([v (m (mcar cur))]) + (if (eq? v match-fail) + (loop cur (mcdr cur)) + (begin0 v + (set-mcdr! prev (mcdr cur)) + (release cur))))))) + +(define (mailbox-receive mb timeout timeout-thunk matcher) + (define start-time (current-inexact-milliseconds)) + (match (try-extract matcher (mailbox-old-head mb)) + [#f + (let wait () + (let* ([elapsed (- (current-inexact-milliseconds) start-time)] + [wait-time (cond + [(not timeout) false] + [(> elapsed timeout) 0] + [else (/ (- timeout elapsed) 1000.0)])] + [not-timeout? (sync/timeout wait-time (mailbox-sem-count mb))]) + (if not-timeout? + (let* ([oldhead (mailbox-head mb)] + [msg (mcar oldhead)] + [v + (begin (set-mailbox-head! mb (mcdr oldhead)) + (release oldhead) + (semaphore-post (mailbox-sem-space mb)) + (matcher msg))]) + (if (eq? v match-fail) + (let ([new-last (Mcons empty empty)] + [old-last (mailbox-old-last mb)]) + (set-mcar! old-last msg) + (set-mcdr! old-last new-last) + (set-mailbox-old-last! mb new-last) + (wait)) + v)) + timeout-thunk)))] + [val + val])) + +(define (mailbox-send! mbox msg) + (call-with-semaphore + (mailbox-lock-enqueue mbox) + (lambda () + (let ([newtail (Mcons empty empty)] + [oldtail (mailbox-tail mbox)]) + (set-mcar! oldtail msg) + (set-mcdr! oldtail newtail) + (set-mailbox-tail! mbox newtail) + (semaphore-wait (mailbox-sem-space mbox)) + (semaphore-post (mailbox-sem-count mbox)))))) + +(provide/contract* + [mailbox? (any/c . -> . boolean?)] + [new-mailbox (-> mailbox?)] + [mailbox-send! (mailbox? any/c . -> . void)] + [mailbox-receive (mailbox? (or/c false/c number?) (-> any) (any/c . -> . (-> any)) . -> . (-> any))]) \ No newline at end of file