diff --git a/racket/src/cs/main.sps b/racket/src/cs/main.sps index a70b9fbfc4..ffeb728749 100644 --- a/racket/src/cs/main.sps +++ b/racket/src/cs/main.sps @@ -416,18 +416,20 @@ (and (not minor?) (log-level? root-logger 'debug 'GC:major))) (let ([delta (- pre-allocated post-allocated)]) - (log-message root-logger 'debug (if debug-GC? 'GC 'GC:major) - (chez:format "GC: 0:~a~a @ ~a(~a); free ~a(~a) ~ams @ ~a" - (if minor? "min" "MAJ") gen - (K "" pre-allocated) (K "+" (- pre-allocated+overhead pre-allocated)) - (K "" delta) (K "+" (- (- pre-allocated+overhead post-allocated+overhead) - delta)) - (- post-cpu-time pre-cpu-time) pre-cpu-time) - (make-gc-info (if minor? 'minor 'major) pre-allocated pre-allocated+overhead 0 - post-allocated post-allocated+overhead - pre-cpu-time post-cpu-time - pre-time post-time) - #f))))))))) + (log-message* root-logger 'debug (if debug-GC? 'GC 'GC:major) + (chez:format "GC: 0:~a~a @ ~a(~a); free ~a(~a) ~ams @ ~a" + (if minor? "min" "MAJ") gen + (K "" pre-allocated) (K "+" (- pre-allocated+overhead pre-allocated)) + (K "" delta) (K "+" (- (- pre-allocated+overhead post-allocated+overhead) + delta)) + (- post-cpu-time pre-cpu-time) pre-cpu-time) + (make-gc-info (if minor? 'minor 'major) pre-allocated pre-allocated+overhead 0 + post-allocated post-allocated+overhead + pre-cpu-time post-cpu-time + pre-time post-time) + #f + ;; in interrupt: + #t))))))))) (seq (|#%app| exit-handler (let ([orig (|#%app| exit-handler)] diff --git a/racket/src/cs/rumble.sls b/racket/src/cs/rumble.sls index 8daadb16a6..2c6d99a110 100644 --- a/racket/src/cs/rumble.sls +++ b/racket/src/cs/rumble.sls @@ -599,7 +599,7 @@ set-ffi-get-lib-and-obj! ; not exported to Racket poll-async-callbacks ; not exported to Racket set-async-callback-poll-wakeup! ; not exported to Racket - set-foreign-eval! ; not exported to racket + set-foreign-eval! ; not exported to Racket unsafe-unbox unsafe-unbox* diff --git a/racket/src/cs/rumble/memory.ss b/racket/src/cs/rumble/memory.ss index 24149c4837..3261c96bd8 100644 --- a/racket/src/cs/rumble/memory.ss +++ b/racket/src/cs/rumble/memory.ss @@ -307,28 +307,32 @@ ;; ---------------------------------------- -;; List of (cons
) +;; List of (cons ), currently suported +;; only in the original host thread of the original place (define collect-callbacks '()) (define (unsafe-add-collect-callbacks pre post) - (let ([p (cons pre post)]) - (with-interrupts-disabled - (set! collect-callbacks (cons p collect-callbacks))) - p)) + (when (in-original-host-thread?) + (let ([p (cons pre post)]) + (with-interrupts-disabled + (set! collect-callbacks (cons p collect-callbacks))) + p))) (define (unsafe-remove-collect-callbacks p) - (with-interrupts-disabled - (set! collect-callbacks (#%remq p collect-callbacks)))) + (when (in-original-host-thread?) + (with-interrupts-disabled + (set! collect-callbacks (#%remq p collect-callbacks))))) (define (run-collect-callbacks sel) - (let loop ([l collect-callbacks]) - (unless (null? l) - (let ([v (sel (car l))]) - (let loop ([i 0] [save #f]) - (unless (fx= i (#%vector-length v)) - (loop (fx+ i 1) - (run-one-collect-callback (#%vector-ref v i) save sel)))) - (loop (cdr l)))))) + (when (in-original-host-thread?) + (let loop ([l collect-callbacks]) + (unless (null? l) + (let ([v (sel (car l))]) + (let loop ([i 0] [save #f]) + (unless (fx= i (#%vector-length v)) + (loop (fx+ i 1) + (run-one-collect-callback (#%vector-ref v i) save sel)))) + (loop (cdr l))))))) (define-syntax (osapi-foreign-procedure stx) (syntax-case stx () diff --git a/racket/src/cs/rumble/pthread.ss b/racket/src/cs/rumble/pthread.ss index 2e15301ed5..f7b85f71b7 100644 --- a/racket/src/cs/rumble/pthread.ss +++ b/racket/src/cs/rumble/pthread.ss @@ -8,6 +8,10 @@ (set-place-registers! place-registers) (thunk))))) (define pthread? thread?) + (define in-original-host-thread? + (let ([initial-thread-id (get-thread-id)]) + (lambda () + (eqv? (get-thread-id) initial-thread-id)))) ;; make-condition ;; condition-wait ;; condition-signal @@ -20,6 +24,7 @@ (define make-pthread-parameter #%make-parameter) (define (fork-pthread) (void)) (define (pthread?) #f) + (define (in-original-host-thread?) #t) (define (make-condition) (void)) (define (condition-wait c m) (void)) (define (condition-signal c) (void)) diff --git a/racket/src/io/host/bootstrap.rkt b/racket/src/io/host/bootstrap.rkt index 7ce001e74c..b03b3d5414 100644 --- a/racket/src/io/host/bootstrap.rkt +++ b/racket/src/io/host/bootstrap.rkt @@ -132,5 +132,6 @@ 'unsafe-custodian-unregister unsafe-custodian-unregister 'thread-push-kill-callback! thread-push-kill-callback! 'thread-pop-kill-callback! thread-pop-kill-callback! + 'unsafe-add-pre-poll-callback! (lambda (proc) (void)) 'set-get-subprocesses-time! void 'prop:place-message prop:place-message)) diff --git a/racket/src/io/host/thread.rkt b/racket/src/io/host/thread.rkt index 992ddab4fc..acde4ee5bc 100644 --- a/racket/src/io/host/thread.rkt +++ b/racket/src/io/host/thread.rkt @@ -78,6 +78,7 @@ unsafe-custodian-unregister thread-push-kill-callback! thread-pop-kill-callback! + unsafe-add-pre-poll-callback! set-get-subprocesses-time!) (define start-atomic unsafe-start-atomic) diff --git a/racket/src/io/logger/main.rkt b/racket/src/io/logger/main.rkt index 6567feeec0..9656be25e7 100644 --- a/racket/src/io/logger/main.rkt +++ b/racket/src/io/logger/main.rkt @@ -15,7 +15,8 @@ log-max-level log-all-levels log-level-evt - log-message ; ok to call in host-Scheme interrupt handler + log-message + log-message* ; ok to call in host-Scheme interrupt handler log-receiver? make-log-receiver add-stderr-log-receiver! @@ -80,8 +81,6 @@ s]))) (semaphore-peek-evt s)) -;; Can be called in any host Scheme thread and in interrupt handler, -;; like `log-level?`: (define/who log-message ;; Complex dispatch based on number and whether third is a string: (case-lambda @@ -102,13 +101,16 @@ [(logger level topic message data prefix?) (do-log-message who logger level topic message data prefix?)])) -;; Can be called in any host Scheme thread and in interrupt handler, -;; like `log-level?`: (define (do-log-message who logger level topic message data prefix?) (check who logger? logger) (check-level who level) (check who #:or-false symbol? topic) (check who string? message) + (log-message* logger level topic message data prefix? #f)) + +;; Can be called in any host Scheme thread and in interrupt handler, +;; like `log-level?`: +(define (log-message* logger level topic message data prefix? in-interrupt?) (define msg #f) (atomically/no-interrupts/no-wind (when ((logger-max-wanted-level logger) . level>=? . level) @@ -126,7 +128,7 @@ message)) data topic))) - (log-receiver-send! r msg))) + (log-receiver-send! r msg in-interrupt?))) (let ([parent (logger-parent logger)]) (when (and parent ((filters-level-for-topic (logger-propagate-filters logger) topic) . level>=? . level)) diff --git a/racket/src/io/logger/receiver.rkt b/racket/src/io/logger/receiver.rkt index 7bc9964b68..7aa1e6a6db 100644 --- a/racket/src/io/logger/receiver.rkt +++ b/racket/src/io/logger/receiver.rkt @@ -2,6 +2,7 @@ (require "../common/check.rkt" "../../common/queue.rkt" "../host/thread.rkt" + "../host/pthread.rkt" "../host/rktio.rkt" "../string/convert.rkt" "level.rkt" @@ -27,48 +28,44 @@ #:property prop:receiver-send (lambda (lr msg) - ;; called in atomic mode and possibly in host interrupt handler, - ;; so anything we touch here should only be modified with - ;; interrupts disabled - (atomically/no-interrupts/no-wind - (define b (queue-remove! (queue-log-receiver-waiters lr))) - (cond - [b - (decrement-receiever-waiters! lr) - (define select! (unbox b)) - (set-box! b msg) - (select!)] - [else - (queue-add! (queue-log-receiver-msgs lr) msg)]))) + ;; called in atomic mode + (define b (queue-remove! (queue-log-receiver-waiters lr))) + (cond + [b + (decrement-receiever-waiters! lr) + (define select! (unbox b)) + (set-box! b msg) + (select!)] + [else + (queue-add! (queue-log-receiver-msgs lr) msg)])) #:property prop:evt (poller (lambda (lr ctx) - (define msg (atomically/no-interrupts/no-wind (queue-remove! (queue-log-receiver-msgs lr)))) + (define msg (queue-remove! (queue-log-receiver-msgs lr))) (cond [msg (values (list msg) #f)] [else (define b (box (poll-ctx-select-proc ctx))) - (define n (atomically/no-interrupts/no-wind - (increment-receiever-waiters! lr) - (queue-add! (queue-log-receiver-waiters lr) b))) + (define n (begin + (increment-receiever-waiters! lr) + (queue-add! (queue-log-receiver-waiters lr) b))) (values #f (control-state-evt (wrap-evt async-evt (lambda (e) (unbox b))) - (lambda () (atomically/no-interrupts/no-wind - (queue-remove-node! (queue-log-receiver-waiters lr) n) - (decrement-receiever-waiters! lr))) + (lambda () + (queue-remove-node! (queue-log-receiver-waiters lr) n) + (decrement-receiever-waiters! lr)) void (lambda () - (atomically/no-interrupts/no-wind - (define msg (queue-remove! (queue-log-receiver-msgs lr))) - (cond - [msg - (set-box! b msg) - (values msg #t)] - [else - (increment-receiever-waiters! lr) - (set! n (queue-add! (queue-log-receiver-waiters lr) b)) - (values #f #f)])))))])))) + (define msg (queue-remove! (queue-log-receiver-msgs lr))) + (cond + [msg + (set-box! b msg) + (values msg #t)] + [else + (increment-receiever-waiters! lr) + (set! n (queue-add! (queue-log-receiver-waiters lr) b)) + (values #f #f)]))))])))) (define/who (make-log-receiver logger level . args) (check who logger? logger) @@ -152,5 +149,11 @@ (set-logger-level-sema! logger #f)))) ;; Called in atomic mode and with interrupts disabled -(define (log-receiver-send! r msg) - ((receiver-send-ref r) r msg)) +(define (log-receiver-send! r msg in-interrupt?) + (if (or (not in-interrupt?) + ;; We can run stdio loggers in atomic/interrupt mode: + (stdio-log-receiver? r)) + ((receiver-send-ref r) r msg) + ;; Record any any other message for posting later: + (unsafe-add-pre-poll-callback! (lambda () + ((receiver-send-ref r) r msg))))) diff --git a/racket/src/thread/instance.rkt b/racket/src/thread/instance.rkt index 1b4ef83311..6ab32d91bd 100644 --- a/racket/src/thread/instance.rkt +++ b/racket/src/thread/instance.rkt @@ -11,7 +11,8 @@ "thread.rkt" "unsafe.rkt" "time.rkt" - "place-message.rkt") + "place-message.rkt" + "pre-poll.rkt") ;; Unsafe scheduler-cooperation functions are made available to ;; clients through a `#%thread` primitive linklet instance: @@ -65,5 +66,6 @@ 'unsafe-custodian-unregister unsafe-custodian-unregister 'thread-push-kill-callback! thread-push-kill-callback! 'thread-pop-kill-callback! thread-pop-kill-callback! + 'unsafe-add-pre-poll-callback! unsafe-add-pre-poll-callback! 'set-get-subprocesses-time! set-get-subprocesses-time! 'prop:place-message prop:place-message)) diff --git a/racket/src/thread/place.rkt b/racket/src/thread/place.rkt index 489a9ad61c..568c2ba59a 100644 --- a/racket/src/thread/place.rkt +++ b/racket/src/thread/place.rkt @@ -95,6 +95,7 @@ (start-atomic) (define-values (parent-in parent-out parent-err child-in-fd child-out-fd child-err-fd) (make-place-ports+fds in out err)) + (host:mutex-acquire lock) ;; Start the new place (host:fork-place (lambda () @@ -136,7 +137,6 @@ (wakeup-waiting pl)) (hash-clear! done-waiting))) ;; Wait for the place to start, then return the place object - (host:mutex-acquire lock) (host:condition-wait started lock) (host:mutex-release lock) (end-atomic) diff --git a/racket/src/thread/pre-poll.rkt b/racket/src/thread/pre-poll.rkt new file mode 100644 index 0000000000..23b894e300 --- /dev/null +++ b/racket/src/thread/pre-poll.rkt @@ -0,0 +1,26 @@ +#lang racket/base +(require "atomic.rkt" + "host.rkt") + +(provide unsafe-add-pre-poll-callback! + call-pre-poll-external-callbacks) + +(define pre-poll-callbacks null) + +;; called in atomic mode in an arbitrary host thread, but +;; with all other host threads paused; the given procedure +;; will be called in atomic mode, possibly in the schduler +(define (unsafe-add-pre-poll-callback! proc) + (set! pre-poll-callbacks (cons proc pre-poll-callbacks))) + +;; in atomic mode +(define (call-pre-poll-external-callbacks) + (unless (null? pre-poll-callbacks) + ;; disable interrupts to avoid a case with `unsafe-add-pre-poll-callback!` + (host:disable-interrupts) + (define l pre-poll-callbacks) + (set! pre-poll-callbacks null) + (host:enable-interrupts) + ;; Call the callbacks + (for ([cb (in-list (reverse l))]) + (cb)))) diff --git a/racket/src/thread/schedule.rkt b/racket/src/thread/schedule.rkt index f544dafb1a..413874c37c 100644 --- a/racket/src/thread/schedule.rkt +++ b/racket/src/thread/schedule.rkt @@ -12,7 +12,8 @@ "exit.rkt" "future.rkt" "custodian.rkt" - (submod "custodian.rkt" scheduling)) + (submod "custodian.rkt" scheduling) + "pre-poll.rkt") ;; Many scheduler details are implemented in "thread.rkt", but this ;; module handles the thread selection, thread swapping, and @@ -48,6 +49,7 @@ pending-callbacks)) (host:poll-will-executors) (check-external-events 'fast) + (call-pre-poll-external-callbacks) (check-place-activity) (when (and (null? callbacks) (all-threads-poll-done?) diff --git a/racket/src/thread/semaphore.rkt b/racket/src/thread/semaphore.rkt index 4df60dc1e6..00dac020c4 100644 --- a/racket/src/thread/semaphore.rkt +++ b/racket/src/thread/semaphore.rkt @@ -5,7 +5,8 @@ "atomic.rkt" "parameter.rkt" "waiter.rkt" - "evt.rkt") + "evt.rkt" + "pre-poll.rkt") (provide make-semaphore semaphore? @@ -93,6 +94,7 @@ (define/who (semaphore-try-wait? s) (check who semaphore? s) (atomically + (call-pre-poll-external-callbacks) (define c (semaphore-count s)) (cond [(positive? c) diff --git a/racket/src/thread/sync.rkt b/racket/src/thread/sync.rkt index e6e9d8aa10..7f2b4a8416 100644 --- a/racket/src/thread/sync.rkt +++ b/racket/src/thread/sync.rkt @@ -9,7 +9,8 @@ "thread.rkt" (only-in (submod "thread.rkt" scheduling) thread-descheduled?) - "schedule-info.rkt") + "schedule-info.rkt" + "pre-poll.rkt") (provide sync sync/timeout @@ -96,6 +97,7 @@ (cond [(or (and (real? timeout) (zero? timeout)) (procedure? timeout)) + (atomically (call-pre-poll-external-callbacks)) (let poll-loop () (sync-poll s #:fail-k (lambda (sched-info polled-all?) (cond