diff --git a/libs/utils/multithread.js b/libs/utils/multithread.js index 4c94336..e1a4d5b 100644 --- a/libs/utils/multithread.js +++ b/libs/utils/multithread.js @@ -6,71 +6,79 @@ const log = initLog("Worker Setup") let workerFarm = [] let workerReady = [] +let scheduledJobs = [] -function waitForThread() { - return new Promise( - (resolve, reject) => { - let interval = setInterval( - () => { - const readyIndex = workerReady.indexOf(true) - if (readyIndex !== -1) { - clearInterval(interval) - resolve(readyIndex) - } - }, - 50 +function scheduler() { + if (scheduledJobs.length === 0) return + + const readyIndex = workerReady.indexOf(true) + if (readyIndex === -1) return + + workerReady[readyIndex] = false + const worker = workerFarm[readyIndex] + + const job = scheduledJobs.shift() + + worker.postMessage({ + type: 'runScript', + script: job.script, + data: job.data, + id: job.jobID + }) + + const messageHandler = message => { + if (message.id === job.jobID) { + workerReady[readyIndex] = true + worker.removeListener( + 'message', + messageHandler ) + return job.callback(message.result) } + } + + worker.on( + 'message', + messageHandler ) } export function initWorkers(threads) { for (let i = 0; i < threads; ++i) { workerFarm.push(new Worker('./libs/utils/multithread/genericWorker.js', { env: SHARE_ENV })) - } - for (const i in workerFarm) { const worker = workerFarm[i] worker.postMessage({ type: 'initWorker', name: `Worker ${parseInt(i) + 1}` }) + workerReady[i] = true } log(`Initialised ${threads} workers!`) + + setInterval( + scheduler, + 50 + ) + + log(`Initialised the scheduler!`) } export function spread(script, data) { return new Promise( async (resolve, reject) => { - const readyIndex = await waitForThread() - workerReady[readyIndex] = false - - const worker = workerFarm[readyIndex] const jobID = randomUUID() - worker.postMessage({ - type: 'runScript', + + scheduledJobs.push({ script, data, - id: jobID - }) - - const messageHandler = message => { - if (message.id === jobID) { - workerReady[readyIndex] = true - worker.removeListener( - 'message', - messageHandler - ) - resolve(message.result) + jobID, + callback: result => { + resolve(result) } - } - - worker.on( - 'message', - messageHandler - ) + }) } ) } \ No newline at end of file diff --git a/tests/multihread.test.js b/tests/multihread.test.js index 0511326..e9b1278 100644 --- a/tests/multihread.test.js +++ b/tests/multihread.test.js @@ -5,23 +5,23 @@ import initLog from '../libs/utils/log.js' const log = initLog("Fib. Test") -initWorkers(10) +initWorkers(3) -// const jobs = 100 -// let completed = 0 -// for (let i = 0; i < jobs; ++i) { -// spread( -// './fibonacci.js', -// {} -// ).then( -// result => { -// ++completed -// log(`Completed job ${completed}/${jobs} -> ${result}`) +const jobs = 10 +let completed = 0 +for (let i = 0; i < jobs; ++i) { + spread( + './fibonacci.js', + {} + ).then( + result => { + ++completed + log(`Completed job ${completed}/${jobs} -> ${result}`) -// if (completed === jobs) process.exit(0) -// } -// ) -// } + if (completed === jobs) process.exit(0) + } + ) +}