ai queue repr

This commit is contained in:
overcuriousity
2025-07-26 14:33:51 +02:00
parent d2fdeccce3
commit 69fc97f7a0
5 changed files with 371 additions and 137 deletions

View File

@@ -1,9 +1,6 @@
// src/utils/rateLimitedQueue.ts
// ------------------------------------------------------------
// A tiny FIFO, singleinstance queue that spaces API requests by
// a configurable delay. Import `enqueueApiCall()` wherever you
// call the AI API and the queue will make sure calls are sent
// one after another with the defined pause inbetween.
// Enhanced FIFO queue with status tracking for visual feedback
// ------------------------------------------------------------
import dotenv from "dotenv";
@@ -12,53 +9,113 @@ dotenv.config();
/**
* Delay (in **milliseconds**) between two consecutive API calls.
*
* Configure it in your `.env` file, e.g.
* AI_RATE_LIMIT_DELAY_MS=2000
* Defaults to **1000ms** (≈1 request / second) when not set or invalid.
* Defaults to **2000 ms** (2 seconds) when not set or invalid.
*/
const RATE_LIMIT_DELAY_MS = Number.parseInt(process.env.AI_RATE_LIMIT_DELAY_MS ?? "1000", 10) || 1000;
const RATE_LIMIT_DELAY_MS = Number.parseInt(process.env.AI_RATE_LIMIT_DELAY_MS ?? "2000", 10) || 2000;
/**
* Internal task type. Every task returns a Promise so callers get the
* real API response transparently.
* Internal task type with ID tracking for status updates
*/
export type Task<T = unknown> = () => Promise<T>;
interface QueuedTask {
id: string;
task: Task;
addedAt: number;
}
export interface QueueStatus {
queueLength: number;
isProcessing: boolean;
estimatedWaitTime: number; // in milliseconds
currentPosition?: number; // position of specific request
}
class RateLimitedQueue {
private queue: Task[] = [];
private queue: QueuedTask[] = [];
private processing = false;
private delayMs = RATE_LIMIT_DELAY_MS;
private lastProcessedAt = 0;
/**
* Schedule a task. Returns a Promise that resolves/rejects with the
* task result once the queue reaches it.
* Schedule a task with ID tracking. Returns a Promise that resolves/rejects
* with the task result once the queue reaches it.
*/
add<T>(task: Task<T>): Promise<T> {
add<T>(task: Task<T>, taskId?: string): Promise<T> {
const id = taskId || this.generateTaskId();
return new Promise<T>((resolve, reject) => {
this.queue.push(async () => {
try {
const result = await task();
resolve(result);
} catch (err) {
reject(err);
}
this.queue.push({
id,
task: async () => {
try {
const result = await task();
resolve(result);
} catch (err) {
reject(err);
}
},
addedAt: Date.now()
});
this.process();
});
}
/**
* Change the delay at runtime e.g. if you reload env vars without
* restarting the server.
* Get current queue status for visual feedback
*/
getStatus(taskId?: string): QueueStatus {
const queueLength = this.queue.length;
const now = Date.now();
// Calculate estimated wait time
let estimatedWaitTime = 0;
if (queueLength > 0) {
if (this.processing) {
// Time since last request + remaining delay + queue length * delay
const timeSinceLastRequest = now - this.lastProcessedAt;
const remainingDelay = Math.max(0, this.delayMs - timeSinceLastRequest);
estimatedWaitTime = remainingDelay + (queueLength - 1) * this.delayMs;
} else {
// Queue will start immediately, so just queue length * delay
estimatedWaitTime = queueLength * this.delayMs;
}
}
const status: QueueStatus = {
queueLength,
isProcessing: this.processing,
estimatedWaitTime
};
// Find position of specific task if ID provided
if (taskId) {
const position = this.queue.findIndex(item => item.id === taskId);
if (position >= 0) {
status.currentPosition = position + 1; // 1-based indexing for user display
}
}
return status;
}
/**
* Change the delay at runtime
*/
setDelay(ms: number): void {
if (!Number.isFinite(ms) || ms < 0) return;
this.delayMs = ms;
}
/**
* Get current delay setting
*/
getDelay(): number {
return this.delayMs;
}
// ---------------------------------------
// ️🌐 Internal helpers
// Internal helpers
// ---------------------------------------
private async process(): Promise<void> {
if (this.processing) return;
@@ -67,26 +124,41 @@ class RateLimitedQueue {
while (this.queue.length > 0) {
const next = this.queue.shift();
if (!next) continue;
await next();
// Wait before the next one
await new Promise((r) => setTimeout(r, this.delayMs));
this.lastProcessedAt = Date.now();
await next.task();
// Wait before the next one (only if there are more tasks)
if (this.queue.length > 0) {
await new Promise((r) => setTimeout(r, this.delayMs));
}
}
this.processing = false;
}
private generateTaskId(): string {
return `task_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
}
// ------------------------------------------------------------
// Export a **singleton** instance so every import shares the
// same queue. That way the ratelimit is enforced globally.
// Export singleton instance and convenience functions
// ------------------------------------------------------------
const queue = new RateLimitedQueue();
/**
* Helper for convenience: `enqueueApiCall(() => fetch(...))`.
* Helper for convenience: `enqueueApiCall(() => fetch(...), 'optional-id')`.
*/
export function enqueueApiCall<T>(task: Task<T>): Promise<T> {
return queue.add(task);
export function enqueueApiCall<T>(task: Task<T>, taskId?: string): Promise<T> {
return queue.add(task, taskId);
}
export default queue;
/**
* Get current queue status for visual feedback
*/
export function getQueueStatus(taskId?: string): QueueStatus {
return queue.getStatus(taskId);
}
export default queue;