From 5f47219c4986f46948bb97d2808275f22adbbadb Mon Sep 17 00:00:00 2001 From: psy_commando Date: Wed, 24 Aug 2016 20:56:47 -0400 Subject: [PATCH 1/2] Statsutil: Attempt to fix issue #33 * I wrote a new task handler using exclusively std::async so that should probably make things a lot simpler, and hopefully fix the issue. --- src/ppmdu/pmd2/pmd2_scripts_xml_io.cpp | 25 ++- src/utils/parallel_tasks.cpp | 203 +++++++++++++++++ src/utils/parallel_tasks.hpp | 256 ++++++++++++++++++++++ vcprojects/ppmd_statsutil.vcxproj | 2 + vcprojects/ppmd_statsutil.vcxproj.filters | 6 + 5 files changed, 481 insertions(+), 11 deletions(-) create mode 100644 src/utils/parallel_tasks.cpp create mode 100644 src/utils/parallel_tasks.hpp diff --git a/src/ppmdu/pmd2/pmd2_scripts_xml_io.cpp b/src/ppmdu/pmd2/pmd2_scripts_xml_io.cpp index ebc2cd99..d39cf2e8 100644 --- a/src/ppmdu/pmd2/pmd2_scripts_xml_io.cpp +++ b/src/ppmdu/pmd2/pmd2_scripts_xml_io.cpp @@ -9,7 +9,8 @@ #include #include #include -#include +//#include +#include #include #include #include @@ -2796,7 +2797,8 @@ namespace pmd2 out_dest.WriteScriptSet(out_dest.m_common); //Prepare import of everything else! - multitask::CMultiTaskHandler taskhandler; + //multitask::CMultiTaskHandler taskhandler; + utils::AsyncTaskHandler taskhandler; Poco::DirectoryIterator dirit(dir); Poco::DirectoryIterator dirend; while( dirit != dirend ) @@ -2806,7 +2808,7 @@ namespace pmd2 Poco::Path destination(out_dest.GetScriptDir()); destination.append(dirit.path().getBaseName()); - taskhandler.AddTask( multitask::pktask_t( std::bind( RunLevelXMLImport, + taskhandler.QueueTask( utils::AsyncTaskHandler::task_t( std::bind( RunLevelXMLImport, std::ref(out_dest), dirit->path(), destination.toString(), @@ -2833,9 +2835,9 @@ namespace pmd2 // out_dest.m_setsindex.size(), // std::ref(shouldUpdtProgress) ); } - taskhandler.Execute(); - taskhandler.BlockUntilTaskQueueEmpty(); - taskhandler.StopExecute(); + taskhandler.Start(); + taskhandler.WaitTasksFinished(); + taskhandler.WaitStop(); shouldUpdtProgress = false; if(updatethread.valid()) @@ -2873,11 +2875,12 @@ namespace pmd2 atomic_bool shouldUpdtProgress = true; future updtProgress; atomic completed = 0; - multitask::CMultiTaskHandler taskhandler; + //multitask::CMultiTaskHandler taskhandler; + utils::AsyncTaskHandler taskhandler; //Export everything else for( const auto & entry : gs.m_setsindex ) { - taskhandler.AddTask( multitask::pktask_t( std::bind( RunLevelXMLExport, + taskhandler.QueueTask( utils::AsyncTaskHandler::task_t( std::bind( RunLevelXMLExport, std::cref(entry.second), std::cref(dir), std::cref(gs.GetConfig()), @@ -2896,9 +2899,9 @@ namespace pmd2 gs.m_setsindex.size(), std::ref(shouldUpdtProgress) ); } - taskhandler.Execute(); - taskhandler.BlockUntilTaskQueueEmpty(); - taskhandler.StopExecute(); + taskhandler.Start(); + taskhandler.WaitTasksFinished(); + taskhandler.WaitStop(); shouldUpdtProgress = false; if( updtProgress.valid() ) diff --git a/src/utils/parallel_tasks.cpp b/src/utils/parallel_tasks.cpp new file mode 100644 index 00000000..e6d10dcb --- /dev/null +++ b/src/utils/parallel_tasks.cpp @@ -0,0 +1,203 @@ +#include "parallel_tasks.hpp" + +using namespace std; + +namespace utils +{ + +//====================================================================================================================================== +// TaskQueue +//====================================================================================================================================== + const std::chrono::microseconds TaskQueue::WaitForNewTaskTime(100); + /* + Push a task at the back of the queue. + */ + void TaskQueue::Push( TaskQueue::task_t && task ) + { + std::lock_guard lck(m_queuemtx); + m_taskqueue.push_back(std::forward(task)); + } + + /* + Try to pop from the queue. Return false if pop failed, true otherwise. + */ + bool TaskQueue::TryPop( TaskQueue::task_t & out_task ) + { + if(!empty()) + { + std::lock_guard lck(m_queuemtx); + out_task = std::move(m_taskqueue.front()); + m_taskqueue.pop_front(); + return true; + } + return false; + } + + /* + Try to pop from the queue, and wait until the delay is reached, or the queue has a new task. Return false if pop failed and/or delay expired, true otherwise. + */ + bool TaskQueue::TryPopWait( TaskQueue::task_t & out_task ) + { + if(empty()) + { + std::unique_lock lcknewtask(m_waitfortaskmtx); + m_cvnewtask.wait_for(lcknewtask, WaitForNewTaskTime); + } + //Recheck condition + return TryPop(out_task); + } + +//====================================================================================================================================== +// Worker +//====================================================================================================================================== +#if 0 + const std::chrono::microseconds Worker::WaitOnTaskTime(1); + Worker::Worker() + :m_bshoulrun(false), m_ptaskqueue(nullptr) + {} + + Worker::Worker( TaskQueue & tq ) + :m_bshoulrun(true), m_ptaskqueue(&tq) + {} + + Worker::Worker( const Worker & cp ) + :m_bshoulrun(cp.m_bshoulrun.load()), + m_ptaskqueue(cp.m_ptaskqueue), + m_excepts(cp.m_excepts) + { + if(cp.m_mythread.joinable()) + Start(); + } + + Worker & Worker::operator=( const Worker & cp ) + { + m_bshoulrun = cp.m_bshoulrun.load(); + m_ptaskqueue = cp.m_ptaskqueue; + m_excepts = cp.m_excepts; + if(cp.m_mythread.joinable()) + Start(); + return *this; + } + + Worker::~Worker() + { + try + { + Stop(); //Need to stop + TryJoin(); + } + catch(...){} + } + + /* + */ + inline bool Worker::IsValid()const{return m_ptaskqueue != nullptr;} + + /* + Returns whether the worker is working on a task or not + */ + inline bool Worker::IsBusy()const {return m_bisbusy;} + + inline void Worker::Start() + { + if(!IsValid()) + throw std::runtime_error("Worker::Start(): Worker thread is in an undefined state!"); + m_mythread = std::thread(&Worker::Work, this); + } + + /* + This is what the worker thread executes. + */ + void Worker::Work() + { + do + { + TaskQueue::task_t mytask; + if( m_ptaskqueue->TryPopWait(mytask) && mytask.valid() ) + RunATask(mytask); + else + WaitForTask(); + } + while(m_bshoulrun); + } + + /* + This tells the thread to stop, and tries to join it, then pops any exceptions. + */ + inline void Worker::Stop() + { + if(!IsValid()) + throw std::runtime_error("Worker::Stop(): Worker thread is in an undefined state!"); + m_bshoulrun = false; + } + + inline void Worker::WaitStop() + { + Stop(); + TryJoin(); + do + { + TryPopException(); + } + while(!m_excepts.empty()); + } + + bool Worker::TryJoin() + { + if(!IsValid()) + throw std::runtime_error("Worker::TryJoin(): Worker thread is in an undefined state!"); + if( m_mythread.joinable() ) + { + m_mythread.join(); + return true; + } + return false; + } + + void Worker::TryPopException() + { + if(!IsValid()) + throw std::runtime_error("Worker::TryPopException(): Worker thread is in an undefined state!"); + if( !m_excepts.empty() ) + { + std::lock_guard lck(m_exceptmtx); + std::exception_ptr eptr = m_excepts.front(); + m_excepts.pop_front(); + std::rethrow_exception(eptr); + } + } + + + inline void Worker::PushException( std::exception_ptr ptr ) + { + std::lock_guard lck(m_exceptmtx); + m_excepts.push_back(ptr); + } + + inline void Worker::RunATask( TaskQueue::task_t & curtask ) + { + try + { + m_bisbusy = true; + curtask(); + } + catch(...) + { + PushException(std::current_exception()); + } + m_bisbusy= false; + } + + inline void Worker::WaitForTask() + { + this_thread::sleep_for(WaitOnTaskTime); + } + +#endif + + +//====================================================================================================================================== +// +//====================================================================================================================================== + +}; \ No newline at end of file diff --git a/src/utils/parallel_tasks.hpp b/src/utils/parallel_tasks.hpp new file mode 100644 index 00000000..99f96bd7 --- /dev/null +++ b/src/utils/parallel_tasks.hpp @@ -0,0 +1,256 @@ +#ifndef PARALLEL_TASKS_HPP +#define PARALLEL_TASKS_HPP +/* +parallel_tasks.hpp +2016/08/24 +psycommando@gmail.com +Description: A set of utilities for handling multi-threaded tasks execution. Meant to replace the previous implementation. +*/ +#include +#include +#include +#include +#include +#include +#include + +namespace utils +{ + + class TaskQueue + { + public: + typedef std::packaged_task task_t; + + /* + Push a task at the back of the queue. + */ + void Push( task_t && task ); + + /* + Try to pop from the queue. Return false if pop failed, true otherwise. + */ + bool TryPop( task_t & out_task ); + + /* + Try to pop from the queue, and wait until the delay is reached, or the queue has a new task. Return false if pop failed and/or delay expired, true otherwise. + */ + bool TryPopWait( task_t & out_task ); + + inline bool empty()const {return m_taskqueue.empty();} + inline size_t size()const {return m_taskqueue.size();} + + + private: + std::mutex m_queuemtx; + std::deque m_taskqueue; + + std::mutex m_waitfortaskmtx; + std::condition_variable m_cvnewtask; + static const std::chrono::microseconds WaitForNewTaskTime; + }; + + +//====================================================================================================================================== +// ThreadedTasks +//====================================================================================================================================== +#if 0 + /* + */ + class Worker + { + public: + + Worker(); + Worker(TaskQueue & tq); + Worker(const Worker & cp); + Worker & operator=( const Worker & cp ); + ~Worker(); + + /* + */ + inline bool IsValid()const{return m_ptaskqueue != nullptr;} + + /* + Returns whether the worker is working on a task or not + */ + inline bool IsBusy()const {return m_bisbusy;} + + inline void Start(); + + /* + This is what the worker thread executes. + */ + void Work(); + + /* + This tells the thread to stop, and tries to join it, then pops any exceptions. + */ + inline void Stop(); + inline void WaitStop(); + + bool TryJoin(); + void TryPopException(); + + private: + inline void PushException( std::exception_ptr ptr ); + inline void RunATask( TaskQueue::task_t & curtask ); + inline void WaitForTask(); + + private: + TaskQueue * m_ptaskqueue; + std::thread m_mythread; + std::mutex m_exceptmtx; + std::deque m_excepts; + std::atomic_bool m_bshoulrun; + std::atomic_bool m_bisbusy; + static const std::chrono::microseconds WaitOnTaskTime; + }; + +#endif + +//====================================================================================================================================== +// AsyncTasks +//====================================================================================================================================== + class AsyncWorker + { + public: + + AsyncWorker( TaskQueue & q ) + :m_ptasks(&q), m_bshouldwork(true) + {} + + AsyncWorker( const AsyncWorker & cp ) + :m_ptasks(cp.m_ptasks), + m_bshouldwork(cp.m_bshouldwork.load()) + { + if(cp.m_myfut.valid() && m_bshouldwork) + Start(); + } + + AsyncWorker& operator=( const AsyncWorker & cp ) + { + m_ptasks = cp.m_ptasks; + m_bshouldwork = cp.m_bshouldwork.load(); + if(cp.m_myfut.valid() && m_bshouldwork) + Start(); + return *this; + } + + ~AsyncWorker() + { + try + { + Stop(); + if(m_myfut.valid()) + m_myfut.wait_for(std::chrono::milliseconds(1000)); + } + catch(...) + {} + } + + inline bool IsValid()const + { + return m_ptasks != nullptr; + } + + inline void Start() + { + m_bshouldwork = true; + m_myfut = std::move(std::async(std::launch::async, &AsyncWorker::Work, this)); + } + + inline void Stop() + { + m_bshouldwork = false; + } + + inline void WaitStop() + { + Stop(); + if(m_myfut.valid()) + m_myfut.wait(); + } + + inline void WaitFinished() + { + if(m_myfut.valid()) + m_myfut.wait(); + } + + private: + void Work() + { + while(!m_ptasks->empty() && m_bshouldwork) + { + TaskQueue::task_t mytask; + if(m_ptasks->TryPopWait(mytask) && mytask.valid()) + { + mytask(); + } + } + } + + private: + TaskQueue * m_ptasks; + std::atomic_bool m_bshouldwork; + std::future m_myfut; + }; + + class AsyncTaskHandler + { + public: + typedef TaskQueue::task_t task_t; + + AsyncTaskHandler() + :m_bshouldrun(false) + { + const size_t nbth = utils::LibWide().getNbThreadsToUse(); + for( size_t cnt = 0; cnt < nbth; ++cnt ) + m_workers.emplace(m_workers.end(), m_taskqueue); + } + + ~AsyncTaskHandler(){} + + inline void QueueTask( task_t && t ) + { + m_taskqueue.Push(std::forward(t)); + } + + inline void Start() + { + m_bshouldrun = true; + for( auto & w : m_workers ) + w.Start(); + } + + inline void Stop() + { + m_bshouldrun = false; + for( auto & w : m_workers ) + w.Stop(); + } + + inline void WaitStop() + { + m_bshouldrun = false; + for( auto & w : m_workers ) + w.WaitStop(); + } + + inline void WaitTasksFinished() + { + for( auto & w : m_workers ) + w.WaitFinished(); + } + + inline bool empty()const {return m_taskqueue.empty();} + + private: + TaskQueue m_taskqueue; + std::vector m_workers; + std::atomic_bool m_bshouldrun; + }; +}; + +#endif diff --git a/vcprojects/ppmd_statsutil.vcxproj b/vcprojects/ppmd_statsutil.vcxproj index 1851d9d4..582526ec 100644 --- a/vcprojects/ppmd_statsutil.vcxproj +++ b/vcprojects/ppmd_statsutil.vcxproj @@ -195,6 +195,7 @@ copy "$(SolutionDir)resources\pmd2scriptdata.xml" "$(OutDir)pmd2scriptdata.xml" Source Files\ppmdutils\utility Source Files\ppmdutils\utility + Source Files\ppmdutils\utility Source Files\ppmdutils\utility @@ -306,6 +307,7 @@ copy "$(SolutionDir)resources\pmd2scriptdata.xml" "$(OutDir)pmd2scriptdata.xml" Header Files\ppmdutils\utility + Header Files\ppmdutils\utility diff --git a/vcprojects/ppmd_statsutil.vcxproj.filters b/vcprojects/ppmd_statsutil.vcxproj.filters index f9e43eb2..3a50d875 100644 --- a/vcprojects/ppmd_statsutil.vcxproj.filters +++ b/vcprojects/ppmd_statsutil.vcxproj.filters @@ -234,6 +234,9 @@ Source Files\ppmdu\data formats\Scripts + + Source Files\ppmdu\utility + @@ -404,6 +407,9 @@ Header Files\ppmdu\GameDataAccess + + Header Files\ppmdu\utility + From 5f32471fad8bcabf5c290a91427ff12f182405a7 Mon Sep 17 00:00:00 2001 From: psy_commando Date: Wed, 24 Aug 2016 22:16:43 -0400 Subject: [PATCH 2/2] Statsutil: Bugfix Release 0.23.1 * Fixed issue #33 --- readmes/ppmd_statsutil.txt | 2 ++ src/statsutil.cpp | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/readmes/ppmd_statsutil.txt b/readmes/ppmd_statsutil.txt index 2559e8e4..c579bc3f 100644 --- a/readmes/ppmd_statsutil.txt +++ b/readmes/ppmd_statsutil.txt @@ -44,6 +44,8 @@ Changelog: This should make everything more readable, and customizable/maintainable no matter if there are ASM hacks applied to the game or not. * Added support for importing/exporting script data along with scripts. * Added support for dumping the level list to a SIR0 file. Its pretty much completely useless to anyone for now however. +-0.23.1(2016/08/24): + * Fixed issue on windows 10 where a lot of the scripts wouldn't export or import properly. The multithreading class was at fault. ---------------------------------------------------------------------------------------------------- License info: diff --git a/src/statsutil.cpp b/src/statsutil.cpp index 21fbc66d..e0ddd487 100644 --- a/src/statsutil.cpp +++ b/src/statsutil.cpp @@ -108,7 +108,7 @@ namespace statsutil //------------------------------------------------ const string CStatsUtil::Exe_Name = "ppmd_statsutil.exe"; const string CStatsUtil::Title = "Game data importer/exporter"; - const string CStatsUtil::Version = "0.23"; + const string CStatsUtil::Version = "0.23.1"; const string CStatsUtil::Short_Description = "A utility to export and import various game statistics/data, such as pokemon stats."; const string CStatsUtil::Long_Description = "To export game data to XML, you have to append \"-e\" to the\ncommandline, followed with the option corresponding to what to export.\n"