thread: don't always convert end-atomic
to a thread swap
If interrupts are disabled to prevent thread swaps, then don't react to `end-atomic` by performing a deferred swap. That might happen with logger callbacks after a GC where a deferred action was overlooked due to a rare race condition.
This commit is contained in:
parent
e1c2e5d4f4
commit
3c95034a90
|
@ -307,6 +307,66 @@ FFI Differences
|
||||||
not actually an array of pointers.
|
not actually an array of pointers.
|
||||||
|
|
||||||
|
|
||||||
|
Threads, Threads, Atomicity, Atomicity, and Atomicity
|
||||||
|
-----------------------------------------------------
|
||||||
|
|
||||||
|
Racket's thread layer does not use Chez Scheme threads. Chez Scheme
|
||||||
|
threads correspond to OS threads. Racket threads are implemented in
|
||||||
|
terms of engines at the Rumble layer. At the same time, futures and
|
||||||
|
places will use Chez Scheme threads, and so parts of Rumble are meant
|
||||||
|
to be thread-safe with respect to Chez Scheme and OS threads. The FFI
|
||||||
|
also exposes elements of Chez Scheme / OS threads.
|
||||||
|
|
||||||
|
As a result of these layers, there are multiple ways to implement
|
||||||
|
atomic regions:
|
||||||
|
|
||||||
|
* For critical sections with respect to Chez Scheme / OS threads, use
|
||||||
|
a mutex.
|
||||||
|
|
||||||
|
For example, the implementation of `eq?` and `eqv?`-based hash
|
||||||
|
tables uses mutex to guard hash tables, so they can be accessed
|
||||||
|
concurrently from futures. In contrast, `equal?`-based hash table
|
||||||
|
operations are not atomic from the Racket perspective, so they
|
||||||
|
can't be locked by a mutex; they use Racket-thread locks, instead.
|
||||||
|
The "rumble/lock.ss" layer skips the `eq?`/`eqv?`-table mutex when
|
||||||
|
threads are not enabled at the Chez Scheme level.
|
||||||
|
|
||||||
|
* For critical sections at the Racket level, there are multiple
|
||||||
|
possibilities:
|
||||||
|
|
||||||
|
- The Racket "thread" layer provides `start-atomic` and
|
||||||
|
`end-atomic` to prevent Racket-thread swaps.
|
||||||
|
|
||||||
|
These are the same opertations as provided by
|
||||||
|
`ffi/unsafe/atomic`.
|
||||||
|
|
||||||
|
- Disabling Chez Scheme interrupts will also disable Racket
|
||||||
|
thread swaps, since a thread swap via engines depends on a
|
||||||
|
timer interrupt --- unless something explicitly blocks via the
|
||||||
|
Racket thread scheduler, such as with `(sleep)`.
|
||||||
|
|
||||||
|
Disable interrupts for atomicity only at the Rumble level where
|
||||||
|
no Racket-level callbacks are not involved. Also, beware that
|
||||||
|
disabling interrupts will prevent GC interrupts.
|
||||||
|
|
||||||
|
The Racket "thread" layer provides `start-atomic/no-interrupts`
|
||||||
|
and `end-atomic/no-interrupts` for both declaing atomicity at
|
||||||
|
the Racket level and turning off Chez Scheme interrupts. The
|
||||||
|
combination is useful for implementing functionality that might
|
||||||
|
be called in response to a GC and might also be called by
|
||||||
|
normal (non-atomic) code; the implementation of logging at the
|
||||||
|
"io" layer might be the only use case.
|
||||||
|
|
||||||
|
- The implementation of engines and continuations uses its own
|
||||||
|
flag to protect regions where an engine timeout should not
|
||||||
|
happen, such as when the metacontinuation is being manipulated.
|
||||||
|
That flag is managed by `start-uninterrupted` and
|
||||||
|
`end-uninterrupted` in "rumble/interrupt.ss".
|
||||||
|
|
||||||
|
It may be tempting to use that flag for other purposes, as a
|
||||||
|
cheap way to disable thread swaps. For now, don't do that.
|
||||||
|
|
||||||
|
|
||||||
Status and Thoughts on Various Racket Subsystems
|
Status and Thoughts on Various Racket Subsystems
|
||||||
------------------------------------------------
|
------------------------------------------------
|
||||||
|
|
||||||
|
@ -316,10 +376,6 @@ Status and Thoughts on Various Racket Subsystems
|
||||||
needed often in a typical program, and the overhead appears to be
|
needed often in a typical program, and the overhead appears to be
|
||||||
light when it is needed.
|
light when it is needed.
|
||||||
|
|
||||||
* Racket's delimited continuations, continuation marks, threads, and
|
|
||||||
events are mostly in place (see "rumble/control.ss",
|
|
||||||
"rumble/engine.ss", and the source for "thread.rktl").
|
|
||||||
|
|
||||||
* The "rktio" library fills the gap between Racket and Chez Scheme's
|
* The "rktio" library fills the gap between Racket and Chez Scheme's
|
||||||
native I/O. The "rktio" library provides a minimal, non-blocking,
|
native I/O. The "rktio" library provides a minimal, non-blocking,
|
||||||
non-GCed interface to OS-specific functionality. Its' compiled to a
|
non-GCed interface to OS-specific functionality. Its' compiled to a
|
||||||
|
|
|
@ -45,6 +45,7 @@
|
||||||
|
|
||||||
make-engine
|
make-engine
|
||||||
engine-block
|
engine-block
|
||||||
|
engine-timeout
|
||||||
engine-return
|
engine-return
|
||||||
current-engine-state ; not exported to Racket
|
current-engine-state ; not exported to Racket
|
||||||
set-ctl-c-handler! ; not exported to Racket
|
set-ctl-c-handler! ; not exported to Racket
|
||||||
|
|
|
@ -92,6 +92,17 @@
|
||||||
(engine-state-thread-cell-values es)
|
(engine-state-thread-cell-values es)
|
||||||
(engine-state-init-break-enabled-cell es)))))))))
|
(engine-state-init-break-enabled-cell es)))))))))
|
||||||
|
|
||||||
|
(define (engine-timeout)
|
||||||
|
(let ([can-block? (fx= 1 (disable-interrupts))])
|
||||||
|
(enable-interrupts)
|
||||||
|
(cond
|
||||||
|
[can-block?
|
||||||
|
(engine-block)]
|
||||||
|
[else
|
||||||
|
;; Cause the timer to fire as soon as possible (i.e., as soon
|
||||||
|
;; as interrupts are enabled)
|
||||||
|
(set-timer 1)])))
|
||||||
|
|
||||||
(define (engine-return . args)
|
(define (engine-return . args)
|
||||||
(assert-not-in-uninterrupted)
|
(assert-not-in-uninterrupted)
|
||||||
(timer-interrupt-handler void)
|
(timer-interrupt-handler void)
|
||||||
|
|
|
@ -2,7 +2,9 @@
|
||||||
;; Enabling uninterrupted mode defers a timer callback
|
;; Enabling uninterrupted mode defers a timer callback
|
||||||
;; until leaving uninterrupted mode. This is the same
|
;; until leaving uninterrupted mode. This is the same
|
||||||
;; as disabling and enabling interrupts at the Chez
|
;; as disabling and enabling interrupts at the Chez
|
||||||
;; level, but cheaper and more limited.
|
;; level, but cheaper and more limited. Uninterrupted
|
||||||
|
;; mode should be used only by the implementation of
|
||||||
|
;; engines and control in "engine.ss" and "control.ss".
|
||||||
|
|
||||||
(define-virtual-register current-in-uninterrupted #f)
|
(define-virtual-register current-in-uninterrupted #f)
|
||||||
(define-virtual-register pending-interrupt-callback #f)
|
(define-virtual-register pending-interrupt-callback #f)
|
||||||
|
|
|
@ -27,7 +27,10 @@
|
||||||
(define collect-generation-radix-mask (sub1 (bitwise-arithmetic-shift 1 log-collect-generation-radix)))
|
(define collect-generation-radix-mask (sub1 (bitwise-arithmetic-shift 1 log-collect-generation-radix)))
|
||||||
(define allocated-after-major (* 32 1024 1024))
|
(define allocated-after-major (* 32 1024 1024))
|
||||||
|
|
||||||
;; Called in any thread with all other threads paused
|
;; Called in any thread with all other threads paused. The Racket
|
||||||
|
;; thread scheduler may be in atomic mode. In fact, the engine
|
||||||
|
;; and control layer may be in uninterrupted mode, so don't
|
||||||
|
;; do anything that might use "control.ss" (especially in logging).
|
||||||
(define (collect/report g)
|
(define (collect/report g)
|
||||||
(let ([this-counter (if g (bitwise-arithmetic-shift-left 1 (* log-collect-generation-radix g)) gc-counter)]
|
(let ([this-counter (if g (bitwise-arithmetic-shift-left 1 (* log-collect-generation-radix g)) gc-counter)]
|
||||||
[pre-allocated (bytes-allocated)]
|
[pre-allocated (bytes-allocated)]
|
||||||
|
|
|
@ -11,6 +11,7 @@
|
||||||
;; These are extracted via `#%linklet`:
|
;; These are extracted via `#%linklet`:
|
||||||
[make-engine rumble:make-engine]
|
[make-engine rumble:make-engine]
|
||||||
[engine-block rumble:engine-block]
|
[engine-block rumble:engine-block]
|
||||||
|
[engine-timeout rumble:engine-timeout]
|
||||||
[engine-return rumble:engine-return]
|
[engine-return rumble:engine-return]
|
||||||
[current-engine-state rumble:current-engine-state]
|
[current-engine-state rumble:current-engine-state]
|
||||||
[make-condition rumble:make-condition]
|
[make-condition rumble:make-condition]
|
||||||
|
@ -64,6 +65,7 @@
|
||||||
(hash
|
(hash
|
||||||
'make-engine rumble:make-engine
|
'make-engine rumble:make-engine
|
||||||
'engine-block rumble:engine-block
|
'engine-block rumble:engine-block
|
||||||
|
'engine-timeout rumble:engine-timeout
|
||||||
'engine-return rumble:engine-return
|
'engine-return rumble:engine-return
|
||||||
'current-engine-state (lambda (v) (rumble:current-engine-state v))
|
'current-engine-state (lambda (v) (rumble:current-engine-state v))
|
||||||
'set-ctl-c-handler! rumble:set-ctl-c-handler!
|
'set-ctl-c-handler! rumble:set-ctl-c-handler!
|
||||||
|
|
|
@ -3,7 +3,7 @@
|
||||||
|
|
||||||
(provide atomically
|
(provide atomically
|
||||||
non-atomically
|
non-atomically
|
||||||
atomically/no-interrupts
|
atomically/no-interrupts/no-wind
|
||||||
assert-atomic
|
assert-atomic
|
||||||
check-current-custodian)
|
check-current-custodian)
|
||||||
|
|
||||||
|
@ -70,8 +70,11 @@
|
||||||
(let () e ...)
|
(let () e ...)
|
||||||
(start-atomic))))
|
(start-atomic))))
|
||||||
|
|
||||||
;; Cannot be exited with `non-atomically`:
|
;; Disables host interrupts, but the "no wind" part is
|
||||||
(define-syntax-rule (atomically/no-interrupts e ...)
|
;; an unforced constraint: don't use anything related
|
||||||
|
;; to `dynamic-wind`, continuations, or continuation marks.
|
||||||
|
;; Cannot be exited with `non-atomically`.
|
||||||
|
(define-syntax-rule (atomically/no-interrupts/no-wind e ...)
|
||||||
(begin
|
(begin
|
||||||
(start-atomic/no-interrupts)
|
(start-atomic/no-interrupts)
|
||||||
(begin0
|
(begin0
|
||||||
|
|
|
@ -101,7 +101,7 @@
|
||||||
(check who #:or-false symbol? topic)
|
(check who #:or-false symbol? topic)
|
||||||
(check who string? message)
|
(check who string? message)
|
||||||
(define msg #f)
|
(define msg #f)
|
||||||
(atomically/no-interrupts
|
(atomically/no-interrupts/no-wind
|
||||||
(when ((logger-max-wanted-level logger) . level>=? . level)
|
(when ((logger-max-wanted-level logger) . level>=? . level)
|
||||||
(let loop ([logger logger])
|
(let loop ([logger logger])
|
||||||
(for ([r (in-list (logger-receivers logger))])
|
(for ([r (in-list (logger-receivers logger))])
|
||||||
|
|
|
@ -29,7 +29,7 @@
|
||||||
;; called in atomic mode and possibly in host interrupt handler,
|
;; called in atomic mode and possibly in host interrupt handler,
|
||||||
;; so anything we touch here should only be modified with
|
;; so anything we touch here should only be modified with
|
||||||
;; interrupts disabled
|
;; interrupts disabled
|
||||||
(atomically/no-interrupts
|
(atomically/no-interrupts/no-wind
|
||||||
(define b (queue-remove! (queue-log-receiver-waiters lr)))
|
(define b (queue-remove! (queue-log-receiver-waiters lr)))
|
||||||
(cond
|
(cond
|
||||||
[b
|
[b
|
||||||
|
@ -41,19 +41,20 @@
|
||||||
#:property
|
#:property
|
||||||
prop:evt
|
prop:evt
|
||||||
(poller (lambda (lr ctx)
|
(poller (lambda (lr ctx)
|
||||||
(define msg (atomically/no-interrupts (queue-remove! (queue-log-receiver-msgs lr))))
|
(define msg (atomically/no-interrupts/no-wind (queue-remove! (queue-log-receiver-msgs lr))))
|
||||||
(cond
|
(cond
|
||||||
[msg
|
[msg
|
||||||
(values (list msg) #f)]
|
(values (list msg) #f)]
|
||||||
[else
|
[else
|
||||||
(define b (box (poll-ctx-select-proc ctx)))
|
(define b (box (poll-ctx-select-proc ctx)))
|
||||||
(define n (atomically/no-interrupts (queue-add! (queue-log-receiver-waiters lr) b)))
|
(define n (atomically/no-interrupts/no-wind (queue-add! (queue-log-receiver-waiters lr) b)))
|
||||||
(values #f (control-state-evt
|
(values #f (control-state-evt
|
||||||
(wrap-evt async-evt (lambda (e) (unbox b)))
|
(wrap-evt async-evt (lambda (e) (unbox b)))
|
||||||
(lambda () (atomically/no-interrupts (queue-remove-node! (queue-log-receiver-waiters lr) n)))
|
(lambda () (atomically/no-interrupts/no-wind
|
||||||
|
(queue-remove-node! (queue-log-receiver-waiters lr) n)))
|
||||||
void
|
void
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(atomically/no-interrupts
|
(atomically/no-interrupts/no-wind
|
||||||
(define msg (queue-remove! (queue-log-receiver-msgs lr)))
|
(define msg (queue-remove! (queue-log-receiver-msgs lr)))
|
||||||
(cond
|
(cond
|
||||||
[msg
|
[msg
|
||||||
|
@ -106,7 +107,7 @@
|
||||||
;; ----------------------------------------
|
;; ----------------------------------------
|
||||||
|
|
||||||
(define (add-log-receiver! logger lr)
|
(define (add-log-receiver! logger lr)
|
||||||
(atomically/no-interrupts
|
(atomically/no-interrupts/no-wind
|
||||||
;; Add receiver to the logger's list, purning empty boxes
|
;; Add receiver to the logger's list, purning empty boxes
|
||||||
;; every time the list length doubles (roughly):
|
;; every time the list length doubles (roughly):
|
||||||
(cond
|
(cond
|
||||||
|
|
|
@ -9,7 +9,7 @@
|
||||||
logger-all-levels)
|
logger-all-levels)
|
||||||
|
|
||||||
(define (logger-wanted-level logger topic)
|
(define (logger-wanted-level logger topic)
|
||||||
(atomically/no-interrupts
|
(atomically/no-interrupts/no-wind
|
||||||
(cond
|
(cond
|
||||||
[(not topic) (logger-max-wanted-level logger)]
|
[(not topic) (logger-max-wanted-level logger)]
|
||||||
[else
|
[else
|
||||||
|
@ -30,7 +30,7 @@
|
||||||
(logger-wanted-level logger topic)])])))
|
(logger-wanted-level logger topic)])])))
|
||||||
|
|
||||||
(define (logger-max-wanted-level logger)
|
(define (logger-max-wanted-level logger)
|
||||||
(atomically/no-interrupts
|
(atomically/no-interrupts/no-wind
|
||||||
(cond
|
(cond
|
||||||
[((logger-local-level-timestamp logger) . >= . (unbox (logger-root-level-timestamp-box logger)))
|
[((logger-local-level-timestamp logger) . >= . (unbox (logger-root-level-timestamp-box logger)))
|
||||||
;; Ccahed value is up-to-date
|
;; Ccahed value is up-to-date
|
||||||
|
|
|
@ -45,6 +45,10 @@
|
||||||
(cb)]
|
(cb)]
|
||||||
[(negative? n) (internal-error "not in atomic mode to end")]
|
[(negative? n) (internal-error "not in atomic mode to end")]
|
||||||
[else
|
[else
|
||||||
|
;; There's a small chance that `end-atomic-callback`
|
||||||
|
;; was set by the scheduler after the check and
|
||||||
|
;; before we exit atomic mode. Make sure that rare
|
||||||
|
;; event is ok.
|
||||||
(current-atomic n)]))
|
(current-atomic n)]))
|
||||||
|
|
||||||
(define (start-atomic/no-interrupts)
|
(define (start-atomic/no-interrupts)
|
||||||
|
|
|
@ -83,6 +83,9 @@
|
||||||
(define (engine-block)
|
(define (engine-block)
|
||||||
(thread-suspend (current-thread)))
|
(thread-suspend (current-thread)))
|
||||||
|
|
||||||
|
(define (engine-timeout)
|
||||||
|
(void))
|
||||||
|
|
||||||
(define ctl-c-handler #f)
|
(define ctl-c-handler #f)
|
||||||
|
|
||||||
(define (set-ctl-c-handler! proc)
|
(define (set-ctl-c-handler! proc)
|
||||||
|
@ -144,6 +147,7 @@
|
||||||
(hash
|
(hash
|
||||||
'make-engine make-engine
|
'make-engine make-engine
|
||||||
'engine-block engine-block
|
'engine-block engine-block
|
||||||
|
'engine-timeout engine-timeout
|
||||||
'engine-return (lambda args
|
'engine-return (lambda args
|
||||||
(error "engine-return: not ready"))
|
(error "engine-return: not ready"))
|
||||||
'current-process-milliseconds current-process-milliseconds
|
'current-process-milliseconds current-process-milliseconds
|
||||||
|
|
|
@ -29,6 +29,7 @@
|
||||||
(bounce #%engine
|
(bounce #%engine
|
||||||
make-engine
|
make-engine
|
||||||
engine-block
|
engine-block
|
||||||
|
engine-timeout
|
||||||
engine-return
|
engine-return
|
||||||
current-engine-state
|
current-engine-state
|
||||||
current-process-milliseconds
|
current-process-milliseconds
|
||||||
|
|
|
@ -88,8 +88,10 @@
|
||||||
(set-thread-engine! t e))
|
(set-thread-engine! t e))
|
||||||
(select-thread!)]
|
(select-thread!)]
|
||||||
[else
|
[else
|
||||||
;; Swap out when the atomic region ends:
|
;; Swap out when the atomic region ends and at a point
|
||||||
(set-end-atomic-callback! engine-block)
|
;; where host-system interrupts are not disabled (i.e.,
|
||||||
|
;; don't use `engine-block` instead of `engine-timeout`):
|
||||||
|
(set-end-atomic-callback! engine-timeout)
|
||||||
(loop e)])))))))
|
(loop e)])))))))
|
||||||
|
|
||||||
(define (maybe-done callbacks)
|
(define (maybe-done callbacks)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user