vyzo's thread mailboxes (3.99.0.19)

svn: r9010
This commit is contained in:
Matthew Flatt 2008-03-18 00:38:36 +00:00
parent a314447264
commit c58820e30d
15 changed files with 373 additions and 101 deletions

View File

@ -55,6 +55,7 @@
"David Van Horn, "
"Anton van Straaten, "
"Dale Vaillancourt, "
"Dimitris Vyzovitis, "
"Stephanie Weirich, "
"Noel Welsh, "
"Adam Wick, "

View File

@ -42,7 +42,11 @@
free-transformer-identifier=?
free-template-identifier=?
free-label-identifier=?
vector-copy!)
vector-copy!
thread-send
thread-receive
thread-try-receive
thread-receive-evt)
(rename syntax->datum syntax-object->datum)
(rename datum->syntax datum->syntax-object)
(rename free-identifier=? module-identifier=?)

View File

@ -6,6 +6,7 @@
@note-lib-only[scheme/async-channel]
@margin-note/ref{See also @secref["threadmbox"].}
@defproc[(async-channel? [v any/c]) boolean?]{

View File

@ -58,9 +58,14 @@
[(_ s) (scheme s)]))
(provide exnraise Exn)
(provide refalso moreref Guide guideintro guidesecref
(provide margin-note/ref
refalso moreref Guide guideintro guidesecref
HonuManual)
(define (margin-note/ref . s)
(apply margin-note
(decode-content (cons magnify s))))
(define (refalso tag . s)
(apply margin-note
(decode-content (append (list magnify (secref tag) " also provides information on ")

View File

@ -222,3 +222,34 @@ suspended and then resumes after a call to
@scheme[thread-suspend-evt], the result event remains ready; after
each resume of @scheme[thd] created a fresh event to be returned by
@scheme[thread-suspend-evt].}
@;------------------------------------------------------------------------
@section[#:tag "threadmbox"]{Thread Mailboxes}
Each thread has a @defterm{mailbox} through which it can receive
arbitrary message. In other words, each thread has a built-in
asynchronous channel.
@margin-note/ref{See also @secref["async-channel"].}
@defproc[(thread-send [thd thread?] [v any/c]) void?]{
Queues @scheme[v] as a message to @scheme[thd]. This function never
blocks.}
@defproc[(thread-receive) any/c]{
Receives and dequeues a message queued for the current thread, if
any. If no message is available, @scheme[thread-receive] blocks until
one is available.}
@defproc[(thread-try-receive) any/c]{
Receives and dequeues a message queued for the current thread, if any,
or returns @scheme[#f] immediately if no message is available.}
@defproc[(thread-receive-evt) evt?]{
Returns a constant @tech{synchronizable event} (see @secref["sync"])
that becomes ready when the synchronizing thread has a message to
receive. The event result is itself.}

View File

@ -687,6 +687,44 @@
(test #f sync/timeout 0 (thread-suspend-evt t))
(test d thread-dead-evt t)))))
;; ----------------------------------------
;; thread mbox
(test #f thread-try-receive)
(test #f sync/timeout 0 (thread-receive-evt))
(test (void) thread-send (current-thread) 10)
(let ([t (thread-receive-evt)])
(test t sync/timeout 10 t))
(test 10 thread-try-receive)
(test #f thread-try-receive)
(let ([t (current-thread)])
(thread (lambda ()
(sync (system-idle-evt))
(thread-send t 35))))
(test 35 thread-receive)
(let* ([s #f]
[t1 (let ([t (current-thread)])
(thread (lambda ()
(set! s (thread-receive)))))])
(sync (system-idle-evt))
(thread-suspend t1)
(thread-send t1 'apple)
(sync (system-idle-evt))
(test #f values s)
(thread-resume t1)
(sync (system-idle-evt))
(test 'apple values s))
(let* ([s 0]
[t (thread (lambda ()
(set! s (list (thread-receive)
(thread-receive)
(thread-receive)))))])
(thread-send t 0)
(thread-send t 1)
(thread-send t 2)
(sync (system-idle-evt))
(test '(0 1 2) values s))
;; ----------------------------------------
;; Garbage collection

View File

@ -1066,6 +1066,10 @@ typedef struct Scheme_Thread {
Scheme_Object *name;
Scheme_Object *mbox_first;
Scheme_Object *mbox_last;
Scheme_Object *mbox_sema;
#ifdef MZ_PRECISE_GC
int gc_owner_set;
#endif

View File

@ -1678,6 +1678,9 @@ static int thread_val_MARK(void *p) {
gcMARK(pr->dead_box);
gcMARK(pr->running_box);
gcMARK(pr->mbox_first);
gcMARK(pr->mbox_last);
gcMARK(pr->mbox_sema);
return
gcBYTES_TO_WORDS(sizeof(Scheme_Thread));
}
@ -1775,6 +1778,9 @@ static int thread_val_FIXUP(void *p) {
gcFIXUP(pr->dead_box);
gcFIXUP(pr->running_box);
gcFIXUP(pr->mbox_first);
gcFIXUP(pr->mbox_last);
gcFIXUP(pr->mbox_sema);
return
gcBYTES_TO_WORDS(sizeof(Scheme_Thread));
}

View File

@ -692,6 +692,9 @@ thread_val {
gcMARK(pr->dead_box);
gcMARK(pr->running_box);
gcMARK(pr->mbox_first);
gcMARK(pr->mbox_last);
gcMARK(pr->mbox_sema);
size:
gcBYTES_TO_WORDS(sizeof(Scheme_Thread));
}

View File

@ -11,9 +11,9 @@
EXPECTED_PRIM_COUNT to the new value, and then USE_COMPILED_STARTUP
can be set to 1 again. */
#define USE_COMPILED_STARTUP 1
#define USE_COMPILED_STARTUP 0
#define EXPECTED_PRIM_COUNT 899
#define EXPECTED_PRIM_COUNT 903
#ifdef MZSCHEME_SOMETHING_OMITTED
# undef USE_COMPILED_STARTUP

View File

@ -13,12 +13,12 @@
consistently.)
*/
#define MZSCHEME_VERSION "3.99.0.18"
#define MZSCHEME_VERSION "3.99.0.19"
#define MZSCHEME_VERSION_X 3
#define MZSCHEME_VERSION_Y 99
#define MZSCHEME_VERSION_Z 0
#define MZSCHEME_VERSION_W 18
#define MZSCHEME_VERSION_W 19
#define MZSCHEME_VERSION_MAJOR ((MZSCHEME_VERSION_X * 100) + MZSCHEME_VERSION_Y)
#define MZSCHEME_VERSION_MINOR ((MZSCHEME_VERSION_Z * 1000) + MZSCHEME_VERSION_W)

View File

@ -38,6 +38,11 @@ static Scheme_Object *make_channel(int n, Scheme_Object **p);
static Scheme_Object *make_channel_put(int n, Scheme_Object **p);
static Scheme_Object *channel_p(int n, Scheme_Object **p);
static Scheme_Object *thread_send(int n, Scheme_Object **p);
static Scheme_Object *thread_receive(int n, Scheme_Object **p);
static Scheme_Object *thread_try_receive(int n, Scheme_Object **p);
static Scheme_Object *thread_receive_evt(int n, Scheme_Object **p);
static Scheme_Object *make_alarm(int n, Scheme_Object **p);
static Scheme_Object *make_sys_idle(int n, Scheme_Object **p);
@ -47,12 +52,14 @@ static int channel_syncer_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo);
static int alarm_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo);
static int always_ready(Scheme_Object *w);
static int never_ready(Scheme_Object *w);
static int thread_recv_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo);
static int pending_break(Scheme_Thread *p);
int scheme_main_was_once_suspended;
static Scheme_Object *system_idle_put_evt;
static Scheme_Object *thread_recv_evt;
#ifdef MZ_PRECISE_GC
static void register_traversers(void);
@ -136,6 +143,28 @@ void scheme_init_sema(Scheme_Env *env)
1, 1, 1),
env);
scheme_add_global_constant("thread-send",
scheme_make_prim_w_arity(thread_send,
"thread-send",
2, 2),
env);
scheme_add_global_constant("thread-receive",
scheme_make_prim_w_arity(thread_receive,
"thread-receive",
0, 0),
env);
scheme_add_global_constant("thread-try-receive",
scheme_make_prim_w_arity(thread_try_receive,
"thread-try-receive",
0, 0),
env);
scheme_add_global_constant("thread-receive-evt",
scheme_make_prim_w_arity(thread_receive_evt,
"thread-receive-evt",
0, 0),
env);
scheme_add_global_constant("alarm-evt",
scheme_make_prim_w_arity(make_alarm,
"alarm-evt",
@ -157,6 +186,11 @@ void scheme_init_sema(Scheme_Env *env)
o->type = scheme_never_evt_type;
scheme_add_global_constant("never-evt", o, env);
REGISTER_SO(thread_recv_evt);
o = scheme_alloc_small_object();
o->type = scheme_thread_recv_evt_type;
thread_recv_evt = o;
REGISTER_SO(scheme_system_idle_channel);
scheme_system_idle_channel = scheme_make_channel();
@ -168,6 +202,7 @@ void scheme_init_sema(Scheme_Env *env)
scheme_add_evt(scheme_alarm_type, (Scheme_Ready_Fun)alarm_ready, NULL, NULL, 0);
scheme_add_evt(scheme_always_evt_type, always_ready, NULL, NULL, 0);
scheme_add_evt(scheme_never_evt_type, never_ready, NULL, NULL, 0);
scheme_add_evt(scheme_thread_recv_evt_type, (Scheme_Ready_Fun)thread_recv_ready, NULL, NULL, 0);
}
Scheme_Object *scheme_make_sema(long v)
@ -842,7 +877,7 @@ static Scheme_Object *block_sema(int n, Scheme_Object **p)
scheme_wait_sema(p[0], 0);
/* In case a break appeared after wwe received the post,
/* In case a break appeared after we received the post,
check for a break, because scheme_wait_sema() won't: */
scheme_check_break_now();
@ -975,6 +1010,134 @@ int scheme_try_channel_get(Scheme_Object *ch)
return 0;
}
/**********************************************************************/
/* Thread mbox */
/**********************************************************************/
static void make_mbox_sema(Scheme_Thread *p)
{
if (!p->mbox_sema) {
Scheme_Object *sema = NULL;
sema = scheme_make_sema(0);
p->mbox_sema = sema;
}
}
static void mbox_push(Scheme_Thread *p, Scheme_Object *o)
{
Scheme_Object *next;
next = scheme_make_raw_pair(o, NULL);
if (p->mbox_first) {
SCHEME_CDR(p->mbox_last) = next;
p->mbox_last = next;
} else {
p->mbox_first = next;
p->mbox_last = next;
}
make_mbox_sema(p);
scheme_post_sema(p->mbox_sema);
/* Post can't overflow the semaphore, because we'd run out of
memory for the queue, first. */
}
static Scheme_Object *mbox_pop( Scheme_Thread *p, int dec)
{
/* Assertion: mbox_first != NULL */
Scheme_Object *r = NULL;
r = SCHEME_CAR(p->mbox_first);
p->mbox_first = SCHEME_CDR(p->mbox_first);
if (!p->mbox_first)
p->mbox_last = NULL;
if (dec)
scheme_try_plain_sema(p->mbox_sema);
return r;
}
static Scheme_Object *thread_send(int argc, Scheme_Object **argv)
{
if (SCHEME_THREADP(argv[0])) {
int running;
if (argc > 2) {
scheme_check_proc_arity("thread-send", 0, 2, argc, argv);
}
running = ((Scheme_Thread*)argv[0])->running;
if (MZTHREAD_STILL_RUNNING(running)) {
mbox_push((Scheme_Thread*)argv[0], argv[1]);
return scheme_void;
} else {
if (argc > 2) {
return _scheme_tail_apply(argv[2], 0, NULL);
} else
scheme_raise_exn(MZEXN_FAIL, "thread-send: thread is not running");
}
} else
scheme_wrong_type("thread-send", "thread", 0, argc, argv);
return NULL;
}
static Scheme_Object *thread_receive(int argc, Scheme_Object **argv)
{
/* The mbox semaphore can only be downed by the current thread, so
receive/try-receive can directly dec+pop without syncing
(by calling mbox_pop with dec=1). */
if (scheme_current_thread->mbox_first) {
return mbox_pop(scheme_current_thread, 1);
} else {
Scheme_Object *v;
Scheme_Thread *p = scheme_current_thread;
make_mbox_sema(p);
scheme_wait_sema(p->mbox_sema, 0);
/* We're relying on atomicity of return after wait succeeds to ensure
that a semaphore wait guarantees a mailbox dequeue. */
v = mbox_pop(p, 0);
/* Due to that atomicity, though, we're obliged to check for
a break at this point: */
scheme_check_break_now();
return v;
}
}
static Scheme_Object *thread_try_receive(int argc, Scheme_Object **argv)
{
if (scheme_current_thread->mbox_first)
return mbox_pop(scheme_current_thread, 1);
else
return scheme_false;
}
static Scheme_Object *thread_receive_evt(int argc, Scheme_Object **argv)
{
return thread_recv_evt;
}
static int thread_recv_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo)
{
Scheme_Thread *p;
p = sinfo->false_positive_ok;
if (!p)
p = scheme_current_thread;
make_mbox_sema(p);
scheme_set_sync_target(sinfo, p->mbox_sema, thread_recv_evt, NULL, 1, 1);
return 0;
}
/**********************************************************************/
/* alarms */
/**********************************************************************/

View File

@ -140,103 +140,104 @@ enum {
scheme_thread_set_type, /* 122 */
scheme_string_converter_type, /* 123 */
scheme_alarm_type, /* 124 */
scheme_thread_cell_type, /* 125 */
scheme_channel_syncer_type, /* 126 */
scheme_special_comment_type, /* 127 */
scheme_write_evt_type, /* 128 */
scheme_always_evt_type, /* 129 */
scheme_never_evt_type, /* 130 */
scheme_progress_evt_type, /* 131 */
scheme_certifications_type, /* 132 */
scheme_already_comp_type, /* 133 */
scheme_readtable_type, /* 134 */
scheme_intdef_context_type, /* 135 */
scheme_lexical_rib_type, /* 136 */
scheme_thread_cell_values_type, /* 137 */
scheme_global_ref_type, /* 138 */
scheme_cont_mark_chain_type, /* 139 */
scheme_raw_pair_type, /* 140 */
scheme_prompt_type, /* 141 */
scheme_prompt_tag_type, /* 142 */
scheme_expanded_syntax_type, /* 143 */
scheme_delay_syntax_type, /* 144 */
scheme_cust_box_type, /* 145 */
scheme_resolved_module_path_type, /* 146 */
scheme_module_phase_exports_type, /* 147 */
scheme_thread_recv_evt_type, /* 125 */
scheme_thread_cell_type, /* 126 */
scheme_channel_syncer_type, /* 127 */
scheme_special_comment_type, /* 128 */
scheme_write_evt_type, /* 129 */
scheme_always_evt_type, /* 130 */
scheme_never_evt_type, /* 131 */
scheme_progress_evt_type, /* 132 */
scheme_certifications_type, /* 133 */
scheme_already_comp_type, /* 134 */
scheme_readtable_type, /* 135 */
scheme_intdef_context_type, /* 136 */
scheme_lexical_rib_type, /* 137 */
scheme_thread_cell_values_type, /* 138 */
scheme_global_ref_type, /* 139 */
scheme_cont_mark_chain_type, /* 140 */
scheme_raw_pair_type, /* 141 */
scheme_prompt_type, /* 142 */
scheme_prompt_tag_type, /* 143 */
scheme_expanded_syntax_type, /* 144 */
scheme_delay_syntax_type, /* 145 */
scheme_cust_box_type, /* 146 */
scheme_resolved_module_path_type, /* 147 */
scheme_module_phase_exports_type, /* 148 */
#ifdef MZTAG_REQUIRED
_scheme_last_normal_type_, /* 148 */
_scheme_last_normal_type_, /* 149 */
scheme_rt_weak_array, /* 149 */
scheme_rt_weak_array, /* 150 */
scheme_rt_comp_env, /* 150 */
scheme_rt_constant_binding, /* 151 */
scheme_rt_resolve_info, /* 152 */
scheme_rt_optimize_info, /* 153 */
scheme_rt_compile_info, /* 154 */
scheme_rt_cont_mark, /* 155 */
scheme_rt_saved_stack, /* 156 */
scheme_rt_reply_item, /* 157 */
scheme_rt_closure_info, /* 158 */
scheme_rt_overflow, /* 159 */
scheme_rt_overflow_jmp, /* 160 */
scheme_rt_meta_cont, /* 161 */
scheme_rt_dyn_wind_cell, /* 162 */
scheme_rt_dyn_wind_info, /* 163 */
scheme_rt_dyn_wind, /* 164 */
scheme_rt_dup_check, /* 165 */
scheme_rt_thread_memory, /* 166 */
scheme_rt_input_file, /* 167 */
scheme_rt_input_fd, /* 168 */
scheme_rt_oskit_console_input, /* 169 */
scheme_rt_tested_input_file, /* 170 */
scheme_rt_tested_output_file, /* 171 */
scheme_rt_indexed_string, /* 172 */
scheme_rt_output_file, /* 173 */
scheme_rt_load_handler_data, /* 174 */
scheme_rt_pipe, /* 175 */
scheme_rt_beos_process, /* 176 */
scheme_rt_system_child, /* 177 */
scheme_rt_tcp, /* 178 */
scheme_rt_write_data, /* 179 */
scheme_rt_tcp_select_info, /* 180 */
scheme_rt_namespace_option, /* 181 */
scheme_rt_param_data, /* 182 */
scheme_rt_will, /* 183 */
scheme_rt_will_registration, /* 184 */
scheme_rt_struct_proc_info, /* 185 */
scheme_rt_linker_name, /* 186 */
scheme_rt_param_map, /* 187 */
scheme_rt_finalization, /* 188 */
scheme_rt_finalizations, /* 189 */
scheme_rt_cpp_object, /* 190 */
scheme_rt_cpp_array_object, /* 191 */
scheme_rt_stack_object, /* 192 */
scheme_rt_preallocated_object, /* 193 */
scheme_thread_hop_type, /* 194 */
scheme_rt_srcloc, /* 195 */
scheme_rt_evt, /* 196 */
scheme_rt_syncing, /* 197 */
scheme_rt_comp_prefix, /* 198 */
scheme_rt_user_input, /* 199 */
scheme_rt_user_output, /* 200 */
scheme_rt_compact_port, /* 201 */
scheme_rt_read_special_dw, /* 202 */
scheme_rt_regwork, /* 203 */
scheme_rt_buf_holder, /* 204 */
scheme_rt_parameterization, /* 205 */
scheme_rt_print_params, /* 206 */
scheme_rt_read_params, /* 207 */
scheme_rt_native_code, /* 208 */
scheme_rt_native_code_plus_case, /* 209 */
scheme_rt_jitter_data, /* 210 */
scheme_rt_module_exports, /* 211 */
scheme_rt_delay_load_info, /* 212 */
scheme_rt_marshal_info, /* 213 */
scheme_rt_unmarshal_info, /* 214 */
scheme_rt_runstack, /* 215 */
scheme_rt_sfs_info, /* 216 */
scheme_rt_validate_clearing, /* 217 */
scheme_rt_comp_env, /* 151 */
scheme_rt_constant_binding, /* 152 */
scheme_rt_resolve_info, /* 153 */
scheme_rt_optimize_info, /* 154 */
scheme_rt_compile_info, /* 155 */
scheme_rt_cont_mark, /* 156 */
scheme_rt_saved_stack, /* 157 */
scheme_rt_reply_item, /* 158 */
scheme_rt_closure_info, /* 159 */
scheme_rt_overflow, /* 160 */
scheme_rt_overflow_jmp, /* 161 */
scheme_rt_meta_cont, /* 162 */
scheme_rt_dyn_wind_cell, /* 163 */
scheme_rt_dyn_wind_info, /* 164 */
scheme_rt_dyn_wind, /* 165 */
scheme_rt_dup_check, /* 166 */
scheme_rt_thread_memory, /* 167 */
scheme_rt_input_file, /* 168 */
scheme_rt_input_fd, /* 169 */
scheme_rt_oskit_console_input, /* 170 */
scheme_rt_tested_input_file, /* 171 */
scheme_rt_tested_output_file, /* 172 */
scheme_rt_indexed_string, /* 173 */
scheme_rt_output_file, /* 174 */
scheme_rt_load_handler_data, /* 175 */
scheme_rt_pipe, /* 176 */
scheme_rt_beos_process, /* 177 */
scheme_rt_system_child, /* 178 */
scheme_rt_tcp, /* 179 */
scheme_rt_write_data, /* 180 */
scheme_rt_tcp_select_info, /* 181 */
scheme_rt_namespace_option, /* 182 */
scheme_rt_param_data, /* 183 */
scheme_rt_will, /* 184 */
scheme_rt_will_registration, /* 185 */
scheme_rt_struct_proc_info, /* 186 */
scheme_rt_linker_name, /* 187 */
scheme_rt_param_map, /* 188 */
scheme_rt_finalization, /* 189 */
scheme_rt_finalizations, /* 190 */
scheme_rt_cpp_object, /* 191 */
scheme_rt_cpp_array_object, /* 192 */
scheme_rt_stack_object, /* 193 */
scheme_rt_preallocated_object, /* 194 */
scheme_thread_hop_type, /* 195 */
scheme_rt_srcloc, /* 196 */
scheme_rt_evt, /* 197 */
scheme_rt_syncing, /* 198 */
scheme_rt_comp_prefix, /* 199 */
scheme_rt_user_input, /* 200 */
scheme_rt_user_output, /* 201 */
scheme_rt_compact_port, /* 202 */
scheme_rt_read_special_dw, /* 203 */
scheme_rt_regwork, /* 204 */
scheme_rt_buf_holder, /* 205 */
scheme_rt_parameterization, /* 206 */
scheme_rt_print_params, /* 207 */
scheme_rt_read_params, /* 208 */
scheme_rt_native_code, /* 209 */
scheme_rt_native_code_plus_case, /* 210 */
scheme_rt_jitter_data, /* 211 */
scheme_rt_module_exports, /* 212 */
scheme_rt_delay_load_info, /* 213 */
scheme_rt_marshal_info, /* 214 */
scheme_rt_unmarshal_info, /* 215 */
scheme_rt_runstack, /* 216 */
scheme_rt_sfs_info, /* 217 */
scheme_rt_validate_clearing, /* 218 */
#endif
_scheme_last_type_

View File

@ -2176,7 +2176,7 @@ static Scheme_Thread *make_thread(Scheme_Config *config,
process->list_stack = NULL;
scheme_gmp_tls_init(process->gmp_tls);
if (prefix) {
process->next = scheme_first_thread;
process->prev = NULL;
@ -2244,6 +2244,15 @@ static Scheme_Thread *make_thread(Scheme_Config *config,
process->nester = process->nestee = NULL;
process->mbox_first = NULL;
process->mbox_last = NULL;
process->mbox_sema = NULL;
process->mref = NULL;
process->extra_mrefs = NULL;
/* A thread points to a lot of stuff, so it's bad to put a finalization
on it, which is what registering with a custodian does. Instead, we
register a weak indirection with the custodian. That way, the thread
@ -2586,6 +2595,10 @@ static void thread_is_dead(Scheme_Thread *r)
r->error_buf = NULL;
r->spare_runstack = NULL;
r->mbox_first = NULL;
r->mbox_last = NULL;
r->mbox_sema = NULL;
}
static void remove_thread(Scheme_Thread *r)

View File

@ -236,6 +236,7 @@ scheme_init_type (Scheme_Env *env)
set_name(scheme_write_evt_type, "<write-evt>");
set_name(scheme_always_evt_type, "<always-evt>");
set_name(scheme_never_evt_type, "<never-evt>");
set_name(scheme_thread_recv_evt_type, "<thread-receive-evt>");
set_name(scheme_thread_resume_type, "<thread-resume-evt>");
set_name(scheme_thread_suspend_type, "<thread-suspend-evt>");
@ -585,6 +586,7 @@ void scheme_register_traversers(void)
GC_REG_TRAV(scheme_nack_evt_type, twoptr_obj);
GC_REG_TRAV(scheme_always_evt_type, char_obj);
GC_REG_TRAV(scheme_never_evt_type, char_obj);
GC_REG_TRAV(scheme_thread_recv_evt_type, char_obj);
GC_REG_TRAV(scheme_inspector_type, mark_inspector);