parent
8908496ba1
commit
f508bb25ab
|
@ -689,6 +689,29 @@
|
|||
(lambda (_)
|
||||
(+ a b))))))
|
||||
|
||||
(let ()
|
||||
(define (chain-evts e1 e2)
|
||||
(sync (make-semaphore) (replace-evt e1
|
||||
(lambda (v)
|
||||
(choice-evt
|
||||
e2
|
||||
(make-semaphore))))))
|
||||
(test always-evt chain-evts always-evt always-evt)
|
||||
(test always-evt chain-evts (make-semaphore 1) always-evt)
|
||||
(let ([s (make-semaphore 1)])
|
||||
(test always-evt chain-evts s always-evt))
|
||||
(let ([s (make-semaphore 1)])
|
||||
(test s chain-evts (make-semaphore 1) s))
|
||||
(let ([s (make-semaphore 2)])
|
||||
(test s chain-evts s s)
|
||||
(test #f sync/timeout 0 s))
|
||||
(let ([s (make-semaphore)])
|
||||
(thread (lambda () (semaphore-post s)))
|
||||
(test always-evt chain-evts s always-evt))
|
||||
(let ([s (make-semaphore)])
|
||||
(thread (lambda () (semaphore-post s) (sleep) (semaphore-post s)))
|
||||
(test s chain-evts s s)))
|
||||
|
||||
;; ----------------------------------------
|
||||
;; Structures as waitables
|
||||
|
||||
|
|
|
@ -137,6 +137,12 @@
|
|||
;; semaphore was meanwhile posted). As another example, a
|
||||
;; `nack-guard-evt`'s result uses `abandon-proc` to post to the NACK
|
||||
;; event.
|
||||
;;
|
||||
;; A sync slot can have at most one non-`void` `interupt-proc` and
|
||||
;; `retry-proc` (but any number of `abandon-proc`s). The
|
||||
;; `interrupt-proc` and `abandon-proc` fields in `control-state-evt`
|
||||
;; can be 'reset to discard existing procedures for the slot.
|
||||
;;
|
||||
;; Beware that it doesn't make sense to use `wrap-evt` around the
|
||||
;; `control-state-evt` or the `evt` inside for an asynchronously
|
||||
;; satisfied event (like the way that semaphores are implemented). The
|
||||
|
@ -144,6 +150,7 @@
|
|||
;; event is found, so that the result turns out to be an unwrapped
|
||||
;; event. Or the `interrupt-proc`, etc., callbacks may not be found
|
||||
;; early enough if the `control-state-evt` is wrapped.
|
||||
;;
|
||||
(struct control-state-evt (evt
|
||||
wrap-proc
|
||||
interrupt-proc ; thunk for break/kill initiated or otherwise before `abandon-proc`
|
||||
|
|
|
@ -35,16 +35,16 @@
|
|||
wraps ; list of wraps to apply if selected
|
||||
commits ; list of thunks to run atomically when selected
|
||||
interrupted? ; kill/break in progress?
|
||||
interrupts ; list of thunks to run on kill/break initiation
|
||||
interrupt ; #f or a thunk to run on kill/break initiation
|
||||
abandons ; list of thunks to run on kill/break completion
|
||||
retries ; list of thunks to run on retry: returns `(values _val _ready?)`
|
||||
retry ; #f or a thunk to run on retry: returns `(values _val _ready?)`
|
||||
prev ; previous in linked list
|
||||
next) ; next in linked list
|
||||
#:transparent
|
||||
#:mutable)
|
||||
|
||||
(define (make-syncer evt wraps prev)
|
||||
(syncer evt wraps null #f null null null prev #f))
|
||||
(syncer evt wraps null #f #f null #f prev #f))
|
||||
|
||||
(define none-syncer (make-syncer #f null #f))
|
||||
|
||||
|
@ -437,10 +437,10 @@
|
|||
(void))
|
||||
(loop sr (add1 retries) polled-all-so-far? #f))])]
|
||||
[(choice-evt? new-evt)
|
||||
(when (or (pair? (syncer-interrupts sr))
|
||||
(pair? (syncer-retries sr)))
|
||||
(when (or (syncer-interrupt sr)
|
||||
(syncer-retry sr))
|
||||
(end-atomic)
|
||||
(internal-error "choice event discovered after interrupt/retry callbacks"))
|
||||
(internal-error "choice event discovered after interrupt/retry callback"))
|
||||
(define new-syncers (random-rotate
|
||||
(evts->syncers #f
|
||||
(choice-evt-evts new-evt)
|
||||
|
@ -487,11 +487,19 @@
|
|||
(unless (eq? wrap-proc values)
|
||||
(set-syncer-wraps! sr (cons wrap-proc (syncer-wraps sr))))
|
||||
(unless (eq? interrupt-proc void)
|
||||
(set-syncer-interrupts! sr (cons interrupt-proc (syncer-interrupts sr))))
|
||||
(cond
|
||||
[(eq? interrupt-proc 'reset) (set-syncer-interrupt! sr #f)]
|
||||
[else
|
||||
(when (syncer-interrupt sr) (internal-error "syncer already has an interrupt callback"))
|
||||
(set-syncer-interrupt! sr interrupt-proc)]))
|
||||
(unless (eq? abandon-proc void)
|
||||
(set-syncer-abandons! sr (cons abandon-proc (syncer-abandons sr))))
|
||||
(unless (eq? retry-proc void)
|
||||
(set-syncer-retries! sr (cons retry-proc (syncer-retries sr))))
|
||||
(cond
|
||||
[(eq? retry-proc 'reset) (set-syncer-retry! sr #f)]
|
||||
[else
|
||||
(when (syncer-retry sr) (internal-error "syncer already has an retry callback"))
|
||||
(set-syncer-retry! sr retry-proc)]))
|
||||
(set-syncer-evt! sr (control-state-evt-evt new-evt))
|
||||
(end-atomic)
|
||||
(cond
|
||||
|
@ -517,7 +525,7 @@
|
|||
(loop sr (add1 retries) polled-all-so-far? #f)])]
|
||||
[(and (never-evt? new-evt)
|
||||
(not (evt-impersonator? new-evt))
|
||||
(null? (syncer-interrupts sr))
|
||||
(not (syncer-interrupt sr))
|
||||
(null? (syncer-commits sr))
|
||||
(null? (syncer-abandons sr)))
|
||||
;; Drop this event, since it will never get selected
|
||||
|
@ -573,8 +581,9 @@
|
|||
(when sr
|
||||
(unless (eq? sr selected-sr)
|
||||
(unless (syncer-interrupted? sr)
|
||||
(for ([interrupt (in-list (syncer-interrupts sr))])
|
||||
(interrupt)))
|
||||
(let ([interrupt (syncer-interrupt sr)])
|
||||
(when interrupt
|
||||
(interrupt))))
|
||||
(for ([abandon (in-list (syncer-abandons sr))])
|
||||
(abandon)))
|
||||
(loop (syncer-next sr))))
|
||||
|
@ -597,8 +606,9 @@
|
|||
(when sr
|
||||
(unless (syncer-interrupted? sr)
|
||||
(set-syncer-interrupted?! sr #t)
|
||||
(for ([interrupt (in-list (syncer-interrupts sr))])
|
||||
(interrupt)))
|
||||
(let ([interrupt (syncer-interrupt sr)])
|
||||
(when interrupt
|
||||
(interrupt))))
|
||||
(loop (syncer-next sr)))))
|
||||
|
||||
;; Called in atomic mode
|
||||
|
@ -614,13 +624,12 @@
|
|||
(not (syncing-selected s)))
|
||||
(when (syncer-interrupted? sr)
|
||||
(set-syncer-interrupted?! sr #f)
|
||||
;; Although we keep a list of retries, we expect only
|
||||
;; one to be relevant
|
||||
(for ([retry (in-list (syncer-retries sr))])
|
||||
(let ([retry (syncer-retry sr)])
|
||||
(when retry
|
||||
(define-values (result ready?) (retry))
|
||||
(when ready?
|
||||
(set-syncer-wraps! sr (cons (lambda args result) (syncer-wraps sr)))
|
||||
(syncing-done! s sr))))
|
||||
(syncing-done! s sr)))))
|
||||
(loop (syncer-next sr)))))
|
||||
|
||||
;; Queue a retry when a check for breaks should happen before a retry
|
||||
|
@ -737,6 +746,9 @@
|
|||
(control-state-evt
|
||||
(nested-sync-evt s next orig-evt)
|
||||
values
|
||||
;; The interrupt and retry callbacks get discarded
|
||||
;; when a new event is returned (but the abandon
|
||||
;; callback is preserved)
|
||||
(lambda () (syncing-interrupt! s))
|
||||
(lambda () (syncing-abandon! s))
|
||||
(lambda () (syncing-retry! s)))))))
|
||||
|
@ -755,12 +767,17 @@
|
|||
(define orig-evt (nested-sync-evt-orig-evt ns))
|
||||
(values #f
|
||||
;; and this is the "replace" step:
|
||||
(control-state-evt
|
||||
(poll-guard-evt
|
||||
(lambda (poll?)
|
||||
(define r (call-with-values thunk next))
|
||||
(cond
|
||||
[(evt? r) r]
|
||||
[else (wrap-evt always-evt (lambda (v) orig-evt))])))))
|
||||
[else (wrap-evt always-evt (lambda (v) orig-evt))])))
|
||||
values
|
||||
'reset
|
||||
void
|
||||
'reset)))
|
||||
#:just-poll? (poll-ctx-poll? poll-ctx)
|
||||
#:done-after-poll? #f
|
||||
#:schedule-info (poll-ctx-sched-info poll-ctx)))
|
||||
|
|
Loading…
Reference in New Issue
Block a user