Working multithreading w. one job at a time and no memory leaks...

This commit is contained in:
Fabian Seluska 2021-10-14 19:20:09 +02:00
parent 36ab9d81e5
commit b5cc339543
3 changed files with 81 additions and 26 deletions

View File

@ -1,6 +1,25 @@
import { randomUUID } from 'crypto'
import { Worker } from 'worker_threads' import { Worker } from 'worker_threads'
let workerFarm = [] let workerFarm = []
let workerReady = []
function waitForThread() {
return new Promise(
(resolve, reject) => {
let interval = setInterval(
() => {
const readyIndex = workerReady.indexOf(true)
if (readyIndex !== -1) {
clearInterval(interval)
resolve(readyIndex)
}
},
50
)
}
)
}
export function initWorkers(threads) { export function initWorkers(threads) {
for (let i = 0; i < threads; ++i) { for (let i = 0; i < threads; ++i) {
@ -13,27 +32,42 @@ export function initWorkers(threads) {
type: 'initWorker', type: 'initWorker',
name: `Worker ${parseInt(i) + 1}` name: `Worker ${parseInt(i) + 1}`
}) })
workerReady[i] = true
} }
console.log(`Initialised ${threads} workers!`) console.log(`Initialised ${threads} workers!`)
} }
let spreadIndex = 0
export function spread(script, data) { export function spread(script, data) {
return new Promise( return new Promise(
(resolve, reject) => { async (resolve, reject) => {
const worker = workerFarm[spreadIndex] const readyIndex = await waitForThread()
workerReady[readyIndex] = false
const worker = workerFarm[readyIndex]
const jobID = randomUUID()
worker.postMessage({ worker.postMessage({
type: 'runScript', type: 'runScript',
script, script,
data data,
id: jobID
}) })
worker.on('message', result => resolve(result)) const messageHandler = message => {
if (message.id === jobID) {
++spreadIndex workerReady[readyIndex] = true
if(spreadIndex >= workerFarm.length) worker.removeListener(
spreadIndex = 0 'message',
messageHandler
)
resolve(message.result)
}
}
worker.on(
'message',
messageHandler
)
} }
) )
} }

View File

@ -23,7 +23,7 @@ parentPort.on(
import(runScript).then( import(runScript).then(
script => script.main(message.data).then( script => script.main(message.data).then(
result => { result => {
parentPort.postMessage(result) parentPort.postMessage({ result, id: message.id })
process.env.DEBUG && log(`Script done -> «${runScript}»`) process.env.DEBUG && log(`Script done -> «${runScript}»`)
} }
) )

View File

@ -2,23 +2,44 @@ import { initWorkers, spread } from '../libs/utils/multithread.js'
import Finish from '../schemas/Finish-copy.js' import Finish from '../schemas/Finish-copy.js'
import initLog from '../libs/utils/log.js' import initLog from '../libs/utils/log.js'
const log = initLog("sqlite2mongo") const log = initLog("Fib. Test")
initWorkers(6) initWorkers(6)
export default async function() { const jobs = 100
log("Checking for new finishes...") let completed = 0
for (let i = 0; i < jobs; ++i) {
spread(
'./fibonacci.js',
{}
).then(
result => {
++completed
log(`Completed job ${completed}/${jobs} -> ${result}`)
await Finish.deleteMany({}) if (completed === jobs) process.exit(0)
}
)
}
let offset = -1
while (offset < 10000000) {
await spread(
'./db.test.js',
{
offset // export default async function() {
} // log("Checking for new finishes...")
)
offset += 5000 // await Finish.deleteMany({})
}
} // let offset = -1
// while (offset < 10000000) {
// await spread(
// './db.test.js',
// {
// offset
// }
// )
// offset += 5000
// }
// }