From 4a1acc9544719238d777351fe566f659b873b762 Mon Sep 17 00:00:00 2001 From: karishmas6 Date: Wed, 12 Jun 2024 20:09:01 +0530 Subject: [PATCH] feat: concurrency --- mx-interpreter/utils/concurrency.ts | 59 +++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 mx-interpreter/utils/concurrency.ts diff --git a/mx-interpreter/utils/concurrency.ts b/mx-interpreter/utils/concurrency.ts new file mode 100644 index 00000000..78ebdf39 --- /dev/null +++ b/mx-interpreter/utils/concurrency.ts @@ -0,0 +1,59 @@ + +export default class Concurrency { + + maxConcurrency : number = 1; + + + activeWorkers : number = 0; + + + private jobQueue : Function[] = []; + + + private waiting : Function[] = []; + + + constructor(maxConcurrency: number) { + this.maxConcurrency = maxConcurrency; + } + + + private runNextJob() : void { + const job = this.jobQueue.pop(); + + if (job) { + // console.debug("Running a job..."); + job().then(() => { + // console.debug("Job finished, running the next waiting job..."); + this.runNextJob(); + }); + } else { + // console.debug("No waiting job found!"); + this.activeWorkers -= 1; + if (this.activeWorkers === 0) { + // console.debug("This concurrency manager is idle!"); + this.waiting.forEach((x) => x()); + } + } + } + + + addJob(job: () => Promise) : void { + // console.debug("Adding a worker!"); + this.jobQueue.push(job); + + if (!this.maxConcurrency || this.activeWorkers < this.maxConcurrency) { + this.runNextJob(); + this.activeWorkers += 1; + } else { + // console.debug("No capacity to run a worker now, waiting!"); + } + } + + + waitForCompletion() : Promise { + return new Promise((res) => { + this.waiting.push(res); + }); + } + } \ No newline at end of file