refactored "sync" table to "entity_changes" - more changes

This commit is contained in:
zadam
2020-08-02 23:43:39 +02:00
parent 864271d5ef
commit 7900622f38
13 changed files with 93 additions and 92 deletions

View File

@@ -141,31 +141,31 @@ async function pullSync(syncContext) {
stats.outstandingPulls = 0;
}
const rows = resp.syncs;
const {entityChanges} = resp;
if (rows.length === 0) {
if (entityChanges.length === 0) {
break;
}
sql.transactional(() => {
for (const {sync, entity} of rows) {
if (!sourceIdService.isLocalSourceId(sync.sourceId)) {
if (!atLeastOnePullApplied && sync.entity !== 'recent_notes') { // send only for first
for (const {entityChange, entity} of entityChanges) {
if (!sourceIdService.isLocalSourceId(entityChange.sourceId)) {
if (!atLeastOnePullApplied && entityChange.entity !== 'recent_notes') { // send only for first
ws.syncPullInProgress();
atLeastOnePullApplied = true;
}
syncUpdateService.updateEntity(sync, entity, syncContext.sourceId);
syncUpdateService.updateEntity(entityChange, entity, syncContext.sourceId);
}
stats.outstandingPulls = resp.maxEntityChangeId - sync.id;
stats.outstandingPulls = resp.maxEntityChangeId - entityChange.id;
}
setLastSyncedPull(rows[rows.length - 1].sync.id);
setLastSyncedPull(entityChanges[entityChanges.length - 1].entityChange.id);
});
log.info(`Pulled ${rows.length} changes starting at entityChangeId=${lastSyncedPull} in ${pulledDate - startDate}ms and applied them in ${Date.now() - pulledDate}ms, ${stats.outstandingPulls} outstanding pulls`);
log.info(`Pulled ${entityChanges.length} changes starting at entityChangeId=${lastSyncedPull} in ${pulledDate - startDate}ms and applied them in ${Date.now() - pulledDate}ms, ${stats.outstandingPulls} outstanding pulls`);
}
if (atLeastOnePullApplied) {
@@ -179,20 +179,20 @@ async function pushSync(syncContext) {
let lastSyncedPush = getLastSyncedPush();
while (true) {
const syncs = sql.getRows('SELECT * FROM entity_changes WHERE isSynced = 1 AND id > ? LIMIT 1000', [lastSyncedPush]);
const entityChanges = sql.getRows('SELECT * FROM entity_changes WHERE isSynced = 1 AND id > ? LIMIT 1000', [lastSyncedPush]);
if (syncs.length === 0) {
if (entityChanges.length === 0) {
log.info("Nothing to push");
break;
}
const filteredSyncs = syncs.filter(sync => {
if (sync.sourceId === syncContext.sourceId) {
const filteredEntityChanges = entityChanges.filter(entityChange => {
if (entityChange.sourceId === syncContext.sourceId) {
// this may set lastSyncedPush beyond what's actually sent (because of size limit)
// so this is applied to the database only if there's no actual update
// TODO: it would be better to simplify this somehow
lastSyncedPush = sync.id;
lastSyncedPush = entityChange.id;
return false;
}
@@ -201,25 +201,25 @@ async function pushSync(syncContext) {
}
});
if (filteredSyncs.length === 0) {
// there still might be more syncs (because of batch limit), just all from current batch
if (filteredEntityChanges.length === 0) {
// there still might be more sync changes (because of batch limit), just all from current batch
// has been filtered out
setLastSyncedPush(lastSyncedPush);
continue;
}
const syncRecords = getEntityChangesRecords(filteredSyncs);
const entityChangesRecords = getEntityChangesRecords(filteredEntityChanges);
const startDate = new Date();
await syncRequest(syncContext, 'PUT', '/api/sync/update', {
sourceId: sourceIdService.getCurrentSourceId(),
entities: syncRecords
entities: entityChangesRecords
});
log.info(`Pushing ${syncRecords.length} syncs in ` + (Date.now() - startDate.getTime()) + "ms");
log.info(`Pushing ${entityChangesRecords.length} sync changes in ` + (Date.now() - startDate.getTime()) + "ms");
lastSyncedPush = syncRecords[syncRecords.length - 1].sync.id;
lastSyncedPush = entityChangesRecords[entityChangesRecords.length - 1].entityChange.id;
setLastSyncedPush(lastSyncedPush);
}