mirror of
				https://github.com/NodeBB/NodeBB.git
				synced 2025-10-31 02:55:58 +01:00 
			
		
		
		
	refactor: move ap retry queue from lru cache to db (#13568)
* refactor: move ap retry queue from lru cache to db get rid of the setTimeouts that were running for 2months retries will survive server restarts * refactor: reduce exp. backoff
This commit is contained in:
		| @@ -617,6 +617,8 @@ inbox.reject = async (req) => { | |||||||
| 	const queueId = `${type}:${id}:${hostname}`; | 	const queueId = `${type}:${id}:${hostname}`; | ||||||
|  |  | ||||||
| 	// stop retrying rejected requests | 	// stop retrying rejected requests | ||||||
| 	clearTimeout(activitypub.retryQueue.get(queueId)); | 	await Promise.all([ | ||||||
| 	activitypub.retryQueue.delete(queueId); | 		db.sortedSetRemove('ap:retry:queue', queueId), | ||||||
|  | 		db.delete(`ap:retry:queue:${queueId}`), | ||||||
|  | 	]); | ||||||
| }; | }; | ||||||
|   | |||||||
| @@ -14,9 +14,7 @@ const messaging = require('../messaging'); | |||||||
| const user = require('../user'); | const user = require('../user'); | ||||||
| const utils = require('../utils'); | const utils = require('../utils'); | ||||||
| const ttl = require('../cache/ttl'); | const ttl = require('../cache/ttl'); | ||||||
| const lru = require('../cache/lru'); |  | ||||||
| const batch = require('../batch'); | const batch = require('../batch'); | ||||||
| const pubsub = require('../pubsub'); |  | ||||||
| const analytics = require('../analytics'); | const analytics = require('../analytics'); | ||||||
|  |  | ||||||
| const requestCache = ttl({ | const requestCache = ttl({ | ||||||
| @@ -69,28 +67,30 @@ ActivityPub.feps = require('./feps'); | |||||||
|  |  | ||||||
| ActivityPub.startJobs = () => { | ActivityPub.startJobs = () => { | ||||||
| 	ActivityPub.helpers.log('[activitypub/jobs] Registering jobs.'); | 	ActivityPub.helpers.log('[activitypub/jobs] Registering jobs.'); | ||||||
| 	new CronJob('0 0 * * *', async () => { | 	async function tryCronJob(method) { | ||||||
| 		if (!meta.config.activitypubEnabled) { | 		if (!meta.config.activitypubEnabled) { | ||||||
| 			return; | 			return; | ||||||
| 		} | 		} | ||||||
| 		try { | 		try { | ||||||
| 			await ActivityPub.notes.prune(); | 			await method(); | ||||||
| 			await db.sortedSetsRemoveRangeByScore(['activities:datetime'], '-inf', Date.now() - 604800000); |  | ||||||
| 		} catch (err) { | 		} catch (err) { | ||||||
| 			winston.error(err.stack); | 			winston.error(err.stack); | ||||||
| 		} | 		} | ||||||
|  | 	} | ||||||
|  | 	new CronJob('0 0 * * *', async () => { | ||||||
|  | 		await tryCronJob(async () => { | ||||||
|  | 			await ActivityPub.notes.prune(); | ||||||
|  | 			await db.sortedSetsRemoveRangeByScore(['activities:datetime'], '-inf', Date.now() - 604800000); | ||||||
|  | 		}); | ||||||
| 	}, null, true, null, null, false); // change last argument to true for debugging | 	}, null, true, null, null, false); // change last argument to true for debugging | ||||||
|  |  | ||||||
| 	new CronJob('*/30 * * * *', async () => { | 	new CronJob('*/30 * * * *', async () => { | ||||||
| 		if (!meta.config.activitypubEnabled) { | 		await tryCronJob(ActivityPub.actors.prune); | ||||||
| 			return; |  | ||||||
| 		} |  | ||||||
| 		try { |  | ||||||
| 			await ActivityPub.actors.prune(); |  | ||||||
| 		} catch (err) { |  | ||||||
| 			winston.error(err.stack); |  | ||||||
| 		} |  | ||||||
| 	}, null, true, null, null, false); // change last argument to true for debugging | 	}, null, true, null, null, false); // change last argument to true for debugging | ||||||
|  |  | ||||||
|  | 	new CronJob('0 * * * * *', async () => { | ||||||
|  | 		await tryCronJob(retryFailedMessages); | ||||||
|  | 	}, null, true, null, null, false); | ||||||
| }; | }; | ||||||
|  |  | ||||||
| ActivityPub.resolveId = async (uid, id) => { | ActivityPub.resolveId = async (uid, id) => { | ||||||
| @@ -348,29 +348,11 @@ ActivityPub.get = async (type, id, uri, options) => { | |||||||
| 	} | 	} | ||||||
| }; | }; | ||||||
|  |  | ||||||
| ActivityPub.retryQueue = lru({ | async function sendMessage(uri, id, type, payload) { | ||||||
| 	name: 'activitypub-retry-queue', | 	try { | ||||||
| 	max: 4000, |  | ||||||
| 	ttl: 1000 * 60 * 60 * 24 * 60, |  | ||||||
| 	dispose: (value) => { |  | ||||||
| 		if (value) { |  | ||||||
| 			clearTimeout(value); |  | ||||||
| 		} |  | ||||||
| 	}, |  | ||||||
| }); |  | ||||||
|  |  | ||||||
| // handle clearing retry queue from another member of the cluster |  | ||||||
| pubsub.on(`activitypub-retry-queue:lruCache:del`, (keys) => { |  | ||||||
| 	if (Array.isArray(keys)) { |  | ||||||
| 		keys.forEach(key => clearTimeout(ActivityPub.retryQueue.get(key))); |  | ||||||
| 	} |  | ||||||
| }); |  | ||||||
|  |  | ||||||
| async function sendMessage(uri, id, type, payload, attempts = 1) { |  | ||||||
| 		const keyData = await ActivityPub.getPrivateKey(type, id); | 		const keyData = await ActivityPub.getPrivateKey(type, id); | ||||||
| 		const headers = await ActivityPub.sign(keyData, uri, payload); | 		const headers = await ActivityPub.sign(keyData, uri, payload); | ||||||
|  |  | ||||||
| 	try { |  | ||||||
| 		const { response, body } = await request.post(uri, { | 		const { response, body } = await request.post(uri, { | ||||||
| 			headers: { | 			headers: { | ||||||
| 				...headers, | 				...headers, | ||||||
| @@ -382,25 +364,15 @@ async function sendMessage(uri, id, type, payload, attempts = 1) { | |||||||
|  |  | ||||||
| 		if (String(response.statusCode).startsWith('2')) { | 		if (String(response.statusCode).startsWith('2')) { | ||||||
| 			ActivityPub.helpers.log(`[activitypub/send] Successfully sent ${payload.type} to ${uri}`); | 			ActivityPub.helpers.log(`[activitypub/send] Successfully sent ${payload.type} to ${uri}`); | ||||||
| 		} else { | 			return true; | ||||||
|  | 		} | ||||||
| 		if (typeof body === 'object') { | 		if (typeof body === 'object') { | ||||||
| 			throw new Error(JSON.stringify(body)); | 			throw new Error(JSON.stringify(body)); | ||||||
| 		} | 		} | ||||||
| 		throw new Error(String(body)); | 		throw new Error(String(body)); | ||||||
| 		} |  | ||||||
| 	} catch (e) { | 	} catch (e) { | ||||||
| 		ActivityPub.helpers.log(`[activitypub/send] Could not send ${payload.type} to ${uri}; error: ${e.message}`); | 		ActivityPub.helpers.log(`[activitypub/send] Could not send ${payload.type} to ${uri}; error: ${e.message}`); | ||||||
| 		// add to retry queue | 		return false; | ||||||
| 		if (attempts < 12) { // stop attempting after ~2 months |  | ||||||
| 			const timeout = (4 ** attempts) * 1000; // exponential backoff |  | ||||||
| 			const queueId = `${payload.type}:${payload.id}:${new URL(uri).hostname}`; |  | ||||||
| 			const timeoutId = setTimeout(() => sendMessage(uri, id, type, payload, attempts + 1), timeout); |  | ||||||
| 			ActivityPub.retryQueue.set(queueId, timeoutId); |  | ||||||
|  |  | ||||||
| 			ActivityPub.helpers.log(`[activitypub/send] Added ${payload.type} to ${uri} to retry queue for ${timeout}ms`); |  | ||||||
| 		} else { |  | ||||||
| 			winston.warn(`[activitypub/send] Max attempts reached for ${payload.type} to ${uri}; giving up on sending`); |  | ||||||
| 		} |  | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -429,17 +401,75 @@ ActivityPub.send = async (type, id, targets, payload) => { | |||||||
| 		...payload, | 		...payload, | ||||||
| 	}; | 	}; | ||||||
|  |  | ||||||
| 	// Runs in background... potentially a better queue is required... later. | 	const oneMinute = 1000 * 60; | ||||||
| 	batch.processArray( | 	batch.processArray(inboxes, async (inboxBatch) => { | ||||||
| 		inboxes, | 		const retryQueueAdd = []; | ||||||
| 		async inboxBatch => Promise.all(inboxBatch.map(async uri => sendMessage(uri, id, type, payload))), | 		const retryQueuedSet = []; | ||||||
| 		{ |  | ||||||
|  | 		await Promise.all(inboxBatch.map(async (uri) => { | ||||||
|  | 			const ok = await sendMessage(uri, id, type, payload); | ||||||
|  | 			if (!ok) { | ||||||
|  | 				const queueId = `${type}:${id}:${new URL(uri).hostname}`; | ||||||
|  | 				const nextTryOn = Date.now() + oneMinute; | ||||||
|  | 				retryQueueAdd.push(['ap:retry:queue', nextTryOn, queueId]); | ||||||
|  | 				retryQueuedSet.push([`ap:retry:queue:${queueId}`, { | ||||||
|  | 					queueId, | ||||||
|  | 					uri, | ||||||
|  | 					id, | ||||||
|  | 					type, | ||||||
|  | 					attempts: 1, | ||||||
|  | 					timestamp: nextTryOn, | ||||||
|  | 					payload: JSON.stringify(payload), | ||||||
|  | 				}]); | ||||||
|  | 			} | ||||||
|  | 		})); | ||||||
|  |  | ||||||
|  | 		if (retryQueueAdd.length) { | ||||||
|  | 			await Promise.all([ | ||||||
|  | 				db.sortedSetAddBulk(retryQueueAdd), | ||||||
|  | 				db.setObjectBulk(retryQueuedSet), | ||||||
|  | 			]); | ||||||
|  | 		} | ||||||
|  | 	}, { | ||||||
| 		batch: 50, | 		batch: 50, | ||||||
| 		interval: 100, | 		interval: 100, | ||||||
| 		}, | 	}).catch(err => winston.error(err.stack)); | ||||||
| 	); |  | ||||||
| }; | }; | ||||||
|  |  | ||||||
|  | async function retryFailedMessages() { | ||||||
|  | 	const queueIds = await db.getSortedSetRangeByScore('ap:retry:queue', 0, 50, '-inf', Date.now()); | ||||||
|  | 	const queuedData = (await db.getObjects(queueIds.map(id => `ap:retry:queue:${id}`))).filter(Boolean); | ||||||
|  |  | ||||||
|  | 	const retryQueueAdd = []; | ||||||
|  | 	const retryQueuedSet = []; | ||||||
|  | 	const queueIdsToRemove = []; | ||||||
|  |  | ||||||
|  | 	const oneMinute = 1000 * 60; | ||||||
|  | 	await Promise.all(queuedData.map(async (data) => { | ||||||
|  | 		const { queueId, uri, id, type, attempts, payload } = data; | ||||||
|  | 		const payloadObj = JSON.parse(payload); | ||||||
|  |  | ||||||
|  | 		const ok = await sendMessage(uri, id, type, payloadObj); | ||||||
|  |  | ||||||
|  | 		if (ok || attempts > 10) { | ||||||
|  | 			queueIdsToRemove.push(queueId); | ||||||
|  | 		} else { | ||||||
|  | 			const nextAttempt = (parseInt(attempts, 10) || 0) + 1; | ||||||
|  | 			const timeout = (2 ** nextAttempt) * oneMinute; // exponential backoff | ||||||
|  | 			const nextTryOn = Date.now() + timeout; | ||||||
|  | 			retryQueueAdd.push(['ap:retry:queue', nextTryOn, queueId]); | ||||||
|  | 			retryQueuedSet.push([`ap:retry:queue:${queueId}`, { | ||||||
|  | 				attempts: nextAttempt, | ||||||
|  | 				timestamp: nextTryOn, | ||||||
|  | 			}]); | ||||||
|  | 		} | ||||||
|  | 	})); | ||||||
|  | 	await Promise.all([ | ||||||
|  | 		db.sortedSetRemove('ap:retry:queue', queueIdsToRemove), | ||||||
|  | 		db.deleteAll(queueIdsToRemove.map(id => `ap:retry:queue:${id}`)), | ||||||
|  | 	]); | ||||||
|  | } | ||||||
|  |  | ||||||
| ActivityPub.record = async ({ id, type, actor }) => { | ActivityPub.record = async ({ id, type, actor }) => { | ||||||
| 	const now = Date.now(); | 	const now = Date.now(); | ||||||
| 	const { hostname } = new URL(actor); | 	const { hostname } = new URL(actor); | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user