serialqueue: Revert recent serialqueue locking changes
This reverts commit aea8d8e0a1. This reverts commit 493271697f. This reverts commit d7da45e152.

There are reports of a regression since making this change. Revert for now until the root cause can be found.

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
@@ -29,44 +29,25 @@
#include "pyhelper.h" // get_monotonic
#include "serialqueue.h" // struct queue_message

struct message_sub_queue {
    struct list_head msg_queue;
    struct list_node node;
};

struct command_queue {
    struct message_sub_queue ready, upcoming;
};

struct receiver {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    int waiting;
    struct list_head queue;
    struct list_head old_receive;
};

struct transmit_requests {
    int pipe_fds[2];
    pthread_mutex_t lock; // protects variables below
    struct list_head upcoming_queues;
    int upcoming_bytes;
    uint64_t need_kick_clock;
    struct list_head upcoming_queue, ready_queue;
    struct list_node node;
};

struct serialqueue {
    // Input reading
    struct pollreactor *pr;
    int serial_fd, serial_fd_type, client_id;
    int pipe_fds[2];
    uint8_t input_buf[4096];
    uint8_t need_sync;
    int input_pos;
    // Threading
    char name[16];
    pthread_t tid;
    // SerialHDL reader
    struct receiver receiver;
    pthread_mutex_t lock; // protects variables below
    pthread_cond_t cond;
    int receive_waiting;
    // Baud / clock tracking
    int receive_window;
    double bittime_adjust, idle_time;
@@ -78,16 +59,18 @@ struct serialqueue {
    struct list_head sent_queue;
    double srtt, rttvar, rto;
    // Pending transmission message queues
    struct transmit_requests transmit_requests;
    struct list_head ready_queues;
    int ready_bytes, need_ack_bytes, last_ack_bytes;
    struct list_head pending_queues;
    int ready_bytes, upcoming_bytes, need_ack_bytes, last_ack_bytes;
    uint64_t need_kick_clock;
    struct list_head notify_queue;
    double last_write_fail_time;
    // Received messages
    struct list_head receive_queue;
    // Fastreader support
    pthread_mutex_t fast_reader_dispatch_lock;
    struct list_head fast_readers;
    // Debugging
    struct list_head old_sent;
    struct list_head old_sent, old_receive;
    // Stats
    uint32_t bytes_write, bytes_read, bytes_retransmit, bytes_invalid;
};
@@ -126,30 +109,23 @@ debug_queue_alloc(struct list_head *root, int count)
}

// Copy a message to a debug queue and free old debug messages
static struct queue_message *
_debug_queue_add(struct list_head *root, struct queue_message *qm)
static void
debug_queue_add(struct list_head *root, struct queue_message *qm)
{
    list_add_tail(&qm->node, root);
    struct queue_message *old = list_first_entry(
        root, struct queue_message, node);
    list_del(&old->node);
    return old;
}

static void
debug_queue_add(struct list_head *root, struct queue_message *qm)
{
    struct queue_message *old = _debug_queue_add(root, qm);
    message_free(old);
}

// Wake up the receiver thread if it is waiting
static void
check_wake_receive(struct receiver *receiver)
check_wake_receive(struct serialqueue *sq)
{
    if (receiver->waiting) {
        receiver->waiting = 0;
        pthread_cond_signal(&receiver->cond);
    if (sq->receive_waiting) {
        sq->receive_waiting = 0;
        pthread_cond_signal(&sq->cond);
    }
}

@@ -157,7 +133,7 @@ check_wake_receive(struct receiver *receiver)
static void
kick_bg_thread(struct serialqueue *sq)
{
    int ret = write(sq->transmit_requests.pipe_fds[1], ".", 1);
    int ret = write(sq->pipe_fds[1], ".", 1);
    if (ret < 0)
        report_errno("pipe write", ret);
}
@@ -263,8 +239,7 @@ handle_message(struct serialqueue *sq, double eventtime, int len)
    sq->bytes_read += len;

    // Check for pending messages on notify_queue
    struct list_head received;
    list_init(&received);
    int must_wake = 0;
    while (!list_empty(&sq->notify_queue)) {
        struct queue_message *qm = list_first_entry(
            &sq->notify_queue, struct queue_message, node);
@@ -276,7 +251,8 @@ handle_message(struct serialqueue *sq, double eventtime, int len)
        qm->len = 0;
        qm->sent_time = sq->last_receive_sent_time;
        qm->receive_time = eventtime;
        list_add_tail(&qm->node, &received);
        list_add_tail(&qm->node, &sq->receive_queue);
        must_wake = 1;
    }

    // Process message
@@ -294,14 +270,8 @@ handle_message(struct serialqueue *sq, double eventtime, int len)
                         ? sq->last_receive_sent_time : 0.);
        qm->receive_time = get_monotonic(); // must be time post read()
        qm->receive_time -= calculate_bittime(sq, len);
        list_add_tail(&qm->node, &received);
    }

    if (!list_empty(&received)) {
        pthread_mutex_lock(&sq->receiver.lock);
        list_join_tail(&received, &sq->receiver.queue);
        check_wake_receive(&sq->receiver);
        pthread_mutex_unlock(&sq->receiver.lock);
        list_add_tail(&qm->node, &sq->receive_queue);
        must_wake = 1;
    }

    // Check fast readers
@@ -313,11 +283,16 @@ handle_message(struct serialqueue *sq, double eventtime, int len)
            continue;
        // Release main lock and invoke callback
        pthread_mutex_lock(&sq->fast_reader_dispatch_lock);
        if (must_wake)
            check_wake_receive(sq);
        pthread_mutex_unlock(&sq->lock);
        fr->func(fr, sq->input_buf, len);
        pthread_mutex_unlock(&sq->fast_reader_dispatch_lock);
        return;
    }

    if (must_wake)
        check_wake_receive(sq);
    pthread_mutex_unlock(&sq->lock);
}

@@ -376,7 +351,7 @@ static void
kick_event(struct serialqueue *sq, double eventtime)
{
    char dummy[4096];
    int ret = read(sq->transmit_requests.pipe_fds[0], dummy, sizeof(dummy));
    int ret = read(sq->pipe_fds[0], dummy, sizeof(dummy));
    if (ret < 0)
        report_errno("pipe read", ret);
    pollreactor_update_timer(sq->pr, SQPT_COMMAND, PR_NOW);
@@ -477,21 +452,23 @@ build_and_send_command(struct serialqueue *sq, uint8_t *buf, int pending
        uint64_t min_clock = MAX_CLOCK;
        struct command_queue *q, *cq = NULL;
        struct queue_message *qm = NULL;
        list_for_each_entry(q, &sq->ready_queues, ready.node) {
            struct queue_message *m = list_first_entry(
                &q->ready.msg_queue, struct queue_message, node);
            if (m->req_clock < min_clock) {
                min_clock = m->req_clock;
                cq = q;
                qm = m;
        list_for_each_entry(q, &sq->pending_queues, node) {
            if (!list_empty(&q->ready_queue)) {
                struct queue_message *m = list_first_entry(
                    &q->ready_queue, struct queue_message, node);
                if (m->req_clock < min_clock) {
                    min_clock = m->req_clock;
                    cq = q;
                    qm = m;
                }
            }
        }
        // Append message to outgoing command
        if (len + qm->len > MESSAGE_MAX - MESSAGE_TRAILER_SIZE)
            break;
        list_del(&qm->node);
        if (list_empty(&cq->ready.msg_queue))
            list_del(&cq->ready.node);
        if (list_empty(&cq->ready_queue) && list_empty(&cq->upcoming_queue))
            list_del(&cq->node);
        memcpy(&buf[len], qm->msg, qm->len);
        len += qm->len;
        sq->ready_bytes -= qm->len;
@@ -553,68 +530,53 @@ check_send_command(struct serialqueue *sq, int pending, double eventtime)
    idletime += calculate_bittime(sq, pending + MESSAGE_MIN);
    uint64_t ack_clock = clock_from_time(&sq->ce, idletime);
    uint64_t min_stalled_clock = MAX_CLOCK, min_ready_clock = MAX_CLOCK;
    struct command_queue *cq, *_ncq;
    pthread_mutex_lock(&sq->transmit_requests.lock);
    list_for_each_entry_safe(cq, _ncq, &sq->transmit_requests.upcoming_queues,
                             upcoming.node) {
        int not_in_ready_queues = list_empty(&cq->ready.msg_queue);
        // Move messages from the upcoming.msg_queue to the ready.msg_queue
        while (!list_empty(&cq->upcoming.msg_queue)) {
    struct command_queue *cq;
    list_for_each_entry(cq, &sq->pending_queues, node) {
        // Move messages from the upcoming_queue to the ready_queue
        while (!list_empty(&cq->upcoming_queue)) {
            struct queue_message *qm = list_first_entry(
                &cq->upcoming.msg_queue, struct queue_message, node);
                &cq->upcoming_queue, struct queue_message, node);
            if (ack_clock < qm->min_clock) {
                if (qm->min_clock < min_stalled_clock)
                    min_stalled_clock = qm->min_clock;
                break;
            }
            list_del(&qm->node);
            list_add_tail(&qm->node, &cq->ready.msg_queue);
            sq->transmit_requests.upcoming_bytes -= qm->len;
            list_add_tail(&qm->node, &cq->ready_queue);
            sq->upcoming_bytes -= qm->len;
            sq->ready_bytes += qm->len;
        }
        // Remove cq from the list if it is now empty
        if (list_empty(&cq->upcoming.msg_queue))
            list_del(&cq->upcoming.node);
        // Add to ready queues
        if (not_in_ready_queues && !list_empty(&cq->ready.msg_queue))
            list_add_tail(&cq->ready.node, &sq->ready_queues);
    }
    // Check if it is still needed to send messages from the ready_queues
    list_for_each_entry(cq, &sq->ready_queues, ready.node) {
        // Update min_ready_clock
        struct queue_message *qm = list_first_entry(
            &cq->ready.msg_queue, struct queue_message, node);
        uint64_t req_clock = qm->req_clock;
        double bgtime = pending ? idletime : sq->idle_time;
        double bgoffset = MIN_REQTIME_DELTA + MIN_BACKGROUND_DELTA;
        if (req_clock == BACKGROUND_PRIORITY_CLOCK)
            req_clock = clock_from_time(&sq->ce, bgtime + bgoffset);
        if (req_clock < min_ready_clock)
            min_ready_clock = req_clock;
        if (!list_empty(&cq->ready_queue)) {
            struct queue_message *qm = list_first_entry(
                &cq->ready_queue, struct queue_message, node);
            uint64_t req_clock = qm->req_clock;
            double bgtime = pending ? idletime : sq->idle_time;
            double bgoffset = MIN_REQTIME_DELTA + MIN_BACKGROUND_DELTA;
            if (req_clock == BACKGROUND_PRIORITY_CLOCK)
                req_clock = clock_from_time(&sq->ce, bgtime + bgoffset);
            if (req_clock < min_ready_clock)
                min_ready_clock = req_clock;
        }
    }

    // Check for messages to send
    if (sq->ready_bytes >= MESSAGE_PAYLOAD_MAX)
        goto now;
        return PR_NOW;
    if (! sq->ce.est_freq) {
        if (sq->ready_bytes)
            goto now;
        sq->transmit_requests.need_kick_clock = MAX_CLOCK;
        pthread_mutex_unlock(&sq->transmit_requests.lock);
            return PR_NOW;
        sq->need_kick_clock = MAX_CLOCK;
        return PR_NEVER;
    }
    uint64_t reqclock_delta = MIN_REQTIME_DELTA * sq->ce.est_freq;
    if (min_ready_clock <= ack_clock + reqclock_delta)
        goto now;
        return PR_NOW;
    uint64_t wantclock = min_ready_clock - reqclock_delta;
    if (min_stalled_clock < wantclock)
        wantclock = min_stalled_clock;
    sq->transmit_requests.need_kick_clock = wantclock;
    pthread_mutex_unlock(&sq->transmit_requests.lock);
    sq->need_kick_clock = wantclock;
    return idletime + (wantclock - ack_clock) / sq->ce.est_freq;
now:
    pthread_mutex_unlock(&sq->transmit_requests.lock);
    return PR_NOW;
}

// Callback timer to send data to the serial port
@@ -654,9 +616,9 @@ background_thread(void *data)
    set_thread_name(sq->name);
    pollreactor_run(sq->pr);

    pthread_mutex_lock(&sq->receiver.lock);
    check_wake_receive(&sq->receiver);
    pthread_mutex_unlock(&sq->receiver.lock);
    pthread_mutex_lock(&sq->lock);
    check_wake_receive(sq);
    pthread_mutex_unlock(&sq->lock);

    return NULL;
}
@@ -674,7 +636,7 @@ serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id
    strncpy(sq->name, name, sizeof(sq->name));
    sq->name[sizeof(sq->name)-1] = '\0';

    int ret = pipe(sq->transmit_requests.pipe_fds);
    int ret = pipe(sq->pipe_fds);
    if (ret)
        goto fail;

@@ -682,13 +644,12 @@ serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id
    sq->pr = pollreactor_alloc(SQPF_NUM, SQPT_NUM, sq);
    pollreactor_add_fd(sq->pr, SQPF_SERIAL, serial_fd, input_event
                       , serial_fd_type==SQT_DEBUGFILE);
    pollreactor_add_fd(sq->pr, SQPF_PIPE, sq->transmit_requests.pipe_fds[0]
                       , kick_event, 0);
    pollreactor_add_fd(sq->pr, SQPF_PIPE, sq->pipe_fds[0], kick_event, 0);
    pollreactor_add_timer(sq->pr, SQPT_RETRANSMIT, retransmit_event);
    pollreactor_add_timer(sq->pr, SQPT_COMMAND, command_event);
    fd_set_non_blocking(serial_fd);
    fd_set_non_blocking(sq->transmit_requests.pipe_fds[0]);
    fd_set_non_blocking(sq->transmit_requests.pipe_fds[1]);
    fd_set_non_blocking(sq->pipe_fds[0]);
    fd_set_non_blocking(sq->pipe_fds[1]);

    // Retransmit setup
    sq->send_seq = 1;
@@ -702,29 +663,24 @@ serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id
    }

    // Queues
    sq->transmit_requests.need_kick_clock = MAX_CLOCK;
    list_init(&sq->transmit_requests.upcoming_queues);
    pthread_mutex_init(&sq->transmit_requests.lock, NULL);
    list_init(&sq->ready_queues);
    sq->need_kick_clock = MAX_CLOCK;
    list_init(&sq->pending_queues);
    list_init(&sq->sent_queue);
    list_init(&sq->receiver.queue);
    list_init(&sq->receive_queue);
    list_init(&sq->notify_queue);
    list_init(&sq->fast_readers);

    // Debugging
    list_init(&sq->old_sent);
    list_init(&sq->receiver.old_receive);
    list_init(&sq->old_receive);
    debug_queue_alloc(&sq->old_sent, DEBUG_QUEUE_SENT);
    debug_queue_alloc(&sq->receiver.old_receive, DEBUG_QUEUE_RECEIVE);
    debug_queue_alloc(&sq->old_receive, DEBUG_QUEUE_RECEIVE);

    // Thread setup
    ret = pthread_mutex_init(&sq->lock, NULL);
    if (ret)
        goto fail;
    ret = pthread_mutex_init(&sq->receiver.lock, NULL);
    if (ret)
        goto fail;
    ret = pthread_cond_init(&sq->receiver.cond, NULL);
    ret = pthread_cond_init(&sq->cond, NULL);
    if (ret)
        goto fail;
    ret = pthread_mutex_init(&sq->fast_reader_dispatch_lock, NULL);
@@ -762,27 +718,17 @@ serialqueue_free(struct serialqueue *sq)
    serialqueue_exit(sq);
    pthread_mutex_lock(&sq->lock);
    message_queue_free(&sq->sent_queue);
    pthread_mutex_lock(&sq->receiver.lock);
    message_queue_free(&sq->receiver.queue);
    message_queue_free(&sq->receiver.old_receive);
    pthread_mutex_unlock(&sq->receiver.lock);
    message_queue_free(&sq->receive_queue);
    message_queue_free(&sq->notify_queue);
    message_queue_free(&sq->old_sent);
    while (!list_empty(&sq->ready_queues)) {
        struct command_queue* cq = list_first_entry(
            &sq->ready_queues, struct command_queue, ready.node);
        list_del(&cq->ready.node);
        message_queue_free(&cq->ready.msg_queue);
    }
    pthread_mutex_lock(&sq->transmit_requests.lock);
    while (!list_empty(&sq->transmit_requests.upcoming_queues)) {
    message_queue_free(&sq->old_receive);
    while (!list_empty(&sq->pending_queues)) {
        struct command_queue *cq = list_first_entry(
            &sq->transmit_requests.upcoming_queues,
            struct command_queue, upcoming.node);
        list_del(&cq->upcoming.node);
        message_queue_free(&cq->upcoming.msg_queue);
            &sq->pending_queues, struct command_queue, node);
        list_del(&cq->node);
        message_queue_free(&cq->ready_queue);
        message_queue_free(&cq->upcoming_queue);
    }
    pthread_mutex_unlock(&sq->transmit_requests.lock);
    pthread_mutex_unlock(&sq->lock);
    pollreactor_free(sq->pr);
    free(sq);
@@ -794,8 +740,8 @@ serialqueue_alloc_commandqueue(void)
{
    struct command_queue *cq = malloc(sizeof(*cq));
    memset(cq, 0, sizeof(*cq));
    list_init(&cq->ready.msg_queue);
    list_init(&cq->upcoming.msg_queue);
    list_init(&cq->ready_queue);
    list_init(&cq->upcoming_queue);
    return cq;
}

@@ -805,8 +751,7 @@ serialqueue_free_commandqueue(struct command_queue *cq)
{
    if (!cq)
        return;
    if (!list_empty(&cq->ready.msg_queue) ||
        !list_empty(&cq->upcoming.msg_queue)) {
    if (!list_empty(&cq->ready_queue) || !list_empty(&cq->upcoming_queue)) {
        errorf("Memory leak! Can't free non-empty commandqueue");
        return;
    }
@@ -853,19 +798,17 @@ serialqueue_send_batch(struct serialqueue *sq, struct command_queue *cq
    qm = list_first_entry(msgs, struct queue_message, node);

    // Add list to cq->upcoming_queue
    pthread_mutex_lock(&sq->transmit_requests.lock);
    if (list_empty(&cq->upcoming.msg_queue))
        list_add_tail(&cq->upcoming.node,
                      &sq->transmit_requests.upcoming_queues);
    list_join_tail(msgs, &cq->upcoming.msg_queue);
    sq->transmit_requests.upcoming_bytes += len;

    pthread_mutex_lock(&sq->lock);
    if (list_empty(&cq->ready_queue) && list_empty(&cq->upcoming_queue))
        list_add_tail(&cq->node, &sq->pending_queues);
    list_join_tail(msgs, &cq->upcoming_queue);
    sq->upcoming_bytes += len;
    int mustwake = 0;
    if (qm->min_clock < sq->transmit_requests.need_kick_clock) {
        sq->transmit_requests.need_kick_clock = 0;
    if (qm->min_clock < sq->need_kick_clock) {
        sq->need_kick_clock = 0;
        mustwake = 1;
    }
    pthread_mutex_unlock(&sq->transmit_requests.lock);
    pthread_mutex_unlock(&sq->lock);

    // Wake the background thread if necessary
    if (mustwake)
@@ -902,21 +845,20 @@ serialqueue_send(struct serialqueue *sq, struct command_queue *cq, uint8_t *msg
void __visible
serialqueue_pull(struct serialqueue *sq, struct pull_queue_message *pqm)
{
    struct receiver *receiver = &sq->receiver;
    pthread_mutex_lock(&receiver->lock);
    pthread_mutex_lock(&sq->lock);
    // Wait for message to be available
    while (list_empty(&receiver->queue)) {
    while (list_empty(&sq->receive_queue)) {
        if (pollreactor_is_exit(sq->pr))
            goto exit;
        receiver->waiting = 1;
        int ret = pthread_cond_wait(&receiver->cond, &receiver->lock);
        sq->receive_waiting = 1;
        int ret = pthread_cond_wait(&sq->cond, &sq->lock);
        if (ret)
            report_errno("pthread_cond_wait", ret);
    }

    // Remove message from queue
    struct queue_message *qm = list_first_entry(
        &receiver->queue, struct queue_message, node);
        &sq->receive_queue, struct queue_message, node);
    list_del(&qm->node);

    // Copy message
@@ -926,14 +868,16 @@ serialqueue_pull(struct serialqueue *sq, struct pull_queue_message *pqm)
    pqm->receive_time = qm->receive_time;
    pqm->notify_id = qm->notify_id;
    if (qm->len)
        qm = _debug_queue_add(&receiver->old_receive, qm);
    pthread_mutex_unlock(&receiver->lock);
    message_free(qm);
        debug_queue_add(&sq->old_receive, qm);
    else
        message_free(qm);

    pthread_mutex_unlock(&sq->lock);
    return;

exit:
    pqm->len = -1;
    pthread_mutex_unlock(&receiver->lock);
    pthread_mutex_unlock(&sq->lock);
}

void __visible
@@ -984,9 +928,7 @@ serialqueue_get_stats(struct serialqueue *sq, char *buf, int len)
{
    struct serialqueue stats;
    pthread_mutex_lock(&sq->lock);
    pthread_mutex_lock(&sq->transmit_requests.lock);
    memcpy(&stats, sq, sizeof(stats));
    pthread_mutex_unlock(&sq->transmit_requests.lock);
    pthread_mutex_unlock(&sq->lock);

    snprintf(buf, len, "bytes_write=%u bytes_read=%u"
@@ -999,7 +941,7 @@ serialqueue_get_stats(struct serialqueue *sq, char *buf, int len)
             , (int)stats.send_seq, (int)stats.receive_seq
             , (int)stats.retransmit_seq
             , stats.srtt, stats.rttvar, stats.rto
             , stats.ready_bytes, stats.transmit_requests.upcoming_bytes);
             , stats.ready_bytes, stats.upcoming_bytes);
}

// Extract old messages stored in the debug queues
@@ -1008,27 +950,18 @@ serialqueue_extract_old(struct serialqueue *sq, int sentq
                        , struct pull_queue_message *q, int max)
{
    int count = sentq ? DEBUG_QUEUE_SENT : DEBUG_QUEUE_RECEIVE;
    struct list_head *rootp;
    rootp = sentq ? &sq->old_sent : &sq->receiver.old_receive;
    struct list_head *rootp = sentq ? &sq->old_sent : &sq->old_receive;
    struct list_head replacement, current;
    list_init(&replacement);
    debug_queue_alloc(&replacement, count);
    list_init(&current);

    // Atomically replace existing debug list with new zero'd list
    if (rootp == &sq->receiver.old_receive) {
        pthread_mutex_lock(&sq->receiver.lock);
        list_join_tail(rootp, &current);
        list_init(rootp);
        list_join_tail(&replacement, rootp);
        pthread_mutex_unlock(&sq->receiver.lock);
    } else {
        pthread_mutex_lock(&sq->lock);
        list_join_tail(rootp, &current);
        list_init(rootp);
        list_join_tail(&replacement, rootp);
        pthread_mutex_unlock(&sq->lock);
    }
    pthread_mutex_lock(&sq->lock);
    list_join_tail(rootp, &current);
    list_init(rootp);
    list_join_tail(&replacement, rootp);
    pthread_mutex_unlock(&sq->lock);

    // Walk the debug list
    int pos = 0;
