mirror of
https://github.com/zadam/trilium.git
synced 2026-03-22 20:01:42 +01:00
Compare commits
18 Commits
feature/st
...
feature/st
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a8ea40b2e1 | ||
|
|
308bab8a3c | ||
|
|
ef8c4cef8a | ||
|
|
63198a03ab | ||
|
|
ed808abd22 | ||
|
|
9fe23442f5 | ||
|
|
0e2e86e7d3 | ||
|
|
ea0e3fd248 | ||
|
|
2ac85a1d1c | ||
|
|
cb71dc4202 | ||
|
|
6637542e7c | ||
|
|
971ce09811 | ||
|
|
04826074f4 | ||
|
|
bcd4baff3d | ||
|
|
3bcf7b22be | ||
|
|
ee8c54bdd3 | ||
|
|
1af8699fc0 | ||
|
|
5bc1fc71ef |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -51,3 +51,6 @@ upload
|
||||
site/
|
||||
apps/*/coverage
|
||||
scripts/translation/.language*.json
|
||||
|
||||
# AI
|
||||
.claude/settings.local.json
|
||||
@@ -47,6 +47,7 @@
|
||||
"jquery": "3.7.1",
|
||||
"jquery.fancytree": "2.38.5",
|
||||
"js-sha1": "0.7.0",
|
||||
"js-sha256": "0.11.1",
|
||||
"js-sha512": "0.9.0",
|
||||
"jsplumb": "2.15.6",
|
||||
"katex": "0.16.27",
|
||||
|
||||
@@ -11,6 +11,7 @@ export interface BrowserRequest {
|
||||
path: string;
|
||||
params: Record<string, string>;
|
||||
query: Record<string, string | undefined>;
|
||||
headers?: Record<string, string>;
|
||||
body?: unknown;
|
||||
}
|
||||
|
||||
|
||||
@@ -4,29 +4,49 @@
|
||||
*/
|
||||
|
||||
import { BootstrapDefinition } from '@triliumnext/commons';
|
||||
import { getSharedBootstrapItems, getSql, routes } from '@triliumnext/core';
|
||||
import { entity_changes, getContext, getSharedBootstrapItems, getSql, routes } from '@triliumnext/core';
|
||||
|
||||
import packageJson from '../../package.json' with { type: 'json' };
|
||||
import { type BrowserRequest,BrowserRouter } from './browser_router';
|
||||
import { type BrowserRequest, BrowserRouter } from './browser_router';
|
||||
|
||||
/** Minimal response object used by apiResultHandler to capture the processed result. */
|
||||
interface ResultHandlerResponse {
|
||||
headers: Record<string, string>;
|
||||
result: unknown;
|
||||
setHeader(name: string, value: string): void;
|
||||
}
|
||||
|
||||
type HttpMethod = 'get' | 'post' | 'put' | 'patch' | 'delete';
|
||||
|
||||
/**
|
||||
* Creates an Express-like request object from a BrowserRequest.
|
||||
*/
|
||||
function toExpressLikeReq(req: BrowserRequest) {
|
||||
return {
|
||||
params: req.params,
|
||||
query: req.query,
|
||||
body: req.body,
|
||||
headers: req.headers ?? {},
|
||||
method: req.method,
|
||||
get originalUrl() { return req.url; }
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps a core route handler to work with the BrowserRouter.
|
||||
* Core handlers expect an Express-like request object with params, query, and body.
|
||||
* Each request is wrapped in an execution context (like cls.init() on the server)
|
||||
* to ensure entity change tracking works correctly.
|
||||
*/
|
||||
function wrapHandler(handler: (req: any) => unknown, transactional: boolean) {
|
||||
return (req: BrowserRequest) => {
|
||||
// Create an Express-like request object
|
||||
const expressLikeReq = {
|
||||
params: req.params,
|
||||
query: req.query,
|
||||
body: req.body
|
||||
};
|
||||
if (transactional) {
|
||||
return getSql().transactional(() => handler(expressLikeReq));
|
||||
}
|
||||
return handler(expressLikeReq);
|
||||
return getContext().init(() => {
|
||||
const expressLikeReq = toExpressLikeReq(req);
|
||||
if (transactional) {
|
||||
return getSql().transactional(() => handler(expressLikeReq));
|
||||
}
|
||||
return handler(expressLikeReq);
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
@@ -40,6 +60,74 @@ function createApiRoute(router: BrowserRouter, transactional: boolean) {
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Low-level route registration matching the server's `route()` signature:
|
||||
* route(method, path, middleware[], handler, resultHandler)
|
||||
*
|
||||
* In standalone mode:
|
||||
* - Middleware (e.g. checkApiAuth) is skipped — there's no authentication.
|
||||
* - The resultHandler is applied to post-process the result (entity conversion, status codes).
|
||||
*/
|
||||
function createRoute(router: BrowserRouter) {
|
||||
return (method: HttpMethod, path: string, _middleware: any[], handler: (req: any) => unknown, resultHandler?: ((req: any, res: any, result: unknown) => unknown) | null) => {
|
||||
router.register(method, path, (req: BrowserRequest) => {
|
||||
return getContext().init(() => {
|
||||
const expressLikeReq = toExpressLikeReq(req);
|
||||
const result = getSql().transactional(() => handler(expressLikeReq));
|
||||
|
||||
if (resultHandler) {
|
||||
// Create a minimal response object that captures what apiResultHandler sets.
|
||||
const res = createResultHandlerResponse();
|
||||
resultHandler(expressLikeReq, res, result);
|
||||
return res.result;
|
||||
}
|
||||
|
||||
return result;
|
||||
});
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Standalone apiResultHandler matching the server's behavior:
|
||||
* - Converts Becca entities to POJOs
|
||||
* - Handles [statusCode, response] tuple format
|
||||
* - Sets trilium-max-entity-change-id (captured in response headers)
|
||||
*/
|
||||
function apiResultHandler(_req: any, res: ResultHandlerResponse, result: unknown) {
|
||||
res.headers["trilium-max-entity-change-id"] = String(entity_changes.getMaxEntityChangeId());
|
||||
result = routes.convertEntitiesToPojo(result);
|
||||
|
||||
if (Array.isArray(result) && result.length > 0 && Number.isInteger(result[0])) {
|
||||
const [_statusCode, response] = result;
|
||||
res.result = response;
|
||||
} else if (result === undefined) {
|
||||
res.result = "";
|
||||
} else {
|
||||
res.result = result;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* No-op auth middleware for standalone — there's no authentication.
|
||||
*/
|
||||
function checkApiAuth() {
|
||||
// No authentication in standalone mode.
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a minimal response-like object for the apiResultHandler.
|
||||
*/
|
||||
function createResultHandlerResponse(): ResultHandlerResponse {
|
||||
return {
|
||||
headers: {},
|
||||
result: undefined,
|
||||
setHeader(name: string, value: string) {
|
||||
this.headers[name] = value;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Register all API routes on the browser router using the shared builder.
|
||||
*
|
||||
@@ -48,8 +136,11 @@ function createApiRoute(router: BrowserRouter, transactional: boolean) {
|
||||
export function registerRoutes(router: BrowserRouter): void {
|
||||
const apiRoute = createApiRoute(router, true);
|
||||
routes.buildSharedApiRoutes({
|
||||
route: createRoute(router),
|
||||
apiRoute,
|
||||
asyncApiRoute: createApiRoute(router, false)
|
||||
asyncApiRoute: createApiRoute(router, false),
|
||||
apiResultHandler,
|
||||
checkApiAuth
|
||||
});
|
||||
apiRoute('get', '/bootstrap', bootstrapRoute);
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import type { CryptoProvider } from "@triliumnext/core";
|
||||
import { sha1 } from "js-sha1";
|
||||
import { sha256 } from "js-sha256";
|
||||
import { sha512 } from "js-sha512";
|
||||
|
||||
interface Cipher {
|
||||
@@ -52,6 +53,18 @@ export default class BrowserCryptoProvider implements CryptoProvider {
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
hmac(secret: string | Uint8Array, value: string | Uint8Array): string {
|
||||
const secretStr = typeof secret === "string" ? secret : new TextDecoder().decode(secret);
|
||||
const valueStr = typeof value === "string" ? value : new TextDecoder().decode(value);
|
||||
// sha256.hmac returns hex, convert to base64 to match Node's behavior
|
||||
const hexHash = sha256.hmac(secretStr, valueStr);
|
||||
const bytes = new Uint8Array(hexHash.length / 2);
|
||||
for (let i = 0; i < hexHash.length; i += 2) {
|
||||
bytes[i / 2] = parseInt(hexHash.substr(i, 2), 16);
|
||||
}
|
||||
return btoa(String.fromCharCode(...bytes));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import type { WebSocketMessage } from "@triliumnext/commons";
|
||||
import type { MessagingProvider, MessageHandler } from "@triliumnext/core";
|
||||
import type { ClientMessageHandler, MessageHandler,MessagingProvider } from "@triliumnext/core";
|
||||
|
||||
/**
|
||||
* Messaging provider for browser Worker environments.
|
||||
@@ -14,6 +14,7 @@ import type { MessagingProvider, MessageHandler } from "@triliumnext/core";
|
||||
*/
|
||||
export default class WorkerMessagingProvider implements MessagingProvider {
|
||||
private messageHandlers: MessageHandler[] = [];
|
||||
private clientMessageHandler?: ClientMessageHandler;
|
||||
private isDisposed = false;
|
||||
|
||||
constructor() {
|
||||
@@ -27,6 +28,15 @@ export default class WorkerMessagingProvider implements MessagingProvider {
|
||||
const { type, message } = event.data || {};
|
||||
|
||||
if (type === "WS_MESSAGE" && message) {
|
||||
// Dispatch to the client message handler (used by ws.ts for log-error, log-info, ping)
|
||||
if (this.clientMessageHandler) {
|
||||
try {
|
||||
this.clientMessageHandler("main-thread", message);
|
||||
} catch (e) {
|
||||
console.error("[WorkerMessagingProvider] Error in client message handler:", e);
|
||||
}
|
||||
}
|
||||
|
||||
// Dispatch to all registered handlers
|
||||
for (const handler of this.messageHandlers) {
|
||||
try {
|
||||
@@ -58,6 +68,26 @@ export default class WorkerMessagingProvider implements MessagingProvider {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a message to a specific client.
|
||||
* In worker context, there's only one client (the main thread), so clientId is ignored.
|
||||
*/
|
||||
sendMessageToClient(_clientId: string, message: WebSocketMessage): boolean {
|
||||
if (this.isDisposed) {
|
||||
return false;
|
||||
}
|
||||
|
||||
this.sendMessageToAllClients(message);
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a handler for incoming client messages.
|
||||
*/
|
||||
setClientMessageHandler(handler: ClientMessageHandler): void {
|
||||
this.clientMessageHandler = handler;
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribe to incoming messages from the main thread.
|
||||
*/
|
||||
|
||||
94
apps/client-standalone/src/lightweight/request_provider.ts
Normal file
94
apps/client-standalone/src/lightweight/request_provider.ts
Normal file
@@ -0,0 +1,94 @@
|
||||
import type { ExecOpts, RequestProvider } from "@triliumnext/core";
|
||||
|
||||
/**
|
||||
* Fetch-based implementation of RequestProvider for browser environments.
|
||||
*
|
||||
* Uses the Fetch API instead of Node's http/https modules.
|
||||
* Proxy support is not available in browsers, so the proxy option is ignored.
|
||||
*/
|
||||
export default class FetchRequestProvider implements RequestProvider {
|
||||
|
||||
async exec<T>(opts: ExecOpts): Promise<T> {
|
||||
const paging = opts.paging || {
|
||||
pageCount: 1,
|
||||
pageIndex: 0,
|
||||
requestId: "n/a"
|
||||
};
|
||||
|
||||
const headers: Record<string, string> = {
|
||||
"Content-Type": paging.pageCount === 1 ? "application/json" : "text/plain",
|
||||
"pageCount": String(paging.pageCount),
|
||||
"pageIndex": String(paging.pageIndex),
|
||||
"requestId": paging.requestId
|
||||
};
|
||||
|
||||
if (opts.cookieJar?.header) {
|
||||
headers["Cookie"] = opts.cookieJar.header;
|
||||
}
|
||||
|
||||
if (opts.auth?.password) {
|
||||
headers["trilium-cred"] = btoa(`dummy:${opts.auth.password}`);
|
||||
}
|
||||
|
||||
let body: string | undefined;
|
||||
if (opts.body) {
|
||||
body = typeof opts.body === "object" ? JSON.stringify(opts.body) : opts.body;
|
||||
}
|
||||
|
||||
const controller = new AbortController();
|
||||
const timeoutId = opts.timeout
|
||||
? setTimeout(() => controller.abort(), opts.timeout)
|
||||
: undefined;
|
||||
|
||||
try {
|
||||
const response = await fetch(opts.url, {
|
||||
method: opts.method,
|
||||
headers,
|
||||
body,
|
||||
signal: controller.signal
|
||||
});
|
||||
|
||||
// Handle set-cookie from response (limited in browser, but keep for API compat)
|
||||
if (opts.cookieJar) {
|
||||
const setCookie = response.headers.get("set-cookie");
|
||||
if (setCookie) {
|
||||
opts.cookieJar.header = setCookie;
|
||||
}
|
||||
}
|
||||
|
||||
if ([200, 201, 204].includes(response.status)) {
|
||||
const text = await response.text();
|
||||
return text.trim() ? JSON.parse(text) : null;
|
||||
} else {
|
||||
const text = await response.text();
|
||||
let errorMessage: string;
|
||||
try {
|
||||
const json = JSON.parse(text);
|
||||
errorMessage = json?.message || "";
|
||||
} catch {
|
||||
errorMessage = text.substring(0, 100);
|
||||
}
|
||||
throw new Error(`Request to ${opts.method} ${opts.url} failed, error: ${response.status} ${response.statusText} ${errorMessage}`);
|
||||
}
|
||||
} catch (e: any) {
|
||||
if (e.name === "AbortError") {
|
||||
throw new Error(`Request to ${opts.method} ${opts.url} failed, error: timeout after ${opts.timeout}ms`);
|
||||
}
|
||||
throw e;
|
||||
} finally {
|
||||
if (timeoutId) {
|
||||
clearTimeout(timeoutId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async getImage(imageUrl: string): Promise<ArrayBuffer> {
|
||||
const response = await fetch(imageUrl);
|
||||
|
||||
if (!response.ok) {
|
||||
throw new Error(`Request to GET ${imageUrl} failed, error: ${response.status} ${response.statusText}`);
|
||||
}
|
||||
|
||||
return await response.arrayBuffer();
|
||||
}
|
||||
}
|
||||
@@ -30,6 +30,15 @@ export function startLocalServerWorker() {
|
||||
return;
|
||||
}
|
||||
|
||||
// Handle WebSocket-like messages from the worker (for frontend updates)
|
||||
if (msg?.type === "WS_MESSAGE" && msg.message) {
|
||||
// Dispatch a custom event that ws.ts listens to in standalone mode
|
||||
window.dispatchEvent(new CustomEvent("trilium:ws-message", {
|
||||
detail: msg.message
|
||||
}));
|
||||
return;
|
||||
}
|
||||
|
||||
if (!msg || msg.type !== "LOCAL_RESPONSE") return;
|
||||
|
||||
const { id, response, error } = msg;
|
||||
@@ -58,10 +67,10 @@ export function attachServiceWorkerBridge() {
|
||||
const id = msg.id;
|
||||
const req = msg.request;
|
||||
|
||||
const response = await new Promise((resolve, reject) => {
|
||||
const response = await new Promise<{ body?: ArrayBuffer }>((resolve, reject) => {
|
||||
pending.set(id, { resolve, reject });
|
||||
// Transfer body to worker for efficiency (if present)
|
||||
localWorker.postMessage({
|
||||
localWorker!.postMessage({
|
||||
type: "LOCAL_REQUEST",
|
||||
id,
|
||||
request: req
|
||||
@@ -73,14 +82,15 @@ export function attachServiceWorkerBridge() {
|
||||
id,
|
||||
response
|
||||
}, response.body ? [response.body] : []);
|
||||
} catch (e) {
|
||||
} catch (e: unknown) {
|
||||
const errorMessage = e instanceof Error ? e.message : String(e);
|
||||
port.postMessage({
|
||||
type: "LOCAL_FETCH_RESPONSE",
|
||||
id: msg.id,
|
||||
response: {
|
||||
status: 500,
|
||||
headers: { "content-type": "text/plain; charset=utf-8" },
|
||||
body: new TextEncoder().encode(String(e?.message || e)).buffer
|
||||
body: new TextEncoder().encode(errorMessage).buffer
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -55,6 +55,7 @@ let BrowserSqlProvider: typeof import('./lightweight/sql_provider').default;
|
||||
let WorkerMessagingProvider: typeof import('./lightweight/messaging_provider').default;
|
||||
let BrowserExecutionContext: typeof import('./lightweight/cls_provider').default;
|
||||
let BrowserCryptoProvider: typeof import('./lightweight/crypto_provider').default;
|
||||
let FetchRequestProvider: typeof import('./lightweight/request_provider').default;
|
||||
let translationProvider: typeof import('./lightweight/translation_provider').default;
|
||||
let createConfiguredRouter: typeof import('./lightweight/browser_routes').createConfiguredRouter;
|
||||
|
||||
@@ -79,6 +80,7 @@ async function loadModules(): Promise<void> {
|
||||
messagingModule,
|
||||
clsModule,
|
||||
cryptoModule,
|
||||
requestModule,
|
||||
translationModule,
|
||||
routesModule
|
||||
] = await Promise.all([
|
||||
@@ -86,6 +88,7 @@ async function loadModules(): Promise<void> {
|
||||
import('./lightweight/messaging_provider.js'),
|
||||
import('./lightweight/cls_provider.js'),
|
||||
import('./lightweight/crypto_provider.js'),
|
||||
import('./lightweight/request_provider.js'),
|
||||
import('./lightweight/translation_provider.js'),
|
||||
import('./lightweight/browser_routes.js')
|
||||
]);
|
||||
@@ -94,6 +97,7 @@ async function loadModules(): Promise<void> {
|
||||
WorkerMessagingProvider = messagingModule.default;
|
||||
BrowserExecutionContext = clsModule.default;
|
||||
BrowserCryptoProvider = cryptoModule.default;
|
||||
FetchRequestProvider = requestModule.default;
|
||||
translationProvider = translationModule.default;
|
||||
createConfiguredRouter = routesModule.createConfiguredRouter;
|
||||
|
||||
@@ -152,18 +156,20 @@ async function initialize(): Promise<void> {
|
||||
executionContext: new BrowserExecutionContext(),
|
||||
crypto: new BrowserCryptoProvider(),
|
||||
messaging: messagingProvider!,
|
||||
request: new FetchRequestProvider(),
|
||||
translations: translationProvider,
|
||||
dbConfig: {
|
||||
provider: sqlProvider!,
|
||||
isReadOnly: false,
|
||||
onTransactionCommit: () => {
|
||||
// No-op for now
|
||||
coreModule?.ws.sendTransactionEntityChangesToAllClients();
|
||||
},
|
||||
onTransactionRollback: () => {
|
||||
// No-op for now
|
||||
}
|
||||
}
|
||||
});
|
||||
coreModule.ws.init();
|
||||
|
||||
console.log("[Worker] Supported routes", Object.keys(coreModule.routes));
|
||||
|
||||
@@ -206,10 +212,6 @@ interface LocalRequest {
|
||||
|
||||
// Main dispatch
|
||||
async function dispatch(request: LocalRequest) {
|
||||
const url = new URL(request.url);
|
||||
|
||||
console.log("[Worker] Dispatch:", url.pathname);
|
||||
|
||||
// Ensure initialization is complete and get the router
|
||||
const appRouter = await ensureInitialized();
|
||||
|
||||
|
||||
@@ -57,6 +57,49 @@ export function unsubscribeToMessage(messageHandler: MessageHandler) {
|
||||
messageHandlers = messageHandlers.filter(handler => handler !== messageHandler);
|
||||
}
|
||||
|
||||
/**
|
||||
* Dispatch a message to all handlers and process it.
|
||||
* This is the main entry point for incoming messages from any provider
|
||||
* (WebSocket, Worker, etc.)
|
||||
*/
|
||||
export async function dispatchMessage(message: WebSocketMessage) {
|
||||
// Notify all subscribers
|
||||
for (const messageHandler of messageHandlers) {
|
||||
messageHandler(message);
|
||||
}
|
||||
|
||||
// Use string type for flexibility - server sends more message types than are typed
|
||||
const messageType = message.type as string;
|
||||
const msg = message as any;
|
||||
|
||||
// Process the message
|
||||
if (messageType === "ping") {
|
||||
lastPingTs = Date.now();
|
||||
} else if (messageType === "reload-frontend") {
|
||||
utils.reloadFrontendApp("received request from backend to reload frontend");
|
||||
} else if (messageType === "frontend-update") {
|
||||
await executeFrontendUpdate(msg.data.entityChanges);
|
||||
} else if (messageType === "sync-hash-check-failed") {
|
||||
toastService.showError(t("ws.sync-check-failed"), 60000);
|
||||
} else if (messageType === "consistency-checks-failed") {
|
||||
toastService.showError(t("ws.consistency-checks-failed"), 50 * 60000);
|
||||
} else if (messageType === "api-log-messages") {
|
||||
appContext.triggerEvent("apiLogMessages", { noteId: msg.noteId, messages: msg.messages });
|
||||
} else if (messageType === "toast") {
|
||||
toastService.showMessage(msg.message);
|
||||
} else if (messageType === "execute-script") {
|
||||
// TODO: Remove after porting the file
|
||||
// @ts-ignore
|
||||
const bundleService = (await import("./bundle.js")).default as any;
|
||||
// TODO: Remove after porting the file
|
||||
// @ts-ignore
|
||||
const froca = (await import("./froca.js")).default as any;
|
||||
const originEntity = msg.originEntityId ? await froca.getNote(msg.originEntityId) : null;
|
||||
|
||||
bundleService.getAndExecuteBundle(msg.currentNoteId, originEntity, msg.script, msg.params);
|
||||
}
|
||||
}
|
||||
|
||||
// used to serialize frontend update operations
|
||||
let consumeQueuePromise: Promise<void> | null = null;
|
||||
|
||||
@@ -112,38 +155,13 @@ async function executeFrontendUpdate(entityChanges: EntityChange[]) {
|
||||
}
|
||||
}
|
||||
|
||||
async function handleMessage(event: MessageEvent<any>) {
|
||||
const message = JSON.parse(event.data);
|
||||
|
||||
for (const messageHandler of messageHandlers) {
|
||||
messageHandler(message);
|
||||
}
|
||||
|
||||
if (message.type === "ping") {
|
||||
lastPingTs = Date.now();
|
||||
} else if (message.type === "reload-frontend") {
|
||||
utils.reloadFrontendApp("received request from backend to reload frontend");
|
||||
} else if (message.type === "frontend-update") {
|
||||
await executeFrontendUpdate(message.data.entityChanges);
|
||||
} else if (message.type === "sync-hash-check-failed") {
|
||||
toastService.showError(t("ws.sync-check-failed"), 60000);
|
||||
} else if (message.type === "consistency-checks-failed") {
|
||||
toastService.showError(t("ws.consistency-checks-failed"), 50 * 60000);
|
||||
} else if (message.type === "api-log-messages") {
|
||||
appContext.triggerEvent("apiLogMessages", { noteId: message.noteId, messages: message.messages });
|
||||
} else if (message.type === "toast") {
|
||||
toastService.showMessage(message.message);
|
||||
} else if (message.type === "execute-script") {
|
||||
// TODO: Remove after porting the file
|
||||
// @ts-ignore
|
||||
const bundleService = (await import("./bundle.js")).default as any;
|
||||
// TODO: Remove after porting the file
|
||||
// @ts-ignore
|
||||
const froca = (await import("./froca.js")).default as any;
|
||||
const originEntity = message.originEntityId ? await froca.getNote(message.originEntityId) : null;
|
||||
|
||||
bundleService.getAndExecuteBundle(message.currentNoteId, originEntity, message.script, message.params);
|
||||
}
|
||||
/**
|
||||
* WebSocket message handler - parses the event and dispatches to generic handler.
|
||||
* This is only used in WebSocket mode (not standalone).
|
||||
*/
|
||||
async function handleWebSocketMessage(event: MessageEvent<string>) {
|
||||
const message = JSON.parse(event.data) as WebSocketMessage;
|
||||
await dispatchMessage(message);
|
||||
}
|
||||
|
||||
let entityChangeIdReachedListeners: {
|
||||
@@ -228,13 +246,18 @@ function connectWebSocket() {
|
||||
// use wss for secure messaging
|
||||
const ws = new WebSocket(webSocketUri);
|
||||
ws.onopen = () => console.debug(utils.now(), `Connected to server ${webSocketUri} with WebSocket`);
|
||||
ws.onmessage = handleMessage;
|
||||
ws.onmessage = handleWebSocketMessage;
|
||||
// we're not handling ws.onclose here because reconnection is done in sendPing()
|
||||
|
||||
return ws;
|
||||
}
|
||||
|
||||
async function sendPing() {
|
||||
if (!ws) {
|
||||
// In standalone mode, there's no WebSocket — nothing to ping.
|
||||
return;
|
||||
}
|
||||
|
||||
if (Date.now() - lastPingTs > 30000) {
|
||||
console.warn(utils.now(), "Lost websocket connection to the backend");
|
||||
toast.showPersistent({
|
||||
@@ -261,8 +284,18 @@ async function sendPing() {
|
||||
}
|
||||
|
||||
setTimeout(() => {
|
||||
if (glob.device === "print" || glob.isStandalone) return;
|
||||
if (glob.device === "print") return;
|
||||
|
||||
if (glob.isStandalone) {
|
||||
// In standalone mode, listen for messages from the local worker via custom event
|
||||
window.addEventListener("trilium:ws-message", ((event: CustomEvent<WebSocketMessage>) => {
|
||||
dispatchMessage(event.detail);
|
||||
}) as EventListener);
|
||||
console.debug(utils.now(), "Standalone mode: listening for worker messages");
|
||||
return;
|
||||
}
|
||||
|
||||
// Normal mode: use WebSocket
|
||||
ws = connectWebSocket();
|
||||
|
||||
lastPingTs = Date.now();
|
||||
|
||||
@@ -66,7 +66,6 @@
|
||||
"@types/ws": "8.18.1",
|
||||
"@types/xml2js": "0.4.14",
|
||||
"archiver": "7.0.1",
|
||||
"async-mutex": "0.5.0",
|
||||
"axios": "1.13.6",
|
||||
"bindings": "1.5.0",
|
||||
"bootstrap": "5.3.8",
|
||||
|
||||
@@ -38,9 +38,10 @@ export default async function buildApp() {
|
||||
app.set("view engine", "ejs");
|
||||
|
||||
app.use((req, res, next) => {
|
||||
// set CORS header
|
||||
// set CORS headers
|
||||
if (config["Network"]["corsAllowOrigin"]) {
|
||||
res.header("Access-Control-Allow-Origin", config["Network"]["corsAllowOrigin"]);
|
||||
res.header("Access-Control-Allow-Credentials", "true");
|
||||
}
|
||||
if (config["Network"]["corsAllowMethods"]) {
|
||||
res.header("Access-Control-Allow-Methods", config["Network"]["corsAllowMethods"]);
|
||||
@@ -49,6 +50,12 @@ export default async function buildApp() {
|
||||
res.header("Access-Control-Allow-Headers", config["Network"]["corsAllowHeaders"]);
|
||||
}
|
||||
|
||||
// Handle preflight OPTIONS requests
|
||||
if (req.method === "OPTIONS" && config["Network"]["corsAllowOrigin"]) {
|
||||
res.sendStatus(204);
|
||||
return;
|
||||
}
|
||||
|
||||
res.locals.t = t;
|
||||
return next();
|
||||
});
|
||||
@@ -98,13 +105,12 @@ export default async function buildApp() {
|
||||
custom.register(app);
|
||||
error_handlers.register(app);
|
||||
|
||||
const { startSyncTimer } = await import("./services/sync.js");
|
||||
startSyncTimer();
|
||||
const { sync, consistency_checks } = await import("@triliumnext/core");
|
||||
sync.startSyncTimer();
|
||||
|
||||
await import("./services/backup.js");
|
||||
|
||||
const { startConsistencyChecks } = await import("./services/consistency_checks.js");
|
||||
startConsistencyChecks();
|
||||
consistency_checks.startConsistencyChecks();
|
||||
|
||||
const { startScheduler } = await import("./services/scheduler.js");
|
||||
startScheduler();
|
||||
|
||||
@@ -26,4 +26,10 @@ export default class NodejsCryptoProvider implements CryptoProvider {
|
||||
return randtoken.generate(length);
|
||||
}
|
||||
|
||||
hmac(secret: string | Uint8Array, value: string | Uint8Array) {
|
||||
const hmac = crypto.createHmac("sha256", Buffer.from(secret.toString(), "ascii"));
|
||||
hmac.update(value.toString());
|
||||
return hmac.digest("base64");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -9,6 +9,8 @@ import path from "path";
|
||||
import ClsHookedExecutionContext from "./cls_provider.js";
|
||||
import NodejsCryptoProvider from "./crypto_provider.js";
|
||||
import dataDirs from "./services/data_dir.js";
|
||||
import NodeRequestProvider from "./services/request.js";
|
||||
import WebSocketMessagingProvider from "./services/ws_messaging_provider.js";
|
||||
import BetterSqlite3Provider from "./sql_provider.js";
|
||||
|
||||
async function startApplication() {
|
||||
@@ -45,7 +47,9 @@ async function startApplication() {
|
||||
},
|
||||
},
|
||||
crypto: new NodejsCryptoProvider(),
|
||||
request: new NodeRequestProvider(),
|
||||
executionContext: new ClsHookedExecutionContext(),
|
||||
messaging: new WebSocketMessagingProvider(),
|
||||
translations: (await import("./services/i18n.js")).initializeTranslations,
|
||||
extraAppInfo: {
|
||||
nodeVersion: process.version,
|
||||
|
||||
@@ -38,7 +38,6 @@ import scriptRoute from "./api/script.js";
|
||||
import senderRoute from "./api/sender.js";
|
||||
import setupApiRoute from "./api/setup.js";
|
||||
import similarNotesRoute from "./api/similar_notes.js";
|
||||
import syncApiRoute from "./api/sync.js";
|
||||
import systemInfoRoute from "./api/system_info.js";
|
||||
import totp from './api/totp.js';
|
||||
// API routes
|
||||
@@ -86,8 +85,11 @@ function register(app: express.Application) {
|
||||
apiRoute(GET, '/api/totp_recovery/used', recoveryCodes.getUsedRecoveryCodes);
|
||||
|
||||
routes.buildSharedApiRoutes({
|
||||
route,
|
||||
apiRoute,
|
||||
asyncApiRoute
|
||||
asyncApiRoute,
|
||||
apiResultHandler,
|
||||
checkApiAuth: auth.checkApiAuth
|
||||
});
|
||||
|
||||
route(PUT, "/api/notes/:noteId/file", [auth.checkApiAuthOrElectron, uploadMiddlewareWithErrorHandling, csrfMiddleware], filesRoute.updateFile, apiResultHandler);
|
||||
@@ -145,18 +147,6 @@ function register(app: express.Application) {
|
||||
apiRoute(PST, "/api/password/change", passwordApiRoute.changePassword);
|
||||
apiRoute(PST, "/api/password/reset", passwordApiRoute.resetPassword);
|
||||
|
||||
asyncApiRoute(PST, "/api/sync/test", syncApiRoute.testSync);
|
||||
asyncApiRoute(PST, "/api/sync/now", syncApiRoute.syncNow);
|
||||
apiRoute(PST, "/api/sync/fill-entity-changes", syncApiRoute.fillEntityChanges);
|
||||
apiRoute(PST, "/api/sync/force-full-sync", syncApiRoute.forceFullSync);
|
||||
route(GET, "/api/sync/check", [auth.checkApiAuth], syncApiRoute.checkSync, apiResultHandler);
|
||||
route(GET, "/api/sync/changed", [auth.checkApiAuth], syncApiRoute.getChanged, apiResultHandler);
|
||||
route(PUT, "/api/sync/update", [auth.checkApiAuth], syncApiRoute.update, apiResultHandler);
|
||||
route(PST, "/api/sync/finished", [auth.checkApiAuth], syncApiRoute.syncFinished, apiResultHandler);
|
||||
route(PST, "/api/sync/check-entity-changes", [auth.checkApiAuth], syncApiRoute.checkEntityChanges, apiResultHandler);
|
||||
route(PST, "/api/sync/queue-sector/:entityName/:sector", [auth.checkApiAuth], syncApiRoute.queueSector, apiResultHandler);
|
||||
route(GET, "/api/sync/stats", [], syncApiRoute.getStats, apiResultHandler);
|
||||
|
||||
apiRoute(GET, "/api/metrics", metricsRoute.getMetrics);
|
||||
apiRoute(GET, "/api/system-checks", systemInfoRoute.systemChecks);
|
||||
|
||||
|
||||
@@ -1,17 +0,0 @@
|
||||
import type { OptionRow } from "@triliumnext/commons";
|
||||
|
||||
/**
|
||||
* Response for /api/setup/status.
|
||||
*/
|
||||
export interface SetupStatusResponse {
|
||||
syncVersion: number;
|
||||
schemaExists: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* Response for /api/setup/sync-seed.
|
||||
*/
|
||||
export interface SetupSyncSeedResponse {
|
||||
syncVersion: number;
|
||||
options: OptionRow[];
|
||||
}
|
||||
2
apps/server/src/services/attributes.ts
Normal file
2
apps/server/src/services/attributes.ts
Normal file
@@ -0,0 +1,2 @@
|
||||
import { attributes } from "@triliumnext/core";
|
||||
export default attributes;
|
||||
@@ -1,5 +1,5 @@
|
||||
import { type AttributeRow, dayjs, formatLogMessage } from "@triliumnext/commons";
|
||||
import { type AbstractBeccaEntity, Becca, branches as branchService, NoteParams } from "@triliumnext/core";
|
||||
import { type AbstractBeccaEntity, Becca, branches as branchService, NoteParams, SearchContext, sync_mutex as syncMutex } from "@triliumnext/core";
|
||||
import axios from "axios";
|
||||
import * as cheerio from "cheerio";
|
||||
import xml2js from "xml2js";
|
||||
@@ -23,12 +23,10 @@ import exportService from "./export/zip.js";
|
||||
import log from "./log.js";
|
||||
import noteService from "./notes.js";
|
||||
import optionsService from "./options.js";
|
||||
import SearchContext from "./search/search_context.js";
|
||||
import searchService from "./search/services/search.js";
|
||||
import SpacedUpdate from "./spaced_update.js";
|
||||
import specialNotesService from "./special_notes.js";
|
||||
import sql from "./sql.js";
|
||||
import syncMutex from "./sync_mutex.js";
|
||||
import treeService from "./tree.js";
|
||||
import { escapeHtml, randomString, unescapeHtml } from "./utils.js";
|
||||
import ws from "./ws.js";
|
||||
|
||||
@@ -1,15 +1,14 @@
|
||||
"use strict";
|
||||
|
||||
import dateUtils from "./date_utils.js";
|
||||
import optionService from "./options.js";
|
||||
import fs from "fs";
|
||||
import dataDir from "./data_dir.js";
|
||||
import log from "./log.js";
|
||||
import syncMutexService from "./sync_mutex.js";
|
||||
import cls from "./cls.js";
|
||||
import sql from "./sql.js";
|
||||
import path from "path";
|
||||
import type { DatabaseBackup, OptionNames } from "@triliumnext/commons";
|
||||
import { sync_mutex as syncMutexService } from "@triliumnext/core";
|
||||
import fs from "fs";
|
||||
import path from "path";
|
||||
|
||||
import cls from "./cls.js";
|
||||
import dataDir from "./data_dir.js";
|
||||
import dateUtils from "./date_utils.js";
|
||||
import log from "./log.js";
|
||||
import optionService from "./options.js";
|
||||
import sql from "./sql.js";
|
||||
|
||||
type BackupType = "daily" | "weekly" | "monthly";
|
||||
|
||||
|
||||
@@ -37,12 +37,9 @@ function isMigrationRunning() {
|
||||
return cls.isMigrationRunning();
|
||||
}
|
||||
|
||||
/** @deprecated */
|
||||
function getAndClearEntityChangeIds() {
|
||||
const entityChangeIds = cls.getContext().get("entityChangeIds") || [];
|
||||
|
||||
cls.getContext().set("entityChangeIds", []);
|
||||
|
||||
return entityChangeIds;
|
||||
return cls.getAndClearEntityChangeIds();
|
||||
}
|
||||
|
||||
function putEntityChange(entityChange: EntityChange) {
|
||||
|
||||
@@ -1,965 +1,2 @@
|
||||
import type { BranchRow } from "@triliumnext/commons";
|
||||
import type { EntityChange } from "@triliumnext/commons";
|
||||
import { becca_loader, erase as eraseService, utils } from "@triliumnext/core";
|
||||
|
||||
import becca from "../becca/becca.js";
|
||||
import BBranch from "../becca/entities/bbranch.js";
|
||||
import noteTypesService from "../services/note_types.js";
|
||||
import { hashedBlobId, randomString } from "../services/utils.js";
|
||||
import cls from "./cls.js";
|
||||
import entityChangesService from "./entity_changes.js";
|
||||
import log from "./log.js";
|
||||
import optionsService from "./options.js";
|
||||
import sql from "./sql.js";
|
||||
import sqlInit from "./sql_init.js";
|
||||
import syncMutexService from "./sync_mutex.js";
|
||||
import ws from "./ws.js";
|
||||
const noteTypes = noteTypesService.getNoteTypeNames();
|
||||
|
||||
class ConsistencyChecks {
|
||||
private autoFix: boolean;
|
||||
private unrecoveredConsistencyErrors: boolean;
|
||||
private fixedIssues: boolean;
|
||||
private reloadNeeded: boolean;
|
||||
|
||||
/**
|
||||
* @param autoFix - automatically fix all encountered problems. False is only for debugging during development (fail fast)
|
||||
*/
|
||||
constructor(autoFix: boolean) {
|
||||
this.autoFix = autoFix;
|
||||
this.unrecoveredConsistencyErrors = false;
|
||||
this.fixedIssues = false;
|
||||
this.reloadNeeded = false;
|
||||
}
|
||||
|
||||
findAndFixIssues(query: string, fixerCb: (res: any) => void) {
|
||||
const results = sql.getRows(query);
|
||||
|
||||
for (const res of results) {
|
||||
try {
|
||||
sql.transactional(() => fixerCb(res));
|
||||
|
||||
if (this.autoFix) {
|
||||
this.fixedIssues = true;
|
||||
} else {
|
||||
this.unrecoveredConsistencyErrors = true;
|
||||
}
|
||||
} catch (e: any) {
|
||||
logError(`Fixer failed with ${e.message} ${e.stack}`);
|
||||
this.unrecoveredConsistencyErrors = true;
|
||||
}
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
checkTreeCycles() {
|
||||
const childToParents: Record<string, string[]> = {};
|
||||
const rows = sql.getRows<BranchRow>("SELECT noteId, parentNoteId FROM branches WHERE isDeleted = 0");
|
||||
|
||||
for (const row of rows) {
|
||||
const childNoteId = row.noteId;
|
||||
const parentNoteId = row.parentNoteId;
|
||||
|
||||
childToParents[childNoteId] = childToParents[childNoteId] || [];
|
||||
childToParents[childNoteId].push(parentNoteId);
|
||||
}
|
||||
|
||||
/** @returns true if cycle was found and we should try again */
|
||||
const checkTreeCycle = (noteId: string, path: string[]) => {
|
||||
if (noteId === "root") {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (const parentNoteId of childToParents[noteId]) {
|
||||
if (path.includes(parentNoteId)) {
|
||||
if (this.autoFix) {
|
||||
const branch = becca.getBranchFromChildAndParent(noteId, parentNoteId);
|
||||
if (branch) {
|
||||
branch.markAsDeleted("cycle-autofix");
|
||||
logFix(`Branch '${branch.branchId}' between child '${noteId}' and parent '${parentNoteId}' has been deleted since it was causing a tree cycle.`);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
logError(`Tree cycle detected at parent-child relationship: '${parentNoteId}' - '${noteId}', whole path: '${path}'`);
|
||||
|
||||
this.unrecoveredConsistencyErrors = true;
|
||||
|
||||
} else {
|
||||
const newPath = path.slice();
|
||||
newPath.push(noteId);
|
||||
|
||||
const retryNeeded = checkTreeCycle(parentNoteId, newPath);
|
||||
|
||||
if (retryNeeded) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
};
|
||||
|
||||
const noteIds = Object.keys(childToParents);
|
||||
|
||||
for (const noteId of noteIds) {
|
||||
const retryNeeded = checkTreeCycle(noteId, []);
|
||||
|
||||
if (retryNeeded) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
checkAndRepairTreeCycles() {
|
||||
let treeFixed = false;
|
||||
|
||||
while (this.checkTreeCycles()) {
|
||||
// fixing cycle means deleting branches, we might need to create a new branch to recover the note
|
||||
this.findExistencyIssues();
|
||||
|
||||
treeFixed = true;
|
||||
}
|
||||
|
||||
if (treeFixed) {
|
||||
this.reloadNeeded = true;
|
||||
}
|
||||
}
|
||||
|
||||
findBrokenReferenceIssues() {
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT branchId, branches.noteId
|
||||
FROM branches
|
||||
LEFT JOIN notes USING (noteId)
|
||||
WHERE branches.isDeleted = 0
|
||||
AND notes.noteId IS NULL`,
|
||||
({ branchId, noteId }) => {
|
||||
if (this.autoFix) {
|
||||
const branch = becca.getBranch(branchId);
|
||||
if (!branch) {
|
||||
return;
|
||||
}
|
||||
branch.markAsDeleted();
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Branch '${branchId}' has been deleted since it references missing note '${noteId}'`);
|
||||
} else {
|
||||
logError(`Branch '${branchId}' references missing note '${noteId}'`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT branchId, branches.parentNoteId AS parentNoteId
|
||||
FROM branches
|
||||
LEFT JOIN notes ON notes.noteId = branches.parentNoteId
|
||||
WHERE branches.isDeleted = 0
|
||||
AND branches.noteId != 'root'
|
||||
AND notes.noteId IS NULL`,
|
||||
({ branchId, parentNoteId }) => {
|
||||
if (this.autoFix) {
|
||||
// Delete the old branch and recreate it with root as parent.
|
||||
const oldBranch = becca.getBranch(branchId);
|
||||
if (!oldBranch) {
|
||||
return;
|
||||
}
|
||||
|
||||
const noteId = oldBranch.noteId;
|
||||
oldBranch.markAsDeleted("missing-parent");
|
||||
|
||||
let message = `Branch '${branchId}' was missing parent note '${parentNoteId}', so it was deleted. `;
|
||||
|
||||
const note = becca.getNote(noteId);
|
||||
if (!note) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (note.getParentBranches().length === 0) {
|
||||
const newBranch = new BBranch({
|
||||
parentNoteId: "root",
|
||||
noteId,
|
||||
prefix: "recovered"
|
||||
}).save();
|
||||
|
||||
message += `${newBranch.branchId} was created in the root instead.`;
|
||||
} else {
|
||||
message += `There is one or more valid branches, so no new one will be created as a replacement.`;
|
||||
}
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(message);
|
||||
} else {
|
||||
logError(`Branch '${branchId}' references missing parent note '${parentNoteId}'`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT attributeId, attributes.noteId
|
||||
FROM attributes
|
||||
LEFT JOIN notes USING (noteId)
|
||||
WHERE attributes.isDeleted = 0
|
||||
AND notes.noteId IS NULL`,
|
||||
({ attributeId, noteId }) => {
|
||||
if (this.autoFix) {
|
||||
const attribute = becca.getAttribute(attributeId);
|
||||
if (!attribute) {
|
||||
return;
|
||||
}
|
||||
attribute.markAsDeleted();
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Attribute '${attributeId}' has been deleted since it references missing source note '${noteId}'`);
|
||||
} else {
|
||||
logError(`Attribute '${attributeId}' references missing source note '${noteId}'`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT attributeId, attributes.value AS noteId
|
||||
FROM attributes
|
||||
LEFT JOIN notes ON notes.noteId = attributes.value
|
||||
WHERE attributes.isDeleted = 0
|
||||
AND attributes.type = 'relation'
|
||||
AND notes.noteId IS NULL`,
|
||||
({ attributeId, noteId }) => {
|
||||
if (this.autoFix) {
|
||||
const attribute = becca.getAttribute(attributeId);
|
||||
if (!attribute) {
|
||||
return;
|
||||
}
|
||||
attribute.markAsDeleted();
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Relation '${attributeId}' has been deleted since it references missing note '${noteId}'`);
|
||||
} else {
|
||||
logError(`Relation '${attributeId}' references missing note '${noteId}'`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT attachmentId, attachments.ownerId AS noteId
|
||||
FROM attachments
|
||||
WHERE attachments.ownerId NOT IN (
|
||||
SELECT noteId FROM notes
|
||||
UNION ALL
|
||||
SELECT revisionId FROM revisions
|
||||
)
|
||||
AND attachments.isDeleted = 0`,
|
||||
({ attachmentId, ownerId }) => {
|
||||
if (this.autoFix) {
|
||||
const attachment = becca.getAttachment(attachmentId);
|
||||
if (!attachment) {
|
||||
return;
|
||||
}
|
||||
attachment.markAsDeleted();
|
||||
|
||||
this.reloadNeeded = false;
|
||||
|
||||
logFix(`Attachment '${attachmentId}' has been deleted since it references missing note/revision '${ownerId}'`);
|
||||
} else {
|
||||
logError(`Attachment '${attachmentId}' references missing note/revision '${ownerId}'`);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
findExistencyIssues() {
|
||||
// the principle for fixing inconsistencies is that if the note itself is deleted (isDeleted=true) then all related
|
||||
// entities should be also deleted (branches, attributes), but if the note is not deleted,
|
||||
// then at least one branch should exist.
|
||||
|
||||
// the order here is important - first we might need to delete inconsistent branches, and after that
|
||||
// another check might create missing branch
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT branchId,
|
||||
noteId
|
||||
FROM branches
|
||||
JOIN notes USING (noteId)
|
||||
WHERE notes.isDeleted = 1
|
||||
AND branches.isDeleted = 0`,
|
||||
({ branchId, noteId }) => {
|
||||
if (this.autoFix) {
|
||||
const branch = becca.getBranch(branchId);
|
||||
if (!branch) return;
|
||||
branch.markAsDeleted();
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Branch '${branchId}' has been deleted since the associated note '${noteId}' is deleted.`);
|
||||
} else {
|
||||
logError(`Branch '${branchId}' is not deleted even though the associated note '${noteId}' is deleted.`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT branchId,
|
||||
parentNoteId
|
||||
FROM branches
|
||||
JOIN notes AS parentNote ON parentNote.noteId = branches.parentNoteId
|
||||
WHERE parentNote.isDeleted = 1
|
||||
AND branches.isDeleted = 0
|
||||
`,
|
||||
({ branchId, parentNoteId }) => {
|
||||
if (this.autoFix) {
|
||||
const branch = becca.getBranch(branchId);
|
||||
if (!branch) {
|
||||
return;
|
||||
}
|
||||
branch.markAsDeleted();
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Branch '${branchId}' has been deleted since the associated parent note '${parentNoteId}' is deleted.`);
|
||||
} else {
|
||||
logError(`Branch '${branchId}' is not deleted even though the associated parent note '${parentNoteId}' is deleted.`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT DISTINCT notes.noteId
|
||||
FROM notes
|
||||
LEFT JOIN branches ON notes.noteId = branches.noteId AND branches.isDeleted = 0
|
||||
WHERE notes.isDeleted = 0
|
||||
AND branches.branchId IS NULL
|
||||
`,
|
||||
({ noteId }) => {
|
||||
if (this.autoFix) {
|
||||
const branch = new BBranch({
|
||||
parentNoteId: "root",
|
||||
noteId,
|
||||
prefix: "recovered"
|
||||
}).save();
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Created missing branch '${branch.branchId}' for note '${noteId}'`);
|
||||
} else {
|
||||
logError(`No undeleted branch found for note '${noteId}'`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
// there should be a unique relationship between note and its parent
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT noteId,
|
||||
parentNoteId
|
||||
FROM branches
|
||||
WHERE branches.isDeleted = 0
|
||||
GROUP BY branches.parentNoteId,
|
||||
branches.noteId
|
||||
HAVING COUNT(1) > 1`,
|
||||
({ noteId, parentNoteId }) => {
|
||||
if (this.autoFix) {
|
||||
const branchIds = sql.getColumn<string>(
|
||||
/*sql*/`SELECT branchId
|
||||
FROM branches
|
||||
WHERE noteId = ?
|
||||
and parentNoteId = ?
|
||||
and isDeleted = 0
|
||||
ORDER BY utcDateModified`,
|
||||
[noteId, parentNoteId]
|
||||
);
|
||||
|
||||
const branches = branchIds.map((branchId) => becca.getBranch(branchId));
|
||||
|
||||
// it's not necessarily "original" branch, it's just the only one which will survive
|
||||
const origBranch = branches[0];
|
||||
if (!origBranch) {
|
||||
logError(`Unable to find original branch.`);
|
||||
return;
|
||||
}
|
||||
|
||||
// delete all but the first branch
|
||||
for (const branch of branches.slice(1)) {
|
||||
if (!branch) {
|
||||
continue;
|
||||
}
|
||||
|
||||
branch.markAsDeleted();
|
||||
|
||||
logFix(`Removing branch '${branch.branchId}' since it's a parent-child duplicate of branch '${origBranch.branchId}'`);
|
||||
}
|
||||
|
||||
this.reloadNeeded = true;
|
||||
} else {
|
||||
logError(`Duplicate branches for note '${noteId}' and parent '${parentNoteId}'`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT attachmentId,
|
||||
attachments.ownerId AS noteId
|
||||
FROM attachments
|
||||
JOIN notes ON notes.noteId = attachments.ownerId
|
||||
WHERE notes.isDeleted = 1
|
||||
AND attachments.isDeleted = 0`,
|
||||
({ attachmentId, noteId }) => {
|
||||
if (this.autoFix) {
|
||||
const attachment = becca.getAttachment(attachmentId);
|
||||
if (!attachment) return;
|
||||
attachment.markAsDeleted();
|
||||
|
||||
this.reloadNeeded = false;
|
||||
|
||||
logFix(`Attachment '${attachmentId}' has been deleted since the associated note '${noteId}' is deleted.`);
|
||||
} else {
|
||||
logError(`Attachment '${attachmentId}' is not deleted even though the associated note '${noteId}' is deleted.`);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
findLogicIssues() {
|
||||
const noteTypesStr = noteTypes.map((nt) => `'${nt}'`).join(", ");
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT noteId, type
|
||||
FROM notes
|
||||
WHERE isDeleted = 0
|
||||
AND type NOT IN (${noteTypesStr})`,
|
||||
({ noteId, type }) => {
|
||||
if (this.autoFix) {
|
||||
const note = becca.getNote(noteId);
|
||||
if (!note) return;
|
||||
note.type = "file"; // file is a safe option to recover notes if the type is not known
|
||||
note.save();
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Note '${noteId}' type has been change to file since it had invalid type '${type}'`);
|
||||
} else {
|
||||
logError(`Note '${noteId}' has invalid type '${type}'`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT notes.noteId, notes.isProtected, notes.type, notes.mime
|
||||
FROM notes
|
||||
LEFT JOIN blobs USING (blobId)
|
||||
WHERE blobs.blobId IS NULL
|
||||
AND notes.isDeleted = 0`,
|
||||
({ noteId, isProtected, type, mime }) => {
|
||||
if (this.autoFix) {
|
||||
// it might be possible that the blob is not available only because of the interrupted
|
||||
// sync, and it will come later. It's therefore important to guarantee that this artificial
|
||||
// record won't overwrite the real one coming from the sync.
|
||||
const fakeDate = "2000-01-01 00:00:00Z";
|
||||
|
||||
const blankContent = getBlankContent(isProtected, type, mime);
|
||||
if (!blankContent) {
|
||||
logError(`Unable to recover note ${noteId} since it's content could not be retrieved (might be protected note).`);
|
||||
return;
|
||||
}
|
||||
const blobId = hashedBlobId(blankContent);
|
||||
const blobAlreadyExists = !!sql.getValue("SELECT 1 FROM blobs WHERE blobId = ?", [blobId]);
|
||||
|
||||
if (!blobAlreadyExists) {
|
||||
// manually creating row since this can also affect deleted notes
|
||||
sql.upsert("blobs", "blobId", {
|
||||
noteId,
|
||||
content: blankContent,
|
||||
utcDateModified: fakeDate,
|
||||
dateModified: fakeDate
|
||||
});
|
||||
|
||||
const hash = utils.hash(randomString(10));
|
||||
|
||||
entityChangesService.putEntityChange({
|
||||
entityName: "blobs",
|
||||
entityId: blobId,
|
||||
hash,
|
||||
isErased: false,
|
||||
utcDateChanged: fakeDate,
|
||||
isSynced: true
|
||||
});
|
||||
}
|
||||
|
||||
sql.execute("UPDATE notes SET blobId = ? WHERE noteId = ?", [blobId, noteId]);
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Note '${noteId}' content was set to empty string since there was no corresponding row`);
|
||||
} else {
|
||||
logError(`Note '${noteId}' content row does not exist`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
if (sqlInit.getDbSize() < 500000) {
|
||||
// querying for "content IS NULL" is expensive since content is not indexed. See e.g. https://github.com/zadam/trilium/issues/2887
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT notes.noteId, notes.type, notes.mime
|
||||
FROM notes
|
||||
JOIN blobs USING (blobId)
|
||||
WHERE isDeleted = 0
|
||||
AND isProtected = 0
|
||||
AND content IS NULL`,
|
||||
({ noteId, type, mime }) => {
|
||||
if (this.autoFix) {
|
||||
const note = becca.getNote(noteId);
|
||||
const blankContent = getBlankContent(false, type, mime);
|
||||
if (!note) return;
|
||||
|
||||
if (blankContent) {
|
||||
note.setContent(blankContent);
|
||||
}
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Note '${noteId}' content was set to '${blankContent}' since it was null even though it is not deleted`);
|
||||
} else {
|
||||
logError(`Note '${noteId}' content is null even though it is not deleted`);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT revisions.revisionId, blobs.blobId
|
||||
FROM revisions
|
||||
LEFT JOIN blobs USING (blobId)
|
||||
WHERE blobs.blobId IS NULL`,
|
||||
({ revisionId, blobId }) => {
|
||||
if (this.autoFix) {
|
||||
eraseService.eraseRevisions([revisionId]);
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Note revision '${revisionId}' was erased since the referenced blob '${blobId}' did not exist.`);
|
||||
} else {
|
||||
logError(`Note revision '${revisionId}' blob '${blobId}' does not exist`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT attachments.attachmentId, blobs.blobId
|
||||
FROM attachments
|
||||
LEFT JOIN blobs USING (blobId)
|
||||
WHERE blobs.blobId IS NULL`,
|
||||
({ attachmentId, blobId }) => {
|
||||
if (this.autoFix) {
|
||||
eraseService.eraseAttachments([attachmentId]);
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Attachment '${attachmentId}' was erased since the referenced blob '${blobId}' did not exist.`);
|
||||
} else {
|
||||
logError(`Attachment '${attachmentId}' blob '${blobId}' does not exist`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT parentNoteId
|
||||
FROM branches
|
||||
JOIN notes ON notes.noteId = branches.parentNoteId
|
||||
WHERE notes.isDeleted = 0
|
||||
AND notes.type == 'search'
|
||||
AND branches.isDeleted = 0`,
|
||||
({ parentNoteId }) => {
|
||||
if (this.autoFix) {
|
||||
const branchIds = sql.getColumn<string>(
|
||||
`
|
||||
SELECT branchId
|
||||
FROM branches
|
||||
WHERE isDeleted = 0
|
||||
AND parentNoteId = ?`,
|
||||
[parentNoteId]
|
||||
);
|
||||
|
||||
const branches = branchIds.map((branchId) => becca.getBranch(branchId));
|
||||
|
||||
for (const branch of branches) {
|
||||
if (!branch) continue;
|
||||
|
||||
// delete the old wrong branch
|
||||
branch.markAsDeleted("parent-is-search");
|
||||
|
||||
// create a replacement branch in root parent
|
||||
new BBranch({
|
||||
parentNoteId: "root",
|
||||
noteId: branch.noteId,
|
||||
prefix: "recovered"
|
||||
}).save();
|
||||
|
||||
logFix(`Note '${branch.noteId}' has been moved to root since it was a child of a search note '${parentNoteId}'`);
|
||||
}
|
||||
|
||||
this.reloadNeeded = true;
|
||||
} else {
|
||||
logError(`Search note '${parentNoteId}' has children`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT attributeId
|
||||
FROM attributes
|
||||
WHERE isDeleted = 0
|
||||
AND type = 'relation'
|
||||
AND value = ''`,
|
||||
({ attributeId }) => {
|
||||
if (this.autoFix) {
|
||||
const relation = becca.getAttribute(attributeId);
|
||||
if (!relation) return;
|
||||
relation.markAsDeleted();
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Removed relation '${relation.attributeId}' of name '${relation.name}' with empty target.`);
|
||||
} else {
|
||||
logError(`Relation '${attributeId}' has empty target.`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT attributeId,
|
||||
type
|
||||
FROM attributes
|
||||
WHERE isDeleted = 0
|
||||
AND type != 'label'
|
||||
AND type != 'relation'`,
|
||||
({ attributeId, type }) => {
|
||||
if (this.autoFix) {
|
||||
const attribute = becca.getAttribute(attributeId);
|
||||
if (!attribute) return;
|
||||
attribute.type = "label";
|
||||
attribute.save();
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Attribute '${attributeId}' type was changed to label since it had invalid type '${type}'`);
|
||||
} else {
|
||||
logError(`Attribute '${attributeId}' has invalid type '${type}'`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT attributeId,
|
||||
attributes.noteId
|
||||
FROM attributes
|
||||
JOIN notes ON attributes.noteId = notes.noteId
|
||||
WHERE attributes.isDeleted = 0
|
||||
AND notes.isDeleted = 1`,
|
||||
({ attributeId, noteId }) => {
|
||||
if (this.autoFix) {
|
||||
const attribute = becca.getAttribute(attributeId);
|
||||
if (!attribute) return;
|
||||
attribute.markAsDeleted();
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Removed attribute '${attributeId}' because owning note '${noteId}' is also deleted.`);
|
||||
} else {
|
||||
logError(`Attribute '${attributeId}' is not deleted even though owning note '${noteId}' is deleted.`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT attributeId,
|
||||
attributes.value AS targetNoteId
|
||||
FROM attributes
|
||||
JOIN notes ON attributes.value = notes.noteId
|
||||
WHERE attributes.type = 'relation'
|
||||
AND attributes.isDeleted = 0
|
||||
AND notes.isDeleted = 1`,
|
||||
({ attributeId, targetNoteId }) => {
|
||||
if (this.autoFix) {
|
||||
const attribute = becca.getAttribute(attributeId);
|
||||
if (!attribute) return;
|
||||
attribute.markAsDeleted();
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Removed attribute '${attributeId}' because target note '${targetNoteId}' is also deleted.`);
|
||||
} else {
|
||||
logError(`Attribute '${attributeId}' is not deleted even though target note '${targetNoteId}' is deleted.`);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
runEntityChangeChecks(entityName: string, key: string) {
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT ${key} as entityId
|
||||
FROM ${entityName}
|
||||
LEFT JOIN entity_changes ec ON ec.entityName = '${entityName}' AND ec.entityId = ${entityName}.${key}
|
||||
WHERE ec.id IS NULL`,
|
||||
({ entityId }) => {
|
||||
const entityRow = sql.getRow<EntityChange>(/*sql*/`SELECT * FROM ${entityName} WHERE ${key} = ?`, [entityId]);
|
||||
|
||||
if (this.autoFix) {
|
||||
entityChangesService.putEntityChange({
|
||||
entityName,
|
||||
entityId,
|
||||
hash: randomString(10), // doesn't matter, will force sync, but that's OK
|
||||
isErased: false,
|
||||
utcDateChanged: entityRow.utcDateModified || entityRow.utcDateCreated,
|
||||
isSynced: entityName !== "options" || entityRow.isSynced
|
||||
});
|
||||
|
||||
logFix(`Created missing entity change for entityName '${entityName}', entityId '${entityId}'`);
|
||||
} else {
|
||||
logError(`Missing entity change for entityName '${entityName}', entityId '${entityId}'`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT id, entityId
|
||||
FROM entity_changes
|
||||
LEFT JOIN ${entityName} ON entityId = ${entityName}.${key}
|
||||
WHERE
|
||||
entity_changes.isErased = 0
|
||||
AND entity_changes.entityName = '${entityName}'
|
||||
AND ${entityName}.${key} IS NULL`,
|
||||
({ id, entityId }) => {
|
||||
if (this.autoFix) {
|
||||
sql.execute("DELETE FROM entity_changes WHERE entityName = ? AND entityId = ?", [entityName, entityId]);
|
||||
|
||||
logFix(`Deleted extra entity change id '${id}', entityName '${entityName}', entityId '${entityId}'`);
|
||||
} else {
|
||||
logError(`Unrecognized entity change id '${id}', entityName '${entityName}', entityId '${entityId}'`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT id, entityId
|
||||
FROM entity_changes
|
||||
JOIN ${entityName} ON entityId = ${entityName}.${key}
|
||||
WHERE
|
||||
entity_changes.isErased = 1
|
||||
AND entity_changes.entityName = '${entityName}'`,
|
||||
({ id, entityId }) => {
|
||||
if (this.autoFix) {
|
||||
sql.execute(/*sql*/`DELETE FROM ${entityName} WHERE ${key} = ?`, [entityId]);
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Erasing entityName '${entityName}', entityId '${entityId}' since entity change id '${id}' has it as erased.`);
|
||||
} else {
|
||||
logError(`Entity change id '${id}' has entityName '${entityName}', entityId '${entityId}' as erased, but it's not.`);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
findEntityChangeIssues() {
|
||||
this.runEntityChangeChecks("notes", "noteId");
|
||||
this.runEntityChangeChecks("revisions", "revisionId");
|
||||
this.runEntityChangeChecks("attachments", "attachmentId");
|
||||
this.runEntityChangeChecks("blobs", "blobId");
|
||||
this.runEntityChangeChecks("branches", "branchId");
|
||||
this.runEntityChangeChecks("attributes", "attributeId");
|
||||
this.runEntityChangeChecks("etapi_tokens", "etapiTokenId");
|
||||
this.runEntityChangeChecks("options", "name");
|
||||
}
|
||||
|
||||
findWronglyNamedAttributes() {
|
||||
const attrNames = sql.getColumn<string>(/*sql*/`SELECT DISTINCT name FROM attributes`);
|
||||
|
||||
for (const origName of attrNames) {
|
||||
const fixedName = utils.sanitizeAttributeName(origName);
|
||||
|
||||
if (fixedName !== origName) {
|
||||
if (this.autoFix) {
|
||||
// there isn't a good way to update this:
|
||||
// - just SQL query will fix it in DB but not notify frontend (or other caches) that it has been fixed
|
||||
// - renaming the attribute would break the invariant that single attribute never changes the name
|
||||
// - deleting the old attribute and creating new will create duplicates across synchronized cluster (specifically in the initial migration)
|
||||
// But in general, we assume there won't be many such problems
|
||||
sql.execute("UPDATE attributes SET name = ? WHERE name = ?", [fixedName, origName]);
|
||||
|
||||
this.fixedIssues = true;
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Renamed incorrectly named attributes '${origName}' to '${fixedName}'`);
|
||||
} else {
|
||||
this.unrecoveredConsistencyErrors = true;
|
||||
|
||||
logFix(`There are incorrectly named attributes '${origName}'`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
findSyncIssues() {
|
||||
const lastSyncedPush = parseInt(sql.getValue("SELECT value FROM options WHERE name = 'lastSyncedPush'"));
|
||||
const maxEntityChangeId = sql.getValue<number>("SELECT MAX(id) FROM entity_changes");
|
||||
|
||||
if (lastSyncedPush > maxEntityChangeId) {
|
||||
if (this.autoFix) {
|
||||
sql.execute("UPDATE options SET value = ? WHERE name = 'lastSyncedPush'", [maxEntityChangeId]);
|
||||
|
||||
this.fixedIssues = true;
|
||||
|
||||
logFix(`Fixed incorrect lastSyncedPush - was ${lastSyncedPush}, needs to be at maximum ${maxEntityChangeId}`);
|
||||
} else {
|
||||
this.unrecoveredConsistencyErrors = true;
|
||||
|
||||
logFix(`Incorrect lastSyncedPush - is ${lastSyncedPush}, needs to be at maximum ${maxEntityChangeId}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
runAllChecksAndFixers() {
|
||||
this.unrecoveredConsistencyErrors = false;
|
||||
this.fixedIssues = false;
|
||||
this.reloadNeeded = false;
|
||||
|
||||
this.findEntityChangeIssues();
|
||||
|
||||
this.findBrokenReferenceIssues();
|
||||
|
||||
this.findExistencyIssues();
|
||||
|
||||
this.findLogicIssues();
|
||||
|
||||
this.findWronglyNamedAttributes();
|
||||
|
||||
this.findSyncIssues();
|
||||
|
||||
// root branch should always be expanded
|
||||
sql.execute("UPDATE branches SET isExpanded = 1 WHERE noteId = 'root'");
|
||||
|
||||
if (!this.unrecoveredConsistencyErrors) {
|
||||
// we run this only if basic checks passed since this assumes basic data consistency
|
||||
|
||||
this.checkAndRepairTreeCycles();
|
||||
}
|
||||
|
||||
if (this.reloadNeeded) {
|
||||
becca_loader.reload("consistency checks need becca reload");
|
||||
}
|
||||
|
||||
return !this.unrecoveredConsistencyErrors;
|
||||
}
|
||||
|
||||
runDbDiagnostics() {
|
||||
function getTableRowCount(tableName: string) {
|
||||
const count = sql.getValue<number>(/*sql*/`SELECT COUNT(1) FROM ${tableName}`);
|
||||
|
||||
return `${tableName}: ${count}`;
|
||||
}
|
||||
|
||||
const tables = ["notes", "revisions", "attachments", "branches", "attributes", "etapi_tokens", "blobs"];
|
||||
|
||||
log.info(`Table counts: ${tables.map((tableName) => getTableRowCount(tableName)).join(", ")}`);
|
||||
}
|
||||
|
||||
async runChecks() {
|
||||
let elapsedTimeMs;
|
||||
|
||||
await syncMutexService.doExclusively(() => {
|
||||
const startTimeMs = Date.now();
|
||||
|
||||
this.runDbDiagnostics();
|
||||
|
||||
this.runAllChecksAndFixers();
|
||||
|
||||
elapsedTimeMs = Date.now() - startTimeMs;
|
||||
});
|
||||
|
||||
if (this.unrecoveredConsistencyErrors) {
|
||||
log.info(`Consistency checks failed (took ${elapsedTimeMs}ms)`);
|
||||
|
||||
ws.sendMessageToAllClients({ type: "consistency-checks-failed" });
|
||||
} else {
|
||||
log.info(`All consistency checks passed ${ this.fixedIssues ? "after some fixes" : "with no errors detected" } (took ${elapsedTimeMs}ms)`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function getBlankContent(isProtected: boolean, type: string, mime: string) {
|
||||
if (isProtected) {
|
||||
return null; // this is wrong for protected non-erased notes, but we cannot create a valid value without a password
|
||||
}
|
||||
|
||||
if (mime === "application/json") {
|
||||
return "{}";
|
||||
}
|
||||
|
||||
return ""; // empty string might be a wrong choice for some note types, but it's the best guess
|
||||
}
|
||||
|
||||
function logFix(message: string) {
|
||||
log.info(`Consistency issue fixed: ${message}`);
|
||||
}
|
||||
|
||||
function logError(message: string) {
|
||||
log.info(`Consistency error: ${message}`);
|
||||
}
|
||||
|
||||
function runPeriodicChecks() {
|
||||
const autoFix = optionsService.getOptionBool("autoFixConsistencyIssues");
|
||||
|
||||
const consistencyChecks = new ConsistencyChecks(autoFix);
|
||||
consistencyChecks.runChecks();
|
||||
}
|
||||
|
||||
async function runOnDemandChecks(autoFix: boolean) {
|
||||
const consistencyChecks = new ConsistencyChecks(autoFix);
|
||||
await consistencyChecks.runChecks();
|
||||
}
|
||||
|
||||
function runEntityChangesChecks() {
|
||||
const consistencyChecks = new ConsistencyChecks(true);
|
||||
consistencyChecks.findEntityChangeIssues();
|
||||
}
|
||||
|
||||
export function startConsistencyChecks() {
|
||||
sqlInit.dbReady.then(() => {
|
||||
setInterval(cls.wrap(runPeriodicChecks), 60 * 60 * 1000);
|
||||
|
||||
// kickoff checks soon after startup (to not block the initial load)
|
||||
setTimeout(cls.wrap(runPeriodicChecks), 4 * 1000);
|
||||
});
|
||||
}
|
||||
|
||||
export default {
|
||||
runOnDemandChecks,
|
||||
runEntityChangesChecks
|
||||
};
|
||||
import { consistency_checks } from "@triliumnext/core";
|
||||
export default consistency_checks;
|
||||
|
||||
@@ -1,89 +1,2 @@
|
||||
import { erase as eraseService,utils } from "@triliumnext/core";
|
||||
|
||||
import log from "./log.js";
|
||||
import sql from "./sql.js";
|
||||
|
||||
// maps a one-character sector (first character of an entityId) to its combined hash
type SectorHash = Record<string, string>;
|
||||
|
||||
interface FailedCheck {
|
||||
entityName: string;
|
||||
sector: string[1];
|
||||
}
|
||||
|
||||
/**
 * Computes per-sector content hashes for all synced entities.
 *
 * For every entity table, rows are grouped by sector (first character of
 * the entityId), their individual hashes concatenated in entityId order,
 * and the concatenation hashed once more. The result lets two instances
 * compare content cheaply, sector by sector.
 *
 * @returns entityName -> (sector -> hash)
 */
function getEntityHashes() {
    // blob erasure is not synced, we should check before each sync if there's some blob to erase
    eraseService.eraseUnusedBlobs();

    const startTime = new Date();

    // row shape: [entityName, entityId, hash, isErased]
    // we know this is slow and the total content hash calculation time is logged
    type HashRow = [string, string, string, boolean];
    const hashRows = sql.disableSlowQueryLogging(() =>
        sql.getRawRows<HashRow>(`
            SELECT entityName,
                   entityId,
                   hash,
                   isErased
            FROM entity_changes
            WHERE isSynced = 1
              AND entityName != 'note_reordering'`)
    );

    // sorting is faster in memory
    // sorting by entityId is enough, hashes will be segmented by entityName later on anyway
    hashRows.sort((a, b) => (a[1] < b[1] ? -1 : 1));

    const hashMap: Record<string, SectorHash> = {};

    for (const [entityName, entityId, hash, isErased] of hashRows) {
        const entityHashMap = (hashMap[entityName] = hashMap[entityName] || {});

        // sector = first character of the entityId
        const sector = entityId[0];

        // if the entity is erased, its hash is not updated, so it has to be added extra
        entityHashMap[sector] = (entityHashMap[sector] || "") + hash + isErased;
    }

    // collapse each sector's concatenated string into a single hash value
    for (const entityHashMap of Object.values(hashMap)) {
        for (const key in entityHashMap) {
            entityHashMap[key] = utils.hash(entityHashMap[key]);
        }
    }

    const elapsedTimeMs = Date.now() - startTime.getTime();

    log.info(`Content hash computation took ${elapsedTimeMs}ms`);

    return hashMap;
}
|
||||
|
||||
function checkContentHashes(otherHashes: Record<string, SectorHash>) {
|
||||
const entityHashes = getEntityHashes();
|
||||
const failedChecks: FailedCheck[] = [];
|
||||
|
||||
for (const entityName in entityHashes) {
|
||||
const thisSectorHashes: SectorHash = entityHashes[entityName] || {};
|
||||
const otherSectorHashes: SectorHash = otherHashes[entityName] || {};
|
||||
|
||||
const sectors = new Set(Object.keys(thisSectorHashes).concat(Object.keys(otherSectorHashes)));
|
||||
|
||||
for (const sector of sectors) {
|
||||
if (thisSectorHashes[sector] !== otherSectorHashes[sector]) {
|
||||
log.info(`Content hash check for ${entityName} sector ${sector} FAILED. Local is ${thisSectorHashes[sector]}, remote is ${otherSectorHashes[sector]}`);
|
||||
|
||||
failedChecks.push({ entityName, sector });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (failedChecks.length === 0) {
|
||||
log.info("Content hash checks PASSED");
|
||||
}
|
||||
|
||||
return failedChecks;
|
||||
}
|
||||
|
||||
export default {
|
||||
getEntityHashes,
|
||||
checkContentHashes
|
||||
};
|
||||
import { content_hash } from "@triliumnext/core";
|
||||
export default content_hash;
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
"use strict";
|
||||
|
||||
import type { ExecOpts, RequestProvider } from "@triliumnext/core";
|
||||
import { isElectron } from "./utils.js";
|
||||
import log from "./log.js";
|
||||
import url from "url";
|
||||
import syncOptions from "./sync_options.js";
|
||||
import type { ExecOpts } from "./request_interface.js";
|
||||
|
||||
// this service provides abstraction over node's HTTP/HTTPS and electron net.client APIs
|
||||
// this allows supporting system proxy
|
||||
@@ -33,160 +33,19 @@ interface Client {
|
||||
request(opts: ClientOpts): Request;
|
||||
}
|
||||
|
||||
/**
 * Executes an HTTP(S) request through the platform client (node http/https
 * or electron net), with optional proxy support and multi-page uploads.
 *
 * Paging metadata (pageCount/pageIndex/requestId) travels in headers so the
 * server can reassemble oversized bodies. Responses with status 200/201/204
 * are JSON-parsed (empty body resolves to null); anything else rejects with
 * a descriptive error. All failures surface as promise rejection.
 */
async function exec<T>(opts: ExecOpts): Promise<T> {
    const client = getClient(opts);

    // hack for cases where electron.net does not work, but we don't want to set proxy
    if (opts.proxy === "noproxy") {
        opts.proxy = null;
    }

    // single-page defaults when the caller did not request paging
    const paging = opts.paging || {
        pageCount: 1,
        pageIndex: 0,
        requestId: "n/a"
    };

    const proxyAgent = await getProxyAgent(opts);
    const parsedTargetUrl = url.parse(opts.url);

    // NOTE(review): async executor inside `new Promise` is an anti-pattern;
    // the body is fully wrapped in try/catch to compensate, so thrown errors
    // still reject instead of going unhandled.
    return new Promise(async (resolve, reject) => {
        try {
            const headers: Record<string, string | number> = {
                Cookie: (opts.cookieJar && opts.cookieJar.header) || "",
                // multi-page bodies are raw JSON fragments, not valid JSON per page
                "Content-Type": paging.pageCount === 1 ? "application/json" : "text/plain",
                pageCount: paging.pageCount,
                pageIndex: paging.pageIndex,
                requestId: paging.requestId
            };

            if (opts.auth) {
                // only the password part is meaningful; the username is a placeholder
                headers["trilium-cred"] = Buffer.from(`dummy:${opts.auth.password}`).toString("base64");
            }

            const request = (await client).request({
                method: opts.method,
                // url is used by electron net module
                url: opts.url,
                // 4 fields below are used by http and https node modules
                protocol: parsedTargetUrl.protocol,
                host: parsedTargetUrl.hostname,
                port: parsedTargetUrl.port,
                path: parsedTargetUrl.path,
                timeout: opts.timeout, // works only for node.js client
                headers,
                agent: proxyAgent
            });

            request.on("error", (err) => reject(generateError(opts, err)));

            request.on("response", (response) => {
                // keep the session cookie for subsequent requests in this jar
                if (opts.cookieJar && response.headers["set-cookie"]) {
                    opts.cookieJar.header = response.headers["set-cookie"];
                }

                let responseStr = "";
                let chunks: Buffer[] = [];

                response.on("data", (chunk: Buffer) => chunks.push(chunk));

                response.on("end", () => {
                    // use Buffer instead of string concatenation to avoid implicit decoding for each chunk
                    // decode the entire data chunks explicitly as utf-8
                    responseStr = Buffer.concat(chunks).toString("utf-8");

                    if ([200, 201, 204].includes(response.statusCode)) {
                        try {
                            const jsonObj = responseStr.trim() ? JSON.parse(responseStr) : null;

                            resolve(jsonObj);
                        } catch (e: any) {
                            log.error(`Failed to deserialize sync response: ${responseStr}`);

                            reject(generateError(opts, e.message));
                        }
                    } else {
                        let errorMessage;

                        try {
                            // error responses ideally carry JSON with a message field...
                            const jsonObj = JSON.parse(responseStr);

                            errorMessage = jsonObj?.message || "";
                        } catch (e: any) {
                            // ...but fall back to the first 100 chars of the raw body
                            errorMessage = responseStr.substr(0, Math.min(responseStr.length, 100));
                        }

                        reject(generateError(opts, `${response.statusCode} ${response.statusMessage} ${errorMessage}`));
                    }
                });
            });

            let payload;

            if (opts.body) {
                payload = typeof opts.body === "object" ? JSON.stringify(opts.body) : opts.body;
            }

            request.end(payload as string);
        } catch (e: any) {
            reject(generateError(opts, e.message));
        }
    });
}
|
||||
|
||||
/**
 * Downloads an image over HTTP(S), honoring the configured sync proxy, and
 * resolves with the raw bytes as a Buffer.
 *
 * NOTE(review): on a non-2xx status the promise is rejected but the data
 * handlers are still attached; the later resolve() after a reject() is a
 * no-op, so this is harmless but slightly wasteful.
 */
async function getImage(imageUrl: string): Promise<Buffer> {
    const proxyConf = syncOptions.getSyncProxy();
    const opts: ClientOpts = {
        method: "GET",
        url: imageUrl,
        proxy: proxyConf !== "noproxy" ? proxyConf : null
    };

    const client = await getClient(opts);
    const proxyAgent = await getProxyAgent(opts);
    const parsedTargetUrl = url.parse(opts.url);

    return new Promise<Buffer>((resolve, reject) => {
        try {
            const request = client.request({
                method: opts.method,
                // url is used by electron net module
                url: opts.url,
                // 4 fields below are used by http and https node modules
                protocol: parsedTargetUrl.protocol,
                host: parsedTargetUrl.hostname,
                port: parsedTargetUrl.port,
                path: parsedTargetUrl.path,
                timeout: opts.timeout, // works only for the node client
                headers: {},
                agent: proxyAgent
            });

            request.on("error", (err) => reject(generateError(opts, err)));

            request.on("abort", (err) => reject(generateError(opts, err)));

            request.on("response", (response) => {
                if (![200, 201, 204].includes(response.statusCode)) {
                    reject(generateError(opts, `${response.statusCode} ${response.statusMessage}`));
                }

                const chunks: Buffer[] = [];

                response.on("data", (chunk: Buffer) => chunks.push(chunk));
                response.on("end", () => resolve(Buffer.concat(chunks)));
            });

            request.end(undefined);
        } catch (e: any) {
            reject(generateError(opts, e.message));
        }
    });
}
|
||||
|
||||
// protocol schemes used to choose between the http and https node clients
const HTTP = "http:",
    HTTPS = "https:";
|
||||
|
||||
function generateError(
|
||||
opts: {
|
||||
method: string;
|
||||
url: string;
|
||||
},
|
||||
message: string
|
||||
) {
|
||||
return new Error(`Request to ${opts.method} ${opts.url} failed, error: ${message}`);
|
||||
}
|
||||
|
||||
async function getProxyAgent(opts: ClientOpts) {
|
||||
if (!opts.proxy) {
|
||||
return null;
|
||||
@@ -219,17 +78,159 @@ async function getClient(opts: ClientOpts): Promise<Client> {
|
||||
}
|
||||
}
|
||||
|
||||
function generateError(
|
||||
opts: {
|
||||
method: string;
|
||||
url: string;
|
||||
},
|
||||
message: string
|
||||
) {
|
||||
return new Error(`Request to ${opts.method} ${opts.url} failed, error: ${message}`);
|
||||
}
|
||||
export default class NodeRequestProvider implements RequestProvider {
|
||||
|
||||
export default {
|
||||
exec,
|
||||
getImage
|
||||
};
|
||||
    /**
     * Executes an HTTP(S) request through the platform client (node
     * http/https or electron net), with proxy support and multi-page
     * uploads. Paging metadata travels in headers so the server can
     * reassemble oversized bodies; 200/201/204 responses are JSON-parsed
     * (empty body resolves to null), everything else rejects.
     */
    async exec<T>(opts: ExecOpts): Promise<T> {
        const client = getClient(opts);

        // hack for cases where electron.net does not work, but we don't want to set proxy
        if (opts.proxy === "noproxy") {
            opts.proxy = null;
        }

        // single-page defaults when the caller did not request paging
        const paging = opts.paging || {
            pageCount: 1,
            pageIndex: 0,
            requestId: "n/a"
        };

        const proxyAgent = await getProxyAgent(opts);
        const parsedTargetUrl = url.parse(opts.url);

        // NOTE(review): async executor inside `new Promise`; the body is fully
        // wrapped in try/catch so thrown errors still reject the promise.
        return new Promise(async (resolve, reject) => {
            try {
                const headers: Record<string, string | number> = {
                    Cookie: (opts.cookieJar && opts.cookieJar.header) || "",
                    // multi-page bodies are raw JSON fragments, not valid JSON per page
                    "Content-Type": paging.pageCount === 1 ? "application/json" : "text/plain",
                    pageCount: paging.pageCount,
                    pageIndex: paging.pageIndex,
                    requestId: paging.requestId
                };

                if (opts.auth) {
                    // only the password part is meaningful; the username is a placeholder
                    headers["trilium-cred"] = Buffer.from(`dummy:${opts.auth.password}`).toString("base64");
                }

                const request = (await client).request({
                    method: opts.method,
                    // url is used by electron net module
                    url: opts.url,
                    // 4 fields below are used by http and https node modules
                    protocol: parsedTargetUrl.protocol,
                    host: parsedTargetUrl.hostname,
                    port: parsedTargetUrl.port,
                    path: parsedTargetUrl.path,
                    timeout: opts.timeout, // works only for node.js client
                    headers,
                    agent: proxyAgent
                });

                request.on("error", (err) => reject(generateError(opts, err)));

                request.on("response", (response) => {
                    // keep the session cookie for subsequent requests in this jar
                    if (opts.cookieJar && response.headers["set-cookie"]) {
                        opts.cookieJar.header = response.headers["set-cookie"];
                    }

                    let responseStr = "";
                    let chunks: Buffer[] = [];

                    response.on("data", (chunk: Buffer) => chunks.push(chunk));

                    response.on("end", () => {
                        // use Buffer instead of string concatenation to avoid implicit decoding for each chunk
                        // decode the entire data chunks explicitly as utf-8
                        responseStr = Buffer.concat(chunks).toString("utf-8");

                        if ([200, 201, 204].includes(response.statusCode)) {
                            try {
                                const jsonObj = responseStr.trim() ? JSON.parse(responseStr) : null;

                                resolve(jsonObj);
                            } catch (e: any) {
                                log.error(`Failed to deserialize sync response: ${responseStr}`);

                                reject(generateError(opts, e.message));
                            }
                        } else {
                            let errorMessage;

                            try {
                                // error responses ideally carry JSON with a message field...
                                const jsonObj = JSON.parse(responseStr);

                                errorMessage = jsonObj?.message || "";
                            } catch (e: any) {
                                // ...but fall back to the first 100 chars of the raw body
                                errorMessage = responseStr.substr(0, Math.min(responseStr.length, 100));
                            }

                            reject(generateError(opts, `${response.statusCode} ${response.statusMessage} ${errorMessage}`));
                        }
                    });
                });

                let payload;

                if (opts.body) {
                    payload = typeof opts.body === "object" ? JSON.stringify(opts.body) : opts.body;
                }

                request.end(payload as string);
            } catch (e: any) {
                reject(generateError(opts, e.message));
            }
        });
    }
|
||||
|
||||
    /**
     * Downloads an image over HTTP(S), honoring the configured sync proxy,
     * and resolves with the bytes as an ArrayBuffer (the RequestProvider
     * contract), sliced to the Buffer's exact byte range.
     *
     * NOTE(review): on a non-2xx status the promise is rejected but the data
     * handlers are still attached; the later resolve() after a reject() is a
     * no-op, so this is harmless but slightly wasteful.
     */
    async getImage(imageUrl: string): Promise<ArrayBuffer> {
        const proxyConf = syncOptions.getSyncProxy();
        const opts: ClientOpts = {
            method: "GET",
            url: imageUrl,
            proxy: proxyConf !== "noproxy" ? proxyConf : null
        };

        const client = await getClient(opts);
        const proxyAgent = await getProxyAgent(opts);
        const parsedTargetUrl = url.parse(opts.url);

        return new Promise<ArrayBuffer>((resolve, reject) => {
            try {
                const request = client.request({
                    method: opts.method,
                    // url is used by electron net module
                    url: opts.url,
                    // 4 fields below are used by http and https node modules
                    protocol: parsedTargetUrl.protocol,
                    host: parsedTargetUrl.hostname,
                    port: parsedTargetUrl.port,
                    path: parsedTargetUrl.path,
                    timeout: opts.timeout, // works only for the node client
                    headers: {},
                    agent: proxyAgent
                });

                request.on("error", (err) => reject(generateError(opts, err)));

                request.on("abort", (err) => reject(generateError(opts, err)));

                request.on("response", (response) => {
                    if (![200, 201, 204].includes(response.statusCode)) {
                        reject(generateError(opts, `${response.statusCode} ${response.statusMessage}`));
                    }

                    const chunks: Buffer[] = [];

                    response.on("data", (chunk: Buffer) => chunks.push(chunk));
                    response.on("end", () => {
                        const buf = Buffer.concat(chunks);
                        // a Buffer is a view into a possibly larger ArrayBuffer;
                        // slice to the view's exact range before handing it out
                        resolve(buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength));
                    });
                });

                request.end(undefined);
            } catch (e: any) {
                reject(generateError(opts, e.message));
            }
        });
    }
|
||||
}
|
||||
|
||||
2
apps/server/src/services/search/search_context.ts
Normal file
2
apps/server/src/services/search/search_context.ts
Normal file
@@ -0,0 +1,2 @@
|
||||
import { SearchContext } from "@triliumnext/core";
|
||||
export default SearchContext;
|
||||
2
apps/server/src/services/search/services/search.ts
Normal file
2
apps/server/src/services/search/services/search.ts
Normal file
@@ -0,0 +1,2 @@
|
||||
import { search } from "@triliumnext/core";
|
||||
export default search;
|
||||
@@ -3,7 +3,7 @@ import log from "./log.js";
|
||||
import sqlInit from "./sql_init.js";
|
||||
import optionService from "./options.js";
|
||||
import syncOptions from "./sync_options.js";
|
||||
import request from "./request.js";
|
||||
import { request } from "@triliumnext/core";
|
||||
import appInfo from "./app_info.js";
|
||||
import { timeLimit } from "./utils.js";
|
||||
import becca from "../becca/becca.js";
|
||||
|
||||
@@ -1,464 +1,2 @@
|
||||
import type { EntityChange, EntityChangeRecord, EntityRow } from "@triliumnext/commons";
|
||||
import { becca_loader, binary_utils, entity_constructor, getInstanceId } from "@triliumnext/core";
|
||||
|
||||
import becca from "../becca/becca.js";
|
||||
import appInfo from "./app_info.js";
|
||||
import cls from "./cls.js";
|
||||
import consistency_checks from "./consistency_checks.js";
|
||||
import contentHashService from "./content_hash.js";
|
||||
import dateUtils from "./date_utils.js";
|
||||
import entityChangesService from "./entity_changes.js";
|
||||
import log from "./log.js";
|
||||
import optionService from "./options.js";
|
||||
import request from "./request.js";
|
||||
import type { CookieJar, ExecOpts } from "./request_interface.js";
|
||||
import setupService from "./setup.js";
|
||||
import sql from "./sql.js";
|
||||
import syncMutexService from "./sync_mutex.js";
|
||||
import syncOptions from "./sync_options.js";
|
||||
import syncUpdateService from "./sync_update.js";
|
||||
import { hmac, randomString, timeLimit } from "./utils.js";
|
||||
import ws from "./ws.js";
|
||||
|
||||
// flipped on each failed sync round so the next attempt alternates between
// using the configured proxy and going direct
let proxyToggle = true;

// number of changes still waiting on the server after the last pull batch;
// exposed via getOutstandingPullCount()
let outstandingPullCount = 0;

/** Response shape of GET /api/sync/check. */
interface CheckResponse {
    maxEntityChangeId: number;
    entityHashes: Record<string, Record<string, string>>;
}

/** Response shape of POST /api/login/sync. */
interface SyncResponse {
    instanceId: string;
    maxEntityChangeId: number;
}

/** Response shape of GET /api/sync/changed. */
interface ChangesResponse {
    entityChanges: EntityChangeRecord[];
    lastEntityChangeId: number;
    outstandingPullCount: number;
}

/** Per-round sync session state (session cookie + server identity). */
interface SyncContext {
    cookieJar: CookieJar;
    instanceId?: string;
}
|
||||
|
||||
/**
 * Runs one full synchronization round against the configured sync server:
 * login, push local changes, pull remote changes, push again (new changes
 * may have been produced while pulling), finish, then verify content
 * hashes — repeating while the hash check requests another round.
 *
 * @returns { success: true } or { success: false, message, [errorCode] };
 *          connection-level failures are reported, not thrown
 */
async function sync() {
    try {
        // exclusive with consistency checks and other sync rounds
        return await syncMutexService.doExclusively(async () => {
            if (!syncOptions.isSyncSetup()) {
                return { success: false, errorCode: "NOT_CONFIGURED", message: "Sync not configured" };
            }

            let continueSync = false;

            do {
                const syncContext = await login();

                await pushChanges(syncContext);

                await pullChanges(syncContext);

                // push again — new local changes may have appeared during the pull
                await pushChanges(syncContext);

                await syncFinished(syncContext);

                // true when content hashes still differ and sectors were re-queued
                continueSync = await checkContentHash(syncContext);
            } while (continueSync);

            ws.syncFinished();

            return {
                success: true
            };
        });
    } catch (e: any) {
        // we're dynamically switching whether we're using proxy or not based on whether we encountered error with the current method
        proxyToggle = !proxyToggle;

        if (
            e.message?.includes("ECONNREFUSED") ||
            e.message?.includes("ERR_") || // node network errors
            e.message?.includes("Bad Gateway")
        ) {
            ws.syncFailed();

            log.info("No connection to sync server.");

            return {
                success: false,
                message: "No connection to sync server."
            };
        }
        log.info(`Sync failed: '${e.message}', stack: ${e.stack}`);

        ws.syncFailed();

        return {
            success: false,
            message: e.message
        };
    }
}
|
||||
|
||||
async function login() {
|
||||
if (!(await setupService.hasSyncServerSchemaAndSeed())) {
|
||||
await setupService.sendSeedToSyncServer();
|
||||
}
|
||||
|
||||
return await doLogin();
|
||||
}
|
||||
|
||||
/**
 * Performs the login handshake with the sync server.
 *
 * Authentication proves knowledge of the shared documentSecret by sending
 * an HMAC of the current UTC timestamp. Also sanity-checks that the server
 * is not this very instance, and clamps lastSyncedPull down to the server's
 * max entity change id (relevant after manually cloning a document).
 *
 * @returns a SyncContext carrying the session cookie jar and the server's instanceId
 * @throws on a missing response or when the server reports our own instanceId
 */
async function doLogin(): Promise<SyncContext> {
    const timestamp = dateUtils.utcNowDateTime();

    const documentSecret = optionService.getOption("documentSecret");
    const hash = hmac(documentSecret, timestamp);

    const syncContext: SyncContext = { cookieJar: {} };
    const resp = await syncRequest<SyncResponse>(syncContext, "POST", "/api/login/sync", {
        timestamp,
        syncVersion: appInfo.syncVersion,
        hash
    });

    if (!resp) {
        throw new Error("Got no response.");
    }

    if (resp.instanceId === getInstanceId()) {
        throw new Error(
            `Sync server has instance ID '${resp.instanceId}' which is also local. This usually happens when the sync client is (mis)configured to sync with itself (URL points back to client) instead of the correct sync server.`
        );
    }

    syncContext.instanceId = resp.instanceId;

    const lastSyncedPull = getLastSyncedPull();

    // this is important in a scenario where we set up the sync by manually copying the document
    // lastSyncedPull then could be pretty off for the newly cloned client
    if (lastSyncedPull > resp.maxEntityChangeId) {
        log.info(`Lowering last synced pull from ${lastSyncedPull} to ${resp.maxEntityChangeId}`);

        setLastSyncedPull(resp.maxEntityChangeId);
    }

    return syncContext;
}
|
||||
|
||||
/**
 * Pulls entity changes from the sync server in batches until an empty
 * batch is returned. Each batch is applied and the lastSyncedPull marker
 * advanced inside a single transaction, so a crash cannot leave applied
 * changes unacknowledged (or vice versa).
 */
async function pullChanges(syncContext: SyncContext) {
    while (true) {
        const lastSyncedPull = getLastSyncedPull();
        const logMarkerId = randomString(10); // to easily pair sync events between client and server logs
        const changesUri = `/api/sync/changed?instanceId=${getInstanceId()}&lastEntityChangeId=${lastSyncedPull}&logMarkerId=${logMarkerId}`;

        const startDate = Date.now();

        const resp = await syncRequest<ChangesResponse>(syncContext, "GET", changesUri);
        if (!resp) {
            throw new Error("Request failed.");
        }
        const { entityChanges, lastEntityChangeId } = resp;

        // module-level counter, exposed via getOutstandingPullCount()
        outstandingPullCount = resp.outstandingPullCount;

        const pulledDate = Date.now();

        // apply + advance the marker atomically
        sql.transactional(() => {
            if (syncContext.instanceId) {
                syncUpdateService.updateEntities(entityChanges, syncContext.instanceId);
            }

            if (lastSyncedPull !== lastEntityChangeId) {
                setLastSyncedPull(lastEntityChangeId);
            }
        });

        if (entityChanges.length === 0) {
            break;
        } else {
            try {
                // https://github.com/zadam/trilium/issues/4310
                const sizeInKb = Math.round(JSON.stringify(resp).length / 1024);

                log.info(
                    `Sync ${logMarkerId}: Pulled ${entityChanges.length} changes in ${sizeInKb} KB, starting at entityChangeId=${lastSyncedPull} in ${pulledDate - startDate}ms and applied them in ${Date.now() - pulledDate}ms, ${outstandingPullCount} outstanding pulls`
                );
            } catch (e: any) {
                // logging must never abort the pull loop
                log.error(`Error occurred ${e.message} ${e.stack}`);
            }
        }
    }

    log.info("Finished pull");
}
|
||||
|
||||
/**
 * Pushes local entity changes to the sync server in batches of up to 1000
 * rows (further capped at ~1 MB of serialized payload inside
 * getEntityChangeRecords). Changes that originated from the server's own
 * instanceId are skipped — pushing them back would only bounce them.
 * Loops until nothing remains to push.
 */
async function pushChanges(syncContext: SyncContext) {
    let lastSyncedPush: number | null | undefined = getLastSyncedPush();

    while (true) {
        const entityChanges = sql.getRows<EntityChange>("SELECT * FROM entity_changes WHERE isSynced = 1 AND id > ? LIMIT 1000", [lastSyncedPush]);

        if (entityChanges.length === 0) {
            log.info("Nothing to push");

            break;
        }

        const filteredEntityChanges = entityChanges.filter((entityChange) => {
            if (entityChange.instanceId === syncContext.instanceId) {
                // this may set lastSyncedPush beyond what's actually sent (because of size limit)
                // so this is applied to the database only if there's no actual update
                lastSyncedPush = entityChange.id;

                return false;
            }
            return true;
        });

        if (filteredEntityChanges.length === 0 && lastSyncedPush) {
            // there still might be more sync changes (because of batch limit), just all the current batch
            // has been filtered out
            setLastSyncedPush(lastSyncedPush);

            continue;
        }

        const entityChangesRecords = getEntityChangeRecords(filteredEntityChanges);
        const startDate = new Date();

        const logMarkerId = randomString(10); // to easily pair sync events between client and server logs

        await syncRequest(syncContext, "PUT", `/api/sync/update?logMarkerId=${logMarkerId}`, {
            entities: entityChangesRecords,
            instanceId: getInstanceId()
        });

        ws.syncPushInProgress();

        log.info(`Sync ${logMarkerId}: Pushing ${entityChangesRecords.length} sync changes in ${Date.now() - startDate.getTime()}ms`);

        // advance the marker only to the last record actually sent
        lastSyncedPush = entityChangesRecords[entityChangesRecords.length - 1].entityChange.id;

        if (lastSyncedPush) {
            setLastSyncedPush(lastSyncedPush);
        }
    }
}
|
||||
|
||||
async function syncFinished(syncContext: SyncContext) {
|
||||
await syncRequest(syncContext, "POST", "/api/sync/finished");
|
||||
}
|
||||
|
||||
/**
 * Asks the sync server for its per-sector entity hashes and compares them
 * with local ones. The check is skipped (returning true = "sync again")
 * while pulls or pushes are still outstanding, since the hashes could not
 * match anyway. For every mismatched sector, both sides re-queue that
 * sector for sync.
 *
 * @returns true when another sync round is needed
 */
async function checkContentHash(syncContext: SyncContext) {
    const resp = await syncRequest<CheckResponse>(syncContext, "GET", "/api/sync/check");
    if (!resp) {
        throw new Error("Got no response.");
    }

    const lastSyncedPullId = getLastSyncedPull();

    if (lastSyncedPullId < resp.maxEntityChangeId) {
        log.info(`There are some outstanding pulls (${lastSyncedPullId} vs. ${resp.maxEntityChangeId}), skipping content check.`);

        return true;
    }

    // EXISTS query — 0/1 flag, not a count
    const notPushedSyncs = sql.getValue("SELECT EXISTS(SELECT 1 FROM entity_changes WHERE isSynced = 1 AND id > ?)", [getLastSyncedPush()]);

    if (notPushedSyncs) {
        log.info(`There's ${notPushedSyncs} outstanding pushes, skipping content check.`);

        return true;
    }

    const failedChecks = contentHashService.checkContentHashes(resp.entityHashes);

    if (failedChecks.length > 0) {
        // before re-queuing sectors, make sure the entity changes are correct
        consistency_checks.runEntityChangesChecks();

        // ask the server to do the same on its side
        await syncRequest(syncContext, "POST", `/api/sync/check-entity-changes`);
    }

    for (const { entityName, sector } of failedChecks) {
        entityChangesService.addEntityChangesForSector(entityName, sector);

        await syncRequest(syncContext, "POST", `/api/sync/queue-sector/${entityName}/${sector}`);
    }

    return failedChecks.length > 0;
}
|
||||
|
||||
// maximum characters of serialized body sent per request "page"
const PAGE_SIZE = 1000000;

// NOTE(review): SyncContext is already declared above with an additional
// optional `instanceId` field; TypeScript merges the two declarations, so
// this second one is redundant and could be removed.
interface SyncContext {
    cookieJar: CookieJar;
}

/**
 * Sends a request to the sync server, splitting bodies larger than
 * PAGE_SIZE into multiple pages that share a requestId so the server can
 * reassemble them. Each page is wrapped in timeLimit() with the configured
 * sync timeout.
 *
 * @returns the (typed) response of the last page sent
 */
async function syncRequest<T extends {}>(syncContext: SyncContext, method: string, requestPath: string, _body?: {}) {
    const body = _body ? JSON.stringify(_body) : "";

    const timeout = syncOptions.getSyncTimeout();

    let response;

    const requestId = randomString(10);
    // always at least one page, even for an empty body
    const pageCount = Math.max(1, Math.ceil(body.length / PAGE_SIZE));

    for (let pageIndex = 0; pageIndex < pageCount; pageIndex++) {
        const opts: ExecOpts = {
            method,
            url: syncOptions.getSyncServerHost() + requestPath,
            cookieJar: syncContext.cookieJar,
            timeout,
            paging: {
                pageIndex,
                pageCount,
                requestId
            },
            body: body.substr(pageIndex * PAGE_SIZE, Math.min(PAGE_SIZE, body.length - pageIndex * PAGE_SIZE)),
            proxy: proxyToggle ? syncOptions.getSyncProxy() : null
        };

        response = (await timeLimit(request.exec(opts), timeout)) as T;
    }

    return response;
}
|
||||
|
||||
/**
 * Loads the current database row for the entity referenced by an entity
 * change, so it can be shipped to the sync server.
 *
 * Special cases: "note_reordering" changes carry a branchId -> notePosition
 * map instead of a row, and blob content is base64-encoded for transport.
 *
 * @returns the row (or map), or null when the entity no longer exists
 * @throws when the entity name has no registered entity class
 */
function getEntityChangeRow(entityChange: EntityChange) {
    const { entityName, entityId } = entityChange;

    if (entityName === "note_reordering") {
        return sql.getMap("SELECT branchId, notePosition FROM branches WHERE parentNoteId = ? AND isDeleted = 0", [entityId]);
    }
    const primaryKey = entity_constructor.getEntityFromEntityName(entityName).primaryKeyName;

    if (!primaryKey) {
        throw new Error(`Unknown entity for entity change ${JSON.stringify(entityChange)}`);
    }

    const entityRow = sql.getRow<EntityRow>(/*sql*/`SELECT * FROM ${entityName} WHERE ${primaryKey} = ?`, [entityId]);

    if (!entityRow) {
        log.error(`Cannot find entity for entity change ${JSON.stringify(entityChange)}`);
        return null;
    }

    if (entityName === "blobs" && entityRow.content !== null) {
        // normalize string content to a Buffer before encoding
        if (typeof entityRow.content === "string") {
            entityRow.content = Buffer.from(entityRow.content, "utf-8");
        }

        if (entityRow.content) {
            entityRow.content = binary_utils.encodeBase64(entityRow.content);
        }
    }

    return entityRow;
}
|
||||
|
||||
/**
 * Materializes entity changes into records (change + current entity row)
 * for pushing. Erased changes are sent without a payload. Stops early once
 * the serialized size exceeds ~1 MB so a single push request stays small;
 * the caller loops to send the remainder.
 */
function getEntityChangeRecords(entityChanges: EntityChange[]) {
    const records: EntityChangeRecord[] = [];
    let length = 0;

    for (const entityChange of entityChanges) {
        if (entityChange.isErased) {
            // erased entities have no row anymore — ship the change alone
            records.push({ entityChange });

            continue;
        }

        const entity = getEntityChangeRow(entityChange);
        if (!entity) {
            // row is missing locally — nothing to push for this change
            continue;
        }

        const record: EntityChangeRecord = { entityChange, entity };

        records.push(record);

        length += JSON.stringify(record).length;

        if (length > 1_000_000) {
            // each sync request/response should have at most ~1 MB.
            break;
        }
    }

    return records;
}
|
||||
|
||||
function getLastSyncedPull() {
|
||||
return parseInt(optionService.getOption("lastSyncedPull"));
|
||||
}
|
||||
|
||||
function setLastSyncedPull(entityChangeId: number) {
|
||||
const lastSyncedPullOption = becca.getOption("lastSyncedPull");
|
||||
|
||||
if (lastSyncedPullOption) {
|
||||
// might be null in initial sync when becca is not loaded
|
||||
lastSyncedPullOption.value = `${entityChangeId}`;
|
||||
}
|
||||
|
||||
// this way we avoid updating entity_changes which otherwise means that we've never pushed all entity_changes
|
||||
sql.execute("UPDATE options SET value = ? WHERE name = ?", [entityChangeId, "lastSyncedPull"]);
|
||||
}
|
||||
|
||||
function getLastSyncedPush() {
|
||||
const lastSyncedPush = parseInt(optionService.getOption("lastSyncedPush"));
|
||||
|
||||
ws.setLastSyncedPush(lastSyncedPush);
|
||||
|
||||
return lastSyncedPush;
|
||||
}
|
||||
|
||||
function setLastSyncedPush(entityChangeId: number) {
|
||||
ws.setLastSyncedPush(entityChangeId);
|
||||
|
||||
const lastSyncedPushOption = becca.getOption("lastSyncedPush");
|
||||
|
||||
if (lastSyncedPushOption) {
|
||||
// might be null in initial sync when becca is not loaded
|
||||
lastSyncedPushOption.value = `${entityChangeId}`;
|
||||
}
|
||||
|
||||
// this way we avoid updating entity_changes which otherwise means that we've never pushed all entity_changes
|
||||
sql.execute("UPDATE options SET value = ? WHERE name = ?", [entityChangeId, "lastSyncedPush"]);
|
||||
}
|
||||
|
||||
function getMaxEntityChangeId() {
|
||||
return sql.getValue("SELECT COALESCE(MAX(id), 0) FROM entity_changes");
|
||||
}
|
||||
|
||||
function getOutstandingPullCount() {
|
||||
return outstandingPullCount;
|
||||
}
|
||||
|
||||
export function startSyncTimer() {
|
||||
becca_loader.beccaLoaded.then(() => {
|
||||
setInterval(cls.wrap(sync), 60000);
|
||||
|
||||
// kickoff initial sync immediately, but should happen after initial consistency checks
|
||||
setTimeout(cls.wrap(sync), 5000);
|
||||
|
||||
// called just so ws.setLastSyncedPush() is called
|
||||
getLastSyncedPush();
|
||||
});
|
||||
}
|
||||
|
||||
export default {
|
||||
sync,
|
||||
login,
|
||||
getEntityChangeRecords,
|
||||
getOutstandingPullCount,
|
||||
getMaxEntityChangeId
|
||||
};
|
||||
import { sync } from "@triliumnext/core";
|
||||
export default sync;
|
||||
|
||||
@@ -1,34 +1,2 @@
|
||||
"use strict";
|
||||
|
||||
import optionService from "./options.js";
|
||||
import config from "./config.js";
|
||||
import { normalizeUrl } from "./utils.js";
|
||||
|
||||
/*
|
||||
* Primary configuration for sync is in the options (document), but we allow to override
|
||||
* these settings in config file. The reason for that is to avoid a mistake of loading a live/production
|
||||
* document with live sync settings in a dev/debug environment. Changes would then successfully propagate
|
||||
* to live sync server.
|
||||
*/
|
||||
|
||||
function get(name: keyof typeof config.Sync) {
|
||||
return (config["Sync"] && config["Sync"][name]) || optionService.getOption(name);
|
||||
}
|
||||
|
||||
export default {
|
||||
// env variable is the easiest way to guarantee we won't overwrite prod data during development
|
||||
// after copying prod document/data directory
|
||||
getSyncServerHost: () => {
|
||||
const host = get("syncServerHost");
|
||||
return host ? normalizeUrl(host) : host;
|
||||
},
|
||||
isSyncSetup: () => {
|
||||
const syncServerHost = get("syncServerHost");
|
||||
|
||||
// special value "disabled" is here to support a use case where the document is configured with sync server,
|
||||
// and we need to override it with config from config.ini
|
||||
return !!syncServerHost && syncServerHost !== "disabled";
|
||||
},
|
||||
getSyncTimeout: () => parseInt(get("syncServerTimeout")) || 120000,
|
||||
getSyncProxy: () => get("syncProxy")
|
||||
};
|
||||
import { sync_options } from "@triliumnext/core";
|
||||
export default sync_options;
|
||||
|
||||
@@ -1,170 +1,2 @@
|
||||
import type { EntityChange, EntityChangeRecord, EntityRow } from "@triliumnext/commons";
|
||||
import { entity_constructor, events as eventService } from "@triliumnext/core";
|
||||
|
||||
import entityChangesService from "./entity_changes.js";
|
||||
import log from "./log.js";
|
||||
import sql from "./sql.js";
|
||||
import ws from "./ws.js";
|
||||
|
||||
interface UpdateContext {
|
||||
alreadyErased: number;
|
||||
erased: number;
|
||||
updated: Record<string, string[]>;
|
||||
}
|
||||
|
||||
function updateEntities(entityChanges: EntityChangeRecord[], instanceId: string) {
|
||||
if (entityChanges.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
let atLeastOnePullApplied = false;
|
||||
const updateContext = {
|
||||
updated: {},
|
||||
alreadyUpdated: 0,
|
||||
erased: 0,
|
||||
alreadyErased: 0
|
||||
};
|
||||
|
||||
for (const { entityChange, entity } of entityChanges) {
|
||||
const changeAppliedAlready = entityChange.changeId && !!sql.getValue("SELECT 1 FROM entity_changes WHERE changeId = ?", [entityChange.changeId]);
|
||||
|
||||
if (changeAppliedAlready) {
|
||||
updateContext.alreadyUpdated++;
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!atLeastOnePullApplied) {
|
||||
// avoid spamming and send only for first
|
||||
ws.syncPullInProgress();
|
||||
|
||||
atLeastOnePullApplied = true;
|
||||
}
|
||||
|
||||
updateEntity(entityChange, entity, instanceId, updateContext);
|
||||
}
|
||||
|
||||
logUpdateContext(updateContext);
|
||||
}
|
||||
|
||||
function updateEntity(remoteEC: EntityChange, remoteEntityRow: EntityRow | undefined, instanceId: string, updateContext: UpdateContext) {
|
||||
if (!remoteEntityRow && remoteEC.entityName === "options") {
|
||||
return; // can be undefined for options with isSynced=false
|
||||
}
|
||||
|
||||
const updated = remoteEC.entityName === "note_reordering"
|
||||
? updateNoteReordering(remoteEC, remoteEntityRow, instanceId)
|
||||
: updateNormalEntity(remoteEC, remoteEntityRow, instanceId, updateContext);
|
||||
|
||||
if (updated) {
|
||||
if (remoteEntityRow?.isDeleted) {
|
||||
eventService.emit(eventService.ENTITY_DELETE_SYNCED, {
|
||||
entityName: remoteEC.entityName,
|
||||
entityId: remoteEC.entityId
|
||||
});
|
||||
} else if (!remoteEC.isErased) {
|
||||
eventService.emit(eventService.ENTITY_CHANGE_SYNCED, {
|
||||
entityName: remoteEC.entityName,
|
||||
entityRow: remoteEntityRow
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function updateNormalEntity(remoteEC: EntityChange, remoteEntityRow: EntityRow | undefined, instanceId: string, updateContext: UpdateContext) {
|
||||
const localEC = sql.getRow<EntityChange | undefined>(/*sql*/`SELECT * FROM entity_changes WHERE entityName = ? AND entityId = ?`, [remoteEC.entityName, remoteEC.entityId]);
|
||||
const localECIsOlderOrSameAsRemote = localEC && localEC.utcDateChanged && remoteEC.utcDateChanged && localEC.utcDateChanged <= remoteEC.utcDateChanged;
|
||||
|
||||
if (!localEC || localECIsOlderOrSameAsRemote) {
|
||||
if (remoteEC.isErased) {
|
||||
if (localEC?.isErased) {
|
||||
eraseEntity(remoteEC); // make sure it's erased anyway
|
||||
updateContext.alreadyErased++;
|
||||
} else {
|
||||
eraseEntity(remoteEC);
|
||||
updateContext.erased++;
|
||||
}
|
||||
} else {
|
||||
if (!remoteEntityRow) {
|
||||
throw new Error(`Empty entity row for: ${JSON.stringify(remoteEC)}`);
|
||||
}
|
||||
|
||||
preProcessContent(remoteEC, remoteEntityRow);
|
||||
|
||||
sql.replace(remoteEC.entityName, remoteEntityRow);
|
||||
|
||||
updateContext.updated[remoteEC.entityName] = updateContext.updated[remoteEC.entityName] || [];
|
||||
updateContext.updated[remoteEC.entityName].push(remoteEC.entityId);
|
||||
}
|
||||
|
||||
if (!localEC || localECIsOlderOrSameAsRemote || localEC.hash !== remoteEC.hash || localEC.isErased !== remoteEC.isErased) {
|
||||
entityChangesService.putEntityChangeWithInstanceId(remoteEC, instanceId);
|
||||
}
|
||||
|
||||
return true;
|
||||
} else if ((localEC.hash !== remoteEC.hash || localEC.isErased !== remoteEC.isErased) && !localECIsOlderOrSameAsRemote) {
|
||||
// the change on our side is newer than on the other side, so the other side should update
|
||||
entityChangesService.putEntityChangeForOtherInstances(localEC);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
function preProcessContent(remoteEC: EntityChange, remoteEntityRow: EntityRow) {
|
||||
if (remoteEC.entityName === "blobs" && remoteEntityRow.content !== null) {
|
||||
// we always use a Buffer object which is different from normal saving - there we use a simple string type for
|
||||
// "string notes". The problem is that in general, it's not possible to detect whether a blob content
|
||||
// is string note or note (syncs can arrive out of order)
|
||||
if (typeof remoteEntityRow.content === "string") {
|
||||
remoteEntityRow.content = Buffer.from(remoteEntityRow.content, "base64");
|
||||
|
||||
if (remoteEntityRow.content.byteLength === 0) {
|
||||
// there seems to be a bug which causes empty buffer to be stored as NULL which is then picked up as inconsistency
|
||||
// (possibly not a problem anymore with the newer better-sqlite3)
|
||||
remoteEntityRow.content = "";
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function updateNoteReordering(remoteEC: EntityChange, remoteEntityRow: EntityRow | undefined, instanceId: string) {
|
||||
if (!remoteEntityRow) {
|
||||
throw new Error(`Empty note_reordering body for: ${JSON.stringify(remoteEC)}`);
|
||||
}
|
||||
|
||||
for (const key in remoteEntityRow) {
|
||||
sql.execute("UPDATE branches SET notePosition = ? WHERE branchId = ?", [remoteEntityRow[key as keyof EntityRow], key]);
|
||||
}
|
||||
|
||||
entityChangesService.putEntityChangeWithInstanceId(remoteEC, instanceId);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
function eraseEntity(entityChange: EntityChange) {
|
||||
const { entityName, entityId } = entityChange;
|
||||
|
||||
const entityNames = ["notes", "branches", "attributes", "revisions", "attachments", "blobs"];
|
||||
|
||||
if (!entityNames.includes(entityName)) {
|
||||
log.error(`Cannot erase ${entityName} '${entityId}'.`);
|
||||
return;
|
||||
}
|
||||
|
||||
const primaryKeyName = entity_constructor.getEntityFromEntityName(entityName).primaryKeyName;
|
||||
|
||||
sql.execute(/*sql*/`DELETE FROM ${entityName} WHERE ${primaryKeyName} = ?`, [entityId]);
|
||||
}
|
||||
|
||||
function logUpdateContext(updateContext: UpdateContext) {
|
||||
const message = JSON.stringify(updateContext).replaceAll('"', "").replaceAll(":", ": ").replaceAll(",", ", ");
|
||||
|
||||
log.info(message.substr(1, message.length - 2));
|
||||
}
|
||||
|
||||
export default {
|
||||
updateEntities
|
||||
};
|
||||
import { sync_update } from "@triliumnext/core";
|
||||
export default sync_update;
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { utils as coreUtils } from "@triliumnext/core";
|
||||
import { getCrypto,utils as coreUtils } from "@triliumnext/core";
|
||||
import chardet from "chardet";
|
||||
import crypto from "crypto";
|
||||
import { t } from "i18next";
|
||||
@@ -49,10 +49,8 @@ export function fromBase64(encodedText: string) {
|
||||
return Buffer.from(encodedText, "base64");
|
||||
}
|
||||
|
||||
export function hmac(secret: any, value: any) {
|
||||
const hmac = crypto.createHmac("sha256", Buffer.from(secret.toString(), "ascii"));
|
||||
hmac.update(value.toString());
|
||||
return hmac.digest("base64");
|
||||
export function hmac(secret: string | Uint8Array, value: string | Uint8Array) {
|
||||
return getCrypto().hmac(secret, value);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -85,10 +83,6 @@ export function constantTimeCompare(a: string | null | undefined, b: string | nu
|
||||
return crypto.timingSafeEqual(bufA, bufB);
|
||||
}
|
||||
|
||||
export function sanitizeSqlIdentifier(str: string) {
|
||||
return str.replace(/[^A-Za-z0-9_]/g, "");
|
||||
}
|
||||
|
||||
export function toObject<T, K extends string | number | symbol, V>(array: T[], fn: (item: T) => [K, V]): Record<K, V> {
|
||||
const obj: Record<K, V> = {} as Record<K, V>; // TODO: unsafe?
|
||||
|
||||
@@ -172,35 +166,6 @@ export function getNoteTitle(filePath: string, replaceUnderscoresWithSpaces: boo
|
||||
return replaceUnderscoresWithSpaces ? basename.replace(/_/g, " ").trim() : basename;
|
||||
}
|
||||
|
||||
export function timeLimit<T>(promise: Promise<T>, limitMs: number, errorMessage?: string): Promise<T> {
|
||||
// TriliumNextTODO: since TS avoids this from ever happening – do we need this check?
|
||||
if (!promise || !promise.then) {
|
||||
// it's not actually a promise
|
||||
return promise;
|
||||
}
|
||||
|
||||
// better stack trace if created outside of promise
|
||||
const errorTimeLimit = new Error(errorMessage || `Process exceeded time limit ${limitMs}`);
|
||||
|
||||
return new Promise((res, rej) => {
|
||||
let resolved = false;
|
||||
|
||||
promise
|
||||
.then((result) => {
|
||||
resolved = true;
|
||||
|
||||
res(result);
|
||||
})
|
||||
.catch((error) => rej(error));
|
||||
|
||||
setTimeout(() => {
|
||||
if (!resolved) {
|
||||
rej(errorTimeLimit);
|
||||
}
|
||||
}, limitMs);
|
||||
});
|
||||
}
|
||||
|
||||
/** @deprecated */
|
||||
export function removeDiacritic(str: string) {
|
||||
return coreUtils.removeDiacritic(str);
|
||||
@@ -337,36 +302,6 @@ export function processStringOrBuffer(data: string | Buffer | null) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Normalizes URL by removing trailing slashes and fixing double slashes.
|
||||
* Preserves the protocol (http://, https://) but removes trailing slashes from the rest.
|
||||
*
|
||||
* @param url The URL to normalize
|
||||
* @returns The normalized URL without trailing slashes
|
||||
*/
|
||||
export function normalizeUrl(url: string | null | undefined): string | null | undefined {
|
||||
if (!url || typeof url !== 'string') {
|
||||
return url;
|
||||
}
|
||||
|
||||
// Trim whitespace
|
||||
url = url.trim();
|
||||
|
||||
if (!url) {
|
||||
return url;
|
||||
}
|
||||
|
||||
// Fix double slashes (except in protocol) first
|
||||
url = url.replace(/([^:]\/)\/+/g, '$1');
|
||||
|
||||
// Remove trailing slash, but preserve protocol
|
||||
if (url.endsWith('/') && !url.match(/^https?:\/\/$/)) {
|
||||
url = url.slice(0, -1);
|
||||
}
|
||||
|
||||
return url;
|
||||
}
|
||||
|
||||
/**
|
||||
* Normalizes a path pattern for custom request handlers.
|
||||
* Ensures both trailing slash and non-trailing slash versions are handled.
|
||||
@@ -455,6 +390,10 @@ export const randomSecureToken = coreUtils.randomSecureToken;
|
||||
export const safeExtractMessageAndStackFromError = coreUtils.safeExtractMessageAndStackFromError;
|
||||
/** @deprecated */
|
||||
export const isEmptyOrWhitespace = coreUtils.isEmptyOrWhitespace;
|
||||
/** @deprecated */
|
||||
export const normalizeUrl = coreUtils.normalizeUrl;
|
||||
export const timeLimit = coreUtils.timeLimit;
|
||||
export const sanitizeSqlIdentifier = coreUtils.sanitizeSqlIdentifier;
|
||||
|
||||
export function waitForStreamToFinish(stream: any): Promise<void> {
|
||||
return new Promise((resolve, reject) => {
|
||||
@@ -487,7 +426,6 @@ export default {
|
||||
newEntityId,
|
||||
normalize,
|
||||
normalizeCustomHandlerPattern,
|
||||
normalizeUrl,
|
||||
quoteRegex,
|
||||
randomSecureToken,
|
||||
randomString,
|
||||
@@ -495,10 +433,8 @@ export default {
|
||||
removeFileExtension,
|
||||
replaceAll,
|
||||
safeExtractMessageAndStackFromError,
|
||||
sanitizeSqlIdentifier,
|
||||
stripTags,
|
||||
slugify,
|
||||
timeLimit,
|
||||
toBase64,
|
||||
toMap,
|
||||
toObject,
|
||||
|
||||
@@ -1,254 +1,2 @@
|
||||
import { type EntityChange,WebSocketMessage } from "@triliumnext/commons";
|
||||
import { AbstractBeccaEntity } from "@triliumnext/core";
|
||||
import type { IncomingMessage, Server as HttpServer } from "http";
|
||||
import { WebSocket,WebSocketServer } from "ws";
|
||||
|
||||
import becca from "../becca/becca.js";
|
||||
import cls from "./cls.js";
|
||||
import config from "./config.js";
|
||||
import log from "./log.js";
|
||||
import protectedSessionService from "./protected_session.js";
|
||||
import sql from "./sql.js";
|
||||
import syncMutexService from "./sync_mutex.js";
|
||||
import { isElectron, randomString } from "./utils.js";
|
||||
|
||||
let webSocketServer!: WebSocketServer;
|
||||
let lastSyncedPush: number;
|
||||
|
||||
type SessionParser = (req: IncomingMessage, params: {}, cb: () => void) => void;
|
||||
function init(httpServer: HttpServer, sessionParser: SessionParser) {
|
||||
webSocketServer = new WebSocketServer({
|
||||
verifyClient: (info, done) => {
|
||||
sessionParser(info.req, {}, () => {
|
||||
const allowed = isElectron || (info.req as any).session.loggedIn || (config.General && config.General.noAuthentication);
|
||||
|
||||
if (!allowed) {
|
||||
log.error("WebSocket connection not allowed because session is neither electron nor logged in.");
|
||||
}
|
||||
|
||||
done(allowed);
|
||||
});
|
||||
},
|
||||
server: httpServer
|
||||
});
|
||||
|
||||
webSocketServer.on("connection", (ws, req) => {
|
||||
(ws as any).id = randomString(10);
|
||||
|
||||
console.log(`websocket client connected`);
|
||||
|
||||
ws.on("message", async (messageJson) => {
|
||||
const message = JSON.parse(messageJson as any);
|
||||
|
||||
if (message.type === "log-error") {
|
||||
log.info(`JS Error: ${message.error}\r
|
||||
Stack: ${message.stack}`);
|
||||
} else if (message.type === "log-info") {
|
||||
log.info(`JS Info: ${message.info}`);
|
||||
} else if (message.type === "ping") {
|
||||
await syncMutexService.doExclusively(() => sendPing(ws));
|
||||
} else {
|
||||
log.error("Unrecognized message: ");
|
||||
log.error(message);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
webSocketServer.on("error", (error) => {
|
||||
// https://github.com/zadam/trilium/issues/3374#issuecomment-1341053765
|
||||
console.log(error);
|
||||
});
|
||||
}
|
||||
|
||||
function sendMessage(client: WebSocket, message: WebSocketMessage) {
|
||||
const jsonStr = JSON.stringify(message);
|
||||
|
||||
if (client.readyState === WebSocket.OPEN) {
|
||||
client.send(jsonStr);
|
||||
}
|
||||
}
|
||||
|
||||
function sendMessageToAllClients(message: WebSocketMessage) {
|
||||
const jsonStr = JSON.stringify(message);
|
||||
|
||||
if (webSocketServer) {
|
||||
if (message.type !== "sync-failed" && message.type !== "api-log-messages") {
|
||||
log.info(`Sending message to all clients: ${jsonStr}`);
|
||||
}
|
||||
|
||||
webSocketServer.clients.forEach((client) => {
|
||||
if (client.readyState === WebSocket.OPEN) {
|
||||
client.send(jsonStr);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
function fillInAdditionalProperties(entityChange: EntityChange) {
|
||||
if (entityChange.isErased) {
|
||||
return;
|
||||
}
|
||||
|
||||
// fill in some extra data needed by the frontend
|
||||
// first try to use becca, which works for non-deleted entities
|
||||
// only when that fails, try to load from the database
|
||||
if (entityChange.entityName === "attributes") {
|
||||
entityChange.entity = becca.getAttribute(entityChange.entityId);
|
||||
|
||||
if (!entityChange.entity) {
|
||||
entityChange.entity = sql.getRow(/*sql*/`SELECT * FROM attributes WHERE attributeId = ?`, [entityChange.entityId]);
|
||||
}
|
||||
} else if (entityChange.entityName === "branches") {
|
||||
entityChange.entity = becca.getBranch(entityChange.entityId);
|
||||
|
||||
if (!entityChange.entity) {
|
||||
entityChange.entity = sql.getRow(/*sql*/`SELECT * FROM branches WHERE branchId = ?`, [entityChange.entityId]);
|
||||
}
|
||||
} else if (entityChange.entityName === "notes") {
|
||||
entityChange.entity = becca.getNote(entityChange.entityId);
|
||||
|
||||
if (!entityChange.entity) {
|
||||
entityChange.entity = sql.getRow(/*sql*/`SELECT * FROM notes WHERE noteId = ?`, [entityChange.entityId]);
|
||||
|
||||
if (entityChange.entity?.isProtected) {
|
||||
entityChange.entity.title = protectedSessionService.decryptString(entityChange.entity.title || "");
|
||||
}
|
||||
}
|
||||
} else if (entityChange.entityName === "revisions") {
|
||||
entityChange.noteId = sql.getValue<string>(
|
||||
/*sql*/`SELECT noteId
|
||||
FROM revisions
|
||||
WHERE revisionId = ?`,
|
||||
[entityChange.entityId]
|
||||
);
|
||||
} else if (entityChange.entityName === "note_reordering") {
|
||||
entityChange.positions = {};
|
||||
|
||||
const parentNote = becca.getNote(entityChange.entityId);
|
||||
|
||||
if (parentNote) {
|
||||
for (const childBranch of parentNote.getChildBranches()) {
|
||||
if (childBranch?.branchId) {
|
||||
entityChange.positions[childBranch.branchId] = childBranch.notePosition;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (entityChange.entityName === "options") {
|
||||
entityChange.entity = becca.getOption(entityChange.entityId);
|
||||
|
||||
if (!entityChange.entity) {
|
||||
entityChange.entity = sql.getRow(/*sql*/`SELECT * FROM options WHERE name = ?`, [entityChange.entityId]);
|
||||
}
|
||||
} else if (entityChange.entityName === "attachments") {
|
||||
entityChange.entity = becca.getAttachment(entityChange.entityId);
|
||||
|
||||
if (!entityChange.entity) {
|
||||
entityChange.entity = sql.getRow(
|
||||
/*sql*/`SELECT attachments.*, LENGTH(blobs.content) AS contentLength
|
||||
FROM attachments
|
||||
JOIN blobs USING (blobId)
|
||||
WHERE attachmentId = ?`,
|
||||
[entityChange.entityId]
|
||||
);
|
||||
|
||||
if (entityChange.entity?.isProtected) {
|
||||
entityChange.entity.title = protectedSessionService.decryptString(entityChange.entity.title || "");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (entityChange.entity instanceof AbstractBeccaEntity) {
|
||||
entityChange.entity = entityChange.entity.getPojo();
|
||||
}
|
||||
}
|
||||
|
||||
// entities with higher number can reference the entities with lower number
|
||||
const ORDERING: Record<string, number> = {
|
||||
etapi_tokens: 0,
|
||||
attributes: 2,
|
||||
branches: 2,
|
||||
blobs: 0,
|
||||
note_reordering: 2,
|
||||
revisions: 2,
|
||||
attachments: 3,
|
||||
notes: 1,
|
||||
options: 0,
|
||||
};
|
||||
|
||||
function sendPing(client: WebSocket, entityChangeIds = []) {
|
||||
if (entityChangeIds.length === 0) {
|
||||
sendMessage(client, { type: "ping" });
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
const entityChanges = sql.getManyRows<EntityChange>(/*sql*/`SELECT * FROM entity_changes WHERE id IN (???)`, entityChangeIds);
|
||||
if (!entityChanges) {
|
||||
return;
|
||||
}
|
||||
|
||||
// sort entity changes since froca expects "referential order", i.e. referenced entities should already exist
|
||||
// in froca.
|
||||
// Froca needs this since it is an incomplete copy, it can't create "skeletons" like becca.
|
||||
entityChanges.sort((a, b) => ORDERING[a.entityName] - ORDERING[b.entityName]);
|
||||
|
||||
for (const entityChange of entityChanges) {
|
||||
try {
|
||||
fillInAdditionalProperties(entityChange);
|
||||
} catch (e: any) {
|
||||
log.error(`Could not fill additional properties for entity change ${JSON.stringify(entityChange)} because of error: ${e.message}: ${e.stack}`);
|
||||
}
|
||||
}
|
||||
|
||||
sendMessage(client, {
|
||||
type: "frontend-update",
|
||||
data: {
|
||||
lastSyncedPush,
|
||||
entityChanges
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function sendTransactionEntityChangesToAllClients() {
|
||||
if (webSocketServer) {
|
||||
const entityChangeIds = cls.getAndClearEntityChangeIds();
|
||||
|
||||
webSocketServer.clients.forEach((client) => sendPing(client, entityChangeIds));
|
||||
}
|
||||
}
|
||||
|
||||
function syncPullInProgress() {
|
||||
sendMessageToAllClients({ type: "sync-pull-in-progress", lastSyncedPush });
|
||||
}
|
||||
|
||||
function syncPushInProgress() {
|
||||
sendMessageToAllClients({ type: "sync-push-in-progress", lastSyncedPush });
|
||||
}
|
||||
|
||||
function syncFinished() {
|
||||
sendMessageToAllClients({ type: "sync-finished", lastSyncedPush });
|
||||
}
|
||||
|
||||
function syncFailed() {
|
||||
sendMessageToAllClients({ type: "sync-failed", lastSyncedPush });
|
||||
}
|
||||
|
||||
function reloadFrontend(reason: string) {
|
||||
sendMessageToAllClients({ type: "reload-frontend", reason });
|
||||
}
|
||||
|
||||
function setLastSyncedPush(entityChangeId: number) {
|
||||
lastSyncedPush = entityChangeId;
|
||||
}
|
||||
|
||||
export default {
|
||||
init,
|
||||
sendMessageToAllClients,
|
||||
syncPushInProgress,
|
||||
syncPullInProgress,
|
||||
syncFinished,
|
||||
syncFailed,
|
||||
sendTransactionEntityChangesToAllClients,
|
||||
setLastSyncedPush,
|
||||
reloadFrontend
|
||||
};
|
||||
import { ws } from "@triliumnext/core";
|
||||
export default ws;
|
||||
|
||||
106
apps/server/src/services/ws_messaging_provider.ts
Normal file
106
apps/server/src/services/ws_messaging_provider.ts
Normal file
@@ -0,0 +1,106 @@
|
||||
import type { WebSocketMessage } from "@triliumnext/commons";
|
||||
import type { ClientMessageHandler, MessagingProvider } from "@triliumnext/core";
|
||||
import type { IncomingMessage, Server as HttpServer } from "http";
|
||||
import { WebSocket, WebSocketServer } from "ws";
|
||||
|
||||
import config from "./config.js";
|
||||
import log from "./log.js";
|
||||
import { isElectron, randomString } from "./utils.js";
|
||||
|
||||
type SessionParser = (req: IncomingMessage, params: {}, cb: () => void) => void;
|
||||
|
||||
/**
|
||||
* WebSocket-based implementation of MessagingProvider.
|
||||
*
|
||||
* Handles the raw WebSocket transport: server setup, connection management,
|
||||
* message serialization, and client tracking.
|
||||
*/
|
||||
export default class WebSocketMessagingProvider implements MessagingProvider {
|
||||
private webSocketServer!: WebSocketServer;
|
||||
private clientMap = new Map<string, WebSocket>();
|
||||
private clientMessageHandler?: ClientMessageHandler;
|
||||
|
||||
init(httpServer: HttpServer, sessionParser: SessionParser) {
|
||||
this.webSocketServer = new WebSocketServer({
|
||||
verifyClient: (info, done) => {
|
||||
sessionParser(info.req, {}, () => {
|
||||
const allowed = isElectron || (info.req as any).session.loggedIn || (config.General && config.General.noAuthentication);
|
||||
|
||||
if (!allowed) {
|
||||
log.error("WebSocket connection not allowed because session is neither electron nor logged in.");
|
||||
}
|
||||
|
||||
done(allowed);
|
||||
});
|
||||
},
|
||||
server: httpServer
|
||||
});
|
||||
|
||||
this.webSocketServer.on("connection", (ws, req) => {
|
||||
const id = randomString(10);
|
||||
(ws as any).id = id;
|
||||
this.clientMap.set(id, ws);
|
||||
|
||||
console.log(`websocket client connected`);
|
||||
|
||||
ws.on("message", async (messageJson) => {
|
||||
const message = JSON.parse(messageJson as any);
|
||||
|
||||
if (this.clientMessageHandler) {
|
||||
await this.clientMessageHandler(id, message);
|
||||
}
|
||||
});
|
||||
|
||||
ws.on("close", () => {
|
||||
this.clientMap.delete(id);
|
||||
});
|
||||
});
|
||||
|
||||
this.webSocketServer.on("error", (error) => {
|
||||
// https://github.com/zadam/trilium/issues/3374#issuecomment-1341053765
|
||||
console.log(error);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a handler for incoming client messages.
|
||||
*/
|
||||
setClientMessageHandler(handler: ClientMessageHandler) {
|
||||
this.clientMessageHandler = handler;
|
||||
}
|
||||
|
||||
sendMessageToAllClients(message: WebSocketMessage): void {
|
||||
const jsonStr = JSON.stringify(message);
|
||||
|
||||
if (this.webSocketServer) {
|
||||
if (message.type !== "sync-failed" && message.type !== "api-log-messages") {
|
||||
log.info(`Sending message to all clients: ${jsonStr}`);
|
||||
}
|
||||
|
||||
this.webSocketServer.clients.forEach((client) => {
|
||||
if (client.readyState === WebSocket.OPEN) {
|
||||
client.send(jsonStr);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
sendMessageToClient(clientId: string, message: WebSocketMessage): boolean {
|
||||
const client = this.clientMap.get(clientId);
|
||||
if (!client || client.readyState !== WebSocket.OPEN) {
|
||||
return false;
|
||||
}
|
||||
|
||||
client.send(JSON.stringify(message));
|
||||
return true;
|
||||
}
|
||||
|
||||
getClientCount(): number {
|
||||
return this.webSocketServer?.clients?.size ?? 0;
|
||||
}
|
||||
|
||||
dispose(): void {
|
||||
this.webSocketServer?.close();
|
||||
this.clientMap.clear();
|
||||
}
|
||||
}
|
||||
@@ -1,17 +1,19 @@
|
||||
import { getMessagingProvider } from "@triliumnext/core";
|
||||
import type { Express } from "express";
|
||||
import fs from "fs";
|
||||
import http from "http";
|
||||
import https from "https";
|
||||
import tmp from "tmp";
|
||||
import config from "./services/config.js";
|
||||
import log from "./services/log.js";
|
||||
import appInfo from "./services/app_info.js";
|
||||
import ws from "./services/ws.js";
|
||||
import utils, { formatSize, formatUtcTime } from "./services/utils.js";
|
||||
import port from "./services/port.js";
|
||||
import host from "./services/host.js";
|
||||
|
||||
import buildApp from "./app.js";
|
||||
import type { Express } from "express";
|
||||
import appInfo from "./services/app_info.js";
|
||||
import config from "./services/config.js";
|
||||
import host from "./services/host.js";
|
||||
import log from "./services/log.js";
|
||||
import port from "./services/port.js";
|
||||
import { getDbSize } from "./services/sql_init.js";
|
||||
import utils, { formatSize, formatUtcTime } from "./services/utils.js";
|
||||
import WebSocketMessagingProvider from "./services/ws_messaging_provider.js";
|
||||
|
||||
const MINIMUM_NODE_VERSION = "20.0.0";
|
||||
|
||||
@@ -58,7 +60,7 @@ export default async function startTriliumServer() {
|
||||
const httpServer = startHttpServer(app);
|
||||
|
||||
const sessionParser = (await import("./routes/session_parser.js")).default;
|
||||
ws.init(httpServer, sessionParser as any); // TODO: Not sure why session parser is incompatible.
|
||||
(getMessagingProvider() as WebSocketMessagingProvider).init(httpServer, sessionParser); // TODO: Not sure why session parser is incompatible.
|
||||
|
||||
if (utils.isElectron) {
|
||||
const electronRouting = await import("./routes/electron.js");
|
||||
@@ -67,8 +69,8 @@ export default async function startTriliumServer() {
|
||||
}
|
||||
|
||||
async function displayStartupMessage() {
|
||||
log.info("\n" + LOGO.replace("[version]", appInfo.appVersion));
|
||||
log.info(`📦 Versions: app=${appInfo.appVersion} db=${appInfo.dbVersion} sync=${appInfo.syncVersion} clipper=${appInfo.clipperProtocolVersion}`)
|
||||
log.info(`\n${LOGO.replace("[version]", appInfo.appVersion)}`);
|
||||
log.info(`📦 Versions: app=${appInfo.appVersion} db=${appInfo.dbVersion} sync=${appInfo.syncVersion} clipper=${appInfo.clipperProtocolVersion}`);
|
||||
log.info(`🔧 Build: ${formatUtcTime(appInfo.buildDate)} (${appInfo.buildRevision.substring(0, 10)})`);
|
||||
log.info(`📂 Data dir: ${appInfo.dataDirectory}`);
|
||||
log.info(`⏰ UTC time: ${formatUtcTime(appInfo.utcDateTime)}`);
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
"desktop:start": "pnpm run --filter desktop dev",
|
||||
"desktop:build": "pnpm run --filter desktop build",
|
||||
"desktop:start-prod": "pnpm run --filter desktop start-prod",
|
||||
"standalone:start": "pnpm run --filter client-standalone dev",
|
||||
"edit-docs:edit-docs": "pnpm run --filter edit-docs edit-docs",
|
||||
"edit-docs:build": "pnpm run --filter edit-docs build",
|
||||
"website:start": "pnpm run --filter website dev",
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import type { Locale } from "./i18n.js";
|
||||
import { AttachmentRow, AttributeRow, BranchRow, NoteRow, NoteType } from "./rows.js";
|
||||
import { AttachmentRow, AttributeRow, BranchRow, NoteRow, NoteType, OptionRow } from "./rows.js";
|
||||
|
||||
type Response = {
|
||||
success: true,
|
||||
@@ -341,3 +341,19 @@ export interface BootstrapDefinition {
|
||||
iconRegistry: IconRegistry;
|
||||
TRILIUM_SAFE_MODE: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* Response for /api/setup/status.
|
||||
*/
|
||||
export interface SetupStatusResponse {
|
||||
syncVersion: number;
|
||||
schemaExists: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* Response for /api/setup/sync-seed.
|
||||
*/
|
||||
export interface SetupSyncSeedResponse {
|
||||
syncVersion: number;
|
||||
options: OptionRow[];
|
||||
}
|
||||
|
||||
@@ -14,7 +14,8 @@
|
||||
"mime-types": "3.0.2",
|
||||
"sanitize-filename": "1.6.4",
|
||||
"sanitize-html": "2.17.2",
|
||||
"unescape": "1.0.1"
|
||||
"unescape": "1.0.1",
|
||||
"async-mutex": "0.5.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/escape-html": "1.0.4",
|
||||
|
||||
@@ -4,6 +4,7 @@ import { getLog, initLog } from "./services/log";
|
||||
import { initSql } from "./services/sql/index";
|
||||
import { SqlService, SqlServiceParams } from "./services/sql/sql";
|
||||
import { initMessaging, MessagingProvider } from "./services/messaging/index";
|
||||
import { initRequest, RequestProvider } from "./services/request";
|
||||
import { initTranslations, TranslationProvider } from "./services/i18n";
|
||||
import appInfo from "./services/app_info";
|
||||
|
||||
@@ -43,8 +44,10 @@ export { default as bulk_actions } from "./services/bulk_actions";
|
||||
export { default as hoisted_note } from "./services/hoisted_note";
|
||||
export { default as special_notes } from "./services/special_notes";
|
||||
export { default as date_notes } from "./services/date_notes";
|
||||
export { getCrypto } from "./services/encryption/crypto";
|
||||
|
||||
export { default as attribute_formatter} from "./services/attribute_formatter";
|
||||
export { default as attributes } from "./services/attributes";
|
||||
|
||||
// Messaging system
|
||||
export * from "./services/messaging/index";
|
||||
@@ -69,17 +72,29 @@ export { default as Becca } from "./becca/becca-interface";
|
||||
export type { NotePojo } from "./becca/becca-interface";
|
||||
|
||||
export { default as NoteSet } from "./services/search/note_set";
|
||||
export { default as SearchContext } from "./services/search/search_context";
|
||||
export { default as search } from "./services/search/services/search";
|
||||
export { default as note_service } from "./services/notes";
|
||||
export type { NoteParams } from "./services/notes";
|
||||
export * as sanitize from "./services/sanitizer";
|
||||
export * as routes from "./routes";
|
||||
export { default as ws } from "./services/ws";
|
||||
export { default as request } from "./services/request";
|
||||
export { default as sync_options } from "./services/sync_options";
|
||||
export { default as sync_update } from "./services/sync_update";
|
||||
export { default as sync } from "./services/sync";
|
||||
export { default as consistency_checks } from "./services/consistency_checks";
|
||||
export { default as content_hash } from "./services/content_hash";
|
||||
export { default as sync_mutex } from "./services/sync_mutex";
|
||||
export type { RequestProvider, ExecOpts, CookieJar } from "./services/request";
|
||||
|
||||
export async function initializeCore({ dbConfig, executionContext, crypto, translations, messaging, extraAppInfo }: {
|
||||
export async function initializeCore({ dbConfig, executionContext, crypto, translations, messaging, request, extraAppInfo }: {
|
||||
dbConfig: SqlServiceParams,
|
||||
executionContext: ExecutionContext,
|
||||
crypto: CryptoProvider,
|
||||
translations: TranslationProvider,
|
||||
messaging?: MessagingProvider,
|
||||
request?: RequestProvider,
|
||||
extraAppInfo?: {
|
||||
nodeVersion: string;
|
||||
dataDirectory: string;
|
||||
@@ -94,4 +109,7 @@ export async function initializeCore({ dbConfig, executionContext, crypto, trans
|
||||
if (messaging) {
|
||||
initMessaging(messaging);
|
||||
}
|
||||
if (request) {
|
||||
initRequest(request);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1,20 +1,20 @@
|
||||
import { type EntityChange, SyncTestResponse } from "@triliumnext/commons";
|
||||
import { ValidationError } from "@triliumnext/core";
|
||||
import type { Request } from "express";
|
||||
import { t } from "i18next";
|
||||
|
||||
import consistencyChecksService from "../../services/consistency_checks.js";
|
||||
import contentHashService from "../../services/content_hash.js";
|
||||
import entityChangesService from "../../services/entity_changes.js";
|
||||
import log from "../../services/log.js";
|
||||
import { getLog } from "../../services/log.js";
|
||||
import optionService from "../../services/options.js";
|
||||
import sql from "../../services/sql.js";
|
||||
import { getSql } from "../../services/sql/index.js";
|
||||
import sqlInit from "../../services/sql_init.js";
|
||||
import syncService from "../../services/sync.js";
|
||||
import syncOptions from "../../services/sync_options.js";
|
||||
import syncUpdateService from "../../services/sync_update.js";
|
||||
import utils, { safeExtractMessageAndStackFromError } from "../../services/utils.js";
|
||||
import * as utils from "../../services/utils/index.js";
|
||||
import ws from "../../services/ws.js";
|
||||
import { ValidationError } from "../../errors.js";
|
||||
|
||||
async function testSync(): Promise<SyncTestResponse> {
|
||||
try {
|
||||
@@ -30,7 +30,7 @@ async function testSync(): Promise<SyncTestResponse> {
|
||||
|
||||
return { success: true, message: t("test_sync.successful") };
|
||||
} catch (e: unknown) {
|
||||
const [errMessage] = safeExtractMessageAndStackFromError(e);
|
||||
const [errMessage] = utils.safeExtractMessageAndStackFromError(e);
|
||||
return {
|
||||
success: false,
|
||||
message: errMessage
|
||||
@@ -45,11 +45,11 @@ function getStats() {
|
||||
}
|
||||
|
||||
const stats = {
|
||||
initialized: sql.getValue("SELECT value FROM options WHERE name = 'initialized'") === "true",
|
||||
initialized: getSql().getValue("SELECT value FROM options WHERE name = 'initialized'") === "true",
|
||||
outstandingPullCount: syncService.getOutstandingPullCount()
|
||||
};
|
||||
|
||||
log.info(`Returning sync stats: ${JSON.stringify(stats)}`);
|
||||
getLog().info(`Returning sync stats: ${JSON.stringify(stats)}`);
|
||||
|
||||
return stats;
|
||||
}
|
||||
@@ -57,12 +57,12 @@ function getStats() {
|
||||
function checkSync() {
|
||||
return {
|
||||
entityHashes: contentHashService.getEntityHashes(),
|
||||
maxEntityChangeId: sql.getValue("SELECT COALESCE(MAX(id), 0) FROM entity_changes WHERE isSynced = 1")
|
||||
maxEntityChangeId: getSql().getValue("SELECT COALESCE(MAX(id), 0) FROM entity_changes WHERE isSynced = 1")
|
||||
};
|
||||
}
|
||||
|
||||
function syncNow() {
|
||||
log.info("Received request to trigger sync now.");
|
||||
getLog().info("Received request to trigger sync now.");
|
||||
|
||||
// when explicitly asked for set in progress status immediately for faster user feedback
|
||||
ws.syncPullInProgress();
|
||||
@@ -73,14 +73,14 @@ function syncNow() {
|
||||
function fillEntityChanges() {
|
||||
entityChangesService.fillAllEntityChanges();
|
||||
|
||||
log.info("Sync rows have been filled.");
|
||||
getLog().info("Sync rows have been filled.");
|
||||
}
|
||||
|
||||
function forceFullSync() {
|
||||
optionService.setOption("lastSyncedPull", 0);
|
||||
optionService.setOption("lastSyncedPush", 0);
|
||||
|
||||
log.info("Forcing full sync.");
|
||||
getLog().info("Forcing full sync.");
|
||||
|
||||
// not awaiting for the job to finish (will probably take a long time)
|
||||
syncService.sync();
|
||||
@@ -149,6 +149,7 @@ function getChanged(req: Request) {
|
||||
const clientInstanceId = req.query.instanceId;
|
||||
let filteredEntityChanges: EntityChange[] = [];
|
||||
|
||||
const sql = getSql();
|
||||
do {
|
||||
const entityChanges: EntityChange[] = sql.getRows<EntityChange>(
|
||||
`
|
||||
@@ -177,7 +178,7 @@ function getChanged(req: Request) {
|
||||
if (entityChangeRecords.length > 0) {
|
||||
lastEntityChangeId = entityChangeRecords[entityChangeRecords.length - 1].entityChange.id;
|
||||
|
||||
log.info(`Returning ${entityChangeRecords.length} entity changes in ${Date.now() - startTime}ms`);
|
||||
getLog().info(`Returning ${entityChangeRecords.length} entity changes in ${Date.now() - startTime}ms`);
|
||||
}
|
||||
|
||||
return {
|
||||
@@ -282,7 +283,7 @@ function update(req: Request) {
|
||||
|
||||
partialRequests[requestId].payload += req.body;
|
||||
|
||||
log.info(`Receiving a partial request ${requestId}, page ${pageIndex + 1} out of ${pageCount} pages.`);
|
||||
getLog().info(`Receiving a partial request ${requestId}, page ${pageIndex + 1} out of ${pageCount} pages.`);
|
||||
|
||||
if (pageIndex !== pageCount - 1) {
|
||||
return;
|
||||
@@ -293,13 +294,13 @@ function update(req: Request) {
|
||||
|
||||
const { entities, instanceId } = body;
|
||||
|
||||
sql.transactional(() => syncUpdateService.updateEntities(entities, instanceId));
|
||||
getSql().transactional(() => syncUpdateService.updateEntities(entities, instanceId));
|
||||
}
|
||||
|
||||
setInterval(() => {
|
||||
for (const key in partialRequests) {
|
||||
if (Date.now() - partialRequests[key].createdAt > 20 * 60 * 1000) {
|
||||
log.info(`Cleaning up unfinished partial requests for ${key}`);
|
||||
getLog().info(`Cleaning up unfinished partial requests for ${key}`);
|
||||
|
||||
delete partialRequests[key];
|
||||
}
|
||||
@@ -19,6 +19,7 @@ import recentChangesApiRoute from "./api/recent_changes";
|
||||
import bulkActionRoute from "./api/bulk_action";
|
||||
import searchRoute from "./api/search";
|
||||
import specialNotesRoute from "./api/special_notes";
|
||||
import syncApiRoute from "./api/sync";
|
||||
|
||||
// TODO: Deduplicate with routes.ts
|
||||
const GET = "get",
|
||||
@@ -28,11 +29,14 @@ const GET = "get",
|
||||
DEL = "delete";
|
||||
|
||||
interface SharedApiRoutesContext {
|
||||
route: any;
|
||||
apiRoute: any;
|
||||
asyncApiRoute: any;
|
||||
checkApiAuth: any;
|
||||
apiResultHandler: any;
|
||||
}
|
||||
|
||||
export function buildSharedApiRoutes({ apiRoute, asyncApiRoute }: SharedApiRoutesContext) {
|
||||
export function buildSharedApiRoutes({ route, apiRoute, asyncApiRoute, checkApiAuth, apiResultHandler }: SharedApiRoutesContext) {
|
||||
apiRoute(GET, '/api/tree', treeApiRoute.getTree);
|
||||
apiRoute(PST, '/api/tree/load', treeApiRoute.load);
|
||||
|
||||
@@ -99,6 +103,18 @@ export function buildSharedApiRoutes({ apiRoute, asyncApiRoute }: SharedApiRoute
|
||||
apiRoute(PUT, "/api/branches/:branchId/set-prefix", branchesApiRoute.setPrefix);
|
||||
apiRoute(PUT, "/api/branches/set-prefix-batch", branchesApiRoute.setPrefixBatch);
|
||||
|
||||
asyncApiRoute(PST, "/api/sync/test", syncApiRoute.testSync);
|
||||
asyncApiRoute(PST, "/api/sync/now", syncApiRoute.syncNow);
|
||||
apiRoute(PST, "/api/sync/fill-entity-changes", syncApiRoute.fillEntityChanges);
|
||||
apiRoute(PST, "/api/sync/force-full-sync", syncApiRoute.forceFullSync);
|
||||
route(GET, "/api/sync/check", [checkApiAuth], syncApiRoute.checkSync, apiResultHandler);
|
||||
route(GET, "/api/sync/changed", [checkApiAuth], syncApiRoute.getChanged, apiResultHandler);
|
||||
route(PUT, "/api/sync/update", [checkApiAuth], syncApiRoute.update, apiResultHandler);
|
||||
route(PST, "/api/sync/finished", [checkApiAuth], syncApiRoute.syncFinished, apiResultHandler);
|
||||
route(PST, "/api/sync/check-entity-changes", [checkApiAuth], syncApiRoute.checkEntityChanges, apiResultHandler);
|
||||
route(PST, "/api/sync/queue-sector/:entityName/:sector", [checkApiAuth], syncApiRoute.queueSector, apiResultHandler);
|
||||
route(GET, "/api/sync/stats", [], syncApiRoute.getStats, apiResultHandler);
|
||||
|
||||
apiRoute(GET, "/api/quick-search/:searchString", searchRoute.quickSearch);
|
||||
apiRoute(GET, "/api/search-note/:noteId", searchRoute.searchFromNote);
|
||||
apiRoute(PST, "/api/search-and-execute-note/:noteId", searchRoute.searchAndExecute);
|
||||
|
||||
973
packages/trilium-core/src/services/consistency_checks.ts
Normal file
973
packages/trilium-core/src/services/consistency_checks.ts
Normal file
@@ -0,0 +1,973 @@
|
||||
import type { BranchRow } from "@triliumnext/commons";
|
||||
import type { EntityChange } from "@triliumnext/commons";
|
||||
|
||||
import becca from "../becca/becca.js";
|
||||
import BBranch from "../becca/entities/bbranch.js";
|
||||
import noteTypesService from "../services/note_types.js";
|
||||
import { hashedBlobId, randomString } from "../services/utils/index.js";
|
||||
import * as cls from "./context.js";
|
||||
import * as utils from "./utils/index.js";
|
||||
import entityChangesService from "./entity_changes.js";
|
||||
import log, { getLog } from "./log.js";
|
||||
import optionsService from "./options.js";
|
||||
import { getSql } from "./sql/index.js";
|
||||
import sqlInit from "./sql_init.js";
|
||||
import syncMutexService from "./sync_mutex.js";
|
||||
import ws from "./ws.js";
|
||||
import { default as eraseService } from "./erase.js";
|
||||
import becca_loader from "../becca/becca_loader.js";
|
||||
const noteTypes = noteTypesService.getNoteTypeNames();
|
||||
|
||||
class ConsistencyChecks {
|
||||
private autoFix: boolean;
|
||||
private unrecoveredConsistencyErrors: boolean;
|
||||
private fixedIssues: boolean;
|
||||
private reloadNeeded: boolean;
|
||||
|
||||
/**
|
||||
* @param autoFix - automatically fix all encountered problems. False is only for debugging during development (fail fast)
|
||||
*/
|
||||
constructor(autoFix: boolean) {
|
||||
this.autoFix = autoFix;
|
||||
this.unrecoveredConsistencyErrors = false;
|
||||
this.fixedIssues = false;
|
||||
this.reloadNeeded = false;
|
||||
}
|
||||
|
||||
findAndFixIssues(query: string, fixerCb: (res: any) => void) {
|
||||
const sql = getSql();
|
||||
const results = sql.getRows(query);
|
||||
|
||||
for (const res of results) {
|
||||
try {
|
||||
sql.transactional(() => fixerCb(res));
|
||||
|
||||
if (this.autoFix) {
|
||||
this.fixedIssues = true;
|
||||
} else {
|
||||
this.unrecoveredConsistencyErrors = true;
|
||||
}
|
||||
} catch (e: any) {
|
||||
logError(`Fixer failed with ${e.message} ${e.stack}`);
|
||||
this.unrecoveredConsistencyErrors = true;
|
||||
}
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
checkTreeCycles() {
|
||||
const childToParents: Record<string, string[]> = {};
|
||||
const rows = getSql().getRows<BranchRow>("SELECT noteId, parentNoteId FROM branches WHERE isDeleted = 0");
|
||||
|
||||
for (const row of rows) {
|
||||
const childNoteId = row.noteId;
|
||||
const parentNoteId = row.parentNoteId;
|
||||
|
||||
childToParents[childNoteId] = childToParents[childNoteId] || [];
|
||||
childToParents[childNoteId].push(parentNoteId);
|
||||
}
|
||||
|
||||
/** @returns true if cycle was found and we should try again */
|
||||
const checkTreeCycle = (noteId: string, path: string[]) => {
|
||||
if (noteId === "root") {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (const parentNoteId of childToParents[noteId]) {
|
||||
if (path.includes(parentNoteId)) {
|
||||
if (this.autoFix) {
|
||||
const branch = becca.getBranchFromChildAndParent(noteId, parentNoteId);
|
||||
if (branch) {
|
||||
branch.markAsDeleted("cycle-autofix");
|
||||
logFix(`Branch '${branch.branchId}' between child '${noteId}' and parent '${parentNoteId}' has been deleted since it was causing a tree cycle.`);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
logError(`Tree cycle detected at parent-child relationship: '${parentNoteId}' - '${noteId}', whole path: '${path}'`);
|
||||
|
||||
this.unrecoveredConsistencyErrors = true;
|
||||
|
||||
} else {
|
||||
const newPath = path.slice();
|
||||
newPath.push(noteId);
|
||||
|
||||
const retryNeeded = checkTreeCycle(parentNoteId, newPath);
|
||||
|
||||
if (retryNeeded) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
};
|
||||
|
||||
const noteIds = Object.keys(childToParents);
|
||||
|
||||
for (const noteId of noteIds) {
|
||||
const retryNeeded = checkTreeCycle(noteId, []);
|
||||
|
||||
if (retryNeeded) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
checkAndRepairTreeCycles() {
|
||||
let treeFixed = false;
|
||||
|
||||
while (this.checkTreeCycles()) {
|
||||
// fixing cycle means deleting branches, we might need to create a new branch to recover the note
|
||||
this.findExistencyIssues();
|
||||
|
||||
treeFixed = true;
|
||||
}
|
||||
|
||||
if (treeFixed) {
|
||||
this.reloadNeeded = true;
|
||||
}
|
||||
}
|
||||
|
||||
findBrokenReferenceIssues() {
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT branchId, branches.noteId
|
||||
FROM branches
|
||||
LEFT JOIN notes USING (noteId)
|
||||
WHERE branches.isDeleted = 0
|
||||
AND notes.noteId IS NULL`,
|
||||
({ branchId, noteId }) => {
|
||||
if (this.autoFix) {
|
||||
const branch = becca.getBranch(branchId);
|
||||
if (!branch) {
|
||||
return;
|
||||
}
|
||||
branch.markAsDeleted();
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Branch '${branchId}' has been deleted since it references missing note '${noteId}'`);
|
||||
} else {
|
||||
logError(`Branch '${branchId}' references missing note '${noteId}'`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT branchId, branches.parentNoteId AS parentNoteId
|
||||
FROM branches
|
||||
LEFT JOIN notes ON notes.noteId = branches.parentNoteId
|
||||
WHERE branches.isDeleted = 0
|
||||
AND branches.noteId != 'root'
|
||||
AND notes.noteId IS NULL`,
|
||||
({ branchId, parentNoteId }) => {
|
||||
if (this.autoFix) {
|
||||
// Delete the old branch and recreate it with root as parent.
|
||||
const oldBranch = becca.getBranch(branchId);
|
||||
if (!oldBranch) {
|
||||
return;
|
||||
}
|
||||
|
||||
const noteId = oldBranch.noteId;
|
||||
oldBranch.markAsDeleted("missing-parent");
|
||||
|
||||
let message = `Branch '${branchId}' was missing parent note '${parentNoteId}', so it was deleted. `;
|
||||
|
||||
const note = becca.getNote(noteId);
|
||||
if (!note) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (note.getParentBranches().length === 0) {
|
||||
const newBranch = new BBranch({
|
||||
parentNoteId: "root",
|
||||
noteId,
|
||||
prefix: "recovered"
|
||||
}).save();
|
||||
|
||||
message += `${newBranch.branchId} was created in the root instead.`;
|
||||
} else {
|
||||
message += `There is one or more valid branches, so no new one will be created as a replacement.`;
|
||||
}
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(message);
|
||||
} else {
|
||||
logError(`Branch '${branchId}' references missing parent note '${parentNoteId}'`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT attributeId, attributes.noteId
|
||||
FROM attributes
|
||||
LEFT JOIN notes USING (noteId)
|
||||
WHERE attributes.isDeleted = 0
|
||||
AND notes.noteId IS NULL`,
|
||||
({ attributeId, noteId }) => {
|
||||
if (this.autoFix) {
|
||||
const attribute = becca.getAttribute(attributeId);
|
||||
if (!attribute) {
|
||||
return;
|
||||
}
|
||||
attribute.markAsDeleted();
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Attribute '${attributeId}' has been deleted since it references missing source note '${noteId}'`);
|
||||
} else {
|
||||
logError(`Attribute '${attributeId}' references missing source note '${noteId}'`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT attributeId, attributes.value AS noteId
|
||||
FROM attributes
|
||||
LEFT JOIN notes ON notes.noteId = attributes.value
|
||||
WHERE attributes.isDeleted = 0
|
||||
AND attributes.type = 'relation'
|
||||
AND notes.noteId IS NULL`,
|
||||
({ attributeId, noteId }) => {
|
||||
if (this.autoFix) {
|
||||
const attribute = becca.getAttribute(attributeId);
|
||||
if (!attribute) {
|
||||
return;
|
||||
}
|
||||
attribute.markAsDeleted();
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Relation '${attributeId}' has been deleted since it references missing note '${noteId}'`);
|
||||
} else {
|
||||
logError(`Relation '${attributeId}' references missing note '${noteId}'`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT attachmentId, attachments.ownerId AS noteId
|
||||
FROM attachments
|
||||
WHERE attachments.ownerId NOT IN (
|
||||
SELECT noteId FROM notes
|
||||
UNION ALL
|
||||
SELECT revisionId FROM revisions
|
||||
)
|
||||
AND attachments.isDeleted = 0`,
|
||||
({ attachmentId, ownerId }) => {
|
||||
if (this.autoFix) {
|
||||
const attachment = becca.getAttachment(attachmentId);
|
||||
if (!attachment) {
|
||||
return;
|
||||
}
|
||||
attachment.markAsDeleted();
|
||||
|
||||
this.reloadNeeded = false;
|
||||
|
||||
logFix(`Attachment '${attachmentId}' has been deleted since it references missing note/revision '${ownerId}'`);
|
||||
} else {
|
||||
logError(`Attachment '${attachmentId}' references missing note/revision '${ownerId}'`);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
findExistencyIssues() {
|
||||
// the principle for fixing inconsistencies is that if the note itself is deleted (isDeleted=true) then all related
|
||||
// entities should be also deleted (branches, attributes), but if the note is not deleted,
|
||||
// then at least one branch should exist.
|
||||
|
||||
// the order here is important - first we might need to delete inconsistent branches, and after that
|
||||
// another check might create missing branch
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT branchId,
|
||||
noteId
|
||||
FROM branches
|
||||
JOIN notes USING (noteId)
|
||||
WHERE notes.isDeleted = 1
|
||||
AND branches.isDeleted = 0`,
|
||||
({ branchId, noteId }) => {
|
||||
if (this.autoFix) {
|
||||
const branch = becca.getBranch(branchId);
|
||||
if (!branch) return;
|
||||
branch.markAsDeleted();
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Branch '${branchId}' has been deleted since the associated note '${noteId}' is deleted.`);
|
||||
} else {
|
||||
logError(`Branch '${branchId}' is not deleted even though the associated note '${noteId}' is deleted.`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT branchId,
|
||||
parentNoteId
|
||||
FROM branches
|
||||
JOIN notes AS parentNote ON parentNote.noteId = branches.parentNoteId
|
||||
WHERE parentNote.isDeleted = 1
|
||||
AND branches.isDeleted = 0
|
||||
`,
|
||||
({ branchId, parentNoteId }) => {
|
||||
if (this.autoFix) {
|
||||
const branch = becca.getBranch(branchId);
|
||||
if (!branch) {
|
||||
return;
|
||||
}
|
||||
branch.markAsDeleted();
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Branch '${branchId}' has been deleted since the associated parent note '${parentNoteId}' is deleted.`);
|
||||
} else {
|
||||
logError(`Branch '${branchId}' is not deleted even though the associated parent note '${parentNoteId}' is deleted.`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT DISTINCT notes.noteId
|
||||
FROM notes
|
||||
LEFT JOIN branches ON notes.noteId = branches.noteId AND branches.isDeleted = 0
|
||||
WHERE notes.isDeleted = 0
|
||||
AND branches.branchId IS NULL
|
||||
`,
|
||||
({ noteId }) => {
|
||||
if (this.autoFix) {
|
||||
const branch = new BBranch({
|
||||
parentNoteId: "root",
|
||||
noteId,
|
||||
prefix: "recovered"
|
||||
}).save();
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Created missing branch '${branch.branchId}' for note '${noteId}'`);
|
||||
} else {
|
||||
logError(`No undeleted branch found for note '${noteId}'`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
// there should be a unique relationship between note and its parent
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT noteId,
|
||||
parentNoteId
|
||||
FROM branches
|
||||
WHERE branches.isDeleted = 0
|
||||
GROUP BY branches.parentNoteId,
|
||||
branches.noteId
|
||||
HAVING COUNT(1) > 1`,
|
||||
({ noteId, parentNoteId }) => {
|
||||
if (this.autoFix) {
|
||||
const branchIds = getSql().getColumn<string>(
|
||||
/*sql*/`SELECT branchId
|
||||
FROM branches
|
||||
WHERE noteId = ?
|
||||
and parentNoteId = ?
|
||||
and isDeleted = 0
|
||||
ORDER BY utcDateModified`,
|
||||
[noteId, parentNoteId]
|
||||
);
|
||||
|
||||
const branches = branchIds.map((branchId) => becca.getBranch(branchId));
|
||||
|
||||
// it's not necessarily "original" branch, it's just the only one which will survive
|
||||
const origBranch = branches[0];
|
||||
if (!origBranch) {
|
||||
logError(`Unable to find original branch.`);
|
||||
return;
|
||||
}
|
||||
|
||||
// delete all but the first branch
|
||||
for (const branch of branches.slice(1)) {
|
||||
if (!branch) {
|
||||
continue;
|
||||
}
|
||||
|
||||
branch.markAsDeleted();
|
||||
|
||||
logFix(`Removing branch '${branch.branchId}' since it's a parent-child duplicate of branch '${origBranch.branchId}'`);
|
||||
}
|
||||
|
||||
this.reloadNeeded = true;
|
||||
} else {
|
||||
logError(`Duplicate branches for note '${noteId}' and parent '${parentNoteId}'`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT attachmentId,
|
||||
attachments.ownerId AS noteId
|
||||
FROM attachments
|
||||
JOIN notes ON notes.noteId = attachments.ownerId
|
||||
WHERE notes.isDeleted = 1
|
||||
AND attachments.isDeleted = 0`,
|
||||
({ attachmentId, noteId }) => {
|
||||
if (this.autoFix) {
|
||||
const attachment = becca.getAttachment(attachmentId);
|
||||
if (!attachment) return;
|
||||
attachment.markAsDeleted();
|
||||
|
||||
this.reloadNeeded = false;
|
||||
|
||||
logFix(`Attachment '${attachmentId}' has been deleted since the associated note '${noteId}' is deleted.`);
|
||||
} else {
|
||||
logError(`Attachment '${attachmentId}' is not deleted even though the associated note '${noteId}' is deleted.`);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
findLogicIssues() {
|
||||
const noteTypesStr = noteTypes.map((nt) => `'${nt}'`).join(", ");
|
||||
const sql = getSql();
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT noteId, type
|
||||
FROM notes
|
||||
WHERE isDeleted = 0
|
||||
AND type NOT IN (${noteTypesStr})`,
|
||||
({ noteId, type }) => {
|
||||
if (this.autoFix) {
|
||||
const note = becca.getNote(noteId);
|
||||
if (!note) return;
|
||||
note.type = "file"; // file is a safe option to recover notes if the type is not known
|
||||
note.save();
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Note '${noteId}' type has been change to file since it had invalid type '${type}'`);
|
||||
} else {
|
||||
logError(`Note '${noteId}' has invalid type '${type}'`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT notes.noteId, notes.isProtected, notes.type, notes.mime
|
||||
FROM notes
|
||||
LEFT JOIN blobs USING (blobId)
|
||||
WHERE blobs.blobId IS NULL
|
||||
AND notes.isDeleted = 0`,
|
||||
({ noteId, isProtected, type, mime }) => {
|
||||
if (this.autoFix) {
|
||||
// it might be possible that the blob is not available only because of the interrupted
|
||||
// sync, and it will come later. It's therefore important to guarantee that this artificial
|
||||
// record won't overwrite the real one coming from the sync.
|
||||
const fakeDate = "2000-01-01 00:00:00Z";
|
||||
|
||||
const blankContent = getBlankContent(isProtected, type, mime);
|
||||
if (!blankContent) {
|
||||
logError(`Unable to recover note ${noteId} since it's content could not be retrieved (might be protected note).`);
|
||||
return;
|
||||
}
|
||||
const blobId = hashedBlobId(blankContent);
|
||||
const blobAlreadyExists = !!sql.getValue("SELECT 1 FROM blobs WHERE blobId = ?", [blobId]);
|
||||
|
||||
if (!blobAlreadyExists) {
|
||||
// manually creating row since this can also affect deleted notes
|
||||
sql.upsert("blobs", "blobId", {
|
||||
noteId,
|
||||
content: blankContent,
|
||||
utcDateModified: fakeDate,
|
||||
dateModified: fakeDate
|
||||
});
|
||||
|
||||
const hash = utils.hash(randomString(10));
|
||||
|
||||
entityChangesService.putEntityChange({
|
||||
entityName: "blobs",
|
||||
entityId: blobId,
|
||||
hash,
|
||||
isErased: false,
|
||||
utcDateChanged: fakeDate,
|
||||
isSynced: true
|
||||
});
|
||||
}
|
||||
|
||||
sql.execute("UPDATE notes SET blobId = ? WHERE noteId = ?", [blobId, noteId]);
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Note '${noteId}' content was set to empty string since there was no corresponding row`);
|
||||
} else {
|
||||
logError(`Note '${noteId}' content row does not exist`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
if (sqlInit.getDbSize() < 500000) {
|
||||
// querying for "content IS NULL" is expensive since content is not indexed. See e.g. https://github.com/zadam/trilium/issues/2887
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT notes.noteId, notes.type, notes.mime
|
||||
FROM notes
|
||||
JOIN blobs USING (blobId)
|
||||
WHERE isDeleted = 0
|
||||
AND isProtected = 0
|
||||
AND content IS NULL`,
|
||||
({ noteId, type, mime }) => {
|
||||
if (this.autoFix) {
|
||||
const note = becca.getNote(noteId);
|
||||
const blankContent = getBlankContent(false, type, mime);
|
||||
if (!note) return;
|
||||
|
||||
if (blankContent) {
|
||||
note.setContent(blankContent);
|
||||
}
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Note '${noteId}' content was set to '${blankContent}' since it was null even though it is not deleted`);
|
||||
} else {
|
||||
logError(`Note '${noteId}' content is null even though it is not deleted`);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT revisions.revisionId, blobs.blobId
|
||||
FROM revisions
|
||||
LEFT JOIN blobs USING (blobId)
|
||||
WHERE blobs.blobId IS NULL`,
|
||||
({ revisionId, blobId }) => {
|
||||
if (this.autoFix) {
|
||||
eraseService.eraseRevisions([revisionId]);
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Note revision '${revisionId}' was erased since the referenced blob '${blobId}' did not exist.`);
|
||||
} else {
|
||||
logError(`Note revision '${revisionId}' blob '${blobId}' does not exist`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT attachments.attachmentId, blobs.blobId
|
||||
FROM attachments
|
||||
LEFT JOIN blobs USING (blobId)
|
||||
WHERE blobs.blobId IS NULL`,
|
||||
({ attachmentId, blobId }) => {
|
||||
if (this.autoFix) {
|
||||
eraseService.eraseAttachments([attachmentId]);
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Attachment '${attachmentId}' was erased since the referenced blob '${blobId}' did not exist.`);
|
||||
} else {
|
||||
logError(`Attachment '${attachmentId}' blob '${blobId}' does not exist`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT parentNoteId
|
||||
FROM branches
|
||||
JOIN notes ON notes.noteId = branches.parentNoteId
|
||||
WHERE notes.isDeleted = 0
|
||||
AND notes.type == 'search'
|
||||
AND branches.isDeleted = 0`,
|
||||
({ parentNoteId }) => {
|
||||
if (this.autoFix) {
|
||||
const branchIds = sql.getColumn<string>(
|
||||
`
|
||||
SELECT branchId
|
||||
FROM branches
|
||||
WHERE isDeleted = 0
|
||||
AND parentNoteId = ?`,
|
||||
[parentNoteId]
|
||||
);
|
||||
|
||||
const branches = branchIds.map((branchId) => becca.getBranch(branchId));
|
||||
|
||||
for (const branch of branches) {
|
||||
if (!branch) continue;
|
||||
|
||||
// delete the old wrong branch
|
||||
branch.markAsDeleted("parent-is-search");
|
||||
|
||||
// create a replacement branch in root parent
|
||||
new BBranch({
|
||||
parentNoteId: "root",
|
||||
noteId: branch.noteId,
|
||||
prefix: "recovered"
|
||||
}).save();
|
||||
|
||||
logFix(`Note '${branch.noteId}' has been moved to root since it was a child of a search note '${parentNoteId}'`);
|
||||
}
|
||||
|
||||
this.reloadNeeded = true;
|
||||
} else {
|
||||
logError(`Search note '${parentNoteId}' has children`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT attributeId
|
||||
FROM attributes
|
||||
WHERE isDeleted = 0
|
||||
AND type = 'relation'
|
||||
AND value = ''`,
|
||||
({ attributeId }) => {
|
||||
if (this.autoFix) {
|
||||
const relation = becca.getAttribute(attributeId);
|
||||
if (!relation) return;
|
||||
relation.markAsDeleted();
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Removed relation '${relation.attributeId}' of name '${relation.name}' with empty target.`);
|
||||
} else {
|
||||
logError(`Relation '${attributeId}' has empty target.`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT attributeId,
|
||||
type
|
||||
FROM attributes
|
||||
WHERE isDeleted = 0
|
||||
AND type != 'label'
|
||||
AND type != 'relation'`,
|
||||
({ attributeId, type }) => {
|
||||
if (this.autoFix) {
|
||||
const attribute = becca.getAttribute(attributeId);
|
||||
if (!attribute) return;
|
||||
attribute.type = "label";
|
||||
attribute.save();
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Attribute '${attributeId}' type was changed to label since it had invalid type '${type}'`);
|
||||
} else {
|
||||
logError(`Attribute '${attributeId}' has invalid type '${type}'`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT attributeId,
|
||||
attributes.noteId
|
||||
FROM attributes
|
||||
JOIN notes ON attributes.noteId = notes.noteId
|
||||
WHERE attributes.isDeleted = 0
|
||||
AND notes.isDeleted = 1`,
|
||||
({ attributeId, noteId }) => {
|
||||
if (this.autoFix) {
|
||||
const attribute = becca.getAttribute(attributeId);
|
||||
if (!attribute) return;
|
||||
attribute.markAsDeleted();
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Removed attribute '${attributeId}' because owning note '${noteId}' is also deleted.`);
|
||||
} else {
|
||||
logError(`Attribute '${attributeId}' is not deleted even though owning note '${noteId}' is deleted.`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT attributeId,
|
||||
attributes.value AS targetNoteId
|
||||
FROM attributes
|
||||
JOIN notes ON attributes.value = notes.noteId
|
||||
WHERE attributes.type = 'relation'
|
||||
AND attributes.isDeleted = 0
|
||||
AND notes.isDeleted = 1`,
|
||||
({ attributeId, targetNoteId }) => {
|
||||
if (this.autoFix) {
|
||||
const attribute = becca.getAttribute(attributeId);
|
||||
if (!attribute) return;
|
||||
attribute.markAsDeleted();
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Removed attribute '${attributeId}' because target note '${targetNoteId}' is also deleted.`);
|
||||
} else {
|
||||
logError(`Attribute '${attributeId}' is not deleted even though target note '${targetNoteId}' is deleted.`);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
runEntityChangeChecks(entityName: string, key: string) {
|
||||
const sql = getSql();
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT ${key} as entityId
|
||||
FROM ${entityName}
|
||||
LEFT JOIN entity_changes ec ON ec.entityName = '${entityName}' AND ec.entityId = ${entityName}.${key}
|
||||
WHERE ec.id IS NULL`,
|
||||
({ entityId }) => {
|
||||
const entityRow = sql.getRow<EntityChange>(/*sql*/`SELECT * FROM ${entityName} WHERE ${key} = ?`, [entityId]);
|
||||
|
||||
if (this.autoFix) {
|
||||
entityChangesService.putEntityChange({
|
||||
entityName,
|
||||
entityId,
|
||||
hash: randomString(10), // doesn't matter, will force sync, but that's OK
|
||||
isErased: false,
|
||||
utcDateChanged: entityRow.utcDateModified || entityRow.utcDateCreated,
|
||||
isSynced: entityName !== "options" || entityRow.isSynced
|
||||
});
|
||||
|
||||
logFix(`Created missing entity change for entityName '${entityName}', entityId '${entityId}'`);
|
||||
} else {
|
||||
logError(`Missing entity change for entityName '${entityName}', entityId '${entityId}'`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT id, entityId
|
||||
FROM entity_changes
|
||||
LEFT JOIN ${entityName} ON entityId = ${entityName}.${key}
|
||||
WHERE
|
||||
entity_changes.isErased = 0
|
||||
AND entity_changes.entityName = '${entityName}'
|
||||
AND ${entityName}.${key} IS NULL`,
|
||||
({ id, entityId }) => {
|
||||
if (this.autoFix) {
|
||||
sql.execute("DELETE FROM entity_changes WHERE entityName = ? AND entityId = ?", [entityName, entityId]);
|
||||
|
||||
logFix(`Deleted extra entity change id '${id}', entityName '${entityName}', entityId '${entityId}'`);
|
||||
} else {
|
||||
logError(`Unrecognized entity change id '${id}', entityName '${entityName}', entityId '${entityId}'`);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.findAndFixIssues(
|
||||
`
|
||||
SELECT id, entityId
|
||||
FROM entity_changes
|
||||
JOIN ${entityName} ON entityId = ${entityName}.${key}
|
||||
WHERE
|
||||
entity_changes.isErased = 1
|
||||
AND entity_changes.entityName = '${entityName}'`,
|
||||
({ id, entityId }) => {
|
||||
if (this.autoFix) {
|
||||
sql.execute(/*sql*/`DELETE FROM ${entityName} WHERE ${key} = ?`, [entityId]);
|
||||
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Erasing entityName '${entityName}', entityId '${entityId}' since entity change id '${id}' has it as erased.`);
|
||||
} else {
|
||||
logError(`Entity change id '${id}' has entityName '${entityName}', entityId '${entityId}' as erased, but it's not.`);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
findEntityChangeIssues() {
|
||||
this.runEntityChangeChecks("notes", "noteId");
|
||||
this.runEntityChangeChecks("revisions", "revisionId");
|
||||
this.runEntityChangeChecks("attachments", "attachmentId");
|
||||
this.runEntityChangeChecks("blobs", "blobId");
|
||||
this.runEntityChangeChecks("branches", "branchId");
|
||||
this.runEntityChangeChecks("attributes", "attributeId");
|
||||
this.runEntityChangeChecks("etapi_tokens", "etapiTokenId");
|
||||
this.runEntityChangeChecks("options", "name");
|
||||
}
|
||||
|
||||
findWronglyNamedAttributes() {
|
||||
const sql = getSql();
|
||||
const attrNames = sql.getColumn<string>(/*sql*/`SELECT DISTINCT name FROM attributes`);
|
||||
|
||||
for (const origName of attrNames) {
|
||||
const fixedName = utils.sanitizeAttributeName(origName);
|
||||
|
||||
if (fixedName !== origName) {
|
||||
if (this.autoFix) {
|
||||
// there isn't a good way to update this:
|
||||
// - just SQL query will fix it in DB but not notify frontend (or other caches) that it has been fixed
|
||||
// - renaming the attribute would break the invariant that single attribute never changes the name
|
||||
// - deleting the old attribute and creating new will create duplicates across synchronized cluster (specifically in the initial migration)
|
||||
// But in general, we assume there won't be many such problems
|
||||
sql.execute("UPDATE attributes SET name = ? WHERE name = ?", [fixedName, origName]);
|
||||
|
||||
this.fixedIssues = true;
|
||||
this.reloadNeeded = true;
|
||||
|
||||
logFix(`Renamed incorrectly named attributes '${origName}' to '${fixedName}'`);
|
||||
} else {
|
||||
this.unrecoveredConsistencyErrors = true;
|
||||
|
||||
logFix(`There are incorrectly named attributes '${origName}'`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
findSyncIssues() {
|
||||
const sql = getSql();
|
||||
const lastSyncedPush = parseInt(sql.getValue("SELECT value FROM options WHERE name = 'lastSyncedPush'"));
|
||||
const maxEntityChangeId = sql.getValue<number>("SELECT MAX(id) FROM entity_changes");
|
||||
|
||||
if (lastSyncedPush > maxEntityChangeId) {
|
||||
if (this.autoFix) {
|
||||
sql.execute("UPDATE options SET value = ? WHERE name = 'lastSyncedPush'", [maxEntityChangeId]);
|
||||
|
||||
this.fixedIssues = true;
|
||||
|
||||
logFix(`Fixed incorrect lastSyncedPush - was ${lastSyncedPush}, needs to be at maximum ${maxEntityChangeId}`);
|
||||
} else {
|
||||
this.unrecoveredConsistencyErrors = true;
|
||||
|
||||
logFix(`Incorrect lastSyncedPush - is ${lastSyncedPush}, needs to be at maximum ${maxEntityChangeId}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
runAllChecksAndFixers() {
|
||||
this.unrecoveredConsistencyErrors = false;
|
||||
this.fixedIssues = false;
|
||||
this.reloadNeeded = false;
|
||||
|
||||
this.findEntityChangeIssues();
|
||||
|
||||
this.findBrokenReferenceIssues();
|
||||
|
||||
this.findExistencyIssues();
|
||||
|
||||
this.findLogicIssues();
|
||||
|
||||
this.findWronglyNamedAttributes();
|
||||
|
||||
this.findSyncIssues();
|
||||
|
||||
// root branch should always be expanded
|
||||
getSql().execute("UPDATE branches SET isExpanded = 1 WHERE noteId = 'root'");
|
||||
|
||||
if (!this.unrecoveredConsistencyErrors) {
|
||||
// we run this only if basic checks passed since this assumes basic data consistency
|
||||
|
||||
this.checkAndRepairTreeCycles();
|
||||
}
|
||||
|
||||
if (this.reloadNeeded) {
|
||||
becca_loader.reload("consistency checks need becca reload");
|
||||
}
|
||||
|
||||
return !this.unrecoveredConsistencyErrors;
|
||||
}
|
||||
|
||||
runDbDiagnostics() {
|
||||
function getTableRowCount(tableName: string) {
|
||||
const count = getSql().getValue<number>(/*sql*/`SELECT COUNT(1) FROM ${tableName}`);
|
||||
|
||||
return `${tableName}: ${count}`;
|
||||
}
|
||||
|
||||
const tables = ["notes", "revisions", "attachments", "branches", "attributes", "etapi_tokens", "blobs"];
|
||||
|
||||
getLog().info(`Table counts: ${tables.map((tableName) => getTableRowCount(tableName)).join(", ")}`);
|
||||
}
|
||||
|
||||
async runChecks() {
|
||||
let elapsedTimeMs;
|
||||
|
||||
await syncMutexService.doExclusively(() => {
|
||||
const startTimeMs = Date.now();
|
||||
|
||||
this.runDbDiagnostics();
|
||||
|
||||
this.runAllChecksAndFixers();
|
||||
|
||||
elapsedTimeMs = Date.now() - startTimeMs;
|
||||
});
|
||||
|
||||
if (this.unrecoveredConsistencyErrors) {
|
||||
getLog().info(`Consistency checks failed (took ${elapsedTimeMs}ms)`);
|
||||
|
||||
ws.sendMessageToAllClients({ type: "consistency-checks-failed" });
|
||||
} else {
|
||||
getLog().info(`All consistency checks passed ${ this.fixedIssues ? "after some fixes" : "with no errors detected" } (took ${elapsedTimeMs}ms)`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function getBlankContent(isProtected: boolean, type: string, mime: string) {
|
||||
if (isProtected) {
|
||||
return null; // this is wrong for protected non-erased notes, but we cannot create a valid value without a password
|
||||
}
|
||||
|
||||
if (mime === "application/json") {
|
||||
return "{}";
|
||||
}
|
||||
|
||||
return ""; // empty string might be a wrong choice for some note types, but it's the best guess
|
||||
}
|
||||
|
||||
function logFix(message: string) {
|
||||
getLog().info(`Consistency issue fixed: ${message}`);
|
||||
}
|
||||
|
||||
function logError(message: string) {
|
||||
getLog().info(`Consistency error: ${message}`);
|
||||
}
|
||||
|
||||
function runPeriodicChecks() {
|
||||
const autoFix = optionsService.getOptionBool("autoFixConsistencyIssues");
|
||||
|
||||
const consistencyChecks = new ConsistencyChecks(autoFix);
|
||||
consistencyChecks.runChecks();
|
||||
}
|
||||
|
||||
async function runOnDemandChecks(autoFix: boolean) {
|
||||
const consistencyChecks = new ConsistencyChecks(autoFix);
|
||||
await consistencyChecks.runChecks();
|
||||
}
|
||||
|
||||
function runEntityChangesChecks() {
|
||||
const consistencyChecks = new ConsistencyChecks(true);
|
||||
consistencyChecks.findEntityChangeIssues();
|
||||
}
|
||||
|
||||
function startConsistencyChecks() {
|
||||
sqlInit.dbReady.then(() => {
|
||||
setInterval(cls.wrap(runPeriodicChecks), 60 * 60 * 1000);
|
||||
|
||||
// kickoff checks soon after startup (to not block the initial load)
|
||||
setTimeout(cls.wrap(runPeriodicChecks), 4 * 1000);
|
||||
});
|
||||
}
|
||||
|
||||
export default {
|
||||
runOnDemandChecks,
|
||||
runEntityChangesChecks,
|
||||
startConsistencyChecks
|
||||
};
|
||||
90
packages/trilium-core/src/services/content_hash.ts
Normal file
90
packages/trilium-core/src/services/content_hash.ts
Normal file
@@ -0,0 +1,90 @@
|
||||
import eraseService from "./erase.js";
|
||||
import { getLog } from "./log.js";
|
||||
import { getSql } from "./sql/index.js";
|
||||
import { hash } from "./utils/index.js";
|
||||
|
||||
type SectorHash = Record<string, string>;
|
||||
|
||||
interface FailedCheck {
|
||||
entityName: string;
|
||||
sector: string[1];
|
||||
}
|
||||
|
||||
function getEntityHashes() {
|
||||
// blob erasure is not synced, we should check before each sync if there's some blob to erase
|
||||
eraseService.eraseUnusedBlobs();
|
||||
|
||||
const startTime = new Date();
|
||||
|
||||
// we know this is slow and the total content hash calculation time is logged
|
||||
type HashRow = [string, string, string, boolean];
|
||||
const sql = getSql();
|
||||
const hashRows = sql.disableSlowQueryLogging(() =>
|
||||
sql.getRawRows<HashRow>(`
|
||||
SELECT entityName,
|
||||
entityId,
|
||||
hash,
|
||||
isErased
|
||||
FROM entity_changes
|
||||
WHERE isSynced = 1
|
||||
AND entityName != 'note_reordering'`)
|
||||
);
|
||||
|
||||
// sorting is faster in memory
|
||||
// sorting by entityId is enough, hashes will be segmented by entityName later on anyway
|
||||
hashRows.sort((a, b) => (a[1] < b[1] ? -1 : 1));
|
||||
|
||||
const hashMap: Record<string, SectorHash> = {};
|
||||
|
||||
for (const [entityName, entityId, hash, isErased] of hashRows) {
|
||||
const entityHashMap = (hashMap[entityName] = hashMap[entityName] || {});
|
||||
|
||||
const sector = entityId[0];
|
||||
|
||||
// if the entity is erased, its hash is not updated, so it has to be added extra
|
||||
entityHashMap[sector] = (entityHashMap[sector] || "") + hash + isErased;
|
||||
}
|
||||
|
||||
for (const entityHashMap of Object.values(hashMap)) {
|
||||
for (const key in entityHashMap) {
|
||||
entityHashMap[key] = hash(entityHashMap[key]);
|
||||
}
|
||||
}
|
||||
|
||||
const elapsedTimeMs = Date.now() - startTime.getTime();
|
||||
|
||||
getLog().info(`Content hash computation took ${elapsedTimeMs}ms`);
|
||||
|
||||
return hashMap;
|
||||
}
|
||||
|
||||
function checkContentHashes(otherHashes: Record<string, SectorHash>) {
|
||||
const entityHashes = getEntityHashes();
|
||||
const failedChecks: FailedCheck[] = [];
|
||||
|
||||
for (const entityName in entityHashes) {
|
||||
const thisSectorHashes: SectorHash = entityHashes[entityName] || {};
|
||||
const otherSectorHashes: SectorHash = otherHashes[entityName] || {};
|
||||
|
||||
const sectors = new Set(Object.keys(thisSectorHashes).concat(Object.keys(otherSectorHashes)));
|
||||
|
||||
for (const sector of sectors) {
|
||||
if (thisSectorHashes[sector] !== otherSectorHashes[sector]) {
|
||||
getLog().info(`Content hash check for ${entityName} sector ${sector} FAILED. Local is ${thisSectorHashes[sector]}, remote is ${otherSectorHashes[sector]}`);
|
||||
|
||||
failedChecks.push({ entityName, sector });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (failedChecks.length === 0) {
|
||||
getLog().info("Content hash checks PASSED");
|
||||
}
|
||||
|
||||
return failedChecks;
|
||||
}
|
||||
|
||||
export default {
|
||||
getEntityHashes,
|
||||
checkContentHashes
|
||||
};
|
||||
@@ -69,3 +69,11 @@ export function putEntityChange(entityChange: EntityChange) {
|
||||
|
||||
getContext().set("entityChangeIds", entityChangeIds);
|
||||
}
|
||||
|
||||
export function getAndClearEntityChangeIds() {
|
||||
const entityChangeIds = getContext().get("entityChangeIds") || [];
|
||||
|
||||
getContext().set("entityChangeIds", []);
|
||||
|
||||
return entityChangeIds;
|
||||
}
|
||||
|
||||
@@ -10,6 +10,8 @@ export interface CryptoProvider {
|
||||
randomString(length: number): string;
|
||||
createCipheriv(algorithm: "aes-128-cbc", key: Uint8Array, iv: Uint8Array): Cipher;
|
||||
createDecipheriv(algorithm: "aes-128-cbc", key: Uint8Array, iv: Uint8Array): Cipher;
|
||||
hmac(secret: string | Uint8Array, value: string | Uint8Array): string;
|
||||
|
||||
}
|
||||
|
||||
let crypto: CryptoProvider | null = null;
|
||||
|
||||
@@ -25,6 +25,13 @@ export interface MessageClient {
|
||||
* - Worker postMessage for browser environments
|
||||
* - Mock implementations for testing
|
||||
*/
|
||||
/**
|
||||
* Handler for incoming client messages. Receives the client ID and the raw parsed message.
|
||||
* The message is typed as `any` because clients may send message types (e.g. "log-error",
|
||||
* "log-info") that aren't part of the server's WebSocketMessage union.
|
||||
*/
|
||||
export type ClientMessageHandler = (clientId: string, message: any) => void | Promise<void>;
|
||||
|
||||
export interface MessagingProvider {
|
||||
/**
|
||||
* Send a message to all connected clients.
|
||||
@@ -36,7 +43,12 @@ export interface MessagingProvider {
|
||||
* Send a message to a specific client by ID.
|
||||
* Returns false if the client is not found or disconnected.
|
||||
*/
|
||||
sendMessageToClient?(clientId: string, message: WebSocketMessage): boolean;
|
||||
sendMessageToClient(clientId: string, message: WebSocketMessage): boolean;
|
||||
|
||||
/**
|
||||
* Register a handler for incoming client messages.
|
||||
*/
|
||||
setClientMessageHandler(handler: ClientMessageHandler): void;
|
||||
|
||||
/**
|
||||
* Subscribe to incoming messages from clients.
|
||||
|
||||
@@ -1,5 +1,51 @@
|
||||
export default {
|
||||
getImage(url: string) {
|
||||
console.warn("Image download ignored ", url);
|
||||
}
|
||||
export interface CookieJar {
|
||||
header?: string;
|
||||
}
|
||||
|
||||
export interface ExecOpts {
|
||||
proxy: string | null;
|
||||
method: string;
|
||||
url: string;
|
||||
paging?: {
|
||||
pageCount: number;
|
||||
pageIndex: number;
|
||||
requestId: string;
|
||||
};
|
||||
cookieJar?: CookieJar;
|
||||
auth?: {
|
||||
password?: string;
|
||||
};
|
||||
timeout: number;
|
||||
body?: string | {};
|
||||
}
|
||||
|
||||
export interface RequestProvider {
|
||||
exec<T>(opts: ExecOpts): Promise<T>;
|
||||
getImage(imageUrl: string): Promise<ArrayBuffer>;
|
||||
}
|
||||
|
||||
let requestProvider: RequestProvider | null = null;
|
||||
|
||||
export function initRequest(provider: RequestProvider): void {
|
||||
requestProvider = provider;
|
||||
}
|
||||
|
||||
export function getRequestProvider(): RequestProvider {
|
||||
if (!requestProvider) {
|
||||
throw new Error("Request provider not initialized. Call initRequest() first.");
|
||||
}
|
||||
return requestProvider;
|
||||
}
|
||||
|
||||
export function isRequestInitialized(): boolean {
|
||||
return requestProvider !== null;
|
||||
}
|
||||
|
||||
export default {
|
||||
exec<T>(opts: ExecOpts): Promise<T> {
|
||||
return getRequestProvider().exec(opts);
|
||||
},
|
||||
getImage(imageUrl: string): Promise<ArrayBuffer> {
|
||||
return getRequestProvider().getImage(imageUrl);
|
||||
}
|
||||
};
|
||||
|
||||
121
packages/trilium-core/src/services/setup.ts
Normal file
121
packages/trilium-core/src/services/setup.ts
Normal file
@@ -0,0 +1,121 @@
|
||||
import syncService from "./sync.js";
|
||||
import { getLog } from "./log.js";
|
||||
import sqlInit from "./sql_init.js";
|
||||
import optionService from "./options.js";
|
||||
import syncOptions from "./sync_options.js";
|
||||
import appInfo from "./app_info.js";
|
||||
import { timeLimit } from "./utils/index.js";
|
||||
import becca from "../becca/becca.js";
|
||||
import type { SetupStatusResponse, SetupSyncSeedResponse } from "@triliumnext/commons";
|
||||
import request from "./request.js";
|
||||
|
||||
async function hasSyncServerSchemaAndSeed() {
|
||||
const response = await requestToSyncServer<SetupStatusResponse>("GET", "/api/setup/status");
|
||||
|
||||
if (response.syncVersion !== appInfo.syncVersion) {
|
||||
throw new Error(
|
||||
`Could not setup sync since local sync protocol version is ${appInfo.syncVersion} while remote is ${response.syncVersion}. To fix this issue, use same Trilium version on all instances.`
|
||||
);
|
||||
}
|
||||
|
||||
return response.schemaExists;
|
||||
}
|
||||
|
||||
function triggerSync() {
|
||||
getLog().info("Triggering sync.");
|
||||
|
||||
// it's ok to not wait for it here
|
||||
syncService.sync().then((res) => {
|
||||
if (res.success) {
|
||||
sqlInit.setDbAsInitialized();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async function sendSeedToSyncServer() {
|
||||
getLog().info("Initiating sync to server");
|
||||
|
||||
await requestToSyncServer<void>("POST", "/api/setup/sync-seed", {
|
||||
options: getSyncSeedOptions(),
|
||||
syncVersion: appInfo.syncVersion
|
||||
});
|
||||
|
||||
// this is a completely new sync, need to reset counters. If this was not a new sync,
|
||||
// the previous request would have failed.
|
||||
optionService.setOption("lastSyncedPush", 0);
|
||||
optionService.setOption("lastSyncedPull", 0);
|
||||
}
|
||||
|
||||
async function requestToSyncServer<T>(method: string, path: string, body?: string | {}): Promise<T> {
|
||||
const timeout = syncOptions.getSyncTimeout();
|
||||
|
||||
return (await timeLimit(
|
||||
request.exec({
|
||||
method,
|
||||
url: syncOptions.getSyncServerHost() + path,
|
||||
body,
|
||||
proxy: syncOptions.getSyncProxy(),
|
||||
timeout: timeout
|
||||
}),
|
||||
timeout
|
||||
)) as T;
|
||||
}
|
||||
|
||||
async function setupSyncFromSyncServer(syncServerHost: string, syncProxy: string, password: string) {
|
||||
if (sqlInit.isDbInitialized()) {
|
||||
return {
|
||||
result: "failure",
|
||||
error: "DB is already initialized."
|
||||
};
|
||||
}
|
||||
|
||||
const log = getLog();
|
||||
try {
|
||||
log.info("Getting document options FROM sync server.");
|
||||
|
||||
// the response is expected to contain documentId and documentSecret options
|
||||
const resp = await request.exec<SetupSyncSeedResponse>({
|
||||
method: "get",
|
||||
url: `${syncServerHost}/api/setup/sync-seed`,
|
||||
auth: { password },
|
||||
proxy: syncProxy,
|
||||
timeout: 30000 // seed request should not take long
|
||||
});
|
||||
|
||||
if (resp.syncVersion !== appInfo.syncVersion) {
|
||||
const message = `Could not setup sync since local sync protocol version is ${appInfo.syncVersion} while remote is ${resp.syncVersion}. To fix this issue, use same Trilium version on all instances.`;
|
||||
|
||||
log.error(message);
|
||||
|
||||
return {
|
||||
result: "failure",
|
||||
error: message
|
||||
};
|
||||
}
|
||||
|
||||
await sqlInit.createDatabaseForSync(resp.options, syncServerHost, syncProxy);
|
||||
|
||||
triggerSync();
|
||||
|
||||
return { result: "success" };
|
||||
} catch (e: any) {
|
||||
log.error(`Sync failed: '${e.message}', stack: ${e.stack}`);
|
||||
|
||||
return {
|
||||
result: "failure",
|
||||
error: e.message
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
function getSyncSeedOptions() {
|
||||
return [becca.getOption("documentId"), becca.getOption("documentSecret")];
|
||||
}
|
||||
|
||||
export default {
|
||||
hasSyncServerSchemaAndSeed,
|
||||
triggerSync,
|
||||
sendSeedToSyncServer,
|
||||
setupSyncFromSyncServer,
|
||||
getSyncSeedOptions
|
||||
};
|
||||
@@ -11,4 +11,20 @@ function isDbInitialized() {
|
||||
return true;
|
||||
}
|
||||
|
||||
export default { isDbInitialized };
|
||||
async function createDatabaseForSync(a: any, b: string, c: any) {
|
||||
console.error("createDatabaseForSync is not implemented yet");
|
||||
}
|
||||
|
||||
function setDbAsInitialized() {
|
||||
// Noop.
|
||||
}
|
||||
|
||||
function schemaExists() {
|
||||
return true;
|
||||
}
|
||||
|
||||
function getDbSize() {
|
||||
return 1000;
|
||||
}
|
||||
|
||||
export default { isDbInitialized, createDatabaseForSync, setDbAsInitialized, schemaExists, getDbSize, dbReady };
|
||||
|
||||
472
packages/trilium-core/src/services/sync.ts
Normal file
472
packages/trilium-core/src/services/sync.ts
Normal file
@@ -0,0 +1,472 @@
|
||||
import type { EntityChange, EntityChangeRecord, EntityRow } from "@triliumnext/commons";
|
||||
|
||||
import becca from "../becca/becca.js";
|
||||
import appInfo from "./app_info.js";
|
||||
import * as cls from "./context.js";
|
||||
import consistency_checks from "./consistency_checks.js";
|
||||
import contentHashService from "./content_hash.js";
|
||||
import dateUtils from "./utils/date.js";
|
||||
import entityChangesService from "./entity_changes.js";
|
||||
import { getLog } from "./log.js";
|
||||
import optionService from "./options.js";
|
||||
import setupService from "./setup.js";
|
||||
import { getSql } from "./sql/index.js";
|
||||
import syncMutexService from "./sync_mutex.js";
|
||||
import syncOptions from "./sync_options.js";
|
||||
import syncUpdateService from "./sync_update.js";
|
||||
import { randomString, timeLimit } from "./utils/index.js";
|
||||
import ws from "./ws.js";
|
||||
import getInstanceId from "./instance_id.js";
|
||||
import request, { CookieJar, ExecOpts } from "./request.js";
|
||||
import entity_constructor from "../../src/becca/entity_constructor.js";
|
||||
import becca_loader from "../becca/becca_loader.js";
|
||||
import * as binary_utils from "./utils/binary.js";
|
||||
import { getCrypto } from "./encryption/crypto.js";
|
||||
|
||||
let proxyToggle = true;
|
||||
|
||||
let outstandingPullCount = 0;
|
||||
|
||||
interface CheckResponse {
|
||||
maxEntityChangeId: number;
|
||||
entityHashes: Record<string, Record<string, string>>;
|
||||
}
|
||||
|
||||
interface SyncResponse {
|
||||
instanceId: string;
|
||||
maxEntityChangeId: number;
|
||||
}
|
||||
|
||||
interface ChangesResponse {
|
||||
entityChanges: EntityChangeRecord[];
|
||||
lastEntityChangeId: number;
|
||||
outstandingPullCount: number;
|
||||
}
|
||||
|
||||
interface SyncContext {
|
||||
cookieJar: CookieJar;
|
||||
instanceId?: string;
|
||||
}
|
||||
|
||||
async function sync() {
|
||||
try {
|
||||
return await syncMutexService.doExclusively(async () => {
|
||||
if (!syncOptions.isSyncSetup()) {
|
||||
return { success: false, errorCode: "NOT_CONFIGURED", message: "Sync not configured" };
|
||||
}
|
||||
|
||||
let continueSync = false;
|
||||
|
||||
do {
|
||||
const syncContext = await login();
|
||||
|
||||
await pushChanges(syncContext);
|
||||
|
||||
await pullChanges(syncContext);
|
||||
|
||||
await pushChanges(syncContext);
|
||||
|
||||
await syncFinished(syncContext);
|
||||
|
||||
continueSync = await checkContentHash(syncContext);
|
||||
} while (continueSync);
|
||||
|
||||
ws.syncFinished();
|
||||
|
||||
return {
|
||||
success: true
|
||||
};
|
||||
});
|
||||
} catch (e: any) {
|
||||
// we're dynamically switching whether we're using proxy or not based on whether we encountered error with the current method
|
||||
proxyToggle = !proxyToggle;
|
||||
|
||||
const log = getLog();
|
||||
if (
|
||||
e.message?.includes("ECONNREFUSED") ||
|
||||
e.message?.includes("ERR_") || // node network errors
|
||||
e.message?.includes("Bad Gateway")
|
||||
) {
|
||||
ws.syncFailed();
|
||||
|
||||
log.info("No connection to sync server.");
|
||||
|
||||
return {
|
||||
success: false,
|
||||
message: "No connection to sync server."
|
||||
};
|
||||
}
|
||||
log.info(`Sync failed: '${e.message}', stack: ${e.stack}`);
|
||||
|
||||
ws.syncFailed();
|
||||
|
||||
return {
|
||||
success: false,
|
||||
message: e.message
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
async function login() {
|
||||
if (!(await setupService.hasSyncServerSchemaAndSeed())) {
|
||||
await setupService.sendSeedToSyncServer();
|
||||
}
|
||||
|
||||
return await doLogin();
|
||||
}
|
||||
|
||||
async function doLogin(): Promise<SyncContext> {
|
||||
const timestamp = dateUtils.utcNowDateTime();
|
||||
|
||||
const documentSecret = optionService.getOption("documentSecret");
|
||||
const hash = getCrypto().hmac(documentSecret, timestamp);
|
||||
|
||||
const syncContext: SyncContext = { cookieJar: {} };
|
||||
const resp = await syncRequest<SyncResponse>(syncContext, "POST", "/api/login/sync", {
|
||||
timestamp,
|
||||
syncVersion: appInfo.syncVersion,
|
||||
hash
|
||||
});
|
||||
|
||||
if (!resp) {
|
||||
throw new Error("Got no response.");
|
||||
}
|
||||
|
||||
if (resp.instanceId === getInstanceId()) {
|
||||
throw new Error(
|
||||
`Sync server has instance ID '${resp.instanceId}' which is also local. This usually happens when the sync client is (mis)configured to sync with itself (URL points back to client) instead of the correct sync server.`
|
||||
);
|
||||
}
|
||||
|
||||
syncContext.instanceId = resp.instanceId;
|
||||
|
||||
const lastSyncedPull = getLastSyncedPull();
|
||||
|
||||
// this is important in a scenario where we set up the sync by manually copying the document
|
||||
// lastSyncedPull then could be pretty off for the newly cloned client
|
||||
if (lastSyncedPull > resp.maxEntityChangeId) {
|
||||
getLog().info(`Lowering last synced pull from ${lastSyncedPull} to ${resp.maxEntityChangeId}`);
|
||||
|
||||
setLastSyncedPull(resp.maxEntityChangeId);
|
||||
}
|
||||
|
||||
return syncContext;
|
||||
}
|
||||
|
||||
async function pullChanges(syncContext: SyncContext) {
|
||||
const log = getLog();
|
||||
|
||||
while (true) {
|
||||
const lastSyncedPull = getLastSyncedPull();
|
||||
const logMarkerId = randomString(10); // to easily pair sync events between client and server logs
|
||||
const changesUri = `/api/sync/changed?instanceId=${getInstanceId()}&lastEntityChangeId=${lastSyncedPull}&logMarkerId=${logMarkerId}`;
|
||||
|
||||
const startDate = Date.now();
|
||||
|
||||
const resp = await syncRequest<ChangesResponse>(syncContext, "GET", changesUri);
|
||||
if (!resp) {
|
||||
throw new Error("Request failed.");
|
||||
}
|
||||
const { entityChanges, lastEntityChangeId } = resp;
|
||||
|
||||
outstandingPullCount = resp.outstandingPullCount;
|
||||
|
||||
const pulledDate = Date.now();
|
||||
|
||||
getSql().transactional(() => {
|
||||
if (syncContext.instanceId) {
|
||||
syncUpdateService.updateEntities(entityChanges, syncContext.instanceId);
|
||||
}
|
||||
|
||||
if (lastSyncedPull !== lastEntityChangeId) {
|
||||
setLastSyncedPull(lastEntityChangeId);
|
||||
}
|
||||
});
|
||||
|
||||
if (entityChanges.length === 0) {
|
||||
break;
|
||||
} else {
|
||||
try {
|
||||
// https://github.com/zadam/trilium/issues/4310
|
||||
const sizeInKb = Math.round(JSON.stringify(resp).length / 1024);
|
||||
|
||||
log.info(
|
||||
`Sync ${logMarkerId}: Pulled ${entityChanges.length} changes in ${sizeInKb} KB, starting at entityChangeId=${lastSyncedPull} in ${pulledDate - startDate}ms and applied them in ${Date.now() - pulledDate}ms, ${outstandingPullCount} outstanding pulls`
|
||||
);
|
||||
} catch (e: any) {
|
||||
log.error(`Error occurred ${e.message} ${e.stack}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.info("Finished pull");
|
||||
}
|
||||
|
||||
async function pushChanges(syncContext: SyncContext) {
|
||||
let lastSyncedPush: number | null | undefined = getLastSyncedPush();
|
||||
|
||||
while (true) {
|
||||
const entityChanges = getSql().getRows<EntityChange>("SELECT * FROM entity_changes WHERE isSynced = 1 AND id > ? LIMIT 1000", [lastSyncedPush]);
|
||||
|
||||
if (entityChanges.length === 0) {
|
||||
getLog().info("Nothing to push");
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
const filteredEntityChanges = entityChanges.filter((entityChange) => {
|
||||
if (entityChange.instanceId === syncContext.instanceId) {
|
||||
// this may set lastSyncedPush beyond what's actually sent (because of size limit)
|
||||
// so this is applied to the database only if there's no actual update
|
||||
lastSyncedPush = entityChange.id;
|
||||
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
|
||||
});
|
||||
|
||||
if (filteredEntityChanges.length === 0 && lastSyncedPush) {
|
||||
// there still might be more sync changes (because of batch limit), just all the current batch
|
||||
// has been filtered out
|
||||
setLastSyncedPush(lastSyncedPush);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
const entityChangesRecords = getEntityChangeRecords(filteredEntityChanges);
|
||||
const startDate = new Date();
|
||||
|
||||
const logMarkerId = randomString(10); // to easily pair sync events between client and server logs
|
||||
|
||||
await syncRequest(syncContext, "PUT", `/api/sync/update?logMarkerId=${logMarkerId}`, {
|
||||
entities: entityChangesRecords,
|
||||
instanceId: getInstanceId()
|
||||
});
|
||||
|
||||
ws.syncPushInProgress();
|
||||
|
||||
getLog().info(`Sync ${logMarkerId}: Pushing ${entityChangesRecords.length} sync changes in ${Date.now() - startDate.getTime()}ms`);
|
||||
|
||||
lastSyncedPush = entityChangesRecords[entityChangesRecords.length - 1].entityChange.id;
|
||||
|
||||
if (lastSyncedPush) {
|
||||
setLastSyncedPush(lastSyncedPush);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function syncFinished(syncContext: SyncContext) {
|
||||
await syncRequest(syncContext, "POST", "/api/sync/finished");
|
||||
}
|
||||
|
||||
async function checkContentHash(syncContext: SyncContext) {
|
||||
const resp = await syncRequest<CheckResponse>(syncContext, "GET", "/api/sync/check");
|
||||
if (!resp) {
|
||||
throw new Error("Got no response.");
|
||||
}
|
||||
|
||||
const lastSyncedPullId = getLastSyncedPull();
|
||||
const log = getLog();
|
||||
|
||||
if (lastSyncedPullId < resp.maxEntityChangeId) {
|
||||
log.info(`There are some outstanding pulls (${lastSyncedPullId} vs. ${resp.maxEntityChangeId}), skipping content check.`);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
const notPushedSyncs = getSql().getValue("SELECT EXISTS(SELECT 1 FROM entity_changes WHERE isSynced = 1 AND id > ?)", [getLastSyncedPush()]);
|
||||
|
||||
if (notPushedSyncs) {
|
||||
log.info(`There's ${notPushedSyncs} outstanding pushes, skipping content check.`);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
const failedChecks = contentHashService.checkContentHashes(resp.entityHashes);
|
||||
|
||||
if (failedChecks.length > 0) {
|
||||
// before re-queuing sectors, make sure the entity changes are correct
|
||||
consistency_checks.runEntityChangesChecks();
|
||||
|
||||
await syncRequest(syncContext, "POST", `/api/sync/check-entity-changes`);
|
||||
}
|
||||
|
||||
for (const { entityName, sector } of failedChecks) {
|
||||
entityChangesService.addEntityChangesForSector(entityName, sector);
|
||||
|
||||
await syncRequest(syncContext, "POST", `/api/sync/queue-sector/${entityName}/${sector}`);
|
||||
}
|
||||
|
||||
return failedChecks.length > 0;
|
||||
}
|
||||
|
||||
const PAGE_SIZE = 1000000;
|
||||
|
||||
/** Per-sync-session state; the cookie jar carries the authenticated session with the sync server. */
interface SyncContext {
    cookieJar: CookieJar;
}
|
||||
|
||||
async function syncRequest<T extends {}>(syncContext: SyncContext, method: string, requestPath: string, _body?: {}) {
|
||||
const body = _body ? JSON.stringify(_body) : "";
|
||||
|
||||
const timeout = syncOptions.getSyncTimeout();
|
||||
|
||||
let response;
|
||||
|
||||
const requestId = randomString(10);
|
||||
const pageCount = Math.max(1, Math.ceil(body.length / PAGE_SIZE));
|
||||
|
||||
for (let pageIndex = 0; pageIndex < pageCount; pageIndex++) {
|
||||
const opts: ExecOpts = {
|
||||
method,
|
||||
url: syncOptions.getSyncServerHost() + requestPath,
|
||||
cookieJar: syncContext.cookieJar,
|
||||
timeout,
|
||||
paging: {
|
||||
pageIndex,
|
||||
pageCount,
|
||||
requestId
|
||||
},
|
||||
body: body.substr(pageIndex * PAGE_SIZE, Math.min(PAGE_SIZE, body.length - pageIndex * PAGE_SIZE)),
|
||||
proxy: proxyToggle ? syncOptions.getSyncProxy() : null
|
||||
};
|
||||
|
||||
response = (await timeLimit(request.exec(opts), timeout)) as T;
|
||||
}
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
/**
 * Loads the database row corresponding to an entity change, in the shape expected
 * by the sync protocol.
 *
 * For "note_reordering" it returns a branchId -> notePosition map instead of a row.
 * Blob content is converted to base64 text so it survives JSON serialization.
 *
 * @returns the entity row / position map, or null when the row no longer exists
 * @throws when the entity name has no known primary key mapping
 */
function getEntityChangeRow(entityChange: EntityChange) {
    const { entityName, entityId } = entityChange;

    if (entityName === "note_reordering") {
        // reordering is synced as the positions of all live child branches of the parent note
        return getSql().getMap("SELECT branchId, notePosition FROM branches WHERE parentNoteId = ? AND isDeleted = 0", [entityId]);
    }

    const primaryKey = entity_constructor.getEntityFromEntityName(entityName).primaryKeyName;

    if (!primaryKey) {
        throw new Error(`Unknown entity for entity change ${JSON.stringify(entityChange)}`);
    }

    // entityName/primaryKey are taken from the entity-constructor mapping, not raw input,
    // so interpolating them into SQL is safe here
    const entityRow = getSql().getRow<EntityRow>(/*sql*/`SELECT * FROM ${entityName} WHERE ${primaryKey} = ?`, [entityId]);

    if (!entityRow) {
        getLog().error(`Cannot find entity for entity change ${JSON.stringify(entityChange)}`);
        return null;
    }

    if (entityName === "blobs" && entityRow.content !== null) {
        // string content is first normalized to a Buffer, then base64-encoded for transport
        if (typeof entityRow.content === "string") {
            entityRow.content = Buffer.from(entityRow.content, "utf-8");
        }

        if (entityRow.content) {
            entityRow.content = binary_utils.encodeBase64(entityRow.content);
        }
    }

    return entityRow;
}
|
||||
|
||||
function getEntityChangeRecords(entityChanges: EntityChange[]) {
|
||||
const records: EntityChangeRecord[] = [];
|
||||
let length = 0;
|
||||
|
||||
for (const entityChange of entityChanges) {
|
||||
if (entityChange.isErased) {
|
||||
records.push({ entityChange });
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
const entity = getEntityChangeRow(entityChange);
|
||||
if (!entity) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const record: EntityChangeRecord = { entityChange, entity };
|
||||
|
||||
records.push(record);
|
||||
|
||||
length += JSON.stringify(record).length;
|
||||
|
||||
if (length > 1_000_000) {
|
||||
// each sync request/response should have at most ~1 MB.
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return records;
|
||||
}
|
||||
|
||||
function getLastSyncedPull() {
|
||||
return parseInt(optionService.getOption("lastSyncedPull"));
|
||||
}
|
||||
|
||||
function setLastSyncedPull(entityChangeId: number) {
|
||||
const lastSyncedPullOption = becca.getOption("lastSyncedPull");
|
||||
|
||||
if (lastSyncedPullOption) {
|
||||
// might be null in initial sync when becca is not loaded
|
||||
lastSyncedPullOption.value = `${entityChangeId}`;
|
||||
}
|
||||
|
||||
// this way we avoid updating entity_changes which otherwise means that we've never pushed all entity_changes
|
||||
getSql().execute("UPDATE options SET value = ? WHERE name = ?", [entityChangeId, "lastSyncedPull"]);
|
||||
}
|
||||
|
||||
function getLastSyncedPush() {
|
||||
const lastSyncedPush = parseInt(optionService.getOption("lastSyncedPush"));
|
||||
|
||||
ws.setLastSyncedPush(lastSyncedPush);
|
||||
|
||||
return lastSyncedPush;
|
||||
}
|
||||
|
||||
function setLastSyncedPush(entityChangeId: number) {
|
||||
ws.setLastSyncedPush(entityChangeId);
|
||||
|
||||
const lastSyncedPushOption = becca.getOption("lastSyncedPush");
|
||||
|
||||
if (lastSyncedPushOption) {
|
||||
// might be null in initial sync when becca is not loaded
|
||||
lastSyncedPushOption.value = `${entityChangeId}`;
|
||||
}
|
||||
|
||||
// this way we avoid updating entity_changes which otherwise means that we've never pushed all entity_changes
|
||||
getSql().execute("UPDATE options SET value = ? WHERE name = ?", [entityChangeId, "lastSyncedPush"]);
|
||||
}
|
||||
|
||||
function getMaxEntityChangeId() {
|
||||
return getSql().getValue("SELECT COALESCE(MAX(id), 0) FROM entity_changes");
|
||||
}
|
||||
|
||||
function getOutstandingPullCount() {
|
||||
return outstandingPullCount;
|
||||
}
|
||||
|
||||
function startSyncTimer() {
|
||||
becca_loader.beccaLoaded.then(() => {
|
||||
setInterval(cls.wrap(sync), 60000);
|
||||
|
||||
// kickoff initial sync immediately, but should happen after initial consistency checks
|
||||
setTimeout(cls.wrap(sync), 5000);
|
||||
|
||||
// called just so ws.setLastSyncedPush() is called
|
||||
getLastSyncedPush();
|
||||
});
|
||||
}
|
||||
|
||||
// public API of the sync service; the remaining functions in this module are internal helpers
export default {
    sync,
    login,
    getEntityChangeRecords,
    getOutstandingPullCount,
    getMaxEntityChangeId,
    startSyncTimer
};
|
||||
34
packages/trilium-core/src/services/sync_options.ts
Normal file
34
packages/trilium-core/src/services/sync_options.ts
Normal file
@@ -0,0 +1,34 @@
|
||||
"use strict";
|
||||
|
||||
import optionService from "./options.js";
|
||||
import config from "./config.js";
|
||||
import { normalizeUrl } from "./utils/index.js";
|
||||
|
||||
/*
|
||||
* Primary configuration for sync is in the options (document), but we allow to override
|
||||
* these settings in config file. The reason for that is to avoid a mistake of loading a live/production
|
||||
* document with live sync settings in a dev/debug environment. Changes would then successfully propagate
|
||||
* to live sync server.
|
||||
*/
|
||||
|
||||
function get(name: keyof typeof config.Sync) {
|
||||
return (config["Sync"] && config["Sync"][name]) || optionService.getOption(name);
|
||||
}
|
||||
|
||||
export default {
    // env variable is the easiest way to guarantee we won't overwrite prod data during development
    // after copying prod document/data directory
    /** Sync server base URL, normalized (no trailing slash); falsy when not configured. */
    getSyncServerHost: () => {
        const host = get("syncServerHost");
        return host ? normalizeUrl(host) : host;
    },
    /** True when a usable sync server host is configured. */
    isSyncSetup: () => {
        const syncServerHost = get("syncServerHost");

        // special value "disabled" is here to support a use case where the document is configured with sync server,
        // and we need to override it with config from config.ini
        return !!syncServerHost && syncServerHost !== "disabled";
    },
    /** Request timeout in milliseconds; falls back to 120000 when unset or unparsable. */
    getSyncTimeout: () => parseInt(get("syncServerTimeout")) || 120000,
    /** Proxy URL for sync requests, if any. */
    getSyncProxy: () => get("syncProxy")
};
|
||||
171
packages/trilium-core/src/services/sync_update.ts
Normal file
171
packages/trilium-core/src/services/sync_update.ts
Normal file
@@ -0,0 +1,171 @@
|
||||
import type { EntityChange, EntityChangeRecord, EntityRow } from "@triliumnext/commons";
|
||||
|
||||
import entityChangesService from "./entity_changes.js";
|
||||
import { getLog } from "./log.js";
|
||||
import { getSql } from "./sql/index.js";
|
||||
import ws from "./ws.js";
|
||||
import { default as eventService } from "./events.js";
|
||||
import entity_constructor from "../becca/entity_constructor.js";
|
||||
|
||||
interface UpdateContext {
|
||||
alreadyErased: number;
|
||||
erased: number;
|
||||
updated: Record<string, string[]>;
|
||||
}
|
||||
|
||||
/**
 * Applies a batch of entity-change records pulled from the sync server.
 *
 * Changes whose changeId is already present in the local entity_changes table
 * are counted and skipped. A "sync pull in progress" websocket message is sent
 * once, before the first change that is actually applied.
 *
 * @param entityChanges records (change + row) received from the remote instance
 * @param instanceId id of the remote instance the changes originate from
 */
function updateEntities(entityChanges: EntityChangeRecord[], instanceId: string) {
    if (entityChanges.length === 0) {
        return;
    }

    let atLeastOnePullApplied = false;
    // tallies what happened in this batch; printed at the end by logUpdateContext()
    const updateContext = {
        updated: {},
        alreadyUpdated: 0,
        erased: 0,
        alreadyErased: 0
    };

    for (const { entityChange, entity } of entityChanges) {
        // a matching changeId means this exact change was applied before
        const changeAppliedAlready = entityChange.changeId && !!getSql().getValue("SELECT 1 FROM entity_changes WHERE changeId = ?", [entityChange.changeId]);

        if (changeAppliedAlready) {
            updateContext.alreadyUpdated++;

            continue;
        }

        if (!atLeastOnePullApplied) {
            // avoid spamming and send only for first
            ws.syncPullInProgress();

            atLeastOnePullApplied = true;
        }

        updateEntity(entityChange, entity, instanceId, updateContext);
    }

    logUpdateContext(updateContext);
}
|
||||
|
||||
/**
 * Applies a single remote entity change and, when the local store actually
 * changed, emits the corresponding synced-entity event.
 *
 * "note_reordering" changes are handled separately since they carry a
 * position map instead of an entity row.
 */
function updateEntity(remoteEC: EntityChange, remoteEntityRow: EntityRow | undefined, instanceId: string, updateContext: UpdateContext) {
    if (!remoteEntityRow && remoteEC.entityName === "options") {
        return; // can be undefined for options with isSynced=false
    }

    const updated = remoteEC.entityName === "note_reordering"
        ? updateNoteReordering(remoteEC, remoteEntityRow, instanceId)
        : updateNormalEntity(remoteEC, remoteEntityRow, instanceId, updateContext);

    if (updated) {
        if (remoteEntityRow?.isDeleted) {
            eventService.emit(eventService.ENTITY_DELETE_SYNCED, {
                entityName: remoteEC.entityName,
                entityId: remoteEC.entityId
            });
        } else if (!remoteEC.isErased) {
            // erased entities have no row to broadcast, hence no event for them
            eventService.emit(eventService.ENTITY_CHANGE_SYNCED, {
                entityName: remoteEC.entityName,
                entityRow: remoteEntityRow
            });
        }
    }
}
|
||||
|
||||
/**
 * Applies a remote change to a regular (non-note_reordering) entity using
 * last-write-wins on utcDateChanged.
 *
 * The remote change is applied when there is no local entity change for the
 * same entity, or when the local one is older or equal. When the local change
 * is strictly newer and differs (hash or erased flag), the local change is
 * re-broadcast so other instances pick it up instead.
 *
 * @returns true when the local store was updated with the remote state
 */
function updateNormalEntity(remoteEC: EntityChange, remoteEntityRow: EntityRow | undefined, instanceId: string, updateContext: UpdateContext) {
    const localEC = getSql().getRow<EntityChange | undefined>(/*sql*/`SELECT * FROM entity_changes WHERE entityName = ? AND entityId = ?`, [remoteEC.entityName, remoteEC.entityId]);
    const localECIsOlderOrSameAsRemote = localEC && localEC.utcDateChanged && remoteEC.utcDateChanged && localEC.utcDateChanged <= remoteEC.utcDateChanged;

    if (!localEC || localECIsOlderOrSameAsRemote) {
        if (remoteEC.isErased) {
            if (localEC?.isErased) {
                eraseEntity(remoteEC); // make sure it's erased anyway
                updateContext.alreadyErased++;
            } else {
                eraseEntity(remoteEC);
                updateContext.erased++;
            }
        } else {
            if (!remoteEntityRow) {
                throw new Error(`Empty entity row for: ${JSON.stringify(remoteEC)}`);
            }

            // decode blob content from its transport (base64) form before storing
            preProcessContent(remoteEC, remoteEntityRow);

            getSql().replace(remoteEC.entityName, remoteEntityRow);

            updateContext.updated[remoteEC.entityName] = updateContext.updated[remoteEC.entityName] || [];
            updateContext.updated[remoteEC.entityName].push(remoteEC.entityId);
        }

        // record the remote change locally unless we already have an identical one
        if (!localEC || localECIsOlderOrSameAsRemote || localEC.hash !== remoteEC.hash || localEC.isErased !== remoteEC.isErased) {
            entityChangesService.putEntityChangeWithInstanceId(remoteEC, instanceId);
        }

        return true;
    } else if ((localEC.hash !== remoteEC.hash || localEC.isErased !== remoteEC.isErased) && !localECIsOlderOrSameAsRemote) {
        // the change on our side is newer than on the other side, so the other side should update
        entityChangesService.putEntityChangeForOtherInstances(localEC);

        return false;
    }

    return false;
}
|
||||
|
||||
/**
 * Normalizes incoming blob content in place before it is written to the database.
 * Content arrives base64-encoded (see getEntityChangeRow on the sending side).
 */
function preProcessContent(remoteEC: EntityChange, remoteEntityRow: EntityRow) {
    if (remoteEC.entityName === "blobs" && remoteEntityRow.content !== null) {
        // we always use a Buffer object which is different from normal saving - there we use a simple string type for
        // "string notes". The problem is that in general, it's not possible to detect whether a blob content
        // is string note or note (syncs can arrive out of order)
        if (typeof remoteEntityRow.content === "string") {
            remoteEntityRow.content = Buffer.from(remoteEntityRow.content, "base64");

            if (remoteEntityRow.content.byteLength === 0) {
                // there seems to be a bug which causes empty buffer to be stored as NULL which is then picked up as inconsistency
                // (possibly not a problem anymore with the newer better-sqlite3)
                remoteEntityRow.content = "";
            }
        }
    }
}
|
||||
|
||||
function updateNoteReordering(remoteEC: EntityChange, remoteEntityRow: EntityRow | undefined, instanceId: string) {
|
||||
if (!remoteEntityRow) {
|
||||
throw new Error(`Empty note_reordering body for: ${JSON.stringify(remoteEC)}`);
|
||||
}
|
||||
|
||||
for (const key in remoteEntityRow) {
|
||||
getSql().execute("UPDATE branches SET notePosition = ? WHERE branchId = ?", [remoteEntityRow[key as keyof EntityRow], key]);
|
||||
}
|
||||
|
||||
entityChangesService.putEntityChangeWithInstanceId(remoteEC, instanceId);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
function eraseEntity(entityChange: EntityChange) {
|
||||
const { entityName, entityId } = entityChange;
|
||||
|
||||
const entityNames = ["notes", "branches", "attributes", "revisions", "attachments", "blobs"];
|
||||
|
||||
if (!entityNames.includes(entityName)) {
|
||||
getLog().error(`Cannot erase ${entityName} '${entityId}'.`);
|
||||
return;
|
||||
}
|
||||
|
||||
const primaryKeyName = entity_constructor.getEntityFromEntityName(entityName).primaryKeyName;
|
||||
|
||||
getSql().execute(/*sql*/`DELETE FROM ${entityName} WHERE ${primaryKeyName} = ?`, [entityId]);
|
||||
}
|
||||
|
||||
function logUpdateContext(updateContext: UpdateContext) {
|
||||
const message = JSON.stringify(updateContext).replaceAll('"', "").replaceAll(":", ": ").replaceAll(",", ", ");
|
||||
|
||||
getLog().info(message.substr(1, message.length - 2));
|
||||
}
|
||||
|
||||
// the only public entry point of this module; all other functions are internal helpers
export default {
    updateEntities
};
|
||||
@@ -68,6 +68,65 @@ export function normalize(str: string) {
|
||||
return removeDiacritic(str).toLowerCase();
|
||||
}
|
||||
|
||||
/**
|
||||
* Normalizes URL by removing trailing slashes and fixing double slashes.
|
||||
* Preserves the protocol (http://, https://) but removes trailing slashes from the rest.
|
||||
*
|
||||
* @param url The URL to normalize
|
||||
* @returns The normalized URL without trailing slashes
|
||||
*/
|
||||
export function normalizeUrl(url: string | null | undefined): string | null | undefined {
|
||||
if (!url || typeof url !== 'string') {
|
||||
return url;
|
||||
}
|
||||
|
||||
// Trim whitespace
|
||||
url = url.trim();
|
||||
|
||||
if (!url) {
|
||||
return url;
|
||||
}
|
||||
|
||||
// Fix double slashes (except in protocol) first
|
||||
url = url.replace(/([^:]\/)\/+/g, '$1');
|
||||
|
||||
// Remove trailing slash, but preserve protocol
|
||||
if (url.endsWith('/') && !url.match(/^https?:\/\/$/)) {
|
||||
url = url.slice(0, -1);
|
||||
}
|
||||
|
||||
return url;
|
||||
}
|
||||
|
||||
export function timeLimit<T>(promise: Promise<T>, limitMs: number, errorMessage?: string): Promise<T> {
|
||||
// TriliumNextTODO: since TS avoids this from ever happening – do we need this check?
|
||||
if (!promise || !promise.then) {
|
||||
// it's not actually a promise
|
||||
return promise;
|
||||
}
|
||||
|
||||
// better stack trace if created outside of promise
|
||||
const errorTimeLimit = new Error(errorMessage || `Process exceeded time limit ${limitMs}`);
|
||||
|
||||
return new Promise((res, rej) => {
|
||||
let resolved = false;
|
||||
|
||||
promise
|
||||
.then((result) => {
|
||||
resolved = true;
|
||||
|
||||
res(result);
|
||||
})
|
||||
.catch((error) => rej(error));
|
||||
|
||||
setTimeout(() => {
|
||||
if (!resolved) {
|
||||
rej(errorTimeLimit);
|
||||
}
|
||||
}, limitMs);
|
||||
});
|
||||
}
|
||||
|
||||
export function sanitizeAttributeName(origName: string) {
|
||||
const fixedName = origName === "" ? "unnamed" : origName.replace(/[^\p{L}\p{N}_:]/gu, "_");
|
||||
// any not allowed character should be replaced with underscore
|
||||
@@ -75,6 +134,10 @@ export function sanitizeAttributeName(origName: string) {
|
||||
return fixedName;
|
||||
}
|
||||
|
||||
export function sanitizeSqlIdentifier(str: string) {
|
||||
return str.replace(/[^A-Za-z0-9_]/g, "");
|
||||
}
|
||||
|
||||
export function getContentDisposition(filename: string) {
|
||||
const sanitizedFilename = sanitizeFileName(filename).trim() || "file";
|
||||
const uriEncodedFilename = encodeURIComponent(sanitizedFilename);
|
||||
|
||||
@@ -1,20 +1,210 @@
|
||||
import type { WebSocketMessage } from "@triliumnext/commons";
|
||||
import { sendMessageToAllClients as sendMessage } from "./messaging/index.js";
|
||||
import { type EntityChange, WebSocketMessage } from "@triliumnext/commons";
|
||||
|
||||
/**
|
||||
* WebSocket service abstraction for core.
|
||||
*
|
||||
* This module provides a simple interface for sending messages to clients.
|
||||
* The actual transport mechanism is provided by the messaging provider
|
||||
* configured during initialization.
|
||||
*
|
||||
* @deprecated Use the messaging module directly instead.
|
||||
*/
|
||||
export default {
|
||||
/**
|
||||
* Send a message to all connected clients.
|
||||
*/
|
||||
sendMessageToAllClients(message: WebSocketMessage) {
|
||||
sendMessage(message);
|
||||
import becca from "../becca/becca.js";
|
||||
import * as cls from "./context.js";
|
||||
import { getLog } from "./log.js";
|
||||
import protectedSessionService from "./protected_session.js";
|
||||
import syncMutexService from "./sync_mutex.js";
|
||||
import { getSql } from "./sql/index.js";
|
||||
import { getMessagingProvider, MessagingProvider } from "./messaging/index.js";
|
||||
import AbstractBeccaEntity from "../becca/entities/abstract_becca_entity.js";
|
||||
|
||||
// transport used to reach connected clients; assigned in init()
let messagingProvider!: MessagingProvider;
// most recently pushed entity-change id, attached to outgoing sync status messages
let lastSyncedPush: number;
|
||||
|
||||
/**
 * Initializes the websocket layer: obtains the messaging provider and registers
 * the handler for messages coming from clients (frontend log forwarding and pings).
 */
function init() {
    messagingProvider = getMessagingProvider();

    // NOTE(review): message is untyped (any); the shapes handled below are assumed
    // to be what the frontend sends — confirm against the client code
    messagingProvider.setClientMessageHandler(async (clientId, message: any) => {
        const log = getLog();
        if (message.type === "log-error") {
            // frontend JS error forwarded into the backend log
            log.info(`JS Error: ${message.error}\r\nStack: ${message.stack}`);
        } else if (message.type === "log-info") {
            log.info(`JS Info: ${message.info}`);
        } else if (message.type === "ping") {
            // the pong is serialized through the sync mutex, so it is only sent
            // once no sync operation holds the lock
            await syncMutexService.doExclusively(() => {
                messagingProvider.sendMessageToClient(clientId, { type: "ping" });
            });
        } else {
            log.error("Unrecognized message: ");
            log.error(message);
        }
    });
}
|
||||
|
||||
function sendMessageToAllClients(message: WebSocketMessage) {
|
||||
if (messagingProvider) {
|
||||
messagingProvider.sendMessageToAllClients(message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Enriches an entity change with the data the frontend needs to apply it:
 * the full entity row (attributes/branches/notes/options/attachments),
 * the owning noteId (revisions) or the child position map (note_reordering).
 *
 * Erased changes are left untouched — there is no entity to attach.
 * Becca entities are converted to plain objects before serialization.
 */
function fillInAdditionalProperties(entityChange: EntityChange) {
    if (entityChange.isErased) {
        return;
    }

    // fill in some extra data needed by the frontend
    // first try to use becca, which works for non-deleted entities
    // only when that fails, try to load from the database
    const sql = getSql();
    if (entityChange.entityName === "attributes") {
        entityChange.entity = becca.getAttribute(entityChange.entityId);

        if (!entityChange.entity) {
            entityChange.entity = sql.getRow(/*sql*/`SELECT * FROM attributes WHERE attributeId = ?`, [entityChange.entityId]);
        }
    } else if (entityChange.entityName === "branches") {
        entityChange.entity = becca.getBranch(entityChange.entityId);

        if (!entityChange.entity) {
            entityChange.entity = sql.getRow(/*sql*/`SELECT * FROM branches WHERE branchId = ?`, [entityChange.entityId]);
        }
    } else if (entityChange.entityName === "notes") {
        entityChange.entity = becca.getNote(entityChange.entityId);

        if (!entityChange.entity) {
            entityChange.entity = sql.getRow(/*sql*/`SELECT * FROM notes WHERE noteId = ?`, [entityChange.entityId]);

            // titles of protected notes are stored encrypted; decrypt for the frontend
            if (entityChange.entity?.isProtected) {
                entityChange.entity.title = protectedSessionService.decryptString(entityChange.entity.title || "");
            }
        }
    } else if (entityChange.entityName === "revisions") {
        // for revisions only the owning noteId is attached, not the full row
        entityChange.noteId = sql.getValue<string>(
            /*sql*/`SELECT noteId
                    FROM revisions
                    WHERE revisionId = ?`,
            [entityChange.entityId]
        );
    } else if (entityChange.entityName === "note_reordering") {
        // reordering carries a branchId -> notePosition map of the parent's children
        entityChange.positions = {};

        const parentNote = becca.getNote(entityChange.entityId);

        if (parentNote) {
            for (const childBranch of parentNote.getChildBranches()) {
                if (childBranch?.branchId) {
                    entityChange.positions[childBranch.branchId] = childBranch.notePosition;
                }
            }
        }
    } else if (entityChange.entityName === "options") {
        entityChange.entity = becca.getOption(entityChange.entityId);

        if (!entityChange.entity) {
            entityChange.entity = sql.getRow(/*sql*/`SELECT * FROM options WHERE name = ?`, [entityChange.entityId]);
        }
    } else if (entityChange.entityName === "attachments") {
        entityChange.entity = becca.getAttachment(entityChange.entityId);

        if (!entityChange.entity) {
            entityChange.entity = sql.getRow(
                /*sql*/`SELECT attachments.*, LENGTH(blobs.content) AS contentLength
                        FROM attachments
                        JOIN blobs USING (blobId)
                        WHERE attachmentId = ?`,
                [entityChange.entityId]
            );

            if (entityChange.entity?.isProtected) {
                entityChange.entity.title = protectedSessionService.decryptString(entityChange.entity.title || "");
            }
        }
    }

    // becca entities must be flattened to POJOs before going over the wire
    if (entityChange.entity instanceof AbstractBeccaEntity) {
        entityChange.entity = entityChange.entity.getPojo();
    }
}
|
||||
|
||||
// entities with higher number can reference the entities with lower number
|
||||
const ORDERING: Record<string, number> = {
|
||||
etapi_tokens: 0,
|
||||
attributes: 2,
|
||||
branches: 2,
|
||||
blobs: 0,
|
||||
note_reordering: 2,
|
||||
revisions: 2,
|
||||
attachments: 3,
|
||||
notes: 1,
|
||||
options: 0,
|
||||
};
|
||||
|
||||
/**
 * Builds the "frontend-update" websocket message for a set of entity-change ids.
 *
 * An empty id list degenerates into a plain ping. Changes are sorted into
 * referential order (see ORDERING) and enriched via fillInAdditionalProperties
 * before being sent.
 *
 * @returns the message to broadcast, or null when the rows could not be loaded
 */
function buildFrontendUpdateMessage(entityChangeIds: number[]): WebSocketMessage | null {
    if (entityChangeIds.length === 0) {
        return { type: "ping" };
    }

    const entityChanges = getSql().getManyRows<EntityChange>(/*sql*/`SELECT * FROM entity_changes WHERE id IN (???)`, entityChangeIds);
    if (!entityChanges) {
        return null;
    }

    // sort entity changes since froca expects "referential order", i.e. referenced entities should already exist
    // in froca.
    // Froca needs this since it is an incomplete copy, it can't create "skeletons" like becca.
    // NOTE(review): an entityName missing from ORDERING yields NaN here, making the
    // sort order undefined for that entry — presumably all names are covered; verify
    entityChanges.sort((a, b) => ORDERING[a.entityName] - ORDERING[b.entityName]);

    for (const entityChange of entityChanges) {
        try {
            // enrichment failures are logged but do not block the rest of the batch
            fillInAdditionalProperties(entityChange);
        } catch (e: any) {
            getLog().error(`Could not fill additional properties for entity change ${JSON.stringify(entityChange)} because of error: ${e.message}: ${e.stack}`);
        }
    }

    return {
        type: "frontend-update",
        data: {
            lastSyncedPush,
            entityChanges
        }
    };
}
|
||||
|
||||
function sendTransactionEntityChangesToAllClients() {
|
||||
if (messagingProvider) {
|
||||
const entityChangeIds = cls.getAndClearEntityChangeIds();
|
||||
const message = buildFrontendUpdateMessage(entityChangeIds);
|
||||
|
||||
if (message) {
|
||||
messagingProvider.sendMessageToAllClients(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function syncPullInProgress() {
|
||||
sendMessageToAllClients({ type: "sync-pull-in-progress", lastSyncedPush });
|
||||
}
|
||||
|
||||
function syncPushInProgress() {
|
||||
sendMessageToAllClients({ type: "sync-push-in-progress", lastSyncedPush });
|
||||
}
|
||||
|
||||
function syncFinished() {
|
||||
sendMessageToAllClients({ type: "sync-finished", lastSyncedPush });
|
||||
}
|
||||
|
||||
function syncFailed() {
|
||||
sendMessageToAllClients({ type: "sync-failed", lastSyncedPush });
|
||||
}
|
||||
|
||||
function reloadFrontend(reason: string) {
|
||||
sendMessageToAllClients({ type: "reload-frontend", reason });
|
||||
}
|
||||
|
||||
function setLastSyncedPush(entityChangeId: number) {
|
||||
lastSyncedPush = entityChangeId;
|
||||
}
|
||||
|
||||
// public API of the websocket service; init() must run before messages can be delivered
export default {
    init,
    sendMessageToAllClients,
    syncPushInProgress,
    syncPullInProgress,
    syncFinished,
    syncFailed,
    sendTransactionEntityChangesToAllClients,
    setLastSyncedPush,
    reloadFrontend
};
|
||||
|
||||
24
pnpm-lock.yaml
generated
24
pnpm-lock.yaml
generated
@@ -510,6 +510,9 @@ importers:
|
||||
js-sha1:
|
||||
specifier: 0.7.0
|
||||
version: 0.7.0
|
||||
js-sha256:
|
||||
specifier: 0.11.1
|
||||
version: 0.11.1
|
||||
js-sha512:
|
||||
specifier: 0.9.0
|
||||
version: 0.9.0
|
||||
@@ -863,9 +866,6 @@ importers:
|
||||
archiver:
|
||||
specifier: 7.0.1
|
||||
version: 7.0.1
|
||||
async-mutex:
|
||||
specifier: 0.5.0
|
||||
version: 0.5.0
|
||||
axios:
|
||||
specifier: 1.13.6
|
||||
version: 1.13.6(debug@4.4.3)
|
||||
@@ -1701,6 +1701,9 @@ importers:
|
||||
'@triliumnext/commons':
|
||||
specifier: workspace:*
|
||||
version: link:../commons
|
||||
async-mutex:
|
||||
specifier: 0.5.0
|
||||
version: 0.5.0
|
||||
escape-html:
|
||||
specifier: 1.0.3
|
||||
version: 1.0.3
|
||||
@@ -11163,6 +11166,9 @@ packages:
|
||||
js-sha1@0.7.0:
|
||||
resolution: {integrity: sha512-oQZ1Mo7440BfLSv9TX87VNEyU52pXPVG19F9PL3gTgNt0tVxlZ8F4O6yze3CLuLx28TxotxvlyepCNaaV0ZjMw==}
|
||||
|
||||
js-sha256@0.11.1:
|
||||
resolution: {integrity: sha512-o6WSo/LUvY2uC4j7mO50a2ms7E/EAdbP0swigLV+nzHKTTaYnaLIWJ02VdXrsJX0vGedDESQnLsOekr94ryfjg==}
|
||||
|
||||
js-sha512@0.9.0:
|
||||
resolution: {integrity: sha512-mirki9WS/SUahm+1TbAPkqvbCiCfOAAsyXeHxK1UkullnJVVqoJG2pL9ObvT05CN+tM7fxhfYm0NbXn+1hWoZg==}
|
||||
|
||||
@@ -17736,8 +17742,6 @@ snapshots:
|
||||
'@ckeditor/ckeditor5-widget': 47.6.1
|
||||
ckeditor5: 47.6.1
|
||||
es-toolkit: 1.39.5
|
||||
transitivePeerDependencies:
|
||||
- supports-color
|
||||
|
||||
'@ckeditor/ckeditor5-list-multi-level@47.6.1':
|
||||
dependencies:
|
||||
@@ -17762,8 +17766,6 @@ snapshots:
|
||||
'@ckeditor/ckeditor5-utils': 47.6.1
|
||||
ckeditor5: 47.6.1
|
||||
es-toolkit: 1.39.5
|
||||
transitivePeerDependencies:
|
||||
- supports-color
|
||||
|
||||
'@ckeditor/ckeditor5-markdown-gfm@47.6.1':
|
||||
dependencies:
|
||||
@@ -17801,8 +17803,6 @@ snapshots:
|
||||
'@ckeditor/ckeditor5-utils': 47.6.1
|
||||
'@ckeditor/ckeditor5-widget': 47.6.1
|
||||
ckeditor5: 47.6.1
|
||||
transitivePeerDependencies:
|
||||
- supports-color
|
||||
|
||||
'@ckeditor/ckeditor5-mention@47.6.1(patch_hash=5981fb59ba35829e4dff1d39cf771000f8a8fdfa7a34b51d8af9549541f2d62d)':
|
||||
dependencies:
|
||||
@@ -17812,6 +17812,8 @@ snapshots:
|
||||
'@ckeditor/ckeditor5-utils': 47.6.1
|
||||
ckeditor5: 47.6.1
|
||||
es-toolkit: 1.39.5
|
||||
transitivePeerDependencies:
|
||||
- supports-color
|
||||
|
||||
'@ckeditor/ckeditor5-merge-fields@47.6.1':
|
||||
dependencies:
|
||||
@@ -25526,8 +25528,6 @@ snapshots:
|
||||
ckeditor5-collaboration@47.6.1:
|
||||
dependencies:
|
||||
'@ckeditor/ckeditor5-collaboration-core': 47.6.1
|
||||
transitivePeerDependencies:
|
||||
- supports-color
|
||||
|
||||
ckeditor5-premium-features@47.6.1(bufferutil@4.0.9)(ckeditor5@47.6.1)(utf-8-validate@6.0.5):
|
||||
dependencies:
|
||||
@@ -29189,6 +29189,8 @@ snapshots:
|
||||
|
||||
js-sha1@0.7.0: {}
|
||||
|
||||
js-sha256@0.11.1: {}
|
||||
|
||||
js-sha512@0.9.0: {}
|
||||
|
||||
js-tokens@10.0.0: {}
|
||||
|
||||
Reference in New Issue
Block a user