fix: handle retry queue invalidation across a cluster

This commit is contained in:
Opliko
2024-05-06 23:16:58 +02:00
parent 4cbb1f2a42
commit 22b42f11dd

View File

@@ -12,6 +12,7 @@ const utils = require('../utils');
const ttl = require('../cache/ttl'); const ttl = require('../cache/ttl');
const lru = require('../cache/lru'); const lru = require('../cache/lru');
const batch = require('../batch'); const batch = require('../batch');
const pubsub = require('../pubsub');
const requestCache = ttl({ ttl: 1000 * 60 * 5 }); // 5 minutes const requestCache = ttl({ ttl: 1000 * 60 * 5 }); // 5 minutes
const ActivityPub = module.exports; const ActivityPub = module.exports;
@@ -256,6 +257,13 @@ ActivityPub.get = async (type, id, uri) => {
ActivityPub.retryQueue = lru({ name: 'activitypub-retry-queue', max: 4000, ttl: 1000 * 60 * 60 * 24 * 60 }); ActivityPub.retryQueue = lru({ name: 'activitypub-retry-queue', max: 4000, ttl: 1000 * 60 * 60 * 24 * 60 });
// handle clearing retry queue from another member of the cluster
pubsub.on(`activitypub-retry-queue:lruCache:del`, (keys) => {
if (Array.isArray(keys)) {
keys.forEach(key => clearTimeout(ActivityPub.retryQueue.get(key)));
}
});
async function sendMessage(uri, id, type, payload, attempts = 1) { async function sendMessage(uri, id, type, payload, attempts = 1) {
const keyData = await ActivityPub.getPrivateKey(type, id); const keyData = await ActivityPub.getPrivateKey(type, id);
const headers = await ActivityPub.sign(keyData, uri, payload); const headers = await ActivityPub.sign(keyData, uri, payload);