mirror of
https://github.com/NodeBB/NodeBB.git
synced 2025-10-26 16:46:12 +01:00
443 lines
13 KiB
JavaScript
443 lines
13 KiB
JavaScript
'use strict';
|
|
|
|
const winston = require('winston');
|
|
const nconf = require('nconf');
|
|
|
|
const db = require('../database');
|
|
const batch = require('../batch');
|
|
const meta = require('../meta');
|
|
const privileges = require('../privileges');
|
|
const categories = require('../categories');
|
|
const user = require('../user');
|
|
const topics = require('../topics');
|
|
const posts = require('../posts');
|
|
const utils = require('../utils');
|
|
|
|
const activitypub = module.parent.exports;
|
|
const Notes = module.exports;
|
|
|
|
async function lock(value) {
|
|
const count = await db.incrObjectField('locks', value);
|
|
return count <= 1;
|
|
}
|
|
|
|
async function unlock(value) {
|
|
await db.deleteObjectField('locks', value);
|
|
}
|
|
|
|
Notes.assert = async (uid, input, options = { skipChecks: false }) => {
|
|
/**
|
|
* Given the id or object of any as:Note, either retrieves the full context (if resolvable),
|
|
* or traverses up the reply chain to build a context.
|
|
*/
|
|
|
|
if (!input) {
|
|
return null;
|
|
}
|
|
|
|
const id = !activitypub.helpers.isUri(input) ? input.id : input;
|
|
const lockStatus = await lock(id, '[[error:activitypub.already-asserting]]');
|
|
if (!lockStatus) { // unable to achieve lock, stop processing.
|
|
return null;
|
|
}
|
|
|
|
let chain;
|
|
const context = await activitypub.contexts.get(uid, id);
|
|
if (context.tid) {
|
|
unlock(id);
|
|
const { tid } = context;
|
|
return { tid, count: 0 };
|
|
} else if (context.context) {
|
|
chain = Array.from(await activitypub.contexts.getItems(uid, context.context, { input }));
|
|
} else {
|
|
// Fall back to inReplyTo traversal
|
|
chain = Array.from(await Notes.getParentChain(uid, input));
|
|
}
|
|
if (!chain.length) {
|
|
unlock(id);
|
|
return null;
|
|
}
|
|
|
|
// Reorder chain items by timestamp
|
|
chain = chain.sort((a, b) => a.timestamp - b.timestamp);
|
|
|
|
const mainPost = chain[0];
|
|
let { pid: mainPid, tid, uid: authorId, timestamp, name, content, _activitypub } = mainPost;
|
|
const hasTid = !!tid;
|
|
|
|
const cid = hasTid ? await topics.getTopicField(tid, 'cid') : options.cid || -1;
|
|
if (options.cid && cid === -1) {
|
|
// Move topic if currently uncategorized
|
|
await topics.tools.move(tid, { cid: options.cid, uid: 'system' });
|
|
}
|
|
|
|
const members = await db.isSortedSetMembers(`tid:${tid}:posts`, chain.slice(1).map(p => p.pid));
|
|
members.unshift(await posts.exists(mainPid));
|
|
if (tid && members.every(Boolean)) {
|
|
// All cached, return early.
|
|
// winston.verbose('[notes/assert] No new notes to process.');
|
|
unlock(id);
|
|
return { tid, count: 0 };
|
|
}
|
|
|
|
let title;
|
|
if (hasTid) {
|
|
mainPid = await topics.getTopicField(tid, 'mainPid');
|
|
} else {
|
|
// Check recipients/audience for local category
|
|
const set = activitypub.helpers.makeSet(_activitypub, ['to', 'cc', 'audience']);
|
|
const resolved = await Promise.all(Array.from(set).map(async id => await activitypub.helpers.resolveLocalId(id)));
|
|
const recipientCids = resolved
|
|
.filter(Boolean)
|
|
.filter(({ type }) => type === 'category')
|
|
.map(obj => obj.id);
|
|
if (recipientCids.length) {
|
|
// Overrides passed-in value, respect addressing from main post over booster
|
|
options.cid = recipientCids.shift();
|
|
}
|
|
|
|
// mainPid ok to leave as-is
|
|
title = name || activitypub.helpers.generateTitle(utils.decodeHTMLEntities(content));
|
|
}
|
|
mainPid = utils.isNumber(mainPid) ? parseInt(mainPid, 10) : mainPid;
|
|
|
|
// Relation & privilege check for local categories
|
|
const inputIndex = chain.map(n => n.pid).indexOf(id);
|
|
const hasRelation =
|
|
uid || hasTid ||
|
|
options.skipChecks || options.cid ||
|
|
await assertRelation(chain[inputIndex || 0]);
|
|
const privilege = `topics:${tid ? 'reply' : 'create'}`;
|
|
const allowed = await privileges.categories.can(privilege, cid, activitypub._constants.uid);
|
|
if (!hasRelation || !allowed) {
|
|
if (!hasRelation) {
|
|
winston.info(`[activitypub/notes.assert] Not asserting ${id} as it has no relation to existing tracked content.`);
|
|
}
|
|
|
|
unlock(id);
|
|
return null;
|
|
}
|
|
|
|
tid = tid || utils.generateUUID();
|
|
mainPost.tid = tid;
|
|
|
|
const unprocessed = chain.map((post) => {
|
|
post.tid = tid; // add tid to post hash
|
|
return post;
|
|
}).filter((p, idx) => !members[idx]);
|
|
const count = unprocessed.length;
|
|
// winston.verbose(`[notes/assert] ${count} new note(s) found.`);
|
|
|
|
let tags;
|
|
if (!hasTid) {
|
|
const { to, cc, attachment } = mainPost._activitypub;
|
|
const systemTags = (meta.config.systemTags || '').split(',');
|
|
const maxTags = await categories.getCategoryField(cid, 'maxTags');
|
|
tags = (mainPost._activitypub.tag || [])
|
|
.filter(o => o.type === 'Hashtag' && !systemTags.includes(o.name.slice(1)))
|
|
.map(o => o.name.slice(1));
|
|
|
|
if (tags.length > maxTags) {
|
|
tags.length = maxTags;
|
|
}
|
|
|
|
await Promise.all([
|
|
topics.post({
|
|
tid,
|
|
uid: authorId,
|
|
cid,
|
|
pid: mainPid,
|
|
title,
|
|
timestamp,
|
|
tags,
|
|
content: mainPost.content,
|
|
_activitypub: mainPost._activitypub,
|
|
}),
|
|
Notes.updateLocalRecipients(mainPid, { to, cc }),
|
|
posts.attachments.update(mainPid, attachment),
|
|
]);
|
|
unprocessed.shift();
|
|
}
|
|
|
|
for (const post of unprocessed) {
|
|
const { to, cc, attachment } = post._activitypub;
|
|
|
|
// eslint-disable-next-line no-await-in-loop
|
|
await Promise.all([
|
|
topics.reply(post),
|
|
Notes.updateLocalRecipients(post.pid, { to, cc }),
|
|
posts.attachments.update(post.pid, attachment),
|
|
]);
|
|
}
|
|
|
|
await Promise.all([
|
|
Notes.syncUserInboxes(tid, uid),
|
|
unlock(id),
|
|
]);
|
|
|
|
return { tid, count };
|
|
};
|
|
|
|
async function assertRelation(post) {
|
|
/**
|
|
* Given a mocked post object, ensures that it is related to some other object in database
|
|
* This check ensures that random content isn't added to the database just because it is received.
|
|
*/
|
|
|
|
// Is followed by at least one local user
|
|
const { followers } = await activitypub.actors.getLocalFollowCounts(post.uid);
|
|
|
|
// Local user is mentioned
|
|
const { tag } = post._activitypub;
|
|
let uids = [];
|
|
if (tag && tag.length) {
|
|
const slugs = tag.reduce((slugs, tag) => {
|
|
if (tag.type === 'Mention') {
|
|
const [slug, hostname] = tag.name.slice(1).split('@');
|
|
if (hostname === nconf.get('url_parsed').hostname) {
|
|
slugs.push(slug);
|
|
}
|
|
}
|
|
return slugs;
|
|
}, []);
|
|
|
|
uids = slugs.length ? await db.sortedSetScores('userslug:uid', slugs) : [];
|
|
uids = uids.filter(Boolean);
|
|
}
|
|
|
|
return followers > 0 || uids.length;
|
|
}
|
|
|
|
Notes.updateLocalRecipients = async (id, { to, cc }) => {
|
|
const recipients = new Set([...(to || []), ...(cc || [])]);
|
|
const uids = new Set();
|
|
await Promise.all(Array.from(recipients).map(async (recipient) => {
|
|
const { type, id } = await activitypub.helpers.resolveLocalId(recipient);
|
|
if (type === 'user' && await user.exists(id)) {
|
|
uids.add(parseInt(id, 10));
|
|
return;
|
|
}
|
|
|
|
const followedUid = await db.getObjectField('followersUrl:uid', recipient);
|
|
if (followedUid) {
|
|
const { uids: followers } = await activitypub.actors.getLocalFollowers(followedUid);
|
|
if (followers.size > 0) {
|
|
followers.forEach((uid) => {
|
|
uids.add(uid);
|
|
});
|
|
}
|
|
}
|
|
}));
|
|
|
|
if (uids.size > 0) {
|
|
await db.setAdd(`post:${id}:recipients`, Array.from(uids));
|
|
}
|
|
};
|
|
|
|
Notes.getParentChain = async (uid, input) => {
|
|
// Traverse upwards via `inReplyTo` until you find the root-level Note
|
|
const id = activitypub.helpers.isUri(input) ? input : input.id;
|
|
|
|
const chain = new Set();
|
|
const traverse = async (uid, id) => {
|
|
// Handle remote reference to local post
|
|
const { type, id: localId } = await activitypub.helpers.resolveLocalId(id);
|
|
if (type === 'post' && localId) {
|
|
return await traverse(uid, localId);
|
|
}
|
|
|
|
const postData = await posts.getPostData(id);
|
|
if (postData) {
|
|
chain.add(postData);
|
|
if (postData.toPid) {
|
|
await traverse(uid, postData.toPid);
|
|
} else if (utils.isNumber(id)) { // local pid without toPid, could be OP or reply to OP
|
|
const mainPid = await topics.getTopicField(postData.tid, 'mainPid');
|
|
if (mainPid !== parseInt(id, 10)) {
|
|
await traverse(uid, mainPid);
|
|
}
|
|
}
|
|
} else {
|
|
let object = !activitypub.helpers.isUri(input) && input.id === id ? input : undefined;
|
|
try {
|
|
object = object || await activitypub.get('uid', uid, id);
|
|
|
|
// Handle incorrect id passed in
|
|
if (id !== object.id) {
|
|
return await traverse(uid, object.id);
|
|
}
|
|
|
|
object = await activitypub.mocks.post(object);
|
|
if (object) {
|
|
chain.add(object);
|
|
if (object.toPid) {
|
|
await traverse(uid, object.toPid);
|
|
}
|
|
}
|
|
} catch (e) {
|
|
winston.verbose(`[activitypub/notes/getParentChain] Cannot retrieve ${id}, terminating here.`);
|
|
}
|
|
}
|
|
};
|
|
|
|
await traverse(uid, id);
|
|
return chain;
|
|
};
|
|
|
|
Notes.syncUserInboxes = async function (tid, uid) {
|
|
const [pids, { cid, mainPid }] = await Promise.all([
|
|
db.getSortedSetMembers(`tid:${tid}:posts`),
|
|
topics.getTopicFields(tid, ['tid', 'cid', 'mainPid']),
|
|
]);
|
|
pids.unshift(mainPid);
|
|
|
|
const recipients = await db.getSetsMembers(pids.map(id => `post:${id}:recipients`));
|
|
const uids = recipients.reduce((set, uids) => new Set([...set, ...uids.map(u => parseInt(u, 10))]), new Set());
|
|
if (uid) {
|
|
uids.add(parseInt(uid, 10));
|
|
}
|
|
|
|
const keys = Array.from(uids).map(uid => `uid:${uid}:inbox`);
|
|
const score = await db.sortedSetScore(`cid:${cid}:tids`, tid);
|
|
|
|
const removeKeys = (await db.getSetMembers(`tid:${tid}:recipients`))
|
|
.filter(uid => !uids.has(parseInt(uid, 10)))
|
|
.map((uid => `uid:${uid}:inbox`));
|
|
|
|
// winston.verbose(`[activitypub/syncUserInboxes] Syncing tid ${tid} with ${uids.size} inboxes`);
|
|
await Promise.all([
|
|
db.sortedSetsRemove(removeKeys, tid),
|
|
db.sortedSetsAdd(keys, keys.map(() => score || Date.now()), tid),
|
|
db.setAdd(`tid:${tid}:recipients`, Array.from(uids)),
|
|
]);
|
|
};
|
|
|
|
Notes.getCategoryFollowers = async (cid) => {
|
|
// Retrieves remote users who have followed a category; used to build recipient list
|
|
let uids = await db.getSortedSetRangeByScore(`cid:${cid}:uid:watch:state`, 0, -1, categories.watchStates.tracking, categories.watchStates.tracking);
|
|
uids = uids.filter(uid => !utils.isNumber(uid));
|
|
|
|
return uids;
|
|
};
|
|
|
|
Notes.announce = {};
|
|
|
|
Notes.announce.list = async ({ pid, tid }) => {
|
|
let pids = [];
|
|
if (pid) {
|
|
pids = [pid];
|
|
} else if (tid) {
|
|
let mainPid;
|
|
([pids, mainPid] = await Promise.all([
|
|
db.getSortedSetMembers(`tid:${tid}:posts`),
|
|
topics.getTopicField(tid, 'mainPid'),
|
|
]));
|
|
pids.unshift(mainPid);
|
|
}
|
|
|
|
if (!pids.length) {
|
|
return [];
|
|
}
|
|
|
|
const keys = pids.map(pid => `pid:${pid}:announces`);
|
|
let announces = await db.getSortedSetsMembersWithScores(keys);
|
|
announces = announces.reduce((memo, cur, idx) => {
|
|
if (cur.length) {
|
|
const pid = pids[idx];
|
|
cur.forEach(({ value: actor, score: timestamp }) => {
|
|
memo.push({ pid, actor, timestamp });
|
|
});
|
|
}
|
|
return memo;
|
|
}, []);
|
|
|
|
return announces;
|
|
};
|
|
|
|
Notes.announce.add = async (pid, actor, timestamp = Date.now()) => {
|
|
await db.sortedSetAdd(`pid:${pid}:announces`, timestamp, actor);
|
|
await posts.setPostField(pid, 'announces', await db.sortedSetCard(`pid:${pid}:announces`));
|
|
};
|
|
|
|
Notes.announce.remove = async (pid, actor) => {
|
|
await db.sortedSetRemove(`pid:${pid}:announces`, actor);
|
|
const count = await db.sortedSetCard(`pid:${pid}:announces`);
|
|
if (count > 0) {
|
|
await posts.setPostField(pid, 'announces', count);
|
|
} else {
|
|
await db.deleteObjectField(`post:${pid}`, 'announces');
|
|
}
|
|
};
|
|
|
|
Notes.announce.removeAll = async (pid) => {
|
|
await Promise.all([
|
|
db.delete(`pid:${pid}:announces`),
|
|
db.deleteObjectField(`post:${pid}`, 'announces'),
|
|
]);
|
|
};
|
|
|
|
Notes.delete = async (pids) => {
|
|
if (!Array.isArray(pids)) {
|
|
pids = [pids];
|
|
}
|
|
|
|
const exists = await posts.exists(pids);
|
|
pids = pids.filter((_, idx) => exists[idx]);
|
|
|
|
let tids = await posts.getPostsFields(pids, ['tid']);
|
|
tids = new Set(tids.map(obj => obj.tid));
|
|
|
|
const recipientSets = pids.map(id => `post:${id}:recipients`);
|
|
const announcerSets = pids.map(id => `pid:${id}:announces`);
|
|
|
|
await db.deleteAll([...recipientSets, ...announcerSets]);
|
|
await Promise.all(Array.from(tids).map(async tid => Notes.syncUserInboxes(tid)));
|
|
};
|
|
|
|
Notes.prune = async () => {
|
|
/**
|
|
* Prune topics in cid -1 that have received no engagement.
|
|
* Engagement is defined as:
|
|
* - Replied to (contains a local reply)
|
|
* - Post within is liked
|
|
*/
|
|
winston.info('[notes/prune] Starting scheduled pruning of topics');
|
|
const start = '-inf';
|
|
const stop = Date.now() - (1000 * 60 * 60 * 24 * 30); // 30 days; todo: make configurable?
|
|
let tids = await db.getSortedSetRangeByScore('cid:-1:tids', 0, -1, start, stop);
|
|
|
|
winston.info(`[notes/prune] Found ${tids.length} topics older than 30 days (since last activity).`);
|
|
|
|
const posters = await db.getSortedSetsMembers(tids.map(tid => `tid:${tid}:posters`));
|
|
const hasLocalVoter = await Promise.all(tids.map(async (tid) => {
|
|
const mainPid = await db.getObjectField(`topic:${tid}`, 'mainPid');
|
|
const pids = await db.getSortedSetMembers(`tid:${tid}:posts`);
|
|
pids.unshift(mainPid);
|
|
|
|
// Check voters of each pid for a local uid
|
|
const voters = new Set();
|
|
await Promise.all(pids.map(async (pid) => {
|
|
const [upvoters, downvoters] = await db.getSetsMembers([`pid:${pid}:upvote`, `pid:${pid}:downvote`]);
|
|
upvoters.forEach(uid => voters.add(uid));
|
|
downvoters.forEach(uid => voters.add(uid));
|
|
}));
|
|
|
|
return Array.from(voters).some(uid => utils.isNumber(uid));
|
|
}));
|
|
|
|
tids = tids.filter((_, idx) => {
|
|
const localPoster = posters[idx].some(uid => utils.isNumber(uid));
|
|
const localVoter = hasLocalVoter[idx];
|
|
|
|
return !localPoster && !localVoter;
|
|
});
|
|
|
|
winston.info(`[notes/prune] ${tids.length} topics eligible for pruning`);
|
|
|
|
await batch.processArray(tids, async (tids) => {
|
|
await Promise.all(tids.map(async tid => await topics.purgePostsAndTopic(tid, 0)));
|
|
}, { batch: 100 });
|
|
|
|
winston.info('[notes/prune] Scheduled pruning of topics complete.');
|
|
};
|