cs: avoid thread misuse during GC

A collection can only invoke certain callabcks (e.g., for DrRacket's
GC icon) when the collection is performed in the main thread. Also,
delay posting GC logging events to receivers that cannot work at
interrupt time.
This commit is contained in:
Matthew Flatt 2018-09-12 20:55:12 -06:00
parent 6b56156d55
commit efd601cb51
14 changed files with 123 additions and 71 deletions

View File

@ -416,18 +416,20 @@
(and (not minor?)
(log-level? root-logger 'debug 'GC:major)))
(let ([delta (- pre-allocated post-allocated)])
(log-message root-logger 'debug (if debug-GC? 'GC 'GC:major)
(chez:format "GC: 0:~a~a @ ~a(~a); free ~a(~a) ~ams @ ~a"
(if minor? "min" "MAJ") gen
(K "" pre-allocated) (K "+" (- pre-allocated+overhead pre-allocated))
(K "" delta) (K "+" (- (- pre-allocated+overhead post-allocated+overhead)
delta))
(- post-cpu-time pre-cpu-time) pre-cpu-time)
(make-gc-info (if minor? 'minor 'major) pre-allocated pre-allocated+overhead 0
post-allocated post-allocated+overhead
pre-cpu-time post-cpu-time
pre-time post-time)
#f)))))))))
(log-message* root-logger 'debug (if debug-GC? 'GC 'GC:major)
(chez:format "GC: 0:~a~a @ ~a(~a); free ~a(~a) ~ams @ ~a"
(if minor? "min" "MAJ") gen
(K "" pre-allocated) (K "+" (- pre-allocated+overhead pre-allocated))
(K "" delta) (K "+" (- (- pre-allocated+overhead post-allocated+overhead)
delta))
(- post-cpu-time pre-cpu-time) pre-cpu-time)
(make-gc-info (if minor? 'minor 'major) pre-allocated pre-allocated+overhead 0
post-allocated post-allocated+overhead
pre-cpu-time post-cpu-time
pre-time post-time)
#f
;; in interrupt:
#t)))))))))
(seq
(|#%app| exit-handler
(let ([orig (|#%app| exit-handler)]

View File

@ -599,7 +599,7 @@
set-ffi-get-lib-and-obj! ; not exported to Racket
poll-async-callbacks ; not exported to Racket
set-async-callback-poll-wakeup! ; not exported to Racket
set-foreign-eval! ; not exported to racket
set-foreign-eval! ; not exported to Racket
unsafe-unbox
unsafe-unbox*

View File

@ -307,28 +307,32 @@
;; ----------------------------------------
;; List of (cons <pre> <post>)
;; List of (cons <pre> <post>), currently suported
;; only in the original host thread of the original place
(define collect-callbacks '())
(define (unsafe-add-collect-callbacks pre post)
(let ([p (cons pre post)])
(with-interrupts-disabled
(set! collect-callbacks (cons p collect-callbacks)))
p))
(when (in-original-host-thread?)
(let ([p (cons pre post)])
(with-interrupts-disabled
(set! collect-callbacks (cons p collect-callbacks)))
p)))
(define (unsafe-remove-collect-callbacks p)
(with-interrupts-disabled
(set! collect-callbacks (#%remq p collect-callbacks))))
(when (in-original-host-thread?)
(with-interrupts-disabled
(set! collect-callbacks (#%remq p collect-callbacks)))))
(define (run-collect-callbacks sel)
(let loop ([l collect-callbacks])
(unless (null? l)
(let ([v (sel (car l))])
(let loop ([i 0] [save #f])
(unless (fx= i (#%vector-length v))
(loop (fx+ i 1)
(run-one-collect-callback (#%vector-ref v i) save sel))))
(loop (cdr l))))))
(when (in-original-host-thread?)
(let loop ([l collect-callbacks])
(unless (null? l)
(let ([v (sel (car l))])
(let loop ([i 0] [save #f])
(unless (fx= i (#%vector-length v))
(loop (fx+ i 1)
(run-one-collect-callback (#%vector-ref v i) save sel))))
(loop (cdr l)))))))
(define-syntax (osapi-foreign-procedure stx)
(syntax-case stx ()

View File

@ -8,6 +8,10 @@
(set-place-registers! place-registers)
(thunk)))))
(define pthread? thread?)
(define in-original-host-thread?
(let ([initial-thread-id (get-thread-id)])
(lambda ()
(eqv? (get-thread-id) initial-thread-id))))
;; make-condition
;; condition-wait
;; condition-signal
@ -20,6 +24,7 @@
(define make-pthread-parameter #%make-parameter)
(define (fork-pthread) (void))
(define (pthread?) #f)
(define (in-original-host-thread?) #t)
(define (make-condition) (void))
(define (condition-wait c m) (void))
(define (condition-signal c) (void))

View File

@ -132,5 +132,6 @@
'unsafe-custodian-unregister unsafe-custodian-unregister
'thread-push-kill-callback! thread-push-kill-callback!
'thread-pop-kill-callback! thread-pop-kill-callback!
'unsafe-add-pre-poll-callback! (lambda (proc) (void))
'set-get-subprocesses-time! void
'prop:place-message prop:place-message))

View File

@ -78,6 +78,7 @@
unsafe-custodian-unregister
thread-push-kill-callback!
thread-pop-kill-callback!
unsafe-add-pre-poll-callback!
set-get-subprocesses-time!)
(define start-atomic unsafe-start-atomic)

View File

@ -15,7 +15,8 @@
log-max-level
log-all-levels
log-level-evt
log-message ; ok to call in host-Scheme interrupt handler
log-message
log-message* ; ok to call in host-Scheme interrupt handler
log-receiver?
make-log-receiver
add-stderr-log-receiver!
@ -80,8 +81,6 @@
s])))
(semaphore-peek-evt s))
;; Can be called in any host Scheme thread and in interrupt handler,
;; like `log-level?`:
(define/who log-message
;; Complex dispatch based on number and whether third is a string:
(case-lambda
@ -102,13 +101,16 @@
[(logger level topic message data prefix?)
(do-log-message who logger level topic message data prefix?)]))
;; Can be called in any host Scheme thread and in interrupt handler,
;; like `log-level?`:
(define (do-log-message who logger level topic message data prefix?)
(check who logger? logger)
(check-level who level)
(check who #:or-false symbol? topic)
(check who string? message)
(log-message* logger level topic message data prefix? #f))
;; Can be called in any host Scheme thread and in interrupt handler,
;; like `log-level?`:
(define (log-message* logger level topic message data prefix? in-interrupt?)
(define msg #f)
(atomically/no-interrupts/no-wind
(when ((logger-max-wanted-level logger) . level>=? . level)
@ -126,7 +128,7 @@
message))
data
topic)))
(log-receiver-send! r msg)))
(log-receiver-send! r msg in-interrupt?)))
(let ([parent (logger-parent logger)])
(when (and parent
((filters-level-for-topic (logger-propagate-filters logger) topic) . level>=? . level))

View File

@ -2,6 +2,7 @@
(require "../common/check.rkt"
"../../common/queue.rkt"
"../host/thread.rkt"
"../host/pthread.rkt"
"../host/rktio.rkt"
"../string/convert.rkt"
"level.rkt"
@ -27,48 +28,44 @@
#:property
prop:receiver-send
(lambda (lr msg)
;; called in atomic mode and possibly in host interrupt handler,
;; so anything we touch here should only be modified with
;; interrupts disabled
(atomically/no-interrupts/no-wind
(define b (queue-remove! (queue-log-receiver-waiters lr)))
(cond
[b
(decrement-receiever-waiters! lr)
(define select! (unbox b))
(set-box! b msg)
(select!)]
[else
(queue-add! (queue-log-receiver-msgs lr) msg)])))
;; called in atomic mode
(define b (queue-remove! (queue-log-receiver-waiters lr)))
(cond
[b
(decrement-receiever-waiters! lr)
(define select! (unbox b))
(set-box! b msg)
(select!)]
[else
(queue-add! (queue-log-receiver-msgs lr) msg)]))
#:property
prop:evt
(poller (lambda (lr ctx)
(define msg (atomically/no-interrupts/no-wind (queue-remove! (queue-log-receiver-msgs lr))))
(define msg (queue-remove! (queue-log-receiver-msgs lr)))
(cond
[msg
(values (list msg) #f)]
[else
(define b (box (poll-ctx-select-proc ctx)))
(define n (atomically/no-interrupts/no-wind
(increment-receiever-waiters! lr)
(queue-add! (queue-log-receiver-waiters lr) b)))
(define n (begin
(increment-receiever-waiters! lr)
(queue-add! (queue-log-receiver-waiters lr) b)))
(values #f (control-state-evt
(wrap-evt async-evt (lambda (e) (unbox b)))
(lambda () (atomically/no-interrupts/no-wind
(queue-remove-node! (queue-log-receiver-waiters lr) n)
(decrement-receiever-waiters! lr)))
(lambda ()
(queue-remove-node! (queue-log-receiver-waiters lr) n)
(decrement-receiever-waiters! lr))
void
(lambda ()
(atomically/no-interrupts/no-wind
(define msg (queue-remove! (queue-log-receiver-msgs lr)))
(cond
[msg
(set-box! b msg)
(values msg #t)]
[else
(increment-receiever-waiters! lr)
(set! n (queue-add! (queue-log-receiver-waiters lr) b))
(values #f #f)])))))]))))
(define msg (queue-remove! (queue-log-receiver-msgs lr)))
(cond
[msg
(set-box! b msg)
(values msg #t)]
[else
(increment-receiever-waiters! lr)
(set! n (queue-add! (queue-log-receiver-waiters lr) b))
(values #f #f)]))))]))))
(define/who (make-log-receiver logger level . args)
(check who logger? logger)
@ -152,5 +149,11 @@
(set-logger-level-sema! logger #f))))
;; Called in atomic mode and with interrupts disabled
(define (log-receiver-send! r msg)
((receiver-send-ref r) r msg))
(define (log-receiver-send! r msg in-interrupt?)
(if (or (not in-interrupt?)
;; We can run stdio loggers in atomic/interrupt mode:
(stdio-log-receiver? r))
((receiver-send-ref r) r msg)
;; Record any any other message for posting later:
(unsafe-add-pre-poll-callback! (lambda ()
((receiver-send-ref r) r msg)))))

View File

@ -11,7 +11,8 @@
"thread.rkt"
"unsafe.rkt"
"time.rkt"
"place-message.rkt")
"place-message.rkt"
"pre-poll.rkt")
;; Unsafe scheduler-cooperation functions are made available to
;; clients through a `#%thread` primitive linklet instance:
@ -65,5 +66,6 @@
'unsafe-custodian-unregister unsafe-custodian-unregister
'thread-push-kill-callback! thread-push-kill-callback!
'thread-pop-kill-callback! thread-pop-kill-callback!
'unsafe-add-pre-poll-callback! unsafe-add-pre-poll-callback!
'set-get-subprocesses-time! set-get-subprocesses-time!
'prop:place-message prop:place-message))

View File

@ -95,6 +95,7 @@
(start-atomic)
(define-values (parent-in parent-out parent-err child-in-fd child-out-fd child-err-fd)
(make-place-ports+fds in out err))
(host:mutex-acquire lock)
;; Start the new place
(host:fork-place
(lambda ()
@ -136,7 +137,6 @@
(wakeup-waiting pl))
(hash-clear! done-waiting)))
;; Wait for the place to start, then return the place object
(host:mutex-acquire lock)
(host:condition-wait started lock)
(host:mutex-release lock)
(end-atomic)

View File

@ -0,0 +1,26 @@
#lang racket/base
(require "atomic.rkt"
"host.rkt")
(provide unsafe-add-pre-poll-callback!
call-pre-poll-external-callbacks)
(define pre-poll-callbacks null)
;; called in atomic mode in an arbitrary host thread, but
;; with all other host threads paused; the given procedure
;; will be called in atomic mode, possibly in the schduler
(define (unsafe-add-pre-poll-callback! proc)
(set! pre-poll-callbacks (cons proc pre-poll-callbacks)))
;; in atomic mode
(define (call-pre-poll-external-callbacks)
(unless (null? pre-poll-callbacks)
;; disable interrupts to avoid a case with `unsafe-add-pre-poll-callback!`
(host:disable-interrupts)
(define l pre-poll-callbacks)
(set! pre-poll-callbacks null)
(host:enable-interrupts)
;; Call the callbacks
(for ([cb (in-list (reverse l))])
(cb))))

View File

@ -12,7 +12,8 @@
"exit.rkt"
"future.rkt"
"custodian.rkt"
(submod "custodian.rkt" scheduling))
(submod "custodian.rkt" scheduling)
"pre-poll.rkt")
;; Many scheduler details are implemented in "thread.rkt", but this
;; module handles the thread selection, thread swapping, and
@ -48,6 +49,7 @@
pending-callbacks))
(host:poll-will-executors)
(check-external-events 'fast)
(call-pre-poll-external-callbacks)
(check-place-activity)
(when (and (null? callbacks)
(all-threads-poll-done?)

View File

@ -5,7 +5,8 @@
"atomic.rkt"
"parameter.rkt"
"waiter.rkt"
"evt.rkt")
"evt.rkt"
"pre-poll.rkt")
(provide make-semaphore
semaphore?
@ -93,6 +94,7 @@
(define/who (semaphore-try-wait? s)
(check who semaphore? s)
(atomically
(call-pre-poll-external-callbacks)
(define c (semaphore-count s))
(cond
[(positive? c)

View File

@ -9,7 +9,8 @@
"thread.rkt"
(only-in (submod "thread.rkt" scheduling)
thread-descheduled?)
"schedule-info.rkt")
"schedule-info.rkt"
"pre-poll.rkt")
(provide sync
sync/timeout
@ -96,6 +97,7 @@
(cond
[(or (and (real? timeout) (zero? timeout))
(procedure? timeout))
(atomically (call-pre-poll-external-callbacks))
(let poll-loop ()
(sync-poll s #:fail-k (lambda (sched-info polled-all?)
(cond