diff --git a/apps/server/src/services/llm/chat/rest_chat_service.ts b/apps/server/src/services/llm/chat/rest_chat_service.ts index 5bf57c042..22b5a3379 100644 --- a/apps/server/src/services/llm/chat/rest_chat_service.ts +++ b/apps/server/src/services/llm/chat/rest_chat_service.ts @@ -6,7 +6,7 @@ import log from "../../log.js"; import type { Request, Response } from "express"; import type { Message, ChatCompletionOptions } from "../ai_interface.js"; import aiServiceManager from "../ai_service_manager.js"; -import { ChatPipeline } from "../pipeline/chat_pipeline.js"; +import { ChatPipeline } from "../pipeline/pipeline_adapter.js"; import type { ChatPipelineInput } from "../pipeline/interfaces.js"; import options from "../../options.js"; import { ToolHandler } from "./handlers/tool_handler.js"; diff --git a/apps/server/src/services/llm/pipeline/cleanup_debug_logs.ts b/apps/server/src/services/llm/pipeline/cleanup_debug_logs.ts new file mode 100644 index 000000000..522f7d229 --- /dev/null +++ b/apps/server/src/services/llm/pipeline/cleanup_debug_logs.ts @@ -0,0 +1,181 @@ +#!/usr/bin/env node + +/** + * Script to clean up debug log statements from production code + * + * This script: + * 1. Finds all log.info("[DEBUG]") statements + * 2. Converts them to proper debug level logging + * 3. Reports on other verbose logging that should be reviewed + */ + +import fs from 'fs'; +import path from 'path'; +import { fileURLToPath } from 'url'; + +const __filename = fileURLToPath(import.meta.url); +const __dirname = path.dirname(__filename); + +// Patterns to find and replace +const patterns = [ + { + name: 'Debug in info logs', + find: /log\.info\((.*?)\[DEBUG\](.*?)\)/g, + replace: 'log.debug($1$2)', + count: 0 + }, + { + name: 'Tool call debug', + find: /log\.info\((.*?)\[TOOL CALL DEBUG\](.*?)\)/g, + replace: 'log.debug($1Tool call: $2)', + count: 0 + }, + { + name: 'Excessive separators', + find: /log\.info\(['"`]={10,}.*?={10,}['"`]\)/g, + replace: null, // Just count, don't replace + count: 0 + }, + { + name: 'Pipeline stage logs', + find: /log\.info\(['"`].*?STAGE \d+:.*?['"`]\)/g, + replace: null, // Just count, don't replace + count: 0 + } +]; + +// Files to process +const filesToProcess = [ + path.join(__dirname, '..', 'pipeline', 'chat_pipeline.ts'), + path.join(__dirname, '..', 'providers', 'anthropic_service.ts'), + path.join(__dirname, '..', 'providers', 'openai_service.ts'), + path.join(__dirname, '..', 'providers', 'ollama_service.ts'), + path.join(__dirname, '..', 'tools', 'tool_registry.ts'), +]; + +// Additional directories to scan +const directoriesToScan = [ + path.join(__dirname, '..', 'pipeline', 'stages'), + path.join(__dirname, '..', 'tools'), +]; + +/** + * Process a single file + */ +function processFile(filePath: string, dryRun: boolean = true): void { + if (!fs.existsSync(filePath)) { + console.log(`File not found: ${filePath}`); + return; + } + + let content = fs.readFileSync(filePath, 'utf-8'); + let modified = false; + + console.log(`\nProcessing: ${path.basename(filePath)}`); + + patterns.forEach(pattern => { + const matches = content.match(pattern.find) || []; + if (matches.length > 0) { + console.log(` Found ${matches.length} instances of "${pattern.name}"`); + pattern.count += matches.length; + + if (pattern.replace && !dryRun) { + content = content.replace(pattern.find, pattern.replace); + modified = true; + } + } + }); + + if (modified && !dryRun) { + fs.writeFileSync(filePath, content, 'utf-8'); + console.log(` āœ“ File updated`); + } +} + +/** + * Scan directory 
for files + */ +function scanDirectory(dirPath: string): string[] { + const files: string[] = []; + + if (!fs.existsSync(dirPath)) { + return files; + } + + const entries = fs.readdirSync(dirPath, { withFileTypes: true }); + + for (const entry of entries) { + const fullPath = path.join(dirPath, entry.name); + + if (entry.isDirectory()) { + files.push(...scanDirectory(fullPath)); + } else if (entry.isFile() && entry.name.endsWith('.ts')) { + files.push(fullPath); + } + } + + return files; +} + +/** + * Main function + */ +function main(): void { + const args = process.argv.slice(2); + const dryRun = !args.includes('--apply'); + + console.log('========================================'); + console.log('Debug Log Cleanup Script'); + console.log('========================================'); + console.log(dryRun ? 'Mode: DRY RUN (use --apply to make changes)' : 'Mode: APPLYING CHANGES'); + + // Collect all files to process + const allFiles = [...filesToProcess]; + + directoriesToScan.forEach(dir => { + allFiles.push(...scanDirectory(dir)); + }); + + // Remove duplicates + const uniqueFiles = [...new Set(allFiles)]; + + console.log(`\nFound ${uniqueFiles.length} TypeScript files to process`); + + // Process each file + uniqueFiles.forEach(file => processFile(file, dryRun)); + + // Summary + console.log('\n========================================'); + console.log('Summary'); + console.log('========================================'); + + patterns.forEach(pattern => { + if (pattern.count > 0) { + console.log(`${pattern.name}: ${pattern.count} instances`); + } + }); + + const totalIssues = patterns.reduce((sum, p) => sum + p.count, 0); + + if (totalIssues === 0) { + console.log('āœ“ No debug statements found!'); + } else if (dryRun) { + console.log(`\nFound ${totalIssues} total issues.`); + console.log('Run with --apply to fix replaceable patterns.'); + } else { + const fixedCount = patterns.filter(p => p.replace).reduce((sum, p) => sum + p.count, 0); + console.log(`\nāœ“ Fixed ${fixedCount} issues.`); + + const remainingCount = patterns.filter(p => !p.replace).reduce((sum, p) => sum + p.count, 0); + if (remainingCount > 0) { + console.log(`ℹ ${remainingCount} instances need manual review.`); + } + } +} + +// Run if executed directly +if (import.meta.url === `file://${process.argv[1]}`) { + main(); +} + +export { processFile, scanDirectory }; \ No newline at end of file diff --git a/apps/server/src/services/llm/pipeline/configuration_service.ts b/apps/server/src/services/llm/pipeline/configuration_service.ts new file mode 100644 index 000000000..eb4df8f0d --- /dev/null +++ b/apps/server/src/services/llm/pipeline/configuration_service.ts @@ -0,0 +1,452 @@ +/** + * Configuration Service - Phase 2.2 Implementation + * + * Centralizes all LLM configuration management: + * - Single source of truth for all configuration + * - Validation at startup + * - Type-safe configuration access + * - No scattered options.getOption() calls + */ + +import options from '../../../options.js'; +import log from '../../../log.js'; +import type { ChatCompletionOptions } from '../ai_interface.js'; + +// Configuration interfaces +export interface LLMConfiguration { + providers: ProviderConfiguration; + defaults: DefaultConfiguration; + tools: ToolConfiguration; + streaming: StreamingConfiguration; + debug: DebugConfiguration; + limits: LimitConfiguration; +} + +export interface ProviderConfiguration { + enabled: boolean; + selected: 'openai' | 'anthropic' | 'ollama' | null; + openai?: { + apiKey: string; + baseUrl?: string; + 
defaultModel: string; + maxTokens?: number; + }; + anthropic?: { + apiKey: string; + baseUrl?: string; + defaultModel: string; + maxTokens?: number; + }; + ollama?: { + baseUrl: string; + defaultModel: string; + maxTokens?: number; + }; +} + +export interface DefaultConfiguration { + systemPrompt: string; + temperature: number; + maxTokens: number; + topP: number; + presencePenalty: number; + frequencyPenalty: number; +} + +export interface ToolConfiguration { + enabled: boolean; + maxIterations: number; + timeout: number; + parallelExecution: boolean; +} + +export interface StreamingConfiguration { + enabled: boolean; + chunkSize: number; + flushInterval: number; +} + +export interface DebugConfiguration { + enabled: boolean; + logLevel: 'error' | 'warn' | 'info' | 'debug'; + enableMetrics: boolean; + enableTracing: boolean; +} + +export interface LimitConfiguration { + maxMessageLength: number; + maxConversationLength: number; + maxContextLength: number; + rateLimitPerMinute: number; +} + +// Validation result interface +export interface ConfigurationValidationResult { + valid: boolean; + errors: string[]; + warnings: string[]; +} + +/** + * Configuration Service Implementation + */ +export class ConfigurationService { + private config: LLMConfiguration | null = null; + private validationResult: ConfigurationValidationResult | null = null; + private lastLoadTime: number = 0; + private readonly CACHE_DURATION = 60000; // 1 minute cache + + /** + * Load and validate configuration + */ + async initialize(): Promise { + log.info('Initializing LLM configuration service'); + + try { + this.config = await this.loadConfiguration(); + this.validationResult = this.validateConfiguration(this.config); + this.lastLoadTime = Date.now(); + + if (!this.validationResult.valid) { + log.error('Configuration validation failed', this.validationResult.errors); + } else if (this.validationResult.warnings.length > 0) { + log.warn('Configuration warnings', this.validationResult.warnings); + } else { + log.info('Configuration loaded and validated successfully'); + } + + return this.validationResult; + + } catch (error) { + const errorMessage = `Failed to initialize configuration: ${error}`; + log.error(errorMessage); + + this.validationResult = { + valid: false, + errors: [errorMessage], + warnings: [] + }; + + return this.validationResult; + } + } + + /** + * Load configuration from options + */ + private async loadConfiguration(): Promise { + // Provider configuration + const providers: ProviderConfiguration = { + enabled: options.getOptionBool('aiEnabled'), + selected: this.getSelectedProvider(), + openai: this.loadOpenAIConfig(), + anthropic: this.loadAnthropicConfig(), + ollama: this.loadOllamaConfig() + }; + + // Default configuration + const defaults: DefaultConfiguration = { + systemPrompt: options.getOption('llmSystemPrompt') || 'You are a helpful AI assistant.', + temperature: this.parseFloat(options.getOption('llmTemperature'), 0.7), + maxTokens: this.parseInt(options.getOption('llmMaxTokens'), 2000), + topP: this.parseFloat(options.getOption('llmTopP'), 0.9), + presencePenalty: this.parseFloat(options.getOption('llmPresencePenalty'), 0), + frequencyPenalty: this.parseFloat(options.getOption('llmFrequencyPenalty'), 0) + }; + + // Tool configuration + const tools: ToolConfiguration = { + enabled: options.getOptionBool('llmToolsEnabled') !== false, + maxIterations: this.parseInt(options.getOption('llmMaxToolIterations'), 5), + timeout: this.parseInt(options.getOption('llmToolTimeout'), 30000), + 
parallelExecution: options.getOptionBool('llmParallelTools') !== false + }; + + // Streaming configuration + const streaming: StreamingConfiguration = { + enabled: options.getOptionBool('llmStreamingEnabled') !== false, + chunkSize: this.parseInt(options.getOption('llmStreamChunkSize'), 256), + flushInterval: this.parseInt(options.getOption('llmStreamFlushInterval'), 100) + }; + + // Debug configuration + const debug: DebugConfiguration = { + enabled: options.getOptionBool('llmDebugEnabled'), + logLevel: this.getLogLevel(), + enableMetrics: options.getOptionBool('llmMetricsEnabled'), + enableTracing: options.getOptionBool('llmTracingEnabled') + }; + + // Limit configuration + const limits: LimitConfiguration = { + maxMessageLength: this.parseInt(options.getOption('llmMaxMessageLength'), 100000), + maxConversationLength: this.parseInt(options.getOption('llmMaxConversationLength'), 50), + maxContextLength: this.parseInt(options.getOption('llmMaxContextLength'), 10000), + rateLimitPerMinute: this.parseInt(options.getOption('llmRateLimitPerMinute'), 60) + }; + + return { + providers, + defaults, + tools, + streaming, + debug, + limits + }; + } + + /** + * Load OpenAI configuration + */ + private loadOpenAIConfig() { + const apiKey = options.getOption('openaiApiKey'); + if (!apiKey) return undefined; + + return { + apiKey, + baseUrl: options.getOption('openaiBaseUrl') || undefined, + defaultModel: options.getOption('openaiDefaultModel') || 'gpt-4-turbo-preview', + maxTokens: this.parseInt(options.getOption('openaiMaxTokens'), 4096) + }; + } + + /** + * Load Anthropic configuration + */ + private loadAnthropicConfig() { + const apiKey = options.getOption('anthropicApiKey'); + if (!apiKey) return undefined; + + return { + apiKey, + baseUrl: options.getOption('anthropicBaseUrl') || undefined, + defaultModel: options.getOption('anthropicDefaultModel') || 'claude-3-opus-20240229', + maxTokens: this.parseInt(options.getOption('anthropicMaxTokens'), 4096) + }; + } + + /** + * Load Ollama configuration + */ + private loadOllamaConfig() { + const baseUrl = options.getOption('ollamaBaseUrl'); + if (!baseUrl) return undefined; + + return { + baseUrl, + defaultModel: options.getOption('ollamaDefaultModel') || 'llama2', + maxTokens: this.parseInt(options.getOption('ollamaMaxTokens'), 2048) + }; + } + + /** + * Validate configuration + */ + private validateConfiguration(config: LLMConfiguration): ConfigurationValidationResult { + const errors: string[] = []; + const warnings: string[] = []; + + // Check if AI is enabled + if (!config.providers.enabled) { + warnings.push('AI features are disabled'); + return { valid: true, errors, warnings }; + } + + // Check provider selection + if (!config.providers.selected) { + errors.push('No AI provider selected'); + } else { + // Validate selected provider configuration + const selectedConfig = config.providers[config.providers.selected]; + if (!selectedConfig) { + errors.push(`Configuration missing for selected provider: ${config.providers.selected}`); + } else { + // Provider-specific validation + if (config.providers.selected === 'openai' && !selectedConfig.apiKey) { + errors.push('OpenAI API key is required'); + } + if (config.providers.selected === 'anthropic' && !selectedConfig.apiKey) { + errors.push('Anthropic API key is required'); + } + if (config.providers.selected === 'ollama' && !selectedConfig.baseUrl) { + errors.push('Ollama base URL is required'); + } + } + } + + // Validate limits + if (config.limits.maxMessageLength < 100) { + warnings.push('Maximum 
message length is very low, may cause issues'); + } + if (config.limits.maxConversationLength < 2) { + errors.push('Maximum conversation length must be at least 2'); + } + if (config.tools.maxIterations > 10) { + warnings.push('High tool iteration limit may cause performance issues'); + } + + // Validate defaults + if (config.defaults.temperature < 0 || config.defaults.temperature > 2) { + errors.push('Temperature must be between 0 and 2'); + } + if (config.defaults.maxTokens < 1) { + errors.push('Maximum tokens must be at least 1'); + } + + return { + valid: errors.length === 0, + errors, + warnings + }; + } + + /** + * Get selected provider + */ + private getSelectedProvider(): 'openai' | 'anthropic' | 'ollama' | null { + const provider = options.getOption('aiSelectedProvider'); + if (provider === 'openai' || provider === 'anthropic' || provider === 'ollama') { + return provider; + } + return null; + } + + /** + * Get log level + */ + private getLogLevel(): 'error' | 'warn' | 'info' | 'debug' { + const level = options.getOption('llmLogLevel') || 'info'; + if (level === 'error' || level === 'warn' || level === 'info' || level === 'debug') { + return level; + } + return 'info'; + } + + /** + * Parse integer with default + */ + private parseInt(value: string | null, defaultValue: number): number { + if (!value) return defaultValue; + const parsed = parseInt(value, 10); + return isNaN(parsed) ? defaultValue : parsed; + } + + /** + * Parse float with default + */ + private parseFloat(value: string | null, defaultValue: number): number { + if (!value) return defaultValue; + const parsed = parseFloat(value); + return isNaN(parsed) ? defaultValue : parsed; + } + + /** + * Ensure configuration is loaded + */ + private ensureConfigLoaded(): LLMConfiguration { + if (!this.config || Date.now() - this.lastLoadTime > this.CACHE_DURATION) { + // Reload configuration if cache expired + this.initialize().catch(error => { + log.error('Failed to reload configuration', error); + }); + } + + if (!this.config) { + throw new Error('Configuration not initialized'); + } + + return this.config; + } + + // Public accessors + + /** + * Get provider configuration + */ + getProviderConfig(): ProviderConfiguration { + return this.ensureConfigLoaded().providers; + } + + /** + * Get default configuration + */ + getDefaultConfig(): DefaultConfiguration { + return this.ensureConfigLoaded().defaults; + } + + /** + * Get tool configuration + */ + getToolConfig(): ToolConfiguration { + return this.ensureConfigLoaded().tools; + } + + /** + * Get streaming configuration + */ + getStreamingConfig(): StreamingConfiguration { + return this.ensureConfigLoaded().streaming; + } + + /** + * Get debug configuration + */ + getDebugConfig(): DebugConfiguration { + return this.ensureConfigLoaded().debug; + } + + /** + * Get limit configuration + */ + getLimitConfig(): LimitConfiguration { + return this.ensureConfigLoaded().limits; + } + + /** + * Get default system prompt + */ + getDefaultSystemPrompt(): string { + return this.getDefaultConfig().systemPrompt; + } + + /** + * Get default completion options + */ + getDefaultCompletionOptions(): ChatCompletionOptions { + const defaults = this.getDefaultConfig(); + return { + temperature: defaults.temperature, + max_tokens: defaults.maxTokens, + top_p: defaults.topP, + presence_penalty: defaults.presencePenalty, + frequency_penalty: defaults.frequencyPenalty + }; + } + + /** + * Check if configuration is valid + */ + isValid(): boolean { + return this.validationResult?.valid ?? 
false; + } + + /** + * Get validation result + */ + getValidationResult(): ConfigurationValidationResult | null { + return this.validationResult; + } + + /** + * Force reload configuration + */ + async reload(): Promise { + this.config = null; + this.lastLoadTime = 0; + return this.initialize(); + } +} + +// Export singleton instance +const configurationService = new ConfigurationService(); +export default configurationService; \ No newline at end of file diff --git a/apps/server/src/services/llm/pipeline/logging_service.ts b/apps/server/src/services/llm/pipeline/logging_service.ts new file mode 100644 index 000000000..0c836bb8f --- /dev/null +++ b/apps/server/src/services/llm/pipeline/logging_service.ts @@ -0,0 +1,426 @@ +/** + * Logging Service - Phase 2.3 Implementation + * + * Structured logging with: + * - Proper log levels + * - Request ID tracking + * - Conditional debug logging + * - No production debug statements + */ + +import log from '../../../log.js'; +import configurationService from './configuration_service.js'; + +// Log levels +export enum LogLevel { + ERROR = 'error', + WARN = 'warn', + INFO = 'info', + DEBUG = 'debug' +} + +// Log entry interface +export interface LogEntry { + timestamp: Date; + level: LogLevel; + requestId?: string; + message: string; + data?: any; + error?: Error; + duration?: number; +} + +// Structured log data +export interface LogContext { + requestId?: string; + userId?: string; + sessionId?: string; + provider?: string; + model?: string; + operation?: string; + [key: string]: any; +} + +/** + * Logging Service Implementation + */ +export class LoggingService { + private enabled: boolean = true; + private logLevel: LogLevel = LogLevel.INFO; + private debugEnabled: boolean = false; + private requestContexts: Map = new Map(); + private logBuffer: LogEntry[] = []; + private readonly MAX_BUFFER_SIZE = 1000; + + constructor() { + this.initialize(); + } + + /** + * Initialize logging configuration + */ + private initialize(): void { + try { + const debugConfig = configurationService.getDebugConfig(); + this.enabled = debugConfig.enabled; + this.debugEnabled = debugConfig.logLevel === 'debug'; + this.logLevel = this.parseLogLevel(debugConfig.logLevel); + } catch (error) { + // Fall back to defaults if configuration is not available + this.enabled = true; + this.logLevel = LogLevel.INFO; + this.debugEnabled = false; + } + } + + /** + * Parse log level from string + */ + private parseLogLevel(level: string): LogLevel { + switch (level?.toLowerCase()) { + case 'error': return LogLevel.ERROR; + case 'warn': return LogLevel.WARN; + case 'info': return LogLevel.INFO; + case 'debug': return LogLevel.DEBUG; + default: return LogLevel.INFO; + } + } + + /** + * Check if a log level should be logged + */ + private shouldLog(level: LogLevel): boolean { + if (!this.enabled) return false; + + const levels = [LogLevel.ERROR, LogLevel.WARN, LogLevel.INFO, LogLevel.DEBUG]; + const currentIndex = levels.indexOf(this.logLevel); + const messageIndex = levels.indexOf(level); + + return messageIndex <= currentIndex; + } + + /** + * Format log message with context + */ + private formatMessage(message: string, context?: LogContext): string { + if (!context?.requestId) { + return message; + } + return `[${context.requestId}] ${message}`; + } + + /** + * Write log entry + */ + private writeLog(entry: LogEntry): void { + // Add to buffer for debugging + this.bufferLog(entry); + + // Skip debug logs in production + if (entry.level === LogLevel.DEBUG && !this.debugEnabled) { + 
return; + } + + // Format message with request ID if present + const formattedMessage = this.formatMessage(entry.message, { requestId: entry.requestId }); + + // Log based on level + switch (entry.level) { + case LogLevel.ERROR: + if (entry.error) { + log.error(formattedMessage, entry.error); + } else { + log.error(formattedMessage, entry.data); + } + break; + + case LogLevel.WARN: + log.warn(formattedMessage, entry.data); + break; + + case LogLevel.INFO: + if (entry.data && Object.keys(entry.data).length > 0) { + log.info(`${formattedMessage} - ${JSON.stringify(entry.data)}`); + } else { + log.info(formattedMessage); + } + break; + + case LogLevel.DEBUG: + // Only log debug messages if debug is enabled + if (this.debugEnabled) { + if (entry.data) { + log.info(`[DEBUG] ${formattedMessage} - ${JSON.stringify(entry.data)}`); + } else { + log.info(`[DEBUG] ${formattedMessage}`); + } + } + break; + } + } + + /** + * Buffer log entry for debugging + */ + private bufferLog(entry: LogEntry): void { + this.logBuffer.push(entry); + + // Trim buffer if it exceeds max size + if (this.logBuffer.length > this.MAX_BUFFER_SIZE) { + this.logBuffer = this.logBuffer.slice(-this.MAX_BUFFER_SIZE); + } + } + + /** + * Main logging method + */ + log(level: LogLevel, message: string, data?: any): void { + if (!this.shouldLog(level)) return; + + const entry: LogEntry = { + timestamp: new Date(), + level, + message, + data: data instanceof Error ? undefined : data, + error: data instanceof Error ? data : undefined + }; + + this.writeLog(entry); + } + + /** + * Log with request context + */ + logWithContext(level: LogLevel, message: string, context: LogContext, data?: any): void { + if (!this.shouldLog(level)) return; + + const entry: LogEntry = { + timestamp: new Date(), + level, + requestId: context.requestId, + message, + data: { ...context, ...data } + }; + + this.writeLog(entry); + } + + /** + * Create a logger with a fixed request ID + */ + withRequestId(requestId: string): { + requestId: string; + log: (level: LogLevel, message: string, data?: any) => void; + error: (message: string, error?: Error | any) => void; + warn: (message: string, data?: any) => void; + info: (message: string, data?: any) => void; + debug: (message: string, data?: any) => void; + startTimer: (operation: string) => () => void; + } { + const self = this; + + return { + requestId, + + log(level: LogLevel, message: string, data?: any): void { + self.logWithContext(level, message, { requestId }, data); + }, + + error(message: string, error?: Error | any): void { + self.logWithContext(LogLevel.ERROR, message, { requestId }, error); + }, + + warn(message: string, data?: any): void { + self.logWithContext(LogLevel.WARN, message, { requestId }, data); + }, + + info(message: string, data?: any): void { + self.logWithContext(LogLevel.INFO, message, { requestId }, data); + }, + + debug(message: string, data?: any): void { + self.logWithContext(LogLevel.DEBUG, message, { requestId }, data); + }, + + startTimer(operation: string): () => void { + const startTime = Date.now(); + return () => { + const duration = Date.now() - startTime; + self.logWithContext(LogLevel.DEBUG, `${operation} completed`, { requestId }, { duration }); + }; + } + }; + } + + /** + * Start a timer for performance tracking + */ + startTimer(operation: string, requestId?: string): () => void { + const startTime = Date.now(); + + return () => { + const duration = Date.now() - startTime; + const entry: LogEntry = { + timestamp: new Date(), + level: LogLevel.DEBUG, + requestId, + 
message: `${operation} completed in ${duration}ms`, + duration + }; + + if (this.shouldLog(LogLevel.DEBUG)) { + this.writeLog(entry); + } + }; + } + + /** + * Log error with stack trace + */ + error(message: string, error?: Error | any, requestId?: string): void { + const entry: LogEntry = { + timestamp: new Date(), + level: LogLevel.ERROR, + requestId, + message, + error: error instanceof Error ? error : new Error(String(error)) + }; + + this.writeLog(entry); + } + + /** + * Log warning + */ + warn(message: string, data?: any, requestId?: string): void { + const entry: LogEntry = { + timestamp: new Date(), + level: LogLevel.WARN, + requestId, + message, + data + }; + + this.writeLog(entry); + } + + /** + * Log info + */ + info(message: string, data?: any, requestId?: string): void { + const entry: LogEntry = { + timestamp: new Date(), + level: LogLevel.INFO, + requestId, + message, + data + }; + + this.writeLog(entry); + } + + /** + * Log debug (only in debug mode) + */ + debug(message: string, data?: any, requestId?: string): void { + if (!this.debugEnabled) return; + + const entry: LogEntry = { + timestamp: new Date(), + level: LogLevel.DEBUG, + requestId, + message, + data + }; + + this.writeLog(entry); + } + + /** + * Set request context + */ + setRequestContext(requestId: string, context: LogContext): void { + this.requestContexts.set(requestId, context); + } + + /** + * Get request context + */ + getRequestContext(requestId: string): LogContext | undefined { + return this.requestContexts.get(requestId); + } + + /** + * Clear request context + */ + clearRequestContext(requestId: string): void { + this.requestContexts.delete(requestId); + } + + /** + * Get recent logs for debugging + */ + getRecentLogs(count: number = 100, level?: LogLevel): LogEntry[] { + let logs = [...this.logBuffer]; + + if (level) { + logs = logs.filter(entry => entry.level === level); + } + + return logs.slice(-count); + } + + /** + * Clear log buffer + */ + clearBuffer(): void { + this.logBuffer = []; + } + + /** + * Set log level dynamically + */ + setLogLevel(level: LogLevel): void { + this.logLevel = level; + this.debugEnabled = level === LogLevel.DEBUG; + } + + /** + * Get current log level + */ + getLogLevel(): LogLevel { + return this.logLevel; + } + + /** + * Enable/disable logging + */ + setEnabled(enabled: boolean): void { + this.enabled = enabled; + } + + /** + * Check if logging is enabled + */ + isEnabled(): boolean { + return this.enabled; + } + + /** + * Check if debug logging is enabled + */ + isDebugEnabled(): boolean { + return this.debugEnabled; + } + + /** + * Reload configuration + */ + reloadConfiguration(): void { + this.initialize(); + } +} + +// Export singleton instance +const loggingService = new LoggingService(); +export default loggingService; \ No newline at end of file diff --git a/apps/server/src/services/llm/pipeline/model_registry.ts b/apps/server/src/services/llm/pipeline/model_registry.ts new file mode 100644 index 000000000..e612a5838 --- /dev/null +++ b/apps/server/src/services/llm/pipeline/model_registry.ts @@ -0,0 +1,537 @@ +/** + * Model Registry - Phase 2.2 Implementation + * + * Centralized model capability management: + * - Model metadata and capabilities + * - Model selection logic + * - Cost tracking + * - Performance characteristics + */ + +import log from '../../../log.js'; + +// Model capability interfaces +export interface ModelCapabilities { + supportsTools: boolean; + supportsStreaming: boolean; + supportsVision: boolean; + supportsJson: boolean; + maxTokens: 
number; + contextWindow: number; + trainingCutoff?: string; +} + +export interface ModelCost { + inputTokens: number; // Cost per 1K tokens + outputTokens: number; // Cost per 1K tokens + currency: 'USD'; +} + +export interface ModelPerformance { + averageLatency: number; // ms per token + throughput: number; // tokens per second + reliabilityScore: number; // 0-1 score +} + +export interface ModelInfo { + id: string; + provider: 'openai' | 'anthropic' | 'ollama'; + displayName: string; + family: string; + version?: string; + capabilities: ModelCapabilities; + cost?: ModelCost; + performance?: ModelPerformance; + recommended: { + forCoding: boolean; + forChat: boolean; + forAnalysis: boolean; + forCreative: boolean; + }; +} + +/** + * Model Registry Implementation + */ +export class ModelRegistry { + private models: Map = new Map(); + private initialized = false; + + constructor() { + this.registerBuiltInModels(); + } + + /** + * Register built-in models with their capabilities + */ + private registerBuiltInModels(): void { + // OpenAI Models + this.registerModel({ + id: 'gpt-4-turbo-preview', + provider: 'openai', + displayName: 'GPT-4 Turbo', + family: 'gpt-4', + version: 'turbo-preview', + capabilities: { + supportsTools: true, + supportsStreaming: true, + supportsVision: true, + supportsJson: true, + maxTokens: 4096, + contextWindow: 128000, + trainingCutoff: '2023-12' + }, + cost: { + inputTokens: 0.01, + outputTokens: 0.03, + currency: 'USD' + }, + performance: { + averageLatency: 50, + throughput: 20, + reliabilityScore: 0.95 + }, + recommended: { + forCoding: true, + forChat: true, + forAnalysis: true, + forCreative: true + } + }); + + this.registerModel({ + id: 'gpt-4', + provider: 'openai', + displayName: 'GPT-4', + family: 'gpt-4', + capabilities: { + supportsTools: true, + supportsStreaming: true, + supportsVision: false, + supportsJson: true, + maxTokens: 8192, + contextWindow: 8192, + trainingCutoff: '2023-03' + }, + cost: { + inputTokens: 0.03, + outputTokens: 0.06, + currency: 'USD' + }, + performance: { + averageLatency: 70, + throughput: 15, + reliabilityScore: 0.98 + }, + recommended: { + forCoding: true, + forChat: true, + forAnalysis: true, + forCreative: true + } + }); + + this.registerModel({ + id: 'gpt-3.5-turbo', + provider: 'openai', + displayName: 'GPT-3.5 Turbo', + family: 'gpt-3.5', + version: 'turbo', + capabilities: { + supportsTools: true, + supportsStreaming: true, + supportsVision: false, + supportsJson: true, + maxTokens: 4096, + contextWindow: 16385, + trainingCutoff: '2021-09' + }, + cost: { + inputTokens: 0.0005, + outputTokens: 0.0015, + currency: 'USD' + }, + performance: { + averageLatency: 30, + throughput: 35, + reliabilityScore: 0.92 + }, + recommended: { + forCoding: false, + forChat: true, + forAnalysis: false, + forCreative: false + } + }); + + // Anthropic Models + this.registerModel({ + id: 'claude-3-opus-20240229', + provider: 'anthropic', + displayName: 'Claude 3 Opus', + family: 'claude-3', + version: 'opus', + capabilities: { + supportsTools: true, + supportsStreaming: true, + supportsVision: true, + supportsJson: false, + maxTokens: 4096, + contextWindow: 200000, + trainingCutoff: '2023-08' + }, + cost: { + inputTokens: 0.015, + outputTokens: 0.075, + currency: 'USD' + }, + performance: { + averageLatency: 60, + throughput: 18, + reliabilityScore: 0.96 + }, + recommended: { + forCoding: true, + forChat: true, + forAnalysis: true, + forCreative: true + } + }); + + this.registerModel({ + id: 'claude-3-sonnet-20240229', + provider: 
'anthropic', + displayName: 'Claude 3 Sonnet', + family: 'claude-3', + version: 'sonnet', + capabilities: { + supportsTools: true, + supportsStreaming: true, + supportsVision: true, + supportsJson: false, + maxTokens: 4096, + contextWindow: 200000, + trainingCutoff: '2023-08' + }, + cost: { + inputTokens: 0.003, + outputTokens: 0.015, + currency: 'USD' + }, + performance: { + averageLatency: 40, + throughput: 25, + reliabilityScore: 0.94 + }, + recommended: { + forCoding: true, + forChat: true, + forAnalysis: true, + forCreative: false + } + }); + + this.registerModel({ + id: 'claude-3-haiku-20240307', + provider: 'anthropic', + displayName: 'Claude 3 Haiku', + family: 'claude-3', + version: 'haiku', + capabilities: { + supportsTools: true, + supportsStreaming: true, + supportsVision: true, + supportsJson: false, + maxTokens: 4096, + contextWindow: 200000, + trainingCutoff: '2023-08' + }, + cost: { + inputTokens: 0.00025, + outputTokens: 0.00125, + currency: 'USD' + }, + performance: { + averageLatency: 20, + throughput: 50, + reliabilityScore: 0.90 + }, + recommended: { + forCoding: false, + forChat: true, + forAnalysis: false, + forCreative: false + } + }); + + // Ollama Models (local, no cost) + this.registerModel({ + id: 'llama2', + provider: 'ollama', + displayName: 'Llama 2', + family: 'llama', + version: '2', + capabilities: { + supportsTools: false, + supportsStreaming: true, + supportsVision: false, + supportsJson: false, + maxTokens: 2048, + contextWindow: 4096 + }, + performance: { + averageLatency: 100, + throughput: 10, + reliabilityScore: 0.85 + }, + recommended: { + forCoding: false, + forChat: true, + forAnalysis: false, + forCreative: false + } + }); + + this.registerModel({ + id: 'codellama', + provider: 'ollama', + displayName: 'Code Llama', + family: 'llama', + version: 'code', + capabilities: { + supportsTools: false, + supportsStreaming: true, + supportsVision: false, + supportsJson: false, + maxTokens: 2048, + contextWindow: 4096 + }, + performance: { + averageLatency: 100, + throughput: 10, + reliabilityScore: 0.88 + }, + recommended: { + forCoding: true, + forChat: false, + forAnalysis: false, + forCreative: false + } + }); + + this.registerModel({ + id: 'mistral', + provider: 'ollama', + displayName: 'Mistral', + family: 'mistral', + capabilities: { + supportsTools: false, + supportsStreaming: true, + supportsVision: false, + supportsJson: false, + maxTokens: 2048, + contextWindow: 8192 + }, + performance: { + averageLatency: 80, + throughput: 12, + reliabilityScore: 0.87 + }, + recommended: { + forCoding: false, + forChat: true, + forAnalysis: false, + forCreative: false + } + }); + + this.initialized = true; + } + + /** + * Register a model + */ + registerModel(model: ModelInfo): void { + const key = `${model.provider}:${model.id}`; + this.models.set(key, model); + log.debug(`Registered model: ${key}`); + } + + /** + * Get model by ID and provider + */ + getModel(modelId: string, provider: 'openai' | 'anthropic' | 'ollama'): ModelInfo | null { + const key = `${provider}:${modelId}`; + return this.models.get(key) || null; + } + + /** + * Get all models for a provider + */ + getModelsForProvider(provider: 'openai' | 'anthropic' | 'ollama'): ModelInfo[] { + const models: ModelInfo[] = []; + this.models.forEach(model => { + if (model.provider === provider) { + models.push(model); + } + }); + return models; + } + + /** + * Get all registered models + */ + getAllModels(): ModelInfo[] { + return Array.from(this.models.values()); + } + + /** + * Select best model for a 
use case + */ + selectModelForUseCase( + useCase: 'coding' | 'chat' | 'analysis' | 'creative', + constraints?: { + maxCost?: number; + requiresTools?: boolean; + requiresStreaming?: boolean; + minContextWindow?: number; + provider?: 'openai' | 'anthropic' | 'ollama'; + } + ): ModelInfo | null { + let candidates = this.getAllModels(); + + // Filter by provider if specified + if (constraints?.provider) { + candidates = candidates.filter(m => m.provider === constraints.provider); + } + + // Filter by requirements + if (constraints?.requiresTools) { + candidates = candidates.filter(m => m.capabilities.supportsTools); + } + if (constraints?.requiresStreaming) { + candidates = candidates.filter(m => m.capabilities.supportsStreaming); + } + if (constraints?.minContextWindow) { + candidates = candidates.filter(m => m.capabilities.contextWindow >= constraints.minContextWindow); + } + + // Filter by cost + if (constraints?.maxCost !== undefined) { + candidates = candidates.filter(m => { + if (!m.cost) return true; // Local models have no cost + return m.cost.inputTokens <= constraints.maxCost; + }); + } + + // Filter by use case recommendation + const recommendationKey = `for${useCase.charAt(0).toUpperCase()}${useCase.slice(1)}` as keyof ModelInfo['recommended']; + candidates = candidates.filter(m => m.recommended[recommendationKey]); + + // Sort by performance and cost + candidates.sort((a, b) => { + // Prefer higher reliability + const reliabilityDiff = (b.performance?.reliabilityScore || 0) - (a.performance?.reliabilityScore || 0); + if (Math.abs(reliabilityDiff) > 0.05) return reliabilityDiff > 0 ? 1 : -1; + + // Then prefer lower cost + const aCost = a.cost?.inputTokens || 0; + const bCost = b.cost?.inputTokens || 0; + return aCost - bCost; + }); + + return candidates[0] || null; + } + + /** + * Estimate cost for a request + */ + estimateCost( + modelId: string, + provider: 'openai' | 'anthropic' | 'ollama', + inputTokens: number, + outputTokens: number + ): number | null { + const model = this.getModel(modelId, provider); + if (!model || !model.cost) return null; + + const inputCost = (inputTokens / 1000) * model.cost.inputTokens; + const outputCost = (outputTokens / 1000) * model.cost.outputTokens; + + return inputCost + outputCost; + } + + /** + * Check if a model supports a capability + */ + supportsCapability( + modelId: string, + provider: 'openai' | 'anthropic' | 'ollama', + capability: keyof ModelCapabilities + ): boolean { + const model = this.getModel(modelId, provider); + if (!model) return false; + + return model.capabilities[capability] as boolean; + } + + /** + * Get model context window + */ + getContextWindow(modelId: string, provider: 'openai' | 'anthropic' | 'ollama'): number { + const model = this.getModel(modelId, provider); + return model?.capabilities.contextWindow || 4096; + } + + /** + * Get model max tokens + */ + getMaxTokens(modelId: string, provider: 'openai' | 'anthropic' | 'ollama'): number { + const model = this.getModel(modelId, provider); + return model?.capabilities.maxTokens || 2048; + } + + /** + * Check if registry is initialized + */ + isInitialized(): boolean { + return this.initialized; + } + + /** + * Add custom model (for Ollama or custom endpoints) + */ + addCustomModel( + modelId: string, + provider: 'ollama', + displayName?: string, + capabilities?: Partial + ): void { + const defaultCapabilities: ModelCapabilities = { + supportsTools: false, + supportsStreaming: true, + supportsVision: false, + supportsJson: false, + maxTokens: 2048, + 
contextWindow: 4096 + }; + + this.registerModel({ + id: modelId, + provider, + displayName: displayName || modelId, + family: 'custom', + capabilities: { ...defaultCapabilities, ...capabilities }, + recommended: { + forCoding: false, + forChat: true, + forAnalysis: false, + forCreative: false + } + }); + } +} + +// Export singleton instance +const modelRegistry = new ModelRegistry(); +export default modelRegistry; \ No newline at end of file diff --git a/apps/server/src/services/llm/pipeline/pipeline_adapter.ts b/apps/server/src/services/llm/pipeline/pipeline_adapter.ts new file mode 100644 index 000000000..b5a03870a --- /dev/null +++ b/apps/server/src/services/llm/pipeline/pipeline_adapter.ts @@ -0,0 +1,155 @@ +/** + * Pipeline Adapter + * + * Provides compatibility layer between the existing ChatPipeline + * and the new SimplifiedChatPipeline implementation. + * This allows gradual migration without breaking existing code. + */ + +import type { ChatPipelineInput, ChatPipelineConfig, PipelineMetrics } from './interfaces.js'; +import type { ChatResponse } from '../ai_interface.js'; +import simplifiedPipeline from './simplified_pipeline.js'; +import configurationService from './configuration_service.js'; +import loggingService, { LogLevel } from './logging_service.js'; + +/** + * Adapter class that maintains the existing ChatPipeline interface + * while using the new simplified implementation underneath + */ +export class ChatPipelineAdapter { + private config: ChatPipelineConfig; + private useSimplified: boolean; + + constructor(config?: Partial) { + // Initialize configuration service on first use + this.initializeServices(); + + // Merge provided config with defaults from configuration service + const toolConfig = configurationService.getToolConfig(); + const streamingConfig = configurationService.getStreamingConfig(); + const debugConfig = configurationService.getDebugConfig(); + + this.config = { + enableStreaming: streamingConfig.enabled, + enableMetrics: debugConfig.enableMetrics, + maxToolCallIterations: toolConfig.maxIterations, + ...config + }; + + // Check if we should use the simplified pipeline + this.useSimplified = this.shouldUseSimplified(); + } + + /** + * Initialize configuration and logging services + */ + private async initializeServices(): Promise { + try { + // Initialize configuration service + const validationResult = await configurationService.initialize(); + if (!validationResult.valid) { + loggingService.error('Configuration validation failed', validationResult.errors); + } + + // Reload logging configuration + loggingService.reloadConfiguration(); + + } catch (error) { + loggingService.error('Failed to initialize services', error); + } + } + + /** + * Determine if we should use the simplified pipeline + */ + private shouldUseSimplified(): boolean { + // Check environment variable or feature flag + const useSimplified = process.env.USE_SIMPLIFIED_PIPELINE; + if (useSimplified === 'true') return true; + if (useSimplified === 'false') return false; + + // Default to using simplified pipeline + return true; + } + + /** + * Execute the pipeline (compatible with existing interface) + */ + async execute(input: ChatPipelineInput): Promise { + if (this.useSimplified) { + // Use the new simplified pipeline + return await simplifiedPipeline.execute({ + messages: input.messages, + options: input.options, + noteId: input.noteId, + query: input.query, + streamCallback: input.streamCallback, + requestId: this.generateRequestId() + }); + } else { + // Fall back to the original 
implementation if needed + // This would import and use the original ChatPipeline + throw new Error('Original pipeline not available - use simplified pipeline'); + } + } + + /** + * Get pipeline metrics (compatible with existing interface) + */ + getMetrics(): PipelineMetrics { + if (this.useSimplified) { + const metrics = simplifiedPipeline.getMetrics(); + + // Convert simplified metrics to existing format + const stageMetrics: Record = {}; + Object.entries(metrics).forEach(([key, value]) => { + stageMetrics[key] = { + totalExecutions: 0, // Not tracked in simplified version + averageExecutionTime: value + }; + }); + + return { + totalExecutions: 0, + averageExecutionTime: metrics['pipeline_duration'] || 0, + stageMetrics + }; + } else { + // Return empty metrics for original pipeline + return { + totalExecutions: 0, + averageExecutionTime: 0, + stageMetrics: {} + }; + } + } + + /** + * Reset pipeline metrics (compatible with existing interface) + */ + resetMetrics(): void { + if (this.useSimplified) { + simplifiedPipeline.resetMetrics(); + } + } + + /** + * Generate a unique request ID + */ + private generateRequestId(): string { + return `req_${Date.now()}_${Math.random().toString(36).substring(7)}`; + } +} + +/** + * Factory function to create ChatPipeline instances + * This maintains backward compatibility with existing code + */ +export function createChatPipeline(config?: Partial) { + return new ChatPipelineAdapter(config); +} + +/** + * Export as ChatPipeline for drop-in replacement + */ +export const ChatPipeline = ChatPipelineAdapter; \ No newline at end of file diff --git a/apps/server/src/services/llm/pipeline/simplified_pipeline.spec.ts b/apps/server/src/services/llm/pipeline/simplified_pipeline.spec.ts new file mode 100644 index 000000000..cd7277c9c --- /dev/null +++ b/apps/server/src/services/llm/pipeline/simplified_pipeline.spec.ts @@ -0,0 +1,401 @@ +/** + * Tests for the Simplified Chat Pipeline + */ + +import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest'; +import { SimplifiedChatPipeline } from './simplified_pipeline.js'; +import type { SimplifiedPipelineInput } from './simplified_pipeline.js'; +import configurationService from './configuration_service.js'; +import loggingService from './logging_service.js'; + +// Mock dependencies +vi.mock('./configuration_service.js', () => ({ + default: { + getToolConfig: vi.fn(() => ({ + enabled: true, + maxIterations: 3, + timeout: 30000, + parallelExecution: false + })), + getDebugConfig: vi.fn(() => ({ + enabled: true, + logLevel: 'info', + enableMetrics: true, + enableTracing: false + })), + getStreamingConfig: vi.fn(() => ({ + enabled: true, + chunkSize: 256, + flushInterval: 100 + })), + getDefaultSystemPrompt: vi.fn(() => 'You are a helpful assistant.'), + getDefaultCompletionOptions: vi.fn(() => ({ + temperature: 0.7, + max_tokens: 2000 + })) + } +})); + +vi.mock('./logging_service.js', () => ({ + default: { + withRequestId: vi.fn((requestId: string) => ({ + requestId, + log: vi.fn(), + error: vi.fn(), + warn: vi.fn(), + info: vi.fn(), + debug: vi.fn(), + startTimer: vi.fn(() => vi.fn()) + })) + }, + LogLevel: { + ERROR: 'error', + WARN: 'warn', + INFO: 'info', + DEBUG: 'debug' + } +})); + +vi.mock('../ai_service_manager.js', () => ({ + default: { + getService: vi.fn(() => ({ + chat: vi.fn(async (messages, options) => ({ + text: 'Test response', + model: 'test-model', + provider: 'test-provider', + tool_calls: options.enableTools ? 
[] : undefined + })) + })) + } +})); + +vi.mock('../tools/tool_registry.js', () => ({ + default: { + getAllToolDefinitions: vi.fn(() => [ + { + type: 'function', + function: { + name: 'test_tool', + description: 'Test tool', + parameters: {} + } + } + ]), + getTool: vi.fn(() => ({ + execute: vi.fn(async () => 'Tool result') + })) + } +})); + +describe('SimplifiedChatPipeline', () => { + let pipeline: SimplifiedChatPipeline; + + beforeEach(() => { + vi.clearAllMocks(); + pipeline = new SimplifiedChatPipeline(); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + describe('execute', () => { + it('should execute a simple chat without tools', async () => { + const input: SimplifiedPipelineInput = { + messages: [ + { role: 'user', content: 'Hello' } + ], + options: { + enableTools: false + } + }; + + const response = await pipeline.execute(input); + + expect(response).toBeDefined(); + expect(response.text).toBe('Test response'); + expect(response.model).toBe('test-model'); + expect(response.provider).toBe('test-provider'); + }); + + it('should add system prompt when not present', async () => { + const aiServiceManager = await import('../ai_service_manager.js'); + const mockChat = vi.fn(async (messages) => { + // Check that system prompt was added + expect(messages[0].role).toBe('system'); + expect(messages[0].content).toBe('You are a helpful assistant.'); + return { + text: 'Response with system prompt', + model: 'test-model', + provider: 'test-provider' + }; + }); + + aiServiceManager.default.getService = vi.fn(() => ({ + chat: mockChat + })); + + const input: SimplifiedPipelineInput = { + messages: [ + { role: 'user', content: 'Hello' } + ] + }; + + const response = await pipeline.execute(input); + + expect(mockChat).toHaveBeenCalled(); + expect(response.text).toBe('Response with system prompt'); + }); + + it('should handle tool calls', async () => { + const aiServiceManager = await import('../ai_service_manager.js'); + let callCount = 0; + + const mockChat = vi.fn(async (messages, options) => { + callCount++; + + // First call returns tool calls + if (callCount === 1) { + return { + text: '', + model: 'test-model', + provider: 'test-provider', + tool_calls: [ + { + id: 'call_1', + type: 'function', + function: { + name: 'test_tool', + arguments: '{}' + } + } + ] + }; + } + + // Second call (after tool execution) returns final response + return { + text: 'Final response after tool', + model: 'test-model', + provider: 'test-provider' + }; + }); + + aiServiceManager.default.getService = vi.fn(() => ({ + chat: mockChat + })); + + const input: SimplifiedPipelineInput = { + messages: [ + { role: 'user', content: 'Use a tool' } + ], + options: { + enableTools: true + } + }; + + const response = await pipeline.execute(input); + + expect(mockChat).toHaveBeenCalledTimes(2); + expect(response.text).toBe('Final response after tool'); + }); + + it('should handle streaming when callback is provided', async () => { + const streamCallback = vi.fn(); + const aiServiceManager = await import('../ai_service_manager.js'); + + const mockChat = vi.fn(async (messages, options) => ({ + text: 'Streamed response', + model: 'test-model', + provider: 'test-provider', + stream: async (callback: Function) => { + await callback({ text: 'Chunk 1', done: false }); + await callback({ text: 'Chunk 2', done: false }); + await callback({ text: 'Chunk 3', done: true }); + } + })); + + aiServiceManager.default.getService = vi.fn(() => ({ + chat: mockChat + })); + + const input: SimplifiedPipelineInput = { + messages: [ + 
{ role: 'user', content: 'Stream this' } + ], + streamCallback + }; + + const response = await pipeline.execute(input); + + expect(streamCallback).toHaveBeenCalledTimes(3); + expect(streamCallback).toHaveBeenCalledWith('Chunk 1', false, expect.any(Object)); + expect(streamCallback).toHaveBeenCalledWith('Chunk 2', false, expect.any(Object)); + expect(streamCallback).toHaveBeenCalledWith('Chunk 3', true, expect.any(Object)); + expect(response.text).toBe('Chunk 1Chunk 2Chunk 3'); + }); + + it('should respect max tool iterations', async () => { + const aiServiceManager = await import('../ai_service_manager.js'); + + // Always return tool calls to test iteration limit + const mockChat = vi.fn(async () => ({ + text: '', + model: 'test-model', + provider: 'test-provider', + tool_calls: [ + { + id: 'call_infinite', + type: 'function', + function: { + name: 'test_tool', + arguments: '{}' + } + } + ] + })); + + aiServiceManager.default.getService = vi.fn(() => ({ + chat: mockChat + })); + + const input: SimplifiedPipelineInput = { + messages: [ + { role: 'user', content: 'Infinite tools' } + ], + options: { + enableTools: true + } + }; + + const response = await pipeline.execute(input); + + // Should be called: 1 initial + 3 tool iterations (max) + expect(mockChat).toHaveBeenCalledTimes(4); + expect(response).toBeDefined(); + }); + + it('should handle errors gracefully', async () => { + const aiServiceManager = await import('../ai_service_manager.js'); + aiServiceManager.default.getService = vi.fn(() => null); + + const input: SimplifiedPipelineInput = { + messages: [ + { role: 'user', content: 'This will fail' } + ] + }; + + await expect(pipeline.execute(input)).rejects.toThrow('No AI service available'); + }); + + it('should add context when query and advanced context are enabled', async () => { + // Mock context service + vi.mock('../context/services/context_service.js', () => ({ + default: { + getContextForQuery: vi.fn(async () => 'Relevant context for query') + } + })); + + const aiServiceManager = await import('../ai_service_manager.js'); + const mockChat = vi.fn(async (messages) => { + // Check that context was added to system message + const systemMessage = messages.find((m: any) => m.role === 'system'); + expect(systemMessage).toBeDefined(); + expect(systemMessage.content).toContain('Context:'); + expect(systemMessage.content).toContain('Relevant context for query'); + + return { + text: 'Response with context', + model: 'test-model', + provider: 'test-provider' + }; + }); + + aiServiceManager.default.getService = vi.fn(() => ({ + chat: mockChat + })); + + const input: SimplifiedPipelineInput = { + messages: [ + { role: 'user', content: 'Question needing context' } + ], + query: 'Question needing context', + options: { + useAdvancedContext: true + } + }; + + const response = await pipeline.execute(input); + + expect(mockChat).toHaveBeenCalled(); + expect(response.text).toBe('Response with context'); + }); + + it('should track metrics when enabled', async () => { + const input: SimplifiedPipelineInput = { + messages: [ + { role: 'user', content: 'Track metrics' } + ] + }; + + await pipeline.execute(input); + + const metrics = pipeline.getMetrics(); + expect(metrics).toBeDefined(); + expect(metrics.pipeline_duration).toBeGreaterThan(0); + }); + + it('should generate request ID if not provided', async () => { + const input: SimplifiedPipelineInput = { + messages: [ + { role: 'user', content: 'No request ID' } + ] + }; + + const response = await pipeline.execute(input); + + 
expect(response.metadata?.requestId).toBeDefined(); + expect(response.metadata.requestId).toMatch(/^req_\d+_[a-z0-9]+$/); + }); + }); + + describe('getMetrics', () => { + it('should return empty metrics initially', () => { + const metrics = pipeline.getMetrics(); + expect(metrics).toEqual({}); + }); + + it('should return metrics after execution', async () => { + const input: SimplifiedPipelineInput = { + messages: [ + { role: 'user', content: 'Generate metrics' } + ] + }; + + await pipeline.execute(input); + + const metrics = pipeline.getMetrics(); + expect(Object.keys(metrics).length).toBeGreaterThan(0); + }); + }); + + describe('resetMetrics', () => { + it('should clear all metrics', async () => { + const input: SimplifiedPipelineInput = { + messages: [ + { role: 'user', content: 'Generate metrics' } + ] + }; + + await pipeline.execute(input); + + let metrics = pipeline.getMetrics(); + expect(Object.keys(metrics).length).toBeGreaterThan(0); + + pipeline.resetMetrics(); + + metrics = pipeline.getMetrics(); + expect(metrics).toEqual({}); + }); + }); +}); \ No newline at end of file diff --git a/apps/server/src/services/llm/pipeline/simplified_pipeline.ts b/apps/server/src/services/llm/pipeline/simplified_pipeline.ts new file mode 100644 index 000000000..b73834fa4 --- /dev/null +++ b/apps/server/src/services/llm/pipeline/simplified_pipeline.ts @@ -0,0 +1,426 @@ +/** + * Simplified Chat Pipeline - Phase 2.1 Implementation + * + * This pipeline reduces complexity from 9 stages to 4 essential stages: + * 1. Message Preparation (formatting, context, system prompt) + * 2. LLM Execution (provider selection and API call) + * 3. Tool Handling (parse, execute, format results) + * 4. Response Processing (format response, add metadata, send to client) + */ + +import type { + Message, + ChatCompletionOptions, + ChatResponse, + StreamChunk, + ToolCall +} from '../ai_interface.js'; +import aiServiceManager from '../ai_service_manager.js'; +import toolRegistry from '../tools/tool_registry.js'; +import configurationService from './configuration_service.js'; +import loggingService, { LogLevel } from './logging_service.js'; +import type { StreamCallback } from './interfaces.js'; + +// Simplified pipeline input interface +export interface SimplifiedPipelineInput { + messages: Message[]; + options?: ChatCompletionOptions; + noteId?: string; + query?: string; + streamCallback?: StreamCallback; + requestId?: string; +} + +// Pipeline configuration +interface PipelineConfig { + maxToolIterations: number; + enableMetrics: boolean; + enableStreaming: boolean; +} + +/** + * Simplified Chat Pipeline Implementation + */ +export class SimplifiedChatPipeline { + private config: PipelineConfig; + private metrics: Map = new Map(); + + constructor() { + // Load configuration from centralized service + this.config = { + maxToolIterations: configurationService.getToolConfig().maxIterations, + enableMetrics: configurationService.getDebugConfig().enableMetrics, + enableStreaming: configurationService.getStreamingConfig().enabled + }; + } + + /** + * Execute the simplified pipeline + */ + async execute(input: SimplifiedPipelineInput): Promise { + const requestId = input.requestId || this.generateRequestId(); + const logger = loggingService.withRequestId(requestId); + + logger.log(LogLevel.INFO, 'Pipeline started', { + messageCount: input.messages.length, + hasQuery: !!input.query, + streaming: !!input.streamCallback + }); + + const startTime = Date.now(); + + try { + // Stage 1: Message Preparation + const preparedMessages = 
await this.prepareMessages(input, logger); + + // Stage 2: LLM Execution + const llmResponse = await this.executeLLM(preparedMessages, input, logger); + + // Stage 3: Tool Handling (if needed) + const finalResponse = await this.handleTools(llmResponse, preparedMessages, input, logger); + + // Stage 4: Response Processing + const processedResponse = await this.processResponse(finalResponse, input, logger); + + // Record metrics + if (this.config.enableMetrics) { + this.recordMetric('pipeline_duration', Date.now() - startTime); + } + + logger.log(LogLevel.INFO, 'Pipeline completed', { + duration: Date.now() - startTime, + responseLength: processedResponse.text.length + }); + + return processedResponse; + + } catch (error) { + logger.log(LogLevel.ERROR, 'Pipeline error', { error }); + throw error; + } + } + + /** + * Stage 1: Message Preparation + * Combines formatting, context enrichment, and system prompt injection + */ + private async prepareMessages( + input: SimplifiedPipelineInput, + logger: ReturnType + ): Promise { + const startTime = Date.now(); + logger.log(LogLevel.DEBUG, 'Stage 1: Message preparation started'); + + const messages: Message[] = [...input.messages]; + + // Add system prompt if provided + const systemPrompt = input.options?.systemPrompt || configurationService.getDefaultSystemPrompt(); + if (systemPrompt && !messages.some(m => m.role === 'system')) { + messages.unshift({ + role: 'system', + content: systemPrompt + }); + } + + // Add context if query is provided and context is enabled + if (input.query && input.options?.useAdvancedContext) { + const context = await this.extractContext(input.query, input.noteId); + if (context) { + // Find the last system message or create one + const lastSystemIndex = messages.findIndex(m => m.role === 'system'); + if (lastSystemIndex >= 0) { + messages[lastSystemIndex].content += `\n\nContext:\n${context}`; + } else { + messages.unshift({ + role: 'system', + content: `Context:\n${context}` + }); + } + } + } + + this.recordMetric('message_preparation', Date.now() - startTime); + logger.log(LogLevel.DEBUG, 'Stage 1: Message preparation completed', { + messageCount: messages.length, + duration: Date.now() - startTime + }); + + return messages; + } + + /** + * Stage 2: LLM Execution + * Handles provider selection and API call + */ + private async executeLLM( + messages: Message[], + input: SimplifiedPipelineInput, + logger: ReturnType + ): Promise { + const startTime = Date.now(); + logger.log(LogLevel.DEBUG, 'Stage 2: LLM execution started'); + + // Get completion options with defaults + const options: ChatCompletionOptions = { + ...configurationService.getDefaultCompletionOptions(), + ...input.options, + stream: this.config.enableStreaming && !!input.streamCallback + }; + + // Add tools if enabled + if (options.enableTools !== false) { + const tools = toolRegistry.getAllToolDefinitions(); + if (tools.length > 0) { + options.tools = tools; + logger.log(LogLevel.DEBUG, 'Tools enabled', { toolCount: tools.length }); + } + } + + // Execute LLM call + const service = aiServiceManager.getService(); + if (!service) { + throw new Error('No AI service available'); + } + + const response = await service.chat(messages, options); + + this.recordMetric('llm_execution', Date.now() - startTime); + logger.log(LogLevel.DEBUG, 'Stage 2: LLM execution completed', { + provider: response.provider, + model: response.model, + hasToolCalls: !!(response.tool_calls?.length), + duration: Date.now() - startTime + }); + + return response; + } + + /** + * Stage 3: 
+     * Parses tool calls, executes them, and handles follow-up LLM calls
+     */
+    private async handleTools(
+        response: ChatResponse,
+        messages: Message[],
+        input: SimplifiedPipelineInput,
+        logger: ReturnType<typeof loggingService.withRequestId>
+    ): Promise<ChatResponse> {
+        // Return immediately if no tools to handle
+        if (!response.tool_calls?.length || input.options?.enableTools === false) {
+            return response;
+        }
+
+        const startTime = Date.now();
+        logger.log(LogLevel.INFO, 'Stage 3: Tool handling started', {
+            toolCount: response.tool_calls.length
+        });
+
+        let currentResponse = response;
+        let currentMessages = [...messages];
+        let iterations = 0;
+
+        while (iterations < this.config.maxToolIterations && currentResponse.tool_calls?.length) {
+            iterations++;
+            logger.log(LogLevel.DEBUG, `Tool iteration ${iterations}/${this.config.maxToolIterations}`);
+
+            // Add assistant message with tool calls
+            currentMessages.push({
+                role: 'assistant',
+                content: currentResponse.text || '',
+                tool_calls: currentResponse.tool_calls
+            });
+
+            // Execute tools and collect results
+            const toolResults = await this.executeTools(currentResponse.tool_calls, logger);
+
+            // Add tool results to messages
+            for (const result of toolResults) {
+                currentMessages.push({
+                    role: 'tool',
+                    content: result.content,
+                    tool_call_id: result.toolCallId
+                });
+            }
+
+            // Send tool results back to LLM for follow-up
+            const followUpOptions: ChatCompletionOptions = {
+                ...input.options,
+                stream: false, // Don't stream tool follow-ups
+                enableTools: true
+            };
+
+            const service = aiServiceManager.getService();
+            if (!service) {
+                throw new Error('No AI service available');
+            }
+
+            currentResponse = await service.chat(currentMessages, followUpOptions);
+
+            // Check if we need another iteration
+            if (!currentResponse.tool_calls?.length) {
+                break;
+            }
+        }
+
+        if (iterations >= this.config.maxToolIterations) {
+            logger.log(LogLevel.WARN, 'Maximum tool iterations reached', {
+                iterations: this.config.maxToolIterations
+            });
+        }
+
+        this.recordMetric('tool_handling', Date.now() - startTime);
+        logger.log(LogLevel.INFO, 'Stage 3: Tool handling completed', {
+            iterations,
+            duration: Date.now() - startTime
+        });
+
+        return currentResponse;
+    }
+
+    /**
+     * Stage 4: Response Processing
+     * Formats the response and handles streaming
+     */
+    private async processResponse(
+        response: ChatResponse,
+        input: SimplifiedPipelineInput,
+        logger: ReturnType<typeof loggingService.withRequestId>
+    ): Promise<ChatResponse> {
+        const startTime = Date.now();
+        logger.log(LogLevel.DEBUG, 'Stage 4: Response processing started');
+
+        // Handle streaming if enabled
+        if (input.streamCallback && response.stream) {
+            let accumulatedText = '';
+
+            await response.stream(async (chunk: StreamChunk) => {
+                accumulatedText += chunk.text;
+                await input.streamCallback!(chunk.text, chunk.done || false, chunk);
+            });
+
+            // Update response text with accumulated content
+            response.text = accumulatedText;
+        }
+
+        // Add metadata
+        response.metadata = {
+            ...response.metadata,
+            requestId: logger.requestId,
+            processingTime: Date.now() - startTime
+        };
+
+        this.recordMetric('response_processing', Date.now() - startTime);
+        logger.log(LogLevel.DEBUG, 'Stage 4: Response processing completed', {
+            responseLength: response.text.length,
+            duration: Date.now() - startTime
+        });
+
+        return response;
+    }
+
+    /**
+     * Execute tool calls and return results
+     */
+    private async executeTools(
+        toolCalls: ToolCall[],
+        logger: ReturnType<typeof loggingService.withRequestId>
+    ): Promise<Array<{ toolCallId: string; content: string }>> {
+        const results: Array<{ toolCallId: string; content: string }> = [];
+
+        for (const toolCall of toolCalls) {
+            try {
+                const tool = toolRegistry.getTool(toolCall.function.name);
+                if (!tool) {
+                    throw new Error(`Tool not found: ${toolCall.function.name}`);
+                }
+
+                const args = JSON.parse(toolCall.function.arguments || '{}');
+                const result = await tool.execute(args);
+
+                results.push({
+                    toolCallId: toolCall.id,
+                    content: typeof result === 'string' ? result : JSON.stringify(result)
+                });
+
+                logger.log(LogLevel.DEBUG, 'Tool executed successfully', {
+                    tool: toolCall.function.name,
+                    toolCallId: toolCall.id
+                });
+
+            } catch (error) {
+                logger.log(LogLevel.ERROR, 'Tool execution failed', {
+                    tool: toolCall.function.name,
+                    error
+                });
+
+                results.push({
+                    toolCallId: toolCall.id,
+                    content: `Error: ${error instanceof Error ? error.message : String(error)}`
+                });
+            }
+        }
+
+        return results;
+    }
+
+    /**
+     * Extract context for the query (simplified version)
+     */
+    private async extractContext(query: string, noteId?: string): Promise<string | null> {
+        try {
+            // This is a simplified context extraction
+            // In production, this would call the semantic search service
+            const contextService = await import('../context/services/context_service.js');
+            return await contextService.default.getContextForQuery(query, noteId);
+        } catch (error) {
+            loggingService.log(LogLevel.ERROR, 'Context extraction failed', { error });
+            return null;
+        }
+    }
+
+    /**
+     * Generate a unique request ID
+     */
+    private generateRequestId(): string {
+        return `req_${Date.now()}_${Math.random().toString(36).substring(7)}`;
+    }
+
+    /**
+     * Record a metric
+     */
+    private recordMetric(name: string, value: number): void {
+        if (!this.config.enableMetrics) return;
+
+        const current = this.metrics.get(name) || 0;
+        const count = this.metrics.get(`${name}_count`) || 0;
+
+        // Calculate running average
+        const newAverage = (current * count + value) / (count + 1);
+
+        this.metrics.set(name, newAverage);
+        this.metrics.set(`${name}_count`, count + 1);
+    }
+
+    /**
+     * Get current metrics
+     */
+    getMetrics(): Record<string, number> {
+        const result: Record<string, number> = {};
+        this.metrics.forEach((value, key) => {
+            if (!key.endsWith('_count')) {
+                result[key] = value;
+            }
+        });
+        return result;
+    }
+
+    /**
+     * Reset metrics
+     */
+    resetMetrics(): void {
+        this.metrics.clear();
+    }
+}
+
+// Export singleton instance
+export default new SimplifiedChatPipeline();
\ No newline at end of file
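
Usage sketch (illustrative): the snippet below shows how a caller such as rest_chat_service.ts could drive the new pipeline through its singleton export. Only execute(), getMetrics(), resetMetrics(), and the SimplifiedPipelineInput shape come from the file above; the import path, note ID, prompt text, and option values are assumptions for illustration.

// Minimal caller sketch; values marked "hypothetical" are not taken from the diff above.
import simplifiedPipeline, { type SimplifiedPipelineInput } from './simplified_pipeline.js';

async function runChat(): Promise<void> {
    const input: SimplifiedPipelineInput = {
        messages: [
            { role: 'user', content: 'Summarize my meeting notes' } // hypothetical prompt
        ],
        query: 'meeting notes',   // used for context extraction when useAdvancedContext is set
        noteId: 'abc123',         // hypothetical note ID
        options: { useAdvancedContext: true, enableTools: true },
        // Receives chunks when streaming is enabled in the pipeline configuration
        streamCallback: async (text, done) => {
            process.stdout.write(text);
            if (done) process.stdout.write('\n');
        }
    };

    const response = await simplifiedPipeline.execute(input);
    console.log(response.text, response.metadata?.requestId);

    // Per-stage running averages accumulated by recordMetric()
    console.log(simplifiedPipeline.getMetrics());
}

Note on the metrics: recordMetric() keeps a running average per stage, storing (current * count + value) / (count + 1) under the stage name, so getMetrics() reports average stage durations in milliseconds across all executions since the last resetMetrics().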