serialqueue: Rework check_wake_receive() to receive_append_wake()

Rename the function and perform list appending and locking in that
function.

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor
2025-11-04 11:45:54 -05:00
parent 459e77c4f9
commit 35dffc1516

View File

@@ -143,14 +143,20 @@ debug_queue_add(struct list_head *root, struct queue_message *qm)
message_free(old); message_free(old);
} }
// Wake up the receiver thread if it is waiting // Add messages and wake up the receiver thread if it is waiting
static void static void
check_wake_receive(struct receiver *receiver) receive_append_wake(struct receiver *receiver, struct list_head *msgs)
{ {
int dokick = 0;
pthread_mutex_lock(&receiver->lock);
list_join_tail(msgs, &receiver->queue);
if (receiver->waiting) { if (receiver->waiting) {
receiver->waiting = 0; receiver->waiting = 0;
pthread_cond_signal(&receiver->cond); dokick = 1;
} }
pthread_mutex_unlock(&receiver->lock);
if (dokick)
pthread_cond_signal(&receiver->cond);
} }
// Write to the internal pipe to wake the background thread if in poll // Write to the internal pipe to wake the background thread if in poll
@@ -297,12 +303,8 @@ handle_message(struct serialqueue *sq, double eventtime, int len)
list_add_tail(&qm->node, &received); list_add_tail(&qm->node, &received);
} }
if (!list_empty(&received)) { if (!list_empty(&received))
pthread_mutex_lock(&sq->receiver.lock); receive_append_wake(&sq->receiver, &received);
list_join_tail(&received, &sq->receiver.queue);
check_wake_receive(&sq->receiver);
pthread_mutex_unlock(&sq->receiver.lock);
}
// Check fast readers // Check fast readers
struct fastreader *fr; struct fastreader *fr;
@@ -693,9 +695,10 @@ background_thread(void *data)
set_thread_name(sq->name); set_thread_name(sq->name);
pollreactor_run(sq->pr); pollreactor_run(sq->pr);
pthread_mutex_lock(&sq->receiver.lock); // Wake any waiting receivers
check_wake_receive(&sq->receiver); struct list_head dummy;
pthread_mutex_unlock(&sq->receiver.lock); list_init(&dummy);
receive_append_wake(&sq->receiver, &dummy);
return NULL; return NULL;
} }