cs: repair async-apply
handling for foreign callbacks
The CS implementation was missing a level of indirection. It worked for a typical handler `(lambda (thunk) (thunk))` for the case when the callback can run atomically, but it did not work for caces where `thunk` is made to run later (potentially out of atomic mode). Also, fix the management of interrupt state, including for some place- and future-related asynchronous callbacks. Thanks to @rmculpepper for the `async-apply` report, new `async-apply` tests, and the repair for the interrupt-state bug.
This commit is contained in:
parent
8668ad6959
commit
920e3dbde5
|
@ -10,6 +10,7 @@
|
|||
ffi/unsafe/define/conventions
|
||||
ffi/unsafe/global
|
||||
ffi/unsafe/atomic
|
||||
ffi/unsafe/os-async-channel
|
||||
ffi/vector
|
||||
racket/extflonum
|
||||
racket/place
|
||||
|
@ -906,6 +907,27 @@
|
|||
(loop)))
|
||||
(test (like 16) foreign_thread_callback_finish d)]))
|
||||
(check (lambda (f) (f)) add1)
|
||||
(check (lambda (f) (thread f)) add1)
|
||||
(let ()
|
||||
;; using thread mailboxes
|
||||
(define runner
|
||||
(thread
|
||||
(lambda ()
|
||||
(let loop ()
|
||||
(define p (thread-receive))
|
||||
(p)
|
||||
(loop)))))
|
||||
(check (lambda (f) (thread-send runner f)) add1))
|
||||
(when (eq? 'chez-scheme (system-type 'vm))
|
||||
(define chan (make-os-async-channel))
|
||||
(void
|
||||
(thread
|
||||
(lambda ()
|
||||
(let loop ()
|
||||
(define p (sync chan))
|
||||
(p)
|
||||
(loop)))))
|
||||
(check (lambda (f) (os-async-channel-put chan f)) add1))
|
||||
(check (box 20) (lambda (x) 20)))
|
||||
|
||||
;; check `#:callback-exns?`
|
||||
|
|
|
@ -13,13 +13,13 @@
|
|||
void)))
|
||||
|
||||
(define (call-as-asynchronous-callback thunk)
|
||||
(async-callback-queue-call (current-async-callback-queue) thunk #f #t #t))
|
||||
(async-callback-queue-call (current-async-callback-queue) (lambda (th) (th)) thunk #f #t #t))
|
||||
|
||||
(define (post-as-asynchronous-callback thunk)
|
||||
(async-callback-queue-call (current-async-callback-queue) thunk #f #t #f)
|
||||
(async-callback-queue-call (current-async-callback-queue) (lambda (th) (th)) thunk #f #t #f)
|
||||
(void))
|
||||
|
||||
(define (async-callback-queue-call async-callback-queue thunk interrupts-disabled? need-atomic? wait-for-result?)
|
||||
(define (async-callback-queue-call async-callback-queue run-thunk thunk interrupts-disabled? need-atomic? wait-for-result?)
|
||||
(let* ([result-done? (box #f)]
|
||||
[result #f]
|
||||
[q async-callback-queue]
|
||||
|
@ -28,11 +28,17 @@
|
|||
(when need-atomic? (scheduler-start-atomic)) ; don't abandon engine after mutex is acquired
|
||||
(mutex-acquire m)
|
||||
(set-async-callback-queue-in! q (cons (lambda ()
|
||||
(set! result (thunk))
|
||||
(mutex-acquire m)
|
||||
(set-box! result-done? #t)
|
||||
(condition-broadcast (async-callback-queue-condition q))
|
||||
(mutex-release m))
|
||||
(run-thunk
|
||||
(lambda ()
|
||||
(set! result (thunk))
|
||||
;; the thunk is not necessarily called in atomic
|
||||
;; mode, so make the mode atomic if needed:
|
||||
(when need-atomic? (scheduler-start-atomic))
|
||||
(mutex-acquire m)
|
||||
(set-box! result-done? #t)
|
||||
(condition-broadcast (async-callback-queue-condition q))
|
||||
(mutex-release m)
|
||||
(when need-atomic? (scheduler-end-atomic)))))
|
||||
(async-callback-queue-in q)))
|
||||
((async-callback-queue-wakeup q))
|
||||
(when wait-for-result?
|
||||
|
@ -44,7 +50,7 @@
|
|||
(loop))))
|
||||
(mutex-release m)
|
||||
(when need-atomic? (scheduler-end-atomic))
|
||||
(when interrupts-disabled? (enable-interrupts))
|
||||
(when interrupts-disabled? (disable-interrupts))
|
||||
result))
|
||||
|
||||
;; Called with all threads all stopped:
|
||||
|
|
|
@ -1836,7 +1836,7 @@
|
|||
[else r])))))])
|
||||
(if (and orig-place?
|
||||
(not (eqv? 0 (get-thread-id))))
|
||||
(async-callback-queue-call orig-place-async-callback-queue (lambda () (go)) #f #t #t)
|
||||
(async-callback-queue-call orig-place-async-callback-queue (lambda (th) (th)) (lambda () (go)) #f #t #t)
|
||||
(go))))])
|
||||
(c->s out-type r)))
|
||||
(fxsll 1 (length in-types))
|
||||
|
@ -1954,7 +1954,8 @@
|
|||
;; and wait for the response
|
||||
(let ([known-thread? (eqv? (place-thread-category) PLACE-KNOWN-THREAD)])
|
||||
(async-callback-queue-call async-callback-queue
|
||||
(lambda () (|#%app| async-apply thunk))
|
||||
async-apply
|
||||
thunk
|
||||
;; If we created this thread by `fork-pthread`, we must
|
||||
;; have gotten here by a foreign call, so interrupts are
|
||||
;; currently disabled
|
||||
|
|
Loading…
Reference in New Issue
Block a user