🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
129 lines
3.0 KiB
TypeScript
Executable File
129 lines
3.0 KiB
TypeScript
Executable File
import { AsyncResource } from 'async_hooks';
|
|
import { Worker } from 'worker_threads';
|
|
import { cpus } from 'os';
|
|
import { EventEmitter } from 'events';
|
|
|
|
import serializeJavascript from 'serialize-javascript';
|
|
|
|
import { freeWorker, taskInfo, workerPoolWorkerFlag } from './constants';
|
|
|
|
import type {
|
|
WorkerCallback,
|
|
WorkerContext,
|
|
WorkerOutput,
|
|
WorkerPoolOptions,
|
|
WorkerPoolTask,
|
|
WorkerWithTaskInfo
|
|
} from './type';
|
|
|
|
class WorkerPoolTaskInfo extends AsyncResource {
|
|
constructor(private callback: WorkerCallback) {
|
|
super('WorkerPoolTaskInfo');
|
|
}
|
|
|
|
done(err: Error | null, result: any) {
|
|
this.runInAsyncScope(this.callback, null, err, result);
|
|
this.emitDestroy();
|
|
}
|
|
}
|
|
|
|
export class WorkerPool extends EventEmitter {
|
|
protected maxInstances: number;
|
|
|
|
protected filePath: string;
|
|
|
|
protected tasks: WorkerPoolTask[] = [];
|
|
|
|
protected workers: WorkerWithTaskInfo[] = [];
|
|
protected freeWorkers: WorkerWithTaskInfo[] = [];
|
|
|
|
constructor(options: WorkerPoolOptions) {
|
|
super();
|
|
|
|
this.maxInstances = options.maxWorkers || cpus().length;
|
|
this.filePath = options.filePath;
|
|
|
|
this.on(freeWorker, () => {
|
|
if (this.tasks.length > 0) {
|
|
const { context, cb } = this.tasks.shift()!;
|
|
this.runTask(context, cb);
|
|
}
|
|
});
|
|
}
|
|
|
|
get numWorkers(): number {
|
|
return this.workers.length;
|
|
}
|
|
|
|
addAsync(context: WorkerContext): Promise<WorkerOutput> {
|
|
return new Promise((resolve, reject) => {
|
|
this.runTask(context, (err, output) => {
|
|
if (err) {
|
|
reject(err);
|
|
return;
|
|
}
|
|
|
|
if (!output) {
|
|
reject(new Error('The output is empty'));
|
|
return;
|
|
}
|
|
|
|
resolve(output);
|
|
});
|
|
});
|
|
}
|
|
|
|
close() {
|
|
for (let i = 0; i < this.workers.length; i++) {
|
|
const worker = this.workers[i];
|
|
worker.terminate();
|
|
}
|
|
}
|
|
|
|
private addNewWorker() {
|
|
const worker: WorkerWithTaskInfo = new Worker(this.filePath, {
|
|
workerData: workerPoolWorkerFlag
|
|
});
|
|
|
|
worker.on('message', (result) => {
|
|
worker[taskInfo]?.done(null, result);
|
|
worker[taskInfo] = null;
|
|
this.freeWorkers.push(worker);
|
|
this.emit(freeWorker);
|
|
});
|
|
|
|
worker.on('error', (err) => {
|
|
if (worker[taskInfo]) {
|
|
worker[taskInfo].done(err, null);
|
|
} else {
|
|
this.emit('error', err);
|
|
}
|
|
this.workers.splice(this.workers.indexOf(worker), 1);
|
|
this.addNewWorker();
|
|
});
|
|
|
|
this.workers.push(worker);
|
|
this.freeWorkers.push(worker);
|
|
this.emit(freeWorker);
|
|
}
|
|
|
|
private runTask(context: WorkerContext, cb: WorkerCallback) {
|
|
if (this.freeWorkers.length === 0) {
|
|
this.tasks.push({ context, cb });
|
|
if (this.numWorkers < this.maxInstances) {
|
|
this.addNewWorker();
|
|
}
|
|
return;
|
|
}
|
|
|
|
const worker = this.freeWorkers.pop();
|
|
if (worker) {
|
|
worker[taskInfo] = new WorkerPoolTaskInfo(cb);
|
|
worker.postMessage({
|
|
code: context.code,
|
|
options: serializeJavascript(context.options)
|
|
});
|
|
}
|
|
}
|
|
}
|