serialqueue: decouple pending & ready queues

Simply describe how the cmdqueue is moved between states.

Signed-off-by: Timofey Titovets <nefelim4ag@gmail.com>
This commit is contained in:
Timofey Titovets
2025-09-22 18:19:55 +02:00
committed by KevinOConnor
parent 7a036a6ba7
commit d7da45e152

View File

@@ -29,11 +29,15 @@
#include "pyhelper.h" // get_monotonic
#include "serialqueue.h" // struct queue_message
struct command_queue {
struct list_head upcoming_queue, ready_queue;
struct message_sub_queue {
struct list_head msg_queue;
struct list_node node;
};
struct command_queue {
struct message_sub_queue ready, upcoming;
};
struct serialqueue {
// Input reading
struct pollreactor *pr;
@@ -59,8 +63,10 @@ struct serialqueue {
struct list_head sent_queue;
double srtt, rttvar, rto;
// Pending transmission message queues
struct list_head pending_queues;
int ready_bytes, upcoming_bytes, need_ack_bytes, last_ack_bytes;
struct list_head upcoming_queues;
int upcoming_bytes;
struct list_head ready_queues;
int ready_bytes, need_ack_bytes, last_ack_bytes;
uint64_t need_kick_clock;
struct list_head notify_queue;
double last_write_fail_time;
@@ -452,23 +458,21 @@ build_and_send_command(struct serialqueue *sq, uint8_t *buf, int pending
uint64_t min_clock = MAX_CLOCK;
struct command_queue *q, *cq = NULL;
struct queue_message *qm = NULL;
list_for_each_entry(q, &sq->pending_queues, node) {
if (!list_empty(&q->ready_queue)) {
struct queue_message *m = list_first_entry(
&q->ready_queue, struct queue_message, node);
if (m->req_clock < min_clock) {
min_clock = m->req_clock;
cq = q;
qm = m;
}
list_for_each_entry(q, &sq->ready_queues, ready.node) {
struct queue_message *m = list_first_entry(
&q->ready.msg_queue, struct queue_message, node);
if (m->req_clock < min_clock) {
min_clock = m->req_clock;
cq = q;
qm = m;
}
}
// Append message to outgoing command
if (len + qm->len > MESSAGE_MAX - MESSAGE_TRAILER_SIZE)
break;
list_del(&qm->node);
if (list_empty(&cq->ready_queue) && list_empty(&cq->upcoming_queue))
list_del(&cq->node);
if (list_empty(&cq->ready.msg_queue))
list_del(&cq->ready.node);
memcpy(&buf[len], qm->msg, qm->len);
len += qm->len;
sq->ready_bytes -= qm->len;
@@ -530,34 +534,42 @@ check_send_command(struct serialqueue *sq, int pending, double eventtime)
idletime += calculate_bittime(sq, pending + MESSAGE_MIN);
uint64_t ack_clock = clock_from_time(&sq->ce, idletime);
uint64_t min_stalled_clock = MAX_CLOCK, min_ready_clock = MAX_CLOCK;
struct command_queue *cq;
list_for_each_entry(cq, &sq->pending_queues, node) {
// Move messages from the upcoming_queue to the ready_queue
while (!list_empty(&cq->upcoming_queue)) {
struct command_queue *cq, *_ncq;
list_for_each_entry_safe(cq, _ncq, &sq->upcoming_queues, upcoming.node) {
int not_in_ready_queues = list_empty(&cq->ready.msg_queue);
// Move messages from the upcoming.msg_queue to the ready.msg_queue
while (!list_empty(&cq->upcoming.msg_queue)) {
struct queue_message *qm = list_first_entry(
&cq->upcoming_queue, struct queue_message, node);
&cq->upcoming.msg_queue, struct queue_message, node);
if (ack_clock < qm->min_clock) {
if (qm->min_clock < min_stalled_clock)
min_stalled_clock = qm->min_clock;
break;
}
list_del(&qm->node);
list_add_tail(&qm->node, &cq->ready_queue);
list_add_tail(&qm->node, &cq->ready.msg_queue);
sq->upcoming_bytes -= qm->len;
sq->ready_bytes += qm->len;
}
// Remove cq from the list if it is now empty
if (list_empty(&cq->upcoming.msg_queue))
list_del(&cq->upcoming.node);
// Add to ready queues
if (not_in_ready_queues && !list_empty(&cq->ready.msg_queue))
list_add_tail(&cq->ready.node, &sq->ready_queues);
}
// Check if it is still needed to send messages from the ready_queues
list_for_each_entry(cq, &sq->ready_queues, ready.node) {
// Update min_ready_clock
if (!list_empty(&cq->ready_queue)) {
struct queue_message *qm = list_first_entry(
&cq->ready_queue, struct queue_message, node);
uint64_t req_clock = qm->req_clock;
double bgtime = pending ? idletime : sq->idle_time;
double bgoffset = MIN_REQTIME_DELTA + MIN_BACKGROUND_DELTA;
if (req_clock == BACKGROUND_PRIORITY_CLOCK)
req_clock = clock_from_time(&sq->ce, bgtime + bgoffset);
if (req_clock < min_ready_clock)
min_ready_clock = req_clock;
}
struct queue_message *qm = list_first_entry(
&cq->ready.msg_queue, struct queue_message, node);
uint64_t req_clock = qm->req_clock;
double bgtime = pending ? idletime : sq->idle_time;
double bgoffset = MIN_REQTIME_DELTA + MIN_BACKGROUND_DELTA;
if (req_clock == BACKGROUND_PRIORITY_CLOCK)
req_clock = clock_from_time(&sq->ce, bgtime + bgoffset);
if (req_clock < min_ready_clock)
min_ready_clock = req_clock;
}
// Check for messages to send
@@ -664,7 +676,8 @@ serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id
// Queues
sq->need_kick_clock = MAX_CLOCK;
list_init(&sq->pending_queues);
list_init(&sq->upcoming_queues);
list_init(&sq->ready_queues);
list_init(&sq->sent_queue);
list_init(&sq->receive_queue);
list_init(&sq->notify_queue);
@@ -722,12 +735,17 @@ serialqueue_free(struct serialqueue *sq)
message_queue_free(&sq->notify_queue);
message_queue_free(&sq->old_sent);
message_queue_free(&sq->old_receive);
while (!list_empty(&sq->pending_queues)) {
while (!list_empty(&sq->ready_queues)) {
struct command_queue* cq = list_first_entry(
&sq->ready_queues, struct command_queue, ready.node);
list_del(&cq->ready.node);
message_queue_free(&cq->ready.msg_queue);
}
while (!list_empty(&sq->upcoming_queues)) {
struct command_queue *cq = list_first_entry(
&sq->pending_queues, struct command_queue, node);
list_del(&cq->node);
message_queue_free(&cq->ready_queue);
message_queue_free(&cq->upcoming_queue);
&sq->upcoming_queues, struct command_queue, upcoming.node);
list_del(&cq->upcoming.node);
message_queue_free(&cq->upcoming.msg_queue);
}
pthread_mutex_unlock(&sq->lock);
pollreactor_free(sq->pr);
@@ -740,8 +758,8 @@ serialqueue_alloc_commandqueue(void)
{
struct command_queue *cq = malloc(sizeof(*cq));
memset(cq, 0, sizeof(*cq));
list_init(&cq->ready_queue);
list_init(&cq->upcoming_queue);
list_init(&cq->ready.msg_queue);
list_init(&cq->upcoming.msg_queue);
return cq;
}
@@ -751,7 +769,8 @@ serialqueue_free_commandqueue(struct command_queue *cq)
{
if (!cq)
return;
if (!list_empty(&cq->ready_queue) || !list_empty(&cq->upcoming_queue)) {
if (!list_empty(&cq->ready.msg_queue) ||
!list_empty(&cq->upcoming.msg_queue)) {
errorf("Memory leak! Can't free non-empty commandqueue");
return;
}
@@ -799,9 +818,9 @@ serialqueue_send_batch(struct serialqueue *sq, struct command_queue *cq
// Add list to cq->upcoming_queue
pthread_mutex_lock(&sq->lock);
if (list_empty(&cq->ready_queue) && list_empty(&cq->upcoming_queue))
list_add_tail(&cq->node, &sq->pending_queues);
list_join_tail(msgs, &cq->upcoming_queue);
if (list_empty(&cq->upcoming.msg_queue))
list_add_tail(&cq->upcoming.node, &sq->upcoming_queues);
list_join_tail(msgs, &cq->upcoming.msg_queue);
sq->upcoming_bytes += len;
int mustwake = 0;
if (qm->min_clock < sq->need_kick_clock) {