add env var for timeouts
This commit is contained in:
		
							parent
							
								
									cba42962f6
								
							
						
					
					
						commit
						8c5dc36788
					
				@ -1,10 +1,14 @@
 | 
				
			|||||||
// src/utils/rateLimitedQueue.ts - FIXED: Memory leak and better cleanup
 | 
					// src/utils/rateLimitedQueue.ts
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import dotenv from "dotenv";
 | 
					import dotenv from "dotenv";
 | 
				
			||||||
 | 
					
 | 
				
			||||||
dotenv.config();
 | 
					dotenv.config();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const RATE_LIMIT_DELAY_MS = Number.parseInt(process.env.AI_RATE_LIMIT_DELAY_MS ?? "2000", 10) || 2000;
 | 
					const RATE_LIMIT_DELAY_MS =
 | 
				
			||||||
 | 
					  Number.parseInt(process.env.AI_RATE_LIMIT_DELAY_MS ?? "2000", 10) || 2000;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const TASK_TIMEOUT_MS =
 | 
				
			||||||
 | 
					  Number.parseInt(process.env.AI_TASK_TIMEOUT_MS ?? "300000", 10) || 300000;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
export type Task<T = unknown> = () => Promise<T>;
 | 
					export type Task<T = unknown> = () => Promise<T>;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -12,7 +16,7 @@ interface QueuedTask {
 | 
				
			|||||||
  id: string;
 | 
					  id: string;
 | 
				
			||||||
  task: Task;
 | 
					  task: Task;
 | 
				
			||||||
  addedAt: number;
 | 
					  addedAt: number;
 | 
				
			||||||
  status: 'queued' | 'processing' | 'completed' | 'failed';
 | 
					  status: "queued" | "processing" | "completed" | "failed" | "timedout";
 | 
				
			||||||
  startedAt?: number;
 | 
					  startedAt?: number;
 | 
				
			||||||
  completedAt?: number;
 | 
					  completedAt?: number;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@ -29,11 +33,12 @@ class RateLimitedQueue {
 | 
				
			|||||||
  private tasks: QueuedTask[] = [];
 | 
					  private tasks: QueuedTask[] = [];
 | 
				
			||||||
  private isProcessing = false;
 | 
					  private isProcessing = false;
 | 
				
			||||||
  private delayMs = RATE_LIMIT_DELAY_MS;
 | 
					  private delayMs = RATE_LIMIT_DELAY_MS;
 | 
				
			||||||
 | 
					  private taskTimeoutMs = TASK_TIMEOUT_MS;
 | 
				
			||||||
  private lastProcessedAt = 0;
 | 
					  private lastProcessedAt = 0;
 | 
				
			||||||
  private currentlyProcessingTaskId: string | null = null;
 | 
					  private currentlyProcessingTaskId: string | null = null;
 | 
				
			||||||
  
 | 
					
 | 
				
			||||||
  private cleanupInterval: NodeJS.Timeout;
 | 
					  private cleanupInterval: NodeJS.Timeout;
 | 
				
			||||||
  private readonly TASK_RETENTION_MS = 30000; 
 | 
					  private readonly TASK_RETENTION_MS = 300000; // 5 minutes
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  constructor() {
 | 
					  constructor() {
 | 
				
			||||||
    this.cleanupInterval = setInterval(() => {
 | 
					    this.cleanupInterval = setInterval(() => {
 | 
				
			||||||
@ -44,19 +49,19 @@ class RateLimitedQueue {
 | 
				
			|||||||
  private cleanupOldTasks(): void {
 | 
					  private cleanupOldTasks(): void {
 | 
				
			||||||
    const now = Date.now();
 | 
					    const now = Date.now();
 | 
				
			||||||
    const initialLength = this.tasks.length;
 | 
					    const initialLength = this.tasks.length;
 | 
				
			||||||
    
 | 
					
 | 
				
			||||||
    this.tasks = this.tasks.filter(task => {
 | 
					    this.tasks = this.tasks.filter((task) => {
 | 
				
			||||||
      if (task.status === 'queued' || task.status === 'processing') {
 | 
					      if (task.status === "queued" || task.status === "processing") {
 | 
				
			||||||
        return true;
 | 
					        return true;
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
      
 | 
					
 | 
				
			||||||
      if (task.completedAt && (now - task.completedAt) > this.TASK_RETENTION_MS) {
 | 
					      if (task.completedAt && now - task.completedAt > this.TASK_RETENTION_MS) {
 | 
				
			||||||
        return false;
 | 
					        return false;
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
      
 | 
					
 | 
				
			||||||
      return true;
 | 
					      return true;
 | 
				
			||||||
    });
 | 
					    });
 | 
				
			||||||
    
 | 
					
 | 
				
			||||||
    const cleaned = initialLength - this.tasks.length;
 | 
					    const cleaned = initialLength - this.tasks.length;
 | 
				
			||||||
    if (cleaned > 0) {
 | 
					    if (cleaned > 0) {
 | 
				
			||||||
      console.log(`[QUEUE] Cleaned up ${cleaned} old tasks, ${this.tasks.length} remaining`);
 | 
					      console.log(`[QUEUE] Cleaned up ${cleaned} old tasks, ${this.tasks.length} remaining`);
 | 
				
			||||||
@ -86,11 +91,12 @@ class RateLimitedQueue {
 | 
				
			|||||||
          }
 | 
					          }
 | 
				
			||||||
        },
 | 
					        },
 | 
				
			||||||
        addedAt: Date.now(),
 | 
					        addedAt: Date.now(),
 | 
				
			||||||
        status: 'queued'
 | 
					        status: "queued",
 | 
				
			||||||
      };
 | 
					      };
 | 
				
			||||||
      
 | 
					
 | 
				
			||||||
      this.tasks.push(queuedTask);
 | 
					      this.tasks.push(queuedTask);
 | 
				
			||||||
      
 | 
					
 | 
				
			||||||
 | 
					      // Kick the processor soon.
 | 
				
			||||||
      setTimeout(() => {
 | 
					      setTimeout(() => {
 | 
				
			||||||
        this.processQueue();
 | 
					        this.processQueue();
 | 
				
			||||||
      }, 100);
 | 
					      }, 100);
 | 
				
			||||||
@ -98,12 +104,12 @@ class RateLimitedQueue {
 | 
				
			|||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  getStatus(taskId?: string): QueueStatus {
 | 
					  getStatus(taskId?: string): QueueStatus {
 | 
				
			||||||
    const queuedTasks = this.tasks.filter(t => t.status === 'queued');
 | 
					    const queuedTasks = this.tasks.filter((t) => t.status === "queued");
 | 
				
			||||||
    const processingTasks = this.tasks.filter(t => t.status === 'processing');
 | 
					    const processingTasks = this.tasks.filter((t) => t.status === "processing");
 | 
				
			||||||
    const queueLength = queuedTasks.length + processingTasks.length;
 | 
					    const queueLength = queuedTasks.length + processingTasks.length;
 | 
				
			||||||
    
 | 
					
 | 
				
			||||||
    const now = Date.now();
 | 
					    const now = Date.now();
 | 
				
			||||||
    
 | 
					
 | 
				
			||||||
    let estimatedWaitTime = 0;
 | 
					    let estimatedWaitTime = 0;
 | 
				
			||||||
    if (queueLength > 0) {
 | 
					    if (queueLength > 0) {
 | 
				
			||||||
      if (this.isProcessing && this.lastProcessedAt > 0) {
 | 
					      if (this.isProcessing && this.lastProcessedAt > 0) {
 | 
				
			||||||
@ -118,38 +124,34 @@ class RateLimitedQueue {
 | 
				
			|||||||
    const status: QueueStatus = {
 | 
					    const status: QueueStatus = {
 | 
				
			||||||
      queueLength,
 | 
					      queueLength,
 | 
				
			||||||
      isProcessing: this.isProcessing,
 | 
					      isProcessing: this.isProcessing,
 | 
				
			||||||
      estimatedWaitTime
 | 
					      estimatedWaitTime,
 | 
				
			||||||
    };
 | 
					    };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if (taskId) {
 | 
					    if (taskId) {
 | 
				
			||||||
      const task = this.tasks.find(t => t.id === taskId);
 | 
					      const task = this.tasks.find((t) => t.id === taskId);
 | 
				
			||||||
      
 | 
					
 | 
				
			||||||
      if (task) {
 | 
					      if (task) {
 | 
				
			||||||
        status.taskStatus = task.status;
 | 
					        status.taskStatus = task.status;
 | 
				
			||||||
        
 | 
					
 | 
				
			||||||
        if (task.status === 'processing') {
 | 
					        if (task.status === "processing") {
 | 
				
			||||||
          status.currentPosition = 1;
 | 
					          status.currentPosition = 1;
 | 
				
			||||||
        } else if (task.status === 'queued') {
 | 
					        } else if (task.status === "queued") {
 | 
				
			||||||
          const queuedTasksInOrder = this.tasks
 | 
					          const queuedTasksInOrder = this.tasks
 | 
				
			||||||
            .filter(t => t.status === 'queued')
 | 
					            .filter((t) => t.status === "queued")
 | 
				
			||||||
            .sort((a, b) => a.addedAt - b.addedAt);
 | 
					            .sort((a, b) => a.addedAt - b.addedAt);
 | 
				
			||||||
          
 | 
					
 | 
				
			||||||
          const positionInQueue = queuedTasksInOrder.findIndex(t => t.id === taskId);
 | 
					          const positionInQueue = queuedTasksInOrder.findIndex((t) => t.id === taskId);
 | 
				
			||||||
          
 | 
					
 | 
				
			||||||
          if (positionInQueue >= 0) {
 | 
					          if (positionInQueue >= 0) {
 | 
				
			||||||
            const processingOffset = processingTasks.length > 0 ? 1 : 0;
 | 
					            const processingOffset = processingTasks.length > 0 ? 1 : 0;
 | 
				
			||||||
            status.currentPosition = processingOffset + positionInQueue + 1;
 | 
					            status.currentPosition = processingOffset + positionInQueue + 1;
 | 
				
			||||||
          }
 | 
					          }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
      } else {        
 | 
					      } else {
 | 
				
			||||||
        const taskTimestamp = taskId.match(/ai_(\d+)_/)?.[1];
 | 
					        const taskTimestamp = taskId.match(/ai_(\d+)_/)?.[1];
 | 
				
			||||||
        if (taskTimestamp) {
 | 
					        if (taskTimestamp) {
 | 
				
			||||||
          const taskAge = now - parseInt(taskTimestamp);
 | 
					          const taskAge = now - parseInt(taskTimestamp);
 | 
				
			||||||
          if (taskAge < 30000) { 
 | 
					          status.taskStatus = taskAge < 300000 ? "starting" : "unknown";
 | 
				
			||||||
            status.taskStatus = 'starting';
 | 
					 | 
				
			||||||
          } else {
 | 
					 | 
				
			||||||
            status.taskStatus = 'unknown';
 | 
					 | 
				
			||||||
          }
 | 
					 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@ -158,41 +160,47 @@ class RateLimitedQueue {
 | 
				
			|||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  private async processQueue(): Promise<void> {
 | 
					  private async processQueue(): Promise<void> {
 | 
				
			||||||
    if (this.isProcessing) {
 | 
					    if (this.isProcessing) return;
 | 
				
			||||||
      return;
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    this.isProcessing = true;
 | 
					    this.isProcessing = true;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    try {
 | 
					    try {
 | 
				
			||||||
      while (true) {
 | 
					      while (true) {
 | 
				
			||||||
        const nextTask = this.tasks
 | 
					        const nextTask = this.tasks
 | 
				
			||||||
          .filter(t => t.status === 'queued')
 | 
					          .filter((t) => t.status === "queued")
 | 
				
			||||||
          .sort((a, b) => a.addedAt - b.addedAt)[0];
 | 
					          .sort((a, b) => a.addedAt - b.addedAt)[0];
 | 
				
			||||||
        
 | 
					
 | 
				
			||||||
        if (!nextTask) {
 | 
					        if (!nextTask) break; // No more work
 | 
				
			||||||
          break;
 | 
					
 | 
				
			||||||
        }
 | 
					        nextTask.status = "processing";
 | 
				
			||||||
        
 | 
					 | 
				
			||||||
        nextTask.status = 'processing';
 | 
					 | 
				
			||||||
        nextTask.startedAt = Date.now();
 | 
					        nextTask.startedAt = Date.now();
 | 
				
			||||||
        this.currentlyProcessingTaskId = nextTask.id;
 | 
					        this.currentlyProcessingTaskId = nextTask.id;
 | 
				
			||||||
        this.lastProcessedAt = Date.now();
 | 
					        this.lastProcessedAt = Date.now();
 | 
				
			||||||
        
 | 
					
 | 
				
			||||||
        try {
 | 
					        try {
 | 
				
			||||||
          await nextTask.task();
 | 
					          await Promise.race([
 | 
				
			||||||
          nextTask.status = 'completed';
 | 
					            nextTask.task(),
 | 
				
			||||||
 | 
					            new Promise((_, reject) =>
 | 
				
			||||||
 | 
					              setTimeout(
 | 
				
			||||||
 | 
					                () => reject(new Error(`Task ${nextTask.id} timed out after ${this.taskTimeoutMs} ms`)),
 | 
				
			||||||
 | 
					                this.taskTimeoutMs,
 | 
				
			||||||
 | 
					              ),
 | 
				
			||||||
 | 
					            ),
 | 
				
			||||||
 | 
					          ]);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					          nextTask.status = "completed";
 | 
				
			||||||
          nextTask.completedAt = Date.now();
 | 
					          nextTask.completedAt = Date.now();
 | 
				
			||||||
          console.log(`[QUEUE] Task ${nextTask.id} completed`);
 | 
					          console.log(`[QUEUE] Task ${nextTask.id} completed`);
 | 
				
			||||||
        } catch (error) {
 | 
					        } catch (error) {
 | 
				
			||||||
          nextTask.status = 'failed';
 | 
					          const err = error as Error;
 | 
				
			||||||
 | 
					          nextTask.status = err.message.includes("timed out") ? "timedout" : "failed";
 | 
				
			||||||
          nextTask.completedAt = Date.now();
 | 
					          nextTask.completedAt = Date.now();
 | 
				
			||||||
          console.error(`[QUEUE] Task ${nextTask.id} failed:`, error);
 | 
					          console.error(`[QUEUE] Task ${nextTask.id} failed:`, error);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        
 | 
					
 | 
				
			||||||
        this.currentlyProcessingTaskId = null;
 | 
					        this.currentlyProcessingTaskId = null;
 | 
				
			||||||
        
 | 
					
 | 
				
			||||||
        const hasMoreQueued = this.tasks.some(t => t.status === 'queued');
 | 
					        const hasMoreQueued = this.tasks.some((t) => t.status === "queued");
 | 
				
			||||||
        if (hasMoreQueued) {
 | 
					        if (hasMoreQueued) {
 | 
				
			||||||
          console.log(`[QUEUE] Waiting ${this.delayMs}ms before next task`);
 | 
					          console.log(`[QUEUE] Waiting ${this.delayMs}ms before next task`);
 | 
				
			||||||
          await new Promise((r) => setTimeout(r, this.delayMs));
 | 
					          await new Promise((r) => setTimeout(r, this.delayMs));
 | 
				
			||||||
@ -223,4 +231,4 @@ export function shutdownQueue(): void {
 | 
				
			|||||||
  queue.shutdown();
 | 
					  queue.shutdown();
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
export default queue;
 | 
					export default queue;
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user