From 04566b2fcfb9a43d2a05f56656a9b7901563bbdd Mon Sep 17 00:00:00 2001 From: Jay McCarthy Date: Thu, 25 Jun 2009 18:56:11 +0000 Subject: [PATCH] Converting to scheme/base svn: r15268 --- collects/frtime/dv.ss | 1 + collects/frtime/frp-core.ss | 1450 +++++++++++++++++------------------ collects/frtime/mailbox.ss | 2 +- 3 files changed, 720 insertions(+), 733 deletions(-) diff --git a/collects/frtime/dv.ss b/collects/frtime/dv.ss index 32edeb9b09..f21f60a191 100644 --- a/collects/frtime/dv.ss +++ b/collects/frtime/dv.ss @@ -23,6 +23,7 @@ [(struct dv (_ _ vec)) (vector-set! vec pos new-val)])) +; XXX Make this more efficient by preserving previous vector (define (dv:append a-dv item) (match a-dv [(struct dv (real used vec)) diff --git a/collects/frtime/frp-core.ss b/collects/frtime/frp-core.ss index d2dc7bb818..c778f2e0fa 100644 --- a/collects/frtime/frp-core.ss +++ b/collects/frtime/frp-core.ss @@ -1,743 +1,729 @@ +#lang scheme +(require (only-in mzlib/etc + identity nor) + "erl.ss" + "heap.ss") -(module frp-core mzscheme - (require mzlib/etc - mzlib/list - mzlib/match - "erl.ss" - "heap.ss") - - (require-for-syntax scheme/struct-info) - - - - - ;;;;;;;;;;;;; - ;; Globals ;; - ;;;;;;;;;;;;; +;;;;;;;;;;;;; +;; Globals ;; +;;;;;;;;;;;;; - ;; the current logical time step - (define logical-time (box 0)) - (define (current-logical-time) - (unbox logical-time)) - - (define frtime-inspector (make-inspector)) - (print-struct #t) - - (define snap? (make-parameter #f)) - - (define named-dependents (make-hash-table)) - - (define (compose-continuation-mark-sets2 s1 s2) - s2) - - - (define (my-ccm) - (current-continuation-marks) - #;(continuation-mark-set->list (current-continuation-marks) 'drscheme-debug-continuation-mark-key)) - - ;;;;;;;;;;;;;;;; - ;; Structures ;; - ;;;;;;;;;;;;;;;; - - ; also models events, where 'value' is all the events that - ; haven't yet occurred (more specifically, an event-cons cell whose - ; tail is *undefined*) - (define-values (struct:signal - make-signal - signal? - signal-value - signal-dependents - signal-stale? - signal-thunk - signal-depth - signal-continuation-marks - signal-parameterization - signal-producers - set-signal-value! - set-signal-dependents! - set-signal-stale?! - set-signal-thunk! - set-signal-depth! - set-signal-continuation-marks! - set-signal-parameterization! - set-signal-producers!) - (let*-values ([(field-name-symbols) - (list 'value 'dependents 'stale? 'thunk - 'depth 'continuation-marks 'parameterization 'producers)] - [(desc make-signal signal? acc mut) - (make-struct-type - 'signal #f (length field-name-symbols) 0 #f null frtime-inspector - (lambda (fn . args) - (unregister #f fn) ; clear out stale dependencies from previous apps - (let* (; revisit error-reporting for switched behaviors - [ccm (my-ccm)] - [app-fun (lambda (cur-fn) - (let ([res (apply cur-fn args)]) - (when (signal? res) - (set-signal-continuation-marks! res ccm)) - res))]) - (super-lift app-fun fn))))]) - (apply values - desc - make-signal - signal? - (append - (build-list (length field-name-symbols) - (lambda (i) (make-struct-field-accessor acc i (list-ref field-name-symbols i)))) - (build-list (length field-name-symbols) - (lambda (i) (make-struct-field-mutator mut i (list-ref field-name-symbols i)))))))) - - (define-syntax signal - (let ([field-name-symbols (list 'value 'dependents 'stale? 'thunk - 'depth 'continuation-marks 'parameterization - 'producers)] - [cert (syntax-local-certifier #t)]) - (make-struct-info - (lambda () - (list - (cert #'struct:signal) - (cert #'make-signal) - (cert #'signal?) - (map - (lambda (fd) - (cert (datum->syntax-object - #'here - (string->symbol (format "signal-~a" fd))))) - (reverse field-name-symbols)) - (map - (lambda (fd) - (cert (datum->syntax-object - #'here - (string->symbol (format "set-signal-~a!" fd))))) - (reverse field-name-symbols)) - #t))))) - - (define (signal-custodian sig) - (call-with-parameterization - (signal-parameterization sig) - current-cust)) - - (define-struct ft-cust (signal constructed-sigs children)) - ;(define-struct non-scheduled (signal)) - (define make-non-scheduled identity) - (define (non-scheduled? x) #f) - (define (non-scheduled-signal x) - (error 'non-scheduled-signal "should never be called")) - - (define current-cust - (make-parameter #f)) - - (define-struct multiple (values) frtime-inspector) - - (define-struct event-set (time events)) - (define (make-events-now events) - (make-event-set (current-logical-time) events)) - - (define-struct (signal:unchanged signal) () frtime-inspector) - (define-struct (signal:compound signal:unchanged) (content copy) frtime-inspector) - (define-struct (signal:switching signal:unchanged) (current trigger) frtime-inspector) - (define-struct (signal:event signal) () frtime-inspector) - - ; an external event; contains a list of pairs - ; (recip val), where val is passed to recip's thunk - (define-struct external-event (recip-val-pairs)) - - ; update the given signal at the given time - (define-struct alarm (time signal)) - - (define extra-cont-marks (make-parameter #f)) +;; the current logical time step +(define logical-time (box 0)) +(define (current-logical-time) + (unbox logical-time)) - (define (effective-continuation-marks) - (if (extra-cont-marks) - (begin - #;(thread (lambda () (raise (make-exn:fail +(define frtime-inspector (make-inspector)) +(print-struct #t) + +(define snap? (make-parameter #f)) + +(define named-dependents (make-hash)) + +(define (compose-continuation-mark-sets2 s1 s2) + s2) + +(define (my-ccm) + (current-continuation-marks) + ; XXX What is this? + #;(continuation-mark-set->list (current-continuation-marks) 'drscheme-debug-continuation-mark-key)) + +;;;;;;;;;;;;;;;; +;; Structures ;; +;;;;;;;;;;;;;;;; + +; also models events, where 'value' is all the events that +; haven't yet occurred (more specifically, an event-cons cell whose +; tail is *undefined*) +(define-struct signal ([value #:mutable] + [dependents #:mutable] + [stale? #:mutable] + [thunk #:mutable] + [depth #:mutable] + [continuation-marks #:mutable] + parameterization + [producers #:mutable]) + #:inspector frtime-inspector + #:property prop:procedure + (lambda (fn . args) + (unregister #f fn) ; clear out stale dependencies from previous apps + (let* (; revisit error-reporting for switched behaviors + [ccm (my-ccm)] + [app-fun (lambda (cur-fn) + (let ([res (apply cur-fn args)]) + (when (signal? res) + (set-signal-continuation-marks! res ccm)) + res))]) + (super-lift app-fun fn)))) + +; XXX Remove +#;(define (signal-custodian sig) + (call-with-parameterization + (signal-parameterization sig) + current-cust)) + +(define-struct ft-cust (signal constructed-sigs children) #:mutable) + +(define make-non-scheduled identity) +(define (non-scheduled? x) #f) +(define (non-scheduled-signal x) + (error 'non-scheduled-signal "should never be called")) + +(define current-cust + (make-parameter #f)) + +(define-struct multiple (values) + #:inspector frtime-inspector) + +(define-struct event-set (time events)) +(define (make-events-now events) + (make-event-set (current-logical-time) events)) + +(define-struct (signal:unchanged signal) () + #:inspector frtime-inspector) +(define-struct (signal:compound signal:unchanged) (content copy) + #:inspector frtime-inspector) +(define-struct (signal:switching signal:unchanged) (current trigger) + #:inspector frtime-inspector) +(define-struct (signal:event signal) () + #:inspector frtime-inspector) + +; an external event; contains a list of pairs +; (recip val), where val is passed to recip's thunk +(define-struct external-event (recip-val-pairs)) + +; update the given signal at the given time +(define-struct alarm (time signal)) + +(define extra-cont-marks (make-parameter #f)) + +(define (effective-continuation-marks) + (if (extra-cont-marks) + (begin + ; XXX Remove + #;(thread (lambda () (raise (make-exn:fail "extra marks present!" (extra-cont-marks))))) - (compose-continuation-mark-sets2 - (my-ccm) - (extra-cont-marks) - )) - (my-ccm))) - - ;; Simple Structure Combinators - - (define (event-receiver) - (event-producer2 - (lambda (emit) - (lambda the-args - (if (cons? the-args) - (emit (first the-args)) - (make-events-now empty)))))) - - (define (event-producer2 proc . deps) - (let* ([result (apply proc->signal (lambda args (make-events-now empty)) deps)] - [proc/emit (proc - (lambda (val) - (let ([old-value (signal-value result)]) - (make-events-now - (if (and (event-set? old-value) - (= (current-logical-time) (event-set-time old-value))) - (append (event-set-events old-value) (list val)) - (list val))))))]) - (set-signal-thunk! result proc/emit) - result)) - - (define (build-signal ctor thunk producers) - (let ([ccm (effective-continuation-marks)]) - (do-in-manager - (let* ([cust (current-cust)] - [cust-sig (and cust (ft-cust-signal cust))] - [sig (ctor - undefined empty #t thunk - (add1 (apply max 0 (cons (safe-signal-depth cust-sig) (map safe-signal-depth producers)))) - ccm - (parameterize ([uncaught-exception-handler - (lambda (exn) (exn-handler exn))] - [extra-cont-marks ccm]) - (current-parameterization)) - (if cust-sig (append producers (list cust-sig)) producers))]) - (when (cons? producers) - (register sig producers)) - (when cust-sig - (register (make-non-scheduled sig) cust-sig)) - (when cust - (set-ft-cust-constructed-sigs! cust (cons (make-weak-box sig) (ft-cust-constructed-sigs cust)))) - (iq-enqueue sig) - sig)))) - - (define (proc->signal:switching thunk current-box trigger . producers) - (let ([ccm (effective-continuation-marks)]) - (do-in-manager - (let* ([cust (current-cust)] - [cust-sig (and cust (ft-cust-signal cust))] - [sig (make-signal:switching - undefined empty #t thunk - (add1 (apply max 0 (cons (safe-signal-depth cust-sig) (map safe-signal-depth producers)))) - ccm - (parameterize ([uncaught-exception-handler - (lambda (exn) (exn-handler exn))] - [extra-cont-marks ccm]) - (current-parameterization)) - (if cust-sig (cons cust-sig producers) producers) - current-box - trigger)]) - (when (cons? producers) - (register sig producers)) - (when cust-sig - (register (make-non-scheduled sig) cust-sig)) - (when cust - (set-ft-cust-constructed-sigs! - cust (cons (make-weak-box sig) (ft-cust-constructed-sigs cust)))) - (iq-enqueue sig) - sig)))) - - (define ht (make-hash-table)) - - (define (proc->signal thunk . producers) - (build-signal make-signal thunk producers)) - - (define (proc->signal/dont-gc-unless other-val thunk . producers) - (let ([result (build-signal make-signal thunk producers)]) - (hash-table-put! ht other-val result) - result)) - - (define (proc->signal:unchanged thunk . producers) - (build-signal make-signal:unchanged thunk producers)) - - - - ;;;;;;;;;;;;;;;;;;;;;;;;; - ;; Simple Signal Tools ;; - ;;;;;;;;;;;;;;;;;;;;;;;;; - - - (define (send-event rcvr val) - (! man (make-external-event (list (list rcvr val))))) - - (define (send-synchronous-event rcvr val) - (when (man?) - (error 'send-synchronous-event "already in frtime engine (would deadlock)")) - (! man (make-external-event (list (list rcvr val)))) - (do-in-manager ())) - - (define (send-synchronous-events rcvr-val-pairs) - (when (man?) - (error 'send-synchronous-events "already in frtime engine (would deadlock)")) - (unless (ormap list? rcvr-val-pairs) (error "not list")) - (unless (ormap signal? (map first rcvr-val-pairs)) (error "not signals")) - (! man (make-external-event rcvr-val-pairs)) - (do-in-manager ())) + (compose-continuation-mark-sets2 + (my-ccm) + (extra-cont-marks) + )) + (my-ccm))) +;; Simple Structure Combinators - ; set-cell! : cell[a] a -> void - (define (set-cell! ref beh) - (if (man?) - (iq-enqueue (list ((signal-thunk ref) #t) beh)) - (! man (make-external-event (list (list ((signal-thunk ref) #t) beh)))))) - - - (define-values (undefined undefined?) - (let-values ([(desc make-undefined undefined? acc mut) - (make-struct-type - 'undefined #f 0 0 #f null frtime-inspector - (lambda (fn . args) fn))]) - (values (make-undefined) undefined?))) - - - (define (behavior? v) - (and (signal? v) (not (event-set? (signal-value v))))) - - (define (undef b) - (match b - [(and (? signal?) - (= signal-value value)) - (set-signal-stale?! b #f) - (when (not (undefined? value)) - (set-signal-value! b undefined) - (propagate b))] - [_ (void)])) - - - (define (multiple->values v) - (if (multiple? v) - (apply values (multiple-values v)) - v)) - - (define (values->multiple proc) - (call-with-values - proc - (case-lambda - [(v) v] - [vals (make-multiple vals)]))) - - ; value-now : signal[a] -> a - (define (value-now val) - ;(multiple->values - (cond - [(signal:compound? val) ((signal:compound-copy val))] - [(signal:switching? val) (value-now (unbox (signal:switching-current val)))] - [(signal? val) (signal-value val)] - [else val]));) - - (define (value-now/no-copy val) - ;(multiple->values - (cond - [(signal:switching? val) (value-now/no-copy (unbox (signal:switching-current val)))] - [(signal? val) (signal-value val)] - [else val]));) - - - ;; given a list, will return a list of their value-nows that will agree - (define (value-now/sync . sigs) - (do-in-manager-after - (apply values (map value-now sigs)))) - - - (define (kill-signal sig) +(define (event-receiver) + (event-producer2 + (lambda (emit) + (lambda the-args + (if (cons? the-args) + (emit (first the-args)) + (make-events-now empty)))))) + +(define (event-producer2 proc . deps) + (let* ([result (apply proc->signal (lambda args (make-events-now empty)) deps)] + [proc/emit (proc + (lambda (val) + (let ([old-value (signal-value result)]) + (make-events-now + (if (and (event-set? old-value) + (= (current-logical-time) (event-set-time old-value))) + (append (event-set-events old-value) (list val)) + (list val))))))]) + (set-signal-thunk! result proc/emit) + result)) + +(define (build-signal ctor thunk producers) + (let ([ccm (effective-continuation-marks)]) + (do-in-manager + (let* ([cust (current-cust)] + [cust-sig (and cust (ft-cust-signal cust))] + [sig (ctor + undefined empty #t thunk + (add1 (apply max 0 (cons (safe-signal-depth cust-sig) (map safe-signal-depth producers)))) + ccm + (parameterize ([uncaught-exception-handler + (lambda (exn) (exn-handler exn))] + [extra-cont-marks ccm]) + (current-parameterization)) + (if cust-sig (append producers (list cust-sig)) producers))]) + (when (cons? producers) + (register sig producers)) + (when cust-sig + (register (make-non-scheduled sig) cust-sig)) + (when cust + (set-ft-cust-constructed-sigs! cust (cons (make-weak-box sig) (ft-cust-constructed-sigs cust)))) + (iq-enqueue sig) + sig)))) + +(define (proc->signal:switching thunk current-box trigger . producers) + (let ([ccm (effective-continuation-marks)]) + (do-in-manager + (let* ([cust (current-cust)] + [cust-sig (and cust (ft-cust-signal cust))] + [sig (make-signal:switching + undefined empty #t thunk + (add1 (apply max 0 (cons (safe-signal-depth cust-sig) (map safe-signal-depth producers)))) + ccm + (parameterize ([uncaught-exception-handler + (lambda (exn) (exn-handler exn))] + [extra-cont-marks ccm]) + (current-parameterization)) + (if cust-sig (cons cust-sig producers) producers) + current-box + trigger)]) + (when (cons? producers) + (register sig producers)) + (when cust-sig + (register (make-non-scheduled sig) cust-sig)) + (when cust + (set-ft-cust-constructed-sigs! + cust (cons (make-weak-box sig) (ft-cust-constructed-sigs cust)))) + (iq-enqueue sig) + sig)))) + +(define (proc->signal thunk . producers) + (build-signal make-signal thunk producers)) + +; XXX Remove +#;(define ht (make-hash)) +#;(define (proc->signal/dont-gc-unless other-val thunk . producers) + (let ([result (build-signal make-signal thunk producers)]) + (hash-set! ht other-val result) + result)) + +(define (proc->signal:unchanged thunk . producers) + (build-signal make-signal:unchanged thunk producers)) + +;;;;;;;;;;;;;;;;;;;;;;;;; +;; Simple Signal Tools ;; +;;;;;;;;;;;;;;;;;;;;;;;;; + +(define (send-event rcvr val) + (! man (make-external-event (list (list rcvr val))))) + +; XXX move man check into contract +(define (send-synchronous-event rcvr val) + (when (man?) + (error 'send-synchronous-event "already in frtime engine (would deadlock)")) + (! man (make-external-event (list (list rcvr val)))) + (do-in-manager '())) + +; XXX move man check into contract +(define (send-synchronous-events rcvr-val-pairs) + (when (man?) + (error 'send-synchronous-events "already in frtime engine (would deadlock)")) + (! man (make-external-event rcvr-val-pairs)) + (do-in-manager '())) + +; set-cell! : cell[a] a -> void +(define (set-cell! ref beh) + (if (man?) + (iq-enqueue (list ((signal-thunk ref) #t) beh)) + (! man (make-external-event (list (list ((signal-thunk ref) #t) beh)))))) + +(define-values (undefined undefined?) + (let () + (define-struct undefined () + #:inspector frtime-inspector + #:property prop:procedure (lambda (fn . args) fn)) + (values (make-undefined) undefined?))) + +(define (behavior? v) + (and (signal? v) (not (event-set? (signal-value v))))) + +(define (undef b) + (match b + [(and (? signal?) + (app signal-value value)) + (set-signal-stale?! b #f) + (when (not (undefined? value)) + (set-signal-value! b undefined) + (propagate b))] + [_ (void)])) + +(define (multiple->values v) + (if (multiple? v) + (apply values (multiple-values v)) + v)) + +(define (values->multiple proc) + (call-with-values + proc + (case-lambda + [(v) v] + [vals (make-multiple vals)]))) + +; value-now : signal[a] -> a +(define (value-now val) + ;(multiple->values + (cond + [(signal:compound? val) ((signal:compound-copy val))] + [(signal:switching? val) (value-now (unbox (signal:switching-current val)))] + [(signal? val) (signal-value val)] + [else val]));) + +(define (value-now/no-copy val) + ;(multiple->values + (cond + [(signal:switching? val) (value-now/no-copy (unbox (signal:switching-current val)))] + [(signal? val) (signal-value val)] + [else val]));) + +;; given a list, will return a list of their value-nows that will agree +(define (value-now/sync . sigs) + (do-in-manager-after + (apply values (map value-now sigs)))) + +(define (kill-signal sig) + (for-each + (lambda (prod) + (unregister sig prod)) + (signal-producers sig)) + (set-signal-thunk! sig (lambda _ 'really-dead)) + (set-signal-value! sig 'dead) + (set-signal-dependents! sig empty) + (set-signal-producers! sig empty)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Dataflow Graph Maintenance ;; +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(define (safe-signal-depth v) + (cond + [(signal? v) (signal-depth v)] + [(non-scheduled? v) (signal-depth (non-scheduled-signal v))] + [else 0])) + +(define fix-depths + (lambda (inf sup [mem empty]) + (if (memq sup mem) + (send-event exceptions (list (make-exn:fail "tight cycle in dataflow graph" (signal-continuation-marks sup)) + sup)) + (when (<= (safe-signal-depth inf) + (safe-signal-depth sup)) + (set-signal-depth! inf (add1 (safe-signal-depth sup))) + (for-each + (lambda (dep) (fix-depths dep inf (cons sup mem))) + (foldl (lambda (wb acc) + (match (weak-box-value wb) + [(and sig (? signal?)) (cons sig acc)] + [(and (? non-scheduled?) (app non-scheduled-signal sig)) (cons sig acc)] + [_ acc])) + empty (signal-dependents inf))))))) + +(define-values (iq-enqueue iq-dequeue iq-empty? iq-resort) + (let* ([depth + (lambda (msg) + (let ([msg (if (weak-box? msg) (weak-box-value msg) msg)]) + (cond + [(cons? msg) (signal-depth (first msg))] + [(signal? msg) (signal-depth msg)] + [else 0])))] + [heap (make-heap + (lambda (b1 b2) (< (depth b1) (depth b2))) + eq?)]) + (values + (lambda (b) (heap-insert heap b)) + (lambda () (let ([v (heap-pop heap)]) + (if (weak-box? v) + (weak-box-value v) + v))) + (lambda () (heap-empty? heap)) + (lambda () (let loop ([elts empty]) + (if (heap-empty? heap) + (let loop ([elts elts]) + (when (cons? elts) + (heap-insert heap (first elts)) + (loop (rest elts)))) + (loop (cons (heap-pop heap) elts)))))))) + +(define-values (alarms-enqueue alarms-dequeue-beh alarms-peak-ms alarms-empty?) + (let ([heap (make-heap (lambda (a b) (< (first a) (first b))) eq?)]) + (values (lambda (ms beh) (heap-insert heap (list ms (make-weak-box beh)))) + (lambda () (match (heap-pop heap) [(list _ beh) (weak-box-value beh)])) + (lambda () (match (heap-peak heap) [(list ms _) ms])) + (lambda () (heap-empty? heap))))) + +(define (schedule-alarm ms beh) + (if (eq? (self) man) + (alarms-enqueue ms beh) + (! man (make-alarm ms beh)))) + +;;;;;;;;;;;;;;;;;;;;; +;; Manager Helpers ;; +;;;;;;;;;;;;;;;;;;;;; + +(define man? + (lambda ([v (self)]) + (eq? v man))) + +(define-syntax do-in-manager + (syntax-rules () + [(_ expr ...) + (if (man?) + (begin expr ...) + (begin + (! man (list 'run-thunk (self) + (let ([params (current-parameterization)]) + (lambda () + (call-with-parameterization + params + (lambda () expr ...)))))) + (receive [(list-rest 'vals vs) (apply values vs)] + [(list 'exn e) (raise e)])))])) + +(define-syntax do-in-manager-after + (syntax-rules () + [(_ expr ...) + (if (man?) + (begin expr ...) + (begin + (! man (list 'run-thunk/stabilized (self) + (let ([params (current-parameterization)]) + (lambda () + (call-with-parameterization + params + (lambda () expr ...)))))) + (receive [(list-rest 'vals vs) (apply values vs)] + [(list 'exn e) (raise e)])))])) + +(define (register inf sup) + (do-in-manager + (match sup + [(and (? signal?) + (app signal-dependents dependents)) + (set-signal-dependents! + sup + (cons (make-weak-box inf) dependents)) + (fix-depths inf sup)] + [(? list?) (for-each (lambda (sup1) (register inf sup1)) sup)] + [_ (void)]) + inf)) + +(define (unregister inf sup) + (do-in-manager + (match sup + [(and (? signal?) + (app signal-dependents dependents)) + (set-signal-dependents! + sup + (filter (lambda (a) + (let ([v (weak-box-value a)]) + (nor (eq? v inf) + (eq? v #f)))) + dependents))] + [_ (void)]))) + +(define (cust-killall! cust) + (let loop ([sigs (ft-cust-constructed-sigs cust)]) + (when (cons? sigs) + (cond + [(weak-box-value (first sigs)) => kill-signal] + [else (void)]) + (loop (rest sigs)))) + (for-each cust-killall! (ft-cust-children cust))) + +(define (super-lift fun bhvr) + (if (behavior? bhvr) + (parameterize ([extra-cont-marks + (effective-continuation-marks)]) + (do-in-manager + (let* ([cust (make-ft-cust (void) empty empty)] + [_ (cond + [(current-cust) + => (lambda (c) (set-ft-cust-children! c (cons cust (ft-cust-children c))))] + [else (void)])] + [pfun (lambda (b) + (parameterize ([current-cust cust]) + (fun b)))] + [current (box undefined)]) + (letrec ([custodian-signal + (proc->signal:unchanged + (lambda () + (cust-killall! cust) + (set-ft-cust-constructed-sigs! cust empty) + (set-ft-cust-children! cust empty) + (unregister rtn (unbox current)) + (set-box! current (pfun (value-now/no-copy bhvr))) + (register rtn (unbox current)) + ;; keep rtn's producers up-to-date + (set-signal-producers! rtn (cons (unbox current) + (cdr (signal-producers rtn)))) + (iq-resort) + 'custodian) + bhvr)] + [rtn (proc->signal:switching + (lambda () custodian-signal (value-now/no-copy (unbox current))) + current custodian-signal undefined bhvr custodian-signal)]) + (set-ft-cust-signal! cust custodian-signal) + rtn)))) + (fun bhvr))) + +(define (propagate b) + (let ([empty-boxes 0] + [dependents (signal-dependents b)] + [depth (signal-depth b)]) (for-each - (lambda (prod) - (unregister sig prod)) - (signal-producers sig)) - (set-signal-thunk! sig (lambda _ 'really-dead)) - (set-signal-value! sig 'dead) - (set-signal-dependents! sig empty) - (set-signal-producers! sig empty)) - - - - - - ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - ;; Dataflow Graph Maintenance ;; - ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - - - (define (safe-signal-depth v) - (cond - [(signal? v) (signal-depth v)] - [(non-scheduled? v) (signal-depth (non-scheduled-signal v))] - [0])) - - - (define fix-depths - (opt-lambda (inf sup [mem empty]) - (if (memq sup mem) - (send-event exceptions (list (make-exn:fail "tight cycle in dataflow graph" (signal-continuation-marks sup)) - sup)) - (when (<= (safe-signal-depth inf) - (safe-signal-depth sup)) - (set-signal-depth! inf (add1 (safe-signal-depth sup))) - (for-each - (lambda (dep) (fix-depths dep inf (cons sup mem))) - (foldl (lambda (wb acc) - (match (weak-box-value wb) - [(and sig (? signal?)) (cons sig acc)] - [(and (? non-scheduled?) (= non-scheduled-signal sig)) (cons sig acc)] - [_ acc])) - empty (signal-dependents inf))))))) - - - (define-values (iq-enqueue iq-dequeue iq-empty? iq-resort) - (let* ([depth - (lambda (msg) - (let ([msg (if (weak-box? msg) (weak-box-value msg) msg)]) - (cond - [(cons? msg) (signal-depth (first msg))] - [(signal? msg) (signal-depth msg)] - [else 0])))] - [heap (make-heap - (lambda (b1 b2) (< (depth b1) (depth b2))) - eq?)]) - (values - (lambda (b) (heap-insert heap b)) - (lambda () (let ([v (heap-pop heap)]) - (if (weak-box? v) - (weak-box-value v) - v))) - (lambda () (heap-empty? heap)) - (lambda () (let loop ([elts empty]) - (if (heap-empty? heap) - (let loop ([elts elts]) - (when (cons? elts) - (heap-insert heap (first elts)) - (loop (rest elts)))) - (loop (cons (heap-pop heap) elts)))))))) - - (define-values (alarms-enqueue alarms-dequeue-beh alarms-peak-ms alarms-empty?) - (let ([heap (make-heap (lambda (a b) (< (first a) (first b))) eq?)]) - (values (lambda (ms beh) (heap-insert heap (list ms (make-weak-box beh)))) - (lambda () (match (heap-pop heap) [(_ beh) (weak-box-value beh)])) - (lambda () (match (heap-peak heap) [(ms _) ms])) - (lambda () (heap-empty? heap))))) - - (define (schedule-alarm ms beh) - (if (eq? (self) man) - (alarms-enqueue ms beh) - (! man (make-alarm ms beh)))) - - - - - - ;;;;;;;;;;;;;;;;;;;;; - ;; Manager Helpers ;; - ;;;;;;;;;;;;;;;;;;;;; - - (define man? - (opt-lambda ([v (self)]) - (eq? v man))) - - - - - (define-syntax do-in-manager - (syntax-rules () - [(_ expr ...) - (if (man?) - (begin expr ...) - (begin - (! man (list 'run-thunk (self) - (let ([params (current-parameterization)]) - (lambda () - (call-with-parameterization - params - (lambda () expr ...)))))) - (receive [(list-rest 'vals vs) (apply values vs)] - [(list 'exn e) (raise e)])))])) - - (define-syntax do-in-manager-after - (syntax-rules () - [(_ expr ...) - (if (man?) - (begin expr ...) - (begin - (! man (list 'run-thunk/stabilized (self) - (let ([params (current-parameterization)]) - (lambda () - (call-with-parameterization - params - (lambda () expr ...)))))) - (receive [(list-rest 'vals vs) (apply values vs)] - [(list 'exn e) (raise e)])))])) - - (define (register inf sup) - (do-in-manager - (match sup - [(and (? signal?) - (= signal-dependents dependents)) - (set-signal-dependents! - sup - (cons (make-weak-box inf) dependents)) - (fix-depths inf sup)] - [(? list?) (for-each (lambda (sup1) (register inf sup1)) sup)] - [_ (void)]) - inf)) - - (define (unregister inf sup) - (do-in-manager - (match sup - [(and (? signal?) - (= signal-dependents dependents)) - (set-signal-dependents! - sup - (filter (lambda (a) - (let ([v (weak-box-value a)]) - (nor (eq? v inf) - (eq? v #f)))) - dependents))] - [_ (void)]))) - - (define (cust-killall! cust) - (let loop ([sigs (ft-cust-constructed-sigs cust)]) - (when (cons? sigs) - (cond - [(weak-box-value (first sigs)) => kill-signal] - [else (void)]) - (loop (rest sigs)))) - (for-each cust-killall! (ft-cust-children cust))) - - (define (super-lift fun bhvr) - (if (behavior? bhvr) - (parameterize ([extra-cont-marks - (effective-continuation-marks)]) - (do-in-manager - (let* ([cust (make-ft-cust (void) empty empty)] - [_ (cond - [(current-cust) - => (lambda (c) (set-ft-cust-children! c (cons cust (ft-cust-children c))))] - [else (void)])] - [pfun (lambda (b) - (parameterize ([current-cust cust]) - (fun b)))] - [current (box undefined)]) - (letrec ([custodian-signal - (proc->signal:unchanged - (lambda () - (cust-killall! cust) - (set-ft-cust-constructed-sigs! cust empty) - (set-ft-cust-children! cust empty) - (unregister rtn (unbox current)) - (set-box! current (pfun (value-now/no-copy bhvr))) - (register rtn (unbox current)) - ;; keep rtn's producers up-to-date - (set-signal-producers! rtn (cons (unbox current) - (cdr (signal-producers rtn)))) - (iq-resort) - 'custodian) - bhvr)] - [rtn (proc->signal:switching - (lambda () custodian-signal (value-now/no-copy (unbox current))) - current custodian-signal undefined bhvr custodian-signal)]) - (set-ft-cust-signal! cust custodian-signal) - rtn)))) - (fun bhvr))) - - (define (propagate b) - (let ([empty-boxes 0] - [dependents (signal-dependents b)] - [depth (signal-depth b)]) - (for-each - (lambda (wb) - (match (weak-box-value wb) - [(and dep (? signal?) (= signal-stale? #f)) - (set-signal-stale?! dep #t) - ; If I'm crossing a "back" edge (one potentially causing a cycle), - ; then I send a message. Otherwise, I add to the internal - ; priority queue. - (if (< depth (signal-depth dep)) - (iq-enqueue wb) - (! man dep))] - [_ - (set! empty-boxes (add1 empty-boxes))])) - dependents) - (when (> empty-boxes 9) - (set-signal-dependents! - b - (filter weak-box-value dependents))))) - - (define (update0 b) - (match b - [(and (? signal?) - (= signal-value value) - (= signal-thunk thunk) - (= signal-parameterization params)) - (set-signal-stale?! b #f) - (let ([new-value (call-with-parameterization - params - thunk)]) - (when (or (signal:unchanged? b) - (and (not (eq? value new-value)) - (or (not (event-set? new-value)) (cons? (event-set-events new-value)) - (not (event-set? value))))) - (begin - (set-signal-value! b new-value) - (propagate b))))] - [_ (void)])) - - (define (update1 b a) - (match b - [(and (? signal?) - (= signal-value value) - (= signal-thunk thunk)) - (set-signal-stale?! b #f) - (let ([new-value (thunk a)]) - (when (not (equal? value new-value)) + (lambda (wb) + (match (weak-box-value wb) + [(and dep (? signal?) (app signal-stale? #f)) + (set-signal-stale?! dep #t) + ; If I'm crossing a "back" edge (one potentially causing a cycle), + ; then I send a message. Otherwise, I add to the internal + ; priority queue. + (if (< depth (signal-depth dep)) + (iq-enqueue wb) + (! man dep))] + [_ + (set! empty-boxes (add1 empty-boxes))])) + dependents) + (when (> empty-boxes 9) + (set-signal-dependents! + b + (filter weak-box-value dependents))))) + +(define (update0 b) + (match b + [(and (? signal?) + (app signal-value value) + (app signal-thunk thunk) + (app signal-parameterization params)) + (set-signal-stale?! b #f) + (let ([new-value (call-with-parameterization + params + thunk)]) + (when (or (signal:unchanged? b) + (and (not (eq? value new-value)) + (or (not (event-set? new-value)) (cons? (event-set-events new-value)) + (not (event-set? value))))) + (begin (set-signal-value! b new-value) - (propagate b)))] - [_ (void)])) - - (define (signal-count) - (! man `(stat ,(self))) - (receive [n n])) + (propagate b))))] + [_ (void)])) - (define (hash-table-size ht) - (let ([x 0]) - (hash-table-for-each ht (lambda (k v) - (if k (set! x (add1 x))))) - x)) +(define (update1 b a) + (match b + [(and (? signal?) + (app signal-value value) + (app signal-thunk thunk)) + (set-signal-stale?! b #f) + (let ([new-value (thunk a)]) + (when (not (equal? value new-value)) + (set-signal-value! b new-value) + (propagate b)))] + [_ (void)])) - (define exn-handler (lambda (exn) (raise exn))) - - ;;;;;;;;;;;;; - ;; Manager ;; - ;;;;;;;;;;;;; - - ;; the manager of all signals - (define man - (spawn/name - 'frtime-heart - (let* ([named-providers (make-hash-table)] - [cur-beh #f] - [signal-cache (make-hash-table 'weak)] - [last-known-signal-count 50] - [notifications empty] - - ;; added for run-thunk/stablized - [thunks-to-run empty] - [do-and-queue (lambda (pid thnk) - (with-handlers - ([exn:fail? (lambda (exn) - (set! notifications - (cons (list pid 'exn exn) - notifications)))]) - (set! notifications - (cons (list* pid 'vals (call-with-values thnk list)) - notifications))))]) - (let outer () - (with-handlers ([exn:fail? - (lambda (exn) - (when cur-beh - (set! exn (make-exn:fail - (exn-message exn) - (compose-continuation-mark-sets2 - (signal-continuation-marks - cur-beh) - (exn-continuation-marks exn))));) - (iq-enqueue (list exceptions (list exn cur-beh))) - (when (behavior? cur-beh) - (undef cur-beh))) - (outer))]) - (set! exn-handler (uncaught-exception-handler)) - (let inner () - - ;; process external messages until there is an internal update - ;; or an expired alarm - (let loop () - (receive [after (cond - [(not (iq-empty?)) 0] - [(not (alarms-empty?)) (- (alarms-peak-ms) - (current-inexact-milliseconds))] - [else #f]) - (void)] - [(? signal? b) - (iq-enqueue b) - (loop)] - [(struct external-event (recip-val-pairs)) - (for-each iq-enqueue recip-val-pairs) - (loop)] - [(struct alarm (ms beh)) - (schedule-alarm ms beh) - (loop)] - [(list 'run-thunk rtn-pid thunk) - (begin - (do-and-queue rtn-pid thunk) - (loop))] - - - ;; !Experimental! - ;; queues thunks to be evaluated after this round of computation, - ;; but before the next round - - [(list 'run-thunk/stabilized rtn-pid thunk) - (begin - (set! thunks-to-run (cons (list rtn-pid thunk) thunks-to-run)) - (loop))] - - - [(list 'stat rtn-pid) - (! rtn-pid (hash-table-size signal-cache))] - - [(list 'remote-reg tid sym) - (let ([f+l (hash-table-get named-providers sym)]) - (when (not (member tid (mcdr f+l))) - (set-mcdr! f+l (cons tid (mcdr f+l))))) - (loop)] - [(list 'remote-evt sym val) - (iq-enqueue - (list (hash-table-get named-dependents sym (lambda () dummy)) val)) - (loop)] - [msg - (fprintf (current-error-port) - "frtime engine: msg not understood: ~a~n" - msg) - (loop)])) - - ;; enqueue expired timers for execution - (let loop () - (unless (or (alarms-empty?) - (< (current-inexact-milliseconds) - (alarms-peak-ms))) - (let ([beh (alarms-dequeue-beh)]) - (when (and beh (not (signal-stale? beh))) - (set-signal-stale?! beh #t) - (iq-enqueue beh))) - (loop))) - - ;; process internal updates - (let loop () - (unless (iq-empty?) - (match (iq-dequeue) - [(b val) - (set! cur-beh b) - (update1 b val) - (set! cur-beh #f)] - [b - (set! cur-beh b) - (update0 b) - (hash-table-get signal-cache b (lambda () (hash-table-put! signal-cache b #t))) - (set! cur-beh #f)]) - (loop))) - - - ;; do the run-thunk/stabalized; use existing notification mechanism - (for-each (lambda (pair) - (do-and-queue (first pair) (second pair))) - thunks-to-run) - - - (for-each (lambda (lst) - (! (first lst) (rest lst))) - notifications) - - (set! notifications empty) - (set! thunks-to-run empty) +(define (signal-count) + (! man `(stat ,(self))) + (receive [n n])) - (set-box! logical-time (add1 (unbox logical-time))) - - (inner))))))) - - (define exceptions - (event-receiver)) - - (define dummy (proc->signal void)) - - (provide (all-defined))) +(define (hash-table-size ht) + (let ([x 0]) + (hash-for-each ht (lambda (k v) + (when k (set! x (add1 x))))) + x)) + +(define exn-handler (lambda (exn) (raise exn))) + +;;;;;;;;;;;;; +;; Manager ;; +;;;;;;;;;;;;; + +;; the manager of all signals +(define man + (spawn/name + 'frtime-heart + (let* ([named-providers (make-hash)] + [cur-beh #f] + [signal-cache (make-weak-hash)] + [last-known-signal-count 50] + [notifications empty] + + ;; added for run-thunk/stablized + [thunks-to-run empty] + [do-and-queue (lambda (pid thnk) + (with-handlers + ([exn:fail? (lambda (exn) + (set! notifications + (cons (list pid 'exn exn) + notifications)))]) + (set! notifications + (cons (list* pid 'vals (call-with-values thnk list)) + notifications))))]) + (let outer () + (with-handlers ([exn:fail? + (lambda (exn) + (when cur-beh + (set! exn (make-exn:fail + (exn-message exn) + (compose-continuation-mark-sets2 + (signal-continuation-marks + cur-beh) + (exn-continuation-marks exn))));) + (iq-enqueue (list exceptions (list exn cur-beh))) + (when (behavior? cur-beh) + (undef cur-beh))) + (outer))]) + (set! exn-handler (uncaught-exception-handler)) + (let inner () + + ;; process external messages until there is an internal update + ;; or an expired alarm + (let loop () + (receive [after (cond + [(not (iq-empty?)) 0] + [(not (alarms-empty?)) (- (alarms-peak-ms) + (current-inexact-milliseconds))] + [else #f]) + (void)] + [(? signal? b) + (iq-enqueue b) + (loop)] + [(struct external-event (recip-val-pairs)) + (for-each iq-enqueue recip-val-pairs) + (loop)] + [(struct alarm (ms beh)) + (schedule-alarm ms beh) + (loop)] + [(list 'run-thunk rtn-pid thunk) + (begin + (do-and-queue rtn-pid thunk) + (loop))] + + ;; !Experimental! + ;; queues thunks to be evaluated after this round of computation, + ;; but before the next round + + [(list 'run-thunk/stabilized rtn-pid thunk) + (begin + (set! thunks-to-run (cons (list rtn-pid thunk) thunks-to-run)) + (loop))] + + [(list 'stat rtn-pid) + (! rtn-pid (hash-table-size signal-cache))] + + [(list 'remote-reg tid sym) + (let ([f+l (hash-ref named-providers sym)]) + (when (not (member tid (mcdr f+l))) + (set-mcdr! f+l (cons tid (mcdr f+l))))) + (loop)] + [(list 'remote-evt sym val) + (iq-enqueue + (list (hash-ref named-dependents sym (lambda () dummy)) val)) + (loop)] + [msg + (fprintf (current-error-port) + "frtime engine: msg not understood: ~a~n" + msg) + (loop)])) + + ;; enqueue expired timers for execution + (let loop () + (unless (or (alarms-empty?) + (< (current-inexact-milliseconds) + (alarms-peak-ms))) + (let ([beh (alarms-dequeue-beh)]) + (when (and beh (not (signal-stale? beh))) + (set-signal-stale?! beh #t) + (iq-enqueue beh))) + (loop))) + + ;; process internal updates + (let loop () + (unless (iq-empty?) + (match (iq-dequeue) + [(list b val) + (set! cur-beh b) + (update1 b val) + (set! cur-beh #f)] + [b + (set! cur-beh b) + (update0 b) + (hash-ref signal-cache b (lambda () (hash-set! signal-cache b #t))) + (set! cur-beh #f)]) + (loop))) + + ;; do the run-thunk/stabalized; use existing notification mechanism + (for-each (lambda (pair) + (do-and-queue (first pair) (second pair))) + thunks-to-run) + + + (for-each (lambda (lst) + (! (first lst) (rest lst))) + notifications) + + (set! notifications empty) + (set! thunks-to-run empty) + + (set-box! logical-time (add1 (unbox logical-time))) + + (inner))))))) + +(define exceptions (event-receiver)) + +(define dummy (proc->signal void)) + +(provide do-in-manager + do-in-manager-after) + +(define thunk/c + (unconstrained-domain-> any/c)) ; XXX Not really thunk +(define producers/c + (listof any/c)) ; XXX bad +(define switching-current/c + (box/c any/c)) ; XXX +(define switching-trigger/c + any/c) + +(provide/contract + ; Event Sets + [make-events-now ((listof any/c) . -> . event-set?)] ; XXX Ugly contract + [event-set? (any/c . -> . boolean?)] + [event-set-time (event-set? . -> . number?)] + [event-set-events (event-set? . -> . (listof any/c))] ; XXX Ugly contract + ; Undefined + [undefined undefined?] + [undefined? (any/c . -> . boolean?)] + ; Signals + [proc->signal ((thunk/c) () #:rest producers/c . ->* . signal?)] + [signal? (any/c . -> . boolean?)] + [signal-value (signal? . -> . any/c)] + [set-signal-value! (signal? any/c . -> . void)] + [signal-depth (signal? . -> . exact-nonnegative-integer?)] + [set-signal-depth! (signal? exact-nonnegative-integer? . -> . void)] + [set-signal-producers! (signal? producers/c . -> . void)] + [signal-thunk (signal? . -> . thunk/c)] + [set-signal-thunk! (signal? thunk/c . -> . void)] + [signal-count (-> exact-nonnegative-integer?)] + ; Signal : Compound + [signal:compound? (any/c . -> . boolean?)] + [signal:compound-content (signal:compound? . -> . cons?)] ; XXX Ugly contract on codomain + ; Signal : Switching + [proc->signal:switching ((thunk/c switching-current/c switching-trigger/c) () #:rest producers/c . ->* . signal:switching?)] + [signal:switching? (any/c . -> . boolean?)] + [signal:switching-current (signal:switching? . -> . switching-current/c)] + [signal:switching-trigger (signal:switching? . -> . switching-trigger/c)] + ; Input queue + [iq-enqueue (any/c . -> . void)] ; XXX Not sure what any/c should be + [iq-resort (-> void)] + ; Events + [send-event (signal? any/c . -> . void)] + [send-synchronous-event (signal? any/c . -> . void)] + [send-synchronous-events ((listof (cons/c signal? (listof any/c))) . -> . void)] + [event-receiver (-> signal?)] + [event-producer2 ((thunk/c) () #:rest producers/c . ->* . signal?)] + ; Other + [register (signal? any/c . -> . void)] ; XXX Ugly contract + [unregister (signal? any/c . -> . void)] ; XXX Ugly contract + [current-logical-time (-> exact-nonnegative-integer?)] + [snap? (parameter/c boolean?)] + [super-lift ((any/c . -> . any/c) any/c . -> . any/c)] ; XXX Ugly contract + [behavior? (any/c . -> . boolean?)] + [value-now (any/c . -> . any/c)] ; XXX Should this return (not/c signal?) and why not take signal? + [value-now/sync (() () #:rest (listof any/c) . ->* . any)] ; XXX See above + not matching number of values returned with number of signals + [value-now/no-copy (any/c . -> . any/c)] ; XXX Should this return (not/c signal?) and why not take signal? + [safe-signal-depth (any/c . -> . exact-nonnegative-integer?)] ; XXX Ugly contract + [schedule-alarm (number? signal? . -> . void)] + [set-cell! (signal? any/c . -> . void)] ; XXX What is any/c? + [exceptions signal?]) \ No newline at end of file diff --git a/collects/frtime/mailbox.ss b/collects/frtime/mailbox.ss index 2f129cbe7d..3405fd5228 100644 --- a/collects/frtime/mailbox.ss +++ b/collects/frtime/mailbox.ss @@ -56,7 +56,7 @@ (not-on-receive empty)))) (make-mailbox manager control-ch msgs-ch)) -(define-struct receive (reply-ch timeout timeout-thunk matcher)) +(define-struct receive (reply-ch timeout timeout-thunk matcher)) (define (mailbox-send! mb msg) (match mb [(struct mailbox (thd _ msgs))