reduce overhead and precision of per-thread cpu times
Getting the current CPU time is relatively expensive, so get it only on thread swaps where a thread used its full quantum or 1/100 swaps otherwise. This approximation should work because thread-specific CPU times are rarely requested, and they make the most sense for threads that don't constantly swap out due to synchronization.
This commit is contained in:
parent
84deff1d02
commit
1624193210
|
@ -113,7 +113,9 @@ both user and system time.
|
|||
|
||||
@item{If @racket[scope] is a thread, the result is specific to the
|
||||
time while the thread ran, but it may include time for other
|
||||
@tech{places}.}
|
||||
@tech{places}. The more a thread synchronizes with other
|
||||
threads, the less precisely per-thread processor time is
|
||||
recorded.}
|
||||
|
||||
@item{If @racket[scope] is @racket['subprocesses], the result is the
|
||||
sum of process times for known-completed subprocesses (see
|
||||
|
|
|
@ -457,7 +457,7 @@
|
|||
(e 100
|
||||
(lambda () (set! started (add1 started)))
|
||||
(lambda (remain a b c) (list a b c n started))
|
||||
(lambda (e)
|
||||
(lambda (e timeout?)
|
||||
(loop e (add1 n))))))
|
||||
'(1 2 3 10 11))
|
||||
|
||||
|
@ -481,7 +481,7 @@
|
|||
(e 200
|
||||
void
|
||||
(lambda (remain a b c pre t-post) (list a b c pre t-post post n))
|
||||
(lambda (e)
|
||||
(lambda (e timeout?)
|
||||
(loop e (add1 n)))))
|
||||
'(1 2 3 10 0 10 10))))
|
||||
|
||||
|
@ -504,12 +504,12 @@
|
|||
100
|
||||
void
|
||||
(lambda (remain l) l)
|
||||
(lambda (e) (error 'engine "oops"))))
|
||||
(lambda (e timeout?) (error 'engine "oops"))))
|
||||
(define l2 ((list-ref l1 2)
|
||||
100
|
||||
void
|
||||
(lambda (remain l) l)
|
||||
(lambda (e) (error 'engine "oops"))))
|
||||
(lambda (e timeout?) (error 'engine "oops"))))
|
||||
(check (list-ref l1 0) 1)
|
||||
(check (list-ref l1 1) 100)
|
||||
(check (list-ref l1 3) 2)
|
||||
|
@ -528,7 +528,7 @@
|
|||
(extend-parameterization (continuation-mark-set-first #f parameterization-key) my-param 'set)
|
||||
(make-engine (lambda () (|#%app| my-param)) engine-tag #f #f))])
|
||||
(check (|#%app| my-param) 'init)
|
||||
(check (e 1000 void (lambda (remain v) v) (lambda (e) (error 'engine "oops"))) 'set))
|
||||
(check (e 1000 void (lambda (remain v) v) (lambda (e timeout?) (error 'engine "oops"))) 'set))
|
||||
|
||||
(let ([also-my-param (make-derived-parameter my-param
|
||||
(lambda (v) (list v))
|
||||
|
@ -633,7 +633,7 @@
|
|||
(= prefixes (add1 i))
|
||||
(- (car v) i)
|
||||
(- (cadr v) i)))
|
||||
(lambda (e) (loop e (add1 i))))))
|
||||
(lambda (e timeout?) (loop e (add1 i))))))
|
||||
'(#t #t 1 0))
|
||||
|
||||
;; ----------------------------------------
|
||||
|
|
|
@ -34,5 +34,6 @@
|
|||
[start-place (known-procedure 32)]
|
||||
[make-pthread-parameter (known-procedure 2)]
|
||||
[break-enabled-key (known-constant)]
|
||||
[engine-block (known-procedure 1)]
|
||||
|
||||
[force-unfasl (known-procedure 2)])
|
||||
|
|
|
@ -76,39 +76,46 @@
|
|||
(define (engine-block-via-timer)
|
||||
(cond
|
||||
[(current-in-uninterrupted)
|
||||
(pending-interrupt-callback engine-block)]
|
||||
(pending-interrupt-callback engine-block/timeout)]
|
||||
[else
|
||||
(engine-block)]))
|
||||
(engine-block/timeout)]))
|
||||
|
||||
(define (engine-block)
|
||||
(assert-not-in-uninterrupted)
|
||||
(timer-interrupt-handler void)
|
||||
(let ([es (current-engine-state)])
|
||||
(unless es
|
||||
(error 'engine-block "not currently running an engine"))
|
||||
(reset-handler (engine-state-reset-handler es))
|
||||
(start-implicit-uninterrupted 'block)
|
||||
;; Extra pair of parens around swap is to apply a prefix
|
||||
;; function on swapping back in:
|
||||
((swap-metacontinuation
|
||||
(engine-state-mc es)
|
||||
(lambda (saves)
|
||||
(end-implicit-uninterrupted 'block)
|
||||
(current-engine-state #f)
|
||||
(lambda () ; returned to the `swap-continuation` in `create-engine`
|
||||
((engine-state-expire es)
|
||||
(create-engine
|
||||
saves
|
||||
(lambda (prefix) prefix) ; returns `prefix` to the above "(("
|
||||
(engine-state-thread-cell-values es)
|
||||
(engine-state-init-break-enabled-cell es)))))))))
|
||||
(define engine-block
|
||||
(case-lambda
|
||||
[(timeout?)
|
||||
(assert-not-in-uninterrupted)
|
||||
(timer-interrupt-handler void)
|
||||
(let ([es (current-engine-state)])
|
||||
(unless es
|
||||
(error 'engine-block "not currently running an engine"))
|
||||
(reset-handler (engine-state-reset-handler es))
|
||||
(start-implicit-uninterrupted 'block)
|
||||
;; Extra pair of parens around swap is to apply a prefix
|
||||
;; function on swapping back in:
|
||||
((swap-metacontinuation
|
||||
(engine-state-mc es)
|
||||
(lambda (saves)
|
||||
(end-implicit-uninterrupted 'block)
|
||||
(current-engine-state #f)
|
||||
(lambda () ; returned to the `swap-continuation` in `create-engine`
|
||||
((engine-state-expire es)
|
||||
(create-engine
|
||||
saves
|
||||
(lambda (prefix) prefix) ; returns `prefix` to the above "(("
|
||||
(engine-state-thread-cell-values es)
|
||||
(engine-state-init-break-enabled-cell es))
|
||||
timeout?))))))]
|
||||
[() (engine-block #f)]))
|
||||
|
||||
(define (engine-block/timeout)
|
||||
(engine-block #t))
|
||||
|
||||
(define (engine-timeout)
|
||||
(let ([can-block? (fx= 1 (disable-interrupts))])
|
||||
(enable-interrupts)
|
||||
(cond
|
||||
[can-block?
|
||||
(engine-block)]
|
||||
(engine-block/timeout)]
|
||||
[else
|
||||
;; Cause the timer to fire as soon as possible (i.e., as soon
|
||||
;; as interrupts are enabled)
|
||||
|
|
|
@ -286,6 +286,8 @@
|
|||
#%procedure?]
|
||||
[(eq? 'ephemeron (car args))
|
||||
ephemeron-pair?]
|
||||
[(eq? 'metacontinuation-frame (car args))
|
||||
metacontinuation-frame?]
|
||||
[(symbol? (car args))
|
||||
#f
|
||||
;; This is disaterously slow, so don't try it:
|
||||
|
|
|
@ -10,7 +10,6 @@
|
|||
[unsafe-place-local-set! rumble:unsafe-place-local-set!]
|
||||
;; These are extracted via `#%linklet`:
|
||||
[make-engine rumble:make-engine]
|
||||
[engine-block rumble:engine-block]
|
||||
[engine-timeout rumble:engine-timeout]
|
||||
[engine-return rumble:engine-return]
|
||||
[current-engine-state rumble:current-engine-state]
|
||||
|
@ -105,6 +104,7 @@
|
|||
'make-pthread-parameter make-pthread-parameter
|
||||
'unsafe-root-continuation-prompt-tag unsafe-root-continuation-prompt-tag
|
||||
'break-enabled-key break-enabled-key
|
||||
'engine-block engine-block
|
||||
;; These are actually redirected by "place-register.ss", but
|
||||
;; we list them here for compatibility with the bootstrapping
|
||||
;; variant of `#%pthread`
|
||||
|
@ -114,7 +114,6 @@
|
|||
[(|#%engine|)
|
||||
(hasheq
|
||||
'make-engine rumble:make-engine
|
||||
'engine-block rumble:engine-block
|
||||
'engine-timeout rumble:engine-timeout
|
||||
'engine-return rumble:engine-return
|
||||
'current-engine-state (lambda (v) (rumble:current-engine-state v))
|
||||
|
|
|
@ -64,4 +64,5 @@
|
|||
#%call-with-values
|
||||
make-pthread-parameter
|
||||
break-enabled-key
|
||||
engine-block
|
||||
fasl->s-exp/intern))))
|
||||
|
|
|
@ -237,6 +237,7 @@ typedef struct Thread_Local_Variables {
|
|||
struct Scheme_Thread_Set *scheme_thread_set_top_;
|
||||
struct Scheme_Current_LWC *scheme_current_lwc_;
|
||||
intptr_t process_time_at_swap_;
|
||||
intptr_t process_time_skips_;
|
||||
int num_running_threads_;
|
||||
int swap_no_setjmp_;
|
||||
int thread_swap_count_;
|
||||
|
@ -622,6 +623,7 @@ XFORM_GC_VARIABLE_STACK_THROUGH_THREAD_LOCAL;
|
|||
#define swap_no_setjmp XOA (scheme_get_thread_local_variables()->swap_no_setjmp_)
|
||||
#define thread_swap_count XOA (scheme_get_thread_local_variables()->thread_swap_count_)
|
||||
#define process_time_at_swap XOA (scheme_get_thread_local_variables()->process_time_at_swap_)
|
||||
#define process_time_skips XOA (scheme_get_thread_local_variables()->process_time_skips_)
|
||||
#define scheme_did_gc_count XOA (scheme_get_thread_local_variables()->scheme_did_gc_count_)
|
||||
#define scheme_future_state XOA (scheme_get_thread_local_variables()->scheme_future_state_)
|
||||
#define scheme_future_thread_state XOA (scheme_get_thread_local_variables()->scheme_future_thread_state_)
|
||||
|
|
|
@ -103,6 +103,7 @@ THREAD_LOCAL_DECL(static int swap_no_setjmp = 0);
|
|||
THREAD_LOCAL_DECL(static int thread_swap_count);
|
||||
THREAD_LOCAL_DECL(int scheme_did_gc_count);
|
||||
THREAD_LOCAL_DECL(static intptr_t process_time_at_swap);
|
||||
THREAD_LOCAL_DECL(static intptr_t process_time_skips);
|
||||
|
||||
THREAD_LOCAL_DECL(static intptr_t max_gc_pre_used_bytes);
|
||||
#ifdef MZ_PRECISE_GC
|
||||
|
@ -2985,11 +2986,12 @@ static void do_swap_thread()
|
|||
} else {
|
||||
Scheme_Thread *new_thread = swap_target;
|
||||
|
||||
{
|
||||
if ((!scheme_fuel_counter) || (++process_time_skips >= 100)) {
|
||||
intptr_t cpm;
|
||||
cpm = scheme_get_process_milliseconds();
|
||||
scheme_current_thread->accum_process_msec += (cpm - scheme_current_thread->current_start_process_msec);
|
||||
process_time_at_swap = cpm;
|
||||
process_time_skips = 0;
|
||||
}
|
||||
|
||||
swap_target = NULL;
|
||||
|
|
|
@ -66,9 +66,11 @@
|
|||
(set! prefix next-prefix)
|
||||
(break-thread t)
|
||||
(thread-resume t)
|
||||
(define timeout? #f)
|
||||
(define t2
|
||||
(thread (lambda ()
|
||||
(sleep (/ ticks 1000000.0))
|
||||
(set! timeout? #t)
|
||||
(thread-suspend t))))
|
||||
;; Limited break propagation while syncing:
|
||||
(call-with-exception-handler
|
||||
|
@ -85,7 +87,7 @@
|
|||
[(thread-dead? t)
|
||||
(apply complete 0 results)]
|
||||
[else
|
||||
(expire go)]))
|
||||
(expire go timeout?)]))
|
||||
go)
|
||||
|
||||
(define (engine-block)
|
||||
|
@ -223,11 +225,11 @@
|
|||
'unsafe-place-local-set! unsafe-place-local-set!
|
||||
'unsafe-add-global-finalizer (lambda (v proc) (void))
|
||||
'unsafe-root-continuation-prompt-tag unsafe-root-continuation-prompt-tag
|
||||
'break-enabled-key break-enabled-key))
|
||||
'break-enabled-key break-enabled-key
|
||||
'engine-block engine-block))
|
||||
(primitive-table '#%engine
|
||||
(hash
|
||||
'make-engine make-engine
|
||||
'engine-block engine-block
|
||||
'engine-timeout engine-timeout
|
||||
'engine-return (lambda args
|
||||
(error "engine-return: not ready"))
|
||||
|
|
|
@ -29,11 +29,11 @@
|
|||
unsafe-place-local-ref
|
||||
unsafe-place-local-set!
|
||||
unsafe-root-continuation-prompt-tag
|
||||
break-enabled-key)
|
||||
break-enabled-key
|
||||
engine-block)
|
||||
|
||||
(bounce #%engine
|
||||
make-engine
|
||||
engine-block
|
||||
engine-timeout
|
||||
engine-return
|
||||
current-engine-state
|
||||
|
|
|
@ -26,7 +26,10 @@
|
|||
[pending-break #:mutable] ; #f, 'break, 'hangup, or 'terminate
|
||||
done-waiting ; hash table of places to ping when this one ends
|
||||
[wakeup-handle #:mutable]
|
||||
[dequeue-semas #:mutable]) ; semaphores reflecting place-channel waits to recheck
|
||||
[dequeue-semas #:mutable] ; semaphores reflecting place-channel waits to recheck
|
||||
[recent-process-milliseconds #:mutable] ; used by scheduler
|
||||
[skipped-time-accums #:mutable] ; used by scheduler
|
||||
[thread-swap-count #:mutable]) ; number of thread swaps
|
||||
#:property prop:evt (struct-field-index pch)
|
||||
#:property prop:place-message (lambda (self) (lambda () (lambda () (place-pch self)))))
|
||||
|
||||
|
@ -49,7 +52,10 @@
|
|||
#f ; pending-break
|
||||
(make-hasheq) ; done-waiting
|
||||
#f ; wakeup-handle
|
||||
'())) ; dequeue-semas
|
||||
'() ; dequeue-semas
|
||||
0 ; recent-process-milliseconds
|
||||
0 ; skipped-time-accums
|
||||
0)) ; thread-swap-count
|
||||
|
||||
(define initial-place (make-place (host:make-mutex)
|
||||
root-custodian))
|
||||
|
|
|
@ -23,12 +23,11 @@
|
|||
(provide call-in-main-thread
|
||||
call-in-another-main-thread
|
||||
set-atomic-timeout-callback!
|
||||
set-check-place-activity!)
|
||||
set-check-place-activity!
|
||||
thread-swap-count)
|
||||
|
||||
(define TICKS 100000)
|
||||
|
||||
(define-place-local process-milliseconds 0)
|
||||
|
||||
;; Initializes the thread system:
|
||||
(define (call-in-main-thread thunk)
|
||||
(make-initial-thread (lambda ()
|
||||
|
@ -75,7 +74,9 @@
|
|||
(set-thread-engine! t 'running)
|
||||
(set-thread-sched-info! t #f)
|
||||
(current-thread t)
|
||||
(set-place-current-thread! current-place t)
|
||||
(let ([pl current-place])
|
||||
(set-place-current-thread! pl t)
|
||||
(set-place-thread-swap-count! pl (add1 (place-thread-swap-count pl))))
|
||||
(run-callbacks-in-engine
|
||||
e callbacks
|
||||
(lambda (e)
|
||||
|
@ -90,7 +91,7 @@
|
|||
(atomic-timeout-callback #f))))
|
||||
(lambda args
|
||||
(start-implicit-atomic-mode)
|
||||
(accum-cpu-time! t)
|
||||
(accum-cpu-time! t #t)
|
||||
(current-thread #f)
|
||||
(set-place-current-thread! current-place #f)
|
||||
(unless (zero? (current-atomic))
|
||||
|
@ -100,11 +101,11 @@
|
|||
(force-exit 0))
|
||||
(thread-did-work!)
|
||||
(select-thread!))
|
||||
(lambda (e)
|
||||
(lambda (e timeout?)
|
||||
(start-implicit-atomic-mode)
|
||||
(cond
|
||||
[(zero? (current-atomic))
|
||||
(accum-cpu-time! t)
|
||||
(accum-cpu-time! t timeout?)
|
||||
(current-thread #f)
|
||||
(set-place-current-thread! current-place #f)
|
||||
(unless (eq? (thread-engine t) 'done)
|
||||
|
@ -225,11 +226,31 @@
|
|||
|
||||
;; ----------------------------------------
|
||||
|
||||
(define (accum-cpu-time! t)
|
||||
(define start process-milliseconds)
|
||||
(set! process-milliseconds (current-process-milliseconds))
|
||||
(set-thread-cpu-time! t (+ (thread-cpu-time t)
|
||||
(- process-milliseconds start))))
|
||||
;; Getting CPU time is expensive relative to a thread
|
||||
;; switch, so limit precision in the case that the thread
|
||||
;; did not use up its quantum. This loss of precision
|
||||
;; should be ok, since `(current-process-milliseconds <thread>)`
|
||||
;; is used rarely, and it makes the most sense for threads
|
||||
;; that don't keep swapping themselves out.
|
||||
|
||||
(define (accum-cpu-time! t timeout?)
|
||||
(define pl current-place)
|
||||
(cond
|
||||
[(not timeout?)
|
||||
(define n (place-skipped-time-accums pl))
|
||||
(set-place-skipped-time-accums! pl (add1 n))
|
||||
(when (= n 100)
|
||||
(accum-cpu-time! t #t))]
|
||||
[else
|
||||
(define start (place-recent-process-milliseconds pl))
|
||||
(define now (current-process-milliseconds))
|
||||
(set-place-recent-process-milliseconds! pl now)
|
||||
(set-place-skipped-time-accums! pl 0)
|
||||
(set-thread-cpu-time! t (+ (thread-cpu-time t)
|
||||
(- now start)))]))
|
||||
|
||||
(define (thread-swap-count)
|
||||
(place-thread-swap-count current-place))
|
||||
|
||||
;; ----------------------------------------
|
||||
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
#lang racket/base
|
||||
(require "check.rkt"
|
||||
"thread.rkt"
|
||||
"time.rkt")
|
||||
"time.rkt"
|
||||
"schedule.rkt")
|
||||
|
||||
(provide vector-set-performance-stats!)
|
||||
|
||||
|
@ -19,7 +20,7 @@
|
|||
(maybe-set! 1 (current-milliseconds))
|
||||
(maybe-set! 2 (current-gc-milliseconds))
|
||||
(maybe-set! 3 0) ; # of GCs
|
||||
(maybe-set! 4 0) ; # of thread switches
|
||||
(maybe-set! 4 (thread-swap-count)) ; # of thread switches
|
||||
(maybe-set! 5 0) ; # of stack overflows
|
||||
(maybe-set! 6 0) ; # of threads scheduled for running
|
||||
(maybe-set! 7 0) ; # of syntax objects read
|
||||
|
|
|
@ -681,16 +681,21 @@
|
|||
(lambda (c) (and (real? c) (c . >= . 0)))
|
||||
#:contract "(>=/c 0)"
|
||||
secs)
|
||||
(define until-msecs (+ (* secs 1000.0)
|
||||
(current-inexact-milliseconds)))
|
||||
(let loop ()
|
||||
((thread-deschedule! (current-thread)
|
||||
until-msecs
|
||||
void
|
||||
(lambda ()
|
||||
;; Woke up due to an ignored break?
|
||||
;; Try again:
|
||||
(loop))))))
|
||||
(cond
|
||||
[(and (zero? secs)
|
||||
(zero? (current-atomic)))
|
||||
(thread-yield #f)]
|
||||
[else
|
||||
(define until-msecs (+ (* secs 1000.0)
|
||||
(current-inexact-milliseconds)))
|
||||
(let loop ()
|
||||
((thread-deschedule! (current-thread)
|
||||
until-msecs
|
||||
void
|
||||
(lambda ()
|
||||
;; Woke up due to an ignored break?
|
||||
;; Try again:
|
||||
(loop)))))]))
|
||||
|
||||
;; ----------------------------------------
|
||||
;; Tracking thread progress
|
||||
|
|
Loading…
Reference in New Issue
Block a user