feat: track incoming requests by id, analytics increment for some metrics, ignore repeated requests by id

closes #12574
This commit is contained in:
Julian Lam
2024-05-14 12:06:59 -04:00
parent b106a6a018
commit 4e9cd8efc0
5 changed files with 185 additions and 10 deletions

View File

@@ -13,6 +13,7 @@ const ttl = require('../cache/ttl');
const lru = require('../cache/lru');
const batch = require('../batch');
const pubsub = require('../pubsub');
const analytics = require('../analytics');
const requestCache = ttl({ ttl: 1000 * 60 * 5 }); // 5 minutes
const ActivityPub = module.exports;
@@ -322,3 +323,14 @@ ActivityPub.send = async (type, id, targets, payload) => {
},
);
};
ActivityPub.record = async ({ id, type, actor }) => {
const now = Date.now();
const { hostname } = new URL(actor);
await Promise.all([
db.sortedSetAdd(`activities:datetime`, now, id),
db.sortedSetAdd('domains:lastSeen', now, hostname),
analytics.increment(['activities', `activities:byType:${type}`, `activities:byHost:${hostname}`]),
]);
};

View File

@@ -122,7 +122,8 @@ Controller.postInbox = async (req, res) => {
try {
await activitypub.inbox[method](req);
helpers.formatApiResponse(200, res);
await activitypub.record(req.body);
helpers.formatApiResponse(202, res);
} catch (e) {
helpers.formatApiResponse(500, res, e);
}

View File

@@ -32,6 +32,22 @@ middleware.assertS2S = async function (req, res, next) {
middleware.validate = async function (req, res, next) {
winston.verbose('[middleware/activitypub] Validating incoming payload...');
// Sanity-check payload schema
const required = ['id', 'type', 'actor', 'object'];
if (!required.every(prop => req.body.hasOwnProperty(prop))) {
winston.verbose('[middleware/activitypub] Request body missing required properties.');
return res.sendStatus(400);
}
winston.verbose('[middleware/activitypub] Request body check passed.');
// History check
const seen = await db.isSortedSetMember('activities:datetime', req.body.id);
if (seen) {
winston.verbose(`[middleware/activitypub] Activity already seen, ignoring (${req.body.id}).`);
return res.sendStatus(200);
}
// Checks the validity of the incoming payload against the sender and rejects on failure
const verified = await activitypub.verify(req);
if (!verified) {
@@ -40,14 +56,6 @@ middleware.validate = async function (req, res, next) {
}
winston.verbose('[middleware/activitypub] HTTP signature verification passed.');
// Sanity-check payload schema
const required = ['type', 'actor', 'object'];
if (!required.every(prop => req.body.hasOwnProperty(prop))) {
winston.verbose('[middleware/activitypub] Request body missing required properties.');
return res.sendStatus(400);
}
winston.verbose('[middleware/activitypub] Request body check passed.');
let { actor, object } = req.body;
// Actor normalization

View File

@@ -361,7 +361,7 @@ describe('ActivityPub integration', () => {
});
});
describe('Serving of local assets to remote clients', () => {
describe('Serving of local assets to remote clients (mocking)', () => {
describe('Note', () => {
let cid;
let uid;

View File

@@ -0,0 +1,154 @@
'use strict';
const nconf = require('nconf');
const assert = require('assert');
const db = require('../../src/database');
const controllers = require('../../src/controllers');
const middleware = require('../../src/middleware');
const activitypub = require('../../src/activitypub');
const utils = require('../../src/utils');
const user = require('../../src/user');
const categories = require('../../src/categories');
const topics = require('../../src/topics');
const analytics = require('../../src/analytics');
const api = require('../../src/api');
describe('Analytics', () => {
let cid;
let uid;
let postData;
before(async () => {
nconf.set('runJobs', 1);
({ cid } = await categories.create({ name: utils.generateUUID().slice(0, 8) }));
const remoteUser = {
'@context': 'https://www.w3.org/ns/activitystreams',
id: 'https://example.org/user/foobar',
url: 'https://example.org/user/foobar',
type: 'Person',
name: 'Foo Bar',
preferredUsername: 'foobar',
publicKey: {
id: 'https://example.org/user/foobar#key',
owner: 'https://example.org/user/foobar',
publicKeyPem: 'publickey',
},
};
activitypub._cache.set(`0;https://example.org/user/foobar`, remoteUser);
});
after(async () => {
nconf.set('runJobs', undefined);
});
beforeEach(async () => {
uid = await user.create({ username: utils.generateUUID().slice(0, 8) });
({ postData } = await topics.post({
uid,
cid,
title: utils.generateUUID(),
content: utils.generateUUID(),
}));
});
it('should record the incoming activity if successfully processed', async () => {
const id = `https://example.org/activity/${utils.generateUUID()}`;
await controllers.activitypub.postInbox({
body: {
id,
type: 'Like',
actor: 'https://example.org/user/foobar',
object: {
type: 'Note',
id: `${nconf.get('url')}/post/${postData.pid}`,
},
},
}, { sendStatus: () => {} });
const processed = await db.isSortedSetMember('activities:datetime', id);
assert(processed);
});
it('should not process the activity if received again', async () => {
// Specifically, the controller would update the score, but the request should be caught in middlewares and ignored
const id = `https://example.org/activity/${utils.generateUUID()}`;
await controllers.activitypub.postInbox({
body: {
id,
type: 'Like',
actor: 'https://example.org/user/foobar',
object: {
type: 'Note',
id: `${nconf.get('url')}/post/${postData.pid}`,
},
},
}, { sendStatus: () => {} });
await middleware.activitypub.validate({
body: {
id,
type: 'Like',
actor: 'https://example.org/user/foobar',
object: {
type: 'Note',
id: `${nconf.get('url')}/post/${postData.pid}`,
},
},
}, {
sendStatus: (statusCode) => {
assert.strictEqual(statusCode, 200);
},
});
});
it('should increment the last seen time of that domain', async () => {
const id = `https://example.org/activity/${utils.generateUUID()}`;
const before = await db.sortedSetScore('domains:lastSeen', 'example.org');
await controllers.activitypub.postInbox({
body: {
id,
type: 'Like',
actor: 'https://example.org/user/foobar',
object: {
type: 'Note',
id: `${nconf.get('url')}/post/${postData.pid}`,
},
},
}, { sendStatus: () => {} });
const after = await db.sortedSetScore('domains:lastSeen', 'example.org');
assert(before && after);
assert(before < after);
});
it('should increment various metrics', async () => {
let counters;
({ counters } = analytics.peek());
const before = { ...counters };
const id = `https://example.org/activity/${utils.generateUUID()}`;
await controllers.activitypub.postInbox({
body: {
id,
type: 'Like',
actor: 'https://example.org/user/foobar',
object: {
type: 'Note',
id: `${nconf.get('url')}/post/${postData.pid}`,
},
},
}, { sendStatus: () => {} });
({ counters } = analytics.peek());
const after = { ...counters };
const metrics = ['activities', 'activities:byType:Like', 'activities:byHost:example.org'];
metrics.forEach((metric) => {
assert(before[metric] && after[metric]);
assert(before[metric] < after[metric]);
});
});
});