syncification

This commit is contained in:
zadam
2020-06-20 12:31:38 +02:00
parent 30062d687f
commit 88348c560c
97 changed files with 1673 additions and 1700 deletions

View File

@@ -28,7 +28,7 @@ const stats = {
async function sync() {
try {
return await syncMutexService.doExclusively(async () => {
if (!await syncOptions.isSyncSetup()) {
if (!syncOptions.isSyncSetup()) {
return { success: false, message: 'Sync not configured' };
}
@@ -87,13 +87,13 @@ async function login() {
await setupService.sendSeedToSyncServer();
}
return await doLogin();
return doLogin();
}
async function doLogin() {
const timestamp = dateUtils.utcNowDateTime();
const documentSecret = await optionService.getOption('documentSecret');
const documentSecret = optionService.getOption('documentSecret');
const hash = utils.hmac(documentSecret, timestamp);
const syncContext = { cookieJar: {} };
@@ -109,14 +109,14 @@ async function doLogin() {
syncContext.sourceId = resp.sourceId;
const lastSyncedPull = await getLastSyncedPull();
const lastSyncedPull = getLastSyncedPull();
// this is important in a scenario where we setup the sync by manually copying the document
// lastSyncedPull then could be pretty off for the newly cloned client
if (lastSyncedPull > resp.maxSyncId) {
log.info(`Lowering last synced pull from ${lastSyncedPull} to ${resp.maxSyncId}`);
await setLastSyncedPull(resp.maxSyncId);
setLastSyncedPull(resp.maxSyncId);
}
return syncContext;
@@ -126,7 +126,7 @@ async function pullSync(syncContext) {
let appliedPulls = 0;
while (true) {
const lastSyncedPull = await getLastSyncedPull();
const lastSyncedPull = getLastSyncedPull();
const changesUri = '/api/sync/changed?lastSyncId=' + lastSyncedPull;
const startDate = Date.now();
@@ -144,7 +144,7 @@ async function pullSync(syncContext) {
break;
}
await sql.transactional(async () => {
sql.transactional(() => {
for (const {sync, entity} of rows) {
if (!sourceIdService.isLocalSourceId(sync.sourceId)) {
if (appliedPulls === 0 && sync.entity !== 'recent_notes') { // send only for first
@@ -153,13 +153,13 @@ async function pullSync(syncContext) {
appliedPulls++;
}
await syncUpdateService.updateEntity(sync, entity, syncContext.sourceId);
syncUpdateService.updateEntity(sync, entity, syncContext.sourceId);
}
stats.outstandingPulls = resp.maxSyncId - sync.id;
}
await setLastSyncedPull(rows[rows.length - 1].sync.id);
setLastSyncedPull(rows[rows.length - 1].sync.id);
});
log.info(`Pulled and updated ${rows.length} changes from ${changesUri} in ${Date.now() - startDate}ms`);
@@ -173,10 +173,10 @@ async function pullSync(syncContext) {
}
async function pushSync(syncContext) {
let lastSyncedPush = await getLastSyncedPush();
let lastSyncedPush = getLastSyncedPush();
while (true) {
const syncs = await sql.getRows('SELECT * FROM sync WHERE isSynced = 1 AND id > ? LIMIT 1000', [lastSyncedPush]);
const syncs = sql.getRows('SELECT * FROM sync WHERE isSynced = 1 AND id > ? LIMIT 1000', [lastSyncedPush]);
if (syncs.length === 0) {
log.info("Nothing to push");
@@ -201,12 +201,12 @@ async function pushSync(syncContext) {
if (filteredSyncs.length === 0) {
// there still might be more syncs (because of batch limit), just all from current batch
// has been filtered out
await setLastSyncedPush(lastSyncedPush);
setLastSyncedPush(lastSyncedPush);
continue;
}
const syncRecords = await getSyncRecords(filteredSyncs);
const syncRecords = getSyncRecords(filteredSyncs);
const startDate = new Date();
await syncRequest(syncContext, 'PUT', '/api/sync/update', {
@@ -218,7 +218,7 @@ async function pushSync(syncContext) {
lastSyncedPush = syncRecords[syncRecords.length - 1].sync.id;
await setLastSyncedPush(lastSyncedPush);
setLastSyncedPush(lastSyncedPush);
}
}
@@ -228,7 +228,7 @@ async function syncFinished(syncContext) {
async function checkContentHash(syncContext) {
const resp = await syncRequest(syncContext, 'GET', '/api/sync/check');
const lastSyncedPullId = await getLastSyncedPull();
const lastSyncedPullId = getLastSyncedPull();
if (lastSyncedPullId < resp.maxSyncId) {
log.info(`There are some outstanding pulls (${lastSyncedPullId} vs. ${resp.maxSyncId}), skipping content check.`);
@@ -236,7 +236,7 @@ async function checkContentHash(syncContext) {
return true;
}
const notPushedSyncs = await sql.getValue("SELECT EXISTS(SELECT 1 FROM sync WHERE isSynced = 1 AND id > ?)", [await getLastSyncedPush()]);
const notPushedSyncs = sql.getValue("SELECT EXISTS(SELECT 1 FROM sync WHERE isSynced = 1 AND id > ?)", [getLastSyncedPush()]);
if (notPushedSyncs) {
log.info(`There's ${notPushedSyncs} outstanding pushes, skipping content check.`);
@@ -244,12 +244,12 @@ async function checkContentHash(syncContext) {
return true;
}
const failedChecks = await contentHashService.checkContentHashes(resp.entityHashes);
const failedChecks = contentHashService.checkContentHashes(resp.entityHashes);
for (const {entityName, sector} of failedChecks) {
const entityPrimaryKey = entityConstructor.getEntityFromEntityName(entityName).primaryKeyName;
await syncTableService.addEntitySyncsForSector(entityName, entityPrimaryKey, sector);
syncTableService.addEntitySyncsForSector(entityName, entityPrimaryKey, sector);
await syncRequest(syncContext, 'POST', `/api/sync/queue-sector/${entityName}/${sector}`);
}
@@ -257,19 +257,19 @@ async function checkContentHash(syncContext) {
return failedChecks.length > 0;
}
async function syncRequest(syncContext, method, requestPath, body) {
const timeout = await syncOptions.getSyncTimeout();
function syncRequest(syncContext, method, requestPath, body) {
const timeout = syncOptions.getSyncTimeout();
const opts = {
method,
url: await syncOptions.getSyncServerHost() + requestPath,
url: syncOptions.getSyncServerHost() + requestPath,
cookieJar: syncContext.cookieJar,
timeout: timeout,
body,
proxy: proxyToggle ? await syncOptions.getSyncProxy() : null
proxy: proxyToggle ? syncOptions.getSyncProxy() : null
};
return await utils.timeLimit(request.exec(opts), timeout);
return utils.timeLimit(request.exec(opts), timeout);
}
const primaryKeys = {
@@ -284,9 +284,9 @@ const primaryKeys = {
"attributes": "attributeId"
};
async function getEntityRow(entityName, entityId) {
function getEntityRow(entityName, entityId) {
if (entityName === 'note_reordering') {
return await sql.getMap("SELECT branchId, notePosition FROM branches WHERE parentNoteId = ? AND isDeleted = 0", [entityId]);
return sql.getMap("SELECT branchId, notePosition FROM branches WHERE parentNoteId = ? AND isDeleted = 0", [entityId]);
}
else {
const primaryKey = primaryKeys[entityName];
@@ -295,7 +295,7 @@ async function getEntityRow(entityName, entityId) {
throw new Error("Unknown entity " + entityName);
}
const entity = await sql.getRow(`SELECT * FROM ${entityName} WHERE ${primaryKey} = ?`, [entityId]);
const entity = sql.getRow(`SELECT * FROM ${entityName} WHERE ${primaryKey} = ?`, [entityId]);
if (!entity) {
throw new Error(`Entity ${entityName} ${entityId} not found.`);
@@ -313,12 +313,12 @@ async function getEntityRow(entityName, entityId) {
}
}
async function getSyncRecords(syncs) {
function getSyncRecords(syncs) {
const records = [];
let length = 0;
for (const sync of syncs) {
const entity = await getEntityRow(sync.entityName, sync.entityId);
const entity = getEntityRow(sync.entityName, sync.entityId);
if (sync.entityName === 'options' && !entity.isSynced) {
records.push({sync});
@@ -340,42 +340,40 @@ async function getSyncRecords(syncs) {
return records;
}
async function getLastSyncedPull() {
return parseInt(await optionService.getOption('lastSyncedPull'));
function getLastSyncedPull() {
return parseInt(optionService.getOption('lastSyncedPull'));
}
async function setLastSyncedPull(syncId) {
await optionService.setOption('lastSyncedPull', syncId);
function setLastSyncedPull(syncId) {
optionService.setOption('lastSyncedPull', syncId);
}
async function getLastSyncedPush() {
return parseInt(await optionService.getOption('lastSyncedPush'));
function getLastSyncedPush() {
return parseInt(optionService.getOption('lastSyncedPush'));
}
async function setLastSyncedPush(lastSyncedPush) {
await optionService.setOption('lastSyncedPush', lastSyncedPush);
function setLastSyncedPush(lastSyncedPush) {
optionService.setOption('lastSyncedPush', lastSyncedPush);
}
async function updatePushStats() {
if (await syncOptions.isSyncSetup()) {
const lastSyncedPush = await optionService.getOption('lastSyncedPush');
function updatePushStats() {
if (syncOptions.isSyncSetup()) {
const lastSyncedPush = optionService.getOption('lastSyncedPush');
stats.outstandingPushes = await sql.getValue("SELECT COUNT(1) FROM sync WHERE isSynced = 1 AND id > ?", [lastSyncedPush]);
stats.outstandingPushes = sql.getValue("SELECT COUNT(1) FROM sync WHERE isSynced = 1 AND id > ?", [lastSyncedPush]);
}
}
async function getMaxSyncId() {
return await sql.getValue('SELECT MAX(id) FROM sync');
function getMaxSyncId() {
return sql.getValue('SELECT MAX(id) FROM sync');
}
sqlInit.dbReady.then(async () => {
setInterval(cls.wrap(sync), 60000);
setInterval(cls.wrap(sync), 60000);
// kickoff initial sync immediately
setTimeout(cls.wrap(sync), 3000);
// kickoff initial sync immediately
setTimeout(cls.wrap(sync), 3000);
setInterval(cls.wrap(updatePushStats), 1000);
});
setInterval(cls.wrap(updatePushStats), 1000);
module.exports = {
sync,