Now has a job queue.

This commit is contained in:
Fabian Seluska 2021-10-20 10:03:48 +02:00
parent 35a14b4362
commit e760e6bd38
2 changed files with 60 additions and 52 deletions

View File

@ -6,64 +6,34 @@ const log = initLog("Worker Setup")
let workerFarm = [] let workerFarm = []
let workerReady = [] let workerReady = []
let scheduledJobs = []
function scheduler() {
if (scheduledJobs.length === 0) return
function waitForThread() {
return new Promise(
(resolve, reject) => {
let interval = setInterval(
() => {
const readyIndex = workerReady.indexOf(true) const readyIndex = workerReady.indexOf(true)
if (readyIndex !== -1) { if (readyIndex === -1) return
clearInterval(interval)
resolve(readyIndex)
}
},
50
)
}
)
}
export function initWorkers(threads) {
for (let i = 0; i < threads; ++i) {
workerFarm.push(new Worker('./libs/utils/multithread/genericWorker.js', { env: SHARE_ENV }))
}
for (const i in workerFarm) {
const worker = workerFarm[i]
worker.postMessage({
type: 'initWorker',
name: `Worker ${parseInt(i) + 1}`
})
workerReady[i] = true
}
log(`Initialised ${threads} workers!`)
}
export function spread(script, data) {
return new Promise(
async (resolve, reject) => {
const readyIndex = await waitForThread()
workerReady[readyIndex] = false workerReady[readyIndex] = false
const worker = workerFarm[readyIndex] const worker = workerFarm[readyIndex]
const jobID = randomUUID()
const job = scheduledJobs.shift()
worker.postMessage({ worker.postMessage({
type: 'runScript', type: 'runScript',
script, script: job.script,
data, data: job.data,
id: jobID id: job.jobID
}) })
const messageHandler = message => { const messageHandler = message => {
if (message.id === jobID) { if (message.id === job.jobID) {
workerReady[readyIndex] = true workerReady[readyIndex] = true
worker.removeListener( worker.removeListener(
'message', 'message',
messageHandler messageHandler
) )
resolve(message.result) return job.callback(message.result)
} }
} }
@ -71,6 +41,44 @@ export function spread(script, data) {
'message', 'message',
messageHandler messageHandler
) )
}
export function initWorkers(threads) {
for (let i = 0; i < threads; ++i) {
workerFarm.push(new Worker('./libs/utils/multithread/genericWorker.js', { env: SHARE_ENV }))
const worker = workerFarm[i]
worker.postMessage({
type: 'initWorker',
name: `Worker ${parseInt(i) + 1}`
})
workerReady[i] = true
}
log(`Initialised ${threads} workers!`)
setInterval(
scheduler,
50
)
log(`Initialised the scheduler!`)
}
export function spread(script, data) {
return new Promise(
async (resolve, reject) => {
const jobID = randomUUID()
scheduledJobs.push({
script,
data,
jobID,
callback: result => {
resolve(result)
}
})
} }
) )
} }

View File

@ -5,23 +5,23 @@ import initLog from '../libs/utils/log.js'
const log = initLog("Fib. Test") const log = initLog("Fib. Test")
initWorkers(10) initWorkers(3)
// const jobs = 100 const jobs = 10
// let completed = 0 let completed = 0
// for (let i = 0; i < jobs; ++i) { for (let i = 0; i < jobs; ++i) {
// spread( spread(
// './fibonacci.js', './fibonacci.js',
// {} {}
// ).then( ).then(
// result => { result => {
// ++completed ++completed
// log(`Completed job ${completed}/${jobs} -> ${result}`) log(`Completed job ${completed}/${jobs} -> ${result}`)
// if (completed === jobs) process.exit(0) if (completed === jobs) process.exit(0)
// } }
// ) )
// } }