rate limit queue, content
This commit is contained in:
@@ -12,51 +12,67 @@ interface QueuedTask {
|
||||
id: string;
|
||||
task: Task;
|
||||
addedAt: number;
|
||||
status: 'queued' | 'processing' | 'completed' | 'failed';
|
||||
startedAt?: number;
|
||||
completedAt?: number;
|
||||
}
|
||||
|
||||
export interface QueueStatus {
|
||||
queueLength: number;
|
||||
isProcessing: boolean;
|
||||
estimatedWaitTime: number; // in milliseconds
|
||||
currentPosition?: number;
|
||||
estimatedWaitTime: number;
|
||||
currentPosition?: number;
|
||||
taskStatus?: string;
|
||||
}
|
||||
|
||||
class RateLimitedQueue {
|
||||
private queue: QueuedTask[] = [];
|
||||
private processing = false;
|
||||
private tasks: QueuedTask[] = [];
|
||||
private isProcessing = false;
|
||||
private delayMs = RATE_LIMIT_DELAY_MS;
|
||||
private lastProcessedAt = 0;
|
||||
private currentlyProcessingTaskId: string | null = null;
|
||||
|
||||
add<T>(task: Task<T>, taskId?: string): Promise<T> {
|
||||
const id = taskId || this.generateTaskId();
|
||||
|
||||
|
||||
return new Promise<T>((resolve, reject) => {
|
||||
this.queue.push({
|
||||
const queuedTask: QueuedTask = {
|
||||
id,
|
||||
task: async () => {
|
||||
try {
|
||||
const result = await task();
|
||||
resolve(result);
|
||||
return result;
|
||||
} catch (err) {
|
||||
reject(err);
|
||||
throw err;
|
||||
}
|
||||
},
|
||||
addedAt: Date.now()
|
||||
});
|
||||
this.process();
|
||||
addedAt: Date.now(),
|
||||
status: 'queued'
|
||||
};
|
||||
|
||||
this.tasks.push(queuedTask);
|
||||
|
||||
setTimeout(() => {
|
||||
this.processQueue();
|
||||
}, 100);
|
||||
});
|
||||
}
|
||||
|
||||
getStatus(taskId?: string): QueueStatus {
|
||||
const queueLength = this.queue.length;
|
||||
const queuedTasks = this.tasks.filter(t => t.status === 'queued');
|
||||
const processingTasks = this.tasks.filter(t => t.status === 'processing');
|
||||
const queueLength = queuedTasks.length + processingTasks.length;
|
||||
|
||||
const now = Date.now();
|
||||
|
||||
let estimatedWaitTime = 0;
|
||||
if (queueLength > 0) {
|
||||
if (this.processing) {
|
||||
if (this.isProcessing && this.lastProcessedAt > 0) {
|
||||
const timeSinceLastRequest = now - this.lastProcessedAt;
|
||||
const remainingDelay = Math.max(0, this.delayMs - timeSinceLastRequest);
|
||||
estimatedWaitTime = remainingDelay + (queueLength - 1) * this.delayMs;
|
||||
const remainingDelay = Math.max(0, this.delayMs * 2 - timeSinceLastRequest);
|
||||
estimatedWaitTime = remainingDelay + queuedTasks.length * this.delayMs;
|
||||
} else {
|
||||
estimatedWaitTime = queueLength * this.delayMs;
|
||||
}
|
||||
@@ -64,14 +80,41 @@ class RateLimitedQueue {
|
||||
|
||||
const status: QueueStatus = {
|
||||
queueLength,
|
||||
isProcessing: this.processing,
|
||||
isProcessing: this.isProcessing,
|
||||
estimatedWaitTime
|
||||
};
|
||||
|
||||
if (taskId) {
|
||||
const position = this.queue.findIndex(item => item.id === taskId);
|
||||
if (position >= 0) {
|
||||
status.currentPosition = position + 1;
|
||||
const task = this.tasks.find(t => t.id === taskId);
|
||||
|
||||
if (task) {
|
||||
status.taskStatus = task.status;
|
||||
|
||||
if (task.status === 'processing') {
|
||||
status.currentPosition = 1;
|
||||
} else if (task.status === 'queued') {
|
||||
const queuedTasksInOrder = this.tasks
|
||||
.filter(t => t.status === 'queued')
|
||||
.sort((a, b) => a.addedAt - b.addedAt);
|
||||
|
||||
const positionInQueue = queuedTasksInOrder.findIndex(t => t.id === taskId);
|
||||
|
||||
if (positionInQueue >= 0) {
|
||||
const processingOffset = processingTasks.length > 0 ? 1 : 0;
|
||||
status.currentPosition = processingOffset + positionInQueue + 1;
|
||||
}
|
||||
} else if (task.status === 'completed' || task.status === 'failed') {
|
||||
}
|
||||
} else {
|
||||
const taskTimestamp = taskId.match(/ai_(\d+)_/)?.[1];
|
||||
if (taskTimestamp) {
|
||||
const taskAge = now - parseInt(taskTimestamp);
|
||||
if (taskAge < 30000) {
|
||||
status.taskStatus = 'starting';
|
||||
} else {
|
||||
status.taskStatus = 'unknown';
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,23 +130,60 @@ class RateLimitedQueue {
|
||||
return this.delayMs;
|
||||
}
|
||||
|
||||
private async process(): Promise<void> {
|
||||
if (this.processing) return;
|
||||
this.processing = true;
|
||||
|
||||
while (this.queue.length > 0) {
|
||||
const next = this.queue.shift();
|
||||
if (!next) continue;
|
||||
|
||||
this.lastProcessedAt = Date.now();
|
||||
await next.task();
|
||||
|
||||
if (this.queue.length > 0) {
|
||||
await new Promise((r) => setTimeout(r, this.delayMs));
|
||||
}
|
||||
private async processQueue(): Promise<void> {
|
||||
if (this.isProcessing) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.processing = false;
|
||||
this.isProcessing = true;
|
||||
|
||||
try {
|
||||
while (true) {
|
||||
const nextTask = this.tasks
|
||||
.filter(t => t.status === 'queued')
|
||||
.sort((a, b) => a.addedAt - b.addedAt)[0];
|
||||
|
||||
if (!nextTask) {
|
||||
break;
|
||||
}
|
||||
|
||||
nextTask.status = 'processing';
|
||||
nextTask.startedAt = Date.now();
|
||||
this.currentlyProcessingTaskId = nextTask.id;
|
||||
this.lastProcessedAt = Date.now();
|
||||
|
||||
|
||||
try {
|
||||
await nextTask.task();
|
||||
nextTask.status = 'completed';
|
||||
nextTask.completedAt = Date.now();
|
||||
console.log(`[QUEUE] Task ${nextTask.id} completed`);
|
||||
} catch (error) {
|
||||
nextTask.status = 'failed';
|
||||
nextTask.completedAt = Date.now();
|
||||
console.error(`[QUEUE] Task ${nextTask.id} failed:`, error);
|
||||
}
|
||||
|
||||
this.currentlyProcessingTaskId = null;
|
||||
|
||||
setTimeout(() => {
|
||||
const index = this.tasks.findIndex(t => t.id === nextTask.id);
|
||||
if (index >= 0) {
|
||||
console.log(`[QUEUE] Removing completed task ${nextTask.id}`);
|
||||
this.tasks.splice(index, 1);
|
||||
}
|
||||
}, 10000);
|
||||
|
||||
const hasMoreQueued = this.tasks.some(t => t.status === 'queued');
|
||||
if (hasMoreQueued) {
|
||||
console.log(`[QUEUE] Waiting ${this.delayMs}ms before next task`);
|
||||
await new Promise((r) => setTimeout(r, this.delayMs));
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
this.isProcessing = false;
|
||||
console.log(`[QUEUE] Queue processing finished`);
|
||||
}
|
||||
}
|
||||
|
||||
private generateTaskId(): string {
|
||||
|
||||
Reference in New Issue
Block a user