mirror of
https://github.com/NodeBB/NodeBB.git
synced 2025-10-26 16:46:12 +01:00
feat: async/await redis connection
This commit is contained in:
@@ -1,9 +1,8 @@
|
||||
'use strict';
|
||||
|
||||
const async = require('async');
|
||||
const winston = require('winston');
|
||||
const nconf = require('nconf');
|
||||
const semver = require('semver');
|
||||
const util = require('util');
|
||||
const session = require('express-session');
|
||||
|
||||
const connection = require('./redis/connection');
|
||||
@@ -36,112 +35,77 @@ redisModule.questions = [
|
||||
];
|
||||
|
||||
|
||||
redisModule.init = function (callback) {
|
||||
callback = callback || function () { };
|
||||
redisModule.client = connection.connect(nconf.get('redis'), function (err) {
|
||||
if (err) {
|
||||
winston.error('NodeBB could not connect to your Redis database. Redis returned the following error\n' + err.stack);
|
||||
return callback(err);
|
||||
}
|
||||
require('./redis/promisify')(redisModule.client);
|
||||
|
||||
callback();
|
||||
});
|
||||
redisModule.init = async function () {
|
||||
redisModule.client = await connection.connect(nconf.get('redis'));
|
||||
require('./redis/promisify')(redisModule.client);
|
||||
};
|
||||
|
||||
redisModule.createSessionStore = function (options, callback) {
|
||||
redisModule.createSessionStore = async function (options) {
|
||||
const meta = require('../meta');
|
||||
const sessionStore = require('connect-redis')(session);
|
||||
const client = connection.connect(options);
|
||||
const client = await connection.connect(options);
|
||||
const store = new sessionStore({
|
||||
client: client,
|
||||
ttl: meta.getSessionTTLSeconds(),
|
||||
});
|
||||
|
||||
if (typeof callback === 'function') {
|
||||
callback(null, store);
|
||||
}
|
||||
return store;
|
||||
};
|
||||
|
||||
redisModule.createIndices = function (callback) {
|
||||
setImmediate(callback);
|
||||
redisModule.checkCompatibility = async function () {
|
||||
const info = await redisModule.info(redisModule.client);
|
||||
redisModule.checkCompatibilityVersion(info.redis_version);
|
||||
};
|
||||
|
||||
redisModule.checkCompatibility = function (callback) {
|
||||
async.waterfall([
|
||||
function (next) {
|
||||
redisModule.info(redisModule.client, next);
|
||||
},
|
||||
function (info, next) {
|
||||
redisModule.checkCompatibilityVersion(info.redis_version, next);
|
||||
},
|
||||
], callback);
|
||||
};
|
||||
|
||||
redisModule.checkCompatibilityVersion = function (version, callback) {
|
||||
redisModule.checkCompatibilityVersion = function (version) {
|
||||
if (semver.lt(version, '2.8.9')) {
|
||||
return callback(new Error('Your Redis version is not new enough to support NodeBB, please upgrade Redis to v2.8.9 or higher.'));
|
||||
throw new Error('Your Redis version is not new enough to support NodeBB, please upgrade Redis to v2.8.9 or higher.');
|
||||
}
|
||||
callback();
|
||||
};
|
||||
|
||||
redisModule.close = function (callback) {
|
||||
callback = callback || function () {};
|
||||
redisModule.client.quit(function (err) {
|
||||
callback(err);
|
||||
redisModule.close = async function () {
|
||||
await redisModule.client.async.quit();
|
||||
};
|
||||
|
||||
redisModule.info = async function (cxn) {
|
||||
if (!cxn) {
|
||||
cxn = await connection.connect(nconf.get('redis'));
|
||||
}
|
||||
redisModule.client = redisModule.client || cxn;
|
||||
const infoAsync = util.promisify(cb => cxn.info(cb));
|
||||
const data = await infoAsync();
|
||||
const lines = data.toString().split('\r\n').sort();
|
||||
const redisData = {};
|
||||
lines.forEach(function (line) {
|
||||
const parts = line.split(':');
|
||||
if (parts[1]) {
|
||||
redisData[parts[0]] = parts[1];
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
redisModule.info = function (cxn, callback) {
|
||||
async.waterfall([
|
||||
function (next) {
|
||||
if (cxn) {
|
||||
return setImmediate(next, null, cxn);
|
||||
}
|
||||
connection.connect(nconf.get('redis'), next);
|
||||
},
|
||||
function (cxn, next) {
|
||||
redisModule.client = redisModule.client || cxn;
|
||||
const keyInfo = redisData['db' + nconf.get('redis:database')];
|
||||
if (keyInfo) {
|
||||
const split = keyInfo.split(',');
|
||||
redisData.keys = (split[0] || '').replace('keys=', '');
|
||||
redisData.expires = (split[1] || '').replace('expires=', '');
|
||||
redisData.avg_ttl = (split[2] || '').replace('avg_ttl=', '');
|
||||
}
|
||||
|
||||
cxn.info(next);
|
||||
},
|
||||
function (data, next) {
|
||||
var lines = data.toString().split('\r\n').sort();
|
||||
var redisData = {};
|
||||
lines.forEach(function (line) {
|
||||
var parts = line.split(':');
|
||||
if (parts[1]) {
|
||||
redisData[parts[0]] = parts[1];
|
||||
}
|
||||
});
|
||||
redisData.instantaneous_input = (redisData.instantaneous_input_kbps / 1024).toFixed(3);
|
||||
redisData.instantaneous_output = (redisData.instantaneous_output_kbps / 1024).toFixed(3);
|
||||
|
||||
const keyInfo = redisData['db' + nconf.get('redis:database')];
|
||||
if (keyInfo) {
|
||||
const split = keyInfo.split(',');
|
||||
redisData.keys = (split[0] || '').replace('keys=', '');
|
||||
redisData.expires = (split[1] || '').replace('expires=', '');
|
||||
redisData.avg_ttl = (split[2] || '').replace('avg_ttl=', '');
|
||||
}
|
||||
redisData.total_net_input = (redisData.total_net_input_bytes / (1024 * 1024 * 1024)).toFixed(3);
|
||||
redisData.total_net_output = (redisData.total_net_output_bytes / (1024 * 1024 * 1024)).toFixed(3);
|
||||
|
||||
redisData.instantaneous_input = (redisData.instantaneous_input_kbps / 1024).toFixed(3);
|
||||
redisData.instantaneous_output = (redisData.instantaneous_output_kbps / 1024).toFixed(3);
|
||||
|
||||
redisData.total_net_input = (redisData.total_net_input_bytes / (1024 * 1024 * 1024)).toFixed(3);
|
||||
redisData.total_net_output = (redisData.total_net_output_bytes / (1024 * 1024 * 1024)).toFixed(3);
|
||||
|
||||
redisData.used_memory_human = (redisData.used_memory / (1024 * 1024 * 1024)).toFixed(3);
|
||||
redisData.raw = JSON.stringify(redisData, null, 4);
|
||||
redisData.redis = true;
|
||||
|
||||
next(null, redisData);
|
||||
},
|
||||
], callback);
|
||||
redisData.used_memory_human = (redisData.used_memory / (1024 * 1024 * 1024)).toFixed(3);
|
||||
redisData.raw = JSON.stringify(redisData, null, 4);
|
||||
redisData.redis = true;
|
||||
return redisData;
|
||||
};
|
||||
|
||||
redisModule.socketAdapter = function () {
|
||||
var redisAdapter = require('socket.io-redis');
|
||||
var pub = connection.connect(nconf.get('redis'));
|
||||
var sub = connection.connect(nconf.get('redis'));
|
||||
const redisAdapter = require('socket.io-redis');
|
||||
const pub = connection.connect(nconf.get('redis'));
|
||||
const sub = connection.connect(nconf.get('redis'));
|
||||
return redisAdapter({
|
||||
key: 'db:' + nconf.get('redis:database') + ':adapter_key',
|
||||
pubClient: pub,
|
||||
|
||||
@@ -9,63 +9,48 @@ const connection = module.exports;
|
||||
|
||||
connection.getConnectionOptions = function (redis) {
|
||||
redis = redis || nconf.get('redis');
|
||||
let connOptions = {};
|
||||
const connOptions = {};
|
||||
if (redis.password) {
|
||||
connOptions.auth_pass = redis.password;
|
||||
}
|
||||
|
||||
connOptions = _.merge(connOptions, redis.options || {});
|
||||
return connOptions;
|
||||
if (redis.hasOwnProperty('database')) {
|
||||
connOptions.db = redis.database;
|
||||
}
|
||||
return _.merge(connOptions, redis.options || {});
|
||||
};
|
||||
|
||||
connection.connect = function (options, callback) {
|
||||
callback = callback || function () {};
|
||||
options = options || nconf.get('redis');
|
||||
var redis_socket_or_host = options.host;
|
||||
var cxn;
|
||||
var callbackCalled = false;
|
||||
connection.connect = async function (options) {
|
||||
return new Promise(function (resolve, reject) {
|
||||
options = options || nconf.get('redis');
|
||||
const redis_socket_or_host = options.host;
|
||||
const connOptions = connection.getConnectionOptions(options);
|
||||
|
||||
const connOptions = connection.getConnectionOptions(options);
|
||||
|
||||
if (redis_socket_or_host && redis_socket_or_host.indexOf('/') >= 0) {
|
||||
/* If redis.host contains a path name character, use the unix dom sock connection. ie, /tmp/redis.sock */
|
||||
cxn = redis.createClient(options.host, connOptions);
|
||||
} else {
|
||||
/* Else, connect over tcp/ip */
|
||||
cxn = redis.createClient(options.port, options.host, connOptions);
|
||||
}
|
||||
|
||||
cxn.on('error', function (err) {
|
||||
winston.error(err.stack);
|
||||
if (!callbackCalled) {
|
||||
callbackCalled = true;
|
||||
callback(err);
|
||||
let cxn;
|
||||
if (redis_socket_or_host && String(redis_socket_or_host).indexOf('/') >= 0) {
|
||||
/* If redis.host contains a path name character, use the unix dom sock connection. ie, /tmp/redis.sock */
|
||||
cxn = redis.createClient(options.host, connOptions);
|
||||
} else {
|
||||
/* Else, connect over tcp/ip */
|
||||
cxn = redis.createClient(options.port, options.host, connOptions);
|
||||
}
|
||||
});
|
||||
|
||||
cxn.on('ready', function () {
|
||||
if (!callbackCalled) {
|
||||
callbackCalled = true;
|
||||
callback(null, cxn);
|
||||
const dbIdx = parseInt(options.database, 10);
|
||||
if (!(dbIdx >= 0)) {
|
||||
throw new Error('[[error:no-database-selected]]');
|
||||
}
|
||||
});
|
||||
|
||||
if (options.password) {
|
||||
cxn.auth(options.password);
|
||||
}
|
||||
|
||||
var dbIdx = parseInt(options.database, 10);
|
||||
if (dbIdx >= 0) {
|
||||
cxn.select(dbIdx, function (err) {
|
||||
if (err) {
|
||||
winston.error('NodeBB could not select Redis database. Redis returned the following error\n' + err.stack);
|
||||
throw err;
|
||||
}
|
||||
cxn.on('error', function (err) {
|
||||
winston.error(err.stack);
|
||||
reject(err);
|
||||
});
|
||||
cxn.on('ready', function () {
|
||||
resolve(cxn);
|
||||
});
|
||||
} else {
|
||||
callbackCalled = true;
|
||||
return callback(new Error('[[error:no-database-selected]]'));
|
||||
}
|
||||
|
||||
return cxn;
|
||||
if (options.password) {
|
||||
cxn.auth(options.password);
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
require('../../promisify')(connection);
|
||||
|
||||
@@ -4,6 +4,7 @@ const util = require('util');
|
||||
|
||||
module.exports = function (redisClient) {
|
||||
redisClient.async = {
|
||||
quit: util.promisify(redisClient.quit).bind(redisClient),
|
||||
send_command: util.promisify(redisClient.send_command).bind(redisClient),
|
||||
|
||||
exists: util.promisify(redisClient.exists).bind(redisClient),
|
||||
|
||||
@@ -165,7 +165,9 @@ async function completeConfigSetup(config) {
|
||||
nconf.overrides(config);
|
||||
const db = require('./database');
|
||||
await db.init();
|
||||
await db.createIndices();
|
||||
if (db.hasOwnProperty('createIndices')) {
|
||||
await db.createIndices();
|
||||
}
|
||||
|
||||
// Sanity-check/fix url/port
|
||||
if (!/^http(?:s)?:\/\//.test(config.url)) {
|
||||
|
||||
@@ -135,7 +135,9 @@ before(async function () {
|
||||
|
||||
|
||||
await db.init();
|
||||
await db.createIndices();
|
||||
if (db.hasOwnProperty('createIndices')) {
|
||||
await db.createIndices();
|
||||
}
|
||||
await setupMockDefaults();
|
||||
await db.initSessionStore();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user