feat: batch.processSortedSet min/max (#12129)

* feat: batch.processSortedSet min/max

* test if this works
This commit is contained in:
Barış Soner Uşaklı
2023-10-26 10:23:27 -04:00
committed by GitHub
parent 075cd598d1
commit 6c7e614417
4 changed files with 50 additions and 3 deletions

View File

@@ -39,11 +39,23 @@ exports.processSortedSet = async function (setKey, process, options) {
if (process && process.constructor && process.constructor.name !== 'AsyncFunction') {
process = util.promisify(process);
}
const method = options.reverse ? 'getSortedSetRevRange' : 'getSortedSetRange';
const isByScore = (options.min && options.min !== '-inf') || (options.max && options.max !== '+inf');
const byScore = isByScore ? 'ByScore' : '';
const withScores = options.withScores ? 'WithScores' : '';
let iteration = 1;
const getFn = db[`${method}${byScore}${withScores}`];
while (true) {
/* eslint-disable no-await-in-loop */
const ids = await db[`${method}${options.withScores ? 'WithScores' : ''}`](setKey, start, stop);
const ids = await getFn(
setKey,
start,
isByScore ? stop - start + 1 : stop,
options.reverse ? options.max : options.min,
options.reverse ? options.min : options.max,
);
if (!ids.length || options.doneIf(start, stop, ids)) {
return;
}

View File

@@ -568,7 +568,17 @@ module.exports = function (module) {
if (!options.withScores) {
project.score = 0;
}
const cursor = await module.client.collection('objects').find({ _key: setKey }, { projection: project })
const query = { _key: setKey };
if (options.min && options.min !== '-inf') {
query.score = { $gte: parseFloat(options.min) };
}
if (options.max && options.max !== '+inf') {
query.score = query.score || {};
query.score.$lte = parseFloat(options.max);
}
const cursor = await module.client.collection('objects')
.find(query, { projection: project })
.sort({ score: sort })
.batchSize(options.batch);

View File

@@ -665,6 +665,8 @@ SELECT z."value",
const client = await module.pool.connect();
const batchSize = (options || {}).batch || 100;
const sort = options.reverse ? 'DESC' : 'ASC';
const min = options.min && options.min !== '-inf' ? options.min : null;
const max = options.max && options.max !== '+inf' ? options.max : null;
const cursor = client.query(new Cursor(`
SELECT z."value", z."score"
FROM "legacy_object_live" o
@@ -672,7 +674,9 @@ SELECT z."value", z."score"
ON o."_key" = z."_key"
AND o."type" = z."type"
WHERE o."_key" = $1::TEXT
ORDER BY z."score" ${sort}, z."value" ${sort}`, [setKey]));
AND (z."score" >= $2::NUMERIC OR $2::NUMERIC IS NULL)
AND (z."score" <= $3::NUMERIC OR $3::NUMERIC IS NULL)
ORDER BY z."score" ${sort}, z."value" ${sort}`, [setKey, min, max]));
if (process && process.constructor && process.constructor.name !== 'AsyncFunction') {
process = util.promisify(process);

View File

@@ -77,6 +77,27 @@ describe('batch', () => {
assert.strictEqual(total, 490);
});
it('should process sorted set with min/max scores', async () => {
await db.sortedSetAddBulk([
['processByScore', 1, 'item1'],
['processByScore', 2, 'item2'],
['processByScore', 3, 'item3'],
['processByScore', 3, 'item4'],
['processByScore', 4, 'item5'],
['processByScore', 5, 'item6'],
]);
const result = [];
await batch.processSortedSet('processByScore', async (items) => {
result.push(...items);
}, {
min: 3,
max: 4,
});
assert(result.includes('item3'));
assert(result.includes('item4'));
assert(result.includes('item5'));
});
it('should process array with callbacks', (done) => {
let total = 0;
batch.processArray(scores, (nums, next) => {