mirror of
https://github.com/NodeBB/NodeBB.git
synced 2025-10-30 10:35:55 +01:00
feat: track incoming requests by id, analytics increment for some metrics, ignore repeated requests by id
closes #12574
This commit is contained in:
@@ -13,6 +13,7 @@ const ttl = require('../cache/ttl');
|
|||||||
const lru = require('../cache/lru');
|
const lru = require('../cache/lru');
|
||||||
const batch = require('../batch');
|
const batch = require('../batch');
|
||||||
const pubsub = require('../pubsub');
|
const pubsub = require('../pubsub');
|
||||||
|
const analytics = require('../analytics');
|
||||||
|
|
||||||
const requestCache = ttl({ ttl: 1000 * 60 * 5 }); // 5 minutes
|
const requestCache = ttl({ ttl: 1000 * 60 * 5 }); // 5 minutes
|
||||||
const ActivityPub = module.exports;
|
const ActivityPub = module.exports;
|
||||||
@@ -322,3 +323,14 @@ ActivityPub.send = async (type, id, targets, payload) => {
|
|||||||
},
|
},
|
||||||
);
|
);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
ActivityPub.record = async ({ id, type, actor }) => {
|
||||||
|
const now = Date.now();
|
||||||
|
const { hostname } = new URL(actor);
|
||||||
|
|
||||||
|
await Promise.all([
|
||||||
|
db.sortedSetAdd(`activities:datetime`, now, id),
|
||||||
|
db.sortedSetAdd('domains:lastSeen', now, hostname),
|
||||||
|
analytics.increment(['activities', `activities:byType:${type}`, `activities:byHost:${hostname}`]),
|
||||||
|
]);
|
||||||
|
};
|
||||||
|
|||||||
@@ -122,7 +122,8 @@ Controller.postInbox = async (req, res) => {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
await activitypub.inbox[method](req);
|
await activitypub.inbox[method](req);
|
||||||
helpers.formatApiResponse(200, res);
|
await activitypub.record(req.body);
|
||||||
|
helpers.formatApiResponse(202, res);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
helpers.formatApiResponse(500, res, e);
|
helpers.formatApiResponse(500, res, e);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -32,6 +32,22 @@ middleware.assertS2S = async function (req, res, next) {
|
|||||||
|
|
||||||
middleware.validate = async function (req, res, next) {
|
middleware.validate = async function (req, res, next) {
|
||||||
winston.verbose('[middleware/activitypub] Validating incoming payload...');
|
winston.verbose('[middleware/activitypub] Validating incoming payload...');
|
||||||
|
|
||||||
|
// Sanity-check payload schema
|
||||||
|
const required = ['id', 'type', 'actor', 'object'];
|
||||||
|
if (!required.every(prop => req.body.hasOwnProperty(prop))) {
|
||||||
|
winston.verbose('[middleware/activitypub] Request body missing required properties.');
|
||||||
|
return res.sendStatus(400);
|
||||||
|
}
|
||||||
|
winston.verbose('[middleware/activitypub] Request body check passed.');
|
||||||
|
|
||||||
|
// History check
|
||||||
|
const seen = await db.isSortedSetMember('activities:datetime', req.body.id);
|
||||||
|
if (seen) {
|
||||||
|
winston.verbose(`[middleware/activitypub] Activity already seen, ignoring (${req.body.id}).`);
|
||||||
|
return res.sendStatus(200);
|
||||||
|
}
|
||||||
|
|
||||||
// Checks the validity of the incoming payload against the sender and rejects on failure
|
// Checks the validity of the incoming payload against the sender and rejects on failure
|
||||||
const verified = await activitypub.verify(req);
|
const verified = await activitypub.verify(req);
|
||||||
if (!verified) {
|
if (!verified) {
|
||||||
@@ -40,14 +56,6 @@ middleware.validate = async function (req, res, next) {
|
|||||||
}
|
}
|
||||||
winston.verbose('[middleware/activitypub] HTTP signature verification passed.');
|
winston.verbose('[middleware/activitypub] HTTP signature verification passed.');
|
||||||
|
|
||||||
// Sanity-check payload schema
|
|
||||||
const required = ['type', 'actor', 'object'];
|
|
||||||
if (!required.every(prop => req.body.hasOwnProperty(prop))) {
|
|
||||||
winston.verbose('[middleware/activitypub] Request body missing required properties.');
|
|
||||||
return res.sendStatus(400);
|
|
||||||
}
|
|
||||||
winston.verbose('[middleware/activitypub] Request body check passed.');
|
|
||||||
|
|
||||||
let { actor, object } = req.body;
|
let { actor, object } = req.body;
|
||||||
|
|
||||||
// Actor normalization
|
// Actor normalization
|
||||||
|
|||||||
@@ -361,7 +361,7 @@ describe('ActivityPub integration', () => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('Serving of local assets to remote clients', () => {
|
describe('Serving of local assets to remote clients (mocking)', () => {
|
||||||
describe('Note', () => {
|
describe('Note', () => {
|
||||||
let cid;
|
let cid;
|
||||||
let uid;
|
let uid;
|
||||||
|
|||||||
154
test/activitypub/analytics.js
Normal file
154
test/activitypub/analytics.js
Normal file
@@ -0,0 +1,154 @@
|
|||||||
|
'use strict';
|
||||||
|
|
||||||
|
const nconf = require('nconf');
|
||||||
|
const assert = require('assert');
|
||||||
|
|
||||||
|
const db = require('../../src/database');
|
||||||
|
const controllers = require('../../src/controllers');
|
||||||
|
const middleware = require('../../src/middleware');
|
||||||
|
const activitypub = require('../../src/activitypub');
|
||||||
|
const utils = require('../../src/utils');
|
||||||
|
const user = require('../../src/user');
|
||||||
|
const categories = require('../../src/categories');
|
||||||
|
const topics = require('../../src/topics');
|
||||||
|
const analytics = require('../../src/analytics');
|
||||||
|
const api = require('../../src/api');
|
||||||
|
|
||||||
|
describe('Analytics', () => {
|
||||||
|
let cid;
|
||||||
|
let uid;
|
||||||
|
let postData;
|
||||||
|
|
||||||
|
before(async () => {
|
||||||
|
nconf.set('runJobs', 1);
|
||||||
|
({ cid } = await categories.create({ name: utils.generateUUID().slice(0, 8) }));
|
||||||
|
const remoteUser = {
|
||||||
|
'@context': 'https://www.w3.org/ns/activitystreams',
|
||||||
|
id: 'https://example.org/user/foobar',
|
||||||
|
url: 'https://example.org/user/foobar',
|
||||||
|
|
||||||
|
type: 'Person',
|
||||||
|
name: 'Foo Bar',
|
||||||
|
preferredUsername: 'foobar',
|
||||||
|
publicKey: {
|
||||||
|
id: 'https://example.org/user/foobar#key',
|
||||||
|
owner: 'https://example.org/user/foobar',
|
||||||
|
publicKeyPem: 'publickey',
|
||||||
|
},
|
||||||
|
};
|
||||||
|
activitypub._cache.set(`0;https://example.org/user/foobar`, remoteUser);
|
||||||
|
});
|
||||||
|
|
||||||
|
after(async () => {
|
||||||
|
nconf.set('runJobs', undefined);
|
||||||
|
});
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
uid = await user.create({ username: utils.generateUUID().slice(0, 8) });
|
||||||
|
({ postData } = await topics.post({
|
||||||
|
uid,
|
||||||
|
cid,
|
||||||
|
title: utils.generateUUID(),
|
||||||
|
content: utils.generateUUID(),
|
||||||
|
}));
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should record the incoming activity if successfully processed', async () => {
|
||||||
|
const id = `https://example.org/activity/${utils.generateUUID()}`;
|
||||||
|
await controllers.activitypub.postInbox({
|
||||||
|
body: {
|
||||||
|
id,
|
||||||
|
type: 'Like',
|
||||||
|
actor: 'https://example.org/user/foobar',
|
||||||
|
object: {
|
||||||
|
type: 'Note',
|
||||||
|
id: `${nconf.get('url')}/post/${postData.pid}`,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, { sendStatus: () => {} });
|
||||||
|
const processed = await db.isSortedSetMember('activities:datetime', id);
|
||||||
|
|
||||||
|
assert(processed);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should not process the activity if received again', async () => {
|
||||||
|
// Specifically, the controller would update the score, but the request should be caught in middlewares and ignored
|
||||||
|
const id = `https://example.org/activity/${utils.generateUUID()}`;
|
||||||
|
await controllers.activitypub.postInbox({
|
||||||
|
body: {
|
||||||
|
id,
|
||||||
|
type: 'Like',
|
||||||
|
actor: 'https://example.org/user/foobar',
|
||||||
|
object: {
|
||||||
|
type: 'Note',
|
||||||
|
id: `${nconf.get('url')}/post/${postData.pid}`,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, { sendStatus: () => {} });
|
||||||
|
|
||||||
|
await middleware.activitypub.validate({
|
||||||
|
body: {
|
||||||
|
id,
|
||||||
|
type: 'Like',
|
||||||
|
actor: 'https://example.org/user/foobar',
|
||||||
|
object: {
|
||||||
|
type: 'Note',
|
||||||
|
id: `${nconf.get('url')}/post/${postData.pid}`,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, {
|
||||||
|
sendStatus: (statusCode) => {
|
||||||
|
assert.strictEqual(statusCode, 200);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should increment the last seen time of that domain', async () => {
|
||||||
|
const id = `https://example.org/activity/${utils.generateUUID()}`;
|
||||||
|
const before = await db.sortedSetScore('domains:lastSeen', 'example.org');
|
||||||
|
await controllers.activitypub.postInbox({
|
||||||
|
body: {
|
||||||
|
id,
|
||||||
|
type: 'Like',
|
||||||
|
actor: 'https://example.org/user/foobar',
|
||||||
|
object: {
|
||||||
|
type: 'Note',
|
||||||
|
id: `${nconf.get('url')}/post/${postData.pid}`,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, { sendStatus: () => {} });
|
||||||
|
|
||||||
|
const after = await db.sortedSetScore('domains:lastSeen', 'example.org');
|
||||||
|
|
||||||
|
assert(before && after);
|
||||||
|
assert(before < after);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should increment various metrics', async () => {
|
||||||
|
let counters;
|
||||||
|
({ counters } = analytics.peek());
|
||||||
|
const before = { ...counters };
|
||||||
|
|
||||||
|
const id = `https://example.org/activity/${utils.generateUUID()}`;
|
||||||
|
await controllers.activitypub.postInbox({
|
||||||
|
body: {
|
||||||
|
id,
|
||||||
|
type: 'Like',
|
||||||
|
actor: 'https://example.org/user/foobar',
|
||||||
|
object: {
|
||||||
|
type: 'Note',
|
||||||
|
id: `${nconf.get('url')}/post/${postData.pid}`,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, { sendStatus: () => {} });
|
||||||
|
|
||||||
|
({ counters } = analytics.peek());
|
||||||
|
const after = { ...counters };
|
||||||
|
|
||||||
|
const metrics = ['activities', 'activities:byType:Like', 'activities:byHost:example.org'];
|
||||||
|
metrics.forEach((metric) => {
|
||||||
|
assert(before[metric] && after[metric]);
|
||||||
|
assert(before[metric] < after[metric]);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
Reference in New Issue
Block a user