diff --git a/include/marl/containers.h b/include/marl/containers.h
index 2eaee0b..1147a97 100644
--- a/include/marl/containers.h
+++ b/include/marl/containers.h
@@ -22,18 +22,66 @@
 #include <cstddef>  // size_t
 #include <utility>  // std::move
 
+#include <deque>
+#include <map>
+#include <set>
+#include <unordered_map>
+#include <unordered_set>
+
 namespace marl {
 namespace containers {
 
+////////////////////////////////////////////////////////////////////////////////
+// STL wrappers
+// STL containers that use a marl::StlAllocator backed by a marl::Allocator.
+// Note: These may be re-implemented to optimize for marl's usage cases.
+// See: https://github.com/google/marl/issues/129
+////////////////////////////////////////////////////////////////////////////////
+template <typename T>
+using deque = std::deque<T, StlAllocator<T>>;
+
+template <typename K, typename V, typename C = std::less<K>>
+using map = std::map<K, V, C, StlAllocator<std::pair<const K, V>>>;
+
+template <typename K, typename C = std::less<K>>
+using set = std::set<K, C, StlAllocator<K>>;
+
+template <typename K,
+          typename V,
+          typename H = std::hash<K>,
+          typename E = std::equal_to<K>>
+using unordered_map =
+    std::unordered_map<K, V, H, E, StlAllocator<std::pair<const K, V>>>;
+
+template <typename K, typename H = std::hash<K>, typename E = std::equal_to<K>>
+using unordered_set = std::unordered_set<K, H, E, StlAllocator<K>>;
+
+// take() takes and returns the front value from the deque.
+template <typename T>
+inline T take(deque<T>& queue) {
+  auto out = std::move(queue.front());
+  queue.pop_front();
+  return out;
+}
+
+// take() takes and returns the first value from the unordered_set.
+template <typename T, typename H, typename E>
+inline T take(unordered_set<T, H, E>& set) {
+  auto it = set.begin();
+  auto out = std::move(*it);
+  set.erase(it);
+  return out;
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 // vector
 ////////////////////////////////////////////////////////////////////////////////
 
 // vector is a container of contiguously stored elements.
-// Unlike std::vector, marl::containers::vector keeps the first BASE_CAPACITY
-// elements internally, which will avoid dynamic heap allocations.
-// Once the vector exceeds BASE_CAPACITY elements, vector will allocate storage
-// from the heap.
+// Unlike std::vector, marl::containers::vector keeps the first
+// BASE_CAPACITY elements internally, which will avoid dynamic heap
+// allocations. Once the vector exceeds BASE_CAPACITY elements, vector will
+// allocate storage from the heap.
 template <typename T, int BASE_CAPACITY>
 class vector {
  public:
@@ -74,6 +122,10 @@ class vector {
   inline size_t cap() const;
   inline void resize(size_t n);
   inline void reserve(size_t n);
+  inline T* data();
+  inline const T* data() const;
+
+  Allocator* const allocator;
 
  private:
   using TStorage = typename marl::aligned_storage<sizeof(T), alignof(T)>::type;
@@ -82,7 +134,6 @@ class vector {
 
   inline void free();
 
-  Allocator* const allocator;
   size_t count = 0;
   size_t capacity = BASE_CAPACITY;
   TStorage buffer[BASE_CAPACITY];
@@ -272,6 +323,16 @@ void vector<T, BASE_CAPACITY>::reserve(size_t n) {
   }
 }
 
+template <typename T, int BASE_CAPACITY>
+T* vector<T, BASE_CAPACITY>::data() {
+  return elements;
+}
+
+template <typename T, int BASE_CAPACITY>
+const T* vector<T, BASE_CAPACITY>::data() const {
+  return elements;
+}
+
 template <typename T, int BASE_CAPACITY>
 void vector<T, BASE_CAPACITY>::free() {
   for (size_t i = 0; i < count; i++) {
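
A quick sketch of how the wrappers and take() above are used. This is illustrative only; the implicit Allocator*-to-StlAllocator conversion it relies on comes from the marl/memory.h changes below:

```c++
#include "marl/containers.h"
#include "marl/memory.h"

void example() {
  marl::Allocator* allocator = marl::Allocator::Default;

  // The Allocator* implicitly converts to StlAllocator<int>, so the
  // wrapped std::deque can be constructed straight from the allocator.
  marl::containers::deque<int> queue(allocator);
  queue.push_back(10);
  queue.push_back(20);

  // take() moves the front element out and pops it in one step.
  int front = marl::containers::take(queue);  // front == 10
}
```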
diff --git a/include/marl/memory.h b/include/marl/memory.h
index 8608851..b1bd631 100644
--- a/include/marl/memory.h
+++ b/include/marl/memory.h
@@ -27,6 +27,9 @@
 
 namespace marl {
 
+template <typename T>
+struct StlAllocator;
+
 // pageSize() returns the size in bytes of a virtual memory page for the host
 // system.
 size_t pageSize();
@@ -36,6 +39,19 @@ inline T alignUp(T val, T alignment) {
   return alignment * ((val + alignment - 1) / alignment);
 }
 
+// aligned_storage() is a replacement for std::aligned_storage that isn't
+// busted on older versions of MSVC.
+template <size_t SIZE, size_t ALIGNMENT>
+struct aligned_storage {
+  struct alignas(ALIGNMENT) type {
+    unsigned char data[SIZE];
+  };
+};
+
+///////////////////////////////////////////////////////////////////////////////
+// Allocation
+///////////////////////////////////////////////////////////////////////////////
+
 // Allocation holds the result of a memory allocation from an Allocator.
 struct Allocation {
   // Intended usage of the allocation. Used for allocation trackers.
@@ -45,6 +61,7 @@ struct Allocation {
     Create,  // Allocator::create(), make_unique(), make_shared()
     Vector,  // marl::containers::vector<T>
     List,    // marl::containers::list<T>
+    Stl,     // marl::StlAllocator
     Count,  // Not intended to be used as a usage type - used for upper bound.
   };
@@ -60,6 +77,10 @@ struct Allocation {
   Request request;  // Request used for the allocation.
 };
 
+///////////////////////////////////////////////////////////////////////////////
+// Allocator
+///////////////////////////////////////////////////////////////////////////////
+
 // Allocator is an interface to a memory allocator.
 // Marl provides a default implementation with Allocator::Default.
 class Allocator {
@@ -183,14 +204,9 @@ std::shared_ptr<T> Allocator::make_shared(ARGS&&... args) {
   return std::shared_ptr<T>(reinterpret_cast<T*>(alloc.ptr), Deleter{this});
 }
 
-// aligned_storage() is a replacement for std::aligned_storage that isn't
-// busted on older versions of MSVC.
-template <size_t SIZE, size_t ALIGNMENT>
-struct aligned_storage {
-  struct alignas(ALIGNMENT) type {
-    unsigned char data[SIZE];
-  };
-};
+///////////////////////////////////////////////////////////////////////////////
+// TrackedAllocator
+///////////////////////////////////////////////////////////////////////////////
 
 // TrackedAllocator wraps an Allocator to track the allocations made.
 class TrackedAllocator : public Allocator {
@@ -280,6 +296,103 @@ void TrackedAllocator::free(const Allocation& allocation) {
   return allocator->free(allocation);
 }
 
+///////////////////////////////////////////////////////////////////////////////
+// StlAllocator
+///////////////////////////////////////////////////////////////////////////////
+
+// StlAllocator exposes an STL-compatible allocator for a given
+// marl::Allocator.
+template <typename T>
+struct StlAllocator {
+  using value_type = T;
+  using pointer = T*;
+  using const_pointer = const T*;
+  using reference = T&;
+  using const_reference = const T&;
+  using size_type = size_t;
+  using difference_type = size_t;
+
+  // An equivalent STL allocator for a different type.
+  template <class U>
+  struct rebind {
+    typedef StlAllocator<U> other;
+  };
+
+  // Creation of the allocator is allowed, but if anyone were to try to use
+  // an object with a null allocator, it would fail. This however allows us to
+  // default-construct a bunch of objects in a container, and fill in their
+  // allocators on first use.
+  // StlAllocator() : allocator(nullptr) {}
+
+  // All allocations will be done through this allocator. It must remain
+  // valid until this StlAllocator and all allocators created
+  // from it have been destroyed.
+  StlAllocator(Allocator* allocator) : allocator(allocator) {}
+
+  template <typename U>
+  StlAllocator(const StlAllocator<U>& other) {
+    allocator = other.allocator;
+  }
+
+  // Returns the actual address of x even in presence of overloaded operator&.
+  pointer address(reference x) const { return &x; }
+  const_pointer address(const_reference x) const { return &x; }
+
+  // Allocates the memory for n objects of type T.
+  // Does not actually construct the objects.
+  T* allocate(std::size_t n) {
+    auto alloc = allocator->allocate(request(n));
+    return reinterpret_cast<T*>(alloc.ptr);
+  }
+
+  // Deallocates the memory for n objects of type T.
+  void deallocate(T* p, std::size_t n) {
+    Allocation alloc;
+    alloc.ptr = p;
+    alloc.request = request(n);
+    allocator->free(alloc);
+  }
+
+  // Returns the maximum theoretically possible number of T stored in this
+  // allocator.
+  size_type max_size() const {
+    return std::numeric_limits<size_type>::max() / sizeof(value_type);
+  }
+
+  // Copy constructs an object of type T at the location given by p.
+  void construct(pointer p, const_reference val) { new (p) T(val); }
+
+  // Constructs an object of type U at the location given by p, passing
+  // through all other arguments to the constructor.
+  template <typename U, typename... Args>
+  void construct(U* p, Args&&... args) {
+    ::new ((void*)p) U(std::forward<Args>(args)...);
+  }
+
+  // Deconstructs the object at p. It does not free the memory.
+  void destroy(pointer p) { ((T*)p)->~T(); }
+
+  // Deconstructs the object at p. It does not free the memory.
+  template <typename U>
+  void destroy(U* p) {
+    p->~U();
+  }
+
+ private:
+  inline Allocation::Request request(size_t n) const {
+    Allocation::Request req = {};
+    req.size = sizeof(T) * n;
+    req.alignment = alignof(T);
+    req.usage = Allocation::Usage::Stl;
+    return req;
+  }
+
+  template <typename U>
+  friend struct StlAllocator;
+  Allocator* allocator;
+};
+
 }  // namespace marl
 
 #endif  // marl_memory_h
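
StlAllocator can also be handed to any allocator-aware STL container directly. A minimal sketch, pairing it with the TrackedAllocator above so the Usage::Stl allocations show up in the tracker (illustrative; assumes TrackedAllocator wrapping Allocator::Default):

```c++
#include "marl/memory.h"

#include <vector>

void example() {
  marl::TrackedAllocator tracked(marl::Allocator::Default);

  // Every heap allocation made by this std::vector is routed through
  // `tracked` and tagged with Allocation::Usage::Stl.
  std::vector<int, marl::StlAllocator<int>> v(
      marl::StlAllocator<int>(&tracked));
  v.resize(1024);
}
```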
diff --git a/include/marl/pool.h b/include/marl/pool.h
index 70d53c9..41a37c8 100644
--- a/include/marl/pool.h
+++ b/include/marl/pool.h
@@ -363,7 +363,7 @@ class UnboundedPool : public Pool<T> {
     Allocator* allocator;
     marl::mutex mutex;
-    std::vector<Item*> items;
+    containers::vector<Item*, 32> items;
     Item* free = nullptr;
   };
@@ -373,7 +373,7 @@ class UnboundedPool : public Pool<T> {
 template <typename T>
 UnboundedPool<T>::Storage::Storage(Allocator* allocator)
-    : allocator(allocator) {}
+    : allocator(allocator), items(allocator) {}
 
 template <typename T>
 UnboundedPool<T>::Storage::~Storage() {
diff --git a/include/marl/scheduler.h b/include/marl/scheduler.h
index 5880673..12cc8c1 100644
--- a/include/marl/scheduler.h
+++ b/include/marl/scheduler.h
@@ -15,6 +15,7 @@
 #ifndef marl_scheduler_h
 #define marl_scheduler_h
 
+#include "containers.h"
 #include "debug.h"
 #include "deprecated.h"
 #include "memory.h"
@@ -26,14 +27,8 @@
 #include <array>
 #include <atomic>
 #include <chrono>
-#include <deque>
 #include <functional>
-#include <map>
-#include <set>
 #include <thread>
-#include <unordered_map>
-#include <unordered_set>
-#include <vector>
 
 namespace marl {
 
@@ -291,6 +286,8 @@ class Scheduler {
     // WaitingFibers holds all the fibers waiting on a timeout.
     struct WaitingFibers {
+      inline WaitingFibers(Allocator*);
+
       // operator bool() returns true iff there are any wait fibers.
       inline operator bool() const;
@@ -317,15 +314,15 @@ class Scheduler {
         Fiber* fiber;
         inline bool operator<(const Timeout&) const;
       };
-      std::set<Timeout> timeouts;
-      std::unordered_map<Fiber*, TimePoint> fibers;
+      containers::set<Timeout, std::less<Timeout>> timeouts;
+      containers::unordered_map<Fiber*, TimePoint> fibers;
     };
 
     // TODO: Implement a queue that recycles elements to reduce number of
    // heap allocations.
-    using TaskQueue = std::deque<Task>;
-    using FiberQueue = std::deque<Fiber*>;
-    using FiberSet = std::unordered_set<Fiber*>;
+    using TaskQueue = containers::deque<Task>;
+    using FiberQueue = containers::deque<Fiber*>;
+    using FiberSet = containers::unordered_set<Fiber*>;
 
    // Workers executes Tasks on a single thread.
    // Once a task is started, it may yield to other tasks on the same Worker.
@@ -437,6 +434,8 @@ class Scheduler {
     // Work holds tasks and fibers that are enqueued on the Worker.
     struct Work {
+      inline Work(Allocator*);
+
       std::atomic<uint64_t> num = {0};  // tasks.size() + fibers.size()
       GUARDED_BY(mutex) uint64_t numBlockedFibers = 0;
       GUARDED_BY(mutex) TaskQueue tasks;
@@ -474,7 +473,7 @@ class Scheduler {
     Thread thread;
     Work work;
     FiberSet idleFibers;  // Fibers that have completed which can be reused.
-    std::vector<Allocator::unique_ptr<Fiber>>
+    containers::vector<Allocator::unique_ptr<Fiber>, 16>
         workerFibers;  // All fibers created by this worker.
     FastRnd rng;
     bool shutdown = false;
@@ -506,8 +505,11 @@ class Scheduler {
   std::array<std::unique_ptr<Worker>, MaxWorkerThreads> workerThreads;
 
   struct SingleThreadedWorkers {
+    inline SingleThreadedWorkers(Allocator*);
+
     using WorkerByTid =
-        std::unordered_map<uint64_t, Allocator::unique_ptr<Worker>>;
+        containers::unordered_map<uint64_t, Allocator::unique_ptr<Worker>>;
     marl::mutex mutex;
     GUARDED_BY(mutex) std::condition_variable unbind;
     GUARDED_BY(mutex) WorkerByTid byTid;
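
Because these containers can no longer be default-constructed without an allocator, every aggregate that owns one (WaitingFibers, Worker::Work, SingleThreadedWorkers) now takes an Allocator* in its constructor and forwards it to each member. The same pattern, sketched on a hypothetical struct that is not part of marl:

```c++
#include "marl/containers.h"
#include "marl/memory.h"

// Hypothetical example of the allocator-forwarding pattern used by
// WaitingFibers, Worker::Work and SingleThreadedWorkers above.
struct PendingWork {
  explicit PendingWork(marl::Allocator* allocator)
      : tasks(allocator), seen(allocator) {}

  marl::containers::deque<int> tasks;         // backed by allocator
  marl::containers::unordered_set<int> seen;  // backed by allocator
};
```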
diff --git a/src/event_bench.cpp b/src/event_bench.cpp
index dc8f569..fb3d872 100644
--- a/src/event_bench.cpp
+++ b/src/event_bench.cpp
@@ -14,20 +14,22 @@
 
 #include "marl_bench.h"
 
+#include "marl/containers.h"
 #include "marl/event.h"
 
 #include "benchmark/benchmark.h"
 
-#include <vector>
-
 BENCHMARK_DEFINE_F(Schedule, Event)(benchmark::State& state) {
   run(state, [&](int numTasks) {
     for (auto _ : state) {
-      std::vector<marl::Event> events(numTasks + 1);
+      marl::containers::vector<marl::Event, 1> events;
+      events.resize(numTasks + 1);
       for (auto i = 0; i < numTasks; i++) {
+        marl::Event prev = events[i];
+        marl::Event next = events[i + 1];
         marl::schedule([=] {
-          events[i].wait();
-          events[i + 1].signal();
+          prev.wait();
+          next.signal();
         });
       }
       events.front().signal();
diff --git a/src/event_test.cpp b/src/event_test.cpp
index 0250329..bbc9102 100644
--- a/src/event_test.cpp
+++ b/src/event_test.cpp
@@ -18,6 +18,8 @@
 
 #include "marl_test.h"
 
+#include <array>
+
 namespace std {
 namespace chrono {
 template <typename Rep, typename Period>
 std::ostream& operator<<(std::ostream& os, const duration<Rep, Period>& d) {
@@ -28,9 +30,7 @@ std::ostream& operator<<(std::ostream& os, const duration<Rep, Period>& d) {
 }  // namespace std
 
 TEST_P(WithBoundScheduler, EventIsSignalled) {
-  std::vector<marl::Event::Mode> modes = {marl::Event::Mode::Manual,
-                                          marl::Event::Mode::Auto};
-  for (auto mode : modes) {
+  for (auto mode : {marl::Event::Mode::Manual, marl::Event::Mode::Auto}) {
     auto event = marl::Event(mode);
     ASSERT_EQ(event.isSignalled(), false);
     event.signal();
@@ -99,9 +99,7 @@ TEST_P(WithBoundScheduler, EventManualWait) {
 }
 
 TEST_P(WithBoundScheduler, EventSequence) {
-  std::vector<marl::Event::Mode> modes = {marl::Event::Mode::Manual,
-                                          marl::Event::Mode::Auto};
-  for (auto mode : modes) {
+  for (auto mode : {marl::Event::Mode::Manual, marl::Event::Mode::Auto}) {
     std::string sequence;
     auto eventA = marl::Event(mode);
     auto eventB = marl::Event(mode);
@@ -216,7 +214,7 @@ TEST_P(WithBoundScheduler, EventWaitStressTest) {
 
 TEST_P(WithBoundScheduler, EventAny) {
   for (int i = 0; i < 3; i++) {
-    std::vector<marl::Event> events = {
+    std::array<marl::Event, 3> events = {
         marl::Event(marl::Event::Mode::Auto),
         marl::Event(marl::Event::Mode::Auto),
         marl::Event(marl::Event::Mode::Auto),
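
The event_bench change above works because a marl::Event is a cheap handle to shared state: copying events[i] into prev before scheduling lets each task outlive the vector that produced it. A small illustration of that copy semantic (a sketch, not part of the patch):

```c++
#include "marl/event.h"

void example() {
  marl::Event a(marl::Event::Mode::Manual);
  marl::Event b = a;  // b is a copy sharing a's signalled state.
  a.signal();
  // b observes the signal delivered through a.
  bool signalled = b.isSignalled();  // true
}
```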
diff --git a/src/memory_test.cpp b/src/memory_test.cpp
index 8aa6334..f760934 100644
--- a/src/memory_test.cpp
+++ b/src/memory_test.cpp
@@ -22,14 +22,11 @@ class AllocatorTest : public testing::Test {
 };
 
 TEST_F(AllocatorTest, AlignedAllocate) {
-  std::vector<bool> guards = {false, true};
-  std::vector<size_t> sizes = {1,   2,   3,   4,   5,   7,   8,   14,  16,  17,
-                               31,  34,  50,  63,  64,  65,  100, 127, 128, 129,
-                               200, 255, 256, 257, 500, 511, 512, 513};
-  std::vector<size_t> alignments = {1, 2, 4, 8, 16, 32, 64, 128};
-  for (auto useGuards : guards) {
-    for (auto alignment : alignments) {
-      for (auto size : sizes) {
+  for (auto useGuards : {false, true}) {
+    for (auto alignment : {1, 2, 4, 8, 16, 32, 64, 128}) {
+      for (auto size : {1,   2,   3,   4,   5,   7,   8,   14,  16,  17,
+                        31,  34,  50,  63,  64,  65,  100, 127, 128, 129,
+                        200, 255, 256, 257, 500, 511, 512, 513}) {
         marl::Allocation::Request request;
         request.alignment = alignment;
         request.size = size;
diff --git a/src/scheduler.cpp b/src/scheduler.cpp
index 852d74a..66440eb 100644
--- a/src/scheduler.cpp
+++ b/src/scheduler.cpp
@@ -59,21 +59,6 @@ inline uint64_t threadID() {
 }
 #endif
 
-template <typename T>
-inline T take(std::deque<T>& queue) {
-  auto out = std::move(queue.front());
-  queue.pop_front();
-  return out;
-}
-
-template <typename T>
-inline T take(std::unordered_set<T>& set) {
-  auto it = set.begin();
-  auto out = std::move(*it);
-  set.erase(it);
-  return out;
-}
-
 inline void nop() {
 #if defined(_WIN32)
   __nop();
@@ -127,12 +112,12 @@ void Scheduler::unbind() {
   bound = nullptr;
 }
 
-Scheduler::Scheduler(const Config& config) : cfg(config), workerThreads{} {
+Scheduler::Scheduler(const Config& config)
+    : cfg(config), workerThreads{}, singleThreadedWorkers(config.allocator) {
   if (cfg.workerThread.count > 0 && !cfg.workerThread.affinityPolicy) {
     cfg.workerThread.affinityPolicy = Thread::Affinity::Policy::anyOf(
         Thread::Affinity::all(cfg.allocator), cfg.allocator);
   }
-
   for (size_t i = 0; i < spinningWorkers.size(); i++) {
     spinningWorkers[i] = -1;
   }
@@ -167,7 +152,7 @@ Scheduler::~Scheduler() {
 
 #if MARL_ENABLE_DEPRECATED_SCHEDULER_GETTERS_SETTERS
 Scheduler::Scheduler(Allocator* allocator /* = Allocator::Default */)
-    : workerThreads{} {
+    : workerThreads{}, singleThreadedWorkers(allocator) {
   cfg.allocator = allocator;
   for (size_t i = 0; i < spinningWorkers.size(); i++) {
     spinningWorkers[i] = -1;
@@ -343,6 +328,9 @@ const char* Scheduler::Fiber::toString(State state) {
 ////////////////////////////////////////////////////////////////////////////////
 // Scheduler::WaitingFibers
 ////////////////////////////////////////////////////////////////////////////////
+Scheduler::WaitingFibers::WaitingFibers(Allocator* allocator)
+    : timeouts(allocator), fibers(allocator) {}
+
 Scheduler::WaitingFibers::operator bool() const {
   return !fibers.empty();
 }
@@ -404,7 +392,11 @@ bool Scheduler::WaitingFibers::Timeout::operator<(const Timeout& o) const {
 thread_local Scheduler::Worker* Scheduler::Worker::current = nullptr;
 
 Scheduler::Worker::Worker(Scheduler* scheduler, Mode mode, uint32_t id)
-    : id(id), mode(mode), scheduler(scheduler) {}
+    : id(id),
+      mode(mode),
+      scheduler(scheduler),
+      work(scheduler->cfg.allocator),
+      idleFibers(scheduler->cfg.allocator) {}
 
 void Scheduler::Worker::start() {
   switch (mode) {
@@ -525,12 +517,12 @@ void Scheduler::Worker::suspend(
   if (!work.fibers.empty()) {
     // There's another fiber that has become unblocked, resume that.
     work.num--;
-    auto to = take(work.fibers);
+    auto to = containers::take(work.fibers);
     ASSERT_FIBER_STATE(to, Fiber::State::Queued);
     switchToFiber(to);
   } else if (!idleFibers.empty()) {
     // There's an old fiber we can reuse, resume that.
-    auto to = take(idleFibers);
+    auto to = containers::take(idleFibers);
     ASSERT_FIBER_STATE(to, Fiber::State::Idle);
     switchToFiber(to);
   } else {
@@ -605,7 +597,7 @@ bool Scheduler::Worker::steal(Task& out) {
     return false;
   }
   work.num--;
-  out = take(work.tasks);
+  out = containers::take(work.tasks);
   work.mutex.unlock();
   return true;
 }
@@ -722,7 +714,7 @@ void Scheduler::Worker::runUntilIdle() {
 
     while (!work.fibers.empty()) {
       work.num--;
-      auto fiber = take(work.fibers);
+      auto fiber = containers::take(work.fibers);
       // Sanity checks,
       MARL_ASSERT(idleFibers.count(fiber) == 0, "dequeued fiber is idle");
       MARL_ASSERT(fiber != currentFiber, "dequeued fiber is currently running");
@@ -739,7 +731,7 @@ void Scheduler::Worker::runUntilIdle() {
 
     if (!work.tasks.empty()) {
       work.num--;
-      auto task = take(work.tasks);
+      auto task = containers::take(work.tasks);
       work.mutex.unlock();
 
       // Run the task.
@@ -760,7 +752,7 @@ Scheduler::Fiber* Scheduler::Worker::createWorkerFiber() {
   auto fiber = Fiber::create(scheduler->cfg.allocator, fiberId, FiberStackSize,
                              [&]() REQUIRES(work.mutex) { run(); });
   auto ptr = fiber.get();
-  workerFibers.push_back(std::move(fiber));
+  workerFibers.emplace_back(std::move(fiber));
   return ptr;
 }
@@ -776,6 +768,9 @@ void Scheduler::Worker::switchToFiber(Fiber* to) {
 ////////////////////////////////////////////////////////////////////////////////
 // Scheduler::Worker::Work
 ////////////////////////////////////////////////////////////////////////////////
+Scheduler::Worker::Work::Work(Allocator* allocator)
+    : tasks(allocator), fibers(allocator), waiting(allocator) {}
+
 template <typename F>
 void Scheduler::Worker::Work::wait(F&& f) {
   notifyAdded = true;
@@ -787,4 +782,10 @@ void Scheduler::Worker::Work::wait(F&& f) {
   notifyAdded = false;
 }
 
+////////////////////////////////////////////////////////////////////////////////
+// Scheduler::SingleThreadedWorkers
+////////////////////////////////////////////////////////////////////////////////
+Scheduler::SingleThreadedWorkers::SingleThreadedWorkers(Allocator* allocator)
+    : byTid(allocator) {}
+
 }  // namespace marl
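
For reference, the Allocation::Request path that memory_test drives can be used directly. A sketch using the default allocator (field names per the marl/memory.h shown above; Usage::Undefined is assumed to be the pre-existing default tag):

```c++
#include "marl/memory.h"

void example() {
  marl::Allocation::Request request;
  request.size = 256;      // bytes
  request.alignment = 64;  // must be a power of two
  request.useGuards = false;
  request.usage = marl::Allocation::Usage::Undefined;

  marl::Allocation allocation = marl::Allocator::Default->allocate(request);
  // allocation.ptr is aligned to the requested 64-byte boundary.
  marl::Allocator::Default->free(allocation);
}
```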
diff --git a/src/scheduler_test.cpp b/src/scheduler_test.cpp
index c268c3b..3bf8849 100644
--- a/src/scheduler_test.cpp
+++ b/src/scheduler_test.cpp
@@ -14,12 +14,12 @@
 
 #include "marl_test.h"
 
+#include "marl/containers.h"
 #include "marl/defer.h"
 #include "marl/event.h"
 #include "marl/waitgroup.h"
 
 #include <thread>
-#include <unordered_set>
 
 TEST_F(WithoutBoundScheduler, SchedulerConstructAndDestruct) {
   auto scheduler = std::unique_ptr<marl::Scheduler>(
@@ -120,9 +120,9 @@ TEST_P(WithBoundScheduler, FibersResumeOnSameStdThread) {
   marl::WaitGroup fence(1);
   marl::WaitGroup wg(num_threads);
 
-  std::vector<std::thread> threads;
+  marl::containers::vector<std::thread, 32> threads;
   for (int i = 0; i < num_threads; i++) {
-    threads.push_back(std::thread([=] {
+    threads.emplace_back(std::thread([=] {
       scheduler->bind();
       defer(scheduler->unbind());
 
@@ -151,7 +151,7 @@ TEST_F(WithoutBoundScheduler, TasksOnlyScheduledOnWorkerThreads) {
   defer(scheduler->unbind());
 
   std::mutex mutex;
-  std::unordered_set<std::thread::id> threads;
+  marl::containers::unordered_set<std::thread::id> threads(allocator);
   marl::WaitGroup wg;
   for (int i = 0; i < 10000; i++) {
     wg.add(1);
diff --git a/src/thread.cpp b/src/thread.cpp
index 81f4638..80ce77f 100644
--- a/src/thread.cpp
+++ b/src/thread.cpp
@@ -19,7 +19,6 @@
 #include "marl/trace.h"
 
 #include <algorithm>  // std::sort
-#include <unordered_set>
 #include <cstdarg>
 #include <cstdio>
 
@@ -27,9 +26,9 @@
 #if defined(_WIN32)
 #define WIN32_LEAN_AND_MEAN 1
 #include <windows.h>
+#include <array>
 #include <cstdlib>  // mbstowcs
 #include <limits>   // std::numeric_limits
-#include <vector>
 #undef max
 #elif defined(__APPLE__)
 #include <pthread.h>
@@ -81,9 +80,14 @@ struct ProcessorGroup {
   KAFFINITY affinity;  // affinity mask.
 };
 
-const std::vector<ProcessorGroup>& getProcessorGroups() {
-  static std::vector<ProcessorGroup> groups = [] {
-    std::vector<ProcessorGroup> out;
+struct ProcessorGroups {
+  std::array<ProcessorGroup, MaxGroupCount> groups;
+  size_t count;
+};
+
+const ProcessorGroups& getProcessorGroups() {
+  static ProcessorGroups groups = [] {
+    ProcessorGroups out = {};
     SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX info[32] = {};
     DWORD size = sizeof(info);
     CHECK_WIN32(GetLogicalProcessorInformationEx(RelationGroup, info, &size));
@@ -93,9 +97,9 @@ const ProcessorGroups& getProcessorGroups() {
       auto groupCount = info[i].Group.ActiveGroupCount;
       for (WORD groupIdx = 0; groupIdx < groupCount; groupIdx++) {
         auto const& groupInfo = info[i].Group.GroupInfo[groupIdx];
-        out.emplace_back(ProcessorGroup{groupInfo.ActiveProcessorCount,
-                                        groupInfo.ActiveProcessorMask});
-        MARL_ASSERT(out.size() <= MaxGroupCount, "Group index overflow");
+        out.groups[out.count++] = ProcessorGroup{
+            groupInfo.ActiveProcessorCount, groupInfo.ActiveProcessorMask};
+        MARL_ASSERT(out.count <= MaxGroupCount, "Group index overflow");
       }
     }
   }
@@ -129,17 +133,17 @@ Thread::Affinity Thread::Affinity::all(
   Thread::Affinity affinity(allocator);
 
 #if defined(_WIN32)
-  decltype(Core::windows.group) groupIndex = 0;
-  for (auto group : getProcessorGroups()) {
+  const auto& groups = getProcessorGroups();
+  for (size_t groupIdx = 0; groupIdx < groups.count; groupIdx++) {
+    const auto& group = groups.groups[groupIdx];
     Core core;
-    core.windows.group = groupIndex;
-    for (size_t i = 0; i < sizeof(KAFFINITY) * 8; i++) {
-      if ((group.affinity >> i) & 1) {
-        core.windows.index = static_cast<decltype(Core::windows.index)>(i);
+    core.windows.group = static_cast<decltype(Core::windows.group)>(groupIdx);
+    for (size_t coreIdx = 0; coreIdx < sizeof(KAFFINITY) * 8; coreIdx++) {
+      if ((group.affinity >> coreIdx) & 1) {
+        core.windows.index = static_cast<decltype(Core::windows.index)>(coreIdx);
         affinity.cores.emplace_back(std::move(core));
       }
     }
-    groupIndex++;
   }
 #elif defined(__linux__)
   auto thread = pthread_self();
@@ -233,7 +237,7 @@ Thread::Core Thread::Affinity::operator[](size_t index) const {
 }
 
 Thread::Affinity& Thread::Affinity::add(const Thread::Affinity& other) {
-  std::unordered_set<Core, CoreHasher> set;
+  containers::unordered_set<Core, CoreHasher> set(cores.allocator);
   for (auto core : cores) {
     set.emplace(core);
   }
@@ -247,7 +251,7 @@ Thread::Affinity& Thread::Affinity::add(const Thread::Affinity& other) {
 
 Thread::Affinity& Thread::Affinity::remove(const Thread::Affinity& other) {
-  std::unordered_set<Core, CoreHasher> set;
+  containers::unordered_set<Core, CoreHasher> set(cores.allocator);
   for (auto core : other.cores) {
     set.emplace(core);
   }
@@ -341,7 +345,9 @@ void Thread::setName(const char* fmt, ...) {
 
 unsigned int Thread::numLogicalCPUs() {
   unsigned int count = 0;
-  for (auto& group : getProcessorGroups()) {
+  const auto& groups = getProcessorGroups();
+  for (size_t groupIdx = 0; groupIdx < groups.count; groupIdx++) {
+    const auto& group = groups.groups[groupIdx];
     count += group.count;
   }
   return count;
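
The thread.cpp changes also exercise the custom-hasher form of the wrapper: containers::unordered_set differs from its std:: equivalent only in the trailing allocator parameter, so an existing hasher carries over unchanged. A generic sketch (Key and KeyHasher are hypothetical stand-ins for Core and its hasher):

```c++
#include "marl/containers.h"
#include "marl/memory.h"

// Hypothetical stand-ins for thread.cpp's Core type and its hasher.
struct Key {
  int id;
  bool operator==(const Key& other) const { return id == other.id; }
};
struct KeyHasher {
  size_t operator()(const Key& key) const {
    return static_cast<size_t>(key.id);
  }
};

void example(marl::Allocator* allocator) {
  // Identical to std::unordered_set<Key, KeyHasher> except that the
  // allocator is passed explicitly at construction.
  marl::containers::unordered_set<Key, KeyHasher> keys(allocator);
  keys.emplace(Key{42});
}
```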
diff --git a/src/trace.cpp b/src/trace.cpp
index 702aa74..e238523 100644
--- a/src/trace.cpp
+++ b/src/trace.cpp
@@ -28,7 +28,6 @@
 #include <fstream>
 #include <sstream>
-#include <vector>
 
 namespace {