script can wait until the sync data has been applied

This commit is contained in:
zadam
2019-10-20 17:49:58 +02:00
parent 358fd13c8d
commit 2a5ab3a5e1
6 changed files with 95 additions and 23 deletions

View File

@@ -9,8 +9,9 @@ const outsideSyncMessageHandlers = [];
const messageHandlers = [];
let ws;
let lastSyncId;
let lastSyncId = window.glob.maxSyncIdAtLoad;
let lastPingTs;
let syncDataQueue = [];
function logError(message) {
console.log(utils.now(), message); // needs to be separate from .trace()
@@ -36,7 +37,10 @@ function subscribeToAllSyncMessages(messageHandler) {
allSyncMessageHandlers.push(messageHandler);
}
function handleMessage(event) {
// used to serialize sync operations
let consumeQueuePromise = null;
async function handleMessage(event) {
const message = JSON.parse(event.data);
for (const messageHandler of messageHandlers) {
@@ -46,23 +50,26 @@ function handleMessage(event) {
if (message.type === 'sync') {
lastPingTs = Date.now();
$outstandingSyncsCount.html(message.outstandingSyncs);
if (message.data.length > 0) {
console.debug(utils.now(), "Sync data: ", message.data);
lastSyncId = message.data[message.data.length - 1].id;
syncDataQueue.push(...message.data);
// first wait for all the preceding consumers to finish
while (consumeQueuePromise) {
await consumeQueuePromise;
}
// it's my turn so start it up
consumeQueuePromise = consumeSyncData();
await consumeQueuePromise;
// finish and set to null to signal somebody else can pick it up
consumeQueuePromise = null;
}
for (const syncMessageHandler of allSyncMessageHandlers) {
syncMessageHandler(message.data);
}
const syncData = message.data.filter(sync => sync.sourceId !== glob.sourceId);
for (const syncMessageHandler of outsideSyncMessageHandlers) {
syncMessageHandler(syncData);
}
$outstandingSyncsCount.html(message.outstandingSyncs);
}
else if (message.type === 'sync-hash-check-failed') {
toastService.showError("Sync check failed!", 60000);
@@ -72,6 +79,47 @@ function handleMessage(event) {
}
}
let syncIdReachedListeners = [];
function waitForSyncId(desiredSyncId) {
console.log("Waiting for ", desiredSyncId);
if (desiredSyncId <= lastSyncId) {
return Promise.resolve();
}
return new Promise((res, rej) => {
syncIdReachedListeners.push({
desiredSyncId,
resolvePromise: res
})
});
}
async function consumeSyncData() {
if (syncDataQueue.length >= 0) {
const allSyncData = syncDataQueue;
syncDataQueue = [];
const outsideSyncData = allSyncData.filter(sync => sync.sourceId !== glob.sourceId);
// the update process should be synchronous as a whole but individual handlers can run in parallel
await Promise.all([
...allSyncMessageHandlers.map(syncHandler => syncHandler(allSyncData)),
...outsideSyncMessageHandlers.map(syncHandler => syncHandler(outsideSyncData))
]);
lastSyncId = allSyncData[allSyncData.length - 1].id;
}
syncIdReachedListeners
.filter(l => l.desiredSyncId <= lastSyncId)
.forEach(l => l.resolvePromise());
syncIdReachedListeners = syncIdReachedListeners
.filter(l => l.desiredSyncId > lastSyncId);
}
function connectWebSocket() {
const protocol = document.location.protocol === 'https:' ? 'wss' : 'ws';
@@ -113,5 +161,6 @@ export default {
logError,
subscribeToMessages,
subscribeToAllSyncMessages,
subscribeToOutsideSyncMessages
subscribeToOutsideSyncMessages,
waitForSyncId
};