From 8c5dc36788ded121d554b1bc657ff46691ab555b Mon Sep 17 00:00:00 2001 From: overcuriousity Date: Tue, 5 Aug 2025 15:01:07 +0200 Subject: [PATCH] add env var for timeouts --- src/utils/rateLimitedQueue.ts | 114 ++++++++++++++++++---------------- 1 file changed, 61 insertions(+), 53 deletions(-) diff --git a/src/utils/rateLimitedQueue.ts b/src/utils/rateLimitedQueue.ts index 603807a..18286f8 100644 --- a/src/utils/rateLimitedQueue.ts +++ b/src/utils/rateLimitedQueue.ts @@ -1,10 +1,14 @@ -// src/utils/rateLimitedQueue.ts - FIXED: Memory leak and better cleanup +// src/utils/rateLimitedQueue.ts import dotenv from "dotenv"; dotenv.config(); -const RATE_LIMIT_DELAY_MS = Number.parseInt(process.env.AI_RATE_LIMIT_DELAY_MS ?? "2000", 10) || 2000; +const RATE_LIMIT_DELAY_MS = + Number.parseInt(process.env.AI_RATE_LIMIT_DELAY_MS ?? "2000", 10) || 2000; + +const TASK_TIMEOUT_MS = + Number.parseInt(process.env.AI_TASK_TIMEOUT_MS ?? "300000", 10) || 300000; export type Task = () => Promise; @@ -12,7 +16,7 @@ interface QueuedTask { id: string; task: Task; addedAt: number; - status: 'queued' | 'processing' | 'completed' | 'failed'; + status: "queued" | "processing" | "completed" | "failed" | "timedout"; startedAt?: number; completedAt?: number; } @@ -29,11 +33,12 @@ class RateLimitedQueue { private tasks: QueuedTask[] = []; private isProcessing = false; private delayMs = RATE_LIMIT_DELAY_MS; + private taskTimeoutMs = TASK_TIMEOUT_MS; private lastProcessedAt = 0; private currentlyProcessingTaskId: string | null = null; - + private cleanupInterval: NodeJS.Timeout; - private readonly TASK_RETENTION_MS = 30000; + private readonly TASK_RETENTION_MS = 300000; // 5 minutes constructor() { this.cleanupInterval = setInterval(() => { @@ -44,19 +49,19 @@ class RateLimitedQueue { private cleanupOldTasks(): void { const now = Date.now(); const initialLength = this.tasks.length; - - this.tasks = this.tasks.filter(task => { - if (task.status === 'queued' || task.status === 'processing') { + + this.tasks = this.tasks.filter((task) => { + if (task.status === "queued" || task.status === "processing") { return true; } - - if (task.completedAt && (now - task.completedAt) > this.TASK_RETENTION_MS) { + + if (task.completedAt && now - task.completedAt > this.TASK_RETENTION_MS) { return false; } - + return true; }); - + const cleaned = initialLength - this.tasks.length; if (cleaned > 0) { console.log(`[QUEUE] Cleaned up ${cleaned} old tasks, ${this.tasks.length} remaining`); @@ -86,11 +91,12 @@ class RateLimitedQueue { } }, addedAt: Date.now(), - status: 'queued' + status: "queued", }; - + this.tasks.push(queuedTask); - + + // Kick the processor soon. setTimeout(() => { this.processQueue(); }, 100); @@ -98,12 +104,12 @@ class RateLimitedQueue { } getStatus(taskId?: string): QueueStatus { - const queuedTasks = this.tasks.filter(t => t.status === 'queued'); - const processingTasks = this.tasks.filter(t => t.status === 'processing'); + const queuedTasks = this.tasks.filter((t) => t.status === "queued"); + const processingTasks = this.tasks.filter((t) => t.status === "processing"); const queueLength = queuedTasks.length + processingTasks.length; - + const now = Date.now(); - + let estimatedWaitTime = 0; if (queueLength > 0) { if (this.isProcessing && this.lastProcessedAt > 0) { @@ -118,38 +124,34 @@ class RateLimitedQueue { const status: QueueStatus = { queueLength, isProcessing: this.isProcessing, - estimatedWaitTime + estimatedWaitTime, }; if (taskId) { - const task = this.tasks.find(t => t.id === taskId); - + const task = this.tasks.find((t) => t.id === taskId); + if (task) { status.taskStatus = task.status; - - if (task.status === 'processing') { + + if (task.status === "processing") { status.currentPosition = 1; - } else if (task.status === 'queued') { + } else if (task.status === "queued") { const queuedTasksInOrder = this.tasks - .filter(t => t.status === 'queued') + .filter((t) => t.status === "queued") .sort((a, b) => a.addedAt - b.addedAt); - - const positionInQueue = queuedTasksInOrder.findIndex(t => t.id === taskId); - + + const positionInQueue = queuedTasksInOrder.findIndex((t) => t.id === taskId); + if (positionInQueue >= 0) { const processingOffset = processingTasks.length > 0 ? 1 : 0; status.currentPosition = processingOffset + positionInQueue + 1; } } - } else { + } else { const taskTimestamp = taskId.match(/ai_(\d+)_/)?.[1]; if (taskTimestamp) { const taskAge = now - parseInt(taskTimestamp); - if (taskAge < 30000) { - status.taskStatus = 'starting'; - } else { - status.taskStatus = 'unknown'; - } + status.taskStatus = taskAge < 300000 ? "starting" : "unknown"; } } } @@ -158,41 +160,47 @@ class RateLimitedQueue { } private async processQueue(): Promise { - if (this.isProcessing) { - return; - } + if (this.isProcessing) return; this.isProcessing = true; try { while (true) { const nextTask = this.tasks - .filter(t => t.status === 'queued') + .filter((t) => t.status === "queued") .sort((a, b) => a.addedAt - b.addedAt)[0]; - - if (!nextTask) { - break; - } - - nextTask.status = 'processing'; + + if (!nextTask) break; // No more work + + nextTask.status = "processing"; nextTask.startedAt = Date.now(); this.currentlyProcessingTaskId = nextTask.id; this.lastProcessedAt = Date.now(); - + try { - await nextTask.task(); - nextTask.status = 'completed'; + await Promise.race([ + nextTask.task(), + new Promise((_, reject) => + setTimeout( + () => reject(new Error(`Task ${nextTask.id} timed out after ${this.taskTimeoutMs} ms`)), + this.taskTimeoutMs, + ), + ), + ]); + + nextTask.status = "completed"; nextTask.completedAt = Date.now(); console.log(`[QUEUE] Task ${nextTask.id} completed`); } catch (error) { - nextTask.status = 'failed'; + const err = error as Error; + nextTask.status = err.message.includes("timed out") ? "timedout" : "failed"; nextTask.completedAt = Date.now(); console.error(`[QUEUE] Task ${nextTask.id} failed:`, error); } - + this.currentlyProcessingTaskId = null; - - const hasMoreQueued = this.tasks.some(t => t.status === 'queued'); + + const hasMoreQueued = this.tasks.some((t) => t.status === "queued"); if (hasMoreQueued) { console.log(`[QUEUE] Waiting ${this.delayMs}ms before next task`); await new Promise((r) => setTimeout(r, this.delayMs)); @@ -223,4 +231,4 @@ export function shutdownQueue(): void { queue.shutdown(); } -export default queue; \ No newline at end of file +export default queue;