cs & io: fix log receiver synchronization
A misplaced `wrap-evt` could allow the result from `sync` on a log receiver to be an opaque event, instead of a vector. In other cases, a differently misplaced `wrap-evt` could also cause an internal instance of `control-state-evt` to not be unregistered correctly. The solution to both problems is to add a wrapper procedure to `control-state-evt`. Closes #2664
This commit is contained in:
parent
2ad4c6f508
commit
067dda578b
23
pkgs/racket-test/tests/racket/stress/log-receiver.rkt
Normal file
23
pkgs/racket-test/tests/racket/stress/log-receiver.rkt
Normal file
|
@ -0,0 +1,23 @@
|
|||
#lang racket/base
|
||||
|
||||
(define (do-thread t) (thread t))
|
||||
(set! do-thread do-thread)
|
||||
|
||||
(let loop ([i 0])
|
||||
(unless (= i 2000000)
|
||||
(when (zero? (modulo i 10000))
|
||||
(printf "~s\n" i))
|
||||
(define (spin-a-while)
|
||||
(let loop ([j (random (add1 (modulo i 100000)))])
|
||||
(unless (zero? j)
|
||||
(loop (sub1 j)))))
|
||||
(define s (make-log-receiver (current-logger) 'info 'send))
|
||||
(define t
|
||||
(do-thread
|
||||
(lambda ()
|
||||
(log-message (current-logger) 'info 'send "a" 1))))
|
||||
(spin-a-while)
|
||||
(unless (vector? (sync s))
|
||||
(error "not a vector result!"))
|
||||
(thread-wait t)
|
||||
(loop (add1 i))))
|
|
@ -42,12 +42,13 @@
|
|||
|
||||
(define (poll-ctx-sched-info ctx) #f)
|
||||
|
||||
(struct control-state-evt (evt interrupt abandon retry)
|
||||
(struct control-state-evt (evt wrap interrupt abandon retry)
|
||||
#:property prop:evt (lambda (cse)
|
||||
(nack-guard-evt
|
||||
(lambda (nack)
|
||||
(thread (lambda () (sync nack) ((control-state-evt-abandon cse))))
|
||||
(control-state-evt-evt cse)))))
|
||||
(wrap-evt (control-state-evt-evt cse)
|
||||
(control-state-evt-wrap cse))))))
|
||||
|
||||
(define current-async-semaphore (make-parameter #f #f 'current-async-semaphore))
|
||||
|
||||
|
|
|
@ -55,7 +55,8 @@
|
|||
(increment-receiever-waiters! lr)
|
||||
(queue-add! (queue-log-receiver-waiters lr) b)))
|
||||
(values #f (control-state-evt
|
||||
(wrap-evt async-evt (lambda (e) (unbox b)))
|
||||
async-evt
|
||||
(lambda (e) (unbox b))
|
||||
(lambda ()
|
||||
(queue-remove-node! (queue-log-receiver-waiters lr) n)
|
||||
(decrement-receiever-waiters! lr))
|
||||
|
|
|
@ -79,6 +79,7 @@
|
|||
(if (evt? v)
|
||||
v
|
||||
(wrap-evt always-evt (lambda () v)))))
|
||||
values
|
||||
void
|
||||
(lambda () (semaphore-post s))
|
||||
void))))
|
||||
|
|
|
@ -98,22 +98,21 @@
|
|||
(current-thread/in-atomic)))
|
||||
(define n (queue-add! gq (cons gw b)))
|
||||
(values #f
|
||||
(wrap-evt
|
||||
(control-state-evt async-evt
|
||||
(lambda () (queue-remove-node! gq n))
|
||||
void
|
||||
(lambda ()
|
||||
;; Retry: get ready value or requeue
|
||||
(define pw+v (queue-fremove! pq not-matching-select-waiter))
|
||||
(cond
|
||||
(control-state-evt async-evt
|
||||
(lambda (v) (unbox b))
|
||||
(lambda () (queue-remove-node! gq n))
|
||||
void
|
||||
(lambda ()
|
||||
;; Retry: get ready value or requeue
|
||||
(define pw+v (queue-fremove! pq not-matching-select-waiter))
|
||||
(cond
|
||||
[pw+v
|
||||
(waiter-resume! (car pw+v) (void))
|
||||
(set-box! b (cdr pw+v))
|
||||
(values #t #t)]
|
||||
[else
|
||||
(set! n (queue-add! gq (cons gw b)))
|
||||
(values #f #f)])))
|
||||
(lambda (v) (unbox b))))]))
|
||||
(values #f #f)]))))]))
|
||||
|
||||
;; ----------------------------------------
|
||||
|
||||
|
@ -161,22 +160,21 @@
|
|||
(current-thread/in-atomic)))
|
||||
(define n (queue-add! pq (cons pw v)))
|
||||
(values #f
|
||||
(wrap-evt
|
||||
(control-state-evt async-evt
|
||||
(lambda () (queue-remove-node! pq n))
|
||||
void
|
||||
(lambda ()
|
||||
;; Retry: put ready value or requeue
|
||||
(define gw+b (queue-fremove! gq not-matching-select-waiter))
|
||||
(cond
|
||||
(control-state-evt async-evt
|
||||
(lambda (v) self)
|
||||
(lambda () (queue-remove-node! pq n))
|
||||
void
|
||||
(lambda ()
|
||||
;; Retry: put ready value or requeue
|
||||
(define gw+b (queue-fremove! gq not-matching-select-waiter))
|
||||
(cond
|
||||
[gw+b
|
||||
(set-box! (cdr gw+b) v)
|
||||
(waiter-resume! (car gw+b) v)
|
||||
(values self #t)]
|
||||
[else
|
||||
(set! n (queue-add! pq (cons pw v)))
|
||||
(values #f #f)])))
|
||||
(lambda (v) self)))]))
|
||||
(values #f #f)]))))]))
|
||||
|
||||
(define/who (channel-put-evt ch v)
|
||||
(check who channel? ch)
|
||||
|
|
|
@ -135,7 +135,15 @@
|
|||
;; semaphore was meanwhile posted). As another example, a
|
||||
;; `nack-guard-evt`'s result uses `abandon-proc` to post to the NACK
|
||||
;; event.
|
||||
;; Beware that it doesn't make sense to use `wrap-evt` around the
|
||||
;; `control-state-evt` or the `evt` inside for an asynchronously
|
||||
;; satisfied event (like the way that semaphores are implemented). The
|
||||
;; event may be selected asynchronously before a wrapper on the inner
|
||||
;; event is found, so that the result turns out to be an unwrapped
|
||||
;; event. Or the `interrupt-proc`, etc., callbacks may not be found
|
||||
;; early enough if the `control-state-evt` is wrapped.
|
||||
(struct control-state-evt (evt
|
||||
wrap-proc
|
||||
interrupt-proc ; thunk for break/kill initiated or otherwise before `abandon-proc`
|
||||
abandon-proc ; thunk for not selected, including break/kill complete
|
||||
retry-proc) ; thunk for resume from break; return `(values _val _ready?)`
|
||||
|
|
|
@ -182,19 +182,19 @@
|
|||
;; event through a callback. Pair the event with a nack callback
|
||||
;; to get back out of line.
|
||||
(values #f
|
||||
(wrap-evt
|
||||
(control-state-evt async-evt
|
||||
(lambda ()
|
||||
(assert-atomic-mode)
|
||||
(queue-remove-node! s n)
|
||||
(when (queue-empty? s)
|
||||
(set-semaphore-count! s 0))) ; allow CAS again
|
||||
void
|
||||
(lambda ()
|
||||
;; Retry: decrement or requeue
|
||||
(assert-atomic-mode)
|
||||
(define c (semaphore-count s))
|
||||
(cond
|
||||
(control-state-evt async-evt
|
||||
(lambda (v) result)
|
||||
(lambda ()
|
||||
(assert-atomic-mode)
|
||||
(queue-remove-node! s n)
|
||||
(when (queue-empty? s)
|
||||
(set-semaphore-count! s 0))) ; allow CAS again
|
||||
void
|
||||
(lambda ()
|
||||
;; Retry: decrement or requeue
|
||||
(assert-atomic-mode)
|
||||
(define c (semaphore-count s))
|
||||
(cond
|
||||
[(positive? c)
|
||||
(unless peek?
|
||||
(set-semaphore-count! s (sub1 c)))
|
||||
|
@ -202,8 +202,7 @@
|
|||
[else
|
||||
(set! n (queue-add! s w))
|
||||
(set-semaphore-count! s -1) ; so CAS not tried for `semaphore-post`
|
||||
(values #f #f)])))
|
||||
(lambda (v) result)))]))
|
||||
(values #f #f)]))))]))
|
||||
|
||||
;; Called only when it should immediately succeed:
|
||||
(define (semaphore-wait/atomic s)
|
||||
|
|
|
@ -374,9 +374,9 @@
|
|||
(let loop ([sr (syncing-syncers s)]
|
||||
[retries 0] ; count retries on `sr`, and advance if it's too many
|
||||
[polled-all-so-far? #t])
|
||||
(start-atomic)
|
||||
(when (syncing-need-retry? s)
|
||||
(syncing-retry! s))
|
||||
(start-atomic)
|
||||
(cond
|
||||
[(syncing-selected s)
|
||||
=> (lambda (sr)
|
||||
|
@ -473,9 +473,12 @@
|
|||
(end-atomic)
|
||||
(loop sr (add1 retries) polled-all-so-far?)])]
|
||||
[(control-state-evt? new-evt)
|
||||
(define wrap-proc (control-state-evt-wrap-proc new-evt))
|
||||
(define interrupt-proc (control-state-evt-interrupt-proc new-evt))
|
||||
(define abandon-proc (control-state-evt-abandon-proc new-evt))
|
||||
(define retry-proc (control-state-evt-retry-proc new-evt))
|
||||
(unless (eq? wrap-proc values)
|
||||
(set-syncer-wraps! sr (cons wrap-proc (syncer-wraps sr))))
|
||||
(unless (eq? interrupt-proc void)
|
||||
(set-syncer-interrupts! sr (cons interrupt-proc (syncer-interrupts sr))))
|
||||
(unless (eq? abandon-proc void)
|
||||
|
@ -726,6 +729,7 @@
|
|||
;; represents the instantited attempt to sync on `evt`:
|
||||
(control-state-evt
|
||||
(nested-sync-evt s next orig-evt)
|
||||
values
|
||||
(lambda () (syncing-interrupt! s))
|
||||
(lambda () (syncing-abandon! s))
|
||||
(lambda () (syncing-retry! s)))))))
|
||||
|
|
|
@ -1000,7 +1000,8 @@
|
|||
(set-thread-mailbox-wakeup! t (lambda () (wakeup) (receive))))
|
||||
(add-wakeup-callback!)
|
||||
(values #f (control-state-evt
|
||||
(wrap-evt async-evt (lambda (v) self))
|
||||
async-evt
|
||||
(lambda (v) self)
|
||||
;; interrupt (all must be interrupted, so just install `void`):
|
||||
(lambda () (set-thread-mailbox-wakeup! t void))
|
||||
;; abandon:
|
||||
|
|
Loading…
Reference in New Issue
Block a user