cs & thread: fix atomic mode in callbacks within replace-evt

A nested evt poll did not move back to non-atomic mode when calling
event generators in the first argument of `replace-evt`.
This commit is contained in:
Matthew Flatt 2020-10-20 21:14:15 -06:00
parent 3be1d49652
commit 0ed1fc3850
3 changed files with 382 additions and 310 deletions

View File

@ -712,6 +712,19 @@
(thread (lambda () (semaphore-post s) (sleep) (semaphore-post s)))
(test s chain-evts s s)))
;; indirectly check that a `guard-evt` callabck in a `replace-evt`
;; is not called in atomic mode; if it is, then the thread won't
;; escape and terminate right
(test #t thread? (sync
(thread
(lambda ()
(let/cc esc
(sync (replace-evt
(guard-evt
(lambda ()
(esc 'done)))
void)))))))
;; ----------------------------------------
;; Structures as waitables

View File

@ -10426,278 +10426,121 @@
(set-schedule-info-did-work?! sched-info_0 #t)
(end-atomic)
(loop_0 (syncer-next sr_0) 0 #f #f))
(let ((ctx_0
(poll-ctx3.1
just-poll?20_0
(lambda () (syncing-done! s32_0 sr_0))
sched-info_0
#f)))
(call-with-values
(lambda () (evt-poll (syncer-evt sr_0) ctx_0))
(case-lambda
((results_0 new-evt_0)
(if results_0
(begin
(syncing-done! s32_0 sr_0)
(make-result sr_0 results_0 success-k19_0))
(if (delayed-poll? new-evt_0)
(if (let ((app_0 nested-sync-evt?))
(|#%app| app_0 (syncer-evt sr_0)))
(begin
(end-atomic)
(call-with-values
(lambda ()
(let ((app_0 poll-nested-sync))
(|#%app|
app_0
(syncer-evt sr_0)
just-poll?20_0
fast-only?21_0
sched-info_0)))
(case-lambda
((same?_0 new-evt_0)
(if same?_0
(loop_0
(syncer-next sr_0)
0
polled-all-so-far?_0
no-wrappers?_0)
(begin
(start-atomic)
(if (syncing-selected s32_0)
(void)
(set-syncer-evt! sr_0 new-evt_0))
(end-atomic)
(if fast-only?21_0
(|#%app| fail-k18_0 sched-info_0 #f #f)
(let ((new-evt_1
(|#%app|
(delayed-poll-resume
new-evt_0))))
(begin
(start-atomic)
(if (syncing-selected s32_0)
(void)
(set-syncer-evt! sr_0 new-evt_1))
(end-atomic)
(loop_0
sr_0
(add1 retries_0)
polled-all-so-far?_0
#f)))))
(if (choice-evt? new-evt_0)
(loop_0
sr_0
(add1 retries_0)
polled-all-so-far?_0
no-wrappers?_0)))))
(args
(raise-binding-result-arity-error 2 args)))))
(let ((ctx_0
(poll-ctx3.1
just-poll?20_0
(lambda () (syncing-done! s32_0 sr_0))
sched-info_0
#f)))
(call-with-values
(lambda () (evt-poll (syncer-evt sr_0) ctx_0))
(case-lambda
((results_0 new-evt_0)
(if results_0
(begin
(syncing-done! s32_0 sr_0)
(make-result
sr_0
results_0
success-k19_0))
(if (delayed-poll? new-evt_0)
(begin
(if (let ((or-part_0
(syncer-interrupt sr_0)))
(if or-part_0
or-part_0
(syncer-retry sr_0)))
(begin
(end-atomic)
(internal-error
"choice event discovered after interrupt/retry callback"))
(void))
(let ((new-syncers_0
(let ((app_0 random-rotate))
(end-atomic)
(if fast-only?21_0
(|#%app|
fail-k18_0
sched-info_0
#f
#f)
(let ((new-evt_1
(|#%app|
app_0
(let ((app_1 evts->syncers))
(let ((app_2
(choice-evt-evts
new-evt_0)))
(let ((app_3
(syncer-wraps
sr_0)))
(let ((app_4
(syncer-commits
sr_0)))
(|#%app|
app_1
#f
app_2
app_3
app_4
(syncer-abandons
sr_0))))))))))
(if (not new-syncers_0)
(delayed-poll-resume
new-evt_0))))
(begin
(syncer-remove! sr_0 s32_0)
(start-atomic)
(if (syncing-selected s32_0)
(void)
(set-syncer-evt!
sr_0
new-evt_1))
(end-atomic)
(loop_0
(syncer-next sr_0)
0
polled-all-so-far?_0
no-wrappers?_0))
(begin
(syncer-replace!
sr_0
new-syncers_0
s32_0)
(end-atomic)
(loop_0
new-syncers_0
(add1 retries_0)
polled-all-so-far?_0
no-wrappers?_0)))))
(if (wrap-evt? new-evt_0)
#f)))))
(if (choice-evt? new-evt_0)
(begin
(set-syncer-wraps!
sr_0
(let ((app_0
(wrap-evt-wrap new-evt_0)))
(cons
app_0
(let ((l_0 (syncer-wraps sr_0)))
(if (if (null? l_0)
(not
(handle-evt?$1
new-evt_0))
#f)
(list values)
l_0)))))
(let ((inner-new-evt_0
(wrap-evt-evt new-evt_0)))
(begin
(set-syncer-evt!
sr_0
inner-new-evt_0)
(if (eq?
inner-new-evt_0
the-always-evt)
(begin
(syncing-done! s32_0 sr_0)
(make-result
sr_0
(list the-always-evt)
success-k19_0))
(begin
(end-atomic)
(loop_0
sr_0
(add1 retries_0)
polled-all-so-far?_0
#f))))))
(if (control-state-evt? new-evt_0)
(let ((wrap-proc_0
(control-state-evt-wrap-proc
new-evt_0)))
(let ((interrupt-proc_0
(control-state-evt-interrupt-proc
new-evt_0)))
(let ((abandon-proc_0
(control-state-evt-abandon-proc
new-evt_0)))
(let ((retry-proc_0
(control-state-evt-retry-proc
new-evt_0)))
(begin
(if (eq?
wrap-proc_0
values)
(void)
(set-syncer-wraps!
sr_0
(cons
wrap-proc_0
(syncer-wraps sr_0))))
(if (eq?
interrupt-proc_0
void)
(void)
(if (eq?
interrupt-proc_0
'reset)
(set-syncer-interrupt!
sr_0
#f)
(begin
(if (syncer-interrupt
sr_0)
(internal-error
"syncer already has an interrupt callback")
(void))
(set-syncer-interrupt!
sr_0
interrupt-proc_0))))
(if (eq?
abandon-proc_0
void)
(void)
(set-syncer-abandons!
sr_0
(cons
abandon-proc_0
(syncer-abandons
sr_0))))
(if (eq? retry-proc_0 void)
(void)
(if (eq?
retry-proc_0
'reset)
(set-syncer-retry!
sr_0
#f)
(begin
(if (syncer-retry
sr_0)
(internal-error
"syncer already has an retry callback")
(void))
(set-syncer-retry!
sr_0
retry-proc_0))))
(set-syncer-evt!
sr_0
(control-state-evt-evt
new-evt_0))
(end-atomic)
(if (if fast-only?21_0
(not
(if (eq?
interrupt-proc_0
void)
(if (eq?
abandon-proc_0
void)
(eq?
retry-proc_0
void)
#f)
#f))
#f)
(|#%app|
fail-k18_0
sched-info_0
#f
#f)
(loop_0
sr_0
(add1 retries_0)
polled-all-so-far?_0
no-wrappers?_0)))))))
(if (poll-guard-evt? new-evt_0)
(if (let ((or-part_0
(syncer-interrupt sr_0)))
(if or-part_0
or-part_0
(syncer-retry sr_0)))
(begin
(end-atomic)
(if fast-only?21_0
(|#%app|
fail-k18_0
sched-info_0
#f
#f)
(let ((generated_0
(call-with-continuation-barrier
(lambda ()
(|#%app|
(poll-guard-evt-proc
new-evt_0)
just-poll?20_0)))))
(begin
(set-syncer-evt!
sr_0
(if (1/evt? generated_0)
generated_0
(wrap-evt7.1
the-always-evt
(lambda (a_0)
generated_0))))
(loop_0
sr_0
(add1 retries_0)
polled-all-so-far?_0
#f)))))
(if (if (never-evt? new-evt_0)
(if (not
(|#%app|
evt-impersonator?
new-evt_0))
(if (not
(syncer-interrupt
sr_0))
(if (null?
(syncer-commits
sr_0))
(null?
(syncer-abandons
sr_0))
#f)
#f)
#f)
#f)
(internal-error
"choice event discovered after interrupt/retry callback"))
(void))
(let ((new-syncers_0
(let ((app_0 random-rotate))
(|#%app|
app_0
(let ((app_1
evts->syncers))
(let ((app_2
(choice-evt-evts
new-evt_0)))
(let ((app_3
(syncer-wraps
sr_0)))
(let ((app_4
(syncer-commits
sr_0)))
(|#%app|
app_1
#f
app_2
app_3
app_4
(syncer-abandons
sr_0))))))))))
(if (not new-syncers_0)
(begin
(syncer-remove! sr_0 s32_0)
(end-atomic)
@ -10706,34 +10549,241 @@
0
polled-all-so-far?_0
no-wrappers?_0))
(if (if (eq?
new-evt_0
(syncer-evt sr_0))
(not
(poll-ctx-incomplete?
ctx_0))
(begin
(syncer-replace!
sr_0
new-syncers_0
s32_0)
(end-atomic)
(loop_0
new-syncers_0
(add1 retries_0)
polled-all-so-far?_0
no-wrappers?_0)))))
(if (wrap-evt? new-evt_0)
(begin
(set-syncer-wraps!
sr_0
(let ((app_0
(wrap-evt-wrap new-evt_0)))
(cons
app_0
(let ((l_0
(syncer-wraps sr_0)))
(if (if (null? l_0)
(not
(handle-evt?$1
new-evt_0))
#f)
(list values)
l_0)))))
(let ((inner-new-evt_0
(wrap-evt-evt new-evt_0)))
(begin
(set-syncer-evt!
sr_0
inner-new-evt_0)
(if (eq?
inner-new-evt_0
the-always-evt)
(begin
(syncing-done! s32_0 sr_0)
(make-result
sr_0
(list the-always-evt)
success-k19_0))
(begin
(end-atomic)
(loop_0
sr_0
(add1 retries_0)
polled-all-so-far?_0
#f))))))
(if (control-state-evt? new-evt_0)
(let ((wrap-proc_0
(control-state-evt-wrap-proc
new-evt_0)))
(let ((interrupt-proc_0
(control-state-evt-interrupt-proc
new-evt_0)))
(let ((abandon-proc_0
(control-state-evt-abandon-proc
new-evt_0)))
(let ((retry-proc_0
(control-state-evt-retry-proc
new-evt_0)))
(begin
(if (eq?
wrap-proc_0
values)
(void)
(set-syncer-wraps!
sr_0
(cons
wrap-proc_0
(syncer-wraps
sr_0))))
(if (eq?
interrupt-proc_0
void)
(void)
(if (eq?
interrupt-proc_0
'reset)
(set-syncer-interrupt!
sr_0
#f)
(begin
(if (syncer-interrupt
sr_0)
(internal-error
"syncer already has an interrupt callback")
(void))
(set-syncer-interrupt!
sr_0
interrupt-proc_0))))
(if (eq?
abandon-proc_0
void)
(void)
(set-syncer-abandons!
sr_0
(cons
abandon-proc_0
(syncer-abandons
sr_0))))
(if (eq?
retry-proc_0
void)
(void)
(if (eq?
retry-proc_0
'reset)
(set-syncer-retry!
sr_0
#f)
(begin
(if (syncer-retry
sr_0)
(internal-error
"syncer already has an retry callback")
(void))
(set-syncer-retry!
sr_0
retry-proc_0))))
(set-syncer-evt!
sr_0
(control-state-evt-evt
new-evt_0))
(end-atomic)
(if (if fast-only?21_0
(not
(if (eq?
interrupt-proc_0
void)
(if (eq?
abandon-proc_0
void)
(eq?
retry-proc_0
void)
#f)
#f))
#f)
(|#%app|
fail-k18_0
sched-info_0
#f
#f)
(loop_0
sr_0
(add1 retries_0)
polled-all-so-far?_0
no-wrappers?_0)))))))
(if (poll-guard-evt? new-evt_0)
(begin
(end-atomic)
(if fast-only?21_0
(|#%app|
fail-k18_0
sched-info_0
#f
#f)
(let ((generated_0
(call-with-continuation-barrier
(lambda ()
(|#%app|
(poll-guard-evt-proc
new-evt_0)
just-poll?20_0)))))
(begin
(set-syncer-evt!
sr_0
(if (1/evt? generated_0)
generated_0
(wrap-evt7.1
the-always-evt
(lambda (a_0)
generated_0))))
(loop_0
sr_0
(add1 retries_0)
polled-all-so-far?_0
#f)))))
(if (if (never-evt? new-evt_0)
(if (not
(|#%app|
evt-impersonator?
new-evt_0))
(if (not
(syncer-interrupt
sr_0))
(if (null?
(syncer-commits
sr_0))
(null?
(syncer-abandons
sr_0))
#f)
#f)
#f)
#f)
(begin
(syncer-remove! sr_0 s32_0)
(end-atomic)
(loop_0
(syncer-next sr_0)
0
polled-all-so-far?_0
no-wrappers?_0))
(begin
(set-syncer-evt!
sr_0
new-evt_0)
(end-atomic)
(loop_0
sr_0
(add1 retries_0)
polled-all-so-far?_0
no-wrappers?_0)))))))))))
(args
(raise-binding-result-arity-error
2
args)))))))))))))))
(if (if (eq?
new-evt_0
(syncer-evt sr_0))
(not
(poll-ctx-incomplete?
ctx_0))
#f)
(begin
(end-atomic)
(loop_0
(syncer-next sr_0)
0
polled-all-so-far?_0
no-wrappers?_0))
(begin
(set-syncer-evt!
sr_0
new-evt_0)
(end-atomic)
(loop_0
sr_0
(add1 retries_0)
polled-all-so-far?_0
no-wrappers?_0)))))))))))
(args
(raise-binding-result-arity-error
2
args))))))))))))))))
(loop_0 (syncing-syncers s32_0) 0 #t #t)))))))
(define make-result
(lambda (sr_0 results_0 success-k_0)
@ -11070,7 +11120,7 @@
(void)))
(define struct:nested-sync-evt
(make-record-type-descriptor* 'evt #f #f #f #f 3 7))
(define effect_2073
(define effect_2232
(struct-type-install-properties!
struct:nested-sync-evt
'evt
@ -11080,9 +11130,7 @@
(list
(cons
1/prop:evt
(poller2.1
(lambda (self_0 poll-ctx_0)
(|#%app| poll-nested-sync self_0 poll-ctx_0)))))
(poller2.1 (lambda (self_0 poll-ctx_0) (values #f self_0)))))
(current-inspector)
#f
'(0 1 2)
@ -11204,15 +11252,11 @@
(lambda () (syncing-retry! s_0))))))))
orig-evt_0)))))))
(define poll-nested-sync
(lambda (ns_0 poll-ctx_0)
(lambda (ns_0 just-poll?_0 fast-only?_0 sched-info_0)
(let ((temp90_0 (|#%app| nested-sync-evt-s ns_0)))
(let ((temp91_0
(lambda (sched-info_0 polled-all?_0 no-wrappers?_0)
(begin
(if polled-all?_0
(void)
(set-poll-ctx-incomplete?! poll-ctx_0 #f))
(values #f ns_0)))))
(lambda (sched-info_1 polled-all?_0 no-wrappers?_0)
(values polled-all?_0 ns_0))))
(let ((temp92_0
(lambda (thunk_0)
(let ((next_0 (nested-sync-evt-next ns_0)))
@ -11232,21 +11276,16 @@
'reset
void
'reset)))))))
(let ((temp93_0 (poll-ctx-poll? poll-ctx_0)))
(let ((temp95_0 (poll-ctx-sched-info poll-ctx_0)))
(let ((temp93_1 temp93_0)
(temp92_1 temp92_0)
(temp91_1 temp91_0)
(temp90_1 temp90_0))
(sync-poll.1
#f
#f
temp91_1
#f
temp93_1
temp95_0
temp92_1
temp90_1)))))))))
(let ((temp91_1 temp91_0) (temp90_1 temp90_0))
(sync-poll.1
#f
#f
temp91_1
fast-only?_0
just-poll?_0
sched-info_0
temp92_0
temp90_1)))))))
(define 1/current-evt-pseudo-random-generator
(make-parameter
(make-pseudo-random-generator)

View File

@ -399,6 +399,27 @@
(schedule-info-did-work! sched-info)
(end-atomic)
(loop (syncer-next sr) 0 #f #f)]
[(nested-sync-evt? (syncer-evt sr))
;; Have to go out of atomic mode to continue:
(end-atomic)
(define-values (same? new-evt) (poll-nested-sync (syncer-evt sr) just-poll? fast-only? sched-info))
(cond
[same?
(loop (syncer-next sr) 0 polled-all-so-far? no-wrappers?)]
[else
;; Since we left atomic mode, double-check that we're
;; still syncing before installing the replacement event:
(atomically
(unless (syncing-selected s)
(set-syncer-evt! sr new-evt))
(void))
(cond
[fast-only?
;; Conservative, because we don't know whether `same?` was #f
;; because the nested sync had non-fast elements
(none-k sched-info #f #f)]
[else
(loop sr (add1 retries) polled-all-so-far? no-wrappers?)])])]
[else
(define ctx (poll-ctx just-poll?
;; Call back for asynchronous selection,
@ -729,7 +750,7 @@
#:reflection-name 'evt)
(struct nested-sync-evt (s next orig-evt)
#:property prop:evt (poller (lambda (self poll-ctx) (poll-nested-sync self poll-ctx)))
#:property prop:evt (poller (lambda (self poll-ctx) (values #f self)))
#:reflection-name 'evt)
(define/who (replace-evt evt next)
@ -754,12 +775,10 @@
(lambda () (syncing-retry! s)))))))
orig-evt)
(define (poll-nested-sync ns poll-ctx)
(define (poll-nested-sync ns just-poll? fast-only? sched-info)
(sync-poll (nested-sync-evt-s ns)
#:fail-k (lambda (sched-info polled-all? no-wrappers?)
(unless polled-all?
(set-poll-ctx-incomplete?! poll-ctx #f))
(values #f ns))
(values polled-all? ns))
#:success-k (lambda (thunk)
;; `thunk` produces the values of the evt
;; that was provided to `replace-evt`:
@ -778,9 +797,10 @@
'reset
void
'reset)))
#:just-poll? (poll-ctx-poll? poll-ctx)
#:just-poll? just-poll?
#:done-after-poll? #f
#:schedule-info (poll-ctx-sched-info poll-ctx)))
#:fast-only? fast-only?
#:schedule-info sched-info))
;; ----------------------------------------