From 46af133613e6bc67605f93d7969b3be1b1ee8d12 Mon Sep 17 00:00:00 2001
From: Timofey Titovets
Date: Sat, 20 Sep 2025 02:42:40 +0200
Subject: [PATCH] serialqueue: decouple transmit requests

Signed-off-by: Timofey Titovets
---
 klippy/chelper/serialqueue.c | 77 +++++++++++++++++++++++-------------
 1 file changed, 49 insertions(+), 28 deletions(-)

diff --git a/klippy/chelper/serialqueue.c b/klippy/chelper/serialqueue.c
index 59e799fb3..0bdda0d01 100644
--- a/klippy/chelper/serialqueue.c
+++ b/klippy/chelper/serialqueue.c
@@ -46,11 +46,18 @@ struct receiver {
     struct list_head old_receive;
 };
 
+struct transmit_requests {
+    int pipe_fds[2];
+    pthread_mutex_t lock; // protects variables below
+    struct list_head upcoming_queues;
+    int upcoming_bytes;
+    uint64_t need_kick_clock;
+};
+
 struct serialqueue {
     // Input reading
     struct pollreactor *pr;
     int serial_fd, serial_fd_type, client_id;
-    int pipe_fds[2];
     uint8_t input_buf[4096];
     uint8_t need_sync;
     int input_pos;
@@ -71,11 +78,9 @@ struct serialqueue {
     struct list_head sent_queue;
     double srtt, rttvar, rto;
     // Pending transmission message queues
-    struct list_head upcoming_queues;
-    int upcoming_bytes;
+    struct transmit_requests transmit_requests;
     struct list_head ready_queues;
     int ready_bytes, need_ack_bytes, last_ack_bytes;
-    uint64_t need_kick_clock;
     struct list_head notify_queue;
     double last_write_fail_time;
     // Fastreader support
@@ -152,7 +157,7 @@ check_wake_receive(struct receiver *receiver)
 static void
 kick_bg_thread(struct serialqueue *sq)
 {
-    int ret = write(sq->pipe_fds[1], ".", 1);
+    int ret = write(sq->transmit_requests.pipe_fds[1], ".", 1);
     if (ret < 0)
         report_errno("pipe write", ret);
 }
@@ -371,7 +376,7 @@ static void
 kick_event(struct serialqueue *sq, double eventtime)
 {
     char dummy[4096];
-    int ret = read(sq->pipe_fds[0], dummy, sizeof(dummy));
+    int ret = read(sq->transmit_requests.pipe_fds[0], dummy, sizeof(dummy));
     if (ret < 0)
         report_errno("pipe read", ret);
     pollreactor_update_timer(sq->pr, SQPT_COMMAND, PR_NOW);
@@ -549,7 +554,9 @@ check_send_command(struct serialqueue *sq, int pending, double eventtime)
     uint64_t ack_clock = clock_from_time(&sq->ce, idletime);
     uint64_t min_stalled_clock = MAX_CLOCK, min_ready_clock = MAX_CLOCK;
     struct command_queue *cq, *_ncq;
-    list_for_each_entry_safe(cq, _ncq, &sq->upcoming_queues, upcoming.node) {
+    pthread_mutex_lock(&sq->transmit_requests.lock);
+    list_for_each_entry_safe(cq, _ncq, &sq->transmit_requests.upcoming_queues,
+                             upcoming.node) {
         int not_in_ready_queues = list_empty(&cq->ready.msg_queue);
         // Move messages from the upcoming.msg_queue to the ready.msg_queue
         struct queue_message *qm, *_nqm;
@@ -561,7 +568,7 @@ check_send_command(struct serialqueue *sq, int pending, double eventtime)
             }
             list_del(&qm->node);
             list_add_tail(&qm->node, &cq->ready.msg_queue);
-            sq->upcoming_bytes -= qm->len;
+            sq->transmit_requests.upcoming_bytes -= qm->len;
             sq->ready_bytes += qm->len;
         }
         // Remove cq from the list if it is now empty
@@ -587,21 +594,26 @@ check_send_command(struct serialqueue *sq, int pending, double eventtime)
 
     // Check for messages to send
     if (sq->ready_bytes >= MESSAGE_PAYLOAD_MAX)
-        return PR_NOW;
+        goto now;
     if (! sq->ce.est_freq) {
         if (sq->ready_bytes)
-            return PR_NOW;
-        sq->need_kick_clock = MAX_CLOCK;
+            goto now;
+        sq->transmit_requests.need_kick_clock = MAX_CLOCK;
+        pthread_mutex_unlock(&sq->transmit_requests.lock);
         return PR_NEVER;
     }
     uint64_t reqclock_delta = MIN_REQTIME_DELTA * sq->ce.est_freq;
     if (min_ready_clock <= ack_clock + reqclock_delta)
-        return PR_NOW;
+        goto now;
     uint64_t wantclock = min_ready_clock - reqclock_delta;
     if (min_stalled_clock < wantclock)
         wantclock = min_stalled_clock;
-    sq->need_kick_clock = wantclock;
+    sq->transmit_requests.need_kick_clock = wantclock;
+    pthread_mutex_unlock(&sq->transmit_requests.lock);
     return idletime + (wantclock - ack_clock) / sq->ce.est_freq;
+now:
+    pthread_mutex_unlock(&sq->transmit_requests.lock);
+    return PR_NOW;
 }
 
 // Callback timer to send data to the serial port
@@ -661,7 +673,7 @@ serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id
     strncpy(sq->name, name, sizeof(sq->name));
     sq->name[sizeof(sq->name)-1] = '\0';
 
-    int ret = pipe(sq->pipe_fds);
+    int ret = pipe(sq->transmit_requests.pipe_fds);
     if (ret)
         goto fail;
 
@@ -669,12 +681,13 @@ serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id
     sq->pr = pollreactor_alloc(SQPF_NUM, SQPT_NUM, sq);
     pollreactor_add_fd(sq->pr, SQPF_SERIAL, serial_fd, input_event
                        , serial_fd_type==SQT_DEBUGFILE);
-    pollreactor_add_fd(sq->pr, SQPF_PIPE, sq->pipe_fds[0], kick_event, 0);
+    pollreactor_add_fd(sq->pr, SQPF_PIPE, sq->transmit_requests.pipe_fds[0]
+                       , kick_event, 0);
     pollreactor_add_timer(sq->pr, SQPT_RETRANSMIT, retransmit_event);
     pollreactor_add_timer(sq->pr, SQPT_COMMAND, command_event);
     fd_set_non_blocking(serial_fd);
-    fd_set_non_blocking(sq->pipe_fds[0]);
-    fd_set_non_blocking(sq->pipe_fds[1]);
+    fd_set_non_blocking(sq->transmit_requests.pipe_fds[0]);
+    fd_set_non_blocking(sq->transmit_requests.pipe_fds[1]);
 
     // Retransmit setup
     sq->send_seq = 1;
@@ -688,8 +701,9 @@ serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id
     }
 
     // Queues
-    sq->need_kick_clock = MAX_CLOCK;
-    list_init(&sq->upcoming_queues);
+    sq->transmit_requests.need_kick_clock = MAX_CLOCK;
+    list_init(&sq->transmit_requests.upcoming_queues);
+    pthread_mutex_init(&sq->transmit_requests.lock, NULL);
     list_init(&sq->ready_queues);
     list_init(&sq->sent_queue);
     list_init(&sq->receiver.queue);
@@ -759,12 +773,15 @@ serialqueue_free(struct serialqueue *sq)
         list_del(&cq->ready.node);
         message_queue_free(&cq->ready.msg_queue);
     }
-    while (!list_empty(&sq->upcoming_queues)) {
+    pthread_mutex_lock(&sq->transmit_requests.lock);
+    while (!list_empty(&sq->transmit_requests.upcoming_queues)) {
         struct command_queue *cq = list_first_entry(
-            &sq->upcoming_queues, struct command_queue, upcoming.node);
+            &sq->transmit_requests.upcoming_queues,
+            struct command_queue, upcoming.node);
         list_del(&cq->upcoming.node);
         message_queue_free(&cq->upcoming.msg_queue);
     }
+    pthread_mutex_unlock(&sq->transmit_requests.lock);
     pthread_mutex_unlock(&sq->lock);
     pollreactor_free(sq->pr);
     free(sq);
@@ -835,17 +852,19 @@ serialqueue_send_batch(struct serialqueue *sq, struct command_queue *cq
     qm = list_first_entry(msgs, struct queue_message, node);
 
     // Add list to cq->upcoming_queue
-    pthread_mutex_lock(&sq->lock);
+    pthread_mutex_lock(&sq->transmit_requests.lock);
     if (list_empty(&cq->upcoming.msg_queue))
-        list_add_tail(&cq->upcoming.node, &sq->upcoming_queues);
+        list_add_tail(&cq->upcoming.node,
+                      &sq->transmit_requests.upcoming_queues);
     list_join_tail(msgs, &cq->upcoming.msg_queue);
-    sq->upcoming_bytes += len;
+    sq->transmit_requests.upcoming_bytes += len;
+
     int mustwake = 0;
-    if (qm->min_clock < sq->need_kick_clock) {
-        sq->need_kick_clock = 0;
+    if (qm->min_clock < sq->transmit_requests.need_kick_clock) {
+        sq->transmit_requests.need_kick_clock = 0;
         mustwake = 1;
     }
-    pthread_mutex_unlock(&sq->lock);
+    pthread_mutex_unlock(&sq->transmit_requests.lock);
 
     // Wake the background thread if necessary
     if (mustwake)
@@ -964,7 +983,9 @@ serialqueue_get_stats(struct serialqueue *sq, char *buf, int len)
 {
     struct serialqueue stats;
     pthread_mutex_lock(&sq->lock);
+    pthread_mutex_lock(&sq->transmit_requests.lock);
     memcpy(&stats, sq, sizeof(stats));
+    pthread_mutex_unlock(&sq->transmit_requests.lock);
     pthread_mutex_unlock(&sq->lock);
 
     snprintf(buf, len, "bytes_write=%u bytes_read=%u"
@@ -977,7 +998,7 @@ serialqueue_get_stats(struct serialqueue *sq, char *buf, int len)
              , (int)stats.send_seq, (int)stats.receive_seq
             , (int)stats.retransmit_seq
             , stats.srtt, stats.rttvar, stats.rto
-             , stats.ready_bytes, stats.upcoming_bytes);
+             , stats.ready_bytes, stats.transmit_requests.upcoming_bytes);
 }
 
 // Extract old messages stored in the debug queues
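
Note (illustration, not part of the patch): the diff above moves the
transmit-request state (pipe_fds, upcoming_queues, upcoming_bytes,
need_kick_clock) out of struct serialqueue and behind its own mutex, so
serialqueue_send_batch() can queue messages and kick the background
thread without taking sq->lock. The standalone sketch below shows that
pattern under assumed, hypothetical names (tx_requests, tx_enqueue,
tx_drain) -- a dedicated lock plus a self-pipe wakeup. It only mirrors
the idea and is not Klipper's actual API.

/* Illustrative sketch only -- hypothetical names, not Klipper code. */
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>

struct tx_requests {
    int pipe_fds[2];
    pthread_mutex_t lock;      // protects the fields below
    int upcoming_bytes;        // bytes queued but not yet transmitted
    uint64_t need_kick_clock;  // earliest clock needing a wakeup
};

// Producer side: publish a request under the dedicated lock and wake the
// background thread only if the request is due before the scheduled kick.
static void
tx_enqueue(struct tx_requests *tx, int len, uint64_t min_clock)
{
    int mustwake = 0;
    pthread_mutex_lock(&tx->lock);
    tx->upcoming_bytes += len;
    if (min_clock < tx->need_kick_clock) {
        tx->need_kick_clock = 0;
        mustwake = 1;
    }
    pthread_mutex_unlock(&tx->lock);
    if (mustwake) {
        // Kick the background thread via the self-pipe (outside the lock)
        int ret = write(tx->pipe_fds[1], ".", 1);
        if (ret < 0)
            perror("pipe write");
    }
}

// Background-thread side: consume the wakeup byte(s) and drain the
// pending work under the same dedicated lock.
static void
tx_drain(struct tx_requests *tx)
{
    char dummy[64];
    int ret = read(tx->pipe_fds[0], dummy, sizeof(dummy));
    if (ret < 0)
        perror("pipe read");
    pthread_mutex_lock(&tx->lock);
    printf("draining %d pending bytes\n", tx->upcoming_bytes);
    tx->upcoming_bytes = 0;
    tx->need_kick_clock = UINT64_MAX;  // nothing pending; sleep until kicked
    pthread_mutex_unlock(&tx->lock);
}

int main(void)
{
    struct tx_requests tx = { .need_kick_clock = UINT64_MAX };
    if (pipe(tx.pipe_fds))
        return 1;
    pthread_mutex_init(&tx.lock, NULL);
    tx_enqueue(&tx, 12, 0);  // due immediately -> kicks the pipe
    tx_drain(&tx);           // would normally run on the background thread
    return 0;
}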