forensic-pathways/src/utils/rateLimitedQueue.ts
2025-08-02 12:57:38 +02:00

236 lines
6.2 KiB
TypeScript

// src/utils/rateLimitedQueue.ts
import { forensicConfig } from './forensicConfig.js';
import dotenv from "dotenv";
dotenv.config();
const RATE_LIMIT_DELAY_MS = forensicConfig.getThresholds().rateLimitDelayMs;
export type Task<T = unknown> = () => Promise<T>;
interface QueuedTask {
id: string;
task: Task;
addedAt: number;
status: 'queued' | 'processing' | 'completed' | 'failed';
startedAt?: number;
completedAt?: number;
}
export interface QueueStatus {
queueLength: number;
isProcessing: boolean;
estimatedWaitTime: number;
currentPosition?: number;
taskStatus?: string;
}
class RateLimitedQueue {
private tasks: QueuedTask[] = [];
private isProcessing = false;
private delayMs = RATE_LIMIT_DELAY_MS;
private lastProcessedAt = 0;
private currentlyProcessingTaskId: string | null = null;
private cleanupInterval: NodeJS.Timeout;
private readonly TASK_RETENTION_MS = 30000;
constructor() {
this.cleanupInterval = setInterval(() => {
this.cleanupOldTasks();
}, 30000);
}
private cleanupOldTasks(): void {
const now = Date.now();
const initialLength = this.tasks.length;
this.tasks = this.tasks.filter(task => {
if (task.status === 'queued' || task.status === 'processing') {
return true;
}
if (task.completedAt && (now - task.completedAt) > this.TASK_RETENTION_MS) {
return false;
}
return true;
});
const cleaned = initialLength - this.tasks.length;
if (cleaned > 0) {
console.log(`[QUEUE] Cleaned up ${cleaned} old tasks, ${this.tasks.length} remaining`);
}
}
public shutdown(): void {
if (this.cleanupInterval) {
clearInterval(this.cleanupInterval);
}
}
add<T>(task: Task<T>, taskId?: string): Promise<T> {
const id = taskId || this.generateTaskId();
return new Promise<T>((resolve, reject) => {
const queuedTask: QueuedTask = {
id,
task: async () => {
try {
const result = await task();
resolve(result);
return result;
} catch (err) {
reject(err);
throw err;
}
},
addedAt: Date.now(),
status: 'queued'
};
this.tasks.push(queuedTask);
setTimeout(() => {
this.processQueue();
}, 100);
});
}
getStatus(taskId?: string): QueueStatus {
const queuedTasks = this.tasks.filter(t => t.status === 'queued');
const processingTasks = this.tasks.filter(t => t.status === 'processing');
const queueLength = queuedTasks.length + processingTasks.length;
const now = Date.now();
let estimatedWaitTime = 0;
if (queueLength > 0) {
if (this.isProcessing && this.lastProcessedAt > 0) {
const timeSinceLastRequest = now - this.lastProcessedAt;
const remainingDelay = Math.max(0, this.delayMs * 2 - timeSinceLastRequest);
estimatedWaitTime = remainingDelay + queuedTasks.length * this.delayMs;
} else {
estimatedWaitTime = queueLength * this.delayMs;
}
}
const status: QueueStatus = {
queueLength,
isProcessing: this.isProcessing,
estimatedWaitTime
};
if (taskId) {
const task = this.tasks.find(t => t.id === taskId);
if (task) {
status.taskStatus = task.status;
if (task.status === 'processing') {
status.currentPosition = 1;
} else if (task.status === 'queued') {
const queuedTasksInOrder = this.tasks
.filter(t => t.status === 'queued')
.sort((a, b) => a.addedAt - b.addedAt);
const positionInQueue = queuedTasksInOrder.findIndex(t => t.id === taskId);
if (positionInQueue >= 0) {
const processingOffset = processingTasks.length > 0 ? 1 : 0;
status.currentPosition = processingOffset + positionInQueue + 1;
}
}
} else {
const taskTimestamp = taskId.match(/ai_(\d+)_/)?.[1];
if (taskTimestamp) {
const taskAge = now - parseInt(taskTimestamp);
if (taskAge < 30000) {
status.taskStatus = 'starting';
} else {
status.taskStatus = 'unknown';
}
}
}
}
return status;
}
setDelay(ms: number): void {
if (!Number.isFinite(ms) || ms < 0) return;
this.delayMs = ms;
}
getDelay(): number {
return this.delayMs;
}
private async processQueue(): Promise<void> {
if (this.isProcessing) {
return;
}
this.isProcessing = true;
try {
while (true) {
const nextTask = this.tasks
.filter(t => t.status === 'queued')
.sort((a, b) => a.addedAt - b.addedAt)[0];
if (!nextTask) {
break;
}
nextTask.status = 'processing';
nextTask.startedAt = Date.now();
this.currentlyProcessingTaskId = nextTask.id;
this.lastProcessedAt = Date.now();
try {
await nextTask.task();
nextTask.status = 'completed';
nextTask.completedAt = Date.now();
console.log(`[QUEUE] Task ${nextTask.id} completed`);
} catch (error) {
nextTask.status = 'failed';
nextTask.completedAt = Date.now();
console.error(`[QUEUE] Task ${nextTask.id} failed:`, error);
}
this.currentlyProcessingTaskId = null;
const hasMoreQueued = this.tasks.some(t => t.status === 'queued');
if (hasMoreQueued) {
console.log(`[QUEUE] Waiting ${this.delayMs}ms before next task`);
await new Promise((r) => setTimeout(r, this.delayMs));
}
}
} finally {
this.isProcessing = false;
console.log(`[QUEUE] Queue processing finished`);
}
}
private generateTaskId(): string {
return `task_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
}
const queue = new RateLimitedQueue();
export function enqueueApiCall<T>(task: Task<T>, taskId?: string): Promise<T> {
return queue.add(task, taskId);
}
export function getQueueStatus(taskId?: string): QueueStatus {
return queue.getStatus(taskId);
}
export function shutdownQueue(): void {
queue.shutdown();
}
export default queue;