// src/utils/rateLimitedQueue.ts
|
|
|
|
import dotenv from "dotenv";
|
|
|
|
// Load variables from a .env file into process.env (dotenv's default behaviour)
// so the environment-driven constants declared after this call can see them.
dotenv.config();
|
|
|
|
/**
 * Reads a strictly positive integer from the environment variable `name`.
 *
 * The value is parsed with `Number.parseInt(value, 10)`. If the variable is
 * unset, not numeric, zero, negative, or not finite, `fallback` is returned
 * instead, so a misconfigured value can never yield a negative timer duration.
 *
 * @param name - Name of the environment variable to read.
 * @param fallback - Value returned when the variable is missing or invalid.
 * @returns The parsed positive integer, or `fallback`.
 */
function readPositiveIntEnv(name: string, fallback: number): number {
  const parsed = Number.parseInt(process.env[name] ?? "", 10);
  return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
}

/** Delay in milliseconds, read from `AI_RATE_LIMIT_DELAY_MS` (default 2000). */
const RATE_LIMIT_DELAY_MS = readPositiveIntEnv("AI_RATE_LIMIT_DELAY_MS", 2000);

/** Per-task timeout in milliseconds, read from `AI_TASK_TIMEOUT_MS` (default 300000). */
const TASK_TIMEOUT_MS = readPositiveIntEnv("AI_TASK_TIMEOUT_MS", 300000);
|
|
|
|
/** A unit of queued work: a zero-argument function returning a promise of its result. */
export type Task<T = unknown> = () => Promise<T>;
|
|
|
|
/** Bookkeeping record the queue keeps for each submitted task. */
interface QueuedTask {
  /** Caller-supplied or generated identifier used for status lookups. */
  id: string;
  /** The work to run. */
  task: Task;
  /** `Date.now()` timestamp (ms) taken when the task was enqueued. */
  addedAt: number;
  /** Lifecycle state of the task. */
  status: "queued" | "processing" | "completed" | "failed" | "timedout";
  /** `Date.now()` timestamp (ms) taken when processing started. */
  startedAt?: number;
  /** `Date.now()` timestamp (ms) taken when the task completed, failed, or timed out. */
  completedAt?: number;
}
|
|
|
|
/** Snapshot of the queue, optionally including details for one task. */
export interface QueueStatus {
  /** Number of tasks that are queued or currently processing. */
  queueLength: number;
  /** Whether the queue's processing loop is currently running. */
  isProcessing: boolean;
  /** Rough wait estimate in milliseconds, derived from the inter-task delay. */
  estimatedWaitTime: number;
  /** 1-based position of the requested task; 1 means it is being processed. */
  currentPosition?: number;
  /**
   * Status of the requested task. Usually one of the task lifecycle states;
   * may also be "starting" or "unknown" when the task is not tracked.
   */
  taskStatus?: string;
}
|
|
|
|
/**
 * In-memory queue that runs asynchronous tasks strictly one at a time, oldest
 * first, pausing `delayMs` between consecutive tasks while more are waiting.
 *
 * Each task is raced against a timeout of `taskTimeoutMs`. A timeout only
 * releases the queue so the next task can start: the promise returned by
 * {@link RateLimitedQueue.add} still settles only when the underlying task
 * itself eventually resolves or rejects.
 *
 * Finished tasks stay in memory for `TASK_RETENTION_MS` so their status can
 * still be queried, and are then removed by a periodic cleanup timer. Call
 * {@link RateLimitedQueue.shutdown} to stop that timer.
 */
class RateLimitedQueue {
  /** Every tracked task: waiting, running, and recently finished. */
  private tasks: QueuedTask[] = [];
  /** True while a processing loop is active; prevents concurrent loops. */
  private isProcessing = false;
  /** Pause between consecutive tasks, in milliseconds. */
  private delayMs = RATE_LIMIT_DELAY_MS;
  /** Maximum time the queue waits on a single task, in milliseconds. */
  private taskTimeoutMs = TASK_TIMEOUT_MS;
  /** `Date.now()` at which the most recent task was started (0 before the first). */
  private lastProcessedAt = 0;
  /** Id of the task currently being run, or null when idle. */
  private currentlyProcessingTaskId: string | null = null;

  /** Handle of the periodic cleanup timer started in the constructor. */
  private cleanupInterval: NodeJS.Timeout;
  private readonly TASK_RETENTION_MS = 300000; // 5 minutes

  /** Starts the cleanup timer, which prunes old finished tasks every 30 seconds. */
  constructor() {
    this.cleanupInterval = setInterval(() => {
      this.cleanupOldTasks();
    }, 30000);
  }

  /**
   * Drops finished tasks whose `completedAt` is older than the retention
   * window. Queued and processing tasks are always kept.
   */
  private cleanupOldTasks(): void {
    const now = Date.now();
    const initialLength = this.tasks.length;

    this.tasks = this.tasks.filter((task) => {
      if (task.status === "queued" || task.status === "processing") {
        return true;
      }

      if (task.completedAt && now - task.completedAt > this.TASK_RETENTION_MS) {
        return false;
      }

      return true;
    });

    const cleaned = initialLength - this.tasks.length;
    if (cleaned > 0) {
      console.log(`[QUEUE] Cleaned up ${cleaned} old tasks, ${this.tasks.length} remaining`);
    }
  }

  /** Stops the periodic cleanup timer. Queued tasks are not cancelled. */
  public shutdown(): void {
    if (this.cleanupInterval) {
      clearInterval(this.cleanupInterval);
    }
  }

  /**
   * Enqueues a task and schedules the processing loop.
   *
   * @param task - The asynchronous work to run.
   * @param taskId - Optional identifier for status lookups; a generated id is
   *   used when it is omitted or empty.
   * @returns A promise that resolves or rejects with the outcome of `task`
   *   itself. It is not settled by the queue's per-task timeout.
   */
  add<T>(task: Task<T>, taskId?: string): Promise<T> {
    // `||` (not `??`) on purpose: an empty-string id is replaced by a generated one.
    const id = taskId || this.generateTaskId();

    return new Promise<T>((resolve, reject) => {
      const queuedTask: QueuedTask = {
        id,
        // Wrapper forwards the outcome to the caller and re-signals it to the
        // processing loop so the loop can record the final status.
        task: async () => {
          try {
            const result = await task();
            resolve(result);
            return result;
          } catch (err) {
            reject(err);
            throw err;
          }
        },
        addedAt: Date.now(),
        status: "queued",
      };

      this.tasks.push(queuedTask);

      // Kick the processor soon.
      setTimeout(() => {
        // The loop handles task errors itself; this catch only guards against
        // an unexpected internal failure becoming an unhandled rejection.
        this.processQueue().catch((error: unknown) => {
          console.error(`[QUEUE] Unexpected queue processing error:`, error);
        });
      }, 100);
    });
  }

  /**
   * Returns a snapshot of the queue and, when `taskId` is given, that task's
   * status and position.
   *
   * For an id that is not tracked, the id is matched against `ai_<digits>_`;
   * if it matches, the digits are treated as a millisecond timestamp and the
   * task is reported as "starting" when younger than 300000 ms, otherwise
   * "unknown".
   *
   * @param taskId - Optional id of the task to report on.
   * @returns The current queue status.
   */
  getStatus(taskId?: string): QueueStatus {
    const queuedTasks = this.tasks.filter((t) => t.status === "queued");
    const processingTasks = this.tasks.filter((t) => t.status === "processing");
    const queueLength = queuedTasks.length + processingTasks.length;

    const now = Date.now();

    let estimatedWaitTime = 0;
    if (queueLength > 0) {
      if (this.isProcessing && this.lastProcessedAt > 0) {
        const timeSinceLastRequest = now - this.lastProcessedAt;
        const remainingDelay = Math.max(0, this.delayMs * 2 - timeSinceLastRequest);
        estimatedWaitTime = remainingDelay + queuedTasks.length * this.delayMs;
      } else {
        estimatedWaitTime = queueLength * this.delayMs;
      }
    }

    const status: QueueStatus = {
      queueLength,
      isProcessing: this.isProcessing,
      estimatedWaitTime,
    };

    if (!taskId) {
      return status;
    }

    const task = this.tasks.find((t) => t.id === taskId);

    if (!task) {
      const taskTimestamp = taskId.match(/ai_(\d+)_/)?.[1];
      if (taskTimestamp) {
        const taskAge = now - Number.parseInt(taskTimestamp, 10);
        status.taskStatus = taskAge < 300000 ? "starting" : "unknown";
      }
      return status;
    }

    status.taskStatus = task.status;

    if (task.status === "processing") {
      status.currentPosition = 1;
    } else if (task.status === "queued") {
      // `queuedTasks` is already a fresh array, so sorting it does not touch `this.tasks`.
      const queuedTasksInOrder = queuedTasks.sort((a, b) => a.addedAt - b.addedAt);
      const positionInQueue = queuedTasksInOrder.findIndex((t) => t.id === taskId);

      if (positionInQueue >= 0) {
        const processingOffset = processingTasks.length > 0 ? 1 : 0;
        status.currentPosition = processingOffset + positionInQueue + 1;
      }
    }

    return status;
  }

  /**
   * Returns the queued task with the smallest `addedAt`, or undefined when
   * nothing is waiting. Ties go to the task that appears first in `tasks`.
   */
  private findNextQueuedTask(): QueuedTask | undefined {
    let next: QueuedTask | undefined;
    for (const candidate of this.tasks) {
      if (candidate.status !== "queued") continue;
      if (next === undefined || candidate.addedAt < next.addedAt) {
        next = candidate;
      }
    }
    return next;
  }

  /**
   * Runs queued tasks one after another until none are waiting. Returns
   * immediately if a loop is already active. Task failures and timeouts are
   * recorded on the task and logged; they never abort the loop.
   */
  private async processQueue(): Promise<void> {
    if (this.isProcessing) return;

    this.isProcessing = true;

    try {
      while (true) {
        const nextTask = this.findNextQueuedTask();

        if (!nextTask) break; // No more work

        nextTask.status = "processing";
        nextTask.startedAt = Date.now();
        this.currentlyProcessingTaskId = nextTask.id;
        this.lastProcessedAt = Date.now();

        // Sentinel compared by identity below, so a task error that merely
        // mentions "timed out" is not mistaken for the queue's own timeout.
        const timeoutError = new Error(
          `Task ${nextTask.id} timed out after ${this.taskTimeoutMs} ms`,
        );
        let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
        const timeout = new Promise<never>((_resolve, reject) => {
          timeoutHandle = setTimeout(() => reject(timeoutError), this.taskTimeoutMs);
        });

        try {
          await Promise.race([nextTask.task(), timeout]);

          nextTask.status = "completed";
          nextTask.completedAt = Date.now();
          console.log(`[QUEUE] Task ${nextTask.id} completed`);
        } catch (error) {
          nextTask.status = error === timeoutError ? "timedout" : "failed";
          nextTask.completedAt = Date.now();
          console.error(`[QUEUE] Task ${nextTask.id} failed:`, error);
        } finally {
          // Always release the timer so it neither leaks nor keeps the
          // process alive after the task has already settled.
          clearTimeout(timeoutHandle);
        }

        this.currentlyProcessingTaskId = null;

        const hasMoreQueued = this.tasks.some((t) => t.status === "queued");
        if (hasMoreQueued) {
          console.log(`[QUEUE] Waiting ${this.delayMs}ms before next task`);
          await new Promise((r) => setTimeout(r, this.delayMs));
        }
      }
    } finally {
      this.isProcessing = false;
      console.log(`[QUEUE] Queue processing finished`);
    }
  }

  /** Builds an id of the form `task_<epoch ms>_<up to 9 base-36 chars>`. */
  private generateTaskId(): string {
    return `task_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }
}
|
|
|
|
// Module-level queue instance used by the exported helper functions in this file.
const queue = new RateLimitedQueue();
|
|
|
|
/**
 * Adds a task to the module-level queue.
 *
 * @param task - The asynchronous work to enqueue.
 * @param taskId - Optional identifier that can later be passed to `getQueueStatus`.
 * @returns Whatever promise the queue's `add` returns for this task.
 */
export function enqueueApiCall<T>(task: Task<T>, taskId?: string): Promise<T> {
  return queue.add(task, taskId);
}
|
|
|
|
/**
 * Returns the current status of the module-level queue.
 *
 * @param taskId - Optional task id; when given, it is forwarded to the
 *   queue so the status can include details for that task.
 * @returns The status object produced by the queue's `getStatus`.
 */
export function getQueueStatus(taskId?: string): QueueStatus {
  return queue.getStatus(taskId);
}
|
|
|
|
/** Calls `shutdown()` on the module-level queue. */
export function shutdownQueue(): void {
  queue.shutdown();
}
|
|
|
|
// Default export: the same module-level queue instance the named helpers delegate to.
export default queue;
|