mirror of
https://github.com/NodeBB/NodeBB.git
synced 2025-10-26 16:46:12 +01:00
feat: async/await redis connection
This commit is contained in:
@@ -1,9 +1,8 @@
|
|||||||
'use strict';
|
'use strict';
|
||||||
|
|
||||||
const async = require('async');
|
|
||||||
const winston = require('winston');
|
|
||||||
const nconf = require('nconf');
|
const nconf = require('nconf');
|
||||||
const semver = require('semver');
|
const semver = require('semver');
|
||||||
|
const util = require('util');
|
||||||
const session = require('express-session');
|
const session = require('express-session');
|
||||||
|
|
||||||
const connection = require('./redis/connection');
|
const connection = require('./redis/connection');
|
||||||
@@ -36,112 +35,77 @@ redisModule.questions = [
|
|||||||
];
|
];
|
||||||
|
|
||||||
|
|
||||||
redisModule.init = function (callback) {
|
redisModule.init = async function () {
|
||||||
callback = callback || function () { };
|
redisModule.client = await connection.connect(nconf.get('redis'));
|
||||||
redisModule.client = connection.connect(nconf.get('redis'), function (err) {
|
require('./redis/promisify')(redisModule.client);
|
||||||
if (err) {
|
|
||||||
winston.error('NodeBB could not connect to your Redis database. Redis returned the following error\n' + err.stack);
|
|
||||||
return callback(err);
|
|
||||||
}
|
|
||||||
require('./redis/promisify')(redisModule.client);
|
|
||||||
|
|
||||||
callback();
|
|
||||||
});
|
|
||||||
};
|
};
|
||||||
|
|
||||||
redisModule.createSessionStore = function (options, callback) {
|
redisModule.createSessionStore = async function (options) {
|
||||||
const meta = require('../meta');
|
const meta = require('../meta');
|
||||||
const sessionStore = require('connect-redis')(session);
|
const sessionStore = require('connect-redis')(session);
|
||||||
const client = connection.connect(options);
|
const client = await connection.connect(options);
|
||||||
const store = new sessionStore({
|
const store = new sessionStore({
|
||||||
client: client,
|
client: client,
|
||||||
ttl: meta.getSessionTTLSeconds(),
|
ttl: meta.getSessionTTLSeconds(),
|
||||||
});
|
});
|
||||||
|
return store;
|
||||||
if (typeof callback === 'function') {
|
|
||||||
callback(null, store);
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
redisModule.createIndices = function (callback) {
|
redisModule.checkCompatibility = async function () {
|
||||||
setImmediate(callback);
|
const info = await redisModule.info(redisModule.client);
|
||||||
|
redisModule.checkCompatibilityVersion(info.redis_version);
|
||||||
};
|
};
|
||||||
|
|
||||||
redisModule.checkCompatibility = function (callback) {
|
redisModule.checkCompatibilityVersion = function (version) {
|
||||||
async.waterfall([
|
|
||||||
function (next) {
|
|
||||||
redisModule.info(redisModule.client, next);
|
|
||||||
},
|
|
||||||
function (info, next) {
|
|
||||||
redisModule.checkCompatibilityVersion(info.redis_version, next);
|
|
||||||
},
|
|
||||||
], callback);
|
|
||||||
};
|
|
||||||
|
|
||||||
redisModule.checkCompatibilityVersion = function (version, callback) {
|
|
||||||
if (semver.lt(version, '2.8.9')) {
|
if (semver.lt(version, '2.8.9')) {
|
||||||
return callback(new Error('Your Redis version is not new enough to support NodeBB, please upgrade Redis to v2.8.9 or higher.'));
|
throw new Error('Your Redis version is not new enough to support NodeBB, please upgrade Redis to v2.8.9 or higher.');
|
||||||
}
|
}
|
||||||
callback();
|
|
||||||
};
|
};
|
||||||
|
|
||||||
redisModule.close = function (callback) {
|
redisModule.close = async function () {
|
||||||
callback = callback || function () {};
|
await redisModule.client.async.quit();
|
||||||
redisModule.client.quit(function (err) {
|
};
|
||||||
callback(err);
|
|
||||||
|
redisModule.info = async function (cxn) {
|
||||||
|
if (!cxn) {
|
||||||
|
cxn = await connection.connect(nconf.get('redis'));
|
||||||
|
}
|
||||||
|
redisModule.client = redisModule.client || cxn;
|
||||||
|
const infoAsync = util.promisify(cb => cxn.info(cb));
|
||||||
|
const data = await infoAsync();
|
||||||
|
const lines = data.toString().split('\r\n').sort();
|
||||||
|
const redisData = {};
|
||||||
|
lines.forEach(function (line) {
|
||||||
|
const parts = line.split(':');
|
||||||
|
if (parts[1]) {
|
||||||
|
redisData[parts[0]] = parts[1];
|
||||||
|
}
|
||||||
});
|
});
|
||||||
};
|
|
||||||
|
|
||||||
redisModule.info = function (cxn, callback) {
|
const keyInfo = redisData['db' + nconf.get('redis:database')];
|
||||||
async.waterfall([
|
if (keyInfo) {
|
||||||
function (next) {
|
const split = keyInfo.split(',');
|
||||||
if (cxn) {
|
redisData.keys = (split[0] || '').replace('keys=', '');
|
||||||
return setImmediate(next, null, cxn);
|
redisData.expires = (split[1] || '').replace('expires=', '');
|
||||||
}
|
redisData.avg_ttl = (split[2] || '').replace('avg_ttl=', '');
|
||||||
connection.connect(nconf.get('redis'), next);
|
}
|
||||||
},
|
|
||||||
function (cxn, next) {
|
|
||||||
redisModule.client = redisModule.client || cxn;
|
|
||||||
|
|
||||||
cxn.info(next);
|
redisData.instantaneous_input = (redisData.instantaneous_input_kbps / 1024).toFixed(3);
|
||||||
},
|
redisData.instantaneous_output = (redisData.instantaneous_output_kbps / 1024).toFixed(3);
|
||||||
function (data, next) {
|
|
||||||
var lines = data.toString().split('\r\n').sort();
|
|
||||||
var redisData = {};
|
|
||||||
lines.forEach(function (line) {
|
|
||||||
var parts = line.split(':');
|
|
||||||
if (parts[1]) {
|
|
||||||
redisData[parts[0]] = parts[1];
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
const keyInfo = redisData['db' + nconf.get('redis:database')];
|
redisData.total_net_input = (redisData.total_net_input_bytes / (1024 * 1024 * 1024)).toFixed(3);
|
||||||
if (keyInfo) {
|
redisData.total_net_output = (redisData.total_net_output_bytes / (1024 * 1024 * 1024)).toFixed(3);
|
||||||
const split = keyInfo.split(',');
|
|
||||||
redisData.keys = (split[0] || '').replace('keys=', '');
|
|
||||||
redisData.expires = (split[1] || '').replace('expires=', '');
|
|
||||||
redisData.avg_ttl = (split[2] || '').replace('avg_ttl=', '');
|
|
||||||
}
|
|
||||||
|
|
||||||
redisData.instantaneous_input = (redisData.instantaneous_input_kbps / 1024).toFixed(3);
|
redisData.used_memory_human = (redisData.used_memory / (1024 * 1024 * 1024)).toFixed(3);
|
||||||
redisData.instantaneous_output = (redisData.instantaneous_output_kbps / 1024).toFixed(3);
|
redisData.raw = JSON.stringify(redisData, null, 4);
|
||||||
|
redisData.redis = true;
|
||||||
redisData.total_net_input = (redisData.total_net_input_bytes / (1024 * 1024 * 1024)).toFixed(3);
|
return redisData;
|
||||||
redisData.total_net_output = (redisData.total_net_output_bytes / (1024 * 1024 * 1024)).toFixed(3);
|
|
||||||
|
|
||||||
redisData.used_memory_human = (redisData.used_memory / (1024 * 1024 * 1024)).toFixed(3);
|
|
||||||
redisData.raw = JSON.stringify(redisData, null, 4);
|
|
||||||
redisData.redis = true;
|
|
||||||
|
|
||||||
next(null, redisData);
|
|
||||||
},
|
|
||||||
], callback);
|
|
||||||
};
|
};
|
||||||
|
|
||||||
redisModule.socketAdapter = function () {
|
redisModule.socketAdapter = function () {
|
||||||
var redisAdapter = require('socket.io-redis');
|
const redisAdapter = require('socket.io-redis');
|
||||||
var pub = connection.connect(nconf.get('redis'));
|
const pub = connection.connect(nconf.get('redis'));
|
||||||
var sub = connection.connect(nconf.get('redis'));
|
const sub = connection.connect(nconf.get('redis'));
|
||||||
return redisAdapter({
|
return redisAdapter({
|
||||||
key: 'db:' + nconf.get('redis:database') + ':adapter_key',
|
key: 'db:' + nconf.get('redis:database') + ':adapter_key',
|
||||||
pubClient: pub,
|
pubClient: pub,
|
||||||
|
|||||||
@@ -9,63 +9,48 @@ const connection = module.exports;
|
|||||||
|
|
||||||
connection.getConnectionOptions = function (redis) {
|
connection.getConnectionOptions = function (redis) {
|
||||||
redis = redis || nconf.get('redis');
|
redis = redis || nconf.get('redis');
|
||||||
let connOptions = {};
|
const connOptions = {};
|
||||||
if (redis.password) {
|
if (redis.password) {
|
||||||
connOptions.auth_pass = redis.password;
|
connOptions.auth_pass = redis.password;
|
||||||
}
|
}
|
||||||
|
if (redis.hasOwnProperty('database')) {
|
||||||
connOptions = _.merge(connOptions, redis.options || {});
|
connOptions.db = redis.database;
|
||||||
return connOptions;
|
}
|
||||||
|
return _.merge(connOptions, redis.options || {});
|
||||||
};
|
};
|
||||||
|
|
||||||
connection.connect = function (options, callback) {
|
connection.connect = async function (options) {
|
||||||
callback = callback || function () {};
|
return new Promise(function (resolve, reject) {
|
||||||
options = options || nconf.get('redis');
|
options = options || nconf.get('redis');
|
||||||
var redis_socket_or_host = options.host;
|
const redis_socket_or_host = options.host;
|
||||||
var cxn;
|
const connOptions = connection.getConnectionOptions(options);
|
||||||
var callbackCalled = false;
|
|
||||||
|
|
||||||
const connOptions = connection.getConnectionOptions(options);
|
let cxn;
|
||||||
|
if (redis_socket_or_host && String(redis_socket_or_host).indexOf('/') >= 0) {
|
||||||
if (redis_socket_or_host && redis_socket_or_host.indexOf('/') >= 0) {
|
/* If redis.host contains a path name character, use the unix dom sock connection. ie, /tmp/redis.sock */
|
||||||
/* If redis.host contains a path name character, use the unix dom sock connection. ie, /tmp/redis.sock */
|
cxn = redis.createClient(options.host, connOptions);
|
||||||
cxn = redis.createClient(options.host, connOptions);
|
} else {
|
||||||
} else {
|
/* Else, connect over tcp/ip */
|
||||||
/* Else, connect over tcp/ip */
|
cxn = redis.createClient(options.port, options.host, connOptions);
|
||||||
cxn = redis.createClient(options.port, options.host, connOptions);
|
|
||||||
}
|
|
||||||
|
|
||||||
cxn.on('error', function (err) {
|
|
||||||
winston.error(err.stack);
|
|
||||||
if (!callbackCalled) {
|
|
||||||
callbackCalled = true;
|
|
||||||
callback(err);
|
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
|
||||||
cxn.on('ready', function () {
|
const dbIdx = parseInt(options.database, 10);
|
||||||
if (!callbackCalled) {
|
if (!(dbIdx >= 0)) {
|
||||||
callbackCalled = true;
|
throw new Error('[[error:no-database-selected]]');
|
||||||
callback(null, cxn);
|
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
|
||||||
if (options.password) {
|
cxn.on('error', function (err) {
|
||||||
cxn.auth(options.password);
|
winston.error(err.stack);
|
||||||
}
|
reject(err);
|
||||||
|
});
|
||||||
var dbIdx = parseInt(options.database, 10);
|
cxn.on('ready', function () {
|
||||||
if (dbIdx >= 0) {
|
resolve(cxn);
|
||||||
cxn.select(dbIdx, function (err) {
|
|
||||||
if (err) {
|
|
||||||
winston.error('NodeBB could not select Redis database. Redis returned the following error\n' + err.stack);
|
|
||||||
throw err;
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
} else {
|
|
||||||
callbackCalled = true;
|
|
||||||
return callback(new Error('[[error:no-database-selected]]'));
|
|
||||||
}
|
|
||||||
|
|
||||||
return cxn;
|
if (options.password) {
|
||||||
|
cxn.auth(options.password);
|
||||||
|
}
|
||||||
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
require('../../promisify')(connection);
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ const util = require('util');
|
|||||||
|
|
||||||
module.exports = function (redisClient) {
|
module.exports = function (redisClient) {
|
||||||
redisClient.async = {
|
redisClient.async = {
|
||||||
|
quit: util.promisify(redisClient.quit).bind(redisClient),
|
||||||
send_command: util.promisify(redisClient.send_command).bind(redisClient),
|
send_command: util.promisify(redisClient.send_command).bind(redisClient),
|
||||||
|
|
||||||
exists: util.promisify(redisClient.exists).bind(redisClient),
|
exists: util.promisify(redisClient.exists).bind(redisClient),
|
||||||
|
|||||||
@@ -165,7 +165,9 @@ async function completeConfigSetup(config) {
|
|||||||
nconf.overrides(config);
|
nconf.overrides(config);
|
||||||
const db = require('./database');
|
const db = require('./database');
|
||||||
await db.init();
|
await db.init();
|
||||||
await db.createIndices();
|
if (db.hasOwnProperty('createIndices')) {
|
||||||
|
await db.createIndices();
|
||||||
|
}
|
||||||
|
|
||||||
// Sanity-check/fix url/port
|
// Sanity-check/fix url/port
|
||||||
if (!/^http(?:s)?:\/\//.test(config.url)) {
|
if (!/^http(?:s)?:\/\//.test(config.url)) {
|
||||||
|
|||||||
@@ -135,7 +135,9 @@ before(async function () {
|
|||||||
|
|
||||||
|
|
||||||
await db.init();
|
await db.init();
|
||||||
await db.createIndices();
|
if (db.hasOwnProperty('createIndices')) {
|
||||||
|
await db.createIndices();
|
||||||
|
}
|
||||||
await setupMockDefaults();
|
await setupMockDefaults();
|
||||||
await db.initSessionStore();
|
await db.initSessionStore();
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user