embeddings-1 #2

Merged
mstoeck3 merged 11 commits from embeddings-1 into main 2025-08-02 09:59:35 +00:00
6 changed files with 426 additions and 269 deletions
Showing only changes of commit 8693cd87d4 - Show all commits

File diff suppressed because one or more lines are too long

View File

@ -14,7 +14,6 @@
"astro": "^5.12.3", "astro": "^5.12.3",
"cookie": "^1.0.2", "cookie": "^1.0.2",
"dotenv": "^16.4.5", "dotenv": "^16.4.5",
"hnswlib-node": "^3.0.0",
"jose": "^5.2.0", "jose": "^5.2.0",
"js-yaml": "^4.1.0", "js-yaml": "^4.1.0",
"jsonwebtoken": "^9.0.2", "jsonwebtoken": "^9.0.2",

View File

@ -1,4 +1,5 @@
// src/pages/api/ai/query.ts - Enhanced for micro-task pipeline // src/pages/api/ai/query.ts - FIXED: Rate limiting for micro-task pipeline
import type { APIRoute } from 'astro'; import type { APIRoute } from 'astro';
import { withAPIAuth } from '../../../utils/auth.js'; import { withAPIAuth } from '../../../utils/auth.js';
import { apiError, apiServerError, createAuthErrorResponse } from '../../../utils/api.js'; import { apiError, apiServerError, createAuthErrorResponse } from '../../../utils/api.js';
@ -7,79 +8,94 @@ import { aiPipeline } from '../../../utils/aiPipeline.js';
export const prerender = false; export const prerender = false;
const rateLimitStore = new Map<string, { count: number; resetTime: number }>(); interface RateLimitData {
count: number;
resetTime: number;
microTaskCount: number;
}
const rateLimitStore = new Map<string, RateLimitData>();
// Enhanced rate limiting for micro-task architecture
const RATE_LIMIT_WINDOW = 60 * 1000; // 1 minute const RATE_LIMIT_WINDOW = 60 * 1000; // 1 minute
const RATE_LIMIT_MAX = parseInt(process.env.AI_RATE_LIMIT_MAX_REQUESTS || '8', 10); // Reduced due to micro-tasks const MAIN_RATE_LIMIT_MAX = parseInt(process.env.AI_RATE_LIMIT_MAX_REQUESTS || '4', 10);
const MICRO_TASK_TOTAL_LIMIT = parseInt(process.env.AI_MICRO_TASK_TOTAL_LIMIT || '50', 10);
// Micro-task specific rate limiting
const MICRO_TASK_RATE_LIMIT = parseInt(process.env.AI_MICRO_TASK_RATE_LIMIT || '30', 10);
const microTaskRateLimitStore = new Map<string, { count: number; resetTime: number }>();
function sanitizeInput(input: string): string { function sanitizeInput(input: string): string {
let sanitized = input let sanitized = input
.replace(/```[\s\S]*?```/g, '[CODE_BLOCK_REMOVED]') // Remove code blocks .replace(/```[\s\S]*?```/g, '[CODE_BLOCK_REMOVED]')
.replace(/\<\/?[^>]+(>|$)/g, '') // Remove HTML tags .replace(/\<\/?[^>]+(>|$)/g, '')
.replace(/\b(system|assistant|user)\s*[:]/gi, '[ROLE_REMOVED]') .replace(/\b(system|assistant|user)\s*[:]/gi, '[ROLE_REMOVED]')
.replace(/\b(ignore|forget|disregard)\s+(previous|all|your)\s+(instructions?|context|rules?)/gi, '[INSTRUCTION_REMOVED]') .replace(/\b(ignore|forget|disregard)\s+(previous|all|your)\s+(instructions?|context|rules?)/gi, '[INSTRUCTION_REMOVED]')
.trim(); .trim();
sanitized = sanitized.slice(0, 2000).replace(/\s+/g, ' '); sanitized = sanitized.slice(0, 2000).replace(/\s+/g, ' ');
return sanitized; return sanitized;
} }
function checkRateLimit(userId: string): boolean { function checkRateLimit(userId: string): { allowed: boolean; reason?: string; microTasksRemaining?: number } {
const now = Date.now(); const now = Date.now();
const userLimit = rateLimitStore.get(userId); const userLimit = rateLimitStore.get(userId);
if (!userLimit || now > userLimit.resetTime) { if (!userLimit || now > userLimit.resetTime) {
rateLimitStore.set(userId, { count: 1, resetTime: now + RATE_LIMIT_WINDOW }); rateLimitStore.set(userId, {
return true; count: 1,
resetTime: now + RATE_LIMIT_WINDOW,
microTaskCount: 0
});
return {
allowed: true,
microTasksRemaining: MICRO_TASK_TOTAL_LIMIT
};
} }
if (userLimit.count >= RATE_LIMIT_MAX) { if (userLimit.count >= MAIN_RATE_LIMIT_MAX) {
return false; return {
allowed: false,
reason: `Main rate limit exceeded. Max ${MAIN_RATE_LIMIT_MAX} requests per minute.`
};
}
if (userLimit.microTaskCount >= MICRO_TASK_TOTAL_LIMIT) {
return {
allowed: false,
reason: `Micro-task limit exceeded. Max ${MICRO_TASK_TOTAL_LIMIT} AI calls per minute.`
};
} }
userLimit.count++; userLimit.count++;
return true;
return {
allowed: true,
microTasksRemaining: MICRO_TASK_TOTAL_LIMIT - userLimit.microTaskCount
};
} }
// Enhanced: Check micro-task rate limiting function incrementMicroTaskCount(userId: string, aiCallsMade: number): void {
function checkMicroTaskRateLimit(userId: string): { allowed: boolean; remaining: number } { const userLimit = rateLimitStore.get(userId);
const now = Date.now(); if (userLimit) {
const userLimit = microTaskRateLimitStore.get(userId); userLimit.microTaskCount += aiCallsMade;
console.log(`[RATE LIMIT] User ${userId} now at ${userLimit.microTaskCount}/${MICRO_TASK_TOTAL_LIMIT} micro-task calls`);
if (!userLimit || now > userLimit.resetTime) {
microTaskRateLimitStore.set(userId, { count: 1, resetTime: now + RATE_LIMIT_WINDOW });
return { allowed: true, remaining: MICRO_TASK_RATE_LIMIT - 1 };
} }
if (userLimit.count >= MICRO_TASK_RATE_LIMIT) {
return { allowed: false, remaining: 0 };
}
userLimit.count++;
return { allowed: true, remaining: MICRO_TASK_RATE_LIMIT - userLimit.count };
} }
function cleanupExpiredRateLimits() { function cleanupExpiredRateLimits() {
const now = Date.now(); const now = Date.now();
const maxStoreSize = 1000;
// Clean up main rate limits
for (const [userId, limit] of rateLimitStore.entries()) { for (const [userId, limit] of rateLimitStore.entries()) {
if (now > limit.resetTime) { if (now > limit.resetTime) {
rateLimitStore.delete(userId); rateLimitStore.delete(userId);
} }
} }
// Clean up micro-task rate limits if (rateLimitStore.size > maxStoreSize) {
for (const [userId, limit] of microTaskRateLimitStore.entries()) { const entries = Array.from(rateLimitStore.entries());
if (now > limit.resetTime) { entries.sort((a, b) => a[1].resetTime - b[1].resetTime);
microTaskRateLimitStore.delete(userId);
} const toRemove = entries.slice(0, entries.length - maxStoreSize);
toRemove.forEach(([userId]) => rateLimitStore.delete(userId));
console.log(`[RATE LIMIT] Cleanup: removed ${toRemove.length} old entries`);
} }
} }
@ -94,24 +110,16 @@ export const POST: APIRoute = async ({ request }) => {
const userId = authResult.userId; const userId = authResult.userId;
// Check main rate limit const rateLimitResult = checkRateLimit(userId);
if (!checkRateLimit(userId)) { if (!rateLimitResult.allowed) {
return apiError.rateLimit('Rate limit exceeded'); return apiError.rateLimit(rateLimitResult.reason || 'Rate limit exceeded');
}
// Enhanced: Check micro-task rate limit
const microTaskLimit = checkMicroTaskRateLimit(userId);
if (!microTaskLimit.allowed) {
return apiError.rateLimit(
`Micro-task rate limit exceeded. The new AI pipeline uses multiple smaller requests. Please wait before trying again.`
);
} }
const body = await request.json(); const body = await request.json();
const { query, mode = 'workflow', taskId: clientTaskId } = body; const { query, mode = 'workflow', taskId: clientTaskId } = body;
console.log(`[MICRO-TASK API] Received request - TaskId: ${clientTaskId}, Mode: ${mode}, Query length: ${query?.length || 0}`); console.log(`[MICRO-TASK API] Received request - TaskId: ${clientTaskId}, Mode: ${mode}, Query length: ${query?.length || 0}`);
console.log(`[MICRO-TASK API] Micro-task rate limit remaining: ${microTaskLimit.remaining}`); console.log(`[MICRO-TASK API] Micro-task calls remaining: ${rateLimitResult.microTasksRemaining}`);
if (!query || typeof query !== 'string') { if (!query || typeof query !== 'string') {
console.log(`[MICRO-TASK API] Invalid query for task ${clientTaskId}`); console.log(`[MICRO-TASK API] Invalid query for task ${clientTaskId}`);
@ -133,7 +141,6 @@ export const POST: APIRoute = async ({ request }) => {
console.log(`[MICRO-TASK API] About to enqueue micro-task pipeline ${taskId}`); console.log(`[MICRO-TASK API] About to enqueue micro-task pipeline ${taskId}`);
// Use the enhanced micro-task AI pipeline
const result = await enqueueApiCall(() => const result = await enqueueApiCall(() =>
aiPipeline.processQuery(sanitizedQuery, mode) aiPipeline.processQuery(sanitizedQuery, mode)
, taskId); , taskId);
@ -142,8 +149,10 @@ export const POST: APIRoute = async ({ request }) => {
return apiServerError.unavailable('No response from micro-task AI pipeline'); return apiServerError.unavailable('No response from micro-task AI pipeline');
} }
// Enhanced: Log micro-task statistics
const stats = result.processingStats; const stats = result.processingStats;
const estimatedAICallsMade = stats.microTasksCompleted + stats.microTasksFailed;
incrementMicroTaskCount(userId, estimatedAICallsMade);
console.log(`[MICRO-TASK API] Pipeline completed for ${taskId}:`); console.log(`[MICRO-TASK API] Pipeline completed for ${taskId}:`);
console.log(` - Mode: ${mode}`); console.log(` - Mode: ${mode}`);
console.log(` - User: ${userId}`); console.log(` - User: ${userId}`);
@ -151,10 +160,14 @@ export const POST: APIRoute = async ({ request }) => {
console.log(` - Processing time: ${stats.processingTimeMs}ms`); console.log(` - Processing time: ${stats.processingTimeMs}ms`);
console.log(` - Micro-tasks completed: ${stats.microTasksCompleted}`); console.log(` - Micro-tasks completed: ${stats.microTasksCompleted}`);
console.log(` - Micro-tasks failed: ${stats.microTasksFailed}`); console.log(` - Micro-tasks failed: ${stats.microTasksFailed}`);
console.log(` - Estimated AI calls: ${estimatedAICallsMade}`);
console.log(` - Embeddings used: ${stats.embeddingsUsed}`); console.log(` - Embeddings used: ${stats.embeddingsUsed}`);
console.log(` - Final items: ${stats.finalSelectedItems}`); console.log(` - Final items: ${stats.finalSelectedItems}`);
// Enhanced: Include pipeline information in response const currentLimit = rateLimitStore.get(userId);
const remainingMicroTasks = currentLimit ?
MICRO_TASK_TOTAL_LIMIT - currentLimit.microTaskCount : MICRO_TASK_TOTAL_LIMIT;
return new Response(JSON.stringify({ return new Response(JSON.stringify({
success: true, success: true,
mode, mode,
@ -163,14 +176,14 @@ export const POST: APIRoute = async ({ request }) => {
query: sanitizedQuery, query: sanitizedQuery,
processingStats: { processingStats: {
...result.processingStats, ...result.processingStats,
// Add micro-task specific info
pipelineType: 'micro-task', pipelineType: 'micro-task',
microTasksSuccessRate: stats.microTasksCompleted / (stats.microTasksCompleted + stats.microTasksFailed), microTasksSuccessRate: stats.microTasksCompleted / (stats.microTasksCompleted + stats.microTasksFailed),
averageTaskTime: stats.processingTimeMs / (stats.microTasksCompleted + stats.microTasksFailed) averageTaskTime: stats.processingTimeMs / (stats.microTasksCompleted + stats.microTasksFailed),
estimatedAICallsMade
}, },
// Enhanced: Rate limiting info for client
rateLimitInfo: { rateLimitInfo: {
remaining: microTaskLimit.remaining, mainRequestsRemaining: MAIN_RATE_LIMIT_MAX - (currentLimit?.count || 0),
microTaskCallsRemaining: remainingMicroTasks,
resetTime: Date.now() + RATE_LIMIT_WINDOW resetTime: Date.now() + RATE_LIMIT_WINDOW
} }
}), { }), {
@ -181,15 +194,14 @@ export const POST: APIRoute = async ({ request }) => {
} catch (error) { } catch (error) {
console.error('[MICRO-TASK API] Pipeline error:', error); console.error('[MICRO-TASK API] Pipeline error:', error);
// Enhanced: More specific error messages for micro-task pipeline
if (error.message.includes('embeddings')) { if (error.message.includes('embeddings')) {
return apiServerError.unavailable('Embeddings service error - falling back to selector AI'); return apiServerError.unavailable('Embeddings service error - using AI fallback');
} else if (error.message.includes('micro-task')) { } else if (error.message.includes('micro-task')) {
return apiServerError.unavailable('Micro-task pipeline error - some analysis steps may have failed'); return apiServerError.unavailable('Micro-task pipeline error - some analysis steps failed');
} else if (error.message.includes('selector')) { } else if (error.message.includes('selector')) {
return apiServerError.unavailable('AI selector service error'); return apiServerError.unavailable('AI selector service error');
} else if (error.message.includes('rate limit')) { } else if (error.message.includes('rate limit')) {
return apiError.rateLimit('AI service rate limits exceeded due to micro-task processing'); return apiError.rateLimit('AI service rate limits exceeded during micro-task processing');
} else { } else {
return apiServerError.internal('Micro-task AI pipeline error'); return apiServerError.internal('Micro-task AI pipeline error');
} }

View File

@ -1,8 +1,7 @@
// src/utils/aiPipeline.ts - FIXED: Restore proper structure with context continuity // src/utils/aiPipeline.ts - FIXED: Critical error corrections
import { getCompressedToolsDataForAI } from './dataService.js'; import { getCompressedToolsDataForAI } from './dataService.js';
import { embeddingsService, type EmbeddingData } from './embeddings.js'; import { embeddingsService, type EmbeddingData } from './embeddings.js';
import { vectorIndex } from './vectorIndex.js';
interface AIConfig { interface AIConfig {
endpoint: string; endpoint: string;
@ -31,21 +30,25 @@ interface AnalysisResult {
}; };
} }
// FIXED: Context object that builds up through pipeline
interface AnalysisContext { interface AnalysisContext {
userQuery: string; userQuery: string;
mode: string; mode: string;
filteredData: any; filteredData: any;
// ADDED: Context continuity
contextHistory: string[]; contextHistory: string[];
// Results (same as original) // FIXED: Add max context length tracking
maxContextLength: number;
currentContextLength: number;
scenarioAnalysis?: string; scenarioAnalysis?: string;
problemAnalysis?: string; problemAnalysis?: string;
investigationApproach?: string; investigationApproach?: string;
criticalConsiderations?: string; criticalConsiderations?: string;
selectedTools?: Array<{tool: any, phase: string, priority: string, justification?: string}>; selectedTools?: Array<{tool: any, phase: string, priority: string, justification?: string}>;
backgroundKnowledge?: Array<{concept: any, relevance: string}>; backgroundKnowledge?: Array<{concept: any, relevance: string}>;
// FIXED: Add seen tools tracking to prevent duplicates
seenToolNames: Set<string>;
} }
class ImprovedMicroTaskAIPipeline { class ImprovedMicroTaskAIPipeline {
@ -55,6 +58,10 @@ class ImprovedMicroTaskAIPipeline {
private similarityThreshold: number; private similarityThreshold: number;
private microTaskDelay: number; private microTaskDelay: number;
// FIXED: Add proper token management
private maxContextTokens: number;
private maxPromptTokens: number;
constructor() { constructor() {
this.config = { this.config = {
endpoint: this.getEnv('AI_ANALYZER_ENDPOINT'), endpoint: this.getEnv('AI_ANALYZER_ENDPOINT'),
@ -62,11 +69,14 @@ class ImprovedMicroTaskAIPipeline {
model: this.getEnv('AI_ANALYZER_MODEL') model: this.getEnv('AI_ANALYZER_MODEL')
}; };
// FIXED: Optimized for vectorIndex (HNSW) usage
this.maxSelectedItems = parseInt(process.env.AI_MAX_SELECTED_ITEMS || '60', 10); this.maxSelectedItems = parseInt(process.env.AI_MAX_SELECTED_ITEMS || '60', 10);
this.embeddingCandidates = parseInt(process.env.AI_EMBEDDING_CANDIDATES || '60', 10); // HNSW is more efficient this.embeddingCandidates = parseInt(process.env.AI_EMBEDDING_CANDIDATES || '60', 10);
this.similarityThreshold = 0.3; // Not used by vectorIndex, kept for fallback compatibility this.similarityThreshold = 0.3;
this.microTaskDelay = parseInt(process.env.AI_MICRO_TASK_DELAY_MS || '500', 10); this.microTaskDelay = parseInt(process.env.AI_MICRO_TASK_DELAY_MS || '500', 10);
// FIXED: Token management
this.maxContextTokens = parseInt(process.env.AI_MAX_CONTEXT_TOKENS || '4000', 10);
this.maxPromptTokens = parseInt(process.env.AI_MAX_PROMPT_TOKENS || '1500', 10);
} }
private getEnv(key: string): string { private getEnv(key: string): string {
@ -77,12 +87,68 @@ class ImprovedMicroTaskAIPipeline {
return value; return value;
} }
// IMPROVED: AI-driven selection (no hard-coded keywords) // FIXED: Estimate token count (rough approximation)
private async getIntelligentCandidates(userQuery: string, toolsData: any, mode: string) { private estimateTokens(text: string): number {
let candidateTools = new Set<string>(); return Math.ceil(text.length / 4); // Rough estimate: 4 chars per token
let candidateConcepts = new Set<string>(); }
// FIXED: Manage context history with token limits
private addToContextHistory(context: AnalysisContext, newEntry: string): void {
const entryTokens = this.estimateTokens(newEntry);
// Add new entry
context.contextHistory.push(newEntry);
context.currentContextLength += entryTokens;
// Prune old entries if exceeding limits
while (context.currentContextLength > this.maxContextTokens && context.contextHistory.length > 1) {
const removed = context.contextHistory.shift()!;
context.currentContextLength -= this.estimateTokens(removed);
}
}
// FIXED: Safe JSON parsing with validation
private safeParseJSON(jsonString: string, fallback: any = null): any {
try {
const cleaned = jsonString
.replace(/^```json\s*/i, '')
.replace(/\s*```\s*$/g, '')
.trim();
const parsed = JSON.parse(cleaned);
return parsed;
} catch (error) {
console.warn('[AI PIPELINE] JSON parsing failed:', error.message);
console.warn('[AI PIPELINE] Raw content:', jsonString.slice(0, 200));
return fallback;
}
}
// FIXED: Add tool deduplication
private addToolToSelection(context: AnalysisContext, tool: any, phase: string, priority: string, justification?: string): boolean {
if (context.seenToolNames.has(tool.name)) {
console.log(`[AI PIPELINE] Skipping duplicate tool: ${tool.name}`);
return false;
}
context.seenToolNames.add(tool.name);
if (!context.selectedTools) context.selectedTools = [];
context.selectedTools.push({
tool,
phase,
priority,
justification
});
return true;
}
private async getIntelligentCandidates(userQuery: string, toolsData: any, mode: string) {
let candidateTools: any[] = [];
let candidateConcepts: any[] = [];
let selectionMethod = 'unknown';
// Method 1: Embeddings-based selection (primary)
if (embeddingsService.isEnabled()) { if (embeddingsService.isEnabled()) {
const similarItems = await embeddingsService.findSimilar( const similarItems = await embeddingsService.findSimilar(
userQuery, userQuery,
@ -90,119 +156,249 @@ class ImprovedMicroTaskAIPipeline {
this.similarityThreshold this.similarityThreshold
); );
const toolNames = new Set<string>();
const conceptNames = new Set<string>();
similarItems.forEach(item => { similarItems.forEach(item => {
if (item.type === 'tool') candidateTools.add(item.name); if (item.type === 'tool') toolNames.add(item.name);
if (item.type === 'concept') candidateConcepts.add(item.name); if (item.type === 'concept') conceptNames.add(item.name);
}); });
console.log(`[IMPROVED PIPELINE] Embeddings selected: ${candidateTools.size} tools, ${candidateConcepts.size} concepts`); console.log(`[IMPROVED PIPELINE] Embeddings found: ${toolNames.size} tools, ${conceptNames.size} concepts`);
// FIXED: Use your expected flow - get full data of embeddings results
if (toolNames.size >= 15) { // Reasonable threshold for quality
candidateTools = toolsData.tools.filter((tool: any) => toolNames.has(tool.name));
candidateConcepts = toolsData.concepts.filter((concept: any) => conceptNames.has(concept.name));
selectionMethod = 'embeddings_candidates';
console.log(`[IMPROVED PIPELINE] Using embeddings candidates: ${candidateTools.length} tools`);
} else {
console.log(`[IMPROVED PIPELINE] Embeddings insufficient (${toolNames.size} < 15), using full dataset`);
candidateTools = toolsData.tools;
candidateConcepts = toolsData.concepts;
selectionMethod = 'full_dataset';
}
} else {
console.log(`[IMPROVED PIPELINE] Embeddings disabled, using full dataset`);
candidateTools = toolsData.tools;
candidateConcepts = toolsData.concepts;
selectionMethod = 'full_dataset';
}
// FIXED: NOW AI ANALYZES FULL DATA of the candidates
console.log(`[IMPROVED PIPELINE] AI will analyze FULL DATA of ${candidateTools.length} candidate tools`);
const finalSelection = await this.aiSelectionWithFullData(userQuery, candidateTools, candidateConcepts, mode, selectionMethod);
if (candidateTools.size >= 20) {
return { return {
tools: toolsData.tools.filter((tool: any) => candidateTools.has(tool.name)), tools: finalSelection.selectedTools,
concepts: toolsData.concepts.filter((concept: any) => candidateConcepts.has(concept.name)), concepts: finalSelection.selectedConcepts,
domains: toolsData.domains, domains: toolsData.domains,
phases: toolsData.phases, phases: toolsData.phases,
'domain-agnostic-software': toolsData['domain-agnostic-software'] 'domain-agnostic-software': toolsData['domain-agnostic-software']
}; };
} }
}
// Method 2: Fallback AI selection (like original selector) // src/utils/aiPipeline.ts - FIXED: De-biased AI selection prompt
console.log(`[IMPROVED PIPELINE] Using AI selector fallback`);
return await this.fallbackAISelection(userQuery, toolsData, mode);
}
// Fallback AI selection private async aiSelectionWithFullData(
private async fallbackAISelection(userQuery: string, toolsData: any, mode: string) { userQuery: string,
const toolsList = toolsData.tools.map((tool: any) => ({ candidateTools: any[],
candidateConcepts: any[],
mode: string,
selectionMethod: string
) {
const modeInstruction = mode === 'workflow'
? 'The user wants a COMPREHENSIVE WORKFLOW with multiple tools/methods across different phases. Select 15-25 tools that cover the full investigation lifecycle.'
: 'The user wants SPECIFIC TOOLS/METHODS that directly solve their particular problem. Select 3-8 tools that are most relevant and effective.';
// FIXED: Give AI the COMPLETE tool data, not truncated
const toolsWithFullData = candidateTools.map((tool: any) => ({
name: tool.name, name: tool.name,
type: tool.type, type: tool.type,
description: tool.description.slice(0, 200) + '...', description: tool.description,
domains: tool.domains, domains: tool.domains,
phases: tool.phases, phases: tool.phases,
tags: tool.tags?.slice(0, 5) || [], platforms: tool.platforms || [],
skillLevel: tool.skillLevel tags: tool.tags || [],
skillLevel: tool.skillLevel,
license: tool.license,
accessType: tool.accessType,
projectUrl: tool.projectUrl,
knowledgebase: tool.knowledgebase,
related_concepts: tool.related_concepts || [],
related_software: tool.related_software || []
})); }));
const conceptsList = toolsData.concepts.map((concept: any) => ({ const conceptsWithFullData = candidateConcepts.map((concept: any) => ({
name: concept.name, name: concept.name,
type: 'concept', type: 'concept',
description: concept.description.slice(0, 200) + '...', description: concept.description,
domains: concept.domains, domains: concept.domains,
phases: concept.phases, phases: concept.phases,
tags: concept.tags?.slice(0, 5) || [] tags: concept.tags || [],
skillLevel: concept.skillLevel,
related_concepts: concept.related_concepts || [],
related_software: concept.related_software || []
})); }));
const modeInstruction = mode === 'workflow' const prompt = `You are a DFIR expert with access to the complete forensics tool database. You need to select the most relevant tools and concepts for this specific query.
? 'The user wants a COMPREHENSIVE WORKFLOW with multiple tools/methods across different phases.'
: 'The user wants SPECIFIC TOOLS/METHODS that directly solve their particular problem.';
const prompt = `You are a DFIR expert tasked with selecting the most relevant tools and concepts for a user query. SELECTION METHOD: ${selectionMethod}
${selectionMethod === 'embeddings_candidates' ?
'These tools were pre-filtered by vector similarity, so they are already relevant. Your job is to select the BEST ones from this relevant set.' :
'You have access to the full tool database. Select the most relevant tools for the query.'}
${modeInstruction} ${modeInstruction}
AVAILABLE TOOLS:
${JSON.stringify(toolsList.slice(0, 50), null, 2)}
AVAILABLE CONCEPTS:
${JSON.stringify(conceptsList, null, 2)}
USER QUERY: "${userQuery}" USER QUERY: "${userQuery}"
Select the most relevant items (max ${this.maxSelectedItems} total). For workflow mode, prioritize breadth across phases. For tool mode, prioritize specificity and direct relevance. CRITICAL SELECTION PRINCIPLES:
1. **CONTEXT OVER POPULARITY**: Don't default to "famous" tools like Volatility, Wireshark, or Autopsy just because they're well-known. Choose based on SPECIFIC scenario needs.
2. **METHODOLOGY vs SOFTWARE**:
- For RAPID/URGENT scenarios Prioritize METHODS and rapid response approaches
- For TIME-CRITICAL incidents Choose triage methods over deep analysis tools
- For COMPREHENSIVE analysis Then consider detailed software tools
- METHODS (type: "method") are often better than SOFTWARE for procedural guidance
3. **SCENARIO-SPECIFIC LOGIC**:
- "Rapid/Quick/Urgent/Triage" scenarios Rapid Incident Response and Triage METHOD > Volatility
- "Industrial/SCADA/ICS" scenarios Specialized ICS tools > generic network tools
- "Mobile/Android/iOS" scenarios Mobile-specific tools > desktop forensics tools
- "Memory analysis needed urgently" Quick memory tools/methods > comprehensive Volatility analysis
4. **AVOID TOOL BIAS**:
- Volatility is NOT always the answer for memory analysis
- Wireshark is NOT always the answer for network analysis
- Autopsy is NOT always the answer for disk analysis
- Consider lighter, faster, more appropriate alternatives
AVAILABLE TOOLS (with complete data):
${JSON.stringify(toolsWithFullData.slice(0, 30), null, 2)}
AVAILABLE CONCEPTS (with complete data):
${JSON.stringify(conceptsWithFullData.slice(0, 10), null, 2)}
ANALYSIS INSTRUCTIONS:
1. Read the FULL description of each tool/concept
2. Consider ALL tags, platforms, related tools, and metadata
3. **MATCH URGENCY LEVEL**: Rapid scenarios need rapid methods, not deep analysis tools
4. **MATCH SPECIFICITY**: Specialized scenarios need specialized tools, not generic ones
5. **CONSIDER TYPE**: Methods provide procedural guidance, software provides technical capability
6. For SCADA/ICS queries: prioritize specialized ICS tools over generic network tools
7. For mobile queries: prioritize mobile-specific tools over desktop tools
8. For rapid/urgent queries: prioritize methodology and triage approaches
BIAS PREVENTION:
- If query mentions "rapid", "quick", "urgent", "triage" Strongly favor METHODS over deep analysis SOFTWARE
- If query mentions specific technologies (SCADA, Android, etc.) Strongly favor specialized tools
- Don't recommend Volatility unless deep memory analysis is specifically needed AND time allows
- Don't recommend generic tools when specialized ones are available
- Consider the SKILL LEVEL and TIME CONSTRAINTS implied by the query
Select the most relevant items (max ${this.maxSelectedItems} total).
Respond with ONLY this JSON format: Respond with ONLY this JSON format:
{ {
"selectedTools": ["Tool Name 1", "Tool Name 2", ...], "selectedTools": ["Tool Name 1", "Tool Name 2", ...],
"selectedConcepts": ["Concept Name 1", "Concept Name 2", ...], "selectedConcepts": ["Concept Name 1", "Concept Name 2", ...],
"reasoning": "Brief explanation of selection criteria and approach" "reasoning": "Detailed explanation of why these specific tools were selected for this query, addressing why certain popular tools were NOT selected if they were inappropriate for the scenario context"
}`; }`;
try { try {
const response = await this.callAI(prompt, 1500); const response = await this.callAI(prompt, 2500); // More tokens for bias prevention logic
const cleaned = response.replace(/^```json\s*/i, '').replace(/\s*```\s*$/g, '').trim();
const result = JSON.parse(cleaned);
if (!Array.isArray(result.selectedTools) || !Array.isArray(result.selectedConcepts)) { const result = this.safeParseJSON(response, null);
throw new Error('Invalid selection result structure');
if (!result || !Array.isArray(result.selectedTools) || !Array.isArray(result.selectedConcepts)) {
console.error('[IMPROVED PIPELINE] AI selection returned invalid structure:', response.slice(0, 200));
throw new Error('AI selection failed to return valid tool selection');
} }
const totalSelected = result.selectedTools.length + result.selectedConcepts.length; const totalSelected = result.selectedTools.length + result.selectedConcepts.length;
if (totalSelected > this.maxSelectedItems) { if (totalSelected === 0) {
console.warn(`[IMPROVED PIPELINE] Selection exceeded limit (${totalSelected}), truncating`); console.error('[IMPROVED PIPELINE] AI selection returned no tools');
result.selectedTools = result.selectedTools.slice(0, Math.floor(this.maxSelectedItems * 0.8)); throw new Error('AI selection returned empty selection');
result.selectedConcepts = result.selectedConcepts.slice(0, Math.ceil(this.maxSelectedItems * 0.2));
} }
console.log(`[IMPROVED PIPELINE] AI selector: ${result.selectedTools.length} tools, ${result.selectedConcepts.length} concepts`); console.log(`[IMPROVED PIPELINE] AI selected: ${result.selectedTools.length} tools, ${result.selectedConcepts.length} concepts`);
console.log(`[IMPROVED PIPELINE] AI reasoning: ${result.reasoning}`); console.log(`[IMPROVED PIPELINE] AI reasoning: ${result.reasoning}`);
// Return the actual tool/concept objects
const selectedTools = candidateTools.filter(tool => result.selectedTools.includes(tool.name));
const selectedConcepts = candidateConcepts.filter(concept => result.selectedConcepts.includes(concept.name));
console.log(`[IMPROVED PIPELINE] Final selection: ${selectedTools.length} tools with bias prevention applied`);
return { return {
tools: toolsData.tools.filter((tool: any) => result.selectedTools.includes(tool.name)), selectedTools,
concepts: toolsData.concepts.filter((concept: any) => result.selectedConcepts.includes(concept.name)), selectedConcepts
domains: toolsData.domains,
phases: toolsData.phases,
'domain-agnostic-software': toolsData['domain-agnostic-software']
}; };
} catch (error) { } catch (error) {
console.error('[IMPROVED PIPELINE] Failed to parse selector response'); console.error('[IMPROVED PIPELINE] AI selection failed:', error);
throw new Error('Invalid JSON response from selector AI');
// Emergency fallback with bias awareness
console.log('[IMPROVED PIPELINE] Using emergency keyword-based selection');
return this.emergencyKeywordSelection(userQuery, candidateTools, candidateConcepts, mode);
} }
} }
private emergencyKeywordSelection(userQuery: string, candidateTools: any[], candidateConcepts: any[], mode: string) {
const queryLower = userQuery.toLowerCase();
const keywords = queryLower.split(/\s+/).filter(word => word.length > 3);
// Score tools based on keyword matches in full data
const scoredTools = candidateTools.map(tool => {
const toolText = (
tool.name + ' ' +
tool.description + ' ' +
(tool.tags || []).join(' ') + ' ' +
(tool.platforms || []).join(' ') + ' ' +
(tool.domains || []).join(' ')
).toLowerCase();
const score = keywords.reduce((acc, keyword) => {
return acc + (toolText.includes(keyword) ? 1 : 0);
}, 0);
return { tool, score };
}).filter(item => item.score > 0)
.sort((a, b) => b.score - a.score);
const maxTools = mode === 'workflow' ? 20 : 8;
const selectedTools = scoredTools.slice(0, maxTools).map(item => item.tool);
console.log(`[IMPROVED PIPELINE] Emergency selection: ${selectedTools.length} tools, keywords: ${keywords.slice(0, 5).join(', ')}`);
return {
selectedTools,
selectedConcepts: candidateConcepts.slice(0, 3)
};
}
private async delay(ms: number): Promise<void> { private async delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms)); return new Promise(resolve => setTimeout(resolve, ms));
} }
// IMPROVED: Enhanced micro-task with context history
private async callMicroTaskAI(prompt: string, context: AnalysisContext, maxTokens: number = 300): Promise<MicroTaskResult> { private async callMicroTaskAI(prompt: string, context: AnalysisContext, maxTokens: number = 300): Promise<MicroTaskResult> {
const startTime = Date.now(); const startTime = Date.now();
// ADDED: Include context history for continuity // FIXED: Build context prompt with token management
const contextPrompt = context.contextHistory.length > 0 ? let contextPrompt = prompt;
`BISHERIGE ANALYSE:\n${context.contextHistory.join('\n\n')}\n\nAKTUELLE AUFGABE:\n${prompt}` : if (context.contextHistory.length > 0) {
prompt; const contextSection = `BISHERIGE ANALYSE:\n${context.contextHistory.join('\n\n')}\n\nAKTUELLE AUFGABE:\n`;
const combinedPrompt = contextSection + prompt;
// Check if combined prompt exceeds limits
if (this.estimateTokens(combinedPrompt) <= this.maxPromptTokens) {
contextPrompt = combinedPrompt;
} else {
console.warn('[AI PIPELINE] Context too long, using prompt only');
// Could implement smarter context truncation here
}
}
try { try {
const response = await this.callAI(contextPrompt, maxTokens); const response = await this.callAI(contextPrompt, maxTokens);
@ -225,9 +421,6 @@ Respond with ONLY this JSON format:
} }
} }
// FIXED: Restore original micro-task structure with context continuity
// MICRO-TASK 1: Scenario/Problem Analysis
private async analyzeScenario(context: AnalysisContext): Promise<MicroTaskResult> { private async analyzeScenario(context: AnalysisContext): Promise<MicroTaskResult> {
const isWorkflow = context.mode === 'workflow'; const isWorkflow = context.mode === 'workflow';
@ -258,17 +451,15 @@ WICHTIG: Antworten Sie NUR in fließendem deutschen Text ohne Listen, Aufzählun
context.problemAnalysis = result.content; context.problemAnalysis = result.content;
} }
// ADDED: Build context history // FIXED: Use new context management
context.contextHistory.push(`${isWorkflow ? 'Szenario' : 'Problem'}-Analyse: ${result.content.slice(0, 200)}...`); this.addToContextHistory(context, `${isWorkflow ? 'Szenario' : 'Problem'}-Analyse: ${result.content.slice(0, 200)}...`);
} }
return result; return result;
} }
// MICRO-TASK 2: Investigation/Solution Approach
private async generateApproach(context: AnalysisContext): Promise<MicroTaskResult> { private async generateApproach(context: AnalysisContext): Promise<MicroTaskResult> {
const isWorkflow = context.mode === 'workflow'; const isWorkflow = context.mode === 'workflow';
const analysis = isWorkflow ? context.scenarioAnalysis : context.problemAnalysis;
const prompt = `Basierend auf der Analyse entwickeln Sie einen fundierten ${isWorkflow ? 'Untersuchungsansatz' : 'Lösungsansatz'} nach NIST SP 800-86 Methodik. const prompt = `Basierend auf der Analyse entwickeln Sie einen fundierten ${isWorkflow ? 'Untersuchungsansatz' : 'Lösungsansatz'} nach NIST SP 800-86 Methodik.
@ -291,13 +482,12 @@ WICHTIG: Antworten Sie NUR in fließendem deutschen Text ohne Listen oder Markdo
if (result.success) { if (result.success) {
context.investigationApproach = result.content; context.investigationApproach = result.content;
context.contextHistory.push(`${isWorkflow ? 'Untersuchungs' : 'Lösungs'}ansatz: ${result.content.slice(0, 200)}...`); this.addToContextHistory(context, `${isWorkflow ? 'Untersuchungs' : 'Lösungs'}ansatz: ${result.content.slice(0, 200)}...`);
} }
return result; return result;
} }
// MICRO-TASK 3: Critical Considerations
private async generateCriticalConsiderations(context: AnalysisContext): Promise<MicroTaskResult> { private async generateCriticalConsiderations(context: AnalysisContext): Promise<MicroTaskResult> {
const isWorkflow = context.mode === 'workflow'; const isWorkflow = context.mode === 'workflow';
@ -324,13 +514,12 @@ WICHTIG: Antworten Sie NUR in fließendem deutschen Text ohne Listen oder Markdo
if (result.success) { if (result.success) {
context.criticalConsiderations = result.content; context.criticalConsiderations = result.content;
context.contextHistory.push(`Kritische Überlegungen: ${result.content.slice(0, 200)}...`); this.addToContextHistory(context, `Kritische Überlegungen: ${result.content.slice(0, 200)}...`);
} }
return result; return result;
} }
// MICRO-TASK 4: Tool Selection for Phase (Workflow mode)
private async selectToolsForPhase(context: AnalysisContext, phase: any): Promise<MicroTaskResult> { private async selectToolsForPhase(context: AnalysisContext, phase: any): Promise<MicroTaskResult> {
const phaseTools = context.filteredData.tools.filter((tool: any) => const phaseTools = context.filteredData.tools.filter((tool: any) =>
tool.phases && tool.phases.includes(phase.id) tool.phases && tool.phases.includes(phase.id)
@ -370,41 +559,27 @@ Antworten Sie AUSSCHLIESSLICH mit diesem JSON-Format (kein zusätzlicher Text):
const result = await this.callMicroTaskAI(prompt, context, 450); const result = await this.callMicroTaskAI(prompt, context, 450);
if (result.success) { if (result.success) {
try { // FIXED: Safe JSON parsing with validation
const selections = JSON.parse(result.content.replace(/^```json\s*/i, '').replace(/\s*```\s*$/g, '').trim()); const selections = this.safeParseJSON(result.content, []);
if (Array.isArray(selections)) {
const validSelections = selections.filter((sel: any) => const validSelections = selections.filter((sel: any) =>
phaseTools.some((tool: any) => tool.name === sel.toolName) sel.toolName && phaseTools.some((tool: any) => tool.name === sel.toolName)
); );
if (!context.selectedTools) context.selectedTools = [];
validSelections.forEach((sel: any) => { validSelections.forEach((sel: any) => {
const tool = phaseTools.find((t: any) => t.name === sel.toolName); const tool = phaseTools.find((t: any) => t.name === sel.toolName);
if (tool) { if (tool) {
context.selectedTools!.push({ // FIXED: Use deduplication helper
tool, this.addToolToSelection(context, tool, phase.id, sel.priority, sel.justification);
phase: phase.id,
priority: sel.priority,
justification: sel.justification
});
} }
}); });
} catch (parseError) {
console.warn(`[IMPROVED PIPELINE] Failed to parse tool selection for ${phase.name}:`, result.content.slice(0, 200));
return {
...result,
success: false,
error: 'JSON parsing failed'
};
} }
} }
return result; return result;
} }
// MICRO-TASK 5: Tool Evaluation (Tool mode)
private async evaluateSpecificTool(context: AnalysisContext, tool: any, rank: number): Promise<MicroTaskResult> { private async evaluateSpecificTool(context: AnalysisContext, tool: any, rank: number): Promise<MicroTaskResult> {
const prompt = `Bewerten Sie diese Methode/Tool fallbezogen für das spezifische Problem nach forensischen Qualitätskriterien. const prompt = `Bewerten Sie diese Methode/Tool fallbezogen für das spezifische Problem nach forensischen Qualitätskriterien.
@ -428,36 +603,29 @@ Bewerten Sie nach forensischen Standards und antworten Sie AUSSCHLIESSLICH mit d
const result = await this.callMicroTaskAI(prompt, context, 650); const result = await this.callMicroTaskAI(prompt, context, 650);
if (result.success) { if (result.success) {
try { // FIXED: Safe JSON parsing
const evaluation = JSON.parse(result.content.replace(/^```json\s*/i, '').replace(/\s*```\s*$/g, '').trim()); const evaluation = this.safeParseJSON(result.content, {
suitability_score: 'medium',
detailed_explanation: 'Evaluation failed',
implementation_approach: '',
pros: [],
cons: [],
alternatives: ''
});
if (!context.selectedTools) context.selectedTools = []; // FIXED: Use deduplication helper
context.selectedTools.push({ this.addToolToSelection(context, {
tool: {
...tool, ...tool,
evaluation: { evaluation: {
...evaluation, ...evaluation,
rank rank
} }
}, }, 'evaluation', evaluation.suitability_score);
phase: 'evaluation',
priority: evaluation.suitability_score
});
} catch (parseError) {
console.warn(`[IMPROVED PIPELINE] Failed to parse tool evaluation for ${tool.name}:`, result.content.slice(0, 200));
return {
...result,
success: false,
error: 'JSON parsing failed'
};
}
} }
return result; return result;
} }
// MICRO-TASK 6: Background Knowledge
private async selectBackgroundKnowledge(context: AnalysisContext): Promise<MicroTaskResult> { private async selectBackgroundKnowledge(context: AnalysisContext): Promise<MicroTaskResult> {
const availableConcepts = context.filteredData.concepts; const availableConcepts = context.filteredData.concepts;
@ -493,30 +661,22 @@ Antworten Sie AUSSCHLIESSLICH mit diesem JSON-Format:
const result = await this.callMicroTaskAI(prompt, context, 400); const result = await this.callMicroTaskAI(prompt, context, 400);
if (result.success) { if (result.success) {
try { // FIXED: Safe JSON parsing
const selections = JSON.parse(result.content.replace(/^```json\s*/i, '').replace(/\s*```\s*$/g, '').trim()); const selections = this.safeParseJSON(result.content, []);
if (Array.isArray(selections)) {
context.backgroundKnowledge = selections.filter((sel: any) => context.backgroundKnowledge = selections.filter((sel: any) =>
availableConcepts.some((concept: any) => concept.name === sel.conceptName) sel.conceptName && availableConcepts.some((concept: any) => concept.name === sel.conceptName)
).map((sel: any) => ({ ).map((sel: any) => ({
concept: availableConcepts.find((c: any) => c.name === sel.conceptName), concept: availableConcepts.find((c: any) => c.name === sel.conceptName),
relevance: sel.relevance relevance: sel.relevance
})); }));
} catch (parseError) {
console.warn('[IMPROVED PIPELINE] Failed to parse background knowledge selection:', result.content.slice(0, 200));
return {
...result,
success: false,
error: 'JSON parsing failed'
};
} }
} }
return result; return result;
} }
// MICRO-TASK 7: Final Recommendations
private async generateFinalRecommendations(context: AnalysisContext): Promise<MicroTaskResult> { private async generateFinalRecommendations(context: AnalysisContext): Promise<MicroTaskResult> {
const isWorkflow = context.mode === 'workflow'; const isWorkflow = context.mode === 'workflow';
@ -543,7 +703,6 @@ WICHTIG: Antworten Sie NUR in fließendem deutschen Text ohne Listen oder Markdo
return result; return result;
} }
// Helper method for AI calls
private async callAI(prompt: string, maxTokens: number = 1000): Promise<string> { private async callAI(prompt: string, maxTokens: number = 1000): Promise<string> {
const response = await fetch(`${this.config.endpoint}/v1/chat/completions`, { const response = await fetch(`${this.config.endpoint}/v1/chat/completions`, {
method: 'POST', method: 'POST',
@ -574,7 +733,6 @@ WICHTIG: Antworten Sie NUR in fließendem deutschen Text ohne Listen oder Markdo
return content; return content;
} }
// MAIN PROCESSING: Restored original structure with context continuity
async processQuery(userQuery: string, mode: string): Promise<AnalysisResult> { async processQuery(userQuery: string, mode: string): Promise<AnalysisResult> {
const startTime = Date.now(); const startTime = Date.now();
let completedTasks = 0; let completedTasks = 0;
@ -587,17 +745,20 @@ WICHTIG: Antworten Sie NUR in fließendem deutschen Text ohne Listen oder Markdo
const toolsData = await getCompressedToolsDataForAI(); const toolsData = await getCompressedToolsDataForAI();
const filteredData = await this.getIntelligentCandidates(userQuery, toolsData, mode); const filteredData = await this.getIntelligentCandidates(userQuery, toolsData, mode);
// Initialize context with continuity // FIXED: Initialize context with proper state management
const context: AnalysisContext = { const context: AnalysisContext = {
userQuery, userQuery,
mode, mode,
filteredData, filteredData,
contextHistory: [] // ADDED: Context continuity contextHistory: [],
maxContextLength: this.maxContextTokens,
currentContextLength: 0,
seenToolNames: new Set<string>() // FIXED: Add deduplication tracking
}; };
console.log(`[IMPROVED PIPELINE] Starting micro-tasks with ${filteredData.tools.length} tools visible`); console.log(`[IMPROVED PIPELINE] Starting micro-tasks with ${filteredData.tools.length} tools visible`);
// MICRO-TASK SEQUENCE (restored original structure) // MICRO-TASK SEQUENCE
// Task 1: Scenario/Problem Analysis // Task 1: Scenario/Problem Analysis
const analysisResult = await this.analyzeScenario(context); const analysisResult = await this.analyzeScenario(context);
@ -642,12 +803,11 @@ WICHTIG: Antworten Sie NUR in fließendem deutschen Text ohne Listen oder Markdo
const finalResult = await this.generateFinalRecommendations(context); const finalResult = await this.generateFinalRecommendations(context);
if (finalResult.success) completedTasks++; else failedTasks++; if (finalResult.success) completedTasks++; else failedTasks++;
// Build final recommendation (same as original) // Build final recommendation
const recommendation = this.buildRecommendation(context, mode, finalResult.content); const recommendation = this.buildRecommendation(context, mode, finalResult.content);
const processingStats = { const processingStats = {
embeddingsUsed: embeddingsService.isEnabled(), embeddingsUsed: embeddingsService.isEnabled(),
vectorIndexUsed: embeddingsService.isEnabled(), // VectorIndex is used when embeddings are enabled
candidatesFromEmbeddings: filteredData.tools.length, candidatesFromEmbeddings: filteredData.tools.length,
finalSelectedItems: (context.selectedTools?.length || 0) + finalSelectedItems: (context.selectedTools?.length || 0) +
(context.backgroundKnowledge?.length || 0), (context.backgroundKnowledge?.length || 0),
@ -658,7 +818,7 @@ WICHTIG: Antworten Sie NUR in fließendem deutschen Text ohne Listen oder Markdo
}; };
console.log(`[IMPROVED PIPELINE] Completed: ${completedTasks} tasks, Failed: ${failedTasks} tasks`); console.log(`[IMPROVED PIPELINE] Completed: ${completedTasks} tasks, Failed: ${failedTasks} tasks`);
console.log(`[IMPROVED PIPELINE] VectorIndex used: ${embeddingsService.isEnabled()}, Candidates: ${filteredData.tools.length}`); console.log(`[IMPROVED PIPELINE] Unique tools selected: ${context.seenToolNames.size}`);
return { return {
recommendation, recommendation,
@ -671,7 +831,7 @@ WICHTIG: Antworten Sie NUR in fließendem deutschen Text ohne Listen oder Markdo
} }
} }
// Build recommendation (same as original structure) // Build recommendation (same structure but using fixed context)
private buildRecommendation(context: AnalysisContext, mode: string, finalContent: string): any { private buildRecommendation(context: AnalysisContext, mode: string, finalContent: string): any {
const isWorkflow = mode === 'workflow'; const isWorkflow = mode === 'workflow';

View File

@ -1,4 +1,4 @@
// src/utils/rateLimitedQueue.ts // src/utils/rateLimitedQueue.ts - FIXED: Memory leak and better cleanup
import dotenv from "dotenv"; import dotenv from "dotenv";
@ -32,6 +32,43 @@ class RateLimitedQueue {
private lastProcessedAt = 0; private lastProcessedAt = 0;
private currentlyProcessingTaskId: string | null = null; private currentlyProcessingTaskId: string | null = null;
private cleanupInterval: NodeJS.Timeout;
private readonly TASK_RETENTION_MS = 30000;
constructor() {
  // Periodically evict finished tasks so the in-memory task list cannot
  // grow without bound.
  this.cleanupInterval = setInterval(() => {
    this.cleanupOldTasks();
  }, 30000);
  // FIX: unref the timer so this background housekeeping interval does not
  // by itself keep the Node.js process alive (graceful shutdown / test runs).
  // Optional call guards environments whose timer type lacks unref.
  this.cleanupInterval.unref?.();
}
/** Drop finished tasks whose retention window has elapsed. */
private cleanupOldTasks(): void {
  const now = Date.now();
  const before = this.tasks.length;

  this.tasks = this.tasks.filter(task => {
    // Active work is never evicted.
    const isActive = task.status === 'queued' || task.status === 'processing';
    if (isActive) {
      return true;
    }
    // Finished tasks are kept briefly so clients can still poll their status.
    // NOTE(review): a finished task without a completedAt timestamp is kept
    // forever — presumably completedAt is always set on completion; verify.
    const expired = !!task.completedAt && (now - task.completedAt) > this.TASK_RETENTION_MS;
    return !expired;
  });

  const removed = before - this.tasks.length;
  if (removed > 0) {
    console.log(`[QUEUE] Cleaned up ${removed} old tasks, ${this.tasks.length} remaining`);
  }
}
/**
 * Stops the periodic cleanup timer started in the constructor.
 * Call during process teardown so the interval stops firing.
 */
public shutdown(): void {
  if (this.cleanupInterval) {
    clearInterval(this.cleanupInterval);
  }
}
add<T>(task: Task<T>, taskId?: string): Promise<T> { add<T>(task: Task<T>, taskId?: string): Promise<T> {
const id = taskId || this.generateTaskId(); const id = taskId || this.generateTaskId();
@ -103,7 +140,6 @@ class RateLimitedQueue {
const processingOffset = processingTasks.length > 0 ? 1 : 0; const processingOffset = processingTasks.length > 0 ? 1 : 0;
status.currentPosition = processingOffset + positionInQueue + 1; status.currentPosition = processingOffset + positionInQueue + 1;
} }
} else if (task.status === 'completed' || task.status === 'failed') {
} }
} else { } else {
const taskTimestamp = taskId.match(/ai_(\d+)_/)?.[1]; const taskTimestamp = taskId.match(/ai_(\d+)_/)?.[1];
@ -152,7 +188,6 @@ class RateLimitedQueue {
this.currentlyProcessingTaskId = nextTask.id; this.currentlyProcessingTaskId = nextTask.id;
this.lastProcessedAt = Date.now(); this.lastProcessedAt = Date.now();
try { try {
await nextTask.task(); await nextTask.task();
nextTask.status = 'completed'; nextTask.status = 'completed';
@ -166,14 +201,6 @@ class RateLimitedQueue {
this.currentlyProcessingTaskId = null; this.currentlyProcessingTaskId = null;
setTimeout(() => {
const index = this.tasks.findIndex(t => t.id === nextTask.id);
if (index >= 0) {
console.log(`[QUEUE] Removing completed task ${nextTask.id}`);
this.tasks.splice(index, 1);
}
}, 10000);
const hasMoreQueued = this.tasks.some(t => t.status === 'queued'); const hasMoreQueued = this.tasks.some(t => t.status === 'queued');
if (hasMoreQueued) { if (hasMoreQueued) {
console.log(`[QUEUE] Waiting ${this.delayMs}ms before next task`); console.log(`[QUEUE] Waiting ${this.delayMs}ms before next task`);
@ -201,4 +228,8 @@ export function getQueueStatus(taskId?: string): QueueStatus {
return queue.getStatus(taskId); return queue.getStatus(taskId);
} }
// Convenience wrapper: stops the singleton queue's cleanup timer without
// callers needing to import the queue instance directly.
export function shutdownQueue(): void {
  queue.shutdown();
}
export default queue; export default queue;

View File

@ -1,45 +0,0 @@
import { embeddingsService, type EmbeddingData } from "./embeddings.js";
// Fix for CommonJS module import in ESM environment
import pkg from "hnswlib-node";
const { HierarchicalNSW } = pkg;
/** A catalogue embedding enriched with its similarity to the current query. */
export interface SimilarItem extends EmbeddingData {
  similarity: number; // 1 = identical, 0 = orthogonal
}
/**
 * In-memory HNSW index over the embeddings catalogue, built lazily on first
 * query and reused for every subsequent similarity search.
 */
class VectorIndex {
  private index: InstanceType<typeof HierarchicalNSW> | null = null;
  private idToItem: SimilarItem[] = [];
  private readonly dim = 1024; // MistralAI embedding dimensionality
  // FIX: memoized build promise. The previous `if (this.index) return;`
  // check-then-await pattern let concurrent callers race past the guard and
  // build the index twice; sharing one promise makes the build run once.
  private buildPromise: Promise<void> | null = null;

  /** Build HNSW index once (idempotent and safe under concurrent callers). */
  private build(): Promise<void> {
    if (!this.buildPromise) {
      this.buildPromise = this.doBuild().catch(err => {
        // Reset on failure so a later call can retry the build.
        this.buildPromise = null;
        throw err;
      });
    }
    return this.buildPromise;
  }

  private async doBuild(): Promise<void> {
    await embeddingsService.initialize();
    // NOTE(review): reaches into a private field of embeddingsService via
    // `any` — presumably no public accessor exists; consider exposing one.
    const catalogue = (embeddingsService as any).embeddings as EmbeddingData[];

    const index = new HierarchicalNSW("cosine", this.dim);
    index.initIndex(catalogue.length);
    catalogue.forEach((item, id) => {
      index.addPoint(item.embedding, id);
      this.idToItem[id] = { ...item, similarity: 0 };
    });
    // FIX: publish only the fully populated index (previously `this.index`
    // was assigned before any points were added).
    this.index = index;
  }

  /** Returns the K most similar catalogue items to an adhoc query string. */
  async findSimilar(text: string, k = 40): Promise<SimilarItem[]> {
    await this.build();
    const queryEmb = await embeddingsService.embedText(text.toLowerCase());
    // Clamp k to the catalogue size — hnswlib rejects requests for more
    // neighbours than stored points (TODO confirm exact hnswlib-node behavior).
    const limit = Math.min(k, this.idToItem.length);
    const { neighbors, distances } = this.index!.searchKnn(queryEmb, limit);
    return neighbors.map((id: number, i: number) => ({
      ...this.idToItem[id],
      similarity: 1 - distances[i], // cosine distance → similarity
    }));
  }
}
export const vectorIndex = new VectorIndex();