0

I have a worker thread (let's call it the "Thread manager") in a Node 18 app that spawns other workers (let's call them "sub-workers"). Each sub-worker does its job and, when it has finished executing, sends a message to the "Thread manager", which then calls worker.terminate(). I recently also added a call to worker.unref(). After a sleep interval the "Thread manager" spawns another "sub-worker", and this cycle keeps going until the app is stopped.

Here is my question: is it normal that every new "sub-worker" I create gets threadId n+1? For example 2, 3, 4, even if worker 2 has already finished. I asked ChatGPT and got more confused, because it says it is not normal for the threadId to keep growing...

Here is the code of the "Manager"

import {
  parentPort,
  workerData,
  isMainThread,
  Worker,
  threadId,
  WorkerOptions,
} from "worker_threads";
import { v4 as uuidv4 } from "uuid";
import type { TThreadCommand } from "../WorkerDispatcher";
import SyncerWorker from "../SyncerWorker";
import sleep from "../../../utils/sleep";
import * as jsonConfig from "./workers-config.json";

/**
 * Status payload handed to the `onDone` / `onStop` callbacks that
 * `ThreadManager.addWorker` registers on each sub-worker.
 */
type TWorkerStatusResponse = {
  // Thread id carried in the payload (not read by the manager code in this file).
  threadId: number;
  // Worker id; the manager uses it to look the worker up in its lists.
  id: string;
  // Kind of status being reported.
  status: "success" | "error" | "info" | "warning" | "critical" | "stopped" | "started";
};

/**
 * Description of one sub-worker to spawn, as read from the workers config
 * (`config.default.workers`) or rebuilt from tracking info on relaunch.
 */
type TLoadWorker = {
  // Pause between runs; compared as `interval * 1000` against a millisecond
  // difference in `execute()`, i.e. expressed in seconds.
  interval: number;
  // Forwarded to the sub-worker through `workerData`.
  name: string;
  // Forwarded to the sub-worker through `workerData`.
  class: string;
  // Script path; when no id is supplied its leading dots are stripped and it is
  // prefixed with `__dirname`, otherwise it is used as given.
  path: string;
  // Forwarded to the sub-worker through `workerData`.
  // NOTE(review): presumably the shutdown grace period in seconds — confirm.
  "max-close-time": number;
  // Disabled entries are skipped by `addWorker`.
  disabled: boolean;
  // When non-empty, used as the worker id instead of a generated UUID.
  id: string;
};

/**
 * Bookkeeping record the manager keeps per worker id so that it can decide
 * when a finished worker is due to be launched again.
 */
type TWorkerTrackinginfo = {
  // Worker id (stable across relaunches).
  id: string;
  // `threadId` of the most recently created worker instance for this id.
  threadId: number;
  // `Date.now()` at the moment the worker was last (re)launched.
  lastActive: number;
  // Pause between runs; used as `interval * 1000` milliseconds in `execute()`.
  interval: number;
  // Live worker instance, or null once the worker has reported it is done.
  workerInstance: SyncerWorker | null;
  // `Date.now()` at first launch (not updated on relaunch).
  startAt: number;
  // 0 while a run is in progress, otherwise `Date.now()` at the end of the last run.
  endAt: number;
  // Script path that is passed to the worker when it is relaunched.
  path: string;
  // Copied from the worker definition and handed back on relaunch.
  "max-close-time": number;
  // Copied from the worker definition and handed back on relaunch.
  disabled: boolean;
};

/**
 * Supervises a set of `SyncerWorker` sub-workers: loads their definitions from
 * a config module, spawns them, and relaunches each one after it has reported
 * completion and its configured interval has elapsed.
 *
 * Every (re)launch constructs a brand-new `SyncerWorker`. Node.js assigns each
 * Worker instance a `threadId` that is unique within the process, so the ids
 * observed here keep increasing across relaunches. A growing `threadId` is
 * therefore expected and is not by itself a sign of leaked threads.
 *
 * NOTE(review): the type parameter `T` is not used inside the class; it is
 * kept so existing `new ThreadManager<...>()` call sites keep compiling.
 */
class ThreadManager<T> {
  /** Worker instances that are currently alive (spawned and not yet removed). */
  public workers: SyncerWorker[] = [];
  /** One tracking record per worker id; survives across relaunches. */
  public workerInfo: TWorkerTrackinginfo[] = [];
  /** Set once the manager has been asked to stop; ends the `execute()` loop. */
  public terminated: boolean = false;
  /** True while the scheduling loop is (supposed to be) running. */
  public isRunning: boolean = false;

  /** Guards against two `execute()` loops running at the same time. */
  private loopActive = false;

  /**
   * @param start When true, the scheduling loop is started immediately.
   * @param sleepInterval Milliseconds passed to `sleep()` between scheduling passes.
   */
  constructor(start: boolean = false, public sleepInterval = 1000) {
    if (isMainThread) {
      console.log("This is main thread");
    } else {
      console.log("This is worker thread");
    }

    // `workerData` is accessed with `?.` everywhere else in this file; do the
    // same here so a missing `workerData` does not throw in the constructor.
    const configPath: string | undefined = workerData?.configPath;

    // Fire-and-forget: the constructor cannot await. A failure still surfaces
    // as an unhandled rejection, exactly as before.
    if (configPath) {
      void this.loadWorkers(configPath);
    } else {
      void this.loadWorkers();
    }

    if (start) {
      // load and start workers via execute
      this.isRunning = true;
      void this.execute();
    }
  }

  /**
   * Spawns a sub-worker for the given definition and records it.
   *
   * @param worker Definition of the worker to spawn; disabled entries are skipped.
   * @param workerId Id to reuse. Empty means "new worker": a UUID is generated
   *   and the path is resolved relative to `__dirname`.
   */
  private addWorker(worker: TLoadWorker, workerId: string = "") {
    if (worker.disabled) {
      return;
    }

    const newWorkerId = workerId || uuidv4();

    // Without an id the configured path is relative ("./x"): strip the leading
    // dots and anchor it at this directory. With an id the path is used as given.
    const workerPath = !workerId ? __dirname + worker.path.replace(/^\.+/, "") : worker.path;

    const workerOptions: WorkerOptions = {
      workerData: {
        interval: worker.interval,
        name: worker.name,
        class: worker.class,
        path: workerPath,
        "max-close-time": worker["max-close-time"],
        id: newWorkerId,
      },
    };

    const syncerWorker = new SyncerWorker(
      __dirname + "/wrap.js",
      workerOptions,
      false,
      worker.interval
    );

    syncerWorker.onDone = (data: TWorkerStatusResponse) => {
      // The run finished: drop the instance and stamp the end time so that
      // `execute()` can schedule the next run.
      this.updateWorker(data.id);
    };

    syncerWorker.onStop = (data: TWorkerStatusResponse) => {
      console.log("Worker stopped in worker-dispatcher-body.ts via Event");
      // `terminateWorker` terminates, unrefs and forgets the worker.
      this.terminateWorker(this.workers.find((w) => w.id === data.id));
    };

    this.workers.push(syncerWorker);

    // Relaunch of a known id: refresh the existing record. Otherwise (new
    // worker, or an id supplied by the config that has never been tracked)
    // create the record; without it the worker would never be relaunched.
    const tracked = workerId ? this.workerInfo.find((w) => w.id === workerId) : undefined;
    if (tracked) {
      tracked.endAt = 0;
      tracked.lastActive = Date.now();
      tracked.threadId = syncerWorker.threadId;
      tracked.workerInstance = syncerWorker;
    } else {
      this.workerInfo.push({
        startAt: Date.now(),
        endAt: 0,
        id: newWorkerId,
        interval: worker.interval,
        lastActive: Date.now(),
        threadId: syncerWorker.threadId,
        workerInstance: syncerWorker,
        path: workerPath,
        "max-close-time": worker["max-close-time"],
        disabled: worker.disabled,
      });
    }
  }

  /**
   * Handles a "done" report: terminates and forgets the live instance, and
   * marks the tracking record as finished (sets `endAt`, clears the instance).
   */
  private updateWorker(workerId: string) {
    const finished = this.workers.find((worker) => worker.id === workerId);
    if (!finished) return;

    this.removeWorker(finished);

    const record = this.workerInfo.find((info) => info.id === workerId);
    if (record) {
      record.endAt = Date.now();
      record.workerInstance = null;
    }
  }

  /** Spawns a fresh worker instance for an already tracked worker id. */
  relaunchWorker(id: string) {
    const worker = this.workerInfo.find((w) => w.id === id);
    if (worker) {
      const workerInfo: TLoadWorker = {
        interval: worker.interval,
        name: "Relaunched " + worker.id,
        class: "Relaunched " + worker.id,
        path: worker.path,
        "max-close-time": worker["max-close-time"],
        id: worker.id,
        disabled: worker.disabled,
      };

      this.addWorker(workerInfo, worker.id);
    }
  }

  /** Starts termination of a worker thread and unrefs it. */
  private stopThread(worker: SyncerWorker) {
    // Not awaited on purpose: callers are synchronous event handlers.
    void worker.terminate();
    worker.unref();
  }

  /** Terminates a worker and drops it from the live list; tracking info is kept. */
  private removeWorker(worker: SyncerWorker) {
    this.stopThread(worker);
    this.workers = this.workers.filter((w) => w.id !== worker.id);
  }

  /**
   * Asks every live worker to stop.
   *
   * NOTE(review): workers that are idle between runs have no live instance and
   * therefore receive nothing; their tracking records stay in `workerInfo`.
   *
   * @param immediate Send "immediate-stop" instead of "stop".
   */
  public terminateAll(immediate: boolean = false) {
    this.workers.forEach((worker) => {
      worker.sendMessage({
        message: "terminate",
        command: immediate ? "immediate-stop" : "stop",
        data: null,
      });
    });
  }

  /**
   * Terminates a worker for good: stops the thread and removes both the live
   * instance and its tracking record. When no tracked workers remain, the
   * manager marks itself terminated and reports "done"/"stopped" to its parent.
   */
  public terminateWorker(worker: SyncerWorker | undefined) {
    if (!worker) return;

    this.stopThread(worker);
    // Also forget the live instance; otherwise the terminated worker stays
    // referenced in `workers` and keeps receiving ping/terminate messages.
    this.workers = this.workers.filter((w) => w.id !== worker.id);
    this.workerInfo = this.workerInfo.filter((w) => w.id !== worker.id);

    if (this.workerInfo.length === 0) {
      this.terminated = true;
      parentPort?.postMessage({
        command: "done",
        status: "stopped",
      } as TThreadCommand);
    }
  }

  /** Terminates the live worker with the given id, if any. */
  public terminateWorkerById(id: string) {
    const worker = this.workers.find((w) => w.id === id);
    this.terminateWorker(worker);
  }

  /** Terminates the live worker with the given thread id, if any. */
  public terminateWorkerByThreadId(threadId: number) {
    const worker = this.workers.find((w) => w.threadId === threadId);
    this.terminateWorker(worker);
  }

  /**
   * Dynamically imports the config module and spawns every worker listed in
   * its `default.workers` array.
   *
   * @param configPath Module specifier of the config to import.
   */
  async loadWorkers(configPath: string = "./workers-config.json") {
    const config = await import(configPath);
    const workers: TLoadWorker[] = config.default.workers;
    for (const worker of workers) {
      this.addWorker(worker, worker.id);
    }
  }

  /** Logs "Ping" and sends a "ping" command to the parent port, if there is one. */
  report() {
    console.log("Ping");
    parentPort?.postMessage({
      command: "ping",
    } as TThreadCommand);
  }

  /**
   * Sends a "ping" command to one live worker, or to all of them.
   *
   * @param id Worker id to ping; when omitted every live worker is pinged.
   */
  ping(id?: string) {
    if (id) {
      const worker = this.workers.find((w) => w.id === id);
      if (worker) {
        worker.sendMessage({ command: "ping" });
      }
    } else {
      this.workers.forEach((worker) => {
        worker.sendMessage({ command: "ping" });
      });
    }
  }

  /**
   * Scheduling loop: every `sleepInterval` it relaunches the tracked workers
   * whose last run has ended and whose interval has since elapsed. Runs until
   * `terminated` is set. Calling it while a loop is already active is a no-op.
   */
  public async execute() {
    if (this.loopActive) return;

    this.loopActive = true;
    this.isRunning = true;
    try {
      while (!this.terminated) {
        await sleep(this.sleepInterval);

        // A stop request may have arrived during the sleep; do not spawn new
        // workers after that.
        if (this.terminated) break;

        const now = Date.now();
        const workersToStart = this.workerInfo.filter((w) => {
          // Still running: nothing to do.
          if (w.workerInstance && w.workerInstance.isRunning) return false;

          // endAt === 0 means the current run has not reported completion yet.
          if (w.endAt === 0) return false;

          return now - w.endAt > w.interval * 1000;
        });

        for (const info of workersToStart) {
          this.relaunchWorker(info.id);
        }
      }
    } finally {
      this.loopActive = false;
      this.isRunning = false;
    }
  }
}

// Module-level manager for this thread. Settings come from `workerData` when
// present; otherwise the loop starts immediately with a 1000 ms tick.
const threadManager = new ThreadManager<TThreadCommand>(
  workerData?.start ?? true,
  workerData?.interval ?? 1000
);

if (parentPort) {
  // Commands sent by the parent thread.
  parentPort.on("message", (p: TThreadCommand<TWorkerStatusResponse>) => {
    switch (p.command) {
      case "start":
        console.log("Start command received in worker dispatcher body");
        if (!threadManager.isRunning) {
          // Mark the loop as running before it starts, so a second "start"
          // does not spawn a parallel scheduling loop.
          threadManager.isRunning = true;
          void threadManager.execute().finally(() => {
            threadManager.isRunning = false;
          });
        }
        break;

      case "stop":
      case "immediate-stop":
        threadManager.terminated = true;
        threadManager.isRunning = false;
        // "stop" lets the workers finish their current work first;
        // "immediate-stop" asks them to stop right away.
        threadManager.terminateAll(p.command === "immediate-stop");
        break;

      default:
        // Other commands are ignored here; a child's "done"/"stopped" report
        // is handled through the worker's onStop event instead.
        break;
    }
  });
}

Here is my terminate() function from the extended class

/**
 * Shuts the worker down gracefully: flags it as terminated, sends it a "stop"
 * message, polls once per `sleep(1000)` until `canClose` is set or
 * `maxCloseTime` seconds have passed, then delegates to the base `terminate()`.
 *
 * @returns Whatever the base class `terminate()` resolves to.
 */
async terminate(): Promise<number> {
    this._terminated = true;
    this._isRunning = false;

    // Ask the worker (and every thread it owns) to stop.
    this._sendMessage({ message: "terminate", command: "stop", data: null });

    const closingStartedAt = Date.now();
    // True while the grace period (maxCloseTime, in seconds) has not run out.
    const withinGracePeriod = (): boolean =>
      Date.now() - closingStartedAt < this.maxCloseTime * 1000;

    for (;;) {
      if (this.canClose || !withinGracePeriod()) {
        break;
      }
      await sleep(1000);
    }

    console.log("Dispatcher is terminated");
    return super.terminate();
  }

I think I may have a memory leak. The reason I think so is that when the app runs for several hours in a Docker container, the threadId reaches, for example, 250, or sometimes 140.

I tried calling worker.unref() after terminate(), because I read that it has no effect if the worker is already terminated, but the threadId number still keeps growing.

Thanks, any help appreciated

New contributor
Svetoslav Trifonov is a new contributor to this site. Take care in asking for clarification, commenting, and answering. Check out our Code of Conduct.

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Browse other questions tagged or ask your own question.