steppersync: Move step generation thread from stepcompress.c to steppersync.c

Implement step generation from 'struct syncemitter' instead of in the
stepcompress code.  This simplifies the stepcompress code and
simplifies the overall interface.

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor
2025-09-18 21:49:12 -04:00
parent e78d11bc6f
commit 3ef4702e06
7 changed files with 174 additions and 187 deletions

View File

@@ -51,14 +51,14 @@ defs_stepcompress = """
int stepcompress_extract_old(struct stepcompress *sc
, struct pull_history_steps *p, int max
, uint64_t start_clock, uint64_t end_clock);
void stepcompress_set_stepper_kinematics(struct stepcompress *sc
, struct stepper_kinematics *sk);
struct stepper_kinematics *stepcompress_get_stepper_kinematics(
struct stepcompress *sc);
"""
defs_steppersync = """
struct stepcompress *syncemitter_get_stepcompress(struct syncemitter *se);
void syncemitter_set_stepper_kinematics(struct syncemitter *se
, struct stepper_kinematics *sk);
struct stepper_kinematics *syncemitter_get_stepper_kinematics(
struct syncemitter *se);
void syncemitter_queue_msg(struct syncemitter *se, uint64_t req_clock
, uint32_t *data, int len);
struct syncemitter *steppersync_alloc_syncemitter(struct steppersync *ss

View File

@@ -15,14 +15,12 @@
// efficiency - the repetitive integer math is vastly faster in C.
#include <math.h> // sqrt
#include <pthread.h> // pthread_mutex_lock
#include <stddef.h> // offsetof
#include <stdint.h> // uint32_t
#include <stdio.h> // fprintf
#include <stdlib.h> // malloc
#include <string.h> // memset
#include "compiler.h" // DIV_ROUND_UP
#include "itersolve.h" // itersolve_generate_steps
#include "pyhelper.h" // errorf
#include "serialqueue.h" // struct queue_message
#include "stepcompress.h" // stepcompress_alloc
@@ -48,16 +46,6 @@ struct stepcompress {
// History tracking
int64_t last_position;
struct list_head history_list;
// Thread for step generation
struct stepper_kinematics *sk;
char name[16];
pthread_t tid;
pthread_mutex_t lock; // protects variables below
pthread_cond_t cond;
int have_work;
double bg_gen_steps_time;
uint64_t bg_flush_clock;
int32_t bg_result;
};
struct step_move {
@@ -253,22 +241,15 @@ check_line(struct stepcompress *sc, struct step_move move)
* Step compress interface
****************************************************************/
static int sc_thread_alloc(struct stepcompress *sc, char name[16]);
static void sc_thread_free(struct stepcompress *sc);
// Allocate a new 'stepcompress' object
struct stepcompress *
stepcompress_alloc(char name[16], struct list_head *msg_queue)
stepcompress_alloc(struct list_head *msg_queue)
{
struct stepcompress *sc = malloc(sizeof(*sc));
memset(sc, 0, sizeof(*sc));
list_init(&sc->history_list);
sc->sdir = -1;
sc->msg_queue = msg_queue;
int ret = sc_thread_alloc(sc, name);
if (ret)
return NULL;
return sc;
}
@@ -315,7 +296,6 @@ stepcompress_free(struct stepcompress *sc)
{
if (!sc)
return;
sc_thread_free(sc);
free(sc->queue);
stepcompress_history_expire(sc, UINT64_MAX);
free(sc);
@@ -551,7 +531,7 @@ stepcompress_commit(struct stepcompress *sc)
}
// Flush pending steps
static int
int
stepcompress_flush(struct stepcompress *sc, uint64_t move_clock)
{
if (sc->next_step_clock && move_clock >= sc->next_step_clock) {
@@ -660,131 +640,3 @@ stepcompress_extract_old(struct stepcompress *sc, struct pull_history_steps *p
}
return res;
}
/****************************************************************
* Step generation thread
****************************************************************/
// Store a reference to stepper_kinematics
void __visible
stepcompress_set_stepper_kinematics(struct stepcompress *sc
, struct stepper_kinematics *sk)
{
sc->sk = sk;
}
// Report current stepper_kinematics
struct stepper_kinematics * __visible
stepcompress_get_stepper_kinematics(struct stepcompress *sc)
{
return sc->sk;
}
// Generate steps (via itersolve) and flush
static int32_t
stepcompress_generate_steps(struct stepcompress *sc, double gen_steps_time
, uint64_t flush_clock)
{
if (!sc->sk)
return 0;
// Generate steps
int32_t ret = itersolve_generate_steps(sc->sk, sc, gen_steps_time);
if (ret)
return ret;
// Flush steps
return stepcompress_flush(sc, flush_clock);
}
// Main background thread for generating steps
static void *
sc_background_thread(void *data)
{
struct stepcompress *sc = data;
set_thread_name(sc->name);
pthread_mutex_lock(&sc->lock);
for (;;) {
if (!sc->have_work) {
pthread_cond_wait(&sc->cond, &sc->lock);
continue;
}
if (sc->have_work < 0)
// Exit request
break;
// Request to generate steps
sc->bg_result = stepcompress_generate_steps(sc, sc->bg_gen_steps_time
, sc->bg_flush_clock);
sc->have_work = 0;
pthread_cond_signal(&sc->cond);
}
pthread_mutex_unlock(&sc->lock);
return NULL;
}
// Signal background thread to start step generation
void
stepcompress_start_gen_steps(struct stepcompress *sc, double gen_steps_time
, uint64_t flush_clock)
{
if (!sc->sk)
return;
pthread_mutex_lock(&sc->lock);
while (sc->have_work)
pthread_cond_wait(&sc->cond, &sc->lock);
sc->bg_gen_steps_time = gen_steps_time;
sc->bg_flush_clock = flush_clock;
sc->have_work = 1;
pthread_mutex_unlock(&sc->lock);
pthread_cond_signal(&sc->cond);
}
// Wait for background thread to complete last step generation request
int32_t
stepcompress_finalize_gen_steps(struct stepcompress *sc)
{
pthread_mutex_lock(&sc->lock);
while (sc->have_work)
pthread_cond_wait(&sc->cond, &sc->lock);
int32_t res = sc->bg_result;
pthread_mutex_unlock(&sc->lock);
return res;
}
// Internal helper to start thread
static int
sc_thread_alloc(struct stepcompress *sc, char name[16])
{
strncpy(sc->name, name, sizeof(sc->name));
sc->name[sizeof(sc->name)-1] = '\0';
int ret = pthread_mutex_init(&sc->lock, NULL);
if (ret)
goto fail;
ret = pthread_cond_init(&sc->cond, NULL);
if (ret)
goto fail;
ret = pthread_create(&sc->tid, NULL, sc_background_thread, sc);
if (ret)
goto fail;
return 0;
fail:
report_errno("sc init", ret);
return -1;
}
// Request background thread to exit
static void
sc_thread_free(struct stepcompress *sc)
{
pthread_mutex_lock(&sc->lock);
while (sc->have_work)
pthread_cond_wait(&sc->cond, &sc->lock);
sc->have_work = -1;
pthread_cond_signal(&sc->cond);
pthread_mutex_unlock(&sc->lock);
int ret = pthread_join(sc->tid, NULL);
if (ret)
report_errno("sc pthread_join", ret);
}

View File

@@ -12,8 +12,7 @@ struct pull_history_steps {
};
struct list_head;
struct stepcompress *stepcompress_alloc(char name[16]
, struct list_head *msg_queue);
struct stepcompress *stepcompress_alloc(struct list_head *msg_queue);
void stepcompress_fill(struct stepcompress *sc, uint32_t oid, uint32_t max_error
, int32_t queue_step_msgtag
, int32_t set_next_step_dir_msgtag);
@@ -28,6 +27,7 @@ void stepcompress_set_time(struct stepcompress *sc
int stepcompress_append(struct stepcompress *sc, int sdir
, double print_time, double step_time);
int stepcompress_commit(struct stepcompress *sc);
int stepcompress_flush(struct stepcompress *sc, uint64_t move_clock);
int stepcompress_reset(struct stepcompress *sc, uint64_t last_step_clock);
int stepcompress_set_last_position(struct stepcompress *sc, uint64_t clock
, int64_t last_position);
@@ -37,13 +37,5 @@ int stepcompress_queue_msg(struct stepcompress *sc, uint32_t *data, int len);
int stepcompress_extract_old(struct stepcompress *sc
, struct pull_history_steps *p, int max
, uint64_t start_clock, uint64_t end_clock);
struct stepper_kinematics;
void stepcompress_set_stepper_kinematics(struct stepcompress *sc
, struct stepper_kinematics *sk);
struct stepper_kinematics *stepcompress_get_stepper_kinematics(
struct stepcompress *sc);
void stepcompress_start_gen_steps(struct stepcompress *sc, double gen_steps_time
, uint64_t flush_clock);
int32_t stepcompress_finalize_gen_steps(struct stepcompress *sc);
#endif // stepcompress.h

View File

@@ -11,10 +11,13 @@
// mcu step queue is ordered between steppers so that no stepper
// starves the other steppers of space in the mcu step queue.
#include <pthread.h> // pthread_mutex_lock
#include <stddef.h> // offsetof
#include <stdlib.h> // malloc
#include <string.h> // memset
#include "compiler.h" // __visible
#include "pyhelper.h" // set_thread_name
#include "itersolve.h" // itersolve_generate_steps
#include "serialqueue.h" // struct queue_message
#include "stepcompress.h" // stepcompress_flush
#include "steppersync.h" // steppersync_alloc
@@ -29,8 +32,17 @@ struct syncemitter {
struct list_node ss_node;
// Transmit message queue
struct list_head msg_queue;
// Step compression and generation
// Thread for step generation
struct stepcompress *sc;
struct stepper_kinematics *sk;
char name[16];
pthread_t tid;
pthread_mutex_t lock; // protects variables below
pthread_cond_t cond;
int have_work;
double bg_gen_steps_time;
uint64_t bg_flush_clock;
int32_t bg_result;
};
// Return this emitters 'struct stepcompress' (or NULL if not allocated)
@@ -40,6 +52,21 @@ syncemitter_get_stepcompress(struct syncemitter *se)
return se->sc;
}
// Store a reference to stepper_kinematics
void __visible
syncemitter_set_stepper_kinematics(struct syncemitter *se
, struct stepper_kinematics *sk)
{
se->sk = sk;
}
// Report current stepper_kinematics
struct stepper_kinematics * __visible
syncemitter_get_stepper_kinematics(struct syncemitter *se)
{
return se->sk;
}
// Queue an mcu command that will consume space in the mcu move queue
void __visible
syncemitter_queue_msg(struct syncemitter *se, uint64_t req_clock
@@ -50,6 +77,129 @@ syncemitter_queue_msg(struct syncemitter *se, uint64_t req_clock
list_add_tail(&qm->node, &se->msg_queue);
}
// Generate steps (via itersolve) and flush
static int32_t
se_generate_steps(struct syncemitter *se, double gen_steps_time
, uint64_t flush_clock)
{
if (!se->sc || !se->sk)
return 0;
// Generate steps
int32_t ret = itersolve_generate_steps(se->sk, se->sc, gen_steps_time);
if (ret)
return ret;
// Flush steps
return stepcompress_flush(se->sc, flush_clock);
}
// Main background thread for generating steps
static void *
se_background_thread(void *data)
{
struct syncemitter *se = data;
set_thread_name(se->name);
pthread_mutex_lock(&se->lock);
for (;;) {
if (!se->have_work) {
pthread_cond_wait(&se->cond, &se->lock);
continue;
}
if (se->have_work < 0)
// Exit request
break;
// Request to generate steps
se->bg_result = se_generate_steps(se, se->bg_gen_steps_time
, se->bg_flush_clock);
se->have_work = 0;
pthread_cond_signal(&se->cond);
}
pthread_mutex_unlock(&se->lock);
return NULL;
}
// Signal background thread to start step generation
static void
se_start_gen_steps(struct syncemitter *se, double gen_steps_time
, uint64_t flush_clock)
{
if (!se->sc || !se->sk)
return;
pthread_mutex_lock(&se->lock);
while (se->have_work)
pthread_cond_wait(&se->cond, &se->lock);
se->bg_gen_steps_time = gen_steps_time;
se->bg_flush_clock = flush_clock;
se->have_work = 1;
pthread_mutex_unlock(&se->lock);
pthread_cond_signal(&se->cond);
}
// Wait for background thread to complete last step generation request
static int32_t
se_finalize_gen_steps(struct syncemitter *se)
{
if (!se->sc || !se->sk)
return 0;
pthread_mutex_lock(&se->lock);
while (se->have_work)
pthread_cond_wait(&se->cond, &se->lock);
int32_t res = se->bg_result;
pthread_mutex_unlock(&se->lock);
return res;
}
// Allocate syncemitter and start thread
static struct syncemitter *
syncemitter_alloc(char name[16], int alloc_stepcompress)
{
struct syncemitter *se = malloc(sizeof(*se));
memset(se, 0, sizeof(*se));
list_init(&se->msg_queue);
strncpy(se->name, name, sizeof(se->name));
se->name[sizeof(se->name)-1] = '\0';
if (!alloc_stepcompress)
return se;
se->sc = stepcompress_alloc(&se->msg_queue);
int ret = pthread_mutex_init(&se->lock, NULL);
if (ret)
goto fail;
ret = pthread_cond_init(&se->cond, NULL);
if (ret)
goto fail;
ret = pthread_create(&se->tid, NULL, se_background_thread, se);
if (ret)
goto fail;
return se;
fail:
report_errno("se alloc", ret);
return NULL;
}
// Free syncemitter and exit background thread
static void
syncemitter_free(struct syncemitter *se)
{
if (!se)
return;
if (se->sc) {
pthread_mutex_lock(&se->lock);
while (se->have_work)
pthread_cond_wait(&se->cond, &se->lock);
se->have_work = -1;
pthread_cond_signal(&se->cond);
pthread_mutex_unlock(&se->lock);
int ret = pthread_join(se->tid, NULL);
if (ret)
report_errno("se pthread_join", ret);
stepcompress_free(se->sc);
}
message_queue_free(&se->msg_queue);
free(se);
}
/****************************************************************
* StepperSync - sort move queue for a micro-controller
@@ -75,12 +225,9 @@ struct syncemitter * __visible
steppersync_alloc_syncemitter(struct steppersync *ss, char name[16]
, int alloc_stepcompress)
{
struct syncemitter *se = malloc(sizeof(*se));
memset(se, 0, sizeof(*se));
list_add_tail(&se->ss_node, &ss->se_list);
list_init(&se->msg_queue);
if (alloc_stepcompress)
se->sc = stepcompress_alloc(name, &se->msg_queue);
struct syncemitter *se = syncemitter_alloc(name, alloc_stepcompress);
if (se)
list_add_tail(&se->ss_node, &ss->se_list);
return se;
}
@@ -217,9 +364,7 @@ steppersyncmgr_free(struct steppersyncmgr *ssm)
struct syncemitter *se = list_first_entry(
&ss->se_list, struct syncemitter, ss_node);
list_del(&se->ss_node);
stepcompress_free(se->sc);
message_queue_free(&se->msg_queue);
free(se);
syncemitter_free(se);
}
free(ss);
}
@@ -248,9 +393,7 @@ steppersyncmgr_gen_steps(struct steppersyncmgr *ssm, double flush_time
uint64_t flush_clock = clock_from_time(&ss->ce, flush_time);
struct syncemitter *se;
list_for_each_entry(se, &ss->se_list, ss_node) {
if (!se->sc)
continue;
stepcompress_start_gen_steps(se->sc, gen_steps_time, flush_clock);
se_start_gen_steps(se, gen_steps_time, flush_clock);
}
}
// Wait for step generation threads to complete
@@ -258,9 +401,7 @@ steppersyncmgr_gen_steps(struct steppersyncmgr *ssm, double flush_time
list_for_each_entry(ss, &ssm->ss_list, ssm_node) {
struct syncemitter *se;
list_for_each_entry(se, &ss->se_list, ss_node) {
if (!se->sc)
continue;
int32_t ret = stepcompress_finalize_gen_steps(se->sc);
int32_t ret = se_finalize_gen_steps(se);
if (ret)
res = ret;
}

View File

@@ -5,6 +5,10 @@
struct syncemitter;
struct stepcompress *syncemitter_get_stepcompress(struct syncemitter *se);
void syncemitter_set_stepper_kinematics(struct syncemitter *se
, struct stepper_kinematics *sk);
struct stepper_kinematics *syncemitter_get_stepper_kinematics(
struct syncemitter *se);
void syncemitter_queue_msg(struct syncemitter *se, uint64_t req_clock
, uint32_t *data, int len);