mirror of
https://github.com/Klipper3d/klipper.git
synced 2025-11-02 03:16:02 +01:00
serialqueue: decouple transmit requests
Signed-off-by: Timofey Titovets <nefelim4ag@gmail.com>
This commit is contained in:
committed by
KevinOConnor
parent
493271697f
commit
aea8d8e0a1
@@ -46,11 +46,18 @@ struct receiver {
|
|||||||
struct list_head old_receive;
|
struct list_head old_receive;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct transmit_requests {
|
||||||
|
int pipe_fds[2];
|
||||||
|
pthread_mutex_t lock; // protects variables below
|
||||||
|
struct list_head upcoming_queues;
|
||||||
|
int upcoming_bytes;
|
||||||
|
uint64_t need_kick_clock;
|
||||||
|
};
|
||||||
|
|
||||||
struct serialqueue {
|
struct serialqueue {
|
||||||
// Input reading
|
// Input reading
|
||||||
struct pollreactor *pr;
|
struct pollreactor *pr;
|
||||||
int serial_fd, serial_fd_type, client_id;
|
int serial_fd, serial_fd_type, client_id;
|
||||||
int pipe_fds[2];
|
|
||||||
uint8_t input_buf[4096];
|
uint8_t input_buf[4096];
|
||||||
uint8_t need_sync;
|
uint8_t need_sync;
|
||||||
int input_pos;
|
int input_pos;
|
||||||
@@ -71,11 +78,9 @@ struct serialqueue {
|
|||||||
struct list_head sent_queue;
|
struct list_head sent_queue;
|
||||||
double srtt, rttvar, rto;
|
double srtt, rttvar, rto;
|
||||||
// Pending transmission message queues
|
// Pending transmission message queues
|
||||||
struct list_head upcoming_queues;
|
struct transmit_requests transmit_requests;
|
||||||
int upcoming_bytes;
|
|
||||||
struct list_head ready_queues;
|
struct list_head ready_queues;
|
||||||
int ready_bytes, need_ack_bytes, last_ack_bytes;
|
int ready_bytes, need_ack_bytes, last_ack_bytes;
|
||||||
uint64_t need_kick_clock;
|
|
||||||
struct list_head notify_queue;
|
struct list_head notify_queue;
|
||||||
double last_write_fail_time;
|
double last_write_fail_time;
|
||||||
// Fastreader support
|
// Fastreader support
|
||||||
@@ -152,7 +157,7 @@ check_wake_receive(struct receiver *receiver)
|
|||||||
static void
|
static void
|
||||||
kick_bg_thread(struct serialqueue *sq)
|
kick_bg_thread(struct serialqueue *sq)
|
||||||
{
|
{
|
||||||
int ret = write(sq->pipe_fds[1], ".", 1);
|
int ret = write(sq->transmit_requests.pipe_fds[1], ".", 1);
|
||||||
if (ret < 0)
|
if (ret < 0)
|
||||||
report_errno("pipe write", ret);
|
report_errno("pipe write", ret);
|
||||||
}
|
}
|
||||||
@@ -371,7 +376,7 @@ static void
|
|||||||
kick_event(struct serialqueue *sq, double eventtime)
|
kick_event(struct serialqueue *sq, double eventtime)
|
||||||
{
|
{
|
||||||
char dummy[4096];
|
char dummy[4096];
|
||||||
int ret = read(sq->pipe_fds[0], dummy, sizeof(dummy));
|
int ret = read(sq->transmit_requests.pipe_fds[0], dummy, sizeof(dummy));
|
||||||
if (ret < 0)
|
if (ret < 0)
|
||||||
report_errno("pipe read", ret);
|
report_errno("pipe read", ret);
|
||||||
pollreactor_update_timer(sq->pr, SQPT_COMMAND, PR_NOW);
|
pollreactor_update_timer(sq->pr, SQPT_COMMAND, PR_NOW);
|
||||||
@@ -549,7 +554,9 @@ check_send_command(struct serialqueue *sq, int pending, double eventtime)
|
|||||||
uint64_t ack_clock = clock_from_time(&sq->ce, idletime);
|
uint64_t ack_clock = clock_from_time(&sq->ce, idletime);
|
||||||
uint64_t min_stalled_clock = MAX_CLOCK, min_ready_clock = MAX_CLOCK;
|
uint64_t min_stalled_clock = MAX_CLOCK, min_ready_clock = MAX_CLOCK;
|
||||||
struct command_queue *cq, *_ncq;
|
struct command_queue *cq, *_ncq;
|
||||||
list_for_each_entry_safe(cq, _ncq, &sq->upcoming_queues, upcoming.node) {
|
pthread_mutex_lock(&sq->transmit_requests.lock);
|
||||||
|
list_for_each_entry_safe(cq, _ncq, &sq->transmit_requests.upcoming_queues,
|
||||||
|
upcoming.node) {
|
||||||
int not_in_ready_queues = list_empty(&cq->ready.msg_queue);
|
int not_in_ready_queues = list_empty(&cq->ready.msg_queue);
|
||||||
// Move messages from the upcoming.msg_queue to the ready.msg_queue
|
// Move messages from the upcoming.msg_queue to the ready.msg_queue
|
||||||
while (!list_empty(&cq->upcoming.msg_queue)) {
|
while (!list_empty(&cq->upcoming.msg_queue)) {
|
||||||
@@ -562,7 +569,7 @@ check_send_command(struct serialqueue *sq, int pending, double eventtime)
|
|||||||
}
|
}
|
||||||
list_del(&qm->node);
|
list_del(&qm->node);
|
||||||
list_add_tail(&qm->node, &cq->ready.msg_queue);
|
list_add_tail(&qm->node, &cq->ready.msg_queue);
|
||||||
sq->upcoming_bytes -= qm->len;
|
sq->transmit_requests.upcoming_bytes -= qm->len;
|
||||||
sq->ready_bytes += qm->len;
|
sq->ready_bytes += qm->len;
|
||||||
}
|
}
|
||||||
// Remove cq from the list if it is now empty
|
// Remove cq from the list if it is now empty
|
||||||
@@ -588,21 +595,26 @@ check_send_command(struct serialqueue *sq, int pending, double eventtime)
|
|||||||
|
|
||||||
// Check for messages to send
|
// Check for messages to send
|
||||||
if (sq->ready_bytes >= MESSAGE_PAYLOAD_MAX)
|
if (sq->ready_bytes >= MESSAGE_PAYLOAD_MAX)
|
||||||
return PR_NOW;
|
goto now;
|
||||||
if (! sq->ce.est_freq) {
|
if (! sq->ce.est_freq) {
|
||||||
if (sq->ready_bytes)
|
if (sq->ready_bytes)
|
||||||
return PR_NOW;
|
goto now;
|
||||||
sq->need_kick_clock = MAX_CLOCK;
|
sq->transmit_requests.need_kick_clock = MAX_CLOCK;
|
||||||
|
pthread_mutex_unlock(&sq->transmit_requests.lock);
|
||||||
return PR_NEVER;
|
return PR_NEVER;
|
||||||
}
|
}
|
||||||
uint64_t reqclock_delta = MIN_REQTIME_DELTA * sq->ce.est_freq;
|
uint64_t reqclock_delta = MIN_REQTIME_DELTA * sq->ce.est_freq;
|
||||||
if (min_ready_clock <= ack_clock + reqclock_delta)
|
if (min_ready_clock <= ack_clock + reqclock_delta)
|
||||||
return PR_NOW;
|
goto now;
|
||||||
uint64_t wantclock = min_ready_clock - reqclock_delta;
|
uint64_t wantclock = min_ready_clock - reqclock_delta;
|
||||||
if (min_stalled_clock < wantclock)
|
if (min_stalled_clock < wantclock)
|
||||||
wantclock = min_stalled_clock;
|
wantclock = min_stalled_clock;
|
||||||
sq->need_kick_clock = wantclock;
|
sq->transmit_requests.need_kick_clock = wantclock;
|
||||||
|
pthread_mutex_unlock(&sq->transmit_requests.lock);
|
||||||
return idletime + (wantclock - ack_clock) / sq->ce.est_freq;
|
return idletime + (wantclock - ack_clock) / sq->ce.est_freq;
|
||||||
|
now:
|
||||||
|
pthread_mutex_unlock(&sq->transmit_requests.lock);
|
||||||
|
return PR_NOW;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Callback timer to send data to the serial port
|
// Callback timer to send data to the serial port
|
||||||
@@ -662,7 +674,7 @@ serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id
|
|||||||
strncpy(sq->name, name, sizeof(sq->name));
|
strncpy(sq->name, name, sizeof(sq->name));
|
||||||
sq->name[sizeof(sq->name)-1] = '\0';
|
sq->name[sizeof(sq->name)-1] = '\0';
|
||||||
|
|
||||||
int ret = pipe(sq->pipe_fds);
|
int ret = pipe(sq->transmit_requests.pipe_fds);
|
||||||
if (ret)
|
if (ret)
|
||||||
goto fail;
|
goto fail;
|
||||||
|
|
||||||
@@ -670,12 +682,13 @@ serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id
|
|||||||
sq->pr = pollreactor_alloc(SQPF_NUM, SQPT_NUM, sq);
|
sq->pr = pollreactor_alloc(SQPF_NUM, SQPT_NUM, sq);
|
||||||
pollreactor_add_fd(sq->pr, SQPF_SERIAL, serial_fd, input_event
|
pollreactor_add_fd(sq->pr, SQPF_SERIAL, serial_fd, input_event
|
||||||
, serial_fd_type==SQT_DEBUGFILE);
|
, serial_fd_type==SQT_DEBUGFILE);
|
||||||
pollreactor_add_fd(sq->pr, SQPF_PIPE, sq->pipe_fds[0], kick_event, 0);
|
pollreactor_add_fd(sq->pr, SQPF_PIPE, sq->transmit_requests.pipe_fds[0]
|
||||||
|
, kick_event, 0);
|
||||||
pollreactor_add_timer(sq->pr, SQPT_RETRANSMIT, retransmit_event);
|
pollreactor_add_timer(sq->pr, SQPT_RETRANSMIT, retransmit_event);
|
||||||
pollreactor_add_timer(sq->pr, SQPT_COMMAND, command_event);
|
pollreactor_add_timer(sq->pr, SQPT_COMMAND, command_event);
|
||||||
fd_set_non_blocking(serial_fd);
|
fd_set_non_blocking(serial_fd);
|
||||||
fd_set_non_blocking(sq->pipe_fds[0]);
|
fd_set_non_blocking(sq->transmit_requests.pipe_fds[0]);
|
||||||
fd_set_non_blocking(sq->pipe_fds[1]);
|
fd_set_non_blocking(sq->transmit_requests.pipe_fds[1]);
|
||||||
|
|
||||||
// Retransmit setup
|
// Retransmit setup
|
||||||
sq->send_seq = 1;
|
sq->send_seq = 1;
|
||||||
@@ -689,8 +702,9 @@ serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Queues
|
// Queues
|
||||||
sq->need_kick_clock = MAX_CLOCK;
|
sq->transmit_requests.need_kick_clock = MAX_CLOCK;
|
||||||
list_init(&sq->upcoming_queues);
|
list_init(&sq->transmit_requests.upcoming_queues);
|
||||||
|
pthread_mutex_init(&sq->transmit_requests.lock, NULL);
|
||||||
list_init(&sq->ready_queues);
|
list_init(&sq->ready_queues);
|
||||||
list_init(&sq->sent_queue);
|
list_init(&sq->sent_queue);
|
||||||
list_init(&sq->receiver.queue);
|
list_init(&sq->receiver.queue);
|
||||||
@@ -760,12 +774,15 @@ serialqueue_free(struct serialqueue *sq)
|
|||||||
list_del(&cq->ready.node);
|
list_del(&cq->ready.node);
|
||||||
message_queue_free(&cq->ready.msg_queue);
|
message_queue_free(&cq->ready.msg_queue);
|
||||||
}
|
}
|
||||||
while (!list_empty(&sq->upcoming_queues)) {
|
pthread_mutex_lock(&sq->transmit_requests.lock);
|
||||||
|
while (!list_empty(&sq->transmit_requests.upcoming_queues)) {
|
||||||
struct command_queue *cq = list_first_entry(
|
struct command_queue *cq = list_first_entry(
|
||||||
&sq->upcoming_queues, struct command_queue, upcoming.node);
|
&sq->transmit_requests.upcoming_queues,
|
||||||
|
struct command_queue, upcoming.node);
|
||||||
list_del(&cq->upcoming.node);
|
list_del(&cq->upcoming.node);
|
||||||
message_queue_free(&cq->upcoming.msg_queue);
|
message_queue_free(&cq->upcoming.msg_queue);
|
||||||
}
|
}
|
||||||
|
pthread_mutex_unlock(&sq->transmit_requests.lock);
|
||||||
pthread_mutex_unlock(&sq->lock);
|
pthread_mutex_unlock(&sq->lock);
|
||||||
pollreactor_free(sq->pr);
|
pollreactor_free(sq->pr);
|
||||||
free(sq);
|
free(sq);
|
||||||
@@ -836,17 +853,19 @@ serialqueue_send_batch(struct serialqueue *sq, struct command_queue *cq
|
|||||||
qm = list_first_entry(msgs, struct queue_message, node);
|
qm = list_first_entry(msgs, struct queue_message, node);
|
||||||
|
|
||||||
// Add list to cq->upcoming_queue
|
// Add list to cq->upcoming_queue
|
||||||
pthread_mutex_lock(&sq->lock);
|
pthread_mutex_lock(&sq->transmit_requests.lock);
|
||||||
if (list_empty(&cq->upcoming.msg_queue))
|
if (list_empty(&cq->upcoming.msg_queue))
|
||||||
list_add_tail(&cq->upcoming.node, &sq->upcoming_queues);
|
list_add_tail(&cq->upcoming.node,
|
||||||
|
&sq->transmit_requests.upcoming_queues);
|
||||||
list_join_tail(msgs, &cq->upcoming.msg_queue);
|
list_join_tail(msgs, &cq->upcoming.msg_queue);
|
||||||
sq->upcoming_bytes += len;
|
sq->transmit_requests.upcoming_bytes += len;
|
||||||
|
|
||||||
int mustwake = 0;
|
int mustwake = 0;
|
||||||
if (qm->min_clock < sq->need_kick_clock) {
|
if (qm->min_clock < sq->transmit_requests.need_kick_clock) {
|
||||||
sq->need_kick_clock = 0;
|
sq->transmit_requests.need_kick_clock = 0;
|
||||||
mustwake = 1;
|
mustwake = 1;
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(&sq->lock);
|
pthread_mutex_unlock(&sq->transmit_requests.lock);
|
||||||
|
|
||||||
// Wake the background thread if necessary
|
// Wake the background thread if necessary
|
||||||
if (mustwake)
|
if (mustwake)
|
||||||
@@ -965,7 +984,9 @@ serialqueue_get_stats(struct serialqueue *sq, char *buf, int len)
|
|||||||
{
|
{
|
||||||
struct serialqueue stats;
|
struct serialqueue stats;
|
||||||
pthread_mutex_lock(&sq->lock);
|
pthread_mutex_lock(&sq->lock);
|
||||||
|
pthread_mutex_lock(&sq->transmit_requests.lock);
|
||||||
memcpy(&stats, sq, sizeof(stats));
|
memcpy(&stats, sq, sizeof(stats));
|
||||||
|
pthread_mutex_unlock(&sq->transmit_requests.lock);
|
||||||
pthread_mutex_unlock(&sq->lock);
|
pthread_mutex_unlock(&sq->lock);
|
||||||
|
|
||||||
snprintf(buf, len, "bytes_write=%u bytes_read=%u"
|
snprintf(buf, len, "bytes_write=%u bytes_read=%u"
|
||||||
@@ -978,7 +999,7 @@ serialqueue_get_stats(struct serialqueue *sq, char *buf, int len)
|
|||||||
, (int)stats.send_seq, (int)stats.receive_seq
|
, (int)stats.send_seq, (int)stats.receive_seq
|
||||||
, (int)stats.retransmit_seq
|
, (int)stats.retransmit_seq
|
||||||
, stats.srtt, stats.rttvar, stats.rto
|
, stats.srtt, stats.rttvar, stats.rto
|
||||||
, stats.ready_bytes, stats.upcoming_bytes);
|
, stats.ready_bytes, stats.transmit_requests.upcoming_bytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Extract old messages stored in the debug queues
|
// Extract old messages stored in the debug queues
|
||||||
|
|||||||
Reference in New Issue
Block a user