serialqueue: decouple serialhdl receive lock

Signed-off-by: Timofey Titovets <nefelim4ag@gmail.com>
This commit is contained in:
Timofey Titovets
2025-09-19 01:28:14 +02:00
committed by KevinOConnor
parent d7da45e152
commit 493271697f

View File

@@ -38,6 +38,14 @@ struct command_queue {
struct message_sub_queue ready, upcoming;
};
struct receiver {
pthread_mutex_t lock;
pthread_cond_t cond;
int waiting;
struct list_head queue;
struct list_head old_receive;
};
struct serialqueue {
// Input reading
struct pollreactor *pr;
@@ -49,9 +57,9 @@ struct serialqueue {
// Threading
char name[16];
pthread_t tid;
// SerialHDL reader
struct receiver receiver;
pthread_mutex_t lock; // protects variables below
pthread_cond_t cond;
int receive_waiting;
// Baud / clock tracking
int receive_window;
double bittime_adjust, idle_time;
@@ -70,13 +78,11 @@ struct serialqueue {
uint64_t need_kick_clock;
struct list_head notify_queue;
double last_write_fail_time;
// Received messages
struct list_head receive_queue;
// Fastreader support
pthread_mutex_t fast_reader_dispatch_lock;
struct list_head fast_readers;
// Debugging
struct list_head old_sent, old_receive;
struct list_head old_sent;
// Stats
uint32_t bytes_write, bytes_read, bytes_retransmit, bytes_invalid;
};
@@ -115,23 +121,30 @@ debug_queue_alloc(struct list_head *root, int count)
}
// Copy a message to a debug queue and free old debug messages
static void
debug_queue_add(struct list_head *root, struct queue_message *qm)
static struct queue_message *
_debug_queue_add(struct list_head *root, struct queue_message *qm)
{
list_add_tail(&qm->node, root);
struct queue_message *old = list_first_entry(
root, struct queue_message, node);
list_del(&old->node);
return old;
}
static void
debug_queue_add(struct list_head *root, struct queue_message *qm)
{
struct queue_message *old = _debug_queue_add(root, qm);
message_free(old);
}
// Wake up the receiver thread if it is waiting
static void
check_wake_receive(struct serialqueue *sq)
check_wake_receive(struct receiver *receiver)
{
if (sq->receive_waiting) {
sq->receive_waiting = 0;
pthread_cond_signal(&sq->cond);
if (receiver->waiting) {
receiver->waiting = 0;
pthread_cond_signal(&receiver->cond);
}
}
@@ -245,7 +258,8 @@ handle_message(struct serialqueue *sq, double eventtime, int len)
sq->bytes_read += len;
// Check for pending messages on notify_queue
int must_wake = 0;
struct list_head received;
list_init(&received);
while (!list_empty(&sq->notify_queue)) {
struct queue_message *qm = list_first_entry(
&sq->notify_queue, struct queue_message, node);
@@ -257,8 +271,7 @@ handle_message(struct serialqueue *sq, double eventtime, int len)
qm->len = 0;
qm->sent_time = sq->last_receive_sent_time;
qm->receive_time = eventtime;
list_add_tail(&qm->node, &sq->receive_queue);
must_wake = 1;
list_add_tail(&qm->node, &received);
}
// Process message
@@ -276,8 +289,14 @@ handle_message(struct serialqueue *sq, double eventtime, int len)
? sq->last_receive_sent_time : 0.);
qm->receive_time = get_monotonic(); // must be time post read()
qm->receive_time -= calculate_bittime(sq, len);
list_add_tail(&qm->node, &sq->receive_queue);
must_wake = 1;
list_add_tail(&qm->node, &received);
}
if (!list_empty(&received)) {
pthread_mutex_lock(&sq->receiver.lock);
list_join_tail(&received, &sq->receiver.queue);
check_wake_receive(&sq->receiver);
pthread_mutex_unlock(&sq->receiver.lock);
}
// Check fast readers
@@ -289,16 +308,11 @@ handle_message(struct serialqueue *sq, double eventtime, int len)
continue;
// Release main lock and invoke callback
pthread_mutex_lock(&sq->fast_reader_dispatch_lock);
if (must_wake)
check_wake_receive(sq);
pthread_mutex_unlock(&sq->lock);
fr->func(fr, sq->input_buf, len);
pthread_mutex_unlock(&sq->fast_reader_dispatch_lock);
return;
}
if (must_wake)
check_wake_receive(sq);
pthread_mutex_unlock(&sq->lock);
}
@@ -628,9 +642,9 @@ background_thread(void *data)
set_thread_name(sq->name);
pollreactor_run(sq->pr);
pthread_mutex_lock(&sq->lock);
check_wake_receive(sq);
pthread_mutex_unlock(&sq->lock);
pthread_mutex_lock(&sq->receiver.lock);
check_wake_receive(&sq->receiver);
pthread_mutex_unlock(&sq->receiver.lock);
return NULL;
}
@@ -679,21 +693,24 @@ serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id
list_init(&sq->upcoming_queues);
list_init(&sq->ready_queues);
list_init(&sq->sent_queue);
list_init(&sq->receive_queue);
list_init(&sq->receiver.queue);
list_init(&sq->notify_queue);
list_init(&sq->fast_readers);
// Debugging
list_init(&sq->old_sent);
list_init(&sq->old_receive);
list_init(&sq->receiver.old_receive);
debug_queue_alloc(&sq->old_sent, DEBUG_QUEUE_SENT);
debug_queue_alloc(&sq->old_receive, DEBUG_QUEUE_RECEIVE);
debug_queue_alloc(&sq->receiver.old_receive, DEBUG_QUEUE_RECEIVE);
// Thread setup
ret = pthread_mutex_init(&sq->lock, NULL);
if (ret)
goto fail;
ret = pthread_cond_init(&sq->cond, NULL);
ret = pthread_mutex_init(&sq->receiver.lock, NULL);
if (ret)
goto fail;
ret = pthread_cond_init(&sq->receiver.cond, NULL);
if (ret)
goto fail;
ret = pthread_mutex_init(&sq->fast_reader_dispatch_lock, NULL);
@@ -731,10 +748,12 @@ serialqueue_free(struct serialqueue *sq)
serialqueue_exit(sq);
pthread_mutex_lock(&sq->lock);
message_queue_free(&sq->sent_queue);
message_queue_free(&sq->receive_queue);
pthread_mutex_lock(&sq->receiver.lock);
message_queue_free(&sq->receiver.queue);
message_queue_free(&sq->receiver.old_receive);
pthread_mutex_unlock(&sq->receiver.lock);
message_queue_free(&sq->notify_queue);
message_queue_free(&sq->old_sent);
message_queue_free(&sq->old_receive);
while (!list_empty(&sq->ready_queues)) {
struct command_queue* cq = list_first_entry(
&sq->ready_queues, struct command_queue, ready.node);
@@ -864,20 +883,21 @@ serialqueue_send(struct serialqueue *sq, struct command_queue *cq, uint8_t *msg
void __visible
serialqueue_pull(struct serialqueue *sq, struct pull_queue_message *pqm)
{
pthread_mutex_lock(&sq->lock);
struct receiver *receiver = &sq->receiver;
pthread_mutex_lock(&receiver->lock);
// Wait for message to be available
while (list_empty(&sq->receive_queue)) {
while (list_empty(&receiver->queue)) {
if (pollreactor_is_exit(sq->pr))
goto exit;
sq->receive_waiting = 1;
int ret = pthread_cond_wait(&sq->cond, &sq->lock);
receiver->waiting = 1;
int ret = pthread_cond_wait(&receiver->cond, &receiver->lock);
if (ret)
report_errno("pthread_cond_wait", ret);
}
// Remove message from queue
struct queue_message *qm = list_first_entry(
&sq->receive_queue, struct queue_message, node);
&receiver->queue, struct queue_message, node);
list_del(&qm->node);
// Copy message
@@ -887,16 +907,14 @@ serialqueue_pull(struct serialqueue *sq, struct pull_queue_message *pqm)
pqm->receive_time = qm->receive_time;
pqm->notify_id = qm->notify_id;
if (qm->len)
debug_queue_add(&sq->old_receive, qm);
else
message_free(qm);
pthread_mutex_unlock(&sq->lock);
qm = _debug_queue_add(&receiver->old_receive, qm);
pthread_mutex_unlock(&receiver->lock);
message_free(qm);
return;
exit:
pqm->len = -1;
pthread_mutex_unlock(&sq->lock);
pthread_mutex_unlock(&receiver->lock);
}
void __visible
@@ -969,18 +987,27 @@ serialqueue_extract_old(struct serialqueue *sq, int sentq
, struct pull_queue_message *q, int max)
{
int count = sentq ? DEBUG_QUEUE_SENT : DEBUG_QUEUE_RECEIVE;
struct list_head *rootp = sentq ? &sq->old_sent : &sq->old_receive;
struct list_head *rootp;
rootp = sentq ? &sq->old_sent : &sq->receiver.old_receive;
struct list_head replacement, current;
list_init(&replacement);
debug_queue_alloc(&replacement, count);
list_init(&current);
// Atomically replace existing debug list with new zero'd list
pthread_mutex_lock(&sq->lock);
list_join_tail(rootp, &current);
list_init(rootp);
list_join_tail(&replacement, rootp);
pthread_mutex_unlock(&sq->lock);
if (rootp == &sq->receiver.old_receive) {
pthread_mutex_lock(&sq->receiver.lock);
list_join_tail(rootp, &current);
list_init(rootp);
list_join_tail(&replacement, rootp);
pthread_mutex_unlock(&sq->receiver.lock);
} else {
pthread_mutex_lock(&sq->lock);
list_join_tail(rootp, &current);
list_init(rootp);
list_join_tail(&replacement, rootp);
pthread_mutex_unlock(&sq->lock);
}
// Walk the debug list
int pos = 0;