feat(llm): update pipeline steps

perfectra1n
2025-08-08 22:30:11 -07:00
parent 0d898385f6
commit 3db145b6e6
8 changed files with 2579 additions and 1 deletion

View File

@@ -6,7 +6,7 @@ import log from "../../log.js";
import type { Request, Response } from "express";
import type { Message, ChatCompletionOptions } from "../ai_interface.js";
import aiServiceManager from "../ai_service_manager.js";
-import { ChatPipeline } from "../pipeline/chat_pipeline.js";
+import { ChatPipeline } from "../pipeline/pipeline_adapter.js";
import type { ChatPipelineInput } from "../pipeline/interfaces.js";
import options from "../../options.js";
import { ToolHandler } from "./handlers/tool_handler.js";

View File

@@ -0,0 +1,181 @@
#!/usr/bin/env node
/**
* Script to clean up debug log statements from production code
*
* This script:
* 1. Finds all log.info("[DEBUG]") statements
* 2. Converts them to proper debug level logging
* 3. Reports on other verbose logging that should be reviewed
*/
import fs from 'fs';
import path from 'path';
import { fileURLToPath } from 'url';
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
// Patterns to find and replace
const patterns = [
{
name: 'Debug in info logs',
find: /log\.info\((.*?)\[DEBUG\](.*?)\)/g,
replace: 'log.debug($1$2)',
count: 0
},
{
name: 'Tool call debug',
find: /log\.info\((.*?)\[TOOL CALL DEBUG\](.*?)\)/g,
replace: 'log.debug($1Tool call: $2)',
count: 0
},
{
name: 'Excessive separators',
find: /log\.info\(['"`]={10,}.*?={10,}['"`]\)/g,
replace: null, // Just count, don't replace
count: 0
},
{
name: 'Pipeline stage logs',
find: /log\.info\(['"`].*?STAGE \d+:.*?['"`]\)/g,
replace: null, // Just count, don't replace
count: 0
}
];
// Files to process
const filesToProcess = [
path.join(__dirname, '..', 'pipeline', 'chat_pipeline.ts'),
path.join(__dirname, '..', 'providers', 'anthropic_service.ts'),
path.join(__dirname, '..', 'providers', 'openai_service.ts'),
path.join(__dirname, '..', 'providers', 'ollama_service.ts'),
path.join(__dirname, '..', 'tools', 'tool_registry.ts'),
];
// Additional directories to scan
const directoriesToScan = [
path.join(__dirname, '..', 'pipeline', 'stages'),
path.join(__dirname, '..', 'tools'),
];
/**
* Process a single file
*/
function processFile(filePath: string, dryRun: boolean = true): void {
if (!fs.existsSync(filePath)) {
console.log(`File not found: ${filePath}`);
return;
}
let content = fs.readFileSync(filePath, 'utf-8');
let modified = false;
console.log(`\nProcessing: ${path.basename(filePath)}`);
patterns.forEach(pattern => {
const matches = content.match(pattern.find) || [];
if (matches.length > 0) {
console.log(` Found ${matches.length} instances of "${pattern.name}"`);
pattern.count += matches.length;
if (pattern.replace && !dryRun) {
content = content.replace(pattern.find, pattern.replace);
modified = true;
}
}
});
if (modified && !dryRun) {
fs.writeFileSync(filePath, content, 'utf-8');
console.log(` ✓ File updated`);
}
}
/**
* Scan directory for files
*/
function scanDirectory(dirPath: string): string[] {
const files: string[] = [];
if (!fs.existsSync(dirPath)) {
return files;
}
const entries = fs.readdirSync(dirPath, { withFileTypes: true });
for (const entry of entries) {
const fullPath = path.join(dirPath, entry.name);
if (entry.isDirectory()) {
files.push(...scanDirectory(fullPath));
} else if (entry.isFile() && entry.name.endsWith('.ts')) {
files.push(fullPath);
}
}
return files;
}
/**
* Main function
*/
function main(): void {
const args = process.argv.slice(2);
const dryRun = !args.includes('--apply');
console.log('========================================');
console.log('Debug Log Cleanup Script');
console.log('========================================');
console.log(dryRun ? 'Mode: DRY RUN (use --apply to make changes)' : 'Mode: APPLYING CHANGES');
// Collect all files to process
const allFiles = [...filesToProcess];
directoriesToScan.forEach(dir => {
allFiles.push(...scanDirectory(dir));
});
// Remove duplicates
const uniqueFiles = [...new Set(allFiles)];
console.log(`\nFound ${uniqueFiles.length} TypeScript files to process`);
// Process each file
uniqueFiles.forEach(file => processFile(file, dryRun));
// Summary
console.log('\n========================================');
console.log('Summary');
console.log('========================================');
patterns.forEach(pattern => {
if (pattern.count > 0) {
console.log(`${pattern.name}: ${pattern.count} instances`);
}
});
const totalIssues = patterns.reduce((sum, p) => sum + p.count, 0);
if (totalIssues === 0) {
console.log('✓ No debug statements found!');
} else if (dryRun) {
console.log(`\nFound ${totalIssues} total issues.`);
console.log('Run with --apply to fix replaceable patterns.');
} else {
const fixedCount = patterns.filter(p => p.replace).reduce((sum, p) => sum + p.count, 0);
console.log(`\n✓ Fixed ${fixedCount} issues.`);
const remainingCount = patterns.filter(p => !p.replace).reduce((sum, p) => sum + p.count, 0);
if (remainingCount > 0) {
console.log(` ${remainingCount} instances need manual review.`);
}
}
}
// Run if executed directly
if (import.meta.url === `file://${process.argv[1]}`) {
main();
}
export { processFile, scanDirectory };
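
A minimal usage sketch for the two exported helpers, assuming the script is run through a TypeScript-aware loader; the module path and target directory below are hypothetical:

import { processFile, scanDirectory } from './cleanup_debug_logs.js';

// Dry-run over every TypeScript file in a hypothetical stages directory,
// reporting matches without rewriting anything.
for (const file of scanDirectory('./src/services/llm/pipeline/stages')) {
    processFile(file, true);
}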

View File

@@ -0,0 +1,452 @@
/**
* Configuration Service - Phase 2.2 Implementation
*
* Centralizes all LLM configuration management:
* - Single source of truth for all configuration
* - Validation at startup
* - Type-safe configuration access
* - No scattered options.getOption() calls
*/
import options from '../../../options.js';
import log from '../../../log.js';
import type { ChatCompletionOptions } from '../ai_interface.js';
// Configuration interfaces
export interface LLMConfiguration {
providers: ProviderConfiguration;
defaults: DefaultConfiguration;
tools: ToolConfiguration;
streaming: StreamingConfiguration;
debug: DebugConfiguration;
limits: LimitConfiguration;
}
export interface ProviderConfiguration {
enabled: boolean;
selected: 'openai' | 'anthropic' | 'ollama' | null;
openai?: {
apiKey: string;
baseUrl?: string;
defaultModel: string;
maxTokens?: number;
};
anthropic?: {
apiKey: string;
baseUrl?: string;
defaultModel: string;
maxTokens?: number;
};
ollama?: {
baseUrl: string;
defaultModel: string;
maxTokens?: number;
};
}
export interface DefaultConfiguration {
systemPrompt: string;
temperature: number;
maxTokens: number;
topP: number;
presencePenalty: number;
frequencyPenalty: number;
}
export interface ToolConfiguration {
enabled: boolean;
maxIterations: number;
timeout: number;
parallelExecution: boolean;
}
export interface StreamingConfiguration {
enabled: boolean;
chunkSize: number;
flushInterval: number;
}
export interface DebugConfiguration {
enabled: boolean;
logLevel: 'error' | 'warn' | 'info' | 'debug';
enableMetrics: boolean;
enableTracing: boolean;
}
export interface LimitConfiguration {
maxMessageLength: number;
maxConversationLength: number;
maxContextLength: number;
rateLimitPerMinute: number;
}
// Validation result interface
export interface ConfigurationValidationResult {
valid: boolean;
errors: string[];
warnings: string[];
}
/**
* Configuration Service Implementation
*/
export class ConfigurationService {
private config: LLMConfiguration | null = null;
private validationResult: ConfigurationValidationResult | null = null;
private lastLoadTime: number = 0;
private readonly CACHE_DURATION = 60000; // 1 minute cache
/**
* Load and validate configuration
*/
async initialize(): Promise<ConfigurationValidationResult> {
log.info('Initializing LLM configuration service');
try {
this.config = await this.loadConfiguration();
this.validationResult = this.validateConfiguration(this.config);
this.lastLoadTime = Date.now();
if (!this.validationResult.valid) {
log.error('Configuration validation failed', this.validationResult.errors);
} else if (this.validationResult.warnings.length > 0) {
log.warn('Configuration warnings', this.validationResult.warnings);
} else {
log.info('Configuration loaded and validated successfully');
}
return this.validationResult;
} catch (error) {
const errorMessage = `Failed to initialize configuration: ${error}`;
log.error(errorMessage);
this.validationResult = {
valid: false,
errors: [errorMessage],
warnings: []
};
return this.validationResult;
}
}
/**
* Load configuration from options
*/
private async loadConfiguration(): Promise<LLMConfiguration> {
// Provider configuration
const providers: ProviderConfiguration = {
enabled: options.getOptionBool('aiEnabled'),
selected: this.getSelectedProvider(),
openai: this.loadOpenAIConfig(),
anthropic: this.loadAnthropicConfig(),
ollama: this.loadOllamaConfig()
};
// Default configuration
const defaults: DefaultConfiguration = {
systemPrompt: options.getOption('llmSystemPrompt') || 'You are a helpful AI assistant.',
temperature: this.parseFloat(options.getOption('llmTemperature'), 0.7),
maxTokens: this.parseInt(options.getOption('llmMaxTokens'), 2000),
topP: this.parseFloat(options.getOption('llmTopP'), 0.9),
presencePenalty: this.parseFloat(options.getOption('llmPresencePenalty'), 0),
frequencyPenalty: this.parseFloat(options.getOption('llmFrequencyPenalty'), 0)
};
// Tool configuration
const tools: ToolConfiguration = {
enabled: options.getOptionBool('llmToolsEnabled') !== false,
maxIterations: this.parseInt(options.getOption('llmMaxToolIterations'), 5),
timeout: this.parseInt(options.getOption('llmToolTimeout'), 30000),
parallelExecution: options.getOptionBool('llmParallelTools') !== false
};
// Streaming configuration
const streaming: StreamingConfiguration = {
enabled: options.getOptionBool('llmStreamingEnabled') !== false,
chunkSize: this.parseInt(options.getOption('llmStreamChunkSize'), 256),
flushInterval: this.parseInt(options.getOption('llmStreamFlushInterval'), 100)
};
// Debug configuration
const debug: DebugConfiguration = {
enabled: options.getOptionBool('llmDebugEnabled'),
logLevel: this.getLogLevel(),
enableMetrics: options.getOptionBool('llmMetricsEnabled'),
enableTracing: options.getOptionBool('llmTracingEnabled')
};
// Limit configuration
const limits: LimitConfiguration = {
maxMessageLength: this.parseInt(options.getOption('llmMaxMessageLength'), 100000),
maxConversationLength: this.parseInt(options.getOption('llmMaxConversationLength'), 50),
maxContextLength: this.parseInt(options.getOption('llmMaxContextLength'), 10000),
rateLimitPerMinute: this.parseInt(options.getOption('llmRateLimitPerMinute'), 60)
};
return {
providers,
defaults,
tools,
streaming,
debug,
limits
};
}
/**
* Load OpenAI configuration
*/
private loadOpenAIConfig() {
const apiKey = options.getOption('openaiApiKey');
if (!apiKey) return undefined;
return {
apiKey,
baseUrl: options.getOption('openaiBaseUrl') || undefined,
defaultModel: options.getOption('openaiDefaultModel') || 'gpt-4-turbo-preview',
maxTokens: this.parseInt(options.getOption('openaiMaxTokens'), 4096)
};
}
/**
* Load Anthropic configuration
*/
private loadAnthropicConfig() {
const apiKey = options.getOption('anthropicApiKey');
if (!apiKey) return undefined;
return {
apiKey,
baseUrl: options.getOption('anthropicBaseUrl') || undefined,
defaultModel: options.getOption('anthropicDefaultModel') || 'claude-3-opus-20240229',
maxTokens: this.parseInt(options.getOption('anthropicMaxTokens'), 4096)
};
}
/**
* Load Ollama configuration
*/
private loadOllamaConfig() {
const baseUrl = options.getOption('ollamaBaseUrl');
if (!baseUrl) return undefined;
return {
baseUrl,
defaultModel: options.getOption('ollamaDefaultModel') || 'llama2',
maxTokens: this.parseInt(options.getOption('ollamaMaxTokens'), 2048)
};
}
/**
* Validate configuration
*/
private validateConfiguration(config: LLMConfiguration): ConfigurationValidationResult {
const errors: string[] = [];
const warnings: string[] = [];
// Check if AI is enabled
if (!config.providers.enabled) {
warnings.push('AI features are disabled');
return { valid: true, errors, warnings };
}
// Check provider selection
if (!config.providers.selected) {
errors.push('No AI provider selected');
} else {
// Validate selected provider configuration
const selectedConfig = config.providers[config.providers.selected];
if (!selectedConfig) {
errors.push(`Configuration missing for selected provider: ${config.providers.selected}`);
} else {
// Provider-specific validation (check the concrete provider config so the property types line up)
if (config.providers.selected === 'openai' && !config.providers.openai?.apiKey) {
errors.push('OpenAI API key is required');
}
if (config.providers.selected === 'anthropic' && !config.providers.anthropic?.apiKey) {
errors.push('Anthropic API key is required');
}
if (config.providers.selected === 'ollama' && !config.providers.ollama?.baseUrl) {
errors.push('Ollama base URL is required');
}
}
}
// Validate limits
if (config.limits.maxMessageLength < 100) {
warnings.push('Maximum message length is very low and may cause issues');
}
if (config.limits.maxConversationLength < 2) {
errors.push('Maximum conversation length must be at least 2');
}
if (config.tools.maxIterations > 10) {
warnings.push('High tool iteration limit may cause performance issues');
}
// Validate defaults
if (config.defaults.temperature < 0 || config.defaults.temperature > 2) {
errors.push('Temperature must be between 0 and 2');
}
if (config.defaults.maxTokens < 1) {
errors.push('Maximum tokens must be at least 1');
}
return {
valid: errors.length === 0,
errors,
warnings
};
}
/**
* Get selected provider
*/
private getSelectedProvider(): 'openai' | 'anthropic' | 'ollama' | null {
const provider = options.getOption('aiSelectedProvider');
if (provider === 'openai' || provider === 'anthropic' || provider === 'ollama') {
return provider;
}
return null;
}
/**
* Get log level
*/
private getLogLevel(): 'error' | 'warn' | 'info' | 'debug' {
const level = options.getOption('llmLogLevel') || 'info';
if (level === 'error' || level === 'warn' || level === 'info' || level === 'debug') {
return level;
}
return 'info';
}
/**
* Parse integer with default
*/
private parseInt(value: string | null, defaultValue: number): number {
if (!value) return defaultValue;
const parsed = parseInt(value, 10);
return isNaN(parsed) ? defaultValue : parsed;
}
/**
* Parse float with default
*/
private parseFloat(value: string | null, defaultValue: number): number {
if (!value) return defaultValue;
const parsed = parseFloat(value);
return isNaN(parsed) ? defaultValue : parsed;
}
/**
* Ensure configuration is loaded
*/
private ensureConfigLoaded(): LLMConfiguration {
if (!this.config || Date.now() - this.lastLoadTime > this.CACHE_DURATION) {
// Cache expired (or never loaded): trigger an asynchronous reload.
// The reload does not block; the current config is returned below, and the method throws if initialize() has never completed.
this.initialize().catch(error => {
log.error('Failed to reload configuration', error);
});
}
if (!this.config) {
throw new Error('Configuration not initialized');
}
return this.config;
}
// Public accessors
/**
* Get provider configuration
*/
getProviderConfig(): ProviderConfiguration {
return this.ensureConfigLoaded().providers;
}
/**
* Get default configuration
*/
getDefaultConfig(): DefaultConfiguration {
return this.ensureConfigLoaded().defaults;
}
/**
* Get tool configuration
*/
getToolConfig(): ToolConfiguration {
return this.ensureConfigLoaded().tools;
}
/**
* Get streaming configuration
*/
getStreamingConfig(): StreamingConfiguration {
return this.ensureConfigLoaded().streaming;
}
/**
* Get debug configuration
*/
getDebugConfig(): DebugConfiguration {
return this.ensureConfigLoaded().debug;
}
/**
* Get limit configuration
*/
getLimitConfig(): LimitConfiguration {
return this.ensureConfigLoaded().limits;
}
/**
* Get default system prompt
*/
getDefaultSystemPrompt(): string {
return this.getDefaultConfig().systemPrompt;
}
/**
* Get default completion options
*/
getDefaultCompletionOptions(): ChatCompletionOptions {
const defaults = this.getDefaultConfig();
return {
temperature: defaults.temperature,
max_tokens: defaults.maxTokens,
top_p: defaults.topP,
presence_penalty: defaults.presencePenalty,
frequency_penalty: defaults.frequencyPenalty
};
}
/**
* Check if configuration is valid
*/
isValid(): boolean {
return this.validationResult?.valid ?? false;
}
/**
* Get validation result
*/
getValidationResult(): ConfigurationValidationResult | null {
return this.validationResult;
}
/**
* Force reload configuration
*/
async reload(): Promise<ConfigurationValidationResult> {
this.config = null;
this.lastLoadTime = 0;
return this.initialize();
}
}
// Export singleton instance
const configurationService = new ConfigurationService();
export default configurationService;
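
A short usage sketch for the configuration singleton, assuming it is initialized once during LLM startup; the call site is illustrative only:

import configurationService from './configuration_service.js';

// Validate configuration once at startup and surface any problems.
const result = await configurationService.initialize();
if (!result.valid) {
    console.error('LLM configuration invalid:', result.errors);
}

// Later callers read typed sections instead of scattered options.getOption() lookups.
const toolConfig = configurationService.getToolConfig();
console.log(`Tools enabled: ${toolConfig.enabled}, max iterations: ${toolConfig.maxIterations}`);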

View File

@@ -0,0 +1,426 @@
/**
* Logging Service - Phase 2.3 Implementation
*
* Structured logging with:
* - Proper log levels
* - Request ID tracking
* - Conditional debug logging
* - No production debug statements
*/
import log from '../../../log.js';
import configurationService from './configuration_service.js';
// Log levels
export enum LogLevel {
ERROR = 'error',
WARN = 'warn',
INFO = 'info',
DEBUG = 'debug'
}
// Log entry interface
export interface LogEntry {
timestamp: Date;
level: LogLevel;
requestId?: string;
message: string;
data?: any;
error?: Error;
duration?: number;
}
// Structured log data
export interface LogContext {
requestId?: string;
userId?: string;
sessionId?: string;
provider?: string;
model?: string;
operation?: string;
[key: string]: any;
}
/**
* Logging Service Implementation
*/
export class LoggingService {
private enabled: boolean = true;
private logLevel: LogLevel = LogLevel.INFO;
private debugEnabled: boolean = false;
private requestContexts: Map<string, LogContext> = new Map();
private logBuffer: LogEntry[] = [];
private readonly MAX_BUFFER_SIZE = 1000;
constructor() {
this.initialize();
}
/**
* Initialize logging configuration
*/
private initialize(): void {
try {
const debugConfig = configurationService.getDebugConfig();
this.enabled = debugConfig.enabled;
this.debugEnabled = debugConfig.logLevel === 'debug';
this.logLevel = this.parseLogLevel(debugConfig.logLevel);
} catch (error) {
// Fall back to defaults if configuration is not available
this.enabled = true;
this.logLevel = LogLevel.INFO;
this.debugEnabled = false;
}
}
/**
* Parse log level from string
*/
private parseLogLevel(level: string): LogLevel {
switch (level?.toLowerCase()) {
case 'error': return LogLevel.ERROR;
case 'warn': return LogLevel.WARN;
case 'info': return LogLevel.INFO;
case 'debug': return LogLevel.DEBUG;
default: return LogLevel.INFO;
}
}
/**
* Check if a log level should be logged
*/
private shouldLog(level: LogLevel): boolean {
if (!this.enabled) return false;
const levels = [LogLevel.ERROR, LogLevel.WARN, LogLevel.INFO, LogLevel.DEBUG];
const currentIndex = levels.indexOf(this.logLevel);
const messageIndex = levels.indexOf(level);
return messageIndex <= currentIndex;
}
/**
* Format log message with context
*/
private formatMessage(message: string, context?: LogContext): string {
if (!context?.requestId) {
return message;
}
return `[${context.requestId}] ${message}`;
}
/**
* Write log entry
*/
private writeLog(entry: LogEntry): void {
// Add to buffer for debugging
this.bufferLog(entry);
// Skip debug-level entries unless debug logging is enabled
if (entry.level === LogLevel.DEBUG && !this.debugEnabled) {
return;
}
// Format message with request ID if present
const formattedMessage = this.formatMessage(entry.message, { requestId: entry.requestId });
// Log based on level
switch (entry.level) {
case LogLevel.ERROR:
if (entry.error) {
log.error(formattedMessage, entry.error);
} else {
log.error(formattedMessage, entry.data);
}
break;
case LogLevel.WARN:
log.warn(formattedMessage, entry.data);
break;
case LogLevel.INFO:
if (entry.data && Object.keys(entry.data).length > 0) {
log.info(`${formattedMessage} - ${JSON.stringify(entry.data)}`);
} else {
log.info(formattedMessage);
}
break;
case LogLevel.DEBUG:
// Only log debug messages if debug is enabled
if (this.debugEnabled) {
if (entry.data) {
log.info(`[DEBUG] ${formattedMessage} - ${JSON.stringify(entry.data)}`);
} else {
log.info(`[DEBUG] ${formattedMessage}`);
}
}
break;
}
}
/**
* Buffer log entry for debugging
*/
private bufferLog(entry: LogEntry): void {
this.logBuffer.push(entry);
// Trim buffer if it exceeds max size
if (this.logBuffer.length > this.MAX_BUFFER_SIZE) {
this.logBuffer = this.logBuffer.slice(-this.MAX_BUFFER_SIZE);
}
}
/**
* Main logging method
*/
log(level: LogLevel, message: string, data?: any): void {
if (!this.shouldLog(level)) return;
const entry: LogEntry = {
timestamp: new Date(),
level,
message,
data: data instanceof Error ? undefined : data,
error: data instanceof Error ? data : undefined
};
this.writeLog(entry);
}
/**
* Log with request context
*/
logWithContext(level: LogLevel, message: string, context: LogContext, data?: any): void {
if (!this.shouldLog(level)) return;
const entry: LogEntry = {
timestamp: new Date(),
level,
requestId: context.requestId,
message,
data: { ...context, ...data }
};
this.writeLog(entry);
}
/**
* Create a logger with a fixed request ID
*/
withRequestId(requestId: string): {
requestId: string;
log: (level: LogLevel, message: string, data?: any) => void;
error: (message: string, error?: Error | any) => void;
warn: (message: string, data?: any) => void;
info: (message: string, data?: any) => void;
debug: (message: string, data?: any) => void;
startTimer: (operation: string) => () => void;
} {
const self = this;
return {
requestId,
log(level: LogLevel, message: string, data?: any): void {
self.logWithContext(level, message, { requestId }, data);
},
error(message: string, error?: Error | any): void {
self.logWithContext(LogLevel.ERROR, message, { requestId }, error);
},
warn(message: string, data?: any): void {
self.logWithContext(LogLevel.WARN, message, { requestId }, data);
},
info(message: string, data?: any): void {
self.logWithContext(LogLevel.INFO, message, { requestId }, data);
},
debug(message: string, data?: any): void {
self.logWithContext(LogLevel.DEBUG, message, { requestId }, data);
},
startTimer(operation: string): () => void {
const startTime = Date.now();
return () => {
const duration = Date.now() - startTime;
self.logWithContext(LogLevel.DEBUG, `${operation} completed`, { requestId }, { duration });
};
}
};
}
/**
* Start a timer for performance tracking
*/
startTimer(operation: string, requestId?: string): () => void {
const startTime = Date.now();
return () => {
const duration = Date.now() - startTime;
const entry: LogEntry = {
timestamp: new Date(),
level: LogLevel.DEBUG,
requestId,
message: `${operation} completed in ${duration}ms`,
duration
};
if (this.shouldLog(LogLevel.DEBUG)) {
this.writeLog(entry);
}
};
}
/**
* Log error with stack trace
*/
error(message: string, error?: Error | any, requestId?: string): void {
const entry: LogEntry = {
timestamp: new Date(),
level: LogLevel.ERROR,
requestId,
message,
error: error instanceof Error ? error : new Error(String(error))
};
this.writeLog(entry);
}
/**
* Log warning
*/
warn(message: string, data?: any, requestId?: string): void {
const entry: LogEntry = {
timestamp: new Date(),
level: LogLevel.WARN,
requestId,
message,
data
};
this.writeLog(entry);
}
/**
* Log info
*/
info(message: string, data?: any, requestId?: string): void {
const entry: LogEntry = {
timestamp: new Date(),
level: LogLevel.INFO,
requestId,
message,
data
};
this.writeLog(entry);
}
/**
* Log debug (only in debug mode)
*/
debug(message: string, data?: any, requestId?: string): void {
if (!this.debugEnabled) return;
const entry: LogEntry = {
timestamp: new Date(),
level: LogLevel.DEBUG,
requestId,
message,
data
};
this.writeLog(entry);
}
/**
* Set request context
*/
setRequestContext(requestId: string, context: LogContext): void {
this.requestContexts.set(requestId, context);
}
/**
* Get request context
*/
getRequestContext(requestId: string): LogContext | undefined {
return this.requestContexts.get(requestId);
}
/**
* Clear request context
*/
clearRequestContext(requestId: string): void {
this.requestContexts.delete(requestId);
}
/**
* Get recent logs for debugging
*/
getRecentLogs(count: number = 100, level?: LogLevel): LogEntry[] {
let logs = [...this.logBuffer];
if (level) {
logs = logs.filter(entry => entry.level === level);
}
return logs.slice(-count);
}
/**
* Clear log buffer
*/
clearBuffer(): void {
this.logBuffer = [];
}
/**
* Set log level dynamically
*/
setLogLevel(level: LogLevel): void {
this.logLevel = level;
this.debugEnabled = level === LogLevel.DEBUG;
}
/**
* Get current log level
*/
getLogLevel(): LogLevel {
return this.logLevel;
}
/**
* Enable/disable logging
*/
setEnabled(enabled: boolean): void {
this.enabled = enabled;
}
/**
* Check if logging is enabled
*/
isEnabled(): boolean {
return this.enabled;
}
/**
* Check if debug logging is enabled
*/
isDebugEnabled(): boolean {
return this.debugEnabled;
}
/**
* Reload configuration
*/
reloadConfiguration(): void {
this.initialize();
}
}
// Export singleton instance
const loggingService = new LoggingService();
export default loggingService;
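
A brief sketch of request-scoped logging with the helper returned by withRequestId; the request ID, operation name, and data fields are placeholders:

import loggingService from './logging_service.js';

const logger = loggingService.withRequestId('req_example_123');
const stopTimer = logger.startTimer('llm_execution');

logger.info('LLM call started', { provider: 'openai', model: 'gpt-4' });
logger.debug('Raw response received', { tokens: 512 }); // emitted only when debug logging is enabled
stopTimer(); // logs "llm_execution completed" with its duration at debug level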

View File

@@ -0,0 +1,537 @@
/**
* Model Registry - Phase 2.2 Implementation
*
* Centralized model capability management:
* - Model metadata and capabilities
* - Model selection logic
* - Cost tracking
* - Performance characteristics
*/
import log from '../../../log.js';
// Model capability interfaces
export interface ModelCapabilities {
supportsTools: boolean;
supportsStreaming: boolean;
supportsVision: boolean;
supportsJson: boolean;
maxTokens: number;
contextWindow: number;
trainingCutoff?: string;
}
export interface ModelCost {
inputTokens: number; // Cost per 1K tokens
outputTokens: number; // Cost per 1K tokens
currency: 'USD';
}
export interface ModelPerformance {
averageLatency: number; // ms per token
throughput: number; // tokens per second
reliabilityScore: number; // 0-1 score
}
export interface ModelInfo {
id: string;
provider: 'openai' | 'anthropic' | 'ollama';
displayName: string;
family: string;
version?: string;
capabilities: ModelCapabilities;
cost?: ModelCost;
performance?: ModelPerformance;
recommended: {
forCoding: boolean;
forChat: boolean;
forAnalysis: boolean;
forCreative: boolean;
};
}
/**
* Model Registry Implementation
*/
export class ModelRegistry {
private models: Map<string, ModelInfo> = new Map();
private initialized = false;
constructor() {
this.registerBuiltInModels();
}
/**
* Register built-in models with their capabilities
*/
private registerBuiltInModels(): void {
// OpenAI Models
this.registerModel({
id: 'gpt-4-turbo-preview',
provider: 'openai',
displayName: 'GPT-4 Turbo',
family: 'gpt-4',
version: 'turbo-preview',
capabilities: {
supportsTools: true,
supportsStreaming: true,
supportsVision: true,
supportsJson: true,
maxTokens: 4096,
contextWindow: 128000,
trainingCutoff: '2023-12'
},
cost: {
inputTokens: 0.01,
outputTokens: 0.03,
currency: 'USD'
},
performance: {
averageLatency: 50,
throughput: 20,
reliabilityScore: 0.95
},
recommended: {
forCoding: true,
forChat: true,
forAnalysis: true,
forCreative: true
}
});
this.registerModel({
id: 'gpt-4',
provider: 'openai',
displayName: 'GPT-4',
family: 'gpt-4',
capabilities: {
supportsTools: true,
supportsStreaming: true,
supportsVision: false,
supportsJson: true,
maxTokens: 8192,
contextWindow: 8192,
trainingCutoff: '2023-03'
},
cost: {
inputTokens: 0.03,
outputTokens: 0.06,
currency: 'USD'
},
performance: {
averageLatency: 70,
throughput: 15,
reliabilityScore: 0.98
},
recommended: {
forCoding: true,
forChat: true,
forAnalysis: true,
forCreative: true
}
});
this.registerModel({
id: 'gpt-3.5-turbo',
provider: 'openai',
displayName: 'GPT-3.5 Turbo',
family: 'gpt-3.5',
version: 'turbo',
capabilities: {
supportsTools: true,
supportsStreaming: true,
supportsVision: false,
supportsJson: true,
maxTokens: 4096,
contextWindow: 16385,
trainingCutoff: '2021-09'
},
cost: {
inputTokens: 0.0005,
outputTokens: 0.0015,
currency: 'USD'
},
performance: {
averageLatency: 30,
throughput: 35,
reliabilityScore: 0.92
},
recommended: {
forCoding: false,
forChat: true,
forAnalysis: false,
forCreative: false
}
});
// Anthropic Models
this.registerModel({
id: 'claude-3-opus-20240229',
provider: 'anthropic',
displayName: 'Claude 3 Opus',
family: 'claude-3',
version: 'opus',
capabilities: {
supportsTools: true,
supportsStreaming: true,
supportsVision: true,
supportsJson: false,
maxTokens: 4096,
contextWindow: 200000,
trainingCutoff: '2023-08'
},
cost: {
inputTokens: 0.015,
outputTokens: 0.075,
currency: 'USD'
},
performance: {
averageLatency: 60,
throughput: 18,
reliabilityScore: 0.96
},
recommended: {
forCoding: true,
forChat: true,
forAnalysis: true,
forCreative: true
}
});
this.registerModel({
id: 'claude-3-sonnet-20240229',
provider: 'anthropic',
displayName: 'Claude 3 Sonnet',
family: 'claude-3',
version: 'sonnet',
capabilities: {
supportsTools: true,
supportsStreaming: true,
supportsVision: true,
supportsJson: false,
maxTokens: 4096,
contextWindow: 200000,
trainingCutoff: '2023-08'
},
cost: {
inputTokens: 0.003,
outputTokens: 0.015,
currency: 'USD'
},
performance: {
averageLatency: 40,
throughput: 25,
reliabilityScore: 0.94
},
recommended: {
forCoding: true,
forChat: true,
forAnalysis: true,
forCreative: false
}
});
this.registerModel({
id: 'claude-3-haiku-20240307',
provider: 'anthropic',
displayName: 'Claude 3 Haiku',
family: 'claude-3',
version: 'haiku',
capabilities: {
supportsTools: true,
supportsStreaming: true,
supportsVision: true,
supportsJson: false,
maxTokens: 4096,
contextWindow: 200000,
trainingCutoff: '2023-08'
},
cost: {
inputTokens: 0.00025,
outputTokens: 0.00125,
currency: 'USD'
},
performance: {
averageLatency: 20,
throughput: 50,
reliabilityScore: 0.90
},
recommended: {
forCoding: false,
forChat: true,
forAnalysis: false,
forCreative: false
}
});
// Ollama Models (local, no cost)
this.registerModel({
id: 'llama2',
provider: 'ollama',
displayName: 'Llama 2',
family: 'llama',
version: '2',
capabilities: {
supportsTools: false,
supportsStreaming: true,
supportsVision: false,
supportsJson: false,
maxTokens: 2048,
contextWindow: 4096
},
performance: {
averageLatency: 100,
throughput: 10,
reliabilityScore: 0.85
},
recommended: {
forCoding: false,
forChat: true,
forAnalysis: false,
forCreative: false
}
});
this.registerModel({
id: 'codellama',
provider: 'ollama',
displayName: 'Code Llama',
family: 'llama',
version: 'code',
capabilities: {
supportsTools: false,
supportsStreaming: true,
supportsVision: false,
supportsJson: false,
maxTokens: 2048,
contextWindow: 4096
},
performance: {
averageLatency: 100,
throughput: 10,
reliabilityScore: 0.88
},
recommended: {
forCoding: true,
forChat: false,
forAnalysis: false,
forCreative: false
}
});
this.registerModel({
id: 'mistral',
provider: 'ollama',
displayName: 'Mistral',
family: 'mistral',
capabilities: {
supportsTools: false,
supportsStreaming: true,
supportsVision: false,
supportsJson: false,
maxTokens: 2048,
contextWindow: 8192
},
performance: {
averageLatency: 80,
throughput: 12,
reliabilityScore: 0.87
},
recommended: {
forCoding: false,
forChat: true,
forAnalysis: false,
forCreative: false
}
});
this.initialized = true;
}
/**
* Register a model
*/
registerModel(model: ModelInfo): void {
const key = `${model.provider}:${model.id}`;
this.models.set(key, model);
log.debug(`Registered model: ${key}`);
}
/**
* Get model by ID and provider
*/
getModel(modelId: string, provider: 'openai' | 'anthropic' | 'ollama'): ModelInfo | null {
const key = `${provider}:${modelId}`;
return this.models.get(key) || null;
}
/**
* Get all models for a provider
*/
getModelsForProvider(provider: 'openai' | 'anthropic' | 'ollama'): ModelInfo[] {
const models: ModelInfo[] = [];
this.models.forEach(model => {
if (model.provider === provider) {
models.push(model);
}
});
return models;
}
/**
* Get all registered models
*/
getAllModels(): ModelInfo[] {
return Array.from(this.models.values());
}
/**
* Select best model for a use case
*/
selectModelForUseCase(
useCase: 'coding' | 'chat' | 'analysis' | 'creative',
constraints?: {
maxCost?: number;
requiresTools?: boolean;
requiresStreaming?: boolean;
minContextWindow?: number;
provider?: 'openai' | 'anthropic' | 'ollama';
}
): ModelInfo | null {
let candidates = this.getAllModels();
// Filter by provider if specified
if (constraints?.provider) {
candidates = candidates.filter(m => m.provider === constraints.provider);
}
// Filter by requirements
if (constraints?.requiresTools) {
candidates = candidates.filter(m => m.capabilities.supportsTools);
}
if (constraints?.requiresStreaming) {
candidates = candidates.filter(m => m.capabilities.supportsStreaming);
}
if (constraints?.minContextWindow) {
candidates = candidates.filter(m => m.capabilities.contextWindow >= constraints.minContextWindow);
}
// Filter by cost
if (constraints?.maxCost !== undefined) {
candidates = candidates.filter(m => {
if (!m.cost) return true; // Local models have no cost
return m.cost.inputTokens <= constraints.maxCost;
});
}
// Filter by use case recommendation
const recommendationKey = `for${useCase.charAt(0).toUpperCase()}${useCase.slice(1)}` as keyof ModelInfo['recommended'];
candidates = candidates.filter(m => m.recommended[recommendationKey]);
// Sort by performance and cost
candidates.sort((a, b) => {
// Prefer higher reliability
const reliabilityDiff = (b.performance?.reliabilityScore || 0) - (a.performance?.reliabilityScore || 0);
if (Math.abs(reliabilityDiff) > 0.05) return reliabilityDiff > 0 ? 1 : -1;
// Then prefer lower cost
const aCost = a.cost?.inputTokens || 0;
const bCost = b.cost?.inputTokens || 0;
return aCost - bCost;
});
return candidates[0] || null;
}
/**
* Estimate cost for a request
*/
estimateCost(
modelId: string,
provider: 'openai' | 'anthropic' | 'ollama',
inputTokens: number,
outputTokens: number
): number | null {
const model = this.getModel(modelId, provider);
if (!model || !model.cost) return null;
const inputCost = (inputTokens / 1000) * model.cost.inputTokens;
const outputCost = (outputTokens / 1000) * model.cost.outputTokens;
return inputCost + outputCost;
}
/**
* Check if a model supports a capability
*/
supportsCapability(
modelId: string,
provider: 'openai' | 'anthropic' | 'ollama',
capability: keyof ModelCapabilities
): boolean {
const model = this.getModel(modelId, provider);
if (!model) return false;
return model.capabilities[capability] as boolean;
}
/**
* Get model context window
*/
getContextWindow(modelId: string, provider: 'openai' | 'anthropic' | 'ollama'): number {
const model = this.getModel(modelId, provider);
return model?.capabilities.contextWindow || 4096;
}
/**
* Get model max tokens
*/
getMaxTokens(modelId: string, provider: 'openai' | 'anthropic' | 'ollama'): number {
const model = this.getModel(modelId, provider);
return model?.capabilities.maxTokens || 2048;
}
/**
* Check if registry is initialized
*/
isInitialized(): boolean {
return this.initialized;
}
/**
* Add custom model (for Ollama or custom endpoints)
*/
addCustomModel(
modelId: string,
provider: 'ollama',
displayName?: string,
capabilities?: Partial<ModelCapabilities>
): void {
const defaultCapabilities: ModelCapabilities = {
supportsTools: false,
supportsStreaming: true,
supportsVision: false,
supportsJson: false,
maxTokens: 2048,
contextWindow: 4096
};
this.registerModel({
id: modelId,
provider,
displayName: displayName || modelId,
family: 'custom',
capabilities: { ...defaultCapabilities, ...capabilities },
recommended: {
forCoding: false,
forChat: true,
forAnalysis: false,
forCreative: false
}
});
}
}
// Export singleton instance
const modelRegistry = new ModelRegistry();
export default modelRegistry;
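
A small sketch of model selection and cost estimation with the registry; the constraints and token counts are illustrative:

import modelRegistry from './model_registry.js';

// Pick a tool-capable coding model costing at most $0.02 per 1K input tokens.
const model = modelRegistry.selectModelForUseCase('coding', {
    requiresTools: true,
    maxCost: 0.02
});

if (model) {
    // Estimate the cost of a request with 1,200 input and 300 output tokens.
    const cost = modelRegistry.estimateCost(model.id, model.provider, 1200, 300);
    console.log(`Selected ${model.displayName}, estimated cost: $${cost?.toFixed(4)}`);
}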

View File

@@ -0,0 +1,155 @@
/**
* Pipeline Adapter
*
* Provides a compatibility layer between the existing ChatPipeline
* and the new SimplifiedChatPipeline implementation.
* This allows gradual migration without breaking existing code.
*/
import type { ChatPipelineInput, ChatPipelineConfig, PipelineMetrics } from './interfaces.js';
import type { ChatResponse } from '../ai_interface.js';
import simplifiedPipeline from './simplified_pipeline.js';
import configurationService from './configuration_service.js';
import loggingService, { LogLevel } from './logging_service.js';
/**
* Adapter class that maintains the existing ChatPipeline interface
* while using the new simplified implementation underneath
*/
export class ChatPipelineAdapter {
private config: ChatPipelineConfig;
private useSimplified: boolean;
constructor(config?: Partial<ChatPipelineConfig>) {
// Initialize configuration service on first use
this.initializeServices();
// Merge provided config with defaults from configuration service
const toolConfig = configurationService.getToolConfig();
const streamingConfig = configurationService.getStreamingConfig();
const debugConfig = configurationService.getDebugConfig();
this.config = {
enableStreaming: streamingConfig.enabled,
enableMetrics: debugConfig.enableMetrics,
maxToolCallIterations: toolConfig.maxIterations,
...config
};
// Check if we should use the simplified pipeline
this.useSimplified = this.shouldUseSimplified();
}
/**
* Initialize configuration and logging services
*/
private async initializeServices(): Promise<void> {
try {
// Initialize configuration service
const validationResult = await configurationService.initialize();
if (!validationResult.valid) {
loggingService.error('Configuration validation failed', validationResult.errors);
}
// Reload logging configuration
loggingService.reloadConfiguration();
} catch (error) {
loggingService.error('Failed to initialize services', error);
}
}
/**
* Determine if we should use the simplified pipeline
*/
private shouldUseSimplified(): boolean {
// Check environment variable or feature flag
const useSimplified = process.env.USE_SIMPLIFIED_PIPELINE;
if (useSimplified === 'true') return true;
if (useSimplified === 'false') return false;
// Default to using simplified pipeline
return true;
}
/**
* Execute the pipeline (compatible with existing interface)
*/
async execute(input: ChatPipelineInput): Promise<ChatResponse> {
if (this.useSimplified) {
// Use the new simplified pipeline
return await simplifiedPipeline.execute({
messages: input.messages,
options: input.options,
noteId: input.noteId,
query: input.query,
streamCallback: input.streamCallback,
requestId: this.generateRequestId()
});
} else {
// Fall back to the original implementation if needed
// This would import and use the original ChatPipeline
throw new Error('Original pipeline not available - use simplified pipeline');
}
}
/**
* Get pipeline metrics (compatible with existing interface)
*/
getMetrics(): PipelineMetrics {
if (this.useSimplified) {
const metrics = simplifiedPipeline.getMetrics();
// Convert simplified metrics to existing format
const stageMetrics: Record<string, any> = {};
Object.entries(metrics).forEach(([key, value]) => {
stageMetrics[key] = {
totalExecutions: 0, // Not tracked in simplified version
averageExecutionTime: value
};
});
return {
totalExecutions: 0,
averageExecutionTime: metrics['pipeline_duration'] || 0,
stageMetrics
};
} else {
// Return empty metrics for original pipeline
return {
totalExecutions: 0,
averageExecutionTime: 0,
stageMetrics: {}
};
}
}
/**
* Reset pipeline metrics (compatible with existing interface)
*/
resetMetrics(): void {
if (this.useSimplified) {
simplifiedPipeline.resetMetrics();
}
}
/**
* Generate a unique request ID
*/
private generateRequestId(): string {
return `req_${Date.now()}_${Math.random().toString(36).substring(7)}`;
}
}
/**
* Factory function to create ChatPipeline instances
* This maintains backward compatibility with existing code
*/
export function createChatPipeline(config?: Partial<ChatPipelineConfig>) {
return new ChatPipelineAdapter(config);
}
/**
* Export as ChatPipeline for drop-in replacement
*/
export const ChatPipeline = ChatPipelineAdapter;
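
Because the adapter is exported under the ChatPipeline name, existing call sites keep working unchanged. A sketch, assuming the message and option shapes from ai_interface.js and that the configuration service has been initialized first:

import configurationService from './configuration_service.js';
import { ChatPipeline } from './pipeline_adapter.js';

// The adapter reads tool/streaming/debug settings in its constructor, so load configuration first.
await configurationService.initialize();

const pipeline = new ChatPipeline({ maxToolCallIterations: 3 });
const response = await pipeline.execute({
    messages: [{ role: 'user', content: 'Summarize my meeting notes' }],
    options: { enableTools: true },
    query: 'Summarize my meeting notes'
});
console.log(response.text);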

View File

@@ -0,0 +1,401 @@
/**
* Tests for the Simplified Chat Pipeline
*/
import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest';
import { SimplifiedChatPipeline } from './simplified_pipeline.js';
import type { SimplifiedPipelineInput } from './simplified_pipeline.js';
import configurationService from './configuration_service.js';
import loggingService from './logging_service.js';
// Mock dependencies
vi.mock('./configuration_service.js', () => ({
default: {
getToolConfig: vi.fn(() => ({
enabled: true,
maxIterations: 3,
timeout: 30000,
parallelExecution: false
})),
getDebugConfig: vi.fn(() => ({
enabled: true,
logLevel: 'info',
enableMetrics: true,
enableTracing: false
})),
getStreamingConfig: vi.fn(() => ({
enabled: true,
chunkSize: 256,
flushInterval: 100
})),
getDefaultSystemPrompt: vi.fn(() => 'You are a helpful assistant.'),
getDefaultCompletionOptions: vi.fn(() => ({
temperature: 0.7,
max_tokens: 2000
}))
}
}));
vi.mock('./logging_service.js', () => ({
default: {
withRequestId: vi.fn((requestId: string) => ({
requestId,
log: vi.fn(),
error: vi.fn(),
warn: vi.fn(),
info: vi.fn(),
debug: vi.fn(),
startTimer: vi.fn(() => vi.fn())
}))
},
LogLevel: {
ERROR: 'error',
WARN: 'warn',
INFO: 'info',
DEBUG: 'debug'
}
}));
vi.mock('../ai_service_manager.js', () => ({
default: {
getService: vi.fn(() => ({
chat: vi.fn(async (messages, options) => ({
text: 'Test response',
model: 'test-model',
provider: 'test-provider',
tool_calls: options.enableTools ? [] : undefined
}))
}))
}
}));
vi.mock('../tools/tool_registry.js', () => ({
default: {
getAllToolDefinitions: vi.fn(() => [
{
type: 'function',
function: {
name: 'test_tool',
description: 'Test tool',
parameters: {}
}
}
]),
getTool: vi.fn(() => ({
execute: vi.fn(async () => 'Tool result')
}))
}
}));
describe('SimplifiedChatPipeline', () => {
let pipeline: SimplifiedChatPipeline;
beforeEach(() => {
vi.clearAllMocks();
pipeline = new SimplifiedChatPipeline();
});
afterEach(() => {
vi.restoreAllMocks();
});
describe('execute', () => {
it('should execute a simple chat without tools', async () => {
const input: SimplifiedPipelineInput = {
messages: [
{ role: 'user', content: 'Hello' }
],
options: {
enableTools: false
}
};
const response = await pipeline.execute(input);
expect(response).toBeDefined();
expect(response.text).toBe('Test response');
expect(response.model).toBe('test-model');
expect(response.provider).toBe('test-provider');
});
it('should add system prompt when not present', async () => {
const aiServiceManager = await import('../ai_service_manager.js');
const mockChat = vi.fn(async (messages) => {
// Check that system prompt was added
expect(messages[0].role).toBe('system');
expect(messages[0].content).toBe('You are a helpful assistant.');
return {
text: 'Response with system prompt',
model: 'test-model',
provider: 'test-provider'
};
});
aiServiceManager.default.getService = vi.fn(() => ({
chat: mockChat
}));
const input: SimplifiedPipelineInput = {
messages: [
{ role: 'user', content: 'Hello' }
]
};
const response = await pipeline.execute(input);
expect(mockChat).toHaveBeenCalled();
expect(response.text).toBe('Response with system prompt');
});
it('should handle tool calls', async () => {
const aiServiceManager = await import('../ai_service_manager.js');
let callCount = 0;
const mockChat = vi.fn(async (messages, options) => {
callCount++;
// First call returns tool calls
if (callCount === 1) {
return {
text: '',
model: 'test-model',
provider: 'test-provider',
tool_calls: [
{
id: 'call_1',
type: 'function',
function: {
name: 'test_tool',
arguments: '{}'
}
}
]
};
}
// Second call (after tool execution) returns final response
return {
text: 'Final response after tool',
model: 'test-model',
provider: 'test-provider'
};
});
aiServiceManager.default.getService = vi.fn(() => ({
chat: mockChat
}));
const input: SimplifiedPipelineInput = {
messages: [
{ role: 'user', content: 'Use a tool' }
],
options: {
enableTools: true
}
};
const response = await pipeline.execute(input);
expect(mockChat).toHaveBeenCalledTimes(2);
expect(response.text).toBe('Final response after tool');
});
it('should handle streaming when callback is provided', async () => {
const streamCallback = vi.fn();
const aiServiceManager = await import('../ai_service_manager.js');
const mockChat = vi.fn(async (messages, options) => ({
text: 'Streamed response',
model: 'test-model',
provider: 'test-provider',
stream: async (callback: Function) => {
await callback({ text: 'Chunk 1', done: false });
await callback({ text: 'Chunk 2', done: false });
await callback({ text: 'Chunk 3', done: true });
}
}));
aiServiceManager.default.getService = vi.fn(() => ({
chat: mockChat
}));
const input: SimplifiedPipelineInput = {
messages: [
{ role: 'user', content: 'Stream this' }
],
streamCallback
};
const response = await pipeline.execute(input);
expect(streamCallback).toHaveBeenCalledTimes(3);
expect(streamCallback).toHaveBeenCalledWith('Chunk 1', false, expect.any(Object));
expect(streamCallback).toHaveBeenCalledWith('Chunk 2', false, expect.any(Object));
expect(streamCallback).toHaveBeenCalledWith('Chunk 3', true, expect.any(Object));
expect(response.text).toBe('Chunk 1Chunk 2Chunk 3');
});
it('should respect max tool iterations', async () => {
const aiServiceManager = await import('../ai_service_manager.js');
// Always return tool calls to test iteration limit
const mockChat = vi.fn(async () => ({
text: '',
model: 'test-model',
provider: 'test-provider',
tool_calls: [
{
id: 'call_infinite',
type: 'function',
function: {
name: 'test_tool',
arguments: '{}'
}
}
]
}));
aiServiceManager.default.getService = vi.fn(() => ({
chat: mockChat
}));
const input: SimplifiedPipelineInput = {
messages: [
{ role: 'user', content: 'Infinite tools' }
],
options: {
enableTools: true
}
};
const response = await pipeline.execute(input);
// Should be called: 1 initial + 3 tool iterations (max)
expect(mockChat).toHaveBeenCalledTimes(4);
expect(response).toBeDefined();
});
it('should handle errors gracefully', async () => {
const aiServiceManager = await import('../ai_service_manager.js');
aiServiceManager.default.getService = vi.fn(() => null);
const input: SimplifiedPipelineInput = {
messages: [
{ role: 'user', content: 'This will fail' }
]
};
await expect(pipeline.execute(input)).rejects.toThrow('No AI service available');
});
it('should add context when query and advanced context are enabled', async () => {
// Mock context service
vi.mock('../context/services/context_service.js', () => ({
default: {
getContextForQuery: vi.fn(async () => 'Relevant context for query')
}
}));
const aiServiceManager = await import('../ai_service_manager.js');
const mockChat = vi.fn(async (messages) => {
// Check that context was added to system message
const systemMessage = messages.find((m: any) => m.role === 'system');
expect(systemMessage).toBeDefined();
expect(systemMessage.content).toContain('Context:');
expect(systemMessage.content).toContain('Relevant context for query');
return {
text: 'Response with context',
model: 'test-model',
provider: 'test-provider'
};
});
aiServiceManager.default.getService = vi.fn(() => ({
chat: mockChat
}));
const input: SimplifiedPipelineInput = {
messages: [
{ role: 'user', content: 'Question needing context' }
],
query: 'Question needing context',
options: {
useAdvancedContext: true
}
};
const response = await pipeline.execute(input);
expect(mockChat).toHaveBeenCalled();
expect(response.text).toBe('Response with context');
});
it('should track metrics when enabled', async () => {
const input: SimplifiedPipelineInput = {
messages: [
{ role: 'user', content: 'Track metrics' }
]
};
await pipeline.execute(input);
const metrics = pipeline.getMetrics();
expect(metrics).toBeDefined();
expect(metrics.pipeline_duration).toBeGreaterThan(0);
});
it('should generate request ID if not provided', async () => {
const input: SimplifiedPipelineInput = {
messages: [
{ role: 'user', content: 'No request ID' }
]
};
const response = await pipeline.execute(input);
expect(response.metadata?.requestId).toBeDefined();
expect(response.metadata.requestId).toMatch(/^req_\d+_[a-z0-9]+$/);
});
});
describe('getMetrics', () => {
it('should return empty metrics initially', () => {
const metrics = pipeline.getMetrics();
expect(metrics).toEqual({});
});
it('should return metrics after execution', async () => {
const input: SimplifiedPipelineInput = {
messages: [
{ role: 'user', content: 'Generate metrics' }
]
};
await pipeline.execute(input);
const metrics = pipeline.getMetrics();
expect(Object.keys(metrics).length).toBeGreaterThan(0);
});
});
describe('resetMetrics', () => {
it('should clear all metrics', async () => {
const input: SimplifiedPipelineInput = {
messages: [
{ role: 'user', content: 'Generate metrics' }
]
};
await pipeline.execute(input);
let metrics = pipeline.getMetrics();
expect(Object.keys(metrics).length).toBeGreaterThan(0);
pipeline.resetMetrics();
metrics = pipeline.getMetrics();
expect(metrics).toEqual({});
});
});
});

View File

@@ -0,0 +1,426 @@
/**
* Simplified Chat Pipeline - Phase 2.1 Implementation
*
* This pipeline reduces complexity from 9 stages to 4 essential stages:
* 1. Message Preparation (formatting, context, system prompt)
* 2. LLM Execution (provider selection and API call)
* 3. Tool Handling (parse, execute, format results)
* 4. Response Processing (format response, add metadata, send to client)
*/
import type {
Message,
ChatCompletionOptions,
ChatResponse,
StreamChunk,
ToolCall
} from '../ai_interface.js';
import aiServiceManager from '../ai_service_manager.js';
import toolRegistry from '../tools/tool_registry.js';
import configurationService from './configuration_service.js';
import loggingService, { LogLevel } from './logging_service.js';
import type { StreamCallback } from './interfaces.js';
// Simplified pipeline input interface
export interface SimplifiedPipelineInput {
messages: Message[];
options?: ChatCompletionOptions;
noteId?: string;
query?: string;
streamCallback?: StreamCallback;
requestId?: string;
}
// Pipeline configuration
interface PipelineConfig {
maxToolIterations: number;
enableMetrics: boolean;
enableStreaming: boolean;
}
/**
* Simplified Chat Pipeline Implementation
*/
export class SimplifiedChatPipeline {
private config: PipelineConfig;
private metrics: Map<string, number> = new Map();
constructor() {
// Load configuration from centralized service
this.config = {
maxToolIterations: configurationService.getToolConfig().maxIterations,
enableMetrics: configurationService.getDebugConfig().enableMetrics,
enableStreaming: configurationService.getStreamingConfig().enabled
};
}
/**
* Execute the simplified pipeline
*/
async execute(input: SimplifiedPipelineInput): Promise<ChatResponse> {
const requestId = input.requestId || this.generateRequestId();
const logger = loggingService.withRequestId(requestId);
logger.log(LogLevel.INFO, 'Pipeline started', {
messageCount: input.messages.length,
hasQuery: !!input.query,
streaming: !!input.streamCallback
});
const startTime = Date.now();
try {
// Stage 1: Message Preparation
const preparedMessages = await this.prepareMessages(input, logger);
// Stage 2: LLM Execution
const llmResponse = await this.executeLLM(preparedMessages, input, logger);
// Stage 3: Tool Handling (if needed)
const finalResponse = await this.handleTools(llmResponse, preparedMessages, input, logger);
// Stage 4: Response Processing
const processedResponse = await this.processResponse(finalResponse, input, logger);
// Record metrics
if (this.config.enableMetrics) {
this.recordMetric('pipeline_duration', Date.now() - startTime);
}
logger.log(LogLevel.INFO, 'Pipeline completed', {
duration: Date.now() - startTime,
responseLength: processedResponse.text.length
});
return processedResponse;
} catch (error) {
logger.log(LogLevel.ERROR, 'Pipeline error', { error });
throw error;
}
}
/**
* Stage 1: Message Preparation
* Combines formatting, context enrichment, and system prompt injection
*/
private async prepareMessages(
input: SimplifiedPipelineInput,
logger: ReturnType<typeof loggingService.withRequestId>
): Promise<Message[]> {
const startTime = Date.now();
logger.log(LogLevel.DEBUG, 'Stage 1: Message preparation started');
const messages: Message[] = [...input.messages];
// Add system prompt if provided
const systemPrompt = input.options?.systemPrompt || configurationService.getDefaultSystemPrompt();
if (systemPrompt && !messages.some(m => m.role === 'system')) {
messages.unshift({
role: 'system',
content: systemPrompt
});
}
// Add context if query is provided and context is enabled
if (input.query && input.options?.useAdvancedContext) {
const context = await this.extractContext(input.query, input.noteId);
if (context) {
// Find the last system message or create one
const lastSystemIndex = messages.findIndex(m => m.role === 'system');
if (lastSystemIndex >= 0) {
messages[lastSystemIndex].content += `\n\nContext:\n${context}`;
} else {
messages.unshift({
role: 'system',
content: `Context:\n${context}`
});
}
}
}
this.recordMetric('message_preparation', Date.now() - startTime);
logger.log(LogLevel.DEBUG, 'Stage 1: Message preparation completed', {
messageCount: messages.length,
duration: Date.now() - startTime
});
return messages;
}
/**
* Stage 2: LLM Execution
* Handles provider selection and API call
*/
private async executeLLM(
messages: Message[],
input: SimplifiedPipelineInput,
logger: ReturnType<typeof loggingService.withRequestId>
): Promise<ChatResponse> {
const startTime = Date.now();
logger.log(LogLevel.DEBUG, 'Stage 2: LLM execution started');
// Get completion options with defaults
const options: ChatCompletionOptions = {
...configurationService.getDefaultCompletionOptions(),
...input.options,
stream: this.config.enableStreaming && !!input.streamCallback
};
// Add tools if enabled
if (options.enableTools !== false) {
const tools = toolRegistry.getAllToolDefinitions();
if (tools.length > 0) {
options.tools = tools;
logger.log(LogLevel.DEBUG, 'Tools enabled', { toolCount: tools.length });
}
}
// Execute LLM call
const service = aiServiceManager.getService();
if (!service) {
throw new Error('No AI service available');
}
const response = await service.chat(messages, options);
this.recordMetric('llm_execution', Date.now() - startTime);
logger.log(LogLevel.DEBUG, 'Stage 2: LLM execution completed', {
provider: response.provider,
model: response.model,
hasToolCalls: !!(response.tool_calls?.length),
duration: Date.now() - startTime
});
return response;
}
/**
* Stage 3: Tool Handling
* Parses tool calls, executes them, and handles follow-up LLM calls
*/
private async handleTools(
response: ChatResponse,
messages: Message[],
input: SimplifiedPipelineInput,
logger: ReturnType<typeof loggingService.withRequestId>
): Promise<ChatResponse> {
// Return immediately if no tools to handle
if (!response.tool_calls?.length || input.options?.enableTools === false) {
return response;
}
const startTime = Date.now();
logger.log(LogLevel.INFO, 'Stage 3: Tool handling started', {
toolCount: response.tool_calls.length
});
let currentResponse = response;
let currentMessages = [...messages];
let iterations = 0;
while (iterations < this.config.maxToolIterations && currentResponse.tool_calls?.length) {
iterations++;
logger.log(LogLevel.DEBUG, `Tool iteration ${iterations}/${this.config.maxToolIterations}`);
// Add assistant message with tool calls
currentMessages.push({
role: 'assistant',
content: currentResponse.text || '',
tool_calls: currentResponse.tool_calls
});
// Execute tools and collect results
const toolResults = await this.executeTools(currentResponse.tool_calls, logger);
// Add tool results to messages
for (const result of toolResults) {
currentMessages.push({
role: 'tool',
content: result.content,
tool_call_id: result.toolCallId
});
}
// Send tool results back to LLM for follow-up
const followUpOptions: ChatCompletionOptions = {
...input.options,
stream: false, // Don't stream tool follow-ups
enableTools: true
};
const service = aiServiceManager.getService();
if (!service) {
throw new Error('No AI service available');
}
currentResponse = await service.chat(currentMessages, followUpOptions);
// Check if we need another iteration
if (!currentResponse.tool_calls?.length) {
break;
}
}
if (iterations >= this.config.maxToolIterations) {
logger.log(LogLevel.WARN, 'Maximum tool iterations reached', {
iterations: this.config.maxToolIterations
});
}
this.recordMetric('tool_handling', Date.now() - startTime);
logger.log(LogLevel.INFO, 'Stage 3: Tool handling completed', {
iterations,
duration: Date.now() - startTime
});
return currentResponse;
}
/**
* Stage 4: Response Processing
* Formats the response and handles streaming
*/
private async processResponse(
response: ChatResponse,
input: SimplifiedPipelineInput,
logger: ReturnType<typeof loggingService.withRequestId>
): Promise<ChatResponse> {
const startTime = Date.now();
logger.log(LogLevel.DEBUG, 'Stage 4: Response processing started');
// Handle streaming if enabled
if (input.streamCallback && response.stream) {
let accumulatedText = '';
await response.stream(async (chunk: StreamChunk) => {
accumulatedText += chunk.text;
await input.streamCallback!(chunk.text, chunk.done || false, chunk);
});
// Update response text with accumulated content
response.text = accumulatedText;
}
// Add metadata
response.metadata = {
...response.metadata,
requestId: logger.requestId,
processingTime: Date.now() - startTime
};
this.recordMetric('response_processing', Date.now() - startTime);
logger.log(LogLevel.DEBUG, 'Stage 4: Response processing completed', {
responseLength: response.text.length,
duration: Date.now() - startTime
});
return response;
}
/**
* Execute tool calls and return results
*/
private async executeTools(
toolCalls: ToolCall[],
logger: ReturnType<typeof loggingService.withRequestId>
): Promise<Array<{ toolCallId: string; content: string }>> {
const results = [];
for (const toolCall of toolCalls) {
try {
const tool = toolRegistry.getTool(toolCall.function.name);
if (!tool) {
throw new Error(`Tool not found: ${toolCall.function.name}`);
}
const args = JSON.parse(toolCall.function.arguments || '{}');
const result = await tool.execute(args);
results.push({
toolCallId: toolCall.id,
content: typeof result === 'string' ? result : JSON.stringify(result)
});
logger.log(LogLevel.DEBUG, 'Tool executed successfully', {
tool: toolCall.function.name,
toolCallId: toolCall.id
});
} catch (error) {
logger.log(LogLevel.ERROR, 'Tool execution failed', {
tool: toolCall.function.name,
error
});
results.push({
toolCallId: toolCall.id,
content: `Error: ${error instanceof Error ? error.message : String(error)}`
});
}
}
return results;
}
/**
* Extract context for the query (simplified version)
*/
private async extractContext(query: string, noteId?: string): Promise<string | null> {
try {
// Simplified context extraction: delegate to the existing context service,
// loaded lazily so the pipeline has no hard dependency on it at import time
const contextService = await import('../context/services/context_service.js');
return await contextService.default.getContextForQuery(query, noteId);
} catch (error) {
loggingService.log(LogLevel.ERROR, 'Context extraction failed', { error });
return null;
}
}
/**
* Generate a unique request ID
*/
private generateRequestId(): string {
return `req_${Date.now()}_${Math.random().toString(36).substring(7)}`;
}
/**
* Record a metric
*/
private recordMetric(name: string, value: number): void {
if (!this.config.enableMetrics) return;
const current = this.metrics.get(name) || 0;
const count = this.metrics.get(`${name}_count`) || 0;
// Calculate running average
const newAverage = (current * count + value) / (count + 1);
this.metrics.set(name, newAverage);
this.metrics.set(`${name}_count`, count + 1);
}
/**
* Get current metrics
*/
getMetrics(): Record<string, number> {
const result: Record<string, number> = {};
this.metrics.forEach((value, key) => {
if (!key.endsWith('_count')) {
result[key] = value;
}
});
return result;
}
/**
* Reset metrics
*/
resetMetrics(): void {
this.metrics.clear();
}
}
// Export singleton instance
export default new SimplifiedChatPipeline();
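
A streaming usage sketch against the exported singleton. The module constructs the pipeline at import time and reads configuration in its constructor, so the configuration service is initialized first and the pipeline module is loaded afterwards; the prompt and callback are placeholders:

import configurationService from './configuration_service.js';

await configurationService.initialize();
const { default: simplifiedPipeline } = await import('./simplified_pipeline.js');

const response = await simplifiedPipeline.execute({
    messages: [{ role: 'user', content: 'Hello' }],
    streamCallback: async (text, done) => {
        process.stdout.write(text);
        if (done) process.stdout.write('\n');
    }
});
console.log(`Received ${response.text.length} characters from ${response.provider}/${response.model}`);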