diff --git a/CMakeLists.txt b/CMakeLists.txt index 8e8efd8..963f1c8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -254,6 +254,7 @@ if(MARL_BUILD_TESTS) ${MARL_SRC_DIR}/parallelize_test.cpp ${MARL_SRC_DIR}/pool_test.cpp ${MARL_SRC_DIR}/scheduler_test.cpp + ${MARL_SRC_DIR}/thread_test.cpp ${MARL_SRC_DIR}/ticket_test.cpp ${MARL_SRC_DIR}/waitgroup_test.cpp ${MARL_GOOGLETEST_DIR}/googletest/src/gtest-all.cc diff --git a/include/marl/scheduler.h b/include/marl/scheduler.h index e103a3b..5880673 100644 --- a/include/marl/scheduler.h +++ b/include/marl/scheduler.h @@ -60,6 +60,7 @@ class Scheduler { struct WorkerThread { int count = 0; ThreadInitializer initializer; + std::shared_ptr affinityPolicy; }; WorkerThread workerThread; @@ -74,6 +75,8 @@ class Scheduler { inline Config& setAllocator(Allocator*); inline Config& setWorkerThreadCount(int); inline Config& setWorkerThreadInitializer(const ThreadInitializer&); + inline Config& setWorkerThreadAffinityPolicy( + const std::shared_ptr&); }; // Constructor. @@ -531,6 +534,12 @@ Scheduler::Config& Scheduler::Config::setWorkerThreadInitializer( return *this; } +Scheduler::Config& Scheduler::Config::setWorkerThreadAffinityPolicy( + const std::shared_ptr& policy) { + workerThread.affinityPolicy = policy; + return *this; +} + //////////////////////////////////////////////////////////////////////////////// // Scheduler::Fiber //////////////////////////////////////////////////////////////////////////////// diff --git a/include/marl/thread.h b/include/marl/thread.h index f18bddd..1046922 100644 --- a/include/marl/thread.h +++ b/include/marl/thread.h @@ -17,26 +17,109 @@ #include +#include "containers.h" + namespace marl { // Thread provides an OS abstraction for threads of execution. -// Thread is used by marl instead of std::thread as Windows does not naturally -// scale beyond 64 logical threads on a single CPU, unless you use the Win32 -// API. -// Thread alsocontains static methods that abstract OS-specific thread / cpu -// queries and control. class Thread { public: using Func = std::function; + // Core identifies a logical processor unit. + // How a core is identified varies by platform. + struct Core { + struct Windows { + uint8_t group; // Group number + uint8_t index; // Core within the processor group + }; + struct Pthread { + uint16_t index; // Core number + }; + union { + Windows windows; + Pthread pthread; + }; + + // Comparison functions + inline bool operator==(const Core&) const; + inline bool operator<(const Core&) const; + }; + + // Affinity holds the affinity mask for a thread - a description of what cores + // the thread is allowed to run on. + struct Affinity { + // supported is true if marl supports controlling thread affinity for this + // platform. +#if defined(_WIN32) || defined(__linux__) || defined(__FreeBSD__) + static constexpr bool supported = true; +#else + static constexpr bool supported = false; +#endif + + // Policy is an interface that provides a get() method for returning an + // Affinity for the given thread by id. + class Policy { + public: + virtual ~Policy(){}; + + // anyOf() returns a Policy that returns an Affinity for a number of + // available cores in affinity. + // + // Windows requires that each thread is only associated with a + // single affinity group, so the Policy's returned affinity will contain + // cores all from the same group. + static std::shared_ptr anyOf( + Affinity&& affinity, + Allocator* allocator = Allocator::Default); + + // oneOf() returns a Policy that returns an affinity with a single enabled + // core from affinity. The single enabled core in the Policy's returned + // affinity is: + // affinity[threadId % affinity.count()] + static std::shared_ptr oneOf( + Affinity&& affinity, + Allocator* allocator = Allocator::Default); + + // get() returns the thread Affinity for the for the given thread by id. + virtual Affinity get(uint32_t threadId, Allocator* allocator) const = 0; + }; + + Affinity(Allocator*); + Affinity(Affinity&&); + Affinity(const Affinity&, Allocator* allocator); + + // all() returns an Affinity with all the cores available to the process. + static Affinity all(Allocator* allocator = Allocator::Default); + + Affinity(std::initializer_list, Allocator* allocator); + + // count() returns the number of enabled cores in the affinity. + size_t count() const; + + // operator[] returns the i'th enabled core from this affinity. + Core operator[](size_t index) const; + + // add() adds the cores from the given affinity to this affinity. + // This affinity is returned to allow for fluent calls. + Affinity& add(const Affinity&); + + // remove() removes the cores from the given affinity from this affinity. + // This affinity is returned to allow for fluent calls. + Affinity& remove(const Affinity&); + + private: + Affinity(const Affinity&) = delete; + + containers::vector cores; + }; + Thread() = default; Thread(Thread&&); Thread& operator=(Thread&&); - // Start a new thread that calls func. - // logicalCpuHint is a hint to run the thread on the specified logical CPU. - // logicalCpuHint may be entirely ignored. - Thread(unsigned int logicalCpuHint, const Func& func); + // Start a new thread using the given affinity that calls func. + Thread(Affinity&& affinity, Func&& func); ~Thread(); @@ -59,6 +142,18 @@ class Thread { Impl* impl = nullptr; }; +//////////////////////////////////////////////////////////////////////////////// +// Thread::Core +//////////////////////////////////////////////////////////////////////////////// +// Comparison functions +bool Thread::Core::operator==(const Core& other) const { + return pthread.index == other.pthread.index; +} + +bool Thread::Core::operator<(const Core& other) const { + return pthread.index < other.pthread.index; +} + } // namespace marl #endif // marl_thread_h diff --git a/src/memory.cpp b/src/memory.cpp index 142d76b..b8cbaa2 100644 --- a/src/memory.cpp +++ b/src/memory.cpp @@ -19,7 +19,7 @@ #include -#if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__) +#if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__) #include #include namespace { diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 384f67d..852d74a 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -128,6 +128,11 @@ void Scheduler::unbind() { } Scheduler::Scheduler(const Config& config) : cfg(config), workerThreads{} { + if (cfg.workerThread.count > 0 && !cfg.workerThread.affinityPolicy) { + cfg.workerThread.affinityPolicy = Thread::Affinity::Policy::anyOf( + Thread::Affinity::all(cfg.allocator), cfg.allocator); + } + for (size_t i = 0; i < spinningWorkers.size(); i++) { spinningWorkers[i] = -1; } @@ -403,8 +408,11 @@ Scheduler::Worker::Worker(Scheduler* scheduler, Mode mode, uint32_t id) void Scheduler::Worker::start() { switch (mode) { - case Mode::MultiThreaded: - thread = Thread(id, [=] { + case Mode::MultiThreaded: { + auto allocator = scheduler->cfg.allocator; + auto& affinityPolicy = scheduler->cfg.workerThread.affinityPolicy; + auto affinity = affinityPolicy->get(id, allocator); + thread = Thread(std::move(affinity), [=] { Thread::setName("Thread<%.2d>", int(id)); if (auto const& initFunc = scheduler->cfg.workerThread.initializer) { @@ -423,13 +431,13 @@ void Scheduler::Worker::start() { Worker::current = nullptr; }); break; - - case Mode::SingleThreaded: + } + case Mode::SingleThreaded: { Worker::current = this; mainFiber = Fiber::createFromCurrentThread(scheduler->cfg.allocator, 0); currentFiber = mainFiber.get(); break; - + } default: MARL_ASSERT(false, "Unknown mode: %d", int(mode)); } diff --git a/src/scheduler_bench.cpp b/src/scheduler_bench.cpp index 3548e54..d009713 100644 --- a/src/scheduler_bench.cpp +++ b/src/scheduler_bench.cpp @@ -46,3 +46,25 @@ BENCHMARK_DEFINE_F(Schedule, SomeWork) }); } BENCHMARK_REGISTER_F(Schedule, SomeWork)->Apply(Schedule::args); + +BENCHMARK_DEFINE_F(Schedule, SomeWorkWorkerAffinityOneOf) +(benchmark::State& state) { + marl::Scheduler::Config cfg; + cfg.setWorkerThreadAffinityPolicy( + marl::Thread::Affinity::Policy::oneOf(marl::Thread::Affinity::all())); + run(state, cfg, [&](int numTasks) { + for (auto _ : state) { + marl::WaitGroup wg; + wg.add(numTasks); + for (auto i = 0; i < numTasks; i++) { + marl::schedule([=] { + benchmark::DoNotOptimize(doSomeWork(i)); + wg.done(); + }); + } + wg.wait(); + } + }); +} +BENCHMARK_REGISTER_F(Schedule, SomeWorkWorkerAffinityOneOf) + ->Apply(Schedule::args); diff --git a/src/thread.cpp b/src/thread.cpp index c2ac40d..81f4638 100644 --- a/src/thread.cpp +++ b/src/thread.cpp @@ -18,6 +18,9 @@ #include "marl/defer.h" #include "marl/trace.h" +#include // std::sort +#include + #include #include @@ -25,7 +28,9 @@ #define WIN32_LEAN_AND_MEAN 1 #include #include // mbstowcs +#include // std::numeric_limits #include +#undef max #elif defined(__APPLE__) #include #include @@ -42,10 +47,27 @@ #include #endif +namespace { + +struct CoreHasher { + inline uint64_t operator()(const marl::Thread::Core& core) const { + return core.pthread.index; + } +}; + +} // anonymous namespace + namespace marl { #if defined(_WIN32) +static constexpr size_t MaxCoreCount = + std::numeric_limits::max() + 1ULL; +static constexpr size_t MaxGroupCount = + std::numeric_limits::max() + 1ULL; +static_assert(sizeof(KAFFINITY) * 8ULL <= MaxCoreCount, + "Thread::Core::windows.index is too small"); +namespace { #define CHECK_WIN32(expr) \ do { \ auto res = expr; \ @@ -54,8 +76,6 @@ namespace marl { (int)GetLastError()); \ } while (false) -namespace { - struct ProcessorGroup { unsigned int count; // number of logical processors in this group. KAFFINITY affinity; // affinity mask. @@ -75,6 +95,7 @@ const std::vector& getProcessorGroups() { auto const& groupInfo = info[i].Group.GroupInfo[groupIdx]; out.emplace_back(ProcessorGroup{groupInfo.ActiveProcessorCount, groupInfo.ActiveProcessorMask}); + MARL_ASSERT(out.size() <= MaxGroupCount, "Group index overflow"); } } } @@ -82,38 +103,169 @@ const std::vector& getProcessorGroups() { }(); return groups; } +} // namespace +#endif // defined(_WIN32) + +//////////////////////////////////////////////////////////////////////////////// +// Thread::Affinty +//////////////////////////////////////////////////////////////////////////////// + +Thread::Affinity::Affinity(Allocator* allocator) : cores(allocator) {} +Thread::Affinity::Affinity(Affinity&& other) : cores(std::move(other.cores)) {} +Thread::Affinity::Affinity(const Affinity& other, Allocator* allocator) + : cores(other.cores, allocator) {} + +Thread::Affinity::Affinity(std::initializer_list list, + Allocator* allocator) + : cores(allocator) { + cores.reserve(list.size()); + for (auto core : list) { + cores.push_back(core); + } +} + +Thread::Affinity Thread::Affinity::all( + Allocator* allocator /* = Allocator::Default */) { + Thread::Affinity affinity(allocator); + +#if defined(_WIN32) + decltype(Core::windows.group) groupIndex = 0; + for (auto group : getProcessorGroups()) { + Core core; + core.windows.group = groupIndex; + for (size_t i = 0; i < sizeof(KAFFINITY) * 8; i++) { + if ((group.affinity >> i) & 1) { + core.windows.index = static_cast(i); + affinity.cores.emplace_back(std::move(core)); + } + } + groupIndex++; + } +#elif defined(__linux__) + auto thread = pthread_self(); + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + if (pthread_getaffinity_np(thread, sizeof(cpu_set_t), &cpuset) == 0) { + int count = CPU_COUNT(&cpuset); + for (int i = 0; i < count; i++) { + Core core; + core.pthread.index = static_cast(i); + affinity.cores.emplace_back(std::move(core)); + } + } +#elif defined(__FreeBSD__) + auto thread = pthread_self(); + cpuset_t cpuset; + CPU_ZERO(&cpuset); + if (pthread_getaffinity_np(thread, sizeof(cpuset_t), &cpuset) == 0) { + int count = CPU_COUNT(&cpuset); + for (int i = 0; i < count; i++) { + Core core; + core.pthread.index = static_cast(i); + affinity.cores.emplace_back(std::move(core)); + } + } +#else + static_assert(!supported, + "marl::Thread::Affinity::supported is true, but " + "Thread::Affinity::all() is not implemented for this platform"); +#endif + + return affinity; +} -bool getGroupAffinity(unsigned int index, GROUP_AFFINITY* groupAffinity) { - auto& groups = getProcessorGroups(); - for (size_t groupIdx = 0; groupIdx < groups.size(); groupIdx++) { - auto& group = groups[groupIdx]; - if (index < group.count) { - for (int i = 0; i < sizeof(group.affinity) * 8; i++) { - if (group.affinity & (1ULL << i)) { - if (index == 0) { - groupAffinity->Group = static_cast(groupIdx); - // Use the whole group's affinity, as the OS is then able to shuffle - // threads around based on external demands. Pinning these to a - // single core can cause up to 20% performance loss in benchmarking. - groupAffinity->Mask = group.affinity; - return true; - } - index--; +std::shared_ptr Thread::Affinity::Policy::anyOf( + Affinity&& affinity, + Allocator* allocator /* = Allocator::Default */) { + struct Policy : public Thread::Affinity::Policy { + Affinity affinity; + Policy(Affinity&& affinity) : affinity(std::move(affinity)) {} + + Affinity get(uint32_t threadId, Allocator* allocator) const override { +#if defined(_WIN32) + auto count = affinity.count(); + if (count == 0) { + return Affinity(affinity, allocator); + } + auto group = affinity[threadId % affinity.count()].windows.group; + Affinity out(allocator); + out.cores.reserve(count); + for (auto core : affinity.cores) { + if (core.windows.group == group) { + out.cores.push_back(core); } } - return false; - } else { - index -= group.count; + return out; +#else + return Affinity(affinity, allocator); +#endif + } + }; + + return allocator->make_shared(std::move(affinity)); +} + +std::shared_ptr Thread::Affinity::Policy::oneOf( + Affinity&& affinity, + Allocator* allocator /* = Allocator::Default */) { + struct Policy : public Thread::Affinity::Policy { + Affinity affinity; + Policy(Affinity&& affinity) : affinity(std::move(affinity)) {} + + Affinity get(uint32_t threadId, Allocator* allocator) const override { + auto count = affinity.count(); + if (count == 0) { + return Affinity(affinity, allocator); + } + return Affinity({affinity[threadId % affinity.count()]}, allocator); + } + }; + + return allocator->make_shared(std::move(affinity)); +} + +size_t Thread::Affinity::count() const { + return cores.size(); +} + +Thread::Core Thread::Affinity::operator[](size_t index) const { + return cores[index]; +} + +Thread::Affinity& Thread::Affinity::add(const Thread::Affinity& other) { + std::unordered_set set; + for (auto core : cores) { + set.emplace(core); + } + for (auto core : other.cores) { + if (set.count(core) == 0) { + cores.push_back(core); } } - return false; + std::sort(cores.begin(), cores.end()); + return *this; } -} // namespace +Thread::Affinity& Thread::Affinity::remove(const Thread::Affinity& other) { + std::unordered_set set; + for (auto core : other.cores) { + set.emplace(core); + } + for (size_t i = 0; i < cores.size(); i++) { + if (set.count(cores[i]) != 0) { + cores[i] = cores.back(); + cores.resize(cores.size() - 1); + } + } + std::sort(cores.begin(), cores.end()); + return *this; +} + +#if defined(_WIN32) class Thread::Impl { public: - Impl(const Func& func) : func(func) {} + Impl(Func&& func) : func(std::move(func)) {} static DWORD WINAPI run(void* self) { reinterpret_cast(self)->func(); return 0; @@ -123,7 +275,7 @@ class Thread::Impl { HANDLE handle; }; -Thread::Thread(unsigned int logicalCpu, const Func& func) { +Thread::Thread(Affinity&& affinity, Func&& func) { SIZE_T size = 0; InitializeProcThreadAttributeList(nullptr, 1, 0, &size); MARL_ASSERT(size > 0, @@ -135,14 +287,22 @@ Thread::Thread(unsigned int logicalCpu, const Func& func) { CHECK_WIN32(InitializeProcThreadAttributeList(attributes, 1, 0, &size)); defer(DeleteProcThreadAttributeList(attributes)); - GROUP_AFFINITY groupAffinity = {}; - if (getGroupAffinity(logicalCpu, &groupAffinity)) { + auto count = affinity.count(); + if (count > 0) { + GROUP_AFFINITY groupAffinity = {}; + groupAffinity.Group = affinity[0].windows.group; + for (size_t i = 0; i < count; i++) { + auto core = affinity[i]; + MARL_ASSERT(groupAffinity.Group == core.windows.group, + "Cannot create thread that uses multiple affinity groups"); + groupAffinity.Mask |= (1ULL << core.windows.index); + } CHECK_WIN32(UpdateProcThreadAttribute( attributes, 0, PROC_THREAD_ATTRIBUTE_GROUP_AFFINITY, &groupAffinity, sizeof(groupAffinity), nullptr, nullptr)); } - impl = new Impl(func); + impl = new Impl(std::move(func)); impl->handle = CreateRemoteThreadEx(GetCurrentProcess(), nullptr, 0, &Impl::run, impl, 0, attributes, nullptr); } @@ -191,13 +351,47 @@ unsigned int Thread::numLogicalCPUs() { class Thread::Impl { public: - template - Impl(F&& func) : thread(func) {} + Impl(Affinity&& affinity, Thread::Func&& f) + : affinity(std::move(affinity)), func(std::move(f)), thread([this] { + setAffinity(); + func(); + }) {} + + Affinity affinity; + Func func; std::thread thread; + + void setAffinity() { + auto count = affinity.count(); + if (count == 0) { + return; + } + +#if defined(__linux__) + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + for (size_t i = 0; i < count; i++) { + CPU_SET(affinity[i].pthread.index, &cpuset); + } + auto thread = pthread_self(); + pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset); +#elif defined(__FreeBSD__) + cpuset_t cpuset; + CPU_ZERO(&cpuset); + for (size_t i = 0; i < count; i++) { + CPU_SET(affinity[i].pthread.index, &cpuset); + } + auto thread = pthread_self(); + pthread_setaffinity_np(thread, sizeof(cpuset_t), &cpuset); +#else + MARL_ASSERT(!marl::Thread::Affinity::supported, + "Attempting to use thread affinity on a unsupported platform"); +#endif + } }; -Thread::Thread(unsigned int /* logicalCpu */, const Func& func) - : impl(new Thread::Impl(func)) {} +Thread::Thread(Affinity&& affinity, Func&& func) + : impl(new Thread::Impl(std::move(affinity), std::move(func))) {} Thread::~Thread() { delete impl; diff --git a/src/thread_test.cpp b/src/thread_test.cpp new file mode 100644 index 0000000..69370f9 --- /dev/null +++ b/src/thread_test.cpp @@ -0,0 +1,123 @@ +// Copyright 2019 The Marl Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "marl_test.h" + +#include "marl/thread.h" + +namespace { + +marl::Thread::Core core(int idx) { + marl::Thread::Core c; + c.pthread.index = static_cast(idx); + return c; +} + +} // anonymous namespace + +TEST_F(WithoutBoundScheduler, ThreadAffinityCount) { + auto affinity = marl::Thread::Affinity( + { + core(10), + core(20), + core(30), + core(40), + }, + allocator); + EXPECT_EQ(affinity.count(), 4U); +} + +TEST_F(WithoutBoundScheduler, ThreadAdd) { + auto affinity = marl::Thread::Affinity( + { + core(10), + core(20), + core(30), + core(40), + }, + allocator); + + affinity + .add(marl::Thread::Affinity( + { + core(25), + core(15), + }, + allocator)) + .add(marl::Thread::Affinity({core(35)}, allocator)); + + EXPECT_EQ(affinity.count(), 7U); + EXPECT_EQ(affinity[0], core(10)); + EXPECT_EQ(affinity[1], core(15)); + EXPECT_EQ(affinity[2], core(20)); + EXPECT_EQ(affinity[3], core(25)); + EXPECT_EQ(affinity[4], core(30)); + EXPECT_EQ(affinity[5], core(35)); + EXPECT_EQ(affinity[6], core(40)); +} + +TEST_F(WithoutBoundScheduler, ThreadRemove) { + auto affinity = marl::Thread::Affinity( + { + core(10), + core(20), + core(30), + core(40), + }, + allocator); + + affinity + .remove(marl::Thread::Affinity( + { + core(25), + core(20), + }, + allocator)) + .remove(marl::Thread::Affinity({core(40)}, allocator)); + + EXPECT_EQ(affinity.count(), 2U); + EXPECT_EQ(affinity[0], core(10)); + EXPECT_EQ(affinity[1], core(30)); +} + +TEST_F(WithoutBoundScheduler, ThreadAffinityAllCountNonzero) { + auto affinity = marl::Thread::Affinity::all(allocator); + if (marl::Thread::Affinity::supported) { + EXPECT_NE(affinity.count(), 0U); + } else { + EXPECT_EQ(affinity.count(), 0U); + } +} + +TEST_F(WithoutBoundScheduler, ThreadAffinityPolicyOneOf) { + auto all = marl::Thread::Affinity( + { + core(10), + core(20), + core(30), + core(40), + }, + allocator); + + auto policy = + marl::Thread::Affinity::Policy::oneOf(std::move(all), allocator); + EXPECT_EQ(policy->get(0, allocator).count(), 1U); + EXPECT_EQ(policy->get(0, allocator)[0].pthread.index, 10); + EXPECT_EQ(policy->get(1, allocator).count(), 1U); + EXPECT_EQ(policy->get(1, allocator)[0].pthread.index, 20); + EXPECT_EQ(policy->get(2, allocator).count(), 1U); + EXPECT_EQ(policy->get(2, allocator)[0].pthread.index, 30); + EXPECT_EQ(policy->get(3, allocator).count(), 1U); + EXPECT_EQ(policy->get(3, allocator)[0].pthread.index, 40); +}