io: fix retention of blocked log receiver

Don't GC a log receiver that is blocked on a logger that
might receiver relevant events.
This commit is contained in:
Matthew Flatt 2018-09-11 12:14:07 -06:00
parent 862c05d64a
commit 9cf9be60b0
4 changed files with 59 additions and 21 deletions

View File

@ -274,6 +274,22 @@
warning-counter)) warning-counter))
(test 2 test-intercepted-logging2) (test 2 test-intercepted-logging2)
; --------------------
;; Check that a blocked log receiver is not GCed if
;; if might receiver something
(let ()
(define s (make-semaphore))
(let ([lr (make-log-receiver (current-logger)
'info)])
(thread (lambda ()
(semaphore-post s))))
(sync (system-idle-evt))
(collect-garbage)
(log-message (current-logger) 'info "" 'c)
;; If receiver is GCed, then this will block
(sync s))
; -------------------- ; --------------------
(report-errs) (report-errs)

View File

@ -9,7 +9,7 @@
(struct logger (topic ; symbol or #f (struct logger (topic ; symbol or #f
parent ; logger or #f parent ; logger or #f
propagate-filters propagate-filters
[receiver-boxes #:mutable] ; list of weak boxes [receiver-box+backrefs #:mutable] ; list of (cons weak-box any)
[prune-counter #:mutable] ; number of adds before checking empied boxes [prune-counter #:mutable] ; number of adds before checking empied boxes
[permanent-receivers #:mutable] ; receivers to retain strongly [permanent-receivers #:mutable] ; receivers to retain strongly
[max-receiver-level #:mutable] ; up-to-date if `local-level-timestamp` = `(unbox root-level-timestamp-box)` [max-receiver-level #:mutable] ; up-to-date if `local-level-timestamp` = `(unbox root-level-timestamp-box)`
@ -26,7 +26,7 @@
(logger topic (logger topic
parent parent
propagate-filters propagate-filters
null ; receiver-boxes null ; receiver-box+backrefs
8 ; prune-counter 8 ; prune-counter
null ; permanent-receivers null ; permanent-receivers
'none ; max-receiver-level 'none ; max-receiver-level
@ -40,7 +40,7 @@
;; Get log receivers, dropping any boxes made empty due to a weak ;; Get log receivers, dropping any boxes made empty due to a weak
;; reference: ;; reference:
(define (logger-receivers logger) (define (logger-receivers logger)
(for*/list ([rb (in-list (logger-receiver-boxes logger))] (for*/list ([b+r (in-list (logger-receiver-box+backrefs logger))]
[b (in-value (weak-box-value rb))] [lr (in-value (weak-box-value (car b+r)))]
#:when b) #:when lr)
b)) lr))

View File

@ -76,7 +76,8 @@
=> (lambda (s) s)] => (lambda (s) s)]
[else [else
(define s (make-semaphore)) (define s (make-semaphore))
(set-logger-level-sema! logger s)]))) (set-logger-level-sema! logger s)
s])))
(semaphore-peek-evt s)) (semaphore-peek-evt s))
;; Can be called in any host Scheme thread and in interrupt handler, ;; Can be called in any host Scheme thread and in interrupt handler,

View File

@ -21,7 +21,8 @@
;; ---------------------------------------- ;; ----------------------------------------
(struct queue-log-receiver log-receiver (msgs ; queue of messages ready for `sync` [if `waiters` is null] (struct queue-log-receiver log-receiver (msgs ; queue of messages ready for `sync` [if `waiters` is null]
waiters) ; queue of (box callback) to receive ready messages [if `msgs` is null] waiters ; queue of (box callback) to receive ready messages [if `msgs` is null]
backref) ; box to make a self reference to avoid GC of a waiting receiver
#:reflection-name 'log-receiver #:reflection-name 'log-receiver
#:property #:property
prop:receiver-send prop:receiver-send
@ -33,6 +34,7 @@
(define b (queue-remove! (queue-log-receiver-waiters lr))) (define b (queue-remove! (queue-log-receiver-waiters lr)))
(cond (cond
[b [b
(decrement-receiever-waiters! lr)
(define select! (unbox b)) (define select! (unbox b))
(set-box! b msg) (set-box! b msg)
(select!)] (select!)]
@ -47,11 +49,14 @@
(values (list msg) #f)] (values (list msg) #f)]
[else [else
(define b (box (poll-ctx-select-proc ctx))) (define b (box (poll-ctx-select-proc ctx)))
(define n (atomically/no-interrupts/no-wind (queue-add! (queue-log-receiver-waiters lr) b))) (define n (atomically/no-interrupts/no-wind
(increment-receiever-waiters! lr)
(queue-add! (queue-log-receiver-waiters lr) b)))
(values #f (control-state-evt (values #f (control-state-evt
(wrap-evt async-evt (lambda (e) (unbox b))) (wrap-evt async-evt (lambda (e) (unbox b)))
(lambda () (atomically/no-interrupts/no-wind (lambda () (atomically/no-interrupts/no-wind
(queue-remove-node! (queue-log-receiver-waiters lr) n))) (queue-remove-node! (queue-log-receiver-waiters lr) n)
(decrement-receiever-waiters! lr)))
void void
(lambda () (lambda ()
(atomically/no-interrupts/no-wind (atomically/no-interrupts/no-wind
@ -61,17 +66,31 @@
(set-box! b msg) (set-box! b msg)
(values msg #t)] (values msg #t)]
[else [else
(increment-receiever-waiters! lr)
(set! n (queue-add! (queue-log-receiver-waiters lr) b)) (set! n (queue-add! (queue-log-receiver-waiters lr) b))
(values #f #f)])))))])))) (values #f #f)])))))]))))
(define/who (make-log-receiver logger level . args) (define/who (make-log-receiver logger level . args)
(check who logger? logger) (check who logger? logger)
(define backref (box #f))
(define lr (queue-log-receiver (parse-filters 'make-log-receiver (cons level args) #:default-level 'none) (define lr (queue-log-receiver (parse-filters 'make-log-receiver (cons level args) #:default-level 'none)
(make-queue) (make-queue)
(make-queue))) (make-queue)
(add-log-receiver! logger lr) backref))
(add-log-receiver! logger lr backref)
lr) lr)
;; In atomic mode
(define (decrement-receiever-waiters! lr)
(when (queue-empty? (queue-log-receiver-waiters lr))
(set-box! (queue-log-receiver-backref lr) #f)))
;; In atomic mode
(define (increment-receiever-waiters! lr)
(when (queue-empty? (queue-log-receiver-waiters lr))
(set-box! (queue-log-receiver-backref lr) lr)))
;; ---------------------------------------- ;; ----------------------------------------
(struct stdio-log-receiver log-receiver (which) (struct stdio-log-receiver log-receiver (which)
@ -95,7 +114,7 @@
(define lr (stdio-log-receiver (parse-filters parse-who args #:default-level 'none) (define lr (stdio-log-receiver (parse-filters parse-who args #:default-level 'none)
which)) which))
(atomically (atomically
(add-log-receiver! logger lr) (add-log-receiver! logger lr #f)
(set-logger-permanent-receivers! logger (cons lr (logger-permanent-receivers logger))))) (set-logger-permanent-receivers! logger (cons lr (logger-permanent-receivers logger)))))
(define/who (add-stderr-log-receiver! logger . args) (define/who (add-stderr-log-receiver! logger . args)
@ -106,19 +125,21 @@
;; ---------------------------------------- ;; ----------------------------------------
(define (add-log-receiver! logger lr) (define (add-log-receiver! logger lr backref)
(atomically/no-interrupts/no-wind (atomically/no-interrupts/no-wind
;; Add receiver to the logger's list, purning empty boxes ;; Add receiver to the logger's list, pruning empty boxes
;; every time the list length doubles (roughly): ;; every time the list length doubles (roughly):
(cond (cond
[(zero? (logger-prune-counter logger)) [(zero? (logger-prune-counter logger))
(set-logger-receiver-boxes! logger (cons (make-weak-box lr) (set-logger-receiver-box+backrefs! logger
(for/list ([b (in-list (logger-receiver-boxes logger))] (cons (cons (make-weak-box lr) backref)
#:when (weak-box-value b)) (for/list ([b+r (in-list (logger-receiver-box+backrefs logger))]
b))) #:when (weak-box-value (car b+r)))
(set-logger-prune-counter! logger (max 8 (length (logger-receiver-boxes logger))))] b+r)))
(set-logger-prune-counter! logger (max 8 (length (logger-receiver-box+backrefs logger))))]
[else [else
(set-logger-receiver-boxes! logger (cons (make-weak-box lr) (logger-receiver-boxes logger))) (set-logger-receiver-box+backrefs! logger (cons (cons (make-weak-box lr) backref)
(logger-receiver-box+backrefs logger)))
(set-logger-prune-counter! logger (sub1 (logger-prune-counter logger)))]) (set-logger-prune-counter! logger (sub1 (logger-prune-counter logger)))])
;; Increment the timestamp, so that wanted levels will be ;; Increment the timestamp, so that wanted levels will be
;; recomputed on demand: ;; recomputed on demand: