Skip to content

Commit

Permalink
Support specifying worker thread affinities
Browse files Browse the repository at this point in the history
Add `Thread::Affinity` which describes the cores on which a thread can
execute.

Add `Thread::Affinity::Policy` which generates an `Affinity` for a
thread by identifier. `Policy` contains two helper functions that return
`Policy` implementations:
 * `anyOf()` returns a `Policy` that returns an Affinity for the
   available cores in the `Affinity` passed to the function.
 * `oneOf()` returns a `Policy` that returns an affinity with a single
   enabled core from the `Affinity` passed to the function.

Other `Policy`s can be implemented by the user.

Add a new `affinityPolicy` field to `Scheduler::Config` allowing the
user to specify the rules for assigning thread affinities to each
scheduler worker thread.

Affinities are currently only supported on windows, linux and freebsd.

Issue: #136
  • Loading branch information
ben-clayton committed Jun 5, 2020
1 parent 49fe9a1 commit 325b072
Show file tree
Hide file tree
Showing 8 changed files with 499 additions and 46 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ if(MARL_BUILD_TESTS)
${MARL_SRC_DIR}/parallelize_test.cpp
${MARL_SRC_DIR}/pool_test.cpp
${MARL_SRC_DIR}/scheduler_test.cpp
${MARL_SRC_DIR}/thread_test.cpp
${MARL_SRC_DIR}/ticket_test.cpp
${MARL_SRC_DIR}/waitgroup_test.cpp
${MARL_GOOGLETEST_DIR}/googletest/src/gtest-all.cc
Expand Down
9 changes: 9 additions & 0 deletions include/marl/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class Scheduler {
struct WorkerThread {
int count = 0;
ThreadInitializer initializer;
std::shared_ptr<Thread::Affinity::Policy> affinityPolicy;
};
WorkerThread workerThread;

Expand All @@ -74,6 +75,8 @@ class Scheduler {
inline Config& setAllocator(Allocator*);
inline Config& setWorkerThreadCount(int);
inline Config& setWorkerThreadInitializer(const ThreadInitializer&);
inline Config& setWorkerThreadAffinityPolicy(
const std::shared_ptr<Thread::Affinity::Policy>&);
};

// Constructor.
Expand Down Expand Up @@ -531,6 +534,12 @@ Scheduler::Config& Scheduler::Config::setWorkerThreadInitializer(
return *this;
}

Scheduler::Config& Scheduler::Config::setWorkerThreadAffinityPolicy(
const std::shared_ptr<Thread::Affinity::Policy>& policy) {
workerThread.affinityPolicy = policy;
return *this;
}

////////////////////////////////////////////////////////////////////////////////
// Scheduler::Fiber
////////////////////////////////////////////////////////////////////////////////
Expand Down
113 changes: 104 additions & 9 deletions include/marl/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,109 @@

#include <functional>

#include "containers.h"

namespace marl {

// Thread provides an OS abstraction for threads of execution.
// Thread is used by marl instead of std::thread as Windows does not naturally
// scale beyond 64 logical threads on a single CPU, unless you use the Win32
// API.
// Thread alsocontains static methods that abstract OS-specific thread / cpu
// queries and control.
class Thread {
public:
using Func = std::function<void()>;

// Core identifies a logical processor unit.
// How a core is identified varies by platform.
struct Core {
struct Windows {
uint8_t group; // Group number
uint8_t index; // Core within the processor group
};
struct Pthread {
uint16_t index; // Core number
};
union {
Windows windows;
Pthread pthread;
};

// Comparison functions
inline bool operator==(const Core&) const;
inline bool operator<(const Core&) const;
};

// Affinity holds the affinity mask for a thread - a description of what cores
// the thread is allowed to run on.
struct Affinity {
// supported is true if marl supports controlling thread affinity for this
// platform.
#if defined(_WIN32) || defined(__linux__) || defined(__FreeBSD__)
static constexpr bool supported = true;
#else
static constexpr bool supported = false;
#endif

// Policy is an interface that provides a get() method for returning an
// Affinity for the given thread by id.
class Policy {
public:
virtual ~Policy(){};

// anyOf() returns a Policy that returns an Affinity for a number of
// available cores in affinity.
//
// Windows requires that each thread is only associated with a
// single affinity group, so the Policy's returned affinity will contain
// cores all from the same group.
static std::shared_ptr<Policy> anyOf(
Affinity&& affinity,
Allocator* allocator = Allocator::Default);

// oneOf() returns a Policy that returns an affinity with a single enabled
// core from affinity. The single enabled core in the Policy's returned
// affinity is:
// affinity[threadId % affinity.count()]
static std::shared_ptr<Policy> oneOf(
Affinity&& affinity,
Allocator* allocator = Allocator::Default);

// get() returns the thread Affinity for the for the given thread by id.
virtual Affinity get(uint32_t threadId, Allocator* allocator) const = 0;
};

Affinity(Allocator*);
Affinity(Affinity&&);
Affinity(const Affinity&, Allocator* allocator);

// all() returns an Affinity with all the cores available to the process.
static Affinity all(Allocator* allocator = Allocator::Default);

Affinity(std::initializer_list<Core>, Allocator* allocator);

// count() returns the number of enabled cores in the affinity.
size_t count() const;

// operator[] returns the i'th enabled core from this affinity.
Core operator[](size_t index) const;

// add() adds the cores from the given affinity to this affinity.
// This affinity is returned to allow for fluent calls.
Affinity& add(const Affinity&);

// remove() removes the cores from the given affinity from this affinity.
// This affinity is returned to allow for fluent calls.
Affinity& remove(const Affinity&);

private:
Affinity(const Affinity&) = delete;

containers::vector<Core, 32> cores;
};

Thread() = default;
Thread(Thread&&);
Thread& operator=(Thread&&);

// Start a new thread that calls func.
// logicalCpuHint is a hint to run the thread on the specified logical CPU.
// logicalCpuHint may be entirely ignored.
Thread(unsigned int logicalCpuHint, const Func& func);
// Start a new thread using the given affinity that calls func.
Thread(Affinity&& affinity, Func&& func);

~Thread();

Expand All @@ -59,6 +142,18 @@ class Thread {
Impl* impl = nullptr;
};

////////////////////////////////////////////////////////////////////////////////
// Thread::Core
////////////////////////////////////////////////////////////////////////////////
// Comparison functions
bool Thread::Core::operator==(const Core& other) const {
return pthread.index == other.pthread.index;
}

bool Thread::Core::operator<(const Core& other) const {
return pthread.index < other.pthread.index;
}

} // namespace marl

#endif // marl_thread_h
2 changes: 1 addition & 1 deletion src/memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

#include <cstring>

#if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__)
#if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__)
#include <sys/mman.h>
#include <unistd.h>
namespace {
Expand Down
18 changes: 13 additions & 5 deletions src/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ void Scheduler::unbind() {
}

Scheduler::Scheduler(const Config& config) : cfg(config), workerThreads{} {
if (cfg.workerThread.count > 0 && !cfg.workerThread.affinityPolicy) {
cfg.workerThread.affinityPolicy = Thread::Affinity::Policy::anyOf(
Thread::Affinity::all(cfg.allocator), cfg.allocator);
}

for (size_t i = 0; i < spinningWorkers.size(); i++) {
spinningWorkers[i] = -1;
}
Expand Down Expand Up @@ -403,8 +408,11 @@ Scheduler::Worker::Worker(Scheduler* scheduler, Mode mode, uint32_t id)

void Scheduler::Worker::start() {
switch (mode) {
case Mode::MultiThreaded:
thread = Thread(id, [=] {
case Mode::MultiThreaded: {
auto allocator = scheduler->cfg.allocator;
auto& affinityPolicy = scheduler->cfg.workerThread.affinityPolicy;
auto affinity = affinityPolicy->get(id, allocator);
thread = Thread(std::move(affinity), [=] {
Thread::setName("Thread<%.2d>", int(id));

if (auto const& initFunc = scheduler->cfg.workerThread.initializer) {
Expand All @@ -423,13 +431,13 @@ void Scheduler::Worker::start() {
Worker::current = nullptr;
});
break;

case Mode::SingleThreaded:
}
case Mode::SingleThreaded: {
Worker::current = this;
mainFiber = Fiber::createFromCurrentThread(scheduler->cfg.allocator, 0);
currentFiber = mainFiber.get();
break;

}
default:
MARL_ASSERT(false, "Unknown mode: %d", int(mode));
}
Expand Down
22 changes: 22 additions & 0 deletions src/scheduler_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,25 @@ BENCHMARK_DEFINE_F(Schedule, SomeWork)
});
}
BENCHMARK_REGISTER_F(Schedule, SomeWork)->Apply(Schedule::args);

BENCHMARK_DEFINE_F(Schedule, SomeWorkWorkerAffinityOneOf)
(benchmark::State& state) {
marl::Scheduler::Config cfg;
cfg.setWorkerThreadAffinityPolicy(
marl::Thread::Affinity::Policy::oneOf(marl::Thread::Affinity::all()));
run(state, cfg, [&](int numTasks) {
for (auto _ : state) {
marl::WaitGroup wg;
wg.add(numTasks);
for (auto i = 0; i < numTasks; i++) {
marl::schedule([=] {
benchmark::DoNotOptimize(doSomeWork(i));
wg.done();
});
}
wg.wait();
}
});
}
BENCHMARK_REGISTER_F(Schedule, SomeWorkWorkerAffinityOneOf)
->Apply(Schedule::args);
Loading

0 comments on commit 325b072

Please sign in to comment.