From b5cc33954336dec0d85a54683b84a76738c9f038 Mon Sep 17 00:00:00 2001 From: Fabian Seluska Date: Thu, 14 Oct 2021 19:20:09 +0200 Subject: [PATCH] Working multithreading w. one job at a time and no memory leaks... --- libs/utils/multithread.js | 54 ++++++++++++++++++++----- libs/utils/multithread/genericWorker.js | 2 +- tests/multihread.test.js | 51 ++++++++++++++++------- 3 files changed, 81 insertions(+), 26 deletions(-) diff --git a/libs/utils/multithread.js b/libs/utils/multithread.js index af28b25..73f9e2c 100644 --- a/libs/utils/multithread.js +++ b/libs/utils/multithread.js @@ -1,6 +1,25 @@ +import { randomUUID } from 'crypto' import { Worker } from 'worker_threads' let workerFarm = [] +let workerReady = [] + +function waitForThread() { + return new Promise( + (resolve, reject) => { + let interval = setInterval( + () => { + const readyIndex = workerReady.indexOf(true) + if (readyIndex !== -1) { + clearInterval(interval) + resolve(readyIndex) + } + }, + 50 + ) + } + ) +} export function initWorkers(threads) { for (let i = 0; i < threads; ++i) { @@ -13,27 +32,42 @@ export function initWorkers(threads) { type: 'initWorker', name: `Worker ${parseInt(i) + 1}` }) + workerReady[i] = true } - + console.log(`Initialised ${threads} workers!`) } -let spreadIndex = 0 export function spread(script, data) { return new Promise( - (resolve, reject) => { - const worker = workerFarm[spreadIndex] + async (resolve, reject) => { + const readyIndex = await waitForThread() + workerReady[readyIndex] = false + + const worker = workerFarm[readyIndex] + const jobID = randomUUID() worker.postMessage({ type: 'runScript', script, - data + data, + id: jobID }) - worker.on('message', result => resolve(result)) - - ++spreadIndex - if(spreadIndex >= workerFarm.length) - spreadIndex = 0 + const messageHandler = message => { + if (message.id === jobID) { + workerReady[readyIndex] = true + worker.removeListener( + 'message', + messageHandler + ) + resolve(message.result) + } + } + + worker.on( + 'message', + messageHandler + ) } ) } \ No newline at end of file diff --git a/libs/utils/multithread/genericWorker.js b/libs/utils/multithread/genericWorker.js index e146719..8534fe2 100644 --- a/libs/utils/multithread/genericWorker.js +++ b/libs/utils/multithread/genericWorker.js @@ -23,7 +23,7 @@ parentPort.on( import(runScript).then( script => script.main(message.data).then( result => { - parentPort.postMessage(result) + parentPort.postMessage({ result, id: message.id }) process.env.DEBUG && log(`Script done -> «${runScript}»`) } ) diff --git a/tests/multihread.test.js b/tests/multihread.test.js index 23bd224..1c8ac8a 100644 --- a/tests/multihread.test.js +++ b/tests/multihread.test.js @@ -2,23 +2,44 @@ import { initWorkers, spread } from '../libs/utils/multithread.js' import Finish from '../schemas/Finish-copy.js' import initLog from '../libs/utils/log.js' -const log = initLog("sqlite2mongo") +const log = initLog("Fib. Test") initWorkers(6) -export default async function() { - log("Checking for new finishes...") +const jobs = 100 +let completed = 0 +for (let i = 0; i < jobs; ++i) { + spread( + './fibonacci.js', + {} + ).then( + result => { + ++completed + log(`Completed job ${completed}/${jobs} -> ${result}`) - await Finish.deleteMany({}) + if (completed === jobs) process.exit(0) + } + ) +} - let offset = -1 - while (offset < 10000000) { - await spread( - './db.test.js', - { - offset - } - ) - offset += 5000 - } -} \ No newline at end of file + + + + + +// export default async function() { +// log("Checking for new finishes...") + +// await Finish.deleteMany({}) + +// let offset = -1 +// while (offset < 10000000) { +// await spread( +// './db.test.js', +// { +// offset +// } +// ) +// offset += 5000 +// } +// } \ No newline at end of file