fix log-reader syncing

svn: r11267
This commit is contained in:
Matthew Flatt 2008-08-15 02:12:45 +00:00
parent e3877dbf0f
commit d01479b5e4
10 changed files with 116 additions and 64 deletions

View File

@ -2880,18 +2880,15 @@ void scheme_log_message(Scheme_Logger *logger, int level, char *buffer, long len
SCHEME_VEC_ELS(msg)[1] = v;
SCHEME_VEC_ELS(msg)[2] = (data ? data : scheme_false);
}
if (!lr->tail
&& scheme_try_channel_put(lr->ch, msg)) {
/* delivered immediately */
} else {
/* enqueue */
q = scheme_make_raw_pair(msg, NULL);
if (lr->tail)
SCHEME_CDR(lr->tail) = q;
else
lr->head = q;
lr->tail = q;
}
/* enqueue */
q = scheme_make_raw_pair(msg, NULL);
if (lr->tail)
SCHEME_CDR(lr->tail) = q;
else
lr->head = q;
lr->tail = q;
scheme_post_sema(lr->sema);
}
}
queue = SCHEME_CDR(queue);
@ -3048,7 +3045,7 @@ make_log_reader(int argc, Scheme_Object *argv[])
{
Scheme_Logger *logger;
Scheme_Log_Reader *lr;
Scheme_Object *ch, *q;
Scheme_Object *sema, *q;
int level;
if (!SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_logger_type))
@ -3061,8 +3058,8 @@ make_log_reader(int argc, Scheme_Object *argv[])
lr->so.type = scheme_log_reader_type;
lr->want_level = level;
ch = scheme_make_channel();
lr->ch = ch;
sema = scheme_make_sema(0);
lr->sema = sema;
/* Pair a weak reference to the reader with a strong reference to the
channel. Channel gets are wrapped to reference the reader. That way,
@ -3070,7 +3067,7 @@ make_log_reader(int argc, Scheme_Object *argv[])
reader. */
q = scheme_make_raw_pair(scheme_make_pair(scheme_make_weak_box((Scheme_Object *)lr),
ch),
sema),
logger->readers);
logger->readers = q;
*logger->timestamp += 1;
@ -3086,12 +3083,7 @@ log_reader_p(int argc, Scheme_Object *argv[])
: scheme_false);
}
static Scheme_Object *id_that_preserves_reader(void *data, int argc, Scheme_Object **argv)
{
return argv[0];
}
static int log_reader_get(Scheme_Object *_lr, Scheme_Schedule_Info *sinfo)
static Scheme_Object *dequeue_log(Scheme_Object *_lr)
{
Scheme_Log_Reader *lr = (Scheme_Log_Reader *)_lr;
@ -3101,18 +3093,20 @@ static int log_reader_get(Scheme_Object *_lr, Scheme_Schedule_Info *sinfo)
lr->head = SCHEME_CDR(lr->head);
if (!lr->head)
lr->tail = NULL;
scheme_set_sync_target(sinfo, v, NULL, NULL, 0, 0);
return 1;
return v;
} else {
Scheme_Object *w;
w = scheme_make_closed_prim(id_that_preserves_reader, lr);
scheme_set_sync_target(sinfo, lr->ch, w, NULL, 0, 0);
return 0;
scheme_signal_error("empty log-reader queue!?");
return NULL;
}
}
static int log_reader_get(Scheme_Object *_lr, Scheme_Schedule_Info *sinfo)
{
Scheme_Log_Reader *lr = (Scheme_Log_Reader *)_lr;
scheme_set_sync_target(sinfo, lr->sema, (Scheme_Object *)lr, NULL, 0, 1, dequeue_log);
return 0;
}
/***********************************************************************/
void

View File

@ -2677,7 +2677,7 @@ static int mark_log_reader_SIZE(void *p) {
static int mark_log_reader_MARK(void *p) {
Scheme_Log_Reader *lr = (Scheme_Log_Reader *)p;
gcMARK(lr->ch);
gcMARK(lr->sema);
gcMARK(lr->head);
gcMARK(lr->tail);
return
@ -2686,7 +2686,7 @@ static int mark_log_reader_MARK(void *p) {
static int mark_log_reader_FIXUP(void *p) {
Scheme_Log_Reader *lr = (Scheme_Log_Reader *)p;
gcFIXUP(lr->ch);
gcFIXUP(lr->sema);
gcFIXUP(lr->head);
gcFIXUP(lr->tail);
return
@ -4235,6 +4235,7 @@ static int mark_syncing_MARK(void *p) {
gcMARK(w->wrapss);
gcMARK(w->nackss);
gcMARK(w->reposts);
gcMARK(w->accepts);
gcMARK(w->disable_break);
return
@ -4248,6 +4249,7 @@ static int mark_syncing_FIXUP(void *p) {
gcFIXUP(w->wrapss);
gcFIXUP(w->nackss);
gcFIXUP(w->reposts);
gcFIXUP(w->accepts);
gcFIXUP(w->disable_break);
return

View File

@ -1063,7 +1063,7 @@ mark_logger {
mark_log_reader {
mark:
Scheme_Log_Reader *lr = (Scheme_Log_Reader *)p;
gcMARK(lr->ch);
gcMARK(lr->sema);
gcMARK(lr->head);
gcMARK(lr->tail);
size:
@ -1724,6 +1724,7 @@ mark_syncing {
gcMARK(w->wrapss);
gcMARK(w->nackss);
gcMARK(w->reposts);
gcMARK(w->accepts);
gcMARK(w->disable_break);
size:

View File

@ -2465,7 +2465,7 @@ static int tcp_check_accept_evt(Scheme_Object *ae, Scheme_Schedule_Info *sinfo)
tcp_accept(1, a);
a[0] = scheme_current_thread->ku.multiple.array[0];
a[1] = scheme_current_thread->ku.multiple.array[1];
scheme_set_sync_target(sinfo, scheme_build_list(2, a), NULL, NULL, 0, 0);
scheme_set_sync_target(sinfo, scheme_build_list(2, a), NULL, NULL, 0, 0, NULL);
return 1;
} else
return 0;
@ -3359,7 +3359,7 @@ static int udp_evt_check_ready(Scheme_Object *_uw, Scheme_Schedule_Info *sinfo)
if (do_udp_recv("udp-receive!-evt", uw->udp,
uw->str, uw->offset, uw->offset + uw->len,
0, v)) {
scheme_set_sync_target(sinfo, scheme_build_list(3, v), NULL, NULL, 0, 0);
scheme_set_sync_target(sinfo, scheme_build_list(3, v), NULL, NULL, 0, 0, NULL);
return 1;
} else
return 0;
@ -3374,7 +3374,7 @@ static int udp_evt_check_ready(Scheme_Object *_uw, Scheme_Schedule_Info *sinfo)
uw->dest_addr, uw->dest_addr_len,
0);
if (SCHEME_TRUEP(r)) {
scheme_set_sync_target(sinfo, scheme_void, NULL, NULL, 0, 0);
scheme_set_sync_target(sinfo, scheme_void, NULL, NULL, 0, 0, NULL);
return 1;
} else
return 0;

View File

@ -2135,7 +2135,7 @@ Scheme_Object *scheme_progress_evt(Scheme_Object *port)
static int progress_evt_ready(Scheme_Object *evt, Scheme_Schedule_Info *sinfo)
{
scheme_set_sync_target(sinfo, SCHEME_PTR2_VAL(evt), evt, NULL, 0, 1);
scheme_set_sync_target(sinfo, SCHEME_PTR2_VAL(evt), evt, NULL, 0, 1, NULL);
return 0;
}
@ -2642,7 +2642,7 @@ static int rw_evt_ready(Scheme_Object *_rww, Scheme_Schedule_Info *sinfo)
v = ws(op, rww->v, 1);
if (v) {
scheme_set_sync_target(sinfo, scheme_true, NULL, NULL, 0, 0);
scheme_set_sync_target(sinfo, scheme_true, NULL, NULL, 0, 0, NULL);
return 1;
} else
return 0;
@ -2655,7 +2655,7 @@ static int rw_evt_ready(Scheme_Object *_rww, Scheme_Schedule_Info *sinfo)
else if (!v && rww->size)
return 0;
else {
scheme_set_sync_target(sinfo, scheme_make_integer(v), NULL, NULL, 0, 0);
scheme_set_sync_target(sinfo, scheme_make_integer(v), NULL, NULL, 0, 0, NULL);
return 1;
}
}

View File

@ -1221,7 +1221,7 @@ static long user_read_result(const char *who, Scheme_Input_Port *port,
when the read/peek (at some offset) succeeds. */
if (nonblock > 0) {
if (sinfo) {
scheme_set_sync_target(sinfo, val, (Scheme_Object *)port, NULL, 0, 1);
scheme_set_sync_target(sinfo, val, (Scheme_Object *)port, NULL, 0, 1, NULL);
return 0;
} else {
/* Poll: */
@ -1655,7 +1655,7 @@ int scheme_user_port_write_probably_ready(Scheme_Output_Port *port, Scheme_Sched
val = uop->evt;
scheme_set_sync_target(sinfo, val, (Scheme_Object *)port, NULL, 0, 1);
scheme_set_sync_target(sinfo, val, (Scheme_Object *)port, NULL, 0, 1, NULL);
return 0;
}

View File

@ -447,9 +447,13 @@ typedef struct {
short is_poll;
} Scheme_Schedule_Info;
typedef Scheme_Object *(*Scheme_Accept_Sync)(Scheme_Object *wrap);
void scheme_set_sync_target(Scheme_Schedule_Info *sinfo, Scheme_Object *target,
Scheme_Object *wrap, Scheme_Object *nack,
int repost, int retry);
int repost, int retry, Scheme_Accept_Sync accept);
struct Syncing;
void scheme_accept_sync(struct Syncing *syncing, int i);
typedef int (*Scheme_Ready_Fun_FPC)(Scheme_Object *o, Scheme_Schedule_Info *sinfo);
@ -1327,6 +1331,7 @@ typedef struct Syncing {
Scheme_Object **wrapss;
Scheme_Object **nackss;
char *reposts;
Scheme_Accept_Sync *accepts;
Scheme_Thread *disable_break; /* when result is set */
} Syncing;
@ -2770,13 +2775,13 @@ struct Scheme_Logger {
int want_level;
long *timestamp, local_timestamp; /* determines when want_level is up-to-date */
int syslog_level, stderr_level;
Scheme_Object *readers; /* list of (cons (make-weak-box <reader>) <channel>) */
Scheme_Object *readers; /* list of (cons (make-weak-box <reader>) <sema>) */
};
typedef struct Scheme_Log_Reader {
Scheme_Object so;
int want_level;
Scheme_Object *ch;
Scheme_Object *sema;
Scheme_Object *head, *tail;
} Scheme_Log_Reader;

View File

@ -301,6 +301,8 @@ void scheme_post_sema(Scheme_Object *o)
consumed = 1;
} else
consumed = 0;
if (w->syncing->accepts && w->syncing->accepts[w->syncing_i])
scheme_accept_sync(w->syncing, w->syncing_i);
} else {
/* In this case, we will remove the syncer from line, but
someone else might grab the post. This is unfair, but it
@ -481,7 +483,7 @@ static void ext_get_into_line(Scheme_Object *ch, Scheme_Schedule_Info *sinfo)
get_into_line((Scheme_Sema *)ch, w);
scheme_set_sync_target(sinfo, (Scheme_Object *)w, NULL, NULL, 0, 0);
scheme_set_sync_target(sinfo, (Scheme_Object *)w, NULL, NULL, 0, 0, NULL);
}
void scheme_get_outof_line(Scheme_Channel_Syncer *ch_w)
@ -637,6 +639,8 @@ int scheme_wait_semas_chs(int n, Scheme_Object **o, int just_try, Syncing *synci
if (semas[i]->value) {
if ((semas[i]->value > 0) && (!syncing || !syncing->reposts || !syncing->reposts[i]))
--semas[i]->value;
if (syncing && syncing->accepts && syncing->accepts[i])
scheme_accept_sync(syncing, i);
break;
}
} else if (semas[i]->so.type == scheme_never_evt_type) {
@ -811,6 +815,8 @@ int scheme_wait_semas_chs(int n, Scheme_Object **o, int just_try, Syncing *synci
if (semas[i]->value) {
if ((semas[i]->value > 0) && (!syncing || !syncing->reposts || !syncing->reposts[i]))
--semas[i]->value;
if (syncing && syncing->accepts && syncing->accepts[i])
scheme_accept_sync(syncing, i);
break;
}
} else if (semas[i]->so.type == scheme_never_evt_type) {
@ -988,7 +994,7 @@ static int channel_get_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo)
Scheme_Object *result;
if (try_channel((Scheme_Sema *)ch, (Syncing *)sinfo->current_syncing, -1, &result)) {
scheme_set_sync_target(sinfo, result, NULL, NULL, 0, 0);
scheme_set_sync_target(sinfo, result, NULL, NULL, 0, 0, NULL);
return 1;
}
@ -1184,7 +1190,7 @@ static int thread_recv_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo)
make_mbox_sema(p);
scheme_set_sync_target(sinfo, p->mbox_sema, thread_recv_evt, NULL, 1, 1);
scheme_set_sync_target(sinfo, p->mbox_sema, thread_recv_evt, NULL, 1, 1, NULL);
return 0;
}

View File

@ -999,7 +999,7 @@ static int evt_struct_is_ready(Scheme_Object *o, Scheme_Schedule_Info *sinfo)
} else {
v = (Scheme_Object *)scheme_output_port_record(o);
}
scheme_set_sync_target(sinfo, v, NULL, NULL, 0, 1);
scheme_set_sync_target(sinfo, v, NULL, NULL, 0, 1, NULL);
return 0;
}
@ -1007,7 +1007,7 @@ static int evt_struct_is_ready(Scheme_Object *o, Scheme_Schedule_Info *sinfo)
v = ((Scheme_Structure *)o)->slots[SCHEME_INT_VAL(v)];
if (scheme_is_evt(v)) {
scheme_set_sync_target(sinfo, v, NULL, NULL, 0, 1);
scheme_set_sync_target(sinfo, v, NULL, NULL, 0, 1, NULL);
return 0;
}
@ -1026,12 +1026,12 @@ static int evt_struct_is_ready(Scheme_Object *o, Scheme_Schedule_Info *sinfo)
if (scheme_is_evt(result)) {
SCHEME_USE_FUEL(1); /* Needed beause an apply of a mzc-generated function
might not check for breaks. */
scheme_set_sync_target(sinfo, result, NULL, NULL, 0, 1);
scheme_set_sync_target(sinfo, result, NULL, NULL, 0, 1, NULL);
return 0;
}
/* non-evt => ready and result is self */
scheme_set_sync_target(sinfo, o, o, NULL, 0, 0);
scheme_set_sync_target(sinfo, o, o, NULL, 0, 0, NULL);
return 1;
}
@ -2129,7 +2129,7 @@ static int wrapped_evt_is_ready(Scheme_Object *o, Scheme_Schedule_Info *sinfo)
wrapper = scheme_box(ww->wrapper);
}
scheme_set_sync_target(sinfo, ww->evt, wrapper, NULL, 0, 1);
scheme_set_sync_target(sinfo, ww->evt, wrapper, NULL, 0, 1, NULL);
return 0;
}
@ -2149,7 +2149,7 @@ static int nack_guard_evt_is_ready(Scheme_Object *o, Scheme_Schedule_Info *sinfo
/* Install the semaphore immediately, so that it's posted on
exceptions (e.g., breaks) even if they happen while trying
to run the maker. */
scheme_set_sync_target(sinfo, o, NULL, sema, 0, 0);
scheme_set_sync_target(sinfo, o, NULL, sema, 0, 0, NULL);
/* Remember both the sema and the current thread's dead evt: */
nack = scheme_alloc_object();
@ -2162,7 +2162,7 @@ static int nack_guard_evt_is_ready(Scheme_Object *o, Scheme_Schedule_Info *sinfo
result = scheme_apply(nw->maker, 1, a);
if (scheme_is_evt(result)) {
scheme_set_sync_target(sinfo, result, NULL, NULL, 0, 1);
scheme_set_sync_target(sinfo, result, NULL, NULL, 0, 1, NULL);
return 0;
} else
return 1; /* Non-evt => ready */
@ -2182,7 +2182,7 @@ static int nack_evt_is_ready(Scheme_Object *o, Scheme_Schedule_Info *sinfo)
}
/* Redirect to the set, and wrap with void: */
scheme_set_sync_target(sinfo, wset, scheme_void, NULL, 0, 1);
scheme_set_sync_target(sinfo, wset, scheme_void, NULL, 0, 1, NULL);
return 0;
}
@ -2201,7 +2201,7 @@ static int poll_evt_is_ready(Scheme_Object *o, Scheme_Schedule_Info *sinfo)
result = scheme_apply(nw->maker, 1, a);
if (scheme_is_evt(result)) {
scheme_set_sync_target(sinfo, result, NULL, NULL, 0, 1);
scheme_set_sync_target(sinfo, result, NULL, NULL, 0, 1, NULL);
return 0;
} else
return 1; /* Non-evt => ready */

View File

@ -2937,7 +2937,7 @@ static int thread_wait_done(Scheme_Object *p, Scheme_Schedule_Info *sinfo)
the blocking thread can be dequeued: */
Scheme_Object *evt;
evt = scheme_get_thread_dead((Scheme_Thread *)p);
scheme_set_sync_target(sinfo, evt, p, NULL, 0, 0);
scheme_set_sync_target(sinfo, evt, p, NULL, 0, 0, NULL);
return 0;
} else
return 1;
@ -4978,11 +4978,11 @@ static int resume_suspend_ready(Scheme_Object *o, Scheme_Schedule_Info *sinfo)
t = SCHEME_PTR2_VAL(o);
if (t) {
scheme_set_sync_target(sinfo, o, t, NULL, 0, 0);
scheme_set_sync_target(sinfo, o, t, NULL, 0, 0, NULL);
return 1;
}
scheme_set_sync_target(sinfo, SCHEME_PTR1_VAL(o), o, NULL, 0, 1);
scheme_set_sync_target(sinfo, SCHEME_PTR1_VAL(o), o, NULL, 0, 1, NULL);
return 0;
}
@ -5015,7 +5015,7 @@ Scheme_Object *scheme_get_thread_dead(Scheme_Thread *p)
static int dead_ready(Scheme_Object *o, Scheme_Schedule_Info *sinfo)
{
scheme_set_sync_target(sinfo, SCHEME_PTR_VAL(o), o, NULL, 0, 1);
scheme_set_sync_target(sinfo, SCHEME_PTR_VAL(o), o, NULL, 0, 1, NULL);
return 0;
}
@ -5163,7 +5163,7 @@ static void *splice_ptr_array(void **a, int al, void **b, int bl, int i)
static void set_sync_target(Syncing *syncing, int i, Scheme_Object *target,
Scheme_Object *wrap, Scheme_Object *nack,
int repost, int retry)
int repost, int retry, Scheme_Accept_Sync accept)
/* Not ready, deferred to target. */
{
Evt_Set *evt_set = syncing->set;
@ -5202,6 +5202,16 @@ static void set_sync_target(Syncing *syncing, int i, Scheme_Object *target,
syncing->reposts[i] = 1;
}
if (accept) {
if (!syncing->accepts) {
Scheme_Accept_Sync *s;
s = (Scheme_Accept_Sync *)scheme_malloc_atomic(sizeof(Scheme_Accept_Sync) * evt_set->argc);
memset(s, 0, evt_set->argc * sizeof(Scheme_Accept_Sync));
syncing->accepts = s;
}
syncing->accepts[i] = accept;
}
if (SCHEME_EVTSETP(target) && retry) {
/* Flatten the set into this one */
Evt_Set *wts = (Evt_Set *)target;
@ -5257,6 +5267,19 @@ static void set_sync_target(Syncing *syncing, int i, Scheme_Object *target,
memcpy(s + i + wts->argc, syncing->reposts + i + 1, evt_set->argc - i - 1);
syncing->reposts = s;
}
if (syncing->accepts) {
Scheme_Accept_Sync *s;
int len;
len = evt_set->argc + wts->argc - 1;
s = (Scheme_Accept_Sync *)scheme_malloc_atomic(len * sizeof(Scheme_Accept_Sync));
memset(s, 0, len * sizeof(Scheme_Accept_Sync));
memcpy(s, syncing->accepts, i * sizeof(Scheme_Accept_Sync));
memcpy(s + i + wts->argc, syncing->accepts + i + 1, (evt_set->argc - i - 1) * sizeof(Scheme_Accept_Sync));
syncing->accepts = s;
}
evt_set->argc += (wts->argc - 1);
@ -5280,10 +5303,10 @@ static void set_sync_target(Syncing *syncing, int i, Scheme_Object *target,
void scheme_set_sync_target(Scheme_Schedule_Info *sinfo, Scheme_Object *target,
Scheme_Object *wrap, Scheme_Object *nack,
int repost, int retry)
int repost, int retry, Scheme_Accept_Sync accept)
{
set_sync_target((Syncing *)sinfo->current_syncing, sinfo->w_i,
target, wrap, nack, repost, retry);
target, wrap, nack, repost, retry, accept);
if (retry) {
/* Rewind one step to try new ones (or continue
if the set was empty). */
@ -5370,6 +5393,8 @@ static int syncing_ready(Scheme_Object *s, Scheme_Schedule_Info *sinfo)
syncing->disable_break->suspend_break++;
if (syncing->reposts && syncing->reposts[i])
scheme_post_sema(o);
if (syncing->accepts && syncing->accepts[i])
scheme_accept_sync(syncing, i);
scheme_post_syncing_nacks(syncing);
result = 1;
goto set_sleep_end_and_return;
@ -5387,7 +5412,7 @@ static int syncing_ready(Scheme_Object *s, Scheme_Schedule_Info *sinfo)
Scheme_Object *sema;
sema = get_sema(o, &repost);
set_sync_target(syncing, i, sema, o, NULL, repost, 1);
set_sync_target(syncing, i, sema, o, NULL, repost, 1, NULL);
j--; /* try again with this sema */
}
}
@ -5425,6 +5450,25 @@ static int syncing_ready(Scheme_Object *s, Scheme_Schedule_Info *sinfo)
return result;
}
void scheme_accept_sync(Syncing *syncing, int i)
{
/* run atomic accept action to revise the wrap */
Scheme_Accept_Sync accept;
Scheme_Object *v, *pr;
accept = syncing->accepts[i];
syncing->accepts[i] = NULL;
pr = syncing->wrapss[i];
v = SCHEME_CAR(pr);
pr = SCHEME_CDR(pr);
v = accept(v);
pr = scheme_make_pair(v, pr);
syncing->wrapss[i] = pr;
}
static void syncing_needs_wakeup(Scheme_Object *s, void *fds)
{
int i;