refactor: notes.assert to add finally block, update assertPayload to update instances:lastSeen via method instead of direct db call

This commit is contained in:
Julian Lam
2025-09-19 10:34:57 -04:00
parent 9b48bbd501
commit 559155da63
2 changed files with 9 additions and 23 deletions

View File

@@ -20,15 +20,6 @@ const utils = require('../utils');
const activitypub = module.parent.exports; const activitypub = module.parent.exports;
const Notes = module.exports; const Notes = module.exports;
async function lock(value) {
const count = await db.incrObjectField('locks', value);
return count <= 1;
}
async function unlock(value) {
await db.deleteObjectField('locks', value);
}
Notes._normalizeTags = async (tag, cid) => { Notes._normalizeTags = async (tag, cid) => {
const systemTags = (meta.config.systemTags || '').split(','); const systemTags = (meta.config.systemTags || '').split(',');
const maxTags = await categories.getCategoryField(cid, 'maxTags'); const maxTags = await categories.getCategoryField(cid, 'maxTags');
@@ -64,7 +55,9 @@ Notes.assert = async (uid, input, options = { skipChecks: false }) => {
} }
let id = !activitypub.helpers.isUri(input) ? input.id : input; let id = !activitypub.helpers.isUri(input) ? input.id : input;
const lockStatus = await lock(id);
let lockStatus = await db.incrObjectField('locks', id);
lockStatus = lockStatus <= 1;
if (!lockStatus) { // unable to achieve lock, stop processing. if (!lockStatus) { // unable to achieve lock, stop processing.
winston.warn(`[activitypub/notes.assert] Unable to acquire lock, skipping processing of ${id}`); winston.warn(`[activitypub/notes.assert] Unable to acquire lock, skipping processing of ${id}`);
return null; return null;
@@ -78,7 +71,6 @@ Notes.assert = async (uid, input, options = { skipChecks: false }) => {
let chain; let chain;
let context = await activitypub.contexts.get(uid, id); let context = await activitypub.contexts.get(uid, id);
if (context.tid) { if (context.tid) {
await unlock(id);
const { tid } = context; const { tid } = context;
return { tid, count: 0 }; return { tid, count: 0 };
} else if (context.context) { } else if (context.context) {
@@ -99,7 +91,6 @@ Notes.assert = async (uid, input, options = { skipChecks: false }) => {
// Can't resolve — give up. // Can't resolve — give up.
if (!chain.length) { if (!chain.length) {
await unlock(id);
return null; return null;
} }
@@ -122,7 +113,6 @@ Notes.assert = async (uid, input, options = { skipChecks: false }) => {
if (tid && members.every(Boolean)) { if (tid && members.every(Boolean)) {
// All cached, return early. // All cached, return early.
activitypub.helpers.log('[notes/assert] No new notes to process.'); activitypub.helpers.log('[notes/assert] No new notes to process.');
await unlock(id);
return { tid, count: 0 }; return { tid, count: 0 };
} }
@@ -196,7 +186,6 @@ Notes.assert = async (uid, input, options = { skipChecks: false }) => {
activitypub.helpers.log(`[activitypub/notes.assert] Not asserting ${id} as it has no relation to existing tracked content.`); activitypub.helpers.log(`[activitypub/notes.assert] Not asserting ${id} as it has no relation to existing tracked content.`);
} }
await unlock(id);
return null; return null;
} }
@@ -237,7 +226,6 @@ Notes.assert = async (uid, input, options = { skipChecks: false }) => {
unprocessed.shift(); unprocessed.shift();
} catch (e) { } catch (e) {
activitypub.helpers.log(`[activitypub/notes.assert] Could not post topic (${mainPost.pid}): ${e.message}`); activitypub.helpers.log(`[activitypub/notes.assert] Could not post topic (${mainPost.pid}): ${e.message}`);
await unlock(id);
return null; return null;
} }
@@ -273,16 +261,14 @@ Notes.assert = async (uid, input, options = { skipChecks: false }) => {
} }
} }
await Promise.all([ await Notes.syncUserInboxes(tid, uid);
Notes.syncUserInboxes(tid, uid),
unlock(id),
]);
return { tid, count }; return { tid, count };
} catch (e) { } catch (e) {
winston.warn(`[activitypub/notes.assert] Could not assert ${id} (${e.message}), releasing lock.`); winston.warn(`[activitypub/notes.assert] Could not assert ${id} (${e.message}).`);
await unlock(id);
return null; return null;
} finally {
winston.verbose(`[activitypub/notes.assert] Releasing lock (${id})`);
await db.deleteObjectField('locks', id);
} }
}; };

View File

@@ -98,7 +98,7 @@ middleware.assertPayload = helpers.try(async function (req, res, next) {
activitypub.helpers.log(`[middleware/activitypub] Blocked incoming activity from ${hostname}.`); activitypub.helpers.log(`[middleware/activitypub] Blocked incoming activity from ${hostname}.`);
return res.sendStatus(403); return res.sendStatus(403);
} }
await db.sortedSetAdd('instances:lastSeen', Date.now(), hostname); await activitypub.instances.log(hostname);
// Origin checking // Origin checking
if (typeof object !== 'string' && object.hasOwnProperty('id')) { if (typeof object !== 'string' && object.hasOwnProperty('id')) {