steppersync: Move history clearing to background thread

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor
2025-09-18 22:03:29 -04:00
parent 3ef4702e06
commit 414679ac99

View File

@@ -41,7 +41,7 @@ struct syncemitter {
pthread_cond_t cond;
int have_work;
double bg_gen_steps_time;
uint64_t bg_flush_clock;
uint64_t bg_flush_clock, bg_clear_history_clock;
int32_t bg_result;
};
@@ -79,17 +79,24 @@ syncemitter_queue_msg(struct syncemitter *se, uint64_t req_clock
// Generate steps (via itersolve) and flush
static int32_t
se_generate_steps(struct syncemitter *se, double gen_steps_time
, uint64_t flush_clock)
se_generate_steps(struct syncemitter *se)
{
if (!se->sc || !se->sk)
return 0;
double gen_steps_time = se->bg_gen_steps_time;
uint64_t flush_clock = se->bg_flush_clock;
uint64_t clear_history_clock = se->bg_clear_history_clock;
// Generate steps
int32_t ret = itersolve_generate_steps(se->sk, se->sc, gen_steps_time);
if (ret)
return ret;
// Flush steps
return stepcompress_flush(se->sc, flush_clock);
ret = stepcompress_flush(se->sc, flush_clock);
if (ret)
return ret;
// Clear history
stepcompress_history_expire(se->sc, clear_history_clock);
return 0;
}
// Main background thread for generating steps
@@ -110,8 +117,7 @@ se_background_thread(void *data)
break;
// Request to generate steps
se->bg_result = se_generate_steps(se, se->bg_gen_steps_time
, se->bg_flush_clock);
se->bg_result = se_generate_steps(se);
se->have_work = 0;
pthread_cond_signal(&se->cond);
}
@@ -123,7 +129,7 @@ se_background_thread(void *data)
// Signal background thread to start step generation
static void
se_start_gen_steps(struct syncemitter *se, double gen_steps_time
, uint64_t flush_clock)
, uint64_t flush_clock, uint64_t clear_history_clock)
{
if (!se->sc || !se->sk)
return;
@@ -132,6 +138,7 @@ se_start_gen_steps(struct syncemitter *se, double gen_steps_time
pthread_cond_wait(&se->cond, &se->lock);
se->bg_gen_steps_time = gen_steps_time;
se->bg_flush_clock = flush_clock;
se->bg_clear_history_clock = clear_history_clock;
se->have_work = 1;
pthread_mutex_unlock(&se->lock);
pthread_cond_signal(&se->cond);
@@ -391,9 +398,10 @@ steppersyncmgr_gen_steps(struct steppersyncmgr *ssm, double flush_time
// Start step generation threads
list_for_each_entry(ss, &ssm->ss_list, ssm_node) {
uint64_t flush_clock = clock_from_time(&ss->ce, flush_time);
uint64_t clear_clock = clock_from_time(&ss->ce, clear_history_time);
struct syncemitter *se;
list_for_each_entry(se, &ss->se_list, ss_node) {
se_start_gen_steps(se, gen_steps_time, flush_clock);
se_start_gen_steps(se, gen_steps_time, flush_clock, clear_clock);
}
}
// Wait for step generation threads to complete
@@ -410,17 +418,5 @@ steppersyncmgr_gen_steps(struct steppersyncmgr *ssm, double flush_time
uint64_t flush_clock = clock_from_time(&ss->ce, flush_time);
steppersync_flush(ss, flush_clock);
}
if (res)
return res;
// Clear history
list_for_each_entry(ss, &ssm->ss_list, ssm_node) {
uint64_t end_clock = clock_from_time(&ss->ce, clear_history_time);
struct syncemitter *se;
list_for_each_entry(se, &ss->se_list, ss_node) {
if (!se->sc)
continue;
stepcompress_history_expire(se->sc, end_clock);
}
}
return 0;
return res;
}