small sync fixes

This commit is contained in:
zadam
2019-12-16 22:00:44 +01:00
parent b0a3f828fb
commit aff9ce97ee
4 changed files with 26 additions and 15 deletions

View File

@@ -64,13 +64,16 @@ async function handleMessage(event) {
await consumeQueuePromise;
}
// it's my turn so start it up
consumeQueuePromise = consumeSyncData();
try {
// it's my turn so start it up
consumeQueuePromise = consumeSyncData();
await consumeQueuePromise;
// finish and set to null to signal somebody else can pick it up
consumeQueuePromise = null;
await consumeQueuePromise;
}
finally {
// finish and set to null to signal somebody else can pick it up
consumeQueuePromise = null;
}
}
}
else if (message.type === 'sync-hash-check-failed') {
@@ -113,6 +116,15 @@ function checkSyncIdListeners() {
.forEach(l => console.log(`Waiting for syncId ${l.desiredSyncId} while current is ${lastProcessedSyncId} for ${Math.floor((Date.now() - l.start) / 1000)}s`));
}
async function runSafely(syncHandler, syncData) {
try {
return await syncHandler(syncData);
}
catch (e) {
console.log(`Sync handler failed with ${e.message}: ${e.stack}`);
}
}
async function consumeSyncData() {
if (syncDataQueue.length > 0) {
const allSyncData = syncDataQueue;
@@ -126,8 +138,8 @@ async function consumeSyncData() {
// the update process should be synchronous as a whole but individual handlers can run in parallel
await Promise.all([
...allSyncMessageHandlers.map(syncHandler => syncHandler(allSyncData)),
...outsideSyncMessageHandlers.map(syncHandler => syncHandler(outsideSyncData))
...allSyncMessageHandlers.map(syncHandler => runSafely(syncHandler, allSyncData)),
...outsideSyncMessageHandlers.map(syncHandler => runSafely(syncHandler, outsideSyncData))
]);
lastProcessedSyncId = Math.max(lastProcessedSyncId, allSyncData[allSyncData.length - 1].id);
@@ -171,8 +183,6 @@ async function sendPing() {
setTimeout(() => {
ws = connectWebSocket();
lastAcceptedSyncId = glob.maxSyncIdAtLoad;
lastProcessedSyncId = glob.maxSyncIdAtLoad;
lastPingTs = Date.now();
setInterval(sendPing, 1000);