list remove all (#13113)

* list remove all

* one more test

* sortedSetIncrByBulk

* remove name

* incrObjectFieldByBulk

* test: disable api tests

* try merge

* another test

* give upon bulk incr

* update so answer

* one more try

* fix: name

* chore: up dbsearch
This commit is contained in:
Barış Soner Uşaklı
2025-02-02 03:30:52 -05:00
parent 0298a3af0d
commit 1cdf37a218
5 changed files with 84 additions and 15 deletions

View File

@@ -188,7 +188,7 @@ module.exports = function (module) {
};
module.deleteObjectField = async function (key, field) {
await module.deleteObjectFields(key, [field]);
await module.deleteObjectFields(key, Array.isArray(field) ? field : [field]);
};
module.deleteObjectFields = async function (key, fields) {

View File

@@ -295,7 +295,7 @@ SELECT (h."data" ? $2::TEXT AND h."data"->>$2::TEXT IS NOT NULL) b
};
module.deleteObjectField = async function (key, field) {
await module.deleteObjectFields(key, [field]);
await module.deleteObjectFields(key, Array.isArray(field) ? field : [field]);
};
module.deleteObjectFields = async function (key, fields) {
@@ -377,12 +377,34 @@ RETURNING ("data"->>$2::TEXT)::NUMERIC v`,
if (!Array.isArray(data) || !data.length) {
return;
}
// TODO: perf?
await Promise.all(data.map(async (item) => {
for (const [field, value] of Object.entries(item[1])) {
// eslint-disable-next-line no-await-in-loop
await module.incrObjectFieldBy(item[0], field, value);
}
}));
await module.transaction(async (client) => {
await helpers.ensureLegacyObjectsType(client, data.map(item => item[0]), 'hash');
const keys = data.map(item => item[0]);
const dataStrings = data.map(item => JSON.stringify(item[1]));
await client.query({
name: 'incrObjectFieldByBulk',
text: `
INSERT INTO "legacy_hash" ("_key", "data")
SELECT k, d
FROM UNNEST($1::TEXT[], $2::JSONB[]) vs(k, d)
ON CONFLICT ("_key")
DO UPDATE SET "data" = (
SELECT jsonb_object_agg(
key,
CASE
WHEN jsonb_typeof(legacy_hash.data -> key) = 'number'
AND jsonb_typeof(EXCLUDED.data -> key) = 'number'
THEN to_jsonb((legacy_hash.data ->> key)::NUMERIC + (EXCLUDED.data ->> key)::NUMERIC)
ELSE COALESCE(EXCLUDED.data -> key, legacy_hash.data -> key)
END
)
FROM jsonb_each(legacy_hash.data || EXCLUDED.data) AS merged(key, value)
);`,
values: [keys, dataStrings],
});
});
};
};

View File

@@ -75,9 +75,26 @@ RETURNING A."array"[array_length(A."array", 1)] v`,
if (!key) {
return;
}
// TODO: remove all values with one query
if (Array.isArray(value)) {
await Promise.all(value.map(v => module.listRemoveAll(key, v)));
await module.pool.query({
name: 'listRemoveAllMultiple',
text: `
UPDATE "legacy_list" l
SET "array" = (
SELECT ARRAY(
SELECT elem
FROM unnest(l."array") WITH ORDINALITY AS u(elem, ord)
WHERE elem NOT IN (SELECT unnest($2::TEXT[]))
ORDER BY ord
)
)
FROM "legacy_object_live" o
WHERE o."_key" = l."_key"
AND o."type" = l."type"
AND o."_key" = $1::TEXT;`,
values: [key, value],
});
return;
}
await module.pool.query({

View File

@@ -547,8 +547,38 @@ RETURNING "score" s`,
};
module.sortedSetIncrByBulk = async function (data) {
// TODO: perf single query?
return await Promise.all(data.map(item => module.sortedSetIncrBy(item[0], item[1], item[2])));
if (!data.length) {
return [];
}
return await module.transaction(async (client) => {
await helpers.ensureLegacyObjectsType(client, data.map(item => item[0]), 'zset');
const values = [];
const queryParams = [];
let paramIndex = 1;
data.forEach(([key, increment, value]) => {
value = helpers.valueToString(value);
increment = parseFloat(increment);
values.push(key, value, increment);
queryParams.push(`($${paramIndex}::TEXT, $${paramIndex + 1}::TEXT, $${paramIndex + 2}::NUMERIC)`);
paramIndex += 3;
});
const query = `
INSERT INTO "legacy_zset" ("_key", "value", "score")
VALUES ${queryParams.join(', ')}
ON CONFLICT ("_key", "value")
DO UPDATE SET "score" = "legacy_zset"."score" + EXCLUDED."score"
RETURNING "value", "score"`;
const res = await client.query({
text: query,
values,
});
return res.rows.map(row => parseFloat(row.score));
});
};
module.getSortedSetRangeByLex = async function (key, min, max, start, count) {