Skip to content

Commit

Permalink
feat(scheduler): add ThreadPoolExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
wu-vincent committed Jul 24, 2024
1 parent 1288b19 commit f28bbde
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 0 deletions.
59 changes: 59 additions & 0 deletions include/endstone/detail/scheduler/thread_pool_executor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) 2024, The Endstone Project. (https://endstone.dev) All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#pragma once

#include <atomic>
#include <functional>
#include <future>
#include <thread>
#include <vector>

#include <moodycamel/concurrentqueue.h>

namespace endstone::detail {

class ThreadPoolExecutor {
public:
explicit ThreadPoolExecutor(size_t threadCount = std::thread::hardware_concurrency());
~ThreadPoolExecutor();

template <typename Func, typename... Args>
auto submit(Func &&func, Args &&...args) -> std::future<std::invoke_result_t<Func, Args...>>
{
using ReturnType = std::invoke_result_t<Func, Args...>;

auto task = std::make_shared<std::packaged_task<ReturnType()>>(
std::bind(std::forward<Func>(func), std::forward<Args>(args)...));

auto result = task->get_future();
tasks.enqueue([task]() { (*task)(); });

condition.notify_one();
return result;
}

private:
void worker();

std::vector<std::thread> threads;
moodycamel::ConcurrentQueue<std::function<void()>> tasks;
std::atomic<bool> done;
std::mutex mutex;
std::condition_variable condition;
};

} // namespace endstone::detail
56 changes: 56 additions & 0 deletions src/endstone_core/scheduler/thread_pool_executor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) 2024, The Endstone Project. (https://endstone.dev) All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "endstone/detail/scheduler/thread_pool_executor.h"

namespace endstone::detail {

ThreadPoolExecutor::ThreadPoolExecutor(size_t thread_count) : done(false)
{
for (size_t i = 0; i < thread_count; ++i) {
threads.emplace_back(&ThreadPoolExecutor::worker, this);
}
}

ThreadPoolExecutor::~ThreadPoolExecutor()
{
done = true;
condition.notify_all();
for (auto &thread : threads) {
if (thread.joinable()) {
thread.join();
}
}
}

void ThreadPoolExecutor::worker()
{
while (!done) {
std::function<void()> task;
if (tasks.try_dequeue(task)) {
task();
}
else {
std::unique_lock<std::mutex> lock(mutex);
condition.wait_for(lock, std::chrono::milliseconds(10));
}
}

// Process remaining tasks
std::function<void()> task;
while (tasks.try_dequeue(task)) {
task();
}
}
} // namespace endstone::detail
111 changes: 111 additions & 0 deletions tests/scheduler/test_thread_pool_executor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright (c) 2024, The Endstone Project. (https://endstone.dev) All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <gtest/gtest.h>

#include "endstone/detail/scheduler/thread_pool_executor.h"

using endstone::detail::ThreadPoolExecutor;

// Test if tasks are executed
TEST(ThreadPoolExecutorTest, ExecuteTasks)
{
ThreadPoolExecutor executor(4);
auto future1 = executor.submit([]() { return 1; });
auto future2 = executor.submit([](int a, int b) { return a + b; }, 2, 3);
EXPECT_EQ(future1.get(), 1);
EXPECT_EQ(future2.get(), 5);
}

// Test if tasks are executed in parallel
TEST(ThreadPoolExecutorTest, ParallelExecution)
{
ThreadPoolExecutor executor(4);
std::atomic<int> counter{0};

auto task = [&counter]() {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
counter++;
};

auto future1 = executor.submit(task);
auto future2 = executor.submit(task);
auto future3 = executor.submit(task);
auto future4 = executor.submit(task);

auto start = std::chrono::steady_clock::now();
future1.get();
future2.get();
future3.get();
future4.get();
auto end = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

EXPECT_LT(duration.count(), 200);
EXPECT_EQ(counter.load(), 4);
}

// Test if the destructor waits for tasks to finish
TEST(ThreadPoolExecutorTest, DestructorWaitsForTasks)
{
auto start = std::chrono::steady_clock::now();
{
ThreadPoolExecutor executor(4);
for (int i = 0; i < 10; ++i) {
executor.submit([]() { std::this_thread::sleep_for(std::chrono::milliseconds(100)); });
}
}
auto end = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

EXPECT_GE(duration.count(), 300);
}

// Test with no tasks
TEST(ThreadPoolExecutorTest, NoTasks)
{
ThreadPoolExecutor executor(4);
}

// Test if tasks with exceptions are handled properly
TEST(ThreadPoolExecutorTest, TaskWithException)
{
ThreadPoolExecutor executor(4);
auto future = executor.submit([]() { throw std::runtime_error("Task exception"); });
EXPECT_THROW(future.get(), std::runtime_error);
}

// Test with a large number of tasks
TEST(ThreadPoolExecutorTest, ManyTasks)
{
ThreadPoolExecutor executor(4);
std::atomic<int> counter{0};
const int task_count = 1000;

auto task = [&counter]() {
counter++;
};

std::vector<std::future<void>> futures;
futures.reserve(task_count);
for (int i = 0; i < task_count; ++i) {
futures.push_back(executor.submit(task));
}

for (auto &future : futures) {
future.get();
}

EXPECT_EQ(counter.load(), task_count);
}

0 comments on commit f28bbde

Please sign in to comment.