From f28bbde5e5faa56072a78bcafe9a9e91ab1498e9 Mon Sep 17 00:00:00 2001
From: Vincent
Date: Wed, 24 Jul 2024 01:51:53 +0100
Subject: [PATCH] feat(scheduler): add ThreadPoolExecutor

---
 .../detail/scheduler/thread_pool_executor.h   |  61 ++++++++++
 .../scheduler/thread_pool_executor.cpp        |  56 +++++++++
 tests/scheduler/test_thread_pool_executor.cpp | 111 ++++++++++++++
 3 files changed, 228 insertions(+)
 create mode 100644 include/endstone/detail/scheduler/thread_pool_executor.h
 create mode 100644 src/endstone_core/scheduler/thread_pool_executor.cpp
 create mode 100644 tests/scheduler/test_thread_pool_executor.cpp

diff --git a/include/endstone/detail/scheduler/thread_pool_executor.h b/include/endstone/detail/scheduler/thread_pool_executor.h
new file mode 100644
index 000000000..a5b4e60e5
--- /dev/null
+++ b/include/endstone/detail/scheduler/thread_pool_executor.h
@@ -0,0 +1,61 @@
+// Copyright (c) 2024, The Endstone Project. (https://endstone.dev) All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <atomic>
+#include <condition_variable>
+#include <functional>
+#include <future>
+#include <memory>
+#include <mutex>
+#include <thread>
+#include <type_traits>
+#include <vector>
+
+#include <concurrentqueue.h>
+
+namespace endstone::detail {
+
+class ThreadPoolExecutor {
+public:
+    explicit ThreadPoolExecutor(size_t thread_count = std::thread::hardware_concurrency());
+    ~ThreadPoolExecutor();
+
+    template <typename Func, typename... Args>
+    auto submit(Func &&func, Args &&...args) -> std::future<std::invoke_result_t<Func, Args...>>
+    {
+        using ReturnType = std::invoke_result_t<Func, Args...>;
+
+        auto task = std::make_shared<std::packaged_task<ReturnType()>>(
+            std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
+
+        auto result = task->get_future();
+        tasks.enqueue([task]() { (*task)(); });
+
+        condition.notify_one();
+        return result;
+    }
+
+private:
+    void worker();
+
+    std::vector<std::thread> threads;
+    moodycamel::ConcurrentQueue<std::function<void()>> tasks;
+    std::atomic<bool> done;
+    std::mutex mutex;
+    std::condition_variable condition;
+};
+
+} // namespace endstone::detail
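Note on the header above: submit() wraps the callable in a std::packaged_task, enqueues a type-erased wrapper on the lock-free queue, and returns the matching std::future, so both return values and exceptions propagate back to the caller. A minimal usage sketch follows; the caller code is illustrative only and not part of this patch:

    #include <stdexcept>

    #include "endstone/detail/scheduler/thread_pool_executor.h"

    int main()
    {
        endstone::detail::ThreadPoolExecutor pool(4);

        // Return values come back through the future...
        auto sum = pool.submit([](int a, int b) { return a + b; }, 40, 2);

        // ...and an exception thrown inside a task is rethrown by get().
        auto fail = pool.submit([] { throw std::runtime_error("boom"); });

        bool threw = false;
        try {
            fail.get();
        }
        catch (const std::runtime_error &) {
            threw = true;
        }
        return (sum.get() == 42 && threw) ? 0 : 1;
    }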
diff --git a/src/endstone_core/scheduler/thread_pool_executor.cpp b/src/endstone_core/scheduler/thread_pool_executor.cpp
new file mode 100644
index 000000000..71fce9051
--- /dev/null
+++ b/src/endstone_core/scheduler/thread_pool_executor.cpp
@@ -0,0 +1,56 @@
+// Copyright (c) 2024, The Endstone Project. (https://endstone.dev) All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "endstone/detail/scheduler/thread_pool_executor.h"
+
+namespace endstone::detail {
+
+ThreadPoolExecutor::ThreadPoolExecutor(size_t thread_count) : done(false)
+{
+    for (size_t i = 0; i < thread_count; ++i) {
+        threads.emplace_back(&ThreadPoolExecutor::worker, this);
+    }
+}
+
+ThreadPoolExecutor::~ThreadPoolExecutor()
+{
+    done = true;
+    condition.notify_all();
+    for (auto &thread : threads) {
+        if (thread.joinable()) {
+            thread.join();
+        }
+    }
+}
+
+void ThreadPoolExecutor::worker()
+{
+    while (!done) {
+        std::function<void()> task;
+        if (tasks.try_dequeue(task)) {
+            task();
+        }
+        else {
+            std::unique_lock<std::mutex> lock(mutex);
+            condition.wait_for(lock, std::chrono::milliseconds(10));
+        }
+    }
+
+    // Process remaining tasks
+    std::function<void()> task;
+    while (tasks.try_dequeue(task)) {
+        task();
+    }
+}
+} // namespace endstone::detail
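Note on worker() above: the queue itself is lock-free, so the mutex and condition variable only bound how long an idle worker sleeps. A notify_one() that fires between a failed try_dequeue() and the wait_for() is lost, but the 10 ms timeout caps the resulting latency at one polling interval, and the drain loop after the while ensures every task enqueued before destruction still runs, which the DestructorWaitsForTasks test below relies on. If the lost wakeup ever matters, one possible variant (a sketch, not part of this patch) is to wait on a predicate that probes the queue's approximate size:

    // Hypothetical replacement for the else-branch in worker(); size_approx()
    // is moodycamel::ConcurrentQueue's advisory size estimate, so this only
    // narrows the race window rather than removing the need for a timeout.
    std::unique_lock<std::mutex> lock(mutex);
    condition.wait_for(lock, std::chrono::milliseconds(10),
                       [this] { return done || tasks.size_approx() != 0; });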
diff --git a/tests/scheduler/test_thread_pool_executor.cpp b/tests/scheduler/test_thread_pool_executor.cpp
new file mode 100644
index 000000000..9ddfb1ffe
--- /dev/null
+++ b/tests/scheduler/test_thread_pool_executor.cpp
@@ -0,0 +1,111 @@
+// Copyright (c) 2024, The Endstone Project. (https://endstone.dev) All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <gtest/gtest.h>
+
+#include "endstone/detail/scheduler/thread_pool_executor.h"
+
+using endstone::detail::ThreadPoolExecutor;
+
+// Test if tasks are executed
+TEST(ThreadPoolExecutorTest, ExecuteTasks)
+{
+    ThreadPoolExecutor executor(4);
+    auto future1 = executor.submit([]() { return 1; });
+    auto future2 = executor.submit([](int a, int b) { return a + b; }, 2, 3);
+    EXPECT_EQ(future1.get(), 1);
+    EXPECT_EQ(future2.get(), 5);
+}
+
+// Test if tasks are executed in parallel
+TEST(ThreadPoolExecutorTest, ParallelExecution)
+{
+    ThreadPoolExecutor executor(4);
+    std::atomic<int> counter{0};
+
+    auto task = [&counter]() {
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        counter++;
+    };
+
+    auto future1 = executor.submit(task);
+    auto future2 = executor.submit(task);
+    auto future3 = executor.submit(task);
+    auto future4 = executor.submit(task);
+
+    auto start = std::chrono::steady_clock::now();
+    future1.get();
+    future2.get();
+    future3.get();
+    future4.get();
+    auto end = std::chrono::steady_clock::now();
+    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
+
+    EXPECT_LT(duration.count(), 200);
+    EXPECT_EQ(counter.load(), 4);
+}
+
+// Test if the destructor waits for tasks to finish
+TEST(ThreadPoolExecutorTest, DestructorWaitsForTasks)
+{
+    auto start = std::chrono::steady_clock::now();
+    {
+        ThreadPoolExecutor executor(4);
+        for (int i = 0; i < 10; ++i) {
+            executor.submit([]() { std::this_thread::sleep_for(std::chrono::milliseconds(100)); });
+        }
+    }
+    auto end = std::chrono::steady_clock::now();
+    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
+
+    EXPECT_GE(duration.count(), 300);
+}
+
+// Test with no tasks
+TEST(ThreadPoolExecutorTest, NoTasks)
+{
+    ThreadPoolExecutor executor(4);
+}
+
+// Test if tasks with exceptions are handled properly
+TEST(ThreadPoolExecutorTest, TaskWithException)
+{
+    ThreadPoolExecutor executor(4);
+    auto future = executor.submit([]() { throw std::runtime_error("Task exception"); });
+    EXPECT_THROW(future.get(), std::runtime_error);
+}
+
+// Test with a large number of tasks
+TEST(ThreadPoolExecutorTest, ManyTasks)
+{
+    ThreadPoolExecutor executor(4);
+    std::atomic<int> counter{0};
+    const int task_count = 1000;
+
+    auto task = [&counter]() {
+        counter++;
+    };
+
+    std::vector<std::future<void>> futures;
+    futures.reserve(task_count);
+    for (int i = 0; i < task_count; ++i) {
+        futures.push_back(executor.submit(task));
+    }
+
+    for (auto &future : futures) {
+        future.get();
+    }
+
+    EXPECT_EQ(counter.load(), task_count);
+}
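Note on the tests above: the file defines no main(), so it presumably links against GTest's bundled gtest_main target. If the build registers tests manually instead, the standard entry point is:

    #include <gtest/gtest.h>

    int main(int argc, char **argv)
    {
        ::testing::InitGoogleTest(&argc, argv);
        return RUN_ALL_TESTS();
    }

Also worth noting: ParallelExecution and DestructorWaitsForTasks assert on wall-clock bounds, so they may be flaky on heavily loaded CI runners.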