bc: futures repair related to internal stack overflow

The combination of 1117392cb5 and the existing "tail-apply.rkt" test
exposed another bug.
This commit is contained in:
Matthew Flatt 2020-06-13 15:44:29 -06:00
parent bc853db1c7
commit 265c9eaa57
2 changed files with 34 additions and 17 deletions

View File

@ -259,16 +259,7 @@ void scheme_end_futures_per_place()
#include <string.h>
#include "jit.h"
#ifdef OS_X
# define CHECK_FUTURE_ASSERTS
#endif
#ifdef CHECK_FUTURE_ASSERTS
# include <assert.h>
# define FUTURE_ASSERT(x) assert(x)
#else
# define FUTURE_ASSERT(x) /* empty */
#endif
#define FUTURE_ASSERT(x) MZ_ASSERT(x)
static Scheme_Object *make_fsemaphore(int argc, Scheme_Object *argv[]);
static Scheme_Object *touch(int argc, Scheme_Object *argv[]);
@ -1787,7 +1778,10 @@ static void dequeue_future(Scheme_Future_State *fs, future_t *ft)
ft->next = NULL;
ft->prev = NULL;
--fs->future_queue_count;
if (ft->in_future_queue) {
--fs->future_queue_count;
ft->in_future_queue = 0;
}
}
static void complete_rtcall(Scheme_Future_State *fs, future_t *future)
@ -1803,6 +1797,7 @@ static void complete_rtcall(Scheme_Future_State *fs, future_t *future)
future->want_lw = 0; /* needed if we get here via direct_future_to_future_touch() */
if (future->can_continue_sema) {
mzrt_sema *can_continue_sema = future->can_continue_sema;
FUTURE_ASSERT(!future->in_atomic_queue);
future->can_continue_sema = NULL;
mzrt_sema_post(can_continue_sema);
}
@ -2023,7 +2018,7 @@ Scheme_Object *general_touch(int argc, Scheme_Object *argv[])
}
else if (ft->status == WAITING_FOR_PRIM)
{
if (ft->rt_prim_is_atomic) {
if (ft->in_atomic_queue) {
/* Should be in the atomic-wait queue, so
handle those actions now: */
mzrt_mutex_unlock(fs->future_mutex);
@ -2259,6 +2254,9 @@ void *worker_thread_future_loop(void *arg)
ft = get_pending_future(fs);
if (ft) {
FUTURE_ASSERT(!ft->in_atomic_queue);
FUTURE_ASSERT(!ft->in_future_queue);
fs->busy_thread_count++;
if (ft->suspended_lw_stack)
@ -2595,6 +2593,7 @@ void scheme_check_future_work()
if (ft) {
fs->future_waiting_atomic = ft->next_waiting_atomic;
ft->next_waiting_atomic = NULL;
ft->in_atomic_queue = 0;
if ((ft->status == WAITING_FOR_PRIM) && ft->rt_prim_is_atomic) {
ft->status = HANDLING_PRIM;
ft->want_lw = 0; /* we expect to handle it quickly,
@ -2666,6 +2665,7 @@ void scheme_check_future_work()
afterward whether the continuation wants a lwc: */
can_continue_sema = ft->can_continue_sema;
ft->can_continue_sema = NULL;
FUTURE_ASSERT(!ft->in_atomic_queue);
}
}
mzrt_mutex_unlock(fs->future_mutex);
@ -2760,6 +2760,9 @@ static void future_do_runtimecall(Scheme_Future_Thread_State *fts,
/* Fetch the future descriptor for this thread */
future = fts->thread->current_ft;
FUTURE_ASSERT(!future->in_atomic_queue);
FUTURE_ASSERT(!future->in_future_queue);
if (!for_overflow) {
/* Check if this prim in fact does have a
safe C version */
@ -2849,12 +2852,16 @@ static void future_do_runtimecall(Scheme_Future_Thread_State *fts,
future->status = WAITING_FOR_PRIM;
}
if (is_atomic) {
future->next_waiting_atomic = fs->future_waiting_atomic;
fs->future_waiting_atomic = future;
}
if (fts->thread->current_ft) {
if (is_atomic && !insist_to_suspend) {
FUTURE_ASSERT(!future->in_atomic_queue);
FUTURE_ASSERT(!future->in_future_queue);
FUTURE_ASSERT(func != touch);
future->next_waiting_atomic = fs->future_waiting_atomic;
fs->future_waiting_atomic = future;
future->in_atomic_queue = 1;
}
if (insist_to_suspend) {
/* couldn't capture the continuation locally, so ask
the runtime thread to capture it: */
@ -2904,6 +2911,7 @@ static void future_do_runtimecall(Scheme_Future_Thread_State *fts,
FUTURE_ASSERT(!future || !future->can_continue_sema);
FUTURE_ASSERT(!future || !for_overflow);
FUTURE_ASSERT(!future || !future->in_atomic_queue);
if (future) {
future->want_lw = 0;
@ -3676,6 +3684,7 @@ static void invoke_rtcall(Scheme_Future_State * volatile fs, future_t * volatile
FUTURE_ASSERT(!future->want_lw);
FUTURE_ASSERT(!is_atomic || future->rt_prim_is_atomic);
FUTURE_ASSERT(!future->in_atomic_queue);
savebuf = p->error_buf;
p->error_buf = &newbuf;
@ -3696,6 +3705,7 @@ static void invoke_rtcall(Scheme_Future_State * volatile fs, future_t * volatile
/* Signal the waiting worker thread that it
can continue running machine code */
mzrt_sema *can_continue_sema = future->can_continue_sema;
FUTURE_ASSERT(!future->in_atomic_queue);
future->can_continue_sema = NULL;
mzrt_sema_post(can_continue_sema);
mzrt_mutex_unlock(fs->future_mutex);
@ -3728,6 +3738,9 @@ future_t *enqueue_future(Scheme_Future_State *fs, future_t *ft)
XFORM_SKIP_PROC
/* called in any thread with lock held */
{
FUTURE_ASSERT(!ft->in_atomic_queue);
FUTURE_ASSERT(!ft->in_future_queue);
if (fs->future_queue_end) {
fs->future_queue_end->next = ft;
ft->prev = fs->future_queue_end;
@ -3736,6 +3749,7 @@ future_t *enqueue_future(Scheme_Future_State *fs, future_t *ft)
if (!fs->future_queue)
fs->future_queue = ft;
fs->future_queue_count++;
ft->in_future_queue = 1;
/* Signal that a future is pending */
mzrt_sema_post(fs->future_pending_sema);
@ -3752,6 +3766,7 @@ future_t *get_pending_future(Scheme_Future_State *fs)
while (1) {
f = fs->future_queue;
if (f) {
FUTURE_ASSERT(f->in_future_queue);
dequeue_future(fs, f);
if (!scheme_custodian_is_available(f->cust)) {
f->status = SUSPENDED;

View File

@ -138,6 +138,8 @@ typedef struct future_t {
in any thread atomically (i.e., it's a "synchronizing"
operation insteda of a general blocking operation) */
char in_future_queue, in_atomic_queue;
double time_of_request;
const char *source_of_request;
int source_type;