ffi/unsafe/schedule: expose some scheduler internals

These internals are needed for `racket/gui`, which currently
accesses them via C functions.
This commit is contained in:
Matthew Flatt 2017-10-03 20:12:21 -06:00
parent fadf0d4a7d
commit afc5f919f1
20 changed files with 495 additions and 97 deletions

View File

@ -14,6 +14,7 @@
@include-section["custodian.scrbl"]
@include-section["atomic.scrbl"]
@include-section["try-atomic.scrbl"]
@include-section["schedule.scrbl"]
@include-section["global.scrbl"]
@include-section["objc.scrbl"]
@include-section["ns.scrbl"]

View File

@ -0,0 +1,105 @@
#lang scribble/doc
@(require "utils.rkt"
(for-label ffi/unsafe/schedule))
@title{Thread Scheduling}
@defmodule[ffi/unsafe/schedule]{The
@racketmodname[ffi/unsafe/schedule] library provides functions for
cooperating with the thread scheduler and manipulating it.}
@history[#:added "6.11.0.1"]
@defproc[(unsafe-poller [poll (evt? (or/c #f any/c) . -> . (values (or/c #f list?) evt?))])
any/c]{
Produces a @deftech{poller} value that is allowed as a
@racket[prop:evt] value, even though it is not a procedure or itself
an @racket[evt?]. The @racket[poll] callback is called in @tech{atomic
mode} to check whether the event is ready or to allow it to register a
wakeup trigger.
The first argument to @racket[poll] is always the object that is used
as a @tech[#:doc reference.scrbl]{synchronizable event} with the
@tech{poller} as its @racket[prop:evt] value. Let's call that value
@racket[_evt].
The second argument to @racket[poll] is @racket[#f] when @racket[poll]
is called to check whether the event is ready. The result must be two
values. The first result value is a list of results if @racket[_evt]
is ready, or it is @racket[#f] if @racket[_evt] is not ready. The
second result value is @racket[#f] if @racket[_evt] is ready, or it is
an event to replace @racket[_evt] (often just @racket[_evt] itself) if
@racket[_evt] is not ready.
When the thread scheduler has determined that the Racket process
should sleep until an external event or timeout, then @racket[poll] is
called with a non-@racket[#f] second argument, @racket[_wakeups]. In
that case, the result must always be @racket[(values #f _evt)]. In
addition to returning the requires values, @racket[poll] can call
@racket[unsafe-poll-ctx-fd-wakeup],
@racket[unsafe-poll-ctx-eventmask-wakeup], and/or
@racket[unsafe-poll-ctx-milliseconds-wakeup] on @racket[_wakeups] to
register wakeup triggers.}
@defproc[(unsafe-poll-ctx-fd-wakeup [wakeups any/c]
[fd fixnum?]
[mode '(read write error)])
void?]{
Registers a file descriptor (Unix and Mac OS) or socket (all
platforms) to cause the Racket process to wake up and resume polling
if the file descriptor or socket becomes ready for reading, writing,
or error reporting, as selected by @racket[mode]. The @racket[wakeups]
argument must be a non-@racket[#f] value that is passed by the
scheduler to a @racket[unsafe-poller]-wrapped procedure.}
@defproc[(unsafe-poll-ctx-eventmask-wakeup [wakeups any/c]
[mask fixnum?])
void?]{
On Windows, registers an eventmask to cause the Racket process to wake
up and resume polling if an event selected by the mask becomes
available.}
@defproc[(unsafe-poll-ctx-milliseconds-wakeup [wakeups any/c]
[msecs flonum?])
void?]{
Causes the Racket process will wake up and resume polling at the point
when @racket[(current-inexact-milliseconds)] starts returning a value
that is @racket[msecs] or greater.}
Registers a file descriptor (Unix and Mac OS) or socket (all
platforms) to cause the Racket process to wake up if the file
descriptor or socket becomes ready for reading, writing, or error
reporting, as selected by @racket[mode]. The @racket[wakeups] argument
must be a non-@racket[#f] value that is passed by the scheduler to a
@racket[unsafe-poller]-wrapped procedure.}
@defproc[(unsafe-set-sleep-in-thread! [foreground-sleep (-> any/c)]
[fd fixnum?])
void?]{
Registers @racket[foreground-sleep] as a procedure to implement
sleeping for the Racket process when the thread scheduler determines
at the process will sleep. Meanwhile, during a call to
@racket[foreground-sleep], the scheduler's default sleeping function
will run in a separate OS-level thread. When that default sleeping
function wakes up, a byte is written to @racket[fd] as a way of
notifying @racket[foreground-sleep] that it should return
immediately.
This function works on when OS-level threads are available within the
Racket implementation. It always works for Mac OS.}
@defproc[(unsafe-signal-received) void?]{
For use with @racket[unsafe-set-sleep-in-thread!] by
@racket[foreground-sleep] or something that it triggers, causes the
default sleeping function to request @racket[foreground-sleep] to
return.}

View File

@ -5,7 +5,7 @@
@title{Speculatively Atomic Execution}
@defmodule[ffi/unsafe/try-atomic]{The
@racketmodname[ffi/unsafe/try-atomic] supports atomic execution that
@racketmodname[ffi/unsafe/try-atomic] library supports atomic execution that
can be suspended and resumed in non-atomic mode if it takes too long
or if some external event causes the attempt to be abandoned.}

View File

@ -0,0 +1,9 @@
#lang racket/base
(require (only-in '#%unsafe
unsafe-poller
unsafe-poll-ctx-fd-wakeup
unsafe-poll-ctx-eventmask-wakeup
unsafe-poll-ctx-milliseconds-wakeup
unsafe-signal-received
unsafe-set-sleep-in-thread!))
(provide (all-from-out '#%unsafe))

View File

@ -15,6 +15,11 @@
unsafe-start-atomic unsafe-end-atomic
unsafe-start-breakable-atomic unsafe-end-breakable-atomic
unsafe-in-atomic?
unsafe-poller
unsafe-poll-ctx-fd-wakeup
unsafe-poll-ctx-eventmask-wakeup
unsafe-poll-ctx-milliseconds-wakeup
unsafe-signal-received unsafe-set-sleep-in-thread!
unsafe-thread-at-root
unsafe-make-custodian-at-root
unsafe-custodian-register

View File

@ -363,6 +363,8 @@ typedef struct Thread_Local_Variables {
struct Scheme_Hash_Table *loaded_extensions_;
struct Scheme_Hash_Table *fullpath_loaded_extensions_;
Scheme_Sleep_Proc scheme_place_sleep_;
struct Scheme_Object *thread_sleep_callback_;
int thread_sleep_callback_fd_;
struct GHBN_Thread_Data *ghbn_thread_data_;
Scheme_On_Atomic_Timeout_Proc on_atomic_timeout_;
void *on_atomic_timeout_data_;
@ -759,6 +761,8 @@ XFORM_GC_VARIABLE_STACK_THROUGH_THREAD_LOCAL;
#define loaded_extensions XOA (scheme_get_thread_local_variables()->loaded_extensions_)
#define fullpath_loaded_extensions XOA (scheme_get_thread_local_variables()->fullpath_loaded_extensions_)
#define scheme_place_sleep XOA (scheme_get_thread_local_variables()->scheme_place_sleep_)
#define thread_sleep_callback XOA (scheme_get_thread_local_variables()->thread_sleep_callback_)
#define thread_sleep_callback_fd XOA (scheme_get_thread_local_variables()->thread_sleep_callback_fd_)
#define ghbn_thread_data XOA (scheme_get_thread_local_variables()->ghbn_thread_data_)
#define on_atomic_timeout XOA (scheme_get_thread_local_variables()->on_atomic_timeout_)
#define on_atomic_timeout_data XOA (scheme_get_thread_local_variables()->on_atomic_timeout_data_)

View File

@ -1,5 +1,5 @@
{
SHARED_OK static MZCOMPILED_STRING_FAR unsigned char expr[] = {35,126,8,54,46,49,48,46,48,46,51,84,0,0,0,0,0,0,0,0,0,
SHARED_OK static MZCOMPILED_STRING_FAR unsigned char expr[] = {35,126,8,54,46,49,48,46,49,46,51,84,0,0,0,0,0,0,0,0,0,
0,0,0,0,0,0,0,0,0,0,0,54,0,0,0,1,0,0,8,0,18,
0,22,0,26,0,31,0,38,0,42,0,47,0,59,0,66,0,69,0,82,0,
89,0,94,0,103,0,109,0,123,0,137,0,140,0,146,0,157,0,159,0,173,
@ -102,7 +102,7 @@
EVAL_ONE_SIZED_STR((char *)expr, 2091);
}
{
SHARED_OK static MZCOMPILED_STRING_FAR unsigned char expr[] = {35,126,8,54,46,49,48,46,48,46,51,84,0,0,0,0,0,0,0,0,0,
SHARED_OK static MZCOMPILED_STRING_FAR unsigned char expr[] = {35,126,8,54,46,49,48,46,49,46,51,84,0,0,0,0,0,0,0,0,0,
0,0,0,0,0,0,0,0,0,0,0,183,0,0,0,1,0,0,8,0,16,
0,29,0,34,0,51,0,63,0,85,0,114,0,158,0,164,0,178,0,193,0,
211,0,223,0,239,0,253,0,19,1,39,1,73,1,90,1,107,1,130,1,145,
@ -1011,7 +1011,7 @@
EVAL_ONE_SIZED_STR((char *)expr, 19016);
}
{
SHARED_OK static MZCOMPILED_STRING_FAR unsigned char expr[] = {35,126,8,54,46,49,48,46,48,46,51,84,0,0,0,0,0,0,0,0,0,
SHARED_OK static MZCOMPILED_STRING_FAR unsigned char expr[] = {35,126,8,54,46,49,48,46,49,46,51,84,0,0,0,0,0,0,0,0,0,
0,0,0,0,0,0,0,0,0,0,0,15,0,0,0,1,0,0,8,0,23,
0,48,0,65,0,83,0,105,0,128,0,149,0,171,0,181,0,191,0,199,0,
209,0,217,0,0,0,253,1,0,0,3,1,5,105,110,115,112,48,76,35,37,
@ -1042,7 +1042,7 @@
EVAL_ONE_SIZED_STR((char *)expr, 582);
}
{
SHARED_OK static MZCOMPILED_STRING_FAR unsigned char expr[] = {35,126,8,54,46,49,48,46,48,46,51,84,0,0,0,0,0,0,0,0,0,
SHARED_OK static MZCOMPILED_STRING_FAR unsigned char expr[] = {35,126,8,54,46,49,48,46,49,46,51,84,0,0,0,0,0,0,0,0,0,
0,0,0,0,0,0,0,0,0,0,0,102,0,0,0,1,0,0,8,0,15,
0,26,0,53,0,59,0,73,0,86,0,112,0,129,0,151,0,159,0,171,0,
186,0,202,0,220,0,241,0,253,0,13,1,36,1,60,1,72,1,103,1,108,
@ -1538,7 +1538,7 @@
EVAL_ONE_SIZED_STR((char *)expr, 10344);
}
{
SHARED_OK static MZCOMPILED_STRING_FAR unsigned char expr[] = {35,126,8,54,46,49,48,46,48,46,51,84,0,0,0,0,0,0,0,0,0,
SHARED_OK static MZCOMPILED_STRING_FAR unsigned char expr[] = {35,126,8,54,46,49,48,46,49,46,51,84,0,0,0,0,0,0,0,0,0,
0,0,0,0,0,0,0,0,0,0,0,18,0,0,0,1,0,0,8,0,18,
0,22,0,28,0,42,0,56,0,68,0,88,0,102,0,117,0,130,0,135,0,
139,0,151,0,235,0,242,0,20,1,0,0,224,1,0,0,3,1,5,105,110,

View File

@ -6610,100 +6610,18 @@ void scheme_kill_green_thread_timer()
#ifdef OS_X
/* Sleep-in-thread support needed for GUIs Mac OS X.
To merge waiting on a CoreFoundation event with a select(), an embedding
application can attach a single socket to an event callback, and then
create a Mac thread to call the usual sleep and write to the socket when
data is available. */
#ifdef MZ_PRECISE_GC
START_XFORM_SKIP;
#endif
typedef struct {
pthread_mutex_t lock;
pthread_cond_t cond;
int count;
} pt_sema_t;
void pt_sema_init(pt_sema_t *sem)
void scheme_start_sleeper_thread(void (*ignored_sleep)(float seconds, void *fds), float secs, void *fds, int hit_fd)
XFORM_SKIP_PROC
{
pthread_mutex_init(&sem->lock, NULL);
pthread_cond_init(&sem->cond, NULL);
sem->count = 0;
}
void pt_sema_wait(pt_sema_t *sem)
{
pthread_mutex_lock(&sem->lock);
while (sem->count <= 0)
pthread_cond_wait(&sem->cond, &sem->lock);
sem->count--;
pthread_mutex_unlock(&sem->lock);
}
void pt_sema_post(pt_sema_t *sem)
{
pthread_mutex_lock(&sem->lock);
sem->count++;
if (sem->count > 0)
pthread_cond_signal(&sem->cond);
pthread_mutex_unlock(&sem->lock);
}
static pthread_t watcher;
static pt_sema_t sleeping_sema, done_sema;
static float sleep_secs;
static int slept_fd;
static void *sleep_fds;
static void (*sleep_sleep)(float seconds, void *fds);
static void *do_watch(void *other)
{
scheme_init_os_thread_like(other);
while (1) {
pt_sema_wait(&sleeping_sema);
sleep_sleep(sleep_secs, sleep_fds);
write(slept_fd, "y", 1);
pt_sema_post(&done_sema);
}
return NULL;
}
void scheme_start_sleeper_thread(void (*given_sleep)(float seconds, void *fds), float secs, void *fds, int hit_fd)
{
if (!watcher) {
pt_sema_init(&sleeping_sema);
pt_sema_init(&done_sema);
if (pthread_create(&watcher, NULL, do_watch, scheme_get_os_thread_like())) {
scheme_log_abort("pthread_create failed");
abort();
}
}
sleep_sleep = given_sleep;
sleep_fds = fds;
sleep_secs = secs;
slept_fd = hit_fd;
pt_sema_post(&sleeping_sema);
rktio_start_sleep(scheme_rktio, secs, fds, scheme_semaphore_fd_set, hit_fd);
}
void scheme_end_sleeper_thread()
XFORM_SKIP_PROC
{
scheme_signal_received();
pt_sema_wait(&done_sema);
/* Clear external event flag */
rktio_flush_signals_received(scheme_rktio);
rktio_end_sleep(scheme_rktio);
}
#ifdef MZ_PRECISE_GC
END_XFORM_SKIP;
#endif
#else
void scheme_start_sleeper_thread(void (*given_sleep)(float seconds, void *fds), float secs, void *fds, int hit_fd)

View File

@ -15,7 +15,7 @@
#define USE_COMPILED_STARTUP 1
#define EXPECTED_PRIM_COUNT 1159
#define EXPECTED_UNSAFE_COUNT 142
#define EXPECTED_UNSAFE_COUNT 148
#define EXPECTED_FLFXNUM_COUNT 69
#define EXPECTED_EXTFL_COUNT 45
#define EXPECTED_FUTURES_COUNT 15

View File

@ -602,6 +602,8 @@ extern Scheme_Object *scheme_equal_proc;
extern Scheme_Object *scheme_def_exit_proc;
extern Scheme_Object *scheme_unsafe_poller_proc;
THREAD_LOCAL_DECL(extern Scheme_Object *scheme_orig_stdout_port);
THREAD_LOCAL_DECL(extern Scheme_Object *scheme_orig_stdin_port);
THREAD_LOCAL_DECL(extern Scheme_Object *scheme_orig_stderr_port);

View File

@ -13,7 +13,7 @@
consistently.)
*/
#define MZSCHEME_VERSION "6.11.0.1"
#define MZSCHEME_VERSION "6.11.0.2"
#define MZSCHEME_VERSION_X 6
#define MZSCHEME_VERSION_Y 11

View File

@ -51,8 +51,10 @@ READ_ONLY Scheme_Object *scheme_liberal_def_ctx_type;;
READ_ONLY Scheme_Object *scheme_object_name_property;
READ_ONLY Scheme_Object *scheme_struct_to_vector_proc;
READ_ONLY Scheme_Object *scheme_authentic_property;
READ_ONLY Scheme_Object *scheme_unsafe_poller_proc;
READ_ONLY static Scheme_Object *location_struct;
READ_ONLY static Scheme_Object *poller_struct;
READ_ONLY static Scheme_Object *write_property;
READ_ONLY static Scheme_Object *print_attribute_property;
READ_ONLY static Scheme_Object *evt_property;
@ -168,6 +170,7 @@ static void get_struct_type_info(int argc, Scheme_Object *argv[], Scheme_Object
static int evt_struct_is_ready(Scheme_Object *o, Scheme_Schedule_Info *sinfo);
static void evt_struct_needs_wakeup(Scheme_Object *o, void *fds);
static int is_evt_struct(Scheme_Object *);
static int wrapped_evt_is_ready(Scheme_Object *o, Scheme_Schedule_Info *sinfo);
@ -221,6 +224,9 @@ static void register_traversers(void);
THREAD_LOCAL_DECL(static Scheme_Bucket_Table *prefab_table);
static Scheme_Object *make_prefab_key(Scheme_Struct_Type *type);
#define SCHEME_POLLERP(v) (SCHEME_STRUCTP(v) && scheme_is_struct_instance(poller_struct, v))
#define SCHEME_POLLER_PROC(o) (((Scheme_Structure *)(o))->slots[0])
#define cons scheme_make_pair
#define icons scheme_make_pair
#define _intern scheme_intern_symbol
@ -337,6 +343,11 @@ scheme_init_struct (Scheme_Env *env)
env);
}
/* Add poller structure: */
REGISTER_SO(poller_struct);
poller_struct = scheme_make_struct_type_from_string("unsafe-poller", NULL, 1, NULL, NULL, 1);
scheme_unsafe_poller_proc = make_struct_proc((Scheme_Struct_Type *)poller_struct, "unsafe-poller", SCHEME_CONSTR, 1);
REGISTER_SO(write_property);
{
Scheme_Object *a[2], *pred, *access;
@ -887,7 +898,7 @@ void scheme_init_struct_wait()
{
scheme_add_evt(scheme_structure_type,
(Scheme_Ready_Fun)evt_struct_is_ready,
NULL,
(Scheme_Needs_Wakeup_Fun)evt_struct_needs_wakeup,
is_evt_struct, 1);
scheme_add_evt(scheme_proc_struct_type,
(Scheme_Ready_Fun)evt_struct_is_ready,
@ -1612,11 +1623,20 @@ static Scheme_Object *check_indirect_property_value_ok(const char *name, Check_V
return v;
}
static int is_evt_or_poller(Scheme_Object *v)
{
if (scheme_is_evt(v))
return 1;
if (SCHEME_POLLERP(v))
return 1;
return 0;
}
static Scheme_Object *check_evt_property_value_ok(int argc, Scheme_Object *argv[])
/* This is the guard for prop:evt */
{
return check_indirect_property_value_ok("guard-for-prop:evt",
scheme_is_evt, 1,
is_evt_or_poller, 1,
"(or/c evt? (any/c . -> . any) exact-nonnegative-integer?)",
argc, argv);
}
@ -1682,9 +1702,68 @@ static int evt_struct_is_ready(Scheme_Object *o, Scheme_Schedule_Info *sinfo)
}
}
if (SCHEME_POLLERP(v)) {
Scheme_Thread *p;
Scheme_Object *a[2];
int done;
scheme_start_in_scheduler();
a[0] = o;
a[1] = scheme_false;
v = _scheme_apply_multi(SCHEME_POLLER_PROC(v), 2, a);
p = scheme_current_thread;
if ((v == SCHEME_MULTIPLE_VALUES)
&& (p->ku.multiple.count == 2)) {
if (SCHEME_FALSEP(p->ku.multiple.array[0])) {
v = p->ku.multiple.array[1];
if (v == o) v = NULL;
done = 0;
} else {
v = p->ku.multiple.array[0];
done = 1;
}
} else {
/* wrong number of results => treat as not ready */
v = NULL;
done = 0;
}
scheme_end_in_scheduler();
if (v) {
if (done && SCHEME_PROCP(v)) {
v = scheme_make_closed_prim_w_arity(return_wrapped, (void *)v, "wrapper", 1, 1);
}
scheme_set_sync_target(sinfo, v, (done ? v : NULL), NULL, 0, 0, NULL);
return 1;
}
}
return 0;
}
void evt_struct_needs_wakeup(Scheme_Object *o, void *fds)
{
Scheme_Object *v;
/* Check for wakeup only if the struct has an immediate `unsafe-poller` */
if (SCHEME_CHAPERONEP(o))
return;
v = scheme_struct_type_property_ref(evt_property, o);
if (SCHEME_POLLERP(v)) {
Scheme_Object *a[2], *e;
scheme_start_in_scheduler();
a[0] = o;
e = scheme_make_cptr(fds, scheme_false);
a[1] = e;
_scheme_apply_multi(SCHEME_POLLER_PROC(v), 2, a);
scheme_end_in_scheduler();
}
}
static int is_evt_struct(Scheme_Object *o)
{
if (scheme_struct_type_property_ref(evt_property, o))

View File

@ -183,6 +183,8 @@ THREAD_LOCAL_DECL(static volatile short delayed_break_ready);
THREAD_LOCAL_DECL(static Scheme_Thread *main_break_target_thread);
THREAD_LOCAL_DECL(Scheme_Sleep_Proc scheme_place_sleep);
THREAD_LOCAL_DECL(Scheme_Object *thread_sleep_callback);
THREAD_LOCAL_DECL(int thread_sleep_callback_fd);
HOOK_SHARED_OK void (*scheme_sleep)(float seconds, void *fds);
HOOK_SHARED_OK void (*scheme_notify_multithread)(int on);
HOOK_SHARED_OK void (*scheme_wakeup_on_input)(void *fds);
@ -401,6 +403,12 @@ static Scheme_Object *unsafe_start_breakable_atomic(int argc, Scheme_Object **ar
static Scheme_Object *unsafe_end_breakable_atomic(int argc, Scheme_Object **argv);
static Scheme_Object *unsafe_in_atomic_p(int argc, Scheme_Object **argv);
static Scheme_Object *unsafe_poll_ctx_fd_wakeup(int argc, Scheme_Object **argv);
static Scheme_Object *unsafe_poll_ctx_eventmask_wakeup(int argc, Scheme_Object **argv);
static Scheme_Object *unsafe_poll_ctx_time_wakeup(int argc, Scheme_Object **argv);
static Scheme_Object *unsafe_signal_received(int argc, Scheme_Object **argv);
static Scheme_Object *unsafe_set_sleep_in_thread(int argc, Scheme_Object **argv);
static void make_initial_config(Scheme_Thread *p);
static int do_kill_thread(Scheme_Thread *p);
@ -645,6 +653,13 @@ scheme_init_unsafe_thread (Scheme_Env *env)
GLOBAL_PRIM_W_ARITY("unsafe-set-on-atomic-timeout!", unsafe_set_on_atomic_timeout, 1, 1, env);
GLOBAL_PRIM_W_ARITY("unsafe-make-security-guard-at-root", unsafe_make_security_guard_at_root, 0, 3, env);
scheme_add_global_constant("unsafe-poller", scheme_unsafe_poller_proc, env);
GLOBAL_PRIM_W_ARITY("unsafe-poll-ctx-fd-wakeup", unsafe_poll_ctx_fd_wakeup, 3, 3, env);
GLOBAL_PRIM_W_ARITY("unsafe-poll-ctx-eventmask-wakeup", unsafe_poll_ctx_eventmask_wakeup, 2, 2, env);
GLOBAL_PRIM_W_ARITY("unsafe-poll-ctx-milliseconds-wakeup", unsafe_poll_ctx_time_wakeup, 2, 2, env);
GLOBAL_PRIM_W_ARITY("unsafe-signal-received", unsafe_signal_received, 0, 0, env);
GLOBAL_PRIM_W_ARITY("unsafe-set-sleep-in-thread!", unsafe_set_sleep_in_thread, 2, 2, env);
}
void scheme_init_thread_places(void) {
@ -5313,6 +5328,88 @@ sch_sleep(int argc, Scheme_Object *args[])
return scheme_void;
}
Scheme_Object *unsafe_poll_ctx_fd_wakeup(int argc, Scheme_Object **argv)
{
if (SCHEME_TRUEP(argv[0])) {
void *fds = SCHEME_CPTR_VAL(argv[0]);
intptr_t fd;
int m;
if (SCHEME_INTP(argv[1]))
fd = SCHEME_INT_VAL(argv[1]);
else
fd = rktio_fd_system_fd(scheme_rktio, (rktio_fd_t *)SCHEME_CPTR_VAL(argv[1]));
if (SAME_OBJ(argv[2], read_symbol))
m = 0;
else if (SAME_OBJ(argv[2], write_symbol))
m = 1;
else
m = 2;
scheme_fdset(scheme_get_fdset(fds, m), fd);
}
return scheme_void;
}
Scheme_Object *unsafe_poll_ctx_eventmask_wakeup(int argc, Scheme_Object **argv)
{
if (SCHEME_TRUEP(argv[0])) {
void *fds = SCHEME_CPTR_VAL(argv[0]);
intptr_t mask = SCHEME_INT_VAL(argv[1]);
scheme_add_fd_eventmask(fds, mask);
}
return scheme_void;
}
Scheme_Object *unsafe_poll_ctx_time_wakeup(int argc, Scheme_Object **argv)
{
if (SCHEME_TRUEP(argv[0])) {
void *fds = SCHEME_CPTR_VAL(argv[0]);
double msecs = SCHEME_DBL_VAL(argv[1]);
scheme_set_wakeup_time(fds, msecs);
}
return scheme_void;
}
Scheme_Object *unsafe_signal_received(int argc, Scheme_Object **argv)
{
scheme_signal_received();
return scheme_void;
}
static void sleep_via_thread(float seconds, void *fds)
{
#ifdef OS_X
scheme_start_sleeper_thread(scheme_sleep, seconds, fds, thread_sleep_callback_fd);
scheme_start_in_scheduler();
_scheme_apply_multi(thread_sleep_callback, 0, NULL);
scheme_end_in_scheduler();
scheme_end_sleeper_thread();
#endif
}
Scheme_Object *unsafe_set_sleep_in_thread(int argc, Scheme_Object **argv)
{
if (!thread_sleep_callback)
REGISTER_SO(thread_sleep_callback);
thread_sleep_callback = argv[0];
if (SCHEME_INTP(argv[1]))
thread_sleep_callback_fd = SCHEME_INT_VAL(argv[1]);
else
thread_sleep_callback_fd = rktio_fd_system_fd(scheme_rktio, (rktio_fd_t *)SCHEME_CPTR_VAL(argv[1]));
scheme_place_sleep = sleep_via_thread;
return scheme_void;
}
static Scheme_Object *break_thread(int argc, Scheme_Object *args[])
{
Scheme_Thread *p;

View File

@ -28,6 +28,7 @@ OBJS = rktio_fs.@LTO@ \
rktio_file.@LTO@ \
rktio_poll_set.@LTO@ \
rktio_ltps.@LTO@ \
rktio_sleep.@LTO@ \
rktio_network.@LTO@ \
rktio_pipe.@LTO@ \
rktio_process.@LTO@ \
@ -76,6 +77,9 @@ rktio_poll_set.@LTO@: $(srcdir)/rktio_poll_set.c $(RKTIO_HEADERS)
rktio_ltps.@LTO@: $(srcdir)/rktio_ltps.c $(RKTIO_HEADERS)
$(CC) $(CFLAGS) -I$(srcdir) -I. -o rktio_ltps.@LTO@ -c $(srcdir)/rktio_ltps.c
rktio_sleep.@LTO@: $(srcdir)/rktio_sleep.c $(RKTIO_HEADERS)
$(CC) $(CFLAGS) -I$(srcdir) -I. -o rktio_sleep.@LTO@ -c $(srcdir)/rktio_sleep.c
rktio_network.@LTO@: $(srcdir)/rktio_network.c $(RKTIO_HEADERS)
$(CC) $(CFLAGS) -I$(srcdir) -I. -o rktio_network.@LTO@ -c $(srcdir)/rktio_network.c

View File

@ -736,6 +736,20 @@ RKTIO_EXTERN void rktio_sleep(rktio_t *rktio, float nsecs, rktio_poll_set_t *fds
something registered with `fds` or `lt` is ready, or until there's
some other activity that sometimes causes an early wakeup. */
/*************************************************/
/* Sleeping in a background thread */
RKTIO_EXTERN rktio_ok_t rktio_start_sleep(rktio_t *rktio, float nsecs, rktio_poll_set_t *fds, rktio_ltps_t *lt,
int woke_fd);
/* Like `rktio_sleep`, but starts a sleep in a background thread. When the
background thread is done sleeping, it writes a byte to `woke_fd`, but the
background thread can be woken up with `rktio_end_sleep`. */
RKTIO_EXTERN void rktio_end_sleep(rktio_t *rktio);
/* Ends a background sleep started with `rktio_sleep`. Call this
function exactly once for each successful `rktio_start_sleep`,
whether or not the background thread write to `woke_fd` already. */
/*************************************************/
/* Files, directories, and links */

View File

@ -38,6 +38,7 @@ rktio_t *rktio_init(void)
void rktio_destroy(rktio_t *rktio)
{
rktio_stop_background(rktio);
rktio_syslog_clean(rktio);
rktio_dll_clean(rktio);
rktio_error_clean(rktio);

View File

@ -26,6 +26,11 @@
# define USE_FAR_RKTIO_FDCALLS
#endif
#if defined(RKTIO_SYSTEM_UNIX) && defined(RKTIO_USE_PTHREADS)
# define SUPPORT_BACKGROUND_SLEEP_THREAD
struct background_sleep_t;
#endif
/*========================================================================*/
/* Globals, as gathered into `rktio_t` */
/*========================================================================*/
@ -107,6 +112,10 @@ struct rktio_t {
#ifdef RKTIO_SYSTEM_UNIX
char *dll_error;
#endif
#ifdef SUPPORT_BACKGROUND_SLEEP_THREAD
struct background_sleep_t *background;
#endif
};
/*========================================================================*/
@ -329,6 +338,8 @@ char **rktio_get_environ_array(void);
void rktio_syslog_init(rktio_t* rktio);
void rktio_syslog_clean(rktio_t* rktio);
void rktio_stop_background(rktio_t *rktio);
#ifdef USE_TRANSITIONAL_64_FILE_OPS
# define BIG_OFF_T_IZE(n) n ## 64
#else

View File

@ -0,0 +1,143 @@
#include "rktio.h"
#include "rktio_private.h"
#ifdef SUPPORT_BACKGROUND_SLEEP_THREAD
# include <string.h>
# include <stdlib.h>
# include <unistd.h>
# include <errno.h>
#endif
/*========================================================================*/
/* Sleeping in a separate OS thread */
/*========================================================================*/
#ifdef SUPPORT_BACKGROUND_SLEEP_THREAD
/* Sleep-in-thread support needed for GUIs Mac OS.
To merge waiting on a CoreFoundation event with a select(), an embedding
application can attach a single socket to an event callback, and then
create a Mac thread to call the usual sleep and write to the socket when
data is available. */
typedef struct {
pthread_mutex_t lock;
pthread_cond_t cond;
int count;
} pt_sema_t;
static void pt_sema_init(pt_sema_t *sem)
{
pthread_mutex_init(&sem->lock, NULL);
pthread_cond_init(&sem->cond, NULL);
sem->count = 0;
}
static void pt_sema_wait(pt_sema_t *sem)
{
pthread_mutex_lock(&sem->lock);
while (sem->count <= 0)
pthread_cond_wait(&sem->cond, &sem->lock);
sem->count--;
pthread_mutex_unlock(&sem->lock);
}
static void pt_sema_post(pt_sema_t *sem)
{
pthread_mutex_lock(&sem->lock);
sem->count++;
if (sem->count > 0)
pthread_cond_signal(&sem->cond);
pthread_mutex_unlock(&sem->lock);
}
typedef struct background_sleep_t {
pthread_t th;
pt_sema_t sleeping_sema, done_sema;
int done; /* => background thread should stop */
float nsecs;
int woke_fd;
rktio_poll_set_t *fds;
rktio_ltps_t *lt;
} background_sleep_t;
static void *do_background_sleep(void *_rktio)
{
rktio_t *rktio = _rktio;
intptr_t len;
while (1) {
pt_sema_wait(&rktio->background->sleeping_sema);
if (rktio->background->done)
break;
rktio_sleep(rktio, rktio->background->nsecs, rktio->background->fds, rktio->background->lt);
do {
len = write(rktio->background->woke_fd, "y", 1);
} while ((len == -1) && (errno == EINTR));
pt_sema_post(&rktio->background->done_sema);
}
return NULL;
}
rktio_ok_t rktio_start_sleep(rktio_t *rktio, float nsecs, rktio_poll_set_t *fds, rktio_ltps_t *lt, int woke_fd)
{
if (!rktio->background) {
rktio->background = malloc(sizeof(background_sleep_t));
memset(rktio->background, 0, sizeof(background_sleep_t));
pt_sema_init(&rktio->background->sleeping_sema);
pt_sema_init(&rktio->background->done_sema);
if (pthread_create(&rktio->background->th, NULL, do_background_sleep, rktio)) {
get_posix_error();
return 0;
}
}
rktio->background->nsecs = nsecs;
rktio->background->fds = fds;
rktio->background->lt = lt;
rktio->background->woke_fd = woke_fd;
pt_sema_post(&rktio->background->sleeping_sema);
return 1;
}
void rktio_end_sleep(rktio_t *rktio)
{
rktio_signal_received(rktio);
pt_sema_wait(&rktio->background->done_sema);
/* Clear external event flag */
rktio_flush_signals_received(rktio);
}
void rktio_stop_background(rktio_t *rktio)
{
if (rktio->background) {
rktio->background->done = 1;
pt_sema_post(&rktio->background->sleeping_sema);
pthread_join(rktio->background->th, NULL);
free(rktio->background);
}
}
#else
rktio_ok_t rktio_start_sleep(rktio_t *rktio, float nsecs, rktio_poll_set_t *fds, rktio_ltps_t *lt, int woke_fd)
{
set_racket_error(RKTIO_ERROR_UNSUPPORTED);
return 0;
}
void rktio_end_sleep(rktio_t *rktio)
{
}
void rktio_stop_background(rktio_t *rktio)
{
}
#endif

View File

@ -130,6 +130,10 @@
RelativePath="..\..\rktio\rktio_ltps.c"
>
</File>
<File
RelativePath="..\..\rktio\rktio_sleep.c"
>
</File>
<File
RelativePath="..\..\rktio\rktio_network.c"
>

View File

@ -121,6 +121,7 @@
<ClCompile Include="..\..\rktio\rktio_file.c" />
<ClCompile Include="..\..\rktio\rktio_poll_set.c" />
<ClCompile Include="..\..\rktio\rktio_ltps.c" />
<ClCompile Include="..\..\rktio\rktio_sleep.c" />
<ClCompile Include="..\..\rktio\rktio_network.c" />
<ClCompile Include="..\..\rktio\rktio_pipe.c" />
<ClCompile Include="..\..\rktio\rktio_process.c" />