refactor: remove invalid queued items

catch invalid json in payload
This commit is contained in:
Barış Soner Uşaklı
2025-08-28 12:39:44 -04:00
parent 5f7085f34d
commit b73ee309e0

View File

@@ -441,20 +441,34 @@ ActivityPub.send = async (type, id, targets, payload) => {
async function retryFailedMessages() { async function retryFailedMessages() {
const queueIds = await db.getSortedSetRangeByScore('ap:retry:queue', 0, 50, '-inf', Date.now()); const queueIds = await db.getSortedSetRangeByScore('ap:retry:queue', 0, 50, '-inf', Date.now());
const queuedData = (await db.getObjects(queueIds.map(id => `ap:retry:queue:${id}`))).filter(Boolean); const queuedData = (await db.getObjects(queueIds.map(id => `ap:retry:queue:${id}`)));
const retryQueueAdd = []; const retryQueueAdd = [];
const retryQueuedSet = []; const retryQueuedSet = [];
const queueIdsToRemove = []; const queueIdsToRemove = [];
const oneMinute = 1000 * 60; const oneMinute = 1000 * 60;
await Promise.all(queuedData.map(async (data) => { await Promise.all(queuedData.map(async (data, index) => {
const { queueId, uri, id, type, attempts, payload } = data; const queueId = queueIds[index];
const payloadObj = JSON.parse(payload); if (!data) {
queueIdsToRemove.push(queueId);
return;
}
const { uri, id, type, attempts, payload } = data;
if (!uri || !id || !type || !payload || attempts > 10) {
queueIdsToRemove.push(queueId);
return;
}
let payloadObj;
try {
payloadObj = JSON.parse(payload);
} catch (err) {
queueIdsToRemove.push(queueId);
return;
}
const ok = await sendMessage(uri, id, type, payloadObj); const ok = await sendMessage(uri, id, type, payloadObj);
if (ok) {
if (ok || attempts > 10) {
queueIdsToRemove.push(queueId); queueIdsToRemove.push(queueId);
} else { } else {
const nextAttempt = (parseInt(attempts, 10) || 0) + 1; const nextAttempt = (parseInt(attempts, 10) || 0) + 1;