added optional timeout to condition-wait

original commit: 26d4dfc437fde2ba85d2d1a9ccadc2fa36979da0
This commit is contained in:
Bob Burger 2017-03-14 12:53:10 -04:00
parent 03072287e9
commit 288243924f
12 changed files with 170 additions and 33 deletions

4
LOG
View File

@ -375,3 +375,7 @@
registers as required for varargs functions. Windows does not
support single-precision floating-point arguments as varargs.
foreign.ms, np-languages.ss, x86_64.ss
- added an optional timeout argument to condition-wait
externs.h, stats.c, thread.c, thread.h, csug/threads.stex,
primvars.ms, thread.ms, release_notes.stex,
date.ss, primdata.ss, prims.ss

View File

@ -203,7 +203,7 @@ extern void S_mutex_acquire PROTO((scheme_mutex_t *m));
extern INT S_mutex_tryacquire PROTO((scheme_mutex_t *m));
extern void S_mutex_release PROTO((scheme_mutex_t *m));
extern s_thread_cond_t *S_make_condition PROTO((void));
extern void S_condition_wait PROTO((s_thread_cond_t *c, scheme_mutex_t *m));
extern IBOOL S_condition_wait PROTO((s_thread_cond_t *c, scheme_mutex_t *m, ptr t));
#endif
/* scheme.c */
@ -309,6 +309,7 @@ extern ptr S_gmtime PROTO((ptr tzoff, ptr tspair));
extern ptr S_asctime PROTO((ptr dtvec));
extern ptr S_mktime PROTO((ptr dtvec));
extern ptr S_unique_id PROTO((void));
extern void s_gettime PROTO((INT typeno, struct timespec *tp));
/* symbol.c */
extern ptr S_symbol_value PROTO((ptr sym));

View File

@ -38,8 +38,6 @@
static struct timespec starting_mono_tp;
static void s_gettime(INT typeno, struct timespec *tp);
/******** unique-id ********/
#if (time_t_bits == 32)
@ -119,7 +117,7 @@ typedef void (WINAPI *GetSystemTimeAsFileTime_t)(LPFILETIME lpSystemTimeAsFileTi
static GetSystemTimeAsFileTime_t s_GetSystemTimeAsFileTime = GetSystemTimeAsFileTime;
static void s_gettime(INT typeno, struct timespec *tp) {
void s_gettime(INT typeno, struct timespec *tp) {
switch (typeno) {
case time_process: {
FILETIME ftKernel, ftUser, ftDummy;
@ -228,7 +226,7 @@ static char *asctime_r(const struct tm *tm, char *buf) {
#else /* WIN32 */
static void s_gettime(INT typeno, struct timespec *tp) {
void s_gettime(INT typeno, struct timespec *tp) {
switch (typeno) {
case time_thread:
#ifdef CLOCK_THREAD_CPUTIME_ID

View File

@ -306,10 +306,63 @@ s_thread_cond_t *S_make_condition() {
return c;
}
void S_condition_wait(c, m) s_thread_cond_t *c; scheme_mutex_t *m; {
#ifdef FEATURE_WINDOWS
static inline int s_thread_cond_timedwait(s_thread_cond_t *cond, s_thread_mutex_t *mutex, int typeno, long sec, long nsec) {
if (typeno == time_utc) {
struct timespec now;
s_gettime(time_utc, &now);
sec -= (long)now.tv_sec;
nsec -= now.tv_nsec;
if (nsec < 0) {
sec -= 1;
nsec += 1000000000;
}
}
if (sec < 0) {
sec = 0;
nsec = 0;
}
if (SleepConditionVariableCS(cond, mutex, sec*1000 + nsec/1000000)) {
return 0;
} else if (GetLastError() == ERROR_TIMEOUT) {
return ETIMEDOUT;
} else {
return EINVAL;
}
}
#else /* FEATURE_WINDOWS */
static inline int s_thread_cond_timedwait(s_thread_cond_t *cond, s_thread_mutex_t *mutex, int typeno, long sec, long nsec) {
struct timespec t;
if (typeno == time_duration) {
struct timespec now;
s_gettime(time_utc, &now);
t.tv_sec = now.tv_sec + sec;
t.tv_nsec = now.tv_nsec + nsec;
if (t.tv_nsec >= 1000000000) {
t.tv_sec += 1;
t.tv_nsec -= 1000000000;
}
} else {
t.tv_sec = sec;
t.tv_nsec = nsec;
}
return pthread_cond_timedwait(cond, mutex, &t);
}
#endif /* FEATURE_WINDOWS */
#define Srecord_ref(x,i) (((ptr *)((uptr)(x)+record_data_disp))[i])
IBOOL S_condition_wait(c, m, t) s_thread_cond_t *c; scheme_mutex_t *m; ptr t; {
ptr tc = get_thread_context();
s_thread_t self = s_thread_self();
iptr count;
INT typeno;
long sec;
long nsec;
INT status;
if ((count = m->count) == 0 || !s_thread_equal(m->owner, self))
@ -318,12 +371,20 @@ void S_condition_wait(c, m) s_thread_cond_t *c; scheme_mutex_t *m; {
if (count != 1)
S_error1("condition-wait", "mutex ~s is recursively locked", m);
if (t != Sfalse) {
/* Keep in sync with ts record in s/date.ss */
typeno = Sinteger32_value(Srecord_ref(t,0));
sec = Sinteger32_value(Scar(Srecord_ref(t,1)));
nsec = Sinteger32_value(Scdr(Srecord_ref(t,1)));
}
if (c == &S_collect_cond || DISABLECOUNT(tc) == 0) {
deactivate_thread(tc)
}
m->count = 0;
status = s_thread_cond_wait(c, &m->pmutex);
status = (t == Sfalse) ? s_thread_cond_wait(c, &m->pmutex) :
s_thread_cond_timedwait(c, &m->pmutex, typeno, sec, nsec);
m->owner = self;
m->count = 1;
@ -331,8 +392,13 @@ void S_condition_wait(c, m) s_thread_cond_t *c; scheme_mutex_t *m; {
reactivate_thread(tc)
}
if (status != 0) {
if (status == 0) {
return 1;
} else if (status == ETIMEDOUT) {
return 0;
} else {
S_error1("condition-wait", "failed: ~a", S_strerror(status));
return 0;
}
}
#endif /* PTHREADS */

View File

@ -18,6 +18,7 @@
#ifdef FEATURE_WINDOWS
#include <process.h>
#include <time.h>
/* learned from http://locklessinc.com/articles/pthreads_on_windows/ which
* Windows API types and functions to use to support mutexes and condition

View File

@ -221,24 +221,29 @@ Using \scheme{with-mutex} is generally more convenient and safer than using
%----------------------------------------------------------------------------
\entryheader
\formdef{condition-wait}{\categoryprocedure}{(condition-wait \var{cond} \var{mutex})}
\returns unspecified
\formdef{condition-wait}{\categoryprocedure}{(condition-wait \var{cond} \var{mutex} \var{timeout})}
\returns \scheme{#t} if the calling thread was awakened by the condition, \scheme{#f} if the calling thread timed out waiting
\listlibraries
\endentryheader
\noindent
\var{cond} must be a condition object, and
\var{mutex} must be a mutex.
The optional argument \var{timeout} is a time record of type
\scheme{time-duration} or \scheme{time-utc}, or \scheme{#f} for no
timeout. It defaults to \scheme{#f}.
\scheme{condition-wait} waits for the condition
identified by the condition object \var{cond}.
\scheme{condition-wait} waits up to the specified \var{timeout} for
the condition identified by the condition object \var{cond}.
The calling thread must have acquired the mutex identified by the mutex
\var{mutex} at the time \scheme{condition-wait} is
called.
\var{mutex} is released as a side effect of the call to
\scheme{condition-wait}.
When a thread is later released from the condition variable by one
of the procedures described below, \var{mutex} is reacquired
and \scheme{condition-wait} returns.
When a thread is later released from the condition variable by one of
the procedures described below or the timeout expires, \var{mutex} is
reacquired and \scheme{condition-wait} returns.
%----------------------------------------------------------------------------

View File

@ -242,7 +242,7 @@
; the test has several deficiencies:
; - for arguments labeled sub-<type>, it cannot determine a 'good' value. this can be
; addressed only by refining the types given in primdata.ss, including adding
; dependent times for things like list-ref, the range of whose second argument
; dependent types for things like list-ref, the range of whose second argument
; depends on its first.
; - it doesn't verify that the raised condition is appropriate, other than ruling out
; warning conditions, non-violation conditions, and invalid memory references.
@ -448,6 +448,7 @@
[(textual-input-port) (current-input-port) 0 *binary-input-port (transcoded-port *binary-output-port (native-transcoder)) #f]
[(textual-output-port) (current-output-port) 0 *binary-output-port (transcoded-port *binary-input-port (native-transcoder)) #f]
[(time) *time "no-time" #f]
[(timeout) *time "no-time"]
[(transcoder) (native-transcoder) 0 #f]
[(u16) 0 -1 (expt 2 16) "a" #f]
[(u24) 0 -1 (expt 2 24) "a" #f]

View File

@ -113,7 +113,12 @@
(errorf #f "extra threads running ~s" ls)]
[else (loop (- n 1) nnt)]))]))
#t))
(andmap procedure? (list $threads $fib $thread-check)))
(define $time-in-range?
(lambda (start stop target)
(let ([t (time-difference stop start)])
(<= (abs (- (+ (time-second t) (* (time-nanosecond t) 1e-9)) target))
0.1))))
(andmap procedure? (list $threads $fib $thread-check $time-in-range?)))
($thread-check)
(not (= (let ([n #f])
(fork-thread (lambda () (set! n (get-thread-id))))
@ -144,29 +149,71 @@
(if (= n 0)
'()
(cons i (iota (+ i 1) (- n 1))))]))
; w/o the occasional pretty-print, following mat hangs under Windows
; theory is that looping main thread shuts out one of the child
; threads.
(define-syntax parallel-list
(syntax-rules ()
[(_ x ...)
(let ([v (make-vector (length '(x ...)) #f)])
(let ([v (make-vector (length '(x ...)) #f)]
[m (make-mutex)]
[c (make-condition)]
[n (length '(x ...))])
(map (lambda (p i) (p i))
(list (lambda (i)
(fork-thread (lambda () (vector-set! v i x))))
(fork-thread
(lambda ()
(vector-set! v i x)
(with-mutex m
(set! n (- n 1))
(when (= n 0)
(condition-signal c))))))
...)
(iota (length '(x ...))))
(let f ([n 0])
(let ([ls (vector->list v)])
(if (andmap values ls)
ls
(if (= n 1000000)
(begin (pretty-print ls) (f 0))
(f (+ n 1)))))))]))
(and (with-mutex m
(condition-wait c m (make-time 'time-duration 0 60)))
(vector->list v)))]))
(parallel-list (fatfib 26) (fatfib 27) (fatfib 28)
(fatfib 29) (fatfib 30) (fatfib 31)))
'(196418 317811 514229 832040 1346269 2178309))
($thread-check)
(let ([m (make-mutex)] [c (make-condition)])
(with-mutex m
(let* ([start (current-time)]
[r (condition-wait c m (make-time 'time-duration 250000000 1))]
[stop (current-time)])
(and (not r)
($time-in-range? start stop 1.25)))))
(let ([m (make-mutex)] [c (make-condition)])
(with-mutex m
(let* ([start (current-time)]
[r (condition-wait c m
(add-duration start (make-time 'time-duration 250000000 1)))]
[stop (current-time)])
(and (not r)
($time-in-range? start stop 1.25)))))
(let ([m (make-mutex)] [c (make-condition)])
(with-mutex m
(let* ([start (current-time)]
[r (condition-wait c m (make-time 'time-duration 0 -1))]
[stop (current-time)])
(and (not r)
($time-in-range? start stop 0.0)))))
(let ([m (make-mutex)] [c (make-condition)])
(with-mutex m
(let* ([start (current-time)]
[r (condition-wait c m
(add-duration start (make-time 'time-duration 0 -1)))]
[stop (current-time)])
(and (not r)
($time-in-range? start stop 0.0)))))
(let ([m (make-mutex)] [c (make-condition)])
(with-mutex m
(fork-thread
(lambda ()
(with-mutex m (sleep (make-time 'time-duration 250000000 0)))))
(let* ([start (current-time)]
[r (condition-wait c m (make-time 'time-duration 100000000 0))]
[stop (current-time)])
(and (not r)
($time-in-range? start stop 0.25)))))
(let ([count 300] [live 0] [live-m (make-mutex)])
(parameterize ([collect-request-handler
(lambda ()

View File

@ -58,6 +58,15 @@ Online versions of both books can be found at
%-----------------------------------------------------------------------------
\section{Functionality Changes}\label{section:functionality}
\subsection{Optional timeout for \protect\scheme{condition-wait} (9.4.1)}
The \scheme{condition-wait} procedure now takes an optional
\var{timeout} argument and returns a boolean indicating whether the
thread was awakened by the condition before the timeout. The
\var{timeout} can be a time record of type \scheme{time-duration} or
\scheme{time-utc}, or it can be \scheme{#f} for no timeout (the
default).
\subsection{\protect\scheme{procedure-arity-mask} (9.4.1)}
The new primitive procedure \scheme{procedure-arity-mask} takes a

View File

@ -101,7 +101,7 @@
(scheme-object)
scheme-object))
(define-record-type ts
(define-record-type ts ; keep in sync with S_condition_wait in c/thread.c
(fields (mutable typeno) (immutable pair))
(nongenerative #{ts a5dq4nztnmq6xlgp-a})
(sealed #t))

View File

@ -1200,7 +1200,7 @@
(condition-broadcast [feature pthreads] [sig [(condition-object) -> (void)]] [flags true])
(condition-continuation [sig [(continuation-condition) -> (ptr)]] [flags pure mifoldable discard])
(condition-signal [feature pthreads] [sig [(condition-object) -> (void)]] [flags true])
(condition-wait [feature pthreads] [sig [(condition-object mutex) -> (void)]] [flags true])
(condition-wait [feature pthreads] [sig [(condition-object mutex) (condition-object mutex timeout) -> (boolean)]] [flags])
(conjugate [sig [(number) -> (number)]] [flags arith-op mifoldable discard])
(continuation-condition? [sig [(ptr) -> (boolean)]] [flags pure unrestricted mifoldable discard])
(copy-environment [sig [(environment) (environment ptr) (environment ptr sub-list) -> (environment)]] [flags alloc])

View File

@ -1430,7 +1430,7 @@
scheme-object))
(define mr (foreign-procedure "(cs)mutex_release" (scheme-object) void))
(define mc (foreign-procedure "(cs)make_condition" () scheme-object))
(define cw (foreign-procedure "(cs)condition_wait" (scheme-object scheme-object) void))
(define cw (foreign-procedure "(cs)condition_wait" (scheme-object scheme-object scheme-object) boolean))
(define cb (foreign-procedure "(cs)condition_broadcast" (scheme-object) void))
(define cs (foreign-procedure "(cs)condition_signal" (scheme-object) void))
@ -1490,12 +1490,17 @@
($condition? x)))
(set! condition-wait
(lambda (c m)
(case-lambda
[(c m) (condition-wait c m #f)]
[(c m t)
(unless (thread-condition? c)
($oops 'condition-wait "~s is not a condition" c))
(unless (mutex? m)
($oops 'condition-wait "~s is not a mutex" m))
(cw ($condition-addr c) ($mutex-addr m))))
(unless (or (not t)
(and (time? t) (memq (time-type t) '(time-duration time-utc))))
($oops 'condition-wait "~s is not a time record of type time-duration or time-utc" t))
(cw ($condition-addr c) ($mutex-addr m) t)]))
(set! condition-broadcast
(lambda (c)