reduce overhead and precision of per-thread cpu times

Getting the current CPU time is relatively expensive, so get it only
on thread swaps where a thread used its full quantum or 1/100 swaps
otherwise. This approximation should work because thread-specific CPU
times are rarely requested, and they make the most sense for threads
that don't constantly swap out due to synchronization.
This commit is contained in:
Matthew Flatt 2019-04-25 08:06:35 -06:00
parent 84deff1d02
commit 1624193210
15 changed files with 117 additions and 66 deletions

View File

@ -113,7 +113,9 @@ both user and system time.
@item{If @racket[scope] is a thread, the result is specific to the
time while the thread ran, but it may include time for other
@tech{places}.}
@tech{places}. The more a thread synchronizes with other
threads, the less precisely per-thread processor time is
recorded.}
@item{If @racket[scope] is @racket['subprocesses], the result is the
sum of process times for known-completed subprocesses (see

View File

@ -457,7 +457,7 @@
(e 100
(lambda () (set! started (add1 started)))
(lambda (remain a b c) (list a b c n started))
(lambda (e)
(lambda (e timeout?)
(loop e (add1 n))))))
'(1 2 3 10 11))
@ -481,7 +481,7 @@
(e 200
void
(lambda (remain a b c pre t-post) (list a b c pre t-post post n))
(lambda (e)
(lambda (e timeout?)
(loop e (add1 n)))))
'(1 2 3 10 0 10 10))))
@ -504,12 +504,12 @@
100
void
(lambda (remain l) l)
(lambda (e) (error 'engine "oops"))))
(lambda (e timeout?) (error 'engine "oops"))))
(define l2 ((list-ref l1 2)
100
void
(lambda (remain l) l)
(lambda (e) (error 'engine "oops"))))
(lambda (e timeout?) (error 'engine "oops"))))
(check (list-ref l1 0) 1)
(check (list-ref l1 1) 100)
(check (list-ref l1 3) 2)
@ -528,7 +528,7 @@
(extend-parameterization (continuation-mark-set-first #f parameterization-key) my-param 'set)
(make-engine (lambda () (|#%app| my-param)) engine-tag #f #f))])
(check (|#%app| my-param) 'init)
(check (e 1000 void (lambda (remain v) v) (lambda (e) (error 'engine "oops"))) 'set))
(check (e 1000 void (lambda (remain v) v) (lambda (e timeout?) (error 'engine "oops"))) 'set))
(let ([also-my-param (make-derived-parameter my-param
(lambda (v) (list v))
@ -633,7 +633,7 @@
(= prefixes (add1 i))
(- (car v) i)
(- (cadr v) i)))
(lambda (e) (loop e (add1 i))))))
(lambda (e timeout?) (loop e (add1 i))))))
'(#t #t 1 0))
;; ----------------------------------------

View File

@ -34,5 +34,6 @@
[start-place (known-procedure 32)]
[make-pthread-parameter (known-procedure 2)]
[break-enabled-key (known-constant)]
[engine-block (known-procedure 1)]
[force-unfasl (known-procedure 2)])

View File

@ -76,39 +76,46 @@
(define (engine-block-via-timer)
(cond
[(current-in-uninterrupted)
(pending-interrupt-callback engine-block)]
(pending-interrupt-callback engine-block/timeout)]
[else
(engine-block)]))
(engine-block/timeout)]))
(define (engine-block)
(assert-not-in-uninterrupted)
(timer-interrupt-handler void)
(let ([es (current-engine-state)])
(unless es
(error 'engine-block "not currently running an engine"))
(reset-handler (engine-state-reset-handler es))
(start-implicit-uninterrupted 'block)
;; Extra pair of parens around swap is to apply a prefix
;; function on swapping back in:
((swap-metacontinuation
(engine-state-mc es)
(lambda (saves)
(end-implicit-uninterrupted 'block)
(current-engine-state #f)
(lambda () ; returned to the `swap-continuation` in `create-engine`
((engine-state-expire es)
(create-engine
saves
(lambda (prefix) prefix) ; returns `prefix` to the above "(("
(engine-state-thread-cell-values es)
(engine-state-init-break-enabled-cell es)))))))))
(define engine-block
(case-lambda
[(timeout?)
(assert-not-in-uninterrupted)
(timer-interrupt-handler void)
(let ([es (current-engine-state)])
(unless es
(error 'engine-block "not currently running an engine"))
(reset-handler (engine-state-reset-handler es))
(start-implicit-uninterrupted 'block)
;; Extra pair of parens around swap is to apply a prefix
;; function on swapping back in:
((swap-metacontinuation
(engine-state-mc es)
(lambda (saves)
(end-implicit-uninterrupted 'block)
(current-engine-state #f)
(lambda () ; returned to the `swap-continuation` in `create-engine`
((engine-state-expire es)
(create-engine
saves
(lambda (prefix) prefix) ; returns `prefix` to the above "(("
(engine-state-thread-cell-values es)
(engine-state-init-break-enabled-cell es))
timeout?))))))]
[() (engine-block #f)]))
(define (engine-block/timeout)
(engine-block #t))
(define (engine-timeout)
(let ([can-block? (fx= 1 (disable-interrupts))])
(enable-interrupts)
(cond
[can-block?
(engine-block)]
(engine-block/timeout)]
[else
;; Cause the timer to fire as soon as possible (i.e., as soon
;; as interrupts are enabled)

View File

@ -286,6 +286,8 @@
#%procedure?]
[(eq? 'ephemeron (car args))
ephemeron-pair?]
[(eq? 'metacontinuation-frame (car args))
metacontinuation-frame?]
[(symbol? (car args))
#f
;; This is disaterously slow, so don't try it:

View File

@ -10,7 +10,6 @@
[unsafe-place-local-set! rumble:unsafe-place-local-set!]
;; These are extracted via `#%linklet`:
[make-engine rumble:make-engine]
[engine-block rumble:engine-block]
[engine-timeout rumble:engine-timeout]
[engine-return rumble:engine-return]
[current-engine-state rumble:current-engine-state]
@ -105,6 +104,7 @@
'make-pthread-parameter make-pthread-parameter
'unsafe-root-continuation-prompt-tag unsafe-root-continuation-prompt-tag
'break-enabled-key break-enabled-key
'engine-block engine-block
;; These are actually redirected by "place-register.ss", but
;; we list them here for compatibility with the bootstrapping
;; variant of `#%pthread`
@ -114,7 +114,6 @@
[(|#%engine|)
(hasheq
'make-engine rumble:make-engine
'engine-block rumble:engine-block
'engine-timeout rumble:engine-timeout
'engine-return rumble:engine-return
'current-engine-state (lambda (v) (rumble:current-engine-state v))

View File

@ -64,4 +64,5 @@
#%call-with-values
make-pthread-parameter
break-enabled-key
engine-block
fasl->s-exp/intern))))

View File

@ -237,6 +237,7 @@ typedef struct Thread_Local_Variables {
struct Scheme_Thread_Set *scheme_thread_set_top_;
struct Scheme_Current_LWC *scheme_current_lwc_;
intptr_t process_time_at_swap_;
intptr_t process_time_skips_;
int num_running_threads_;
int swap_no_setjmp_;
int thread_swap_count_;
@ -622,6 +623,7 @@ XFORM_GC_VARIABLE_STACK_THROUGH_THREAD_LOCAL;
#define swap_no_setjmp XOA (scheme_get_thread_local_variables()->swap_no_setjmp_)
#define thread_swap_count XOA (scheme_get_thread_local_variables()->thread_swap_count_)
#define process_time_at_swap XOA (scheme_get_thread_local_variables()->process_time_at_swap_)
#define process_time_skips XOA (scheme_get_thread_local_variables()->process_time_skips_)
#define scheme_did_gc_count XOA (scheme_get_thread_local_variables()->scheme_did_gc_count_)
#define scheme_future_state XOA (scheme_get_thread_local_variables()->scheme_future_state_)
#define scheme_future_thread_state XOA (scheme_get_thread_local_variables()->scheme_future_thread_state_)

View File

@ -103,6 +103,7 @@ THREAD_LOCAL_DECL(static int swap_no_setjmp = 0);
THREAD_LOCAL_DECL(static int thread_swap_count);
THREAD_LOCAL_DECL(int scheme_did_gc_count);
THREAD_LOCAL_DECL(static intptr_t process_time_at_swap);
THREAD_LOCAL_DECL(static intptr_t process_time_skips);
THREAD_LOCAL_DECL(static intptr_t max_gc_pre_used_bytes);
#ifdef MZ_PRECISE_GC
@ -2985,11 +2986,12 @@ static void do_swap_thread()
} else {
Scheme_Thread *new_thread = swap_target;
{
if ((!scheme_fuel_counter) || (++process_time_skips >= 100)) {
intptr_t cpm;
cpm = scheme_get_process_milliseconds();
scheme_current_thread->accum_process_msec += (cpm - scheme_current_thread->current_start_process_msec);
process_time_at_swap = cpm;
process_time_skips = 0;
}
swap_target = NULL;

View File

@ -66,9 +66,11 @@
(set! prefix next-prefix)
(break-thread t)
(thread-resume t)
(define timeout? #f)
(define t2
(thread (lambda ()
(sleep (/ ticks 1000000.0))
(set! timeout? #t)
(thread-suspend t))))
;; Limited break propagation while syncing:
(call-with-exception-handler
@ -85,7 +87,7 @@
[(thread-dead? t)
(apply complete 0 results)]
[else
(expire go)]))
(expire go timeout?)]))
go)
(define (engine-block)
@ -223,11 +225,11 @@
'unsafe-place-local-set! unsafe-place-local-set!
'unsafe-add-global-finalizer (lambda (v proc) (void))
'unsafe-root-continuation-prompt-tag unsafe-root-continuation-prompt-tag
'break-enabled-key break-enabled-key))
'break-enabled-key break-enabled-key
'engine-block engine-block))
(primitive-table '#%engine
(hash
'make-engine make-engine
'engine-block engine-block
'engine-timeout engine-timeout
'engine-return (lambda args
(error "engine-return: not ready"))

View File

@ -29,11 +29,11 @@
unsafe-place-local-ref
unsafe-place-local-set!
unsafe-root-continuation-prompt-tag
break-enabled-key)
break-enabled-key
engine-block)
(bounce #%engine
make-engine
engine-block
engine-timeout
engine-return
current-engine-state

View File

@ -26,7 +26,10 @@
[pending-break #:mutable] ; #f, 'break, 'hangup, or 'terminate
done-waiting ; hash table of places to ping when this one ends
[wakeup-handle #:mutable]
[dequeue-semas #:mutable]) ; semaphores reflecting place-channel waits to recheck
[dequeue-semas #:mutable] ; semaphores reflecting place-channel waits to recheck
[recent-process-milliseconds #:mutable] ; used by scheduler
[skipped-time-accums #:mutable] ; used by scheduler
[thread-swap-count #:mutable]) ; number of thread swaps
#:property prop:evt (struct-field-index pch)
#:property prop:place-message (lambda (self) (lambda () (lambda () (place-pch self)))))
@ -49,7 +52,10 @@
#f ; pending-break
(make-hasheq) ; done-waiting
#f ; wakeup-handle
'())) ; dequeue-semas
'() ; dequeue-semas
0 ; recent-process-milliseconds
0 ; skipped-time-accums
0)) ; thread-swap-count
(define initial-place (make-place (host:make-mutex)
root-custodian))

View File

@ -23,12 +23,11 @@
(provide call-in-main-thread
call-in-another-main-thread
set-atomic-timeout-callback!
set-check-place-activity!)
set-check-place-activity!
thread-swap-count)
(define TICKS 100000)
(define-place-local process-milliseconds 0)
;; Initializes the thread system:
(define (call-in-main-thread thunk)
(make-initial-thread (lambda ()
@ -75,7 +74,9 @@
(set-thread-engine! t 'running)
(set-thread-sched-info! t #f)
(current-thread t)
(set-place-current-thread! current-place t)
(let ([pl current-place])
(set-place-current-thread! pl t)
(set-place-thread-swap-count! pl (add1 (place-thread-swap-count pl))))
(run-callbacks-in-engine
e callbacks
(lambda (e)
@ -90,7 +91,7 @@
(atomic-timeout-callback #f))))
(lambda args
(start-implicit-atomic-mode)
(accum-cpu-time! t)
(accum-cpu-time! t #t)
(current-thread #f)
(set-place-current-thread! current-place #f)
(unless (zero? (current-atomic))
@ -100,11 +101,11 @@
(force-exit 0))
(thread-did-work!)
(select-thread!))
(lambda (e)
(lambda (e timeout?)
(start-implicit-atomic-mode)
(cond
[(zero? (current-atomic))
(accum-cpu-time! t)
(accum-cpu-time! t timeout?)
(current-thread #f)
(set-place-current-thread! current-place #f)
(unless (eq? (thread-engine t) 'done)
@ -225,11 +226,31 @@
;; ----------------------------------------
(define (accum-cpu-time! t)
(define start process-milliseconds)
(set! process-milliseconds (current-process-milliseconds))
(set-thread-cpu-time! t (+ (thread-cpu-time t)
(- process-milliseconds start))))
;; Getting CPU time is expensive relative to a thread
;; switch, so limit precision in the case that the thread
;; did not use up its quantum. This loss of precision
;; should be ok, since `(current-process-milliseconds <thread>)`
;; is used rarely, and it makes the most sense for threads
;; that don't keep swapping themselves out.
(define (accum-cpu-time! t timeout?)
(define pl current-place)
(cond
[(not timeout?)
(define n (place-skipped-time-accums pl))
(set-place-skipped-time-accums! pl (add1 n))
(when (= n 100)
(accum-cpu-time! t #t))]
[else
(define start (place-recent-process-milliseconds pl))
(define now (current-process-milliseconds))
(set-place-recent-process-milliseconds! pl now)
(set-place-skipped-time-accums! pl 0)
(set-thread-cpu-time! t (+ (thread-cpu-time t)
(- now start)))]))
(define (thread-swap-count)
(place-thread-swap-count current-place))
;; ----------------------------------------

View File

@ -1,7 +1,8 @@
#lang racket/base
(require "check.rkt"
"thread.rkt"
"time.rkt")
"time.rkt"
"schedule.rkt")
(provide vector-set-performance-stats!)
@ -19,7 +20,7 @@
(maybe-set! 1 (current-milliseconds))
(maybe-set! 2 (current-gc-milliseconds))
(maybe-set! 3 0) ; # of GCs
(maybe-set! 4 0) ; # of thread switches
(maybe-set! 4 (thread-swap-count)) ; # of thread switches
(maybe-set! 5 0) ; # of stack overflows
(maybe-set! 6 0) ; # of threads scheduled for running
(maybe-set! 7 0) ; # of syntax objects read

View File

@ -681,16 +681,21 @@
(lambda (c) (and (real? c) (c . >= . 0)))
#:contract "(>=/c 0)"
secs)
(define until-msecs (+ (* secs 1000.0)
(current-inexact-milliseconds)))
(let loop ()
((thread-deschedule! (current-thread)
until-msecs
void
(lambda ()
;; Woke up due to an ignored break?
;; Try again:
(loop))))))
(cond
[(and (zero? secs)
(zero? (current-atomic)))
(thread-yield #f)]
[else
(define until-msecs (+ (* secs 1000.0)
(current-inexact-milliseconds)))
(let loop ()
((thread-deschedule! (current-thread)
until-msecs
void
(lambda ()
;; Woke up due to an ignored break?
;; Try again:
(loop)))))]))
;; ----------------------------------------
;; Tracking thread progress