From 505e9a5399d2fc37b667b6496932ffea324d90b4 Mon Sep 17 00:00:00 2001 From: BurnyLlama Date: Sat, 16 Oct 2021 19:11:16 +0200 Subject: [PATCH] Multithreaded postTasks.js --- index.js | 8 +- libs/database/sqlite2mongo/postTasks.js | 60 +++++++++----- libs/utils/multithread.js | 5 +- libs/utils/multithread/db.test.js | 31 -------- libs/utils/multithread/playerMonthPoints.js | 19 +++++ libs/utils/multithread/playerPoints.js | 19 +++++ libs/utils/multithread/playerRankPoints.js | 19 +++++ libs/utils/multithread/playerTeamPoints.js | 19 +++++ libs/utils/multithread/playerWeekPoints.js | 19 +++++ package.json | 5 +- schemas/Finish-copy.js | 17 ---- tests/multihread.test.js | 88 +++++++++++++-------- 12 files changed, 203 insertions(+), 106 deletions(-) delete mode 100644 libs/utils/multithread/db.test.js create mode 100644 libs/utils/multithread/playerMonthPoints.js create mode 100644 libs/utils/multithread/playerPoints.js create mode 100644 libs/utils/multithread/playerRankPoints.js create mode 100644 libs/utils/multithread/playerTeamPoints.js create mode 100644 libs/utils/multithread/playerWeekPoints.js delete mode 100644 schemas/Finish-copy.js diff --git a/index.js b/index.js index c9002b2..8565df9 100644 --- a/index.js +++ b/index.js @@ -10,7 +10,7 @@ import sqlite2mongo from './libs/database/sqlite2mongo.js' import databaseInit from './libs/database/init.js' import initLog from './libs/utils/log.js' import api from './api/api.js' -import multihreadTest from './tests/multihread.test.js' +import { initWorkers } from './libs/utils/multithread.js' const log = initLog("[ MAIN ]") @@ -23,9 +23,9 @@ Server.use('/api', api) await databaseInit() +initWorkers(process.env.THREADS ?? 4) + if (process.env.LOAD_DB === "true") await sqlite2mongo() -multihreadTest() - -Server.listen(process.env.PORT, () => log(`Server started and listening on port ${process.env.PORT}.`)) \ No newline at end of file +Server.listen(process.env.PORT ?? 12345, () => log(`Server started and listening on port ${process.env.PORT}.`)) \ No newline at end of file diff --git a/libs/database/sqlite2mongo/postTasks.js b/libs/database/sqlite2mongo/postTasks.js index b97faef..0162fdf 100644 --- a/libs/database/sqlite2mongo/postTasks.js +++ b/libs/database/sqlite2mongo/postTasks.js @@ -2,6 +2,7 @@ import Level from '../../../schemas/Level.js' import Player from '../../../schemas/Player.js' import initLog from '../../utils/log.js' import decodeMsgpack from './decodeMsgpack.js' +import { spread } from '../../utils/multithread.js' const log = initLog("sqlite2mongo") @@ -16,24 +17,35 @@ export default async function postTasks() { const cache = decodeMsgpack() log("Adding total amounts of finishes to maps...") + const totalMaps = await Level.find({}).count() + let processedMaps = 0 for (const category in cache.maps) { - for (const i in cache.maps[category]) { - const map = cache.maps[category][i] + for (const map of cache.maps[category]) { Level.findOneAndUpdate( { name: map[0] }, { totalFinishes: map[2] } ).then( - () => log(`Processed ${category} map ${i + 1}/${cache.maps[category].length} -> «${map[0]}» with ${map[2]} finishes!`) + () => { + ++processedMaps + log(`Processed map ${processedMaps}/${totalMaps} -> «${map[0]}» [${category}] with ${map[2]} finishes!`) + } ) } } + log("Reseting weekly and monthly points...") + await Player.updateMany({}, { pointsThisWeek: 0, pointsThisMonth: 0 }) + log("Done!") + log("Processing points for players...") let processedPlayerPoints = 0 for (const entry of cache.pointsRanks) { - Player.findOneAndUpdate( - { name: entry[0] }, - { points: entry[1] } + spread( + './playerPoints.js', + { + name: entry[0], + points: entry[1] + } ).then( () => { ++processedPlayerPoints @@ -45,9 +57,12 @@ export default async function postTasks() { log("Processing rank points for players...") let processedPlayerRankPoints = 0 for (const entry of cache.rankPoints) { - Player.findOneAndUpdate( - { name: entry[0] }, - { rankPoints: entry[1] } + spread( + './playerRankPoints.js', + { + name: entry[0], + rankPoints: [1] + } ).then( () => { ++processedPlayerRankPoints @@ -59,9 +74,12 @@ export default async function postTasks() { log("Processing team points for players...") let processedTeamPoints = 0 for (const entry in cache.teamRankPoints) { - Player.findOneAndUpdate( - { name: entry[0] }, - { teamPoints: entry[1] } + spread( + './playerTeamPoints.js', + { + name: entry[0], + teamPoints: entry[1] + } ).then( () => { ++processedTeamPoints @@ -73,9 +91,12 @@ export default async function postTasks() { log("Processing players' points for the last week...") let processedPlayerPointsWeek = 0 for (const entry of cache.pointsThisWeek) { - Player.findOneAndUpdate( - { name: entry[0] }, - { pointsThisWeek: entry[1] } + spread( + './playerWeekPoints.js', + { + name: entry[0], + pointsThisWeek: entry[1] + } ).then( () => { ++processedPlayerPointsWeek @@ -87,9 +108,12 @@ export default async function postTasks() { log("Processing players' points for the last month...") let processedPlayerPointsMonth = 0 for (const entry of cache.pointsThisMonth) { - Player.findOneAndUpdate( - { name: entry[0] }, - { pointsThisWeek: entry[1] } + spread( + './playerMonthPoints.js', + { + name: entry[0], + pointsThisMonth: entry[1] + } ).then( () => { ++processedPlayerPointsMonth diff --git a/libs/utils/multithread.js b/libs/utils/multithread.js index 73f9e2c..1d1de6c 100644 --- a/libs/utils/multithread.js +++ b/libs/utils/multithread.js @@ -1,5 +1,8 @@ import { randomUUID } from 'crypto' import { Worker } from 'worker_threads' +import initLog from './log.js' + +const log = initLog("Worker Setup") let workerFarm = [] let workerReady = [] @@ -35,7 +38,7 @@ export function initWorkers(threads) { workerReady[i] = true } - console.log(`Initialised ${threads} workers!`) + log(`Initialised ${threads} workers!`) } export function spread(script, data) { diff --git a/libs/utils/multithread/db.test.js b/libs/utils/multithread/db.test.js deleted file mode 100644 index ed05477..0000000 --- a/libs/utils/multithread/db.test.js +++ /dev/null @@ -1,31 +0,0 @@ -import Finish from '../../../schemas/Finish-copy.js' -import initLog from '../log.js' -import init from '../../database/init.js' -import { sqlite } from '../../database/init.js' -import dotenv from 'dotenv' - -dotenv.config() -init() - -const log = initLog('DB TEST') - -export function main(data) { - return new Promise( - async (resolve, reject) => { - const rows = await sqlite.all(`SELECT * FROM race ORDER BY Timestamp LIMIT 5000 OFFSET ${data.offset + 1}`) - let finishes = [] - rows.map( - finish => finishes.push({ - map: finish.Map, - time: finish.Time, - date: finish.Timestamp === '0000-00-00 00:00:00' ? new Date('January 1, 1970 00:00:00 UTC') : new Date(`${finish.Timestamp}+00:00`), - serverLocation: finish.Server ?? '', - player: finish.Name - }) - ) - await Finish.insertMany(finishes) - log('Inserted!') - resolve() - } - ) -} \ No newline at end of file diff --git a/libs/utils/multithread/playerMonthPoints.js b/libs/utils/multithread/playerMonthPoints.js new file mode 100644 index 0000000..a6b3aea --- /dev/null +++ b/libs/utils/multithread/playerMonthPoints.js @@ -0,0 +1,19 @@ +import Player from '../../../schemas/Player.js' +import init from '../../database/init.js' +import { config } from 'dotenv' + +config() +init() + +export function main(data) { + return new Promise( + (resolve, reject) => { + Player.findOneAndUpdate( + { name: data.name }, + { rankPoints: data.pointsThisMonth } + ).then( + () => resolve() + ) + } + ) +} \ No newline at end of file diff --git a/libs/utils/multithread/playerPoints.js b/libs/utils/multithread/playerPoints.js new file mode 100644 index 0000000..b4fc138 --- /dev/null +++ b/libs/utils/multithread/playerPoints.js @@ -0,0 +1,19 @@ +import Player from '../../../schemas/Player.js' +import init from '../../database/init.js' +import { config } from 'dotenv' + +config() +init() + +export function main(data) { + return new Promise( + (resolve, reject) => { + Player.findOneAndUpdate( + { name: data.name }, + { points: data.points } + ).then( + () => resolve() + ) + } + ) +} \ No newline at end of file diff --git a/libs/utils/multithread/playerRankPoints.js b/libs/utils/multithread/playerRankPoints.js new file mode 100644 index 0000000..55ba4a3 --- /dev/null +++ b/libs/utils/multithread/playerRankPoints.js @@ -0,0 +1,19 @@ +import Player from '../../../schemas/Player.js' +import init from '../../database/init.js' +import { config } from 'dotenv' + +config() +init() + +export function main(data) { + return new Promise( + (resolve, reject) => { + Player.findOneAndUpdate( + { name: data.name }, + { rankPoints: data.rankPoints } + ).then( + () => resolve() + ) + } + ) +} \ No newline at end of file diff --git a/libs/utils/multithread/playerTeamPoints.js b/libs/utils/multithread/playerTeamPoints.js new file mode 100644 index 0000000..7d52c4e --- /dev/null +++ b/libs/utils/multithread/playerTeamPoints.js @@ -0,0 +1,19 @@ +import Player from '../../../schemas/Player.js' +import init from '../../database/init.js' +import { config } from 'dotenv' + +config() +init() + +export function main(data) { + return new Promise( + (resolve, reject) => { + Player.findOneAndUpdate( + { name: data.name }, + { teamPoints: data.teamPoints } + ).then( + () => resolve() + ) + } + ) +} \ No newline at end of file diff --git a/libs/utils/multithread/playerWeekPoints.js b/libs/utils/multithread/playerWeekPoints.js new file mode 100644 index 0000000..f1a38be --- /dev/null +++ b/libs/utils/multithread/playerWeekPoints.js @@ -0,0 +1,19 @@ +import Player from '../../../schemas/Player.js' +import init from '../../database/init.js' +import { config } from 'dotenv' + +config() +init() + +export function main(data) { + return new Promise( + (resolve, reject) => { + Player.findOneAndUpdate( + { name: data.name }, + { pointsThisWeek: data.pointsThisWeek } + ).then( + () => resolve() + ) + } + ) +} \ No newline at end of file diff --git a/package.json b/package.json index adf9577..3e22737 100644 --- a/package.json +++ b/package.json @@ -20,8 +20,9 @@ "@msgpack/msgpack": "^2.7.1", "dotenv": "^10.0.0", "express": "^4.17.1", - "mongoose": "^6.0.7", + "mongoose": "^6.0.11", "sqlite": "^4.0.23", "sqlite3": "^4.2.0" - } + }, + "nodeVersion": "system" } diff --git a/schemas/Finish-copy.js b/schemas/Finish-copy.js deleted file mode 100644 index 0a3bd54..0000000 --- a/schemas/Finish-copy.js +++ /dev/null @@ -1,17 +0,0 @@ -import mongoose from 'mongoose' - -const Finish = new mongoose.Schema({ - map: String, - time: Number, - date: Date, - serverLocation: String, - player: String -}) - -Finish.index({ player: 1, map: 1 }) - -/** - * This cotains the mongoose 'Finish' model. - * @module schemas/Finish - */ -export default mongoose.model("Finish-Copy", Finish) \ No newline at end of file diff --git a/tests/multihread.test.js b/tests/multihread.test.js index 1c8ac8a..0511326 100644 --- a/tests/multihread.test.js +++ b/tests/multihread.test.js @@ -1,45 +1,67 @@ import { initWorkers, spread } from '../libs/utils/multithread.js' -import Finish from '../schemas/Finish-copy.js' +import Player from '../schemas/Player.js' +import decodeMsgpack from '../libs/database/sqlite2mongo/decodeMsgpack.js' import initLog from '../libs/utils/log.js' const log = initLog("Fib. Test") -initWorkers(6) +initWorkers(10) -const jobs = 100 -let completed = 0 -for (let i = 0; i < jobs; ++i) { - spread( - './fibonacci.js', - {} - ).then( - result => { - ++completed - log(`Completed job ${completed}/${jobs} -> ${result}`) +// const jobs = 100 +// let completed = 0 +// for (let i = 0; i < jobs; ++i) { +// spread( +// './fibonacci.js', +// {} +// ).then( +// result => { +// ++completed +// log(`Completed job ${completed}/${jobs} -> ${result}`) - if (completed === jobs) process.exit(0) +// if (completed === jobs) process.exit(0) +// } +// ) +// } + + + + + + +export default async function() { + const cache = decodeMsgpack() + + await Player.updateMany( + {}, + { + pointsThisWeek: 0, + pointsThisMonth: 0 } ) -} + let processedPlayers = 0 + const totalPlayers = cache.pointsRanks.length + const rawDbFunc = (name, points) => new Promise((resolve, reject) => Player.findOneAndUpdate({ name: name }, { points: points }).then(() => resolve())) + for (const entry of cache.pointsRanks) { + let data = new SharedArrayBuffer(1024 * 10) + // let data = new Array(buffer) + data[0] = 0 + data[1] = entry[0] + data[2] = entry[1] - - - -// export default async function() { -// log("Checking for new finishes...") - -// await Finish.deleteMany({}) - -// let offset = -1 -// while (offset < 10000000) { -// await spread( -// './db.test.js', -// { -// offset -// } -// ) -// offset += 5000 -// } -// } \ No newline at end of file + spread( + './db.test.js', + data + ).then( + result => { + console.log(data[0]) + ++processedPlayers + log(`Process player ${processedPlayers}/${totalPlayers} -> «${entry[0]}» with ${entry[1]} points!`) + + if (processedPlayers === totalPlayers) + process.exit(0) + } + ) + } +} \ No newline at end of file