Multithreaded postTasks.js

This commit is contained in:
BurnyLlama 2021-10-16 19:11:16 +02:00
parent b5cc339543
commit 505e9a5399
12 changed files with 203 additions and 106 deletions

View File

@ -10,7 +10,7 @@ import sqlite2mongo from './libs/database/sqlite2mongo.js'
import databaseInit from './libs/database/init.js' import databaseInit from './libs/database/init.js'
import initLog from './libs/utils/log.js' import initLog from './libs/utils/log.js'
import api from './api/api.js' import api from './api/api.js'
import multihreadTest from './tests/multihread.test.js' import { initWorkers } from './libs/utils/multithread.js'
const log = initLog("[ MAIN ]") const log = initLog("[ MAIN ]")
@ -23,9 +23,9 @@ Server.use('/api', api)
await databaseInit() await databaseInit()
initWorkers(process.env.THREADS ?? 4)
if (process.env.LOAD_DB === "true") if (process.env.LOAD_DB === "true")
await sqlite2mongo() await sqlite2mongo()
multihreadTest() Server.listen(process.env.PORT ?? 12345, () => log(`Server started and listening on port ${process.env.PORT}.`))
Server.listen(process.env.PORT, () => log(`Server started and listening on port ${process.env.PORT}.`))

View File

@ -2,6 +2,7 @@ import Level from '../../../schemas/Level.js'
import Player from '../../../schemas/Player.js' import Player from '../../../schemas/Player.js'
import initLog from '../../utils/log.js' import initLog from '../../utils/log.js'
import decodeMsgpack from './decodeMsgpack.js' import decodeMsgpack from './decodeMsgpack.js'
import { spread } from '../../utils/multithread.js'
const log = initLog("sqlite2mongo") const log = initLog("sqlite2mongo")
@ -16,24 +17,35 @@ export default async function postTasks() {
const cache = decodeMsgpack() const cache = decodeMsgpack()
log("Adding total amounts of finishes to maps...") log("Adding total amounts of finishes to maps...")
const totalMaps = await Level.find({}).count()
let processedMaps = 0
for (const category in cache.maps) { for (const category in cache.maps) {
for (const i in cache.maps[category]) { for (const map of cache.maps[category]) {
const map = cache.maps[category][i]
Level.findOneAndUpdate( Level.findOneAndUpdate(
{ name: map[0] }, { name: map[0] },
{ totalFinishes: map[2] } { totalFinishes: map[2] }
).then( ).then(
() => log(`Processed ${category} map ${i + 1}/${cache.maps[category].length} -> «${map[0]}» with ${map[2]} finishes!`) () => {
++processedMaps
log(`Processed map ${processedMaps}/${totalMaps} -> «${map[0]}» [${category}] with ${map[2]} finishes!`)
}
) )
} }
} }
log("Reseting weekly and monthly points...")
await Player.updateMany({}, { pointsThisWeek: 0, pointsThisMonth: 0 })
log("Done!")
log("Processing points for players...") log("Processing points for players...")
let processedPlayerPoints = 0 let processedPlayerPoints = 0
for (const entry of cache.pointsRanks) { for (const entry of cache.pointsRanks) {
Player.findOneAndUpdate( spread(
{ name: entry[0] }, './playerPoints.js',
{ points: entry[1] } {
name: entry[0],
points: entry[1]
}
).then( ).then(
() => { () => {
++processedPlayerPoints ++processedPlayerPoints
@ -45,9 +57,12 @@ export default async function postTasks() {
log("Processing rank points for players...") log("Processing rank points for players...")
let processedPlayerRankPoints = 0 let processedPlayerRankPoints = 0
for (const entry of cache.rankPoints) { for (const entry of cache.rankPoints) {
Player.findOneAndUpdate( spread(
{ name: entry[0] }, './playerRankPoints.js',
{ rankPoints: entry[1] } {
name: entry[0],
rankPoints: [1]
}
).then( ).then(
() => { () => {
++processedPlayerRankPoints ++processedPlayerRankPoints
@ -59,9 +74,12 @@ export default async function postTasks() {
log("Processing team points for players...") log("Processing team points for players...")
let processedTeamPoints = 0 let processedTeamPoints = 0
for (const entry in cache.teamRankPoints) { for (const entry in cache.teamRankPoints) {
Player.findOneAndUpdate( spread(
{ name: entry[0] }, './playerTeamPoints.js',
{ teamPoints: entry[1] } {
name: entry[0],
teamPoints: entry[1]
}
).then( ).then(
() => { () => {
++processedTeamPoints ++processedTeamPoints
@ -73,9 +91,12 @@ export default async function postTasks() {
log("Processing players' points for the last week...") log("Processing players' points for the last week...")
let processedPlayerPointsWeek = 0 let processedPlayerPointsWeek = 0
for (const entry of cache.pointsThisWeek) { for (const entry of cache.pointsThisWeek) {
Player.findOneAndUpdate( spread(
{ name: entry[0] }, './playerWeekPoints.js',
{ pointsThisWeek: entry[1] } {
name: entry[0],
pointsThisWeek: entry[1]
}
).then( ).then(
() => { () => {
++processedPlayerPointsWeek ++processedPlayerPointsWeek
@ -87,9 +108,12 @@ export default async function postTasks() {
log("Processing players' points for the last month...") log("Processing players' points for the last month...")
let processedPlayerPointsMonth = 0 let processedPlayerPointsMonth = 0
for (const entry of cache.pointsThisMonth) { for (const entry of cache.pointsThisMonth) {
Player.findOneAndUpdate( spread(
{ name: entry[0] }, './playerMonthPoints.js',
{ pointsThisWeek: entry[1] } {
name: entry[0],
pointsThisMonth: entry[1]
}
).then( ).then(
() => { () => {
++processedPlayerPointsMonth ++processedPlayerPointsMonth

View File

@ -1,5 +1,8 @@
import { randomUUID } from 'crypto' import { randomUUID } from 'crypto'
import { Worker } from 'worker_threads' import { Worker } from 'worker_threads'
import initLog from './log.js'
const log = initLog("Worker Setup")
let workerFarm = [] let workerFarm = []
let workerReady = [] let workerReady = []
@ -35,7 +38,7 @@ export function initWorkers(threads) {
workerReady[i] = true workerReady[i] = true
} }
console.log(`Initialised ${threads} workers!`) log(`Initialised ${threads} workers!`)
} }
export function spread(script, data) { export function spread(script, data) {

View File

@ -1,31 +0,0 @@
import Finish from '../../../schemas/Finish-copy.js'
import initLog from '../log.js'
import init from '../../database/init.js'
import { sqlite } from '../../database/init.js'
import dotenv from 'dotenv'
dotenv.config()
init()
const log = initLog('DB TEST')
export function main(data) {
return new Promise(
async (resolve, reject) => {
const rows = await sqlite.all(`SELECT * FROM race ORDER BY Timestamp LIMIT 5000 OFFSET ${data.offset + 1}`)
let finishes = []
rows.map(
finish => finishes.push({
map: finish.Map,
time: finish.Time,
date: finish.Timestamp === '0000-00-00 00:00:00' ? new Date('January 1, 1970 00:00:00 UTC') : new Date(`${finish.Timestamp}+00:00`),
serverLocation: finish.Server ?? '',
player: finish.Name
})
)
await Finish.insertMany(finishes)
log('Inserted!')
resolve()
}
)
}

View File

@ -0,0 +1,19 @@
import Player from '../../../schemas/Player.js'
import init from '../../database/init.js'
import { config } from 'dotenv'
config()
init()
export function main(data) {
return new Promise(
(resolve, reject) => {
Player.findOneAndUpdate(
{ name: data.name },
{ rankPoints: data.pointsThisMonth }
).then(
() => resolve()
)
}
)
}

View File

@ -0,0 +1,19 @@
import Player from '../../../schemas/Player.js'
import init from '../../database/init.js'
import { config } from 'dotenv'
config()
init()
export function main(data) {
return new Promise(
(resolve, reject) => {
Player.findOneAndUpdate(
{ name: data.name },
{ points: data.points }
).then(
() => resolve()
)
}
)
}

View File

@ -0,0 +1,19 @@
import Player from '../../../schemas/Player.js'
import init from '../../database/init.js'
import { config } from 'dotenv'
config()
init()
export function main(data) {
return new Promise(
(resolve, reject) => {
Player.findOneAndUpdate(
{ name: data.name },
{ rankPoints: data.rankPoints }
).then(
() => resolve()
)
}
)
}

View File

@ -0,0 +1,19 @@
import Player from '../../../schemas/Player.js'
import init from '../../database/init.js'
import { config } from 'dotenv'
config()
init()
export function main(data) {
return new Promise(
(resolve, reject) => {
Player.findOneAndUpdate(
{ name: data.name },
{ teamPoints: data.teamPoints }
).then(
() => resolve()
)
}
)
}

View File

@ -0,0 +1,19 @@
import Player from '../../../schemas/Player.js'
import init from '../../database/init.js'
import { config } from 'dotenv'
config()
init()
export function main(data) {
return new Promise(
(resolve, reject) => {
Player.findOneAndUpdate(
{ name: data.name },
{ pointsThisWeek: data.pointsThisWeek }
).then(
() => resolve()
)
}
)
}

View File

@ -20,8 +20,9 @@
"@msgpack/msgpack": "^2.7.1", "@msgpack/msgpack": "^2.7.1",
"dotenv": "^10.0.0", "dotenv": "^10.0.0",
"express": "^4.17.1", "express": "^4.17.1",
"mongoose": "^6.0.7", "mongoose": "^6.0.11",
"sqlite": "^4.0.23", "sqlite": "^4.0.23",
"sqlite3": "^4.2.0" "sqlite3": "^4.2.0"
} },
"nodeVersion": "system"
} }

View File

@ -1,17 +0,0 @@
import mongoose from 'mongoose'
const Finish = new mongoose.Schema({
map: String,
time: Number,
date: Date,
serverLocation: String,
player: String
})
Finish.index({ player: 1, map: 1 })
/**
* This cotains the mongoose 'Finish' model.
* @module schemas/Finish
*/
export default mongoose.model("Finish-Copy", Finish)

View File

@ -1,45 +1,67 @@
import { initWorkers, spread } from '../libs/utils/multithread.js' import { initWorkers, spread } from '../libs/utils/multithread.js'
import Finish from '../schemas/Finish-copy.js' import Player from '../schemas/Player.js'
import decodeMsgpack from '../libs/database/sqlite2mongo/decodeMsgpack.js'
import initLog from '../libs/utils/log.js' import initLog from '../libs/utils/log.js'
const log = initLog("Fib. Test") const log = initLog("Fib. Test")
initWorkers(6) initWorkers(10)
// const jobs = 100
// let completed = 0
// for (let i = 0; i < jobs; ++i) {
// spread(
// './fibonacci.js',
// {}
// ).then(
// result => {
// ++completed
// log(`Completed job ${completed}/${jobs} -> ${result}`)
// if (completed === jobs) process.exit(0)
// }
// )
// }
export default async function() {
const cache = decodeMsgpack()
await Player.updateMany(
{},
{
pointsThisWeek: 0,
pointsThisMonth: 0
}
)
let processedPlayers = 0
const totalPlayers = cache.pointsRanks.length
const rawDbFunc = (name, points) => new Promise((resolve, reject) => Player.findOneAndUpdate({ name: name }, { points: points }).then(() => resolve()))
for (const entry of cache.pointsRanks) {
let data = new SharedArrayBuffer(1024 * 10)
// let data = new Array(buffer)
data[0] = 0
data[1] = entry[0]
data[2] = entry[1]
const jobs = 100
let completed = 0
for (let i = 0; i < jobs; ++i) {
spread( spread(
'./fibonacci.js', './db.test.js',
{} data
).then( ).then(
result => { result => {
++completed console.log(data[0])
log(`Completed job ${completed}/${jobs} -> ${result}`) ++processedPlayers
log(`Process player ${processedPlayers}/${totalPlayers} -> «${entry[0]}» with ${entry[1]} points!`)
if (completed === jobs) process.exit(0) if (processedPlayers === totalPlayers)
process.exit(0)
} }
) )
} }
}
// export default async function() {
// log("Checking for new finishes...")
// await Finish.deleteMany({})
// let offset = -1
// while (offset < 10000000) {
// await spread(
// './db.test.js',
// {
// offset
// }
// )
// offset += 5000
// }
// }