feat: add batch size to cursors

This commit is contained in:
Barış Soner Uşaklı
2020-05-22 20:25:26 -04:00
parent 0a31e3e6b0
commit a015af4a41

View File

@@ -12,7 +12,7 @@ module.exports = function (module) {
} }
let items = await objects.find({ _key: counts.smallestSet }, { let items = await objects.find({ _key: counts.smallestSet }, {
projection: { _id: 0, value: 1 }, projection: { _id: 0, value: 1 },
}).toArray(); }).batchSize(counts.minCount + 1).toArray();
const otherSets = keys.filter(s => s !== counts.smallestSet); const otherSets = keys.filter(s => s !== counts.smallestSet);
for (let i = 0; i < otherSets.length; i++) { for (let i = 0; i < otherSets.length; i++) {
@@ -21,7 +21,7 @@ module.exports = function (module) {
if (i === otherSets.length - 1) { if (i === otherSets.length - 1) {
return await objects.countDocuments(query); return await objects.countDocuments(query);
} }
items = await objects.find(query, { projection: { _id: 0, value: 1 } }).toArray(); items = await objects.find(query, { projection: { _id: 0, value: 1 } }).batchSize(items.length + 1).toArray();
} }
}; };
@@ -81,9 +81,13 @@ module.exports = function (module) {
return await intersectBatch(params); return await intersectBatch(params);
} }
let items = await objects.find({ _key: params.counts.smallestSet }, { const cursorSmall = objects.find({ _key: params.counts.smallestSet }, {
projection: { _id: 0, value: 1 }, projection: { _id: 0, value: 1 },
}).toArray(); });
if (params.counts.minCount > 1) {
cursorSmall.batchSize(params.counts.minCount + 1);
}
let items = await cursorSmall.toArray();
const project = { _id: 0, value: 1 }; const project = { _id: 0, value: 1 };
if (params.withScores) { if (params.withScores) {
project.score = 1; project.score = 1;
@@ -94,6 +98,7 @@ module.exports = function (module) {
for (let i = 0; i < otherSets.length; i++) { for (let i = 0; i < otherSets.length; i++) {
/* eslint-disable no-await-in-loop */ /* eslint-disable no-await-in-loop */
const cursor = objects.find({ _key: otherSets[i], value: { $in: items.map(i => i.value) } }); const cursor = objects.find({ _key: otherSets[i], value: { $in: items.map(i => i.value) } });
cursor.batchSize(items.length + 1);
// at the last step sort by sortSet // at the last step sort by sortSet
if (i === otherSets.length - 1) { if (i === otherSets.length - 1) {
cursor.project(project).sort({ score: params.sort }).skip(params.start).limit(params.limit); cursor.project(project).sort({ score: params.sort }).skip(params.start).limit(params.limit);
@@ -135,8 +140,15 @@ module.exports = function (module) {
items.push(nextItem); items.push(nextItem);
} }
const members = await Promise.all(otherSets.map(s => module.isSortedSetMembers(s, items.map(i => i.value)))); const members = await Promise.all(otherSets.map(async function (s) {
inters = inters.concat(items.filter((item, idx) => members.every(arr => arr[idx]))); const data = await module.client.collection('objects').find({
_key: s, value: { $in: items.map(i => i.value) },
}, {
projection: { _id: 0, value: 1 },
}).batchSize(items.length + 1).toArray();
return new Set(data.map(i => i.value));
}));
inters = inters.concat(items.filter(item => members.every(arr => arr.has(item.value))));
if (inters.length >= params.stop) { if (inters.length >= params.stop) {
done = true; done = true;
inters = inters.slice(params.start, params.stop + 1); inters = inters.slice(params.start, params.stop + 1);