"lock" notes that are having their embeddings created

This commit is contained in:
perf3ct
2025-03-16 20:36:47 +00:00
parent 781a2506f0
commit d2072c2a6f
6 changed files with 160 additions and 76 deletions

View File

@@ -29,7 +29,8 @@ CREATE TABLE IF NOT EXISTS "embedding_queue" (
"attempts" INTEGER NOT NULL DEFAULT 0,
"lastAttempt" TEXT NULL,
"error" TEXT NULL,
"failed" INTEGER NOT NULL DEFAULT 0
"failed" INTEGER NOT NULL DEFAULT 0,
"isProcessing" INTEGER NOT NULL DEFAULT 0
);
-- Table to store embedding provider configurations

View File

@@ -159,7 +159,8 @@ CREATE TABLE IF NOT EXISTS "embedding_queue" (
"attempts" INTEGER NOT NULL DEFAULT 0,
"lastAttempt" TEXT NULL,
"error" TEXT NULL,
"failed" INTEGER NOT NULL DEFAULT 0
"failed" INTEGER NOT NULL DEFAULT 0,
"isProcessing" INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS "embedding_providers" (

View File

@@ -34,7 +34,8 @@ const MAX_TOTAL_PROCESSING_TIME = 5 * 60 * 1000; // 5 minutes
const MAX_CHUNK_RETRY_ATTEMPTS = 2;
// Maximum time per chunk processing (to prevent individual chunks from hanging)
const MAX_CHUNK_PROCESSING_TIME = 60 * 1000; // 1 minute
const DEFAULT_MAX_CHUNK_PROCESSING_TIME = 60 * 1000; // 1 minute
const OLLAMA_MAX_CHUNK_PROCESSING_TIME = 120 * 1000; // 2 minutes
/**
* Categorize an error as temporary or permanent based on its message
@@ -166,6 +167,11 @@ export async function processNoteWithChunking(
log.info(`Processing ${chunks.length} chunks for note ${noteId} (${note.title})`);
// Get the current time to prevent duplicate processing from timeouts
const processingStartTime = Date.now();
const processingId = `${noteId}-${processingStartTime}`;
log.info(`Starting processing run ${processingId}`);
// Process each chunk with a delay based on provider to avoid rate limits
for (let i = 0; i < chunks.length; i++) {
// Check if we've exceeded the overall time limit
@@ -194,7 +200,7 @@ export async function processNoteWithChunking(
const embedding = await processChunkWithTimeout(
provider,
chunk,
MAX_CHUNK_PROCESSING_TIME
provider.name === 'ollama' ? OLLAMA_MAX_CHUNK_PROCESSING_TIME : DEFAULT_MAX_CHUNK_PROCESSING_TIME
);
// Store with chunk information in a unique ID format
@@ -212,7 +218,7 @@ export async function processNoteWithChunking(
// Small delay between chunks to avoid rate limits - longer for Ollama
if (i < chunks.length - 1) {
await new Promise(resolve => setTimeout(resolve,
provider.name === 'ollama' ? 500 : 100));
provider.name === 'ollama' ? 2000 : 100));
}
} catch (error: any) {
const errorMessage = error.message || 'Unknown error';
@@ -274,7 +280,7 @@ export async function processNoteWithChunking(
const embedding = await processChunkWithTimeout(
provider,
item.chunk,
MAX_CHUNK_PROCESSING_TIME
provider.name === 'ollama' ? OLLAMA_MAX_CHUNK_PROCESSING_TIME : DEFAULT_MAX_CHUNK_PROCESSING_TIME
);
// Store with unique ID that indicates it was a retry
@@ -335,7 +341,7 @@ export async function processNoteWithChunking(
// Log information about the processed chunks
if (successfulChunks > 0) {
log.info(`Generated ${successfulChunks} chunk embeddings for note ${noteId} (${note.title})`);
log.info(`[${processingId}] Generated ${successfulChunks} chunk embeddings for note ${noteId} (${note.title})`);
}
if (failedChunks > 0) {
@@ -344,7 +350,7 @@ export async function processNoteWithChunking(
const temporaryErrors = failedChunkDetails.filter(d => d.category === 'temporary').length;
const unknownErrors = failedChunkDetails.filter(d => d.category === 'unknown').length;
log.info(`Failed to generate ${failedChunks} chunk embeddings for note ${noteId} (${note.title}). ` +
log.info(`[${processingId}] Failed to generate ${failedChunks} chunk embeddings for note ${noteId} (${note.title}). ` +
`Permanent: ${permanentErrors}, Temporary: ${temporaryErrors}, Unknown: ${unknownErrors}`);
}
@@ -394,7 +400,7 @@ export async function processNoteWithChunking(
// Track total processing time
const totalTime = Date.now() - startTime;
log.info(`Total processing time for note ${noteId}: ${totalTime}ms`);
log.info(`[${processingId}] Total processing time for note ${noteId}: ${totalTime}ms`);
} catch (error: any) {
log.error(`Error in chunked embedding process for note ${noteId}: ${error.message || 'Unknown error'}`);

View File

@@ -6,6 +6,9 @@ import { processEmbeddingQueue, queueNoteForEmbedding } from "./queue.js";
import eventService from "../../../services/events.js";
import becca from "../../../becca/becca.js";
// Re-entrancy guard: true while a background embedding queue run is in progress
let isProcessingEmbeddings = false;
/**
* Setup event listeners for embedding-related events
*/
@@ -54,12 +57,23 @@ export async function setupEmbeddingBackgroundProcessing() {
// Periodically drain the embedding queue. `isProcessingEmbeddings` acts as a
// re-entrancy guard: a tick that fires while a previous run is still in
// flight is skipped instead of starting a second, overlapping run.
setInterval(async () => {
    // Check the guard BEFORE entering the try block. Returning from inside the
    // try would still execute `finally` and clear a flag that belongs to the
    // run that is currently in progress.
    if (isProcessingEmbeddings) {
        return;
    }

    isProcessingEmbeddings = true;

    try {
        // Wrap in cls.init to ensure proper context. The result has to be
        // awaited; otherwise `finally` releases the guard while the queue is
        // still being processed and the guard never blocks anything.
        // NOTE(review): this relies on cls.init returning the callback's
        // promise - confirm against the cls service implementation.
        await cls.init(async () => {
            await processEmbeddingQueue();
        });
    } catch (error: any) {
        log.error(`Error in background embedding processing: ${error.message || 'Unknown error'}`);
    } finally {
        // Release the guard only after the run owned by this tick has settled
        isProcessingEmbeddings = false;
    }
}, interval);
}

View File

@@ -173,46 +173,77 @@ export class OllamaEmbeddingProvider extends BaseEmbeddingProvider {
* Generate embeddings for a single text
*/
async generateEmbeddings(text: string): Promise<Float32Array> {
try {
if (!text.trim()) {
return new Float32Array(this.config.dimension);
}
const modelName = this.config.model || "llama3";
// Ensure we have model info
const modelInfo = await this.getModelInfo(modelName);
// Trim text if it might exceed context window (rough character estimate)
// This is a simplistic approach - ideally we'd count tokens properly
const charLimit = modelInfo.contextWindow * 4; // Rough estimate: avg 4 chars per token
const trimmedText = text.length > charLimit ? text.substring(0, charLimit) : text;
const response = await axios.post(
`${this.baseUrl}/api/embeddings`,
{
model: modelName,
prompt: trimmedText,
format: "json"
},
{
headers: {
"Content-Type": "application/json"
},
timeout: 30000 // Longer timeout for larger texts
}
);
if (response.data && Array.isArray(response.data.embedding)) {
return new Float32Array(response.data.embedding);
} else {
throw new Error("Unexpected response structure from Ollama API");
}
} catch (error: any) {
const errorMessage = error.response?.data?.error?.message || error.message || "Unknown error";
log.error(`Ollama embedding error: ${errorMessage}`);
throw new Error(`Ollama embedding error: ${errorMessage}`);
// Handle empty text: there is nothing to send, so return a zero-filled
// vector of the configured dimension
if (!text.trim()) {
    return new Float32Array(this.config.dimension);
}

// Number of retries allowed after the initial attempt
const maxRetries = 3;

// Substrings of an error message that mark a failure as worth retrying
// (timeouts and connection-level errors)
const retryableMarkers = ['timeout', 'socket hang up', 'ECONNREFUSED', 'ECONNRESET'];

// The loop deliberately has no exit condition: every iteration either
// returns the embedding, throws, or sleeps and moves on to the next attempt.
for (let retryCount = 0; ; retryCount++) {
    try {
        const modelName = this.config.model || "llama3";

        // Ensure we have model info
        const modelInfo = await this.getModelInfo(modelName);

        // Trim text if it might exceed context window (rough character estimate)
        // This is a simplistic approach - ideally we'd count tokens properly
        const charLimit = modelInfo.contextWindow * 4; // Rough estimate: avg 4 chars per token
        const trimmedText = text.length > charLimit ? text.substring(0, charLimit) : text;

        const response = await axios.post(
            `${this.baseUrl}/api/embeddings`,
            {
                model: modelName,
                prompt: trimmedText,
                format: "json"
            },
            {
                headers: {
                    "Content-Type": "application/json"
                },
                timeout: 60000 // Increased timeout for larger texts (60 seconds)
            }
        );

        if (response.data && Array.isArray(response.data.embedding)) {
            // Success! Return the embedding
            return new Float32Array(response.data.embedding);
        }

        // Thrown inside the try on purpose so it flows through the same
        // error reporting path as request failures below
        throw new Error("Unexpected response structure from Ollama API");
    } catch (error: any) {
        // Coerce to string so the substring checks below cannot themselves
        // throw when the extracted message is not a string
        const errorMessage = String(error.response?.data?.error?.message || error.message || "Unknown error");

        // Only retry on timeout or connection errors
        const isRetryable = retryableMarkers.some(marker => errorMessage.includes(marker));

        if (!isRetryable || retryCount >= maxRetries) {
            // Non-retryable error or max retries exceeded
            log.error(`Ollama embedding error: ${errorMessage}`);
            throw new Error(`Ollama embedding error: ${errorMessage}`);
        }

        // Exponential backoff with jitter, capped at 15 seconds
        const delay = Math.min(Math.pow(2, retryCount) * 1000 + Math.random() * 1000, 15000);
        log.info(`Ollama embedding timeout, retrying in ${Math.round(delay/1000)}s (attempt ${retryCount + 1}/${maxRetries})`);
        await new Promise(resolve => setTimeout(resolve, delay));
    }
}
}
/**

View File

@@ -10,6 +10,9 @@ import type { QueueItem } from "./types.js";
import { getChunkingOperations } from "./chunking/chunking_interface.js";
import indexService from '../index_service.js';
// Track which notes are currently being processed
const notesInProcess = new Set<string>();
/**
* Queues a note for embedding update
*/
@@ -19,11 +22,17 @@ export async function queueNoteForEmbedding(noteId: string, operation = 'UPDATE'
// Check if note is already in queue and whether it's marked as permanently failed
const queueInfo = await sql.getRow(
"SELECT 1 as exists_flag, failed FROM embedding_queue WHERE noteId = ?",
"SELECT 1 as exists_flag, failed, isProcessing FROM embedding_queue WHERE noteId = ?",
[noteId]
) as {exists_flag: number, failed: number} | null;
) as {exists_flag: number, failed: number, isProcessing: number} | null;
if (queueInfo) {
// If the note is currently being processed, don't change its status
if (queueInfo.isProcessing === 1) {
log.info(`Note ${noteId} is currently being processed, skipping queue update`);
return;
}
// Only update if not permanently failed
if (queueInfo.failed !== 1) {
// Update existing queue entry but preserve the failed status
@@ -41,8 +50,8 @@ export async function queueNoteForEmbedding(noteId: string, operation = 'UPDATE'
// Add new queue entry
await sql.execute(`
INSERT INTO embedding_queue
(noteId, operation, dateQueued, utcDateQueued, failed)
VALUES (?, ?, ?, ?, 0)`,
(noteId, operation, dateQueued, utcDateQueued, failed, isProcessing)
VALUES (?, ?, ?, ?, 0, 0)`,
[noteId, operation, now, utcNow]
);
}
@@ -180,11 +189,11 @@ export async function processEmbeddingQueue() {
return;
}
// Get notes from queue (excluding failed ones)
// Get notes from queue (excluding failed ones and those being processed)
const notes = await sql.getRows(`
SELECT noteId, operation, attempts
FROM embedding_queue
WHERE failed = 0
WHERE failed = 0 AND isProcessing = 0
ORDER BY priority DESC, utcDateQueued ASC
LIMIT ?`,
[batchSize]
@@ -198,30 +207,47 @@ export async function processEmbeddingQueue() {
let processedCount = 0;
for (const note of notes) {
const noteData = note as unknown as QueueItem;
const noteId = noteData.noteId;
// Double-check that this note isn't already being processed
if (notesInProcess.has(noteId)) {
log.info(`Note ${noteId} is already being processed by another thread, skipping`);
continue;
}
try {
const noteData = note as unknown as QueueItem;
// Mark the note as being processed
notesInProcess.add(noteId);
await sql.execute(
"UPDATE embedding_queue SET isProcessing = 1 WHERE noteId = ?",
[noteId]
);
// Skip if note no longer exists
if (!becca.getNote(noteData.noteId)) {
if (!becca.getNote(noteId)) {
await sql.execute(
"DELETE FROM embedding_queue WHERE noteId = ?",
[noteData.noteId]
[noteId]
);
await deleteNoteEmbeddings(noteData.noteId);
await deleteNoteEmbeddings(noteId);
continue;
}
if (noteData.operation === 'DELETE') {
await deleteNoteEmbeddings(noteData.noteId);
await deleteNoteEmbeddings(noteId);
await sql.execute(
"DELETE FROM embedding_queue WHERE noteId = ?",
[noteData.noteId]
[noteId]
);
continue;
}
// Log that we're starting to process this note
log.info(`Starting embedding generation for note ${noteId}`);
// Get note context for embedding
const context = await getNoteEmbeddingContext(noteData.noteId);
const context = await getNoteEmbeddingContext(noteId);
// Check if we should use chunking for large content
const useChunking = context.content.length > 5000;
@@ -236,7 +262,7 @@ export async function processEmbeddingQueue() {
if (useChunking) {
// Process large notes using chunking
const chunkingOps = await getChunkingOperations();
await chunkingOps.processNoteWithChunking(noteData.noteId, provider, context);
await chunkingOps.processNoteWithChunking(noteId, provider, context);
allProvidersFailed = false;
} else {
// Standard approach: Generate a single embedding for the whole note
@@ -246,7 +272,7 @@ export async function processEmbeddingQueue() {
const config = provider.getConfig();
await import('./storage.js').then(storage => {
return storage.storeNoteEmbedding(
noteData.noteId,
noteId,
provider.name,
config.model,
embedding
@@ -259,7 +285,7 @@ export async function processEmbeddingQueue() {
} catch (providerError: any) {
// This provider failed
allProvidersSucceeded = false;
log.error(`Error generating embedding with provider ${provider.name} for note ${noteData.noteId}: ${providerError.message || 'Unknown error'}`);
log.error(`Error generating embedding with provider ${provider.name} for note ${noteId}: ${providerError.message || 'Unknown error'}`);
}
}
@@ -267,8 +293,10 @@ export async function processEmbeddingQueue() {
// At least one provider succeeded, remove from queue
await sql.execute(
"DELETE FROM embedding_queue WHERE noteId = ?",
[noteData.noteId]
[noteId]
);
log.info(`Successfully completed embedding processing for note ${noteId}`);
// Count as successfully processed
processedCount++;
} else {
@@ -277,49 +305,52 @@ export async function processEmbeddingQueue() {
UPDATE embedding_queue
SET attempts = attempts + 1,
lastAttempt = ?,
error = ?
error = ?,
isProcessing = 0
WHERE noteId = ?`,
[dateUtils.utcNowDateTime(), "All providers failed to generate embeddings", noteData.noteId]
[dateUtils.utcNowDateTime(), "All providers failed to generate embeddings", noteId]
);
// Mark as permanently failed if too many attempts
if (noteData.attempts + 1 >= 3) {
log.error(`Marked note ${noteData.noteId} as permanently failed after multiple embedding attempts`);
log.error(`Marked note ${noteId} as permanently failed after multiple embedding attempts`);
// Set the failed flag but keep the actual attempts count
await sql.execute(`
UPDATE embedding_queue
SET failed = 1
WHERE noteId = ?
`, [noteData.noteId]);
`, [noteId]);
}
}
} catch (error: any) {
const noteData = note as unknown as QueueItem;
// Update attempt count and log error
await sql.execute(`
UPDATE embedding_queue
SET attempts = attempts + 1,
lastAttempt = ?,
error = ?
error = ?,
isProcessing = 0
WHERE noteId = ?`,
[dateUtils.utcNowDateTime(), error.message || 'Unknown error', noteData.noteId]
[dateUtils.utcNowDateTime(), error.message || 'Unknown error', noteId]
);
log.error(`Error processing embedding for note ${noteData.noteId}: ${error.message || 'Unknown error'}`);
log.error(`Error processing embedding for note ${noteId}: ${error.message || 'Unknown error'}`);
// Mark as permanently failed if too many attempts
if (noteData.attempts + 1 >= 3) {
log.error(`Marked note ${noteData.noteId} as permanently failed after multiple embedding attempts`);
log.error(`Marked note ${noteId} as permanently failed after multiple embedding attempts`);
// Set the failed flag but keep the actual attempts count
await sql.execute(`
UPDATE embedding_queue
SET failed = 1
WHERE noteId = ?
`, [noteData.noteId]);
`, [noteId]);
}
} finally {
// Always clean up the processing status in the in-memory set
notesInProcess.delete(noteId);
}
}