From 920e3dbde5954c2858129d04aa6207a43ff16f43 Mon Sep 17 00:00:00 2001 From: Matthew Flatt Date: Sun, 28 Mar 2021 19:58:46 -0600 Subject: [PATCH] cs: repair `async-apply` handling for foreign callbacks The CS implementation was missing a level of indirection. It worked for a typical handler `(lambda (thunk) (thunk))` for the case when the callback can run atomically, but it did not work for caces where `thunk` is made to run later (potentially out of atomic mode). Also, fix the management of interrupt state, including for some place- and future-related asynchronous callbacks. Thanks to @rmculpepper for the `async-apply` report, new `async-apply` tests, and the repair for the interrupt-state bug. --- .../tests/racket/foreign-test.rktl | 22 +++++++++++++++++ racket/src/cs/rumble/async-callback.ss | 24 ++++++++++++------- racket/src/cs/rumble/foreign.ss | 5 ++-- 3 files changed, 40 insertions(+), 11 deletions(-) diff --git a/pkgs/racket-test-core/tests/racket/foreign-test.rktl b/pkgs/racket-test-core/tests/racket/foreign-test.rktl index 567aadc928..99f72ce81f 100644 --- a/pkgs/racket-test-core/tests/racket/foreign-test.rktl +++ b/pkgs/racket-test-core/tests/racket/foreign-test.rktl @@ -10,6 +10,7 @@ ffi/unsafe/define/conventions ffi/unsafe/global ffi/unsafe/atomic + ffi/unsafe/os-async-channel ffi/vector racket/extflonum racket/place @@ -906,6 +907,27 @@ (loop))) (test (like 16) foreign_thread_callback_finish d)])) (check (lambda (f) (f)) add1) + (check (lambda (f) (thread f)) add1) + (let () + ;; using thread mailboxes + (define runner + (thread + (lambda () + (let loop () + (define p (thread-receive)) + (p) + (loop))))) + (check (lambda (f) (thread-send runner f)) add1)) + (when (eq? 'chez-scheme (system-type 'vm)) + (define chan (make-os-async-channel)) + (void + (thread + (lambda () + (let loop () + (define p (sync chan)) + (p) + (loop))))) + (check (lambda (f) (os-async-channel-put chan f)) add1)) (check (box 20) (lambda (x) 20))) ;; check `#:callback-exns?` diff --git a/racket/src/cs/rumble/async-callback.ss b/racket/src/cs/rumble/async-callback.ss index 59466513e8..8671cb4969 100644 --- a/racket/src/cs/rumble/async-callback.ss +++ b/racket/src/cs/rumble/async-callback.ss @@ -13,13 +13,13 @@ void))) (define (call-as-asynchronous-callback thunk) - (async-callback-queue-call (current-async-callback-queue) thunk #f #t #t)) + (async-callback-queue-call (current-async-callback-queue) (lambda (th) (th)) thunk #f #t #t)) (define (post-as-asynchronous-callback thunk) - (async-callback-queue-call (current-async-callback-queue) thunk #f #t #f) + (async-callback-queue-call (current-async-callback-queue) (lambda (th) (th)) thunk #f #t #f) (void)) -(define (async-callback-queue-call async-callback-queue thunk interrupts-disabled? need-atomic? wait-for-result?) +(define (async-callback-queue-call async-callback-queue run-thunk thunk interrupts-disabled? need-atomic? wait-for-result?) (let* ([result-done? (box #f)] [result #f] [q async-callback-queue] @@ -28,11 +28,17 @@ (when need-atomic? (scheduler-start-atomic)) ; don't abandon engine after mutex is acquired (mutex-acquire m) (set-async-callback-queue-in! q (cons (lambda () - (set! result (thunk)) - (mutex-acquire m) - (set-box! result-done? #t) - (condition-broadcast (async-callback-queue-condition q)) - (mutex-release m)) + (run-thunk + (lambda () + (set! result (thunk)) + ;; the thunk is not necessarily called in atomic + ;; mode, so make the mode atomic if needed: + (when need-atomic? (scheduler-start-atomic)) + (mutex-acquire m) + (set-box! result-done? #t) + (condition-broadcast (async-callback-queue-condition q)) + (mutex-release m) + (when need-atomic? (scheduler-end-atomic))))) (async-callback-queue-in q))) ((async-callback-queue-wakeup q)) (when wait-for-result? @@ -44,7 +50,7 @@ (loop)))) (mutex-release m) (when need-atomic? (scheduler-end-atomic)) - (when interrupts-disabled? (enable-interrupts)) + (when interrupts-disabled? (disable-interrupts)) result)) ;; Called with all threads all stopped: diff --git a/racket/src/cs/rumble/foreign.ss b/racket/src/cs/rumble/foreign.ss index 1a2f208067..a6edd2e9bd 100644 --- a/racket/src/cs/rumble/foreign.ss +++ b/racket/src/cs/rumble/foreign.ss @@ -1836,7 +1836,7 @@ [else r])))))]) (if (and orig-place? (not (eqv? 0 (get-thread-id)))) - (async-callback-queue-call orig-place-async-callback-queue (lambda () (go)) #f #t #t) + (async-callback-queue-call orig-place-async-callback-queue (lambda (th) (th)) (lambda () (go)) #f #t #t) (go))))]) (c->s out-type r))) (fxsll 1 (length in-types)) @@ -1954,7 +1954,8 @@ ;; and wait for the response (let ([known-thread? (eqv? (place-thread-category) PLACE-KNOWN-THREAD)]) (async-callback-queue-call async-callback-queue - (lambda () (|#%app| async-apply thunk)) + async-apply + thunk ;; If we created this thread by `fork-pthread`, we must ;; have gotten here by a foreign call, so interrupts are ;; currently disabled