diff --git a/src/mzscheme/src/error.c b/src/mzscheme/src/error.c index ecc556cf00..eae21a0e4b 100644 --- a/src/mzscheme/src/error.c +++ b/src/mzscheme/src/error.c @@ -2880,18 +2880,15 @@ void scheme_log_message(Scheme_Logger *logger, int level, char *buffer, long len SCHEME_VEC_ELS(msg)[1] = v; SCHEME_VEC_ELS(msg)[2] = (data ? data : scheme_false); } - if (!lr->tail - && scheme_try_channel_put(lr->ch, msg)) { - /* delivered immediately */ - } else { - /* enqueue */ - q = scheme_make_raw_pair(msg, NULL); - if (lr->tail) - SCHEME_CDR(lr->tail) = q; - else - lr->head = q; - lr->tail = q; - } + + /* enqueue */ + q = scheme_make_raw_pair(msg, NULL); + if (lr->tail) + SCHEME_CDR(lr->tail) = q; + else + lr->head = q; + lr->tail = q; + scheme_post_sema(lr->sema); } } queue = SCHEME_CDR(queue); @@ -3048,7 +3045,7 @@ make_log_reader(int argc, Scheme_Object *argv[]) { Scheme_Logger *logger; Scheme_Log_Reader *lr; - Scheme_Object *ch, *q; + Scheme_Object *sema, *q; int level; if (!SAME_TYPE(SCHEME_TYPE(argv[0]), scheme_logger_type)) @@ -3061,8 +3058,8 @@ make_log_reader(int argc, Scheme_Object *argv[]) lr->so.type = scheme_log_reader_type; lr->want_level = level; - ch = scheme_make_channel(); - lr->ch = ch; + sema = scheme_make_sema(0); + lr->sema = sema; /* Pair a weak reference to the reader with a strong reference to the channel. Channel gets are wrapped to reference the reader. That way, @@ -3070,7 +3067,7 @@ make_log_reader(int argc, Scheme_Object *argv[]) reader. */ q = scheme_make_raw_pair(scheme_make_pair(scheme_make_weak_box((Scheme_Object *)lr), - ch), + sema), logger->readers); logger->readers = q; *logger->timestamp += 1; @@ -3086,12 +3083,7 @@ log_reader_p(int argc, Scheme_Object *argv[]) : scheme_false); } -static Scheme_Object *id_that_preserves_reader(void *data, int argc, Scheme_Object **argv) -{ - return argv[0]; -} - -static int log_reader_get(Scheme_Object *_lr, Scheme_Schedule_Info *sinfo) +static Scheme_Object *dequeue_log(Scheme_Object *_lr) { Scheme_Log_Reader *lr = (Scheme_Log_Reader *)_lr; @@ -3101,18 +3093,20 @@ static int log_reader_get(Scheme_Object *_lr, Scheme_Schedule_Info *sinfo) lr->head = SCHEME_CDR(lr->head); if (!lr->head) lr->tail = NULL; - scheme_set_sync_target(sinfo, v, NULL, NULL, 0, 0); - return 1; + return v; } else { - Scheme_Object *w; - - w = scheme_make_closed_prim(id_that_preserves_reader, lr); - - scheme_set_sync_target(sinfo, lr->ch, w, NULL, 0, 0); - return 0; + scheme_signal_error("empty log-reader queue!?"); + return NULL; } } +static int log_reader_get(Scheme_Object *_lr, Scheme_Schedule_Info *sinfo) +{ + Scheme_Log_Reader *lr = (Scheme_Log_Reader *)_lr; + scheme_set_sync_target(sinfo, lr->sema, (Scheme_Object *)lr, NULL, 0, 1, dequeue_log); + return 0; +} + /***********************************************************************/ void diff --git a/src/mzscheme/src/mzmark.c b/src/mzscheme/src/mzmark.c index dbd985a514..517bf9029c 100644 --- a/src/mzscheme/src/mzmark.c +++ b/src/mzscheme/src/mzmark.c @@ -2677,7 +2677,7 @@ static int mark_log_reader_SIZE(void *p) { static int mark_log_reader_MARK(void *p) { Scheme_Log_Reader *lr = (Scheme_Log_Reader *)p; - gcMARK(lr->ch); + gcMARK(lr->sema); gcMARK(lr->head); gcMARK(lr->tail); return @@ -2686,7 +2686,7 @@ static int mark_log_reader_MARK(void *p) { static int mark_log_reader_FIXUP(void *p) { Scheme_Log_Reader *lr = (Scheme_Log_Reader *)p; - gcFIXUP(lr->ch); + gcFIXUP(lr->sema); gcFIXUP(lr->head); gcFIXUP(lr->tail); return @@ -4235,6 +4235,7 @@ static int mark_syncing_MARK(void *p) { gcMARK(w->wrapss); gcMARK(w->nackss); gcMARK(w->reposts); + gcMARK(w->accepts); gcMARK(w->disable_break); return @@ -4248,6 +4249,7 @@ static int mark_syncing_FIXUP(void *p) { gcFIXUP(w->wrapss); gcFIXUP(w->nackss); gcFIXUP(w->reposts); + gcFIXUP(w->accepts); gcFIXUP(w->disable_break); return diff --git a/src/mzscheme/src/mzmarksrc.c b/src/mzscheme/src/mzmarksrc.c index 9bc1bff78c..f3611ec6db 100644 --- a/src/mzscheme/src/mzmarksrc.c +++ b/src/mzscheme/src/mzmarksrc.c @@ -1063,7 +1063,7 @@ mark_logger { mark_log_reader { mark: Scheme_Log_Reader *lr = (Scheme_Log_Reader *)p; - gcMARK(lr->ch); + gcMARK(lr->sema); gcMARK(lr->head); gcMARK(lr->tail); size: @@ -1724,6 +1724,7 @@ mark_syncing { gcMARK(w->wrapss); gcMARK(w->nackss); gcMARK(w->reposts); + gcMARK(w->accepts); gcMARK(w->disable_break); size: diff --git a/src/mzscheme/src/network.c b/src/mzscheme/src/network.c index 6f8b895547..55f386009e 100644 --- a/src/mzscheme/src/network.c +++ b/src/mzscheme/src/network.c @@ -2465,7 +2465,7 @@ static int tcp_check_accept_evt(Scheme_Object *ae, Scheme_Schedule_Info *sinfo) tcp_accept(1, a); a[0] = scheme_current_thread->ku.multiple.array[0]; a[1] = scheme_current_thread->ku.multiple.array[1]; - scheme_set_sync_target(sinfo, scheme_build_list(2, a), NULL, NULL, 0, 0); + scheme_set_sync_target(sinfo, scheme_build_list(2, a), NULL, NULL, 0, 0, NULL); return 1; } else return 0; @@ -3359,7 +3359,7 @@ static int udp_evt_check_ready(Scheme_Object *_uw, Scheme_Schedule_Info *sinfo) if (do_udp_recv("udp-receive!-evt", uw->udp, uw->str, uw->offset, uw->offset + uw->len, 0, v)) { - scheme_set_sync_target(sinfo, scheme_build_list(3, v), NULL, NULL, 0, 0); + scheme_set_sync_target(sinfo, scheme_build_list(3, v), NULL, NULL, 0, 0, NULL); return 1; } else return 0; @@ -3374,7 +3374,7 @@ static int udp_evt_check_ready(Scheme_Object *_uw, Scheme_Schedule_Info *sinfo) uw->dest_addr, uw->dest_addr_len, 0); if (SCHEME_TRUEP(r)) { - scheme_set_sync_target(sinfo, scheme_void, NULL, NULL, 0, 0); + scheme_set_sync_target(sinfo, scheme_void, NULL, NULL, 0, 0, NULL); return 1; } else return 0; diff --git a/src/mzscheme/src/port.c b/src/mzscheme/src/port.c index d1dbc944f2..c2cb4f14b0 100644 --- a/src/mzscheme/src/port.c +++ b/src/mzscheme/src/port.c @@ -2135,7 +2135,7 @@ Scheme_Object *scheme_progress_evt(Scheme_Object *port) static int progress_evt_ready(Scheme_Object *evt, Scheme_Schedule_Info *sinfo) { - scheme_set_sync_target(sinfo, SCHEME_PTR2_VAL(evt), evt, NULL, 0, 1); + scheme_set_sync_target(sinfo, SCHEME_PTR2_VAL(evt), evt, NULL, 0, 1, NULL); return 0; } @@ -2642,7 +2642,7 @@ static int rw_evt_ready(Scheme_Object *_rww, Scheme_Schedule_Info *sinfo) v = ws(op, rww->v, 1); if (v) { - scheme_set_sync_target(sinfo, scheme_true, NULL, NULL, 0, 0); + scheme_set_sync_target(sinfo, scheme_true, NULL, NULL, 0, 0, NULL); return 1; } else return 0; @@ -2655,7 +2655,7 @@ static int rw_evt_ready(Scheme_Object *_rww, Scheme_Schedule_Info *sinfo) else if (!v && rww->size) return 0; else { - scheme_set_sync_target(sinfo, scheme_make_integer(v), NULL, NULL, 0, 0); + scheme_set_sync_target(sinfo, scheme_make_integer(v), NULL, NULL, 0, 0, NULL); return 1; } } diff --git a/src/mzscheme/src/portfun.c b/src/mzscheme/src/portfun.c index da8171c61e..2ba62999b4 100644 --- a/src/mzscheme/src/portfun.c +++ b/src/mzscheme/src/portfun.c @@ -1221,7 +1221,7 @@ static long user_read_result(const char *who, Scheme_Input_Port *port, when the read/peek (at some offset) succeeds. */ if (nonblock > 0) { if (sinfo) { - scheme_set_sync_target(sinfo, val, (Scheme_Object *)port, NULL, 0, 1); + scheme_set_sync_target(sinfo, val, (Scheme_Object *)port, NULL, 0, 1, NULL); return 0; } else { /* Poll: */ @@ -1655,7 +1655,7 @@ int scheme_user_port_write_probably_ready(Scheme_Output_Port *port, Scheme_Sched val = uop->evt; - scheme_set_sync_target(sinfo, val, (Scheme_Object *)port, NULL, 0, 1); + scheme_set_sync_target(sinfo, val, (Scheme_Object *)port, NULL, 0, 1, NULL); return 0; } diff --git a/src/mzscheme/src/schpriv.h b/src/mzscheme/src/schpriv.h index 809079cc14..84ea52b821 100644 --- a/src/mzscheme/src/schpriv.h +++ b/src/mzscheme/src/schpriv.h @@ -447,9 +447,13 @@ typedef struct { short is_poll; } Scheme_Schedule_Info; +typedef Scheme_Object *(*Scheme_Accept_Sync)(Scheme_Object *wrap); + void scheme_set_sync_target(Scheme_Schedule_Info *sinfo, Scheme_Object *target, Scheme_Object *wrap, Scheme_Object *nack, - int repost, int retry); + int repost, int retry, Scheme_Accept_Sync accept); +struct Syncing; +void scheme_accept_sync(struct Syncing *syncing, int i); typedef int (*Scheme_Ready_Fun_FPC)(Scheme_Object *o, Scheme_Schedule_Info *sinfo); @@ -1327,6 +1331,7 @@ typedef struct Syncing { Scheme_Object **wrapss; Scheme_Object **nackss; char *reposts; + Scheme_Accept_Sync *accepts; Scheme_Thread *disable_break; /* when result is set */ } Syncing; @@ -2770,13 +2775,13 @@ struct Scheme_Logger { int want_level; long *timestamp, local_timestamp; /* determines when want_level is up-to-date */ int syslog_level, stderr_level; - Scheme_Object *readers; /* list of (cons (make-weak-box ) ) */ + Scheme_Object *readers; /* list of (cons (make-weak-box ) ) */ }; typedef struct Scheme_Log_Reader { Scheme_Object so; int want_level; - Scheme_Object *ch; + Scheme_Object *sema; Scheme_Object *head, *tail; } Scheme_Log_Reader; diff --git a/src/mzscheme/src/sema.c b/src/mzscheme/src/sema.c index 6fe168cead..5a24a9b5a5 100644 --- a/src/mzscheme/src/sema.c +++ b/src/mzscheme/src/sema.c @@ -301,6 +301,8 @@ void scheme_post_sema(Scheme_Object *o) consumed = 1; } else consumed = 0; + if (w->syncing->accepts && w->syncing->accepts[w->syncing_i]) + scheme_accept_sync(w->syncing, w->syncing_i); } else { /* In this case, we will remove the syncer from line, but someone else might grab the post. This is unfair, but it @@ -481,7 +483,7 @@ static void ext_get_into_line(Scheme_Object *ch, Scheme_Schedule_Info *sinfo) get_into_line((Scheme_Sema *)ch, w); - scheme_set_sync_target(sinfo, (Scheme_Object *)w, NULL, NULL, 0, 0); + scheme_set_sync_target(sinfo, (Scheme_Object *)w, NULL, NULL, 0, 0, NULL); } void scheme_get_outof_line(Scheme_Channel_Syncer *ch_w) @@ -637,6 +639,8 @@ int scheme_wait_semas_chs(int n, Scheme_Object **o, int just_try, Syncing *synci if (semas[i]->value) { if ((semas[i]->value > 0) && (!syncing || !syncing->reposts || !syncing->reposts[i])) --semas[i]->value; + if (syncing && syncing->accepts && syncing->accepts[i]) + scheme_accept_sync(syncing, i); break; } } else if (semas[i]->so.type == scheme_never_evt_type) { @@ -811,6 +815,8 @@ int scheme_wait_semas_chs(int n, Scheme_Object **o, int just_try, Syncing *synci if (semas[i]->value) { if ((semas[i]->value > 0) && (!syncing || !syncing->reposts || !syncing->reposts[i])) --semas[i]->value; + if (syncing && syncing->accepts && syncing->accepts[i]) + scheme_accept_sync(syncing, i); break; } } else if (semas[i]->so.type == scheme_never_evt_type) { @@ -988,7 +994,7 @@ static int channel_get_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo) Scheme_Object *result; if (try_channel((Scheme_Sema *)ch, (Syncing *)sinfo->current_syncing, -1, &result)) { - scheme_set_sync_target(sinfo, result, NULL, NULL, 0, 0); + scheme_set_sync_target(sinfo, result, NULL, NULL, 0, 0, NULL); return 1; } @@ -1184,7 +1190,7 @@ static int thread_recv_ready(Scheme_Object *ch, Scheme_Schedule_Info *sinfo) make_mbox_sema(p); - scheme_set_sync_target(sinfo, p->mbox_sema, thread_recv_evt, NULL, 1, 1); + scheme_set_sync_target(sinfo, p->mbox_sema, thread_recv_evt, NULL, 1, 1, NULL); return 0; } diff --git a/src/mzscheme/src/struct.c b/src/mzscheme/src/struct.c index b737c4f050..ab0d92ec59 100644 --- a/src/mzscheme/src/struct.c +++ b/src/mzscheme/src/struct.c @@ -999,7 +999,7 @@ static int evt_struct_is_ready(Scheme_Object *o, Scheme_Schedule_Info *sinfo) } else { v = (Scheme_Object *)scheme_output_port_record(o); } - scheme_set_sync_target(sinfo, v, NULL, NULL, 0, 1); + scheme_set_sync_target(sinfo, v, NULL, NULL, 0, 1, NULL); return 0; } @@ -1007,7 +1007,7 @@ static int evt_struct_is_ready(Scheme_Object *o, Scheme_Schedule_Info *sinfo) v = ((Scheme_Structure *)o)->slots[SCHEME_INT_VAL(v)]; if (scheme_is_evt(v)) { - scheme_set_sync_target(sinfo, v, NULL, NULL, 0, 1); + scheme_set_sync_target(sinfo, v, NULL, NULL, 0, 1, NULL); return 0; } @@ -1026,12 +1026,12 @@ static int evt_struct_is_ready(Scheme_Object *o, Scheme_Schedule_Info *sinfo) if (scheme_is_evt(result)) { SCHEME_USE_FUEL(1); /* Needed beause an apply of a mzc-generated function might not check for breaks. */ - scheme_set_sync_target(sinfo, result, NULL, NULL, 0, 1); + scheme_set_sync_target(sinfo, result, NULL, NULL, 0, 1, NULL); return 0; } /* non-evt => ready and result is self */ - scheme_set_sync_target(sinfo, o, o, NULL, 0, 0); + scheme_set_sync_target(sinfo, o, o, NULL, 0, 0, NULL); return 1; } @@ -2129,7 +2129,7 @@ static int wrapped_evt_is_ready(Scheme_Object *o, Scheme_Schedule_Info *sinfo) wrapper = scheme_box(ww->wrapper); } - scheme_set_sync_target(sinfo, ww->evt, wrapper, NULL, 0, 1); + scheme_set_sync_target(sinfo, ww->evt, wrapper, NULL, 0, 1, NULL); return 0; } @@ -2149,7 +2149,7 @@ static int nack_guard_evt_is_ready(Scheme_Object *o, Scheme_Schedule_Info *sinfo /* Install the semaphore immediately, so that it's posted on exceptions (e.g., breaks) even if they happen while trying to run the maker. */ - scheme_set_sync_target(sinfo, o, NULL, sema, 0, 0); + scheme_set_sync_target(sinfo, o, NULL, sema, 0, 0, NULL); /* Remember both the sema and the current thread's dead evt: */ nack = scheme_alloc_object(); @@ -2162,7 +2162,7 @@ static int nack_guard_evt_is_ready(Scheme_Object *o, Scheme_Schedule_Info *sinfo result = scheme_apply(nw->maker, 1, a); if (scheme_is_evt(result)) { - scheme_set_sync_target(sinfo, result, NULL, NULL, 0, 1); + scheme_set_sync_target(sinfo, result, NULL, NULL, 0, 1, NULL); return 0; } else return 1; /* Non-evt => ready */ @@ -2182,7 +2182,7 @@ static int nack_evt_is_ready(Scheme_Object *o, Scheme_Schedule_Info *sinfo) } /* Redirect to the set, and wrap with void: */ - scheme_set_sync_target(sinfo, wset, scheme_void, NULL, 0, 1); + scheme_set_sync_target(sinfo, wset, scheme_void, NULL, 0, 1, NULL); return 0; } @@ -2201,7 +2201,7 @@ static int poll_evt_is_ready(Scheme_Object *o, Scheme_Schedule_Info *sinfo) result = scheme_apply(nw->maker, 1, a); if (scheme_is_evt(result)) { - scheme_set_sync_target(sinfo, result, NULL, NULL, 0, 1); + scheme_set_sync_target(sinfo, result, NULL, NULL, 0, 1, NULL); return 0; } else return 1; /* Non-evt => ready */ diff --git a/src/mzscheme/src/thread.c b/src/mzscheme/src/thread.c index 1d0d529824..59a702b9a4 100644 --- a/src/mzscheme/src/thread.c +++ b/src/mzscheme/src/thread.c @@ -2937,7 +2937,7 @@ static int thread_wait_done(Scheme_Object *p, Scheme_Schedule_Info *sinfo) the blocking thread can be dequeued: */ Scheme_Object *evt; evt = scheme_get_thread_dead((Scheme_Thread *)p); - scheme_set_sync_target(sinfo, evt, p, NULL, 0, 0); + scheme_set_sync_target(sinfo, evt, p, NULL, 0, 0, NULL); return 0; } else return 1; @@ -4978,11 +4978,11 @@ static int resume_suspend_ready(Scheme_Object *o, Scheme_Schedule_Info *sinfo) t = SCHEME_PTR2_VAL(o); if (t) { - scheme_set_sync_target(sinfo, o, t, NULL, 0, 0); + scheme_set_sync_target(sinfo, o, t, NULL, 0, 0, NULL); return 1; } - scheme_set_sync_target(sinfo, SCHEME_PTR1_VAL(o), o, NULL, 0, 1); + scheme_set_sync_target(sinfo, SCHEME_PTR1_VAL(o), o, NULL, 0, 1, NULL); return 0; } @@ -5015,7 +5015,7 @@ Scheme_Object *scheme_get_thread_dead(Scheme_Thread *p) static int dead_ready(Scheme_Object *o, Scheme_Schedule_Info *sinfo) { - scheme_set_sync_target(sinfo, SCHEME_PTR_VAL(o), o, NULL, 0, 1); + scheme_set_sync_target(sinfo, SCHEME_PTR_VAL(o), o, NULL, 0, 1, NULL); return 0; } @@ -5163,7 +5163,7 @@ static void *splice_ptr_array(void **a, int al, void **b, int bl, int i) static void set_sync_target(Syncing *syncing, int i, Scheme_Object *target, Scheme_Object *wrap, Scheme_Object *nack, - int repost, int retry) + int repost, int retry, Scheme_Accept_Sync accept) /* Not ready, deferred to target. */ { Evt_Set *evt_set = syncing->set; @@ -5202,6 +5202,16 @@ static void set_sync_target(Syncing *syncing, int i, Scheme_Object *target, syncing->reposts[i] = 1; } + if (accept) { + if (!syncing->accepts) { + Scheme_Accept_Sync *s; + s = (Scheme_Accept_Sync *)scheme_malloc_atomic(sizeof(Scheme_Accept_Sync) * evt_set->argc); + memset(s, 0, evt_set->argc * sizeof(Scheme_Accept_Sync)); + syncing->accepts = s; + } + syncing->accepts[i] = accept; + } + if (SCHEME_EVTSETP(target) && retry) { /* Flatten the set into this one */ Evt_Set *wts = (Evt_Set *)target; @@ -5257,6 +5267,19 @@ static void set_sync_target(Syncing *syncing, int i, Scheme_Object *target, memcpy(s + i + wts->argc, syncing->reposts + i + 1, evt_set->argc - i - 1); syncing->reposts = s; } + if (syncing->accepts) { + Scheme_Accept_Sync *s; + int len; + + len = evt_set->argc + wts->argc - 1; + + s = (Scheme_Accept_Sync *)scheme_malloc_atomic(len * sizeof(Scheme_Accept_Sync)); + memset(s, 0, len * sizeof(Scheme_Accept_Sync)); + + memcpy(s, syncing->accepts, i * sizeof(Scheme_Accept_Sync)); + memcpy(s + i + wts->argc, syncing->accepts + i + 1, (evt_set->argc - i - 1) * sizeof(Scheme_Accept_Sync)); + syncing->accepts = s; + } evt_set->argc += (wts->argc - 1); @@ -5280,10 +5303,10 @@ static void set_sync_target(Syncing *syncing, int i, Scheme_Object *target, void scheme_set_sync_target(Scheme_Schedule_Info *sinfo, Scheme_Object *target, Scheme_Object *wrap, Scheme_Object *nack, - int repost, int retry) + int repost, int retry, Scheme_Accept_Sync accept) { set_sync_target((Syncing *)sinfo->current_syncing, sinfo->w_i, - target, wrap, nack, repost, retry); + target, wrap, nack, repost, retry, accept); if (retry) { /* Rewind one step to try new ones (or continue if the set was empty). */ @@ -5370,6 +5393,8 @@ static int syncing_ready(Scheme_Object *s, Scheme_Schedule_Info *sinfo) syncing->disable_break->suspend_break++; if (syncing->reposts && syncing->reposts[i]) scheme_post_sema(o); + if (syncing->accepts && syncing->accepts[i]) + scheme_accept_sync(syncing, i); scheme_post_syncing_nacks(syncing); result = 1; goto set_sleep_end_and_return; @@ -5387,7 +5412,7 @@ static int syncing_ready(Scheme_Object *s, Scheme_Schedule_Info *sinfo) Scheme_Object *sema; sema = get_sema(o, &repost); - set_sync_target(syncing, i, sema, o, NULL, repost, 1); + set_sync_target(syncing, i, sema, o, NULL, repost, 1, NULL); j--; /* try again with this sema */ } } @@ -5425,6 +5450,25 @@ static int syncing_ready(Scheme_Object *s, Scheme_Schedule_Info *sinfo) return result; } +void scheme_accept_sync(Syncing *syncing, int i) +{ + /* run atomic accept action to revise the wrap */ + Scheme_Accept_Sync accept; + Scheme_Object *v, *pr; + + accept = syncing->accepts[i]; + syncing->accepts[i] = NULL; + pr = syncing->wrapss[i]; + + v = SCHEME_CAR(pr); + pr = SCHEME_CDR(pr); + + v = accept(v); + + pr = scheme_make_pair(v, pr); + syncing->wrapss[i] = pr; +} + static void syncing_needs_wakeup(Scheme_Object *s, void *fds) { int i;