cs & thread: fix problems with sync and breaks

This commit fixes two bugs:

 * `sync/enable-break` didn't implement the guarantee that an event is
   selected or a break exception raised, but not both; the problem was
   in the handling of making the break-enable state ignored after
   committing to an event

 * `sync` didn't cancel asynchronous pending commits when a break is
   received at certain points; the bug in `sync/enable-break` masked
   this bug for existing test cases

Closes #3574
This commit is contained in:
Matthew Flatt 2020-12-19 16:06:16 -07:00
parent c05d0a6fa5
commit 4d0aa443b1
4 changed files with 89 additions and 61 deletions

View File

@ -1226,6 +1226,9 @@
[did-post2 #f] [did-post2 #f]
[did-done #f] [did-done #f]
[break-on (lambda () (break-enabled #t))] [break-on (lambda () (break-enabled #t))]
[sync-idle/break-thread (lambda (t)
(sync (system-idle-evt))
(break-thread t))]
[sw semaphore-wait]) [sw semaphore-wait])
(let ([mk-t (let ([mk-t
(lambda (init ;; how to start (lambda (init ;; how to start
@ -1273,8 +1276,8 @@
reset reset
(lambda () (lambda ()
(set! did-pre1 #t) (set! did-pre1 #t)
(semaphore-post p)
(pre-thunk) (pre-thunk)
(semaphore-post p)
(pre-semaphore-wait s) (pre-semaphore-wait s)
(set! did-pre2 #t)))) (set! did-pre2 #t))))
(lambda () (lambda ()
@ -1282,8 +1285,8 @@
reset reset
(lambda () (lambda ()
(set! did-act1 #t) (set! did-act1 #t)
(semaphore-post p)
(act-thunk) (act-thunk)
(semaphore-post p)
(act-semaphore-wait s) (act-semaphore-wait s)
(set! did-act2 #t)))) (set! did-act2 #t))))
(lambda () (lambda ()
@ -1291,8 +1294,8 @@
reset reset
(lambda () (lambda ()
(set! did-post1 #t) (set! did-post1 #t)
(semaphore-post p)
(post-thunk) (post-thunk)
(semaphore-post p)
(post-semaphore-wait s) (post-semaphore-wait s)
(set! did-post2 #t))))) (set! did-post2 #t)))))
(set! did-done #t))))))]) (set! did-done #t))))))])
@ -1338,6 +1341,7 @@
(if should-pre-break? (if should-pre-break?
(begin (begin
(thread-wait t) (thread-wait t)
(test #t semaphore-try-wait? s)
(test #f 'pre2 did-pre2)) (test #f 'pre2 did-pre2))
(if should-preact-break? (if should-preact-break?
(begin (begin
@ -1365,6 +1369,7 @@
(if should-post-break? (if should-post-break?
(begin (begin
(thread-wait t) (thread-wait t)
(test #t semaphore-try-wait? s)
(test #f 'post2 did-post2)) (test #f 'post2 did-post2))
(begin (begin
(thread-wait t) (thread-wait t)
@ -1374,6 +1379,8 @@
(lambda (mk-t) (lambda (mk-t)
(for-each (for-each
(lambda (nada) (lambda (nada)
(for-each
(lambda (break-thread)
;; Basic checks --- dynamic-wind thunks don't explicitly enable breaks ;; Basic checks --- dynamic-wind thunks don't explicitly enable breaks
(go mk-t #f nada nada nada sw sw sw void #f #f void #f void #f #f) (go mk-t #f nada nada nada sw sw sw void #f #f void #f void #f #f)
(go mk-t #f nada nada nada sw sw sw break-thread #f 'pre-act void #f void #f #f) (go mk-t #f nada nada nada sw sw sw break-thread #f 'pre-act void #f void #f #f)
@ -1396,6 +1403,7 @@
;; Enable break in pre shouldn't affect act/done ;; Enable break in pre shouldn't affect act/done
(go mk-t #t break-on nada nada sw sw sw void #f #f break-thread #f void #f #f) (go mk-t #t break-on nada nada sw sw sw void #f #f break-thread #f void #f #f)
(go mk-t #t break-on nada nada sw sw sw void #f #f void #f break-thread #f #f)) (go mk-t #t break-on nada nada sw sw sw void #f #f void #f break-thread #f #f))
(list break-thread sync-idle/break-thread)))
(list void sleep))) (list void sleep)))
;; We'll make threads in three modes: normal, restore a continuation into pre, ;; We'll make threads in three modes: normal, restore a continuation into pre,
;; and restore a continuation into act ;; and restore a continuation into act
@ -1576,6 +1584,40 @@
(try (lambda (c) (choice-evt c (alarm-evt (+ 10000 (current-inexact-milliseconds))))) (try (lambda (c) (choice-evt c (alarm-evt (+ 10000 (current-inexact-milliseconds)))))
'ok-channel+alarm)) 'ok-channel+alarm))
;; ----------------------------------------
;; Extra check that a sync/enable-break either succeeds or breaks,
;; where we avoid explicit thread synchronization and spin to
;; try to get different schedules.
;; Based on an example from Bogdan.
(for ([i 50])
(let ([succeeded? #f]
[broke? #f])
(define (spin-then thunk)
(let loop ([n (random 1000000)])
(cond
[(zero? n) (thunk)]
[else (loop (sub1 n))])))
(struct p ()
#:property prop:evt (unsafe-poller
(lambda (self wakeups)
;; loop to try to be slow
(spin-then
(lambda ()
(set! succeeded? #t)
(values (list 'success) #f))))))
(define ready-sema (make-semaphore))
(define t (thread (lambda ()
(parameterize-break #f
(semaphore-post ready-sema)
(with-handlers ([exn:break? (lambda (exn)
(set! broke? #t))])
(sync/enable-break (p)))))))
(semaphore-wait ready-sema)
(spin-then (lambda () (break-thread t)))
(sync t)
(test #t 'xor-on-success-and-break (if succeeded? (not broke?) broke?))))
;; ---------------------------------------- ;; ----------------------------------------
;; Make sure that suspending a thread that's blocked on a ;; Make sure that suspending a thread that's blocked on a
;; semaphore works right when the semaphore becomes available ;; semaphore works right when the semaphore becomes available

View File

@ -7010,8 +7010,7 @@
"attempt to deschedule the current thread in atomic mode")) "attempt to deschedule the current thread in atomic mode"))
(void))))))) (void)))))))
(loop_0)) (loop_0))
(engine-block) (engine-block))
(|#%app| 1/check-for-break))
(void)))))) (void))))))
(define thread-deschedule! (define thread-deschedule!
(lambda (t_0 timeout-at_0 interrupt-callback_0) (lambda (t_0 timeout-at_0 interrupt-callback_0)
@ -7703,6 +7702,8 @@
(begin (begin
(set-thread-pending-break! t_0 kind_0) (set-thread-pending-break! t_0 kind_0)
(thread-did-work!) (thread-did-work!)
(run-suspend/resume-callbacks t_0 car)
(run-suspend/resume-callbacks t_0 cdr)
(if (thread-descheduled? t_0) (if (thread-descheduled? t_0)
(if (thread-suspended? t_0) (if (thread-suspended? t_0)
(void) (void)
@ -9197,11 +9198,7 @@
(go_0 (go_0
(|#%name| (|#%name|
go go
(lambda (enable-break?7_0 (lambda (enable-break?7_0 s_0 timeout10_0 thunk-result?38_0)
local-break-cell_0
s_0
timeout10_0
thunk-result?38_0)
(begin (begin
(dynamic-wind (dynamic-wind
(lambda () (lambda ()
@ -9239,11 +9236,6 @@
(start-atomic) (start-atomic)
(thread-pop-suspend+resume-callbacks!) (thread-pop-suspend+resume-callbacks!)
(thread-pop-kill-callback!) (thread-pop-kill-callback!)
(if local-break-cell_0
(thread-remove-ignored-break-cell!
(current-thread/in-atomic)
local-break-cell_0)
(void))
(|#%app| syncing-abandon! s_0) (|#%app| syncing-abandon! s_0)
(end-atomic)))))))) (end-atomic))))))))
(loop_0 (loop_0
@ -9406,13 +9398,13 @@
push-authentic push-authentic
break-enabled-key break-enabled-key
local-break-cell_0 local-break-cell_0
(go_0 (go_0 enable-break?7_0 s_0 timeout10_0 #t))))
enable-break?7_0 (begin
local-break-cell_0 (thread-remove-ignored-break-cell!
s_0 (current-thread/in-atomic)
timeout10_0 local-break-cell_0)
#t)))) (1/check-for-break)
(begin (1/check-for-break) (|#%app| thunk_0))) (|#%app| thunk_0)))
(let ((temp52_0 (let ((temp52_0
(lambda (sched-info_0 polled-all?_0 no-wrappers?_0) (lambda (sched-info_0 polled-all?_0 no-wrappers?_0)
(if polled-all?_0 (if polled-all?_0
@ -9423,26 +9415,15 @@
(if (procedure? timeout10_0) (if (procedure? timeout10_0)
(|#%app| timeout10_0) (|#%app| timeout10_0)
(if no-wrappers?_0 (if no-wrappers?_0
(go_0 (go_0 enable-break?7_0 s_0 timeout10_0 #f)
enable-break?7_0
local-break-cell_0
s_0
timeout10_0
#f)
(|#%app| (|#%app|
(go_0 (go_0
enable-break?7_0 enable-break?7_0
local-break-cell_0
s_0 s_0
timeout10_0 timeout10_0
#t))))) #t)))))
(|#%app| (|#%app|
(go_0 (go_0 enable-break?7_0 s_0 timeout10_0 #t))))))
enable-break?7_0
local-break-cell_0
s_0
timeout10_0
#t))))))
(|#%app| (|#%app|
sync-poll.1 sync-poll.1
#f #f

View File

@ -167,8 +167,6 @@
(atomically (atomically
(thread-pop-suspend+resume-callbacks!) (thread-pop-suspend+resume-callbacks!)
(thread-pop-kill-callback!) (thread-pop-kill-callback!)
(when local-break-cell
(thread-remove-ignored-break-cell! (current-thread/in-atomic) local-break-cell))
;; On escape, post nacks, etc.: ;; On escape, post nacks, etc.:
(syncing-abandon! s) (syncing-abandon! s)
(void))))) (void)))))
@ -181,6 +179,9 @@
break-enabled-key break-enabled-key
local-break-cell local-break-cell
(go))) (go)))
;; If we get here, the break wasn't triggered, and it must be currently ignored.
;; (If the break was triggered so that we don't get here, it's not ignored.)
(thread-remove-ignored-break-cell! (current-thread/in-atomic) local-break-cell)
;; In case old break cell was meanwhile enabled: ;; In case old break cell was meanwhile enabled:
(check-for-break) (check-for-break)
;; In tail position: ;; In tail position:

View File

@ -440,8 +440,8 @@
(if (force-atomic-timeout-callback) (if (force-atomic-timeout-callback)
(loop) (loop)
(internal-error "attempt to deschedule the current thread in atomic mode")))) (internal-error "attempt to deschedule the current thread in atomic mode"))))
(engine-block) ;; implies `(check-for-break)`:
(check-for-break)))) (engine-block))))
;; Extends `do-thread-deschdule!` where `t` is always `(current-thread)`. ;; Extends `do-thread-deschdule!` where `t` is always `(current-thread)`.
;; The `interrupt-callback` is called if the thread receives a break ;; The `interrupt-callback` is called if the thread receives a break
@ -836,6 +836,10 @@
(unless (thread-pending-break t) (unless (thread-pending-break t)
(set-thread-pending-break! t kind) (set-thread-pending-break! t kind)
(thread-did-work!) (thread-did-work!)
(begin
;; interrupt synchronization, if any
(run-suspend/resume-callbacks t car)
(run-suspend/resume-callbacks t cdr))
(when (thread-descheduled? t) (when (thread-descheduled? t)
(unless (thread-suspended? t) (unless (thread-suspended? t)
(run-interrupt-callback t) (run-interrupt-callback t)