From a8545935c3ea33e379813f9314edb4a0a7236743 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zieli=C5=84ski?= Date: Sat, 6 May 2023 18:54:19 +0200 Subject: [PATCH] ThreadPool implementation. It runs, but dies from race conditions. --- cmake_modules/VCMI_lib.cmake | 3 + lib/rmg/CMapGenerator.cpp | 68 +++++++++- lib/rmg/CMapGenerator.h | 12 +- lib/rmg/Zone.cpp | 24 ++++ lib/rmg/Zone.h | 8 +- lib/rmg/threadpool/BlockingQueue.h | 91 +++++++++++++ lib/rmg/threadpool/JobProvider.h | 30 +++++ lib/rmg/threadpool/JobProvoider.cpp | 0 lib/rmg/threadpool/ThreadPool.h | 190 ++++++++++++++++++++++++++++ 9 files changed, 413 insertions(+), 13 deletions(-) create mode 100644 lib/rmg/threadpool/BlockingQueue.h create mode 100644 lib/rmg/threadpool/JobProvider.h create mode 100644 lib/rmg/threadpool/JobProvoider.cpp create mode 100644 lib/rmg/threadpool/ThreadPool.h diff --git a/cmake_modules/VCMI_lib.cmake b/cmake_modules/VCMI_lib.cmake index 82fb986e2..71063fc8e 100644 --- a/cmake_modules/VCMI_lib.cmake +++ b/cmake_modules/VCMI_lib.cmake @@ -425,6 +425,9 @@ macro(add_main_lib TARGET_NAME LIBRARY_TYPE) ${MAIN_LIB_DIR}/rmg/RiverPlacer.h ${MAIN_LIB_DIR}/rmg/TerrainPainter.h ${MAIN_LIB_DIR}/rmg/float3.h + ${MAIN_LIB_DIR}/rmg/threadpool/BlockingQueue.h + ${MAIN_LIB_DIR}/rmg/threadpool/ThreadPool.h + ${MAIN_LIB_DIR}/rmg/threadpool/JobProvider.h ${MAIN_LIB_DIR}/serializer/BinaryDeserializer.h ${MAIN_LIB_DIR}/serializer/BinarySerializer.h diff --git a/lib/rmg/CMapGenerator.cpp b/lib/rmg/CMapGenerator.cpp index e5bebdf82..2d6ef0b27 100644 --- a/lib/rmg/CMapGenerator.cpp +++ b/lib/rmg/CMapGenerator.cpp @@ -23,6 +23,7 @@ #include "Zone.h" #include "Functions.h" #include "RmgMap.h" +#include "threadpool/ThreadPool.h" #include "ObjectManager.h" #include "TreasurePlacer.h" #include "RoadPlacer.h" @@ -294,9 +295,12 @@ void CMapGenerator::fillZones() logGlobal->info("Started filling zones"); + size_t numZones = map->getZones().size(); + //we need info about all town types to evaluate dwellings and pandoras with creatures properly //place main town in the middle - Load::Progress::setupStepsTill(map->getZones().size(), 50); + + Load::Progress::setupStepsTill(numZones, 50); for (const auto& it : map->getZones()) { it.second->initFreeTiles(); @@ -304,16 +308,40 @@ void CMapGenerator::fillZones() Progress::Progress::step(); } - Load::Progress::setupStepsTill(map->getZones().size(), 240); + //TODO: multiply by the number of modificators + Load::Progress::setupStepsTill(numZones, 240); std::vector> treasureZones; + + ThreadPool pool; + + std::vector> futures; + //At most one Modificator can run for every zone + pool.init(std::min(std::thread::hardware_concurrency(), numZones)); + + while (hasJobs()) + { + auto job = getNextJob(); + if (job) + { + futures.push_back(pool.async([this, job]() -> void + { + job.value()(); + Progress::Progress::step(); //Update progress bar + } + )); + } + } + + //Wait for all the tasks + for (auto& fut : futures) + { + fut.get(); + } + for (const auto& it : map->getZones()) { - it.second->processModificators(); - if (it.second->getType() == ETemplateZoneType::TREASURE) treasureZones.push_back(it.second); - - Progress::Progress::step(); } //find place for Grail @@ -381,7 +409,7 @@ const std::vector & CMapGenerator::getAllPossibleQuestArtifacts() co return questArtifacts; } -const std::vector& CMapGenerator::getAllPossibleHeroes() const +const std::vector CMapGenerator::getAllPossibleHeroes() const { //Skip heroes that were banned, including the ones placed in prisons std::vector ret; @@ -395,11 +423,13 @@ const std::vector& CMapGenerator::getAllPossibleHeroes() const void CMapGenerator::banQuestArt(const ArtifactID & id) { + //TODO: Protect with mutex map->map().allowedArtifact[id] = false; } void CMapGenerator::banHero(const HeroTypeID & id) { + //TODO: Protect with mutex map->map().allowedHeroes[id] = false; } @@ -411,4 +441,28 @@ Zone * CMapGenerator::getZoneWater() const return nullptr; } +bool CMapGenerator::hasJobs() +{ + for (auto zone : map->getZones()) + { + if (zone.second->hasJobs()) + { + return true; + } + } + return false; +} + +TRMGJob CMapGenerator::getNextJob() +{ + for (auto zone : map->getZones()) + { + if (zone.second->hasJobs()) + { + return zone.second->getNextJob(); + } + } + return TRMGJob(); +} + VCMI_LIB_NAMESPACE_END diff --git a/lib/rmg/CMapGenerator.h b/lib/rmg/CMapGenerator.h index c180552b7..0dbce09f0 100644 --- a/lib/rmg/CMapGenerator.h +++ b/lib/rmg/CMapGenerator.h @@ -15,6 +15,7 @@ #include "CMapGenOptions.h" #include "../int3.h" #include "CRmgTemplate.h" +#include "threadpool/JobProvider.h" #include "../LoadProgress.h" VCMI_LIB_NAMESPACE_BEGIN @@ -30,7 +31,7 @@ class CZonePlacer; using JsonVector = std::vector; /// The map generator creates a map randomly. -class DLL_LINKAGE CMapGenerator: public Load::Progress +class DLL_LINKAGE CMapGenerator: public Load::Progress, public IJobProvider { public: struct Config @@ -64,7 +65,7 @@ public: int getPrisonsRemaning() const; std::shared_ptr getZonePlacer() const; const std::vector & getAllPossibleQuestArtifacts() const; - const std::vector& getAllPossibleHeroes() const; + const std::vector getAllPossibleHeroes() const; void banQuestArt(const ArtifactID & id); void banHero(const HeroTypeID& id); @@ -82,11 +83,9 @@ private: std::vector connectionsLeft; - //std::pair zoneWater; - int allowedPrisons; int monolithIndex; - std::vector questArtifacts; //TODO: Protect with mutex + std::vector questArtifacts; /// Generation methods void loadConfig(); @@ -100,6 +99,9 @@ private: void genZones(); void fillZones(); + TRMGJob getNextJob() override; + bool hasJobs() override; + }; VCMI_LIB_NAMESPACE_END diff --git a/lib/rmg/Zone.cpp b/lib/rmg/Zone.cpp index 40885b9e1..259918698 100644 --- a/lib/rmg/Zone.cpp +++ b/lib/rmg/Zone.cpp @@ -179,6 +179,30 @@ rmg::Path Zone::searchPath(const int3 & src, bool onlyStraight, const std::funct return searchPath(rmg::Area({src}), onlyStraight, areafilter); } +TRMGJob Zone::getNextJob() +{ + for (auto& modificator : modificators) + { + if (modificator->hasJobs()) + { + return modificator->getNextJob(); + } + } + return TRMGJob(); +} + +bool Zone::hasJobs() +{ + for (auto& modificator : modificators) + { + if (modificator->hasJobs()) + { + return true; + } + } + return false; +} + void Zone::connectPath(const rmg::Path & path) ///connect current tile to any other free tile within zone { diff --git a/lib/rmg/Zone.h b/lib/rmg/Zone.h index bd85d08aa..9eaebfba5 100644 --- a/lib/rmg/Zone.h +++ b/lib/rmg/Zone.h @@ -13,6 +13,7 @@ #include "../GameConstants.h" #include "float3.h" #include "../int3.h" +#include "threadpool/JobProvider.h" #include "CRmgTemplate.h" #include "RmgArea.h" #include "RmgPath.h" @@ -30,7 +31,7 @@ class Modificator; extern std::function AREA_NO_FILTER; -class Zone : public rmg::ZoneOptions +class Zone : public rmg::ZoneOptions, public IJobProvider { public: Zone(RmgMap & map, CMapGenerator & generator); @@ -63,9 +64,14 @@ public: rmg::Path searchPath(const rmg::Area & src, bool onlyStraight, const std::function & areafilter = AREA_NO_FILTER) const; rmg::Path searchPath(const int3 & src, bool onlyStraight, const std::function & areafilter = AREA_NO_FILTER) const; + TRMGJob getNextJob() override; + bool hasJobs() override; + template T* getModificator() { + //TODO: Protect with recursive mutex? + for(auto & m : modificators) if(auto * mm = dynamic_cast(m.get())) return mm; diff --git a/lib/rmg/threadpool/BlockingQueue.h b/lib/rmg/threadpool/BlockingQueue.h new file mode 100644 index 000000000..3879bbe45 --- /dev/null +++ b/lib/rmg/threadpool/BlockingQueue.h @@ -0,0 +1,91 @@ +/* + * BlockingQueue.h, part of VCMI engine + * + * Authors: listed in file AUTHORS in main folder + * + * License: GNU General Public License v2.0 or later + * Full text of license available in license.txt file, in main folder + * + */ + +#pragma once + +#include "StdInc.h" + +VCMI_LIB_NAMESPACE_BEGIN + +//Credit to https://github.com/Liam0205/toy-threadpool/tree/master/yuuki + +template +class DLL_LINKAGE BlockingQueue : protected std::queue +{ + using WriteLock = boost::unique_lock; + using Readlock = boost::shared_lock; + +public: + BlockingQueue() = default; + ~BlockingQueue() + { + clear(); + } + BlockingQueue(const BlockingQueue&) = delete; + BlockingQueue(BlockingQueue&&) = delete; + BlockingQueue& operator=(const BlockingQueue&) = delete; + BlockingQueue& operator=(BlockingQueue&&) = delete; + +public: + bool empty() const + { + Readlock lock(mx); + return std::queue::empty(); + } + + size_t size() const + { + Readlock lock(mx); + return std::queue::size(); + } + +public: + void clear() + { + WriteLock lock(mx); + while (!std::queue::empty()) + { + std::queue::pop(); + } + } + + void push(const T& obj) + { + WriteLock lock(mx); + std::queue::push(obj); + } + + template + void emplace(Args&&... args) + { + WriteLock lock(mx); + std::queue::emplace(std::forward(args)...); + } + + bool pop(T& holder) + { + WriteLock lock(mx); + if (std::queue::empty()) + { + return false; + } + else + { + holder = std::move(std::queue::front()); + std::queue::pop(); + return true; + } + } + +private: + mutable boost::shared_mutex mx; +}; + +VCMI_LIB_NAMESPACE_END \ No newline at end of file diff --git a/lib/rmg/threadpool/JobProvider.h b/lib/rmg/threadpool/JobProvider.h new file mode 100644 index 000000000..261bc1a34 --- /dev/null +++ b/lib/rmg/threadpool/JobProvider.h @@ -0,0 +1,30 @@ +/* + * JobProvider.h, part of VCMI engine + * + * Authors: listed in file AUTHORS in main folder + * + * License: GNU General Public License v2.0 or later + * Full text of license available in license.txt file, in main folder + * + */ + +#pragma once + +#include "StdInc.h" +#include "../../GameConstants.h" + +VCMI_LIB_NAMESPACE_BEGIN + +typedef std::function TRMGfunction ; +typedef std::optional TRMGJob; + +class DLL_LINKAGE IJobProvider +{ +public: + //TODO: Think about some mutex protection + + virtual TRMGJob getNextJob() = 0; + virtual bool hasJobs() = 0; +}; + +VCMI_LIB_NAMESPACE_END \ No newline at end of file diff --git a/lib/rmg/threadpool/JobProvoider.cpp b/lib/rmg/threadpool/JobProvoider.cpp new file mode 100644 index 000000000..e69de29bb diff --git a/lib/rmg/threadpool/ThreadPool.h b/lib/rmg/threadpool/ThreadPool.h new file mode 100644 index 000000000..d3145d42e --- /dev/null +++ b/lib/rmg/threadpool/ThreadPool.h @@ -0,0 +1,190 @@ +/* + * ThreadPool.h, part of VCMI engine + * + * Authors: listed in file AUTHORS in main folder + * + * License: GNU General Public License v2.0 or later + * Full text of license available in license.txt file, in main folder + * + */ + +#pragma once + +#include "BlockingQueue.h" +#include "JobProvider.h" +#include +#include + +VCMI_LIB_NAMESPACE_BEGIN + +//Credit to https://github.com/Liam0205/toy-threadpool/tree/master/yuuki + +class DLL_LINKAGE ThreadPool +{ +private: + using Lock = boost::unique_lock; + mutable boost::shared_mutex mx; + mutable boost::condition_variable_any cv; + mutable boost::once_flag once; + + bool isInitialized = false; + bool stopping = false; + bool canceling = false; +public: + ThreadPool(); + ~ThreadPool(); + + void init(size_t numThreads); + void spawn(); + void terminate(); + void cancel(); + +public: + bool initialized() const; + bool running() const; + int size() const; +private: + bool isRunning() const; + +public: + auto async(std::function&& f) const -> boost::future; + +private: + std::vector workers; + mutable BlockingQueue tasks; +}; + +ThreadPool::ThreadPool() : + once(BOOST_ONCE_INIT) +{}; + +ThreadPool::~ThreadPool() +{ + terminate(); +} + +inline void ThreadPool::init(size_t numThreads) +{ + boost::call_once(once, [this, numThreads]() + { + Lock lock(mx); + stopping = false; + canceling = false; + workers.reserve(numThreads); + for (size_t i = 0; i < numThreads; ++i) + { + workers.emplace_back(std::bind(&ThreadPool::spawn, this)); + } + isInitialized = true; + }); +} + +bool ThreadPool::isRunning() const +{ + return isInitialized && !stopping && !canceling; +} + +inline bool ThreadPool::initialized() const +{ + Lock lock(mx); + return isInitialized; +} + +inline bool ThreadPool::running() const +{ + Lock lock(mx); + return isRunning(); +} + +inline int ThreadPool::size() const +{ + Lock lock(mx); + return workers.size(); +} + +inline void ThreadPool::spawn() +{ + while(true) + { + bool pop = false; + TRMGfunction task; + { + Lock lock(mx); + cv.wait(lock, [this, &pop, &task] + { + pop = tasks.pop(task); + return canceling || stopping || pop; + }); + } + if (canceling || (stopping && !pop)) + { + return; + } + task(); + } +} + +inline void ThreadPool::terminate() +{ + { + Lock lock(mx); + if (running()) + { + stopping = true; + } + else + { + return; + } + } + cv.notify_all(); + for (auto& worker : workers) + { + worker.join(); + } +} + +inline void ThreadPool::cancel() +{ + { + Lock lock(mx); + if (running()) + { + canceling = true; + } + else + { + return; + } + } + tasks.clear(); + cv.notify_all(); + for (auto& worker : workers) + { + worker.join(); + } +} + +auto ThreadPool::async(std::function&& f) const -> boost::future +{ + using TaskT = boost::packaged_task; + + { + Lock lock(mx); + if (stopping || canceling) + { + throw std::runtime_error("Delegating task to a threadpool that has been terminated or canceled."); + } + } + + std::shared_ptr task = std::make_shared(f); + boost::future fut = task->get_future(); + tasks.emplace([task]() -> void + { + (*task)(); + }); + cv.notify_one(); + return fut; +} + +VCMI_LIB_NAMESPACE_END \ No newline at end of file