feat: basic retry queue

This commit is contained in:
Opliko
2024-05-06 22:49:31 +02:00
parent 4e7b12b925
commit 50bc9a37c5
2 changed files with 47 additions and 22 deletions

View File

@@ -10,6 +10,7 @@ const meta = require('../meta');
const user = require('../user');
const utils = require('../utils');
const ttl = require('../cache/ttl');
const lru = require('../cache/lru');
const requestCache = ttl({ ttl: 1000 * 60 * 5 }); // 5 minutes
const ActivityPub = module.exports;
@@ -252,6 +253,40 @@ ActivityPub.get = async (type, id, uri) => {
}
};
ActivityPub.retryQueue = lru({ name: 'activitypub-retry-queue', max: 4000, ttl: 1000 * 60 * 60 * 24 * 60 });
async function sendMessage(uri, id, type, payload, attempts = 1) {
const keyData = await ActivityPub.getPrivateKey(type, id);
const headers = await ActivityPub.sign(keyData, uri, payload);
winston.verbose(`[activitypub/send] ${uri}`);
try {
const { response, body } = await request.post(uri, {
headers: {
...headers,
'content-type': 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"',
},
body: payload,
});
if (String(response.statusCode).startsWith('2')) {
winston.verbose(`[activitypub/send] Successfully sent ${payload.type} to ${uri}`);
} else {
throw new Error(String(body));
}
} catch (e) {
winston.warn(`[activitypub/send] Could not send ${payload.type} to ${uri}; error: ${e.message}`);
// add to retry queue
if (attempts < 12) { // stop attempting after ~2 months
const timeout = (4 ** attempts) * 1000; // exponential backoff
const queueId = `${payload.type}:${payload.id}:${new URL(uri).hostname}`;
const timeoutId = setTimeout(() => sendMessage(uri, id, type, payload, attempts + 1), timeout);
ActivityPub.retryQueue.set(queueId, timeoutId);
winston.verbose(`[activitypub/send] Added ${payload.type} to ${uri} to retry queue for ${timeout}ms`);
}
}
}
ActivityPub.send = async (type, id, targets, payload) => {
if (!Array.isArray(targets)) {
targets = [targets];
@@ -267,26 +302,5 @@ ActivityPub.send = async (type, id, targets, payload) => {
...payload,
};
await Promise.all(inboxes.map(async (uri) => {
const keyData = await ActivityPub.getPrivateKey(type, id);
const headers = await ActivityPub.sign(keyData, uri, payload);
winston.verbose(`[activitypub/send] ${uri}`);
try {
const { response, body } = await request.post(uri, {
headers: {
...headers,
'content-type': 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"',
},
body: payload,
});
if (String(response.statusCode).startsWith('2')) {
winston.verbose(`[activitypub/send] Successfully sent ${payload.type} to ${uri}`);
} else {
winston.warn(`[activitypub/send] Could not send ${payload.type} to ${uri}; error: ${String(body)}`);
}
} catch (e) {
winston.warn(`[activitypub/send] Could not send ${payload.type} to ${uri}; error: ${e.message}`);
}
}));
await Promise.all(inboxes.map(async uri => sendMessage(uri, id, type, payload)));
};