mirror of
https://github.com/NodeBB/NodeBB.git
synced 2025-10-26 08:36:12 +01:00
refactor: move ap retry queue from lru cache to db (#13568)
* refactor: move ap retry queue from lru cache to db; get rid of the setTimeouts that were running for 2 months; retries will now survive server restarts * refactor: reduce exponential backoff
This commit is contained in:
@@ -617,6 +617,8 @@ inbox.reject = async (req) => {
|
|||||||
const queueId = `${type}:${id}:${hostname}`;
|
const queueId = `${type}:${id}:${hostname}`;
|
||||||
|
|
||||||
// stop retrying rejected requests
|
// stop retrying rejected requests
|
||||||
clearTimeout(activitypub.retryQueue.get(queueId));
|
await Promise.all([
|
||||||
activitypub.retryQueue.delete(queueId);
|
db.sortedSetRemove('ap:retry:queue', queueId),
|
||||||
|
db.delete(`ap:retry:queue:${queueId}`),
|
||||||
|
]);
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -14,9 +14,7 @@ const messaging = require('../messaging');
|
|||||||
const user = require('../user');
|
const user = require('../user');
|
||||||
const utils = require('../utils');
|
const utils = require('../utils');
|
||||||
const ttl = require('../cache/ttl');
|
const ttl = require('../cache/ttl');
|
||||||
const lru = require('../cache/lru');
|
|
||||||
const batch = require('../batch');
|
const batch = require('../batch');
|
||||||
const pubsub = require('../pubsub');
|
|
||||||
const analytics = require('../analytics');
|
const analytics = require('../analytics');
|
||||||
|
|
||||||
const requestCache = ttl({
|
const requestCache = ttl({
|
||||||
@@ -69,28 +67,30 @@ ActivityPub.feps = require('./feps');
|
|||||||
|
|
||||||
/**
 * Registers the recurring ActivityPub background jobs.
 *
 * Every job is routed through a shared guard so that it is a no-op while
 * `meta.config.activitypubEnabled` is falsy, and so that a failing job is
 * logged through winston instead of rejecting inside the cron callback.
 */
ActivityPub.startJobs = () => {
	ActivityPub.helpers.log('[activitypub/jobs] Registering jobs.');

	// Runs `method` only while ActivityPub is enabled; errors are logged, never rethrown.
	const runWhenEnabled = async (method) => {
		if (meta.config.activitypubEnabled) {
			try {
				await method();
			} catch (err) {
				winston.error(err.stack);
			}
		}
	};

	// Prunes notes, then drops `activities:datetime` entries scored older than 604800000 ms (7 days).
	const pruneNotesAndActivities = async () => {
		await ActivityPub.notes.prune();
		const cutoff = Date.now() - 604800000;
		await db.sortedSetsRemoveRangeByScore(['activities:datetime'], '-inf', cutoff);
	};

	// '0 0 * * *': note/activity pruning
	new CronJob(
		'0 0 * * *',
		() => runWhenEnabled(pruneNotesAndActivities),
		null, true, null, null, false // change last argument to true for debugging
	);

	// '*/30 * * * *': actor pruning
	new CronJob(
		'*/30 * * * *',
		() => runWhenEnabled(ActivityPub.actors.prune),
		null, true, null, null, false // change last argument to true for debugging
	);

	// '0 * * * * *' (six-field expression, leading field is seconds): retry queue processing
	new CronJob(
		'0 * * * * *',
		() => runWhenEnabled(retryFailedMessages),
		null, true, null, null, false
	);
};
|
||||||
|
|
||||||
ActivityPub.resolveId = async (uid, id) => {
|
ActivityPub.resolveId = async (uid, id) => {
|
||||||
@@ -348,29 +348,11 @@ ActivityPub.get = async (type, id, uri, options) => {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
ActivityPub.retryQueue = lru({
|
async function sendMessage(uri, id, type, payload) {
|
||||||
name: 'activitypub-retry-queue',
|
|
||||||
max: 4000,
|
|
||||||
ttl: 1000 * 60 * 60 * 24 * 60,
|
|
||||||
dispose: (value) => {
|
|
||||||
if (value) {
|
|
||||||
clearTimeout(value);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
// handle clearing retry queue from another member of the cluster
|
|
||||||
pubsub.on(`activitypub-retry-queue:lruCache:del`, (keys) => {
|
|
||||||
if (Array.isArray(keys)) {
|
|
||||||
keys.forEach(key => clearTimeout(ActivityPub.retryQueue.get(key)));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
async function sendMessage(uri, id, type, payload, attempts = 1) {
|
|
||||||
const keyData = await ActivityPub.getPrivateKey(type, id);
|
|
||||||
const headers = await ActivityPub.sign(keyData, uri, payload);
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
const keyData = await ActivityPub.getPrivateKey(type, id);
|
||||||
|
const headers = await ActivityPub.sign(keyData, uri, payload);
|
||||||
|
|
||||||
const { response, body } = await request.post(uri, {
|
const { response, body } = await request.post(uri, {
|
||||||
headers: {
|
headers: {
|
||||||
...headers,
|
...headers,
|
||||||
@@ -382,25 +364,15 @@ async function sendMessage(uri, id, type, payload, attempts = 1) {
|
|||||||
|
|
||||||
if (String(response.statusCode).startsWith('2')) {
|
if (String(response.statusCode).startsWith('2')) {
|
||||||
ActivityPub.helpers.log(`[activitypub/send] Successfully sent ${payload.type} to ${uri}`);
|
ActivityPub.helpers.log(`[activitypub/send] Successfully sent ${payload.type} to ${uri}`);
|
||||||
} else {
|
return true;
|
||||||
if (typeof body === 'object') {
|
|
||||||
throw new Error(JSON.stringify(body));
|
|
||||||
}
|
|
||||||
throw new Error(String(body));
|
|
||||||
}
|
}
|
||||||
|
if (typeof body === 'object') {
|
||||||
|
throw new Error(JSON.stringify(body));
|
||||||
|
}
|
||||||
|
throw new Error(String(body));
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
ActivityPub.helpers.log(`[activitypub/send] Could not send ${payload.type} to ${uri}; error: ${e.message}`);
|
ActivityPub.helpers.log(`[activitypub/send] Could not send ${payload.type} to ${uri}; error: ${e.message}`);
|
||||||
// add to retry queue
|
return false;
|
||||||
if (attempts < 12) { // stop attempting after ~2 months
|
|
||||||
const timeout = (4 ** attempts) * 1000; // exponential backoff
|
|
||||||
const queueId = `${payload.type}:${payload.id}:${new URL(uri).hostname}`;
|
|
||||||
const timeoutId = setTimeout(() => sendMessage(uri, id, type, payload, attempts + 1), timeout);
|
|
||||||
ActivityPub.retryQueue.set(queueId, timeoutId);
|
|
||||||
|
|
||||||
ActivityPub.helpers.log(`[activitypub/send] Added ${payload.type} to ${uri} to retry queue for ${timeout}ms`);
|
|
||||||
} else {
|
|
||||||
winston.warn(`[activitypub/send] Max attempts reached for ${payload.type} to ${uri}; giving up on sending`);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -429,17 +401,75 @@ ActivityPub.send = async (type, id, targets, payload) => {
|
|||||||
...payload,
|
...payload,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Runs in background... potentially a better queue is required... later.
|
const oneMinute = 1000 * 60;
|
||||||
batch.processArray(
|
batch.processArray(inboxes, async (inboxBatch) => {
|
||||||
inboxes,
|
const retryQueueAdd = [];
|
||||||
async inboxBatch => Promise.all(inboxBatch.map(async uri => sendMessage(uri, id, type, payload))),
|
const retryQueuedSet = [];
|
||||||
{
|
|
||||||
batch: 50,
|
await Promise.all(inboxBatch.map(async (uri) => {
|
||||||
interval: 100,
|
const ok = await sendMessage(uri, id, type, payload);
|
||||||
},
|
if (!ok) {
|
||||||
);
|
const queueId = `${type}:${id}:${new URL(uri).hostname}`;
|
||||||
|
const nextTryOn = Date.now() + oneMinute;
|
||||||
|
retryQueueAdd.push(['ap:retry:queue', nextTryOn, queueId]);
|
||||||
|
retryQueuedSet.push([`ap:retry:queue:${queueId}`, {
|
||||||
|
queueId,
|
||||||
|
uri,
|
||||||
|
id,
|
||||||
|
type,
|
||||||
|
attempts: 1,
|
||||||
|
timestamp: nextTryOn,
|
||||||
|
payload: JSON.stringify(payload),
|
||||||
|
}]);
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
if (retryQueueAdd.length) {
|
||||||
|
await Promise.all([
|
||||||
|
db.sortedSetAddBulk(retryQueueAdd),
|
||||||
|
db.setObjectBulk(retryQueuedSet),
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
}, {
|
||||||
|
batch: 50,
|
||||||
|
interval: 100,
|
||||||
|
}).catch(err => winston.error(err.stack));
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
 * Re-sends queued ActivityPub deliveries whose retry time has arrived.
 *
 * Reads up to 50 members of the `ap:retry:queue` sorted set scored at or
 * before the current time, loads each `ap:retry:queue:<queueId>` hash and
 * passes its stored fields to `sendMessage`.
 *
 * Outcome per entry:
 *  - delivered, or already past the attempt limit (`attempts > 10`): removed
 *    from the sorted set and its hash deleted;
 *  - hash missing, or stored payload not parseable as a JSON object: removed
 *    as well, since the entry can never be delivered;
 *  - failed otherwise: `attempts` is incremented and the entry is rescheduled
 *    `2 ** attempts` minutes from now, in both the sorted set score and the hash.
 *
 * @returns {Promise<void>}
 */
async function retryFailedMessages() {
	const queueIds = await db.getSortedSetRangeByScore('ap:retry:queue', 0, 50, '-inf', Date.now());
	if (!queueIds.length) {
		return;
	}
	const queuedData = await db.getObjects(queueIds.map(id => `ap:retry:queue:${id}`));

	const retryQueueAdd = [];
	const retryQueuedSet = [];
	const queueIdsToRemove = [];

	const oneMinute = 1000 * 60;
	await Promise.all(queueIds.map(async (queueId, index) => {
		const data = queuedData[index];
		if (!data) {
			// Sorted set member without a backing hash: drop it, otherwise it stays due forever.
			queueIdsToRemove.push(queueId);
			return;
		}

		const { uri, id, type, payload } = data;
		let payloadObj;
		try {
			payloadObj = JSON.parse(payload);
		} catch (err) {
			payloadObj = null;
		}
		if (!payloadObj || typeof payloadObj !== 'object') {
			// One corrupt entry must not reject the whole batch and skip the bookkeeping below.
			winston.warn(`[activitypub/send] Discarding retry queue entry ${queueId}; stored payload is not valid JSON`);
			queueIdsToRemove.push(queueId);
			return;
		}

		// Hash fields may come back from the db as strings; normalise before comparing.
		const attempts = parseInt(data.attempts, 10) || 0;
		const ok = await sendMessage(uri, id, type, payloadObj);

		if (ok || attempts > 10) {
			if (!ok) {
				winston.warn(`[activitypub/send] Max attempts reached for ${payloadObj.type} to ${uri}; giving up on sending`);
			}
			queueIdsToRemove.push(queueId);
			return;
		}

		const nextAttempt = attempts + 1;
		const timeout = (2 ** nextAttempt) * oneMinute; // exponential backoff
		const nextTryOn = Date.now() + timeout;
		retryQueueAdd.push(['ap:retry:queue', nextTryOn, queueId]);
		retryQueuedSet.push([`ap:retry:queue:${queueId}`, {
			attempts: nextAttempt,
			timestamp: nextTryOn,
		}]);
	}));

	const writes = [];
	if (retryQueueAdd.length) {
		// Persist the new schedule; without this the backoff and attempt counter never advance.
		writes.push(
			db.sortedSetAddBulk(retryQueueAdd),
			db.setObjectBulk(retryQueuedSet)
		);
	}
	if (queueIdsToRemove.length) {
		writes.push(
			db.sortedSetRemove('ap:retry:queue', queueIdsToRemove),
			db.deleteAll(queueIdsToRemove.map(id => `ap:retry:queue:${id}`))
		);
	}
	await Promise.all(writes);
}
|
||||||
|
|
||||||
ActivityPub.record = async ({ id, type, actor }) => {
|
ActivityPub.record = async ({ id, type, actor }) => {
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
const { hostname } = new URL(actor);
|
const { hostname } = new URL(actor);
|
||||||
|
|||||||
Reference in New Issue
Block a user