'use strict'; module.exports = function (module) { module.sortedSetIntersectCard = async function (keys) { if (!Array.isArray(keys) || !keys.length) { return 0; } const objects = module.client.collection('objects'); const counts = await countSets(keys, 50000); if (counts.minCount === 0) { return 0; } let items = await objects.find({ _key: counts.smallestSet }, { projection: { _id: 0, value: 1 }, }).toArray(); const otherSets = keys.filter(s => s !== counts.smallestSet); for (let i = 0; i < otherSets.length; i++) { /* eslint-disable no-await-in-loop */ const query = { _key: otherSets[i], value: { $in: items.map(i => i.value) } }; if (i === otherSets.length - 1) { return await objects.countDocuments(query); } items = await objects.find(query, { projection: { _id: 0, value: 1 } }).toArray(); } }; async function countSets(sets, limit) { const objects = module.client.collection('objects'); const counts = await Promise.all( sets.map(s => objects.countDocuments({ _key: s }, { limit: limit || 25000, })) ); const minCount = Math.min(...counts); const index = counts.indexOf(minCount); const smallestSet = sets[index]; return { minCount: minCount, smallestSet: smallestSet, }; } module.getSortedSetIntersect = async function (params) { params.sort = 1; return await getSortedSetRevIntersect(params); }; module.getSortedSetRevIntersect = async function (params) { params.sort = -1; return await getSortedSetRevIntersect(params); }; async function getSortedSetRevIntersect(params) { params.start = params.hasOwnProperty('start') ? params.start : 0; params.stop = params.hasOwnProperty('stop') ? params.stop : -1; params.weights = params.weights || []; params.limit = params.stop - params.start + 1; if (params.limit <= 0) { params.limit = 0; } params.counts = await countSets(params.sets); if (params.counts.minCount === 0) { return []; } const simple = params.weights.filter(w => w === 1).length === 1 && params.limit !== 0; if (params.counts.minCount < 25000 && simple) { return await intersectSingle(params); } else if (simple) { return await intersectBatch(params); } return await intersectAggregate(params); } async function intersectSingle(params) { const objects = module.client.collection('objects'); const sortSet = params.sets[params.weights.indexOf(1)]; if (sortSet === params.counts.smallestSet) { return await intersectBatch(params); } let items = await objects.find({ _key: params.counts.smallestSet }, { projection: { _id: 0, value: 1 }, }).toArray(); const project = { _id: 0, value: 1 }; if (params.withScores) { project.score = 1; } const otherSets = params.sets.filter(s => s !== params.counts.smallestSet); // move sortSet to the end of array otherSets.push(otherSets.splice(otherSets.indexOf(sortSet), 1)[0]); for (let i = 0; i < otherSets.length; i++) { /* eslint-disable no-await-in-loop */ const cursor = objects.find({ _key: otherSets[i], value: { $in: items.map(i => i.value) } }); // at the last step sort by sortSet if (i === otherSets.length - 1) { cursor.project(project).sort({ score: params.sort }).skip(params.start).limit(params.limit); } else { cursor.project({ _id: 0, value: 1 }); } items = await cursor.toArray(); } if (!params.withScores) { items = items.map(i => i.value); } return items; } async function intersectBatch(params) { const project = { _id: 0, value: 1 }; if (params.withScores) { project.score = 1; } const sortSet = params.sets[params.weights.indexOf(1)]; const batchSize = 10000; const cursor = await module.client.collection('objects') .find({ _key: sortSet }, { projection: project }) .sort({ score: params.sort }) .batchSize(batchSize); const otherSets = params.sets.filter(s => s !== sortSet); let inters = []; let done = false; while (!done) { /* eslint-disable no-await-in-loop */ const items = []; while (items.length < batchSize) { const nextItem = await cursor.next(); if (!nextItem) { done = true; break; } items.push(nextItem); } const members = await Promise.all(otherSets.map(s => module.isSortedSetMembers(s, items.map(i => i.value)))); inters = inters.concat(items.filter((item, idx) => members.every(arr => arr[idx]))); if (inters.length >= params.stop) { done = true; inters = inters.slice(params.start, params.stop + 1); } } if (!params.withScores) { inters = inters.map(item => item.value); } return inters; } async function intersectAggregate(params) { const aggregate = {}; if (params.aggregate) { aggregate['$' + params.aggregate.toLowerCase()] = '$score'; } else { aggregate.$sum = '$score'; } const pipeline = [{ $match: { _key: { $in: params.sets } } }]; params.weights.forEach(function (weight, index) { if (weight !== 1) { pipeline.push({ $project: { value: 1, score: { $cond: { if: { $eq: ['$_key', params.sets[index]], }, then: { $multiply: ['$score', weight], }, else: '$score', }, }, }, }); } }); pipeline.push({ $group: { _id: { value: '$value' }, totalScore: aggregate, count: { $sum: 1 } } }); pipeline.push({ $match: { count: params.sets.length } }); pipeline.push({ $sort: { totalScore: params.sort } }); if (params.start) { pipeline.push({ $skip: params.start }); } if (params.limit > 0) { pipeline.push({ $limit: params.limit }); } const project = { _id: 0, value: '$_id.value' }; if (params.withScores) { project.score = '$totalScore'; } pipeline.push({ $project: project }); let data = await module.client.collection('objects').aggregate(pipeline).toArray(); if (!params.withScores) { data = data.map(item => item.value); } return data; } };