'use strict'; module.exports = function (module) { const helpers = require('./helpers'); module.flushdb = async function () { await module.pool.query(`DROP SCHEMA "public" CASCADE`); await module.pool.query(`CREATE SCHEMA "public"`); }; module.emptydb = async function () { await module.pool.query(`DELETE FROM "legacy_object"`); }; module.exists = async function (key) { if (!key) { return; } // Redis/Mongo consider empty zsets as non-existent, match that behaviour const type = await module.type(key); if (type === 'zset') { if (Array.isArray(key)) { const members = await Promise.all(key.map(key => module.getSortedSetRange(key, 0, 0))); return members.map(member => member.length > 0); } const members = await module.getSortedSetRange(key, 0, 0); return members.length > 0; } if (Array.isArray(key)) { const res = await module.pool.query({ name: 'existsArray', text: ` SELECT o."_key" k FROM "legacy_object_live" o WHERE o."_key" = ANY($1::TEXT[])`, values: [key], }); return key.map(k => res.rows.some(r => r.k === k)); } const res = await module.pool.query({ name: 'exists', text: ` SELECT EXISTS(SELECT * FROM "legacy_object_live" WHERE "_key" = $1::TEXT LIMIT 1) e`, values: [key], }); return res.rows[0].e; }; module.scan = async function (params) { let { match } = params; if (match.startsWith('*')) { match = `%${match.substring(1)}`; } if (match.endsWith('*')) { match = `${match.substring(0, match.length - 1)}%`; } const res = await module.pool.query({ text: ` SELECT o."_key" FROM "legacy_object_live" o WHERE o."_key" LIKE '${match}'`, }); return res.rows.map(r => r._key); }; module.delete = async function (key) { if (!key) { return; } await module.pool.query({ name: 'delete', text: ` DELETE FROM "legacy_object" WHERE "_key" = $1::TEXT`, values: [key], }); }; module.deleteAll = async function (keys) { if (!Array.isArray(keys) || !keys.length) { return; } await module.pool.query({ name: 'deleteAll', text: ` DELETE FROM "legacy_object" WHERE "_key" = ANY($1::TEXT[])`, values: [keys], }); }; module.get = async function (key) { if (!key) { return; } const res = await module.pool.query({ name: 'get', text: ` SELECT s."data" t FROM "legacy_object_live" o INNER JOIN "legacy_string" s ON o."_key" = s."_key" AND o."type" = s."type" WHERE o."_key" = $1::TEXT LIMIT 1`, values: [key], }); return res.rows.length ? res.rows[0].t : null; }; module.mget = async function (keys) { if (!keys || !Array.isArray(keys) || !keys.length) { return []; } const res = await module.pool.query({ name: 'mget', text: ` SELECT s."data", s."_key" FROM "legacy_object_live" o INNER JOIN "legacy_string" s ON o."_key" = s."_key" AND o."type" = s."type" WHERE o."_key" = ANY($1::TEXT[]) LIMIT 1`, values: [keys], }); const map = {}; res.rows.forEach((d) => { map[d._key] = d.data; }); return keys.map(k => (map.hasOwnProperty(k) ? map[k] : null)); }; module.set = async function (key, value) { if (!key) { return; } await module.transaction(async (client) => { await helpers.ensureLegacyObjectType(client, key, 'string'); await client.query({ name: 'set', text: ` INSERT INTO "legacy_string" ("_key", "data") VALUES ($1::TEXT, $2::TEXT) ON CONFLICT ("_key") DO UPDATE SET "data" = $2::TEXT`, values: [key, value], }); }); }; module.increment = async function (key) { if (!key) { return; } return await module.transaction(async (client) => { await helpers.ensureLegacyObjectType(client, key, 'string'); const res = await client.query({ name: 'increment', text: ` INSERT INTO "legacy_string" ("_key", "data") VALUES ($1::TEXT, '1') ON CONFLICT ("_key") DO UPDATE SET "data" = ("legacy_string"."data"::NUMERIC + 1)::TEXT RETURNING "data" d`, values: [key], }); return parseFloat(res.rows[0].d); }); }; module.rename = async function (oldKey, newKey) { await module.transaction(async (client) => { await client.query({ name: 'deleteRename', text: ` DELETE FROM "legacy_object" WHERE "_key" = $1::TEXT`, values: [newKey], }); await client.query({ name: 'rename', text: ` UPDATE "legacy_object" SET "_key" = $2::TEXT WHERE "_key" = $1::TEXT`, values: [oldKey, newKey], }); }); }; module.type = async function (key) { const res = await module.pool.query({ name: 'type', text: ` SELECT "type"::TEXT t FROM "legacy_object_live" WHERE "_key" = $1::TEXT LIMIT 1`, values: [key], }); return res.rows.length ? res.rows[0].t : null; }; async function doExpire(key, date) { await module.pool.query({ name: 'expire', text: ` UPDATE "legacy_object" SET "expireAt" = $2::TIMESTAMPTZ WHERE "_key" = $1::TEXT`, values: [key, date], }); } module.expire = async function (key, seconds) { await doExpire(key, new Date(((Date.now() / 1000) + seconds) * 1000)); }; module.expireAt = async function (key, timestamp) { await doExpire(key, new Date(timestamp * 1000)); }; module.pexpire = async function (key, ms) { await doExpire(key, new Date(Date.now() + parseInt(ms, 10))); }; module.pexpireAt = async function (key, timestamp) { await doExpire(key, new Date(timestamp)); }; async function getExpire(key) { const res = await module.pool.query({ name: 'ttl', text: ` SELECT "expireAt"::TEXT FROM "legacy_object" WHERE "_key" = $1::TEXT LIMIT 1`, values: [key], }); return res.rows.length ? new Date(res.rows[0].expireAt).getTime() : null; } module.ttl = async function (key) { return Math.round((await getExpire(key) - Date.now()) / 1000); }; module.pttl = async function (key) { return await getExpire(key) - Date.now(); }; };