diff --git a/elements/loadbalancers/LoadBalanceThruput.hh b/elements/loadbalancers/LoadBalanceThruput.hh
index 7b56693..7113093 100644
--- a/elements/loadbalancers/LoadBalanceThruput.hh
+++ b/elements/loadbalancers/LoadBalanceThruput.hh
@@ -18,7 +18,8 @@ namespace nba {
 
 class LoadBalanceThruput : public SchedulableElement, PerBatchElement {
 public:
-    LoadBalanceThruput() : SchedulableElement(), PerBatchElement(), direction(0), thruput_history()
+    LoadBalanceThruput() : SchedulableElement(), PerBatchElement(),
+                           direction(0), thruput_history(nullptr)
     { }
 
     virtual ~LoadBalanceThruput()
@@ -38,7 +39,8 @@ public:
         rte_atomic64_set(&cpu_ratio, 1000);
         rte_atomic64_set(&update_count, 0);
 
-        thruput_history.init(3, ctx->loc.node_id);
+        NEW(ctx->loc.node_id, thruput_history, FixedRing,
+            3, ctx->loc.node_id);
         last_count = 0;
         last_thruput = 0;
         last_cpu_ratio_update = 0.1;
@@ -166,7 +168,7 @@ private:
     double last_thruput;
     uint64_t last_direction_changed;
 
-    FixedRing thruput_history;
+    FixedRing *thruput_history;
 
     static rte_atomic64_t cpu_ratio __rte_cache_aligned;
     static rte_atomic64_t update_flag;
diff --git a/elements/standards/Queue.hh b/elements/standards/Queue.hh
index feba4da..05d9f04 100644
--- a/elements/standards/Queue.hh
+++ b/elements/standards/Queue.hh
@@ -26,7 +26,7 @@ public:
     int get_type() const { return SchedulableElement::get_type() | PerBatchElement::get_type(); }
 
     int initialize() {
-        queue = new FixedRing<PacketBatch*, nullptr>(max_size, ctx->loc.node_id);
+        queue = new FixedRing<PacketBatch*>(max_size, ctx->loc.node_id);
         return 0;
     };
    int initialize_global() { return 0; };
@@ -61,7 +61,7 @@ public:
 
 private:
     size_t max_size;
-    FixedRing<PacketBatch*, nullptr> *queue;
+    FixedRing<PacketBatch*> *queue;
 };
 
 EXPORT_ELEMENT(Queue);
diff --git a/include/nba/core/intrinsic.hh b/include/nba/core/intrinsic.hh
index 3326f25..037802d 100644
--- a/include/nba/core/intrinsic.hh
+++ b/include/nba/core/intrinsic.hh
@@ -11,6 +11,11 @@
 
 #define ALIGN_CEIL(x,a) (((x)+(a)-1)&~((a)-1))
 
+#define NEW(node_id, ptr, cls, ...) { \
+    (ptr) = (cls*) rte_malloc_socket(nullptr, sizeof(cls), CACHE_LINE_SIZE, node_id); \
+    new (ptr) cls(__VA_ARGS__); \
+}
+
 namespace nba {
 
 template
diff --git a/include/nba/core/queue.hh b/include/nba/core/queue.hh
index 62e53e1..ae93bef 100644
--- a/include/nba/core/queue.hh
+++ b/include/nba/core/queue.hh
@@ -1,9 +1,8 @@
 #ifndef __NBA_QUEUE_HH__
 #define __NBA_QUEUE_HH__
 
-#include 
 #include 
-
+#include 
 #include 
 
 namespace nba {
@@ -108,13 +107,13 @@ private:
     T values[max_size];
 };
 
-template<typename T, T const default_value>
+template<typename T>
 class FixedRing;
 
-template<typename T, T const default_value>
+template<typename T>
 class FixedRingIterator
 {
-    using ContainerType = FixedRing<T, default_value>;
+    using ContainerType = FixedRing<T>;
 
     const ContainerType *_p_ring;
     unsigned _pos;
@@ -148,49 +147,62 @@ public:
 
     T operator* () const
     {
-        return _p_ring->get(_pos);
+        return _p_ring->at(_pos);
     }
 };
 
-template<typename T, T const default_value>
+template<typename T>
class FixedRing
 {
-    using IterType = FixedRingIterator<T, default_value>;
+    using IterType = FixedRingIterator<T>;
 
-public:
+private:
+    // Disallow implicit default construction.
     FixedRing()
-        : v_(nullptr), is_external(false), push_idx(0), pop_idx(0), count(0), max_size(0)
-    {
-        /* Default constructor. You must explicitly call init() to use the instance. */
-    }
+        : v_(nullptr), is_external(false), push_idx(0), pop_idx(0),
+          count(0), max_size(0)
+    { }
 
-    FixedRing(size_t max_size, int numa_node = 0, T *xmem = nullptr)
-        : v_(nullptr), is_external(false), push_idx(0), pop_idx(0), count(0), max_size(max_size)
+    // Index-access is only accessible by iterator.
+    T at(unsigned i) const
     {
-        init(max_size, numa_node, xmem);
+        return v_[(pop_idx + i) % max_size];
     }
 
-    virtual ~FixedRing()
+    T *v_;
+    bool is_external;
+    size_t push_idx;
+    size_t pop_idx;
+    size_t count;
+    size_t max_size;
+
+    friend IterType;
+
+public:
+    FixedRing(size_t max_size, unsigned numa_node)
+        : v_(nullptr), is_external(false), push_idx(0), pop_idx(0),
+          count(0), max_size(max_size)
     {
-        if (v_ != nullptr && !is_external)
-            rte_free(v_);
+        assert(max_size > 0);
+        v_ = (T*) rte_malloc_socket("fixedring", sizeof(T) * max_size,
+                                    CACHE_LINE_SIZE, numa_node);
+        assert(v_ != nullptr);
     }
 
-    void init(size_t max_size, int numa_node = 0, T *xmem = nullptr)
+    FixedRing(size_t max_size, T *xmem)
+        : v_(xmem), is_external(true), push_idx(0), pop_idx(0),
+          count(0), max_size(max_size)
     {
         assert(max_size > 0);
-        this->count = 0;
-        this->max_size = max_size;
-        if (xmem == nullptr) {
-            v_ = (T*) rte_malloc_socket("fixedring", sizeof(T) * max_size, 64, numa_node);
-            is_external = false;
-        } else {
-            v_ = xmem;
-            is_external = true;
-        }
         assert(v_ != nullptr);
     }
 
+    virtual ~FixedRing()
+    {
+        if (v_ != nullptr && !is_external)
+            rte_free(v_);
+    }
+
     void push_back(T t)
     {
         assert(count < max_size);
@@ -210,26 +222,7 @@ public:
 
     T front() const
     {
-        if (!empty())
-            return v_[pop_idx];
-        return default_value;
-    }
-
-    T at(unsigned i) const
-    {
-        if (i >= count)
-            return default_value;
-        return v_[(pop_idx + i) % max_size];
-    }
-
-    T get(unsigned i) const
-    {
-        return at(i);
-    }
-
-    T operator[](const unsigned& i) const
-    {
-        return at(i);
+        return v_[pop_idx];
     }
 
     IterType begin() const
@@ -245,7 +238,6 @@ public:
     void pop_front()
     {
         if (!empty()) {
-            v_[pop_idx] = default_value;
             pop_idx = (pop_idx + 1) % max_size;
             count --;
         }
@@ -260,14 +252,6 @@ public:
     {
         return count;
     }
-
-private:
-    T *v_;
-    bool is_external;
-    size_t push_idx;
-    size_t pop_idx;
-    size_t count;
-    size_t max_size;
 };
 
 } /* endns(nba) */
diff --git a/include/nba/element/element.hh b/include/nba/element/element.hh
index d365508..8d8b5da 100644
--- a/include/nba/element/element.hh
+++ b/include/nba/element/element.hh
@@ -224,7 +224,8 @@ public:
             };
             offload_compute_handlers.insert({{"dummy", ch},});
         }
-        finished_batches.init(MAX_FINBATCH_QLEN, -1, finished_batches_arrbuf);
+        NEW(0, finished_batches, FixedRing<PacketBatch*>,
+            MAX_FINBATCH_QLEN, finished_batches_arrbuf);
         memset(tasks, 0, sizeof(OffloadTask *) * NBA_MAX_COPROCESSOR_TYPES);
     }
     virtual ~OffloadableElement() {}
@@ -256,7 +257,7 @@ public:
 
 private:
     OffloadTask *tasks[NBA_MAX_COPROCESSOR_TYPES];
-    FixedRing<PacketBatch*, nullptr> finished_batches;
+    FixedRing<PacketBatch*> *finished_batches;
     PacketBatch *finished_batches_arrbuf[MAX_FINBATCH_QLEN];
     void dummy_compute_handler(ComputeContext *ctx, struct resource_param *res);
 };
diff --git a/include/nba/engines/cuda/computecontext.hh b/include/nba/engines/cuda/computecontext.hh
index 5eeab56..7474d0c 100644
--- a/include/nba/engines/cuda/computecontext.hh
+++ b/include/nba/engines/cuda/computecontext.hh
@@ -103,8 +103,7 @@ private:
     size_t num_kernel_args;
     struct kernel_arg kernel_args[CUDA_MAX_KERNEL_ARGS];
 
-    FixedRing<unsigned, 0> io_base_ring;
-    unsigned io_base_ring_buf[NBA_MAX_IO_BASES];
+    FixedRing<unsigned> *io_base_ring;
 };
 
 }
diff --git a/include/nba/engines/dummy/computecontext.hh b/include/nba/engines/dummy/computecontext.hh
index 07d9fc0..b091ddc 100644
--- a/include/nba/engines/dummy/computecontext.hh
+++ b/include/nba/engines/dummy/computecontext.hh
@@ -71,8 +71,7 @@ private:
     DummyCPUMemoryPool _cpu_mempool_in[NBA_MAX_IO_BASES];
     DummyCPUMemoryPool _cpu_mempool_out[NBA_MAX_IO_BASES];
 
-    FixedRing<unsigned, 0> io_base_ring;
-    unsigned io_base_ring_buf[NBA_MAX_IO_BASES];
+    FixedRing<unsigned> *io_base_ring;
 };
 
 }
diff --git a/include/nba/framework/elementgraph.hh b/include/nba/framework/elementgraph.hh
index ae06774..e8d8092 100644
--- a/include/nba/framework/elementgraph.hh
+++ b/include/nba/framework/elementgraph.hh
@@ -92,7 +92,7 @@ public:
     /**
      * Returns the list of all elements.
      */
-    const FixedRing<Element*, nullptr>& get_elements() const;
+    const FixedRing<Element*>& get_elements() const;
 
     /**
      * Free a packet batch.
@@ -106,20 +106,20 @@ private:
     /**
      * Used to book-keep element objects.
     */
-    FixedRing<Element*, nullptr> elements;
+    FixedRing<Element*> elements;
 
     /**
      * Book-keepers to avoid dynamic_cast and runtime type checks.
     */
-    FixedRing<SchedulableElement*, nullptr> sched_elements;
-    FixedRing<OffloadableElement*, nullptr> offl_elements;
+    FixedRing<SchedulableElement*> sched_elements;
+    FixedRing<OffloadableElement*> offl_elements;
 
     /**
      * Used to pass context objects when calling element handlers.
     */
     comp_thread_context *ctx;
 
-    FixedRing queue;
+    FixedRing queue;
 
     /* Executes the element graph for the given batch and free it after
      * processing. Internally it manages a queue to handle diverged paths
diff --git a/include/nba/framework/threadcontext.hh b/include/nba/framework/threadcontext.hh
index 51bb47e..c2d90b3 100644
--- a/include/nba/framework/threadcontext.hh
+++ b/include/nba/framework/threadcontext.hh
@@ -165,7 +165,7 @@ public:
     struct rte_mempool *packet_pool;
     ElementGraph *elem_graph;
     SystemInspector *inspector;
-    FixedRing<ComputeContext *, nullptr> cctx_list;
+    FixedRing<ComputeContext *> *cctx_list;
     PacketBatch *input_batch;
     DataBlock *datablock_registry[NBA_MAX_DATABLOCKS];
 
@@ -208,8 +208,8 @@ struct coproc_thread_context {
     ComputeDevice *device;
 
     struct ev_async *task_d2h_watcher;
-    FixedRing<OffloadTask *, nullptr> *d2h_pending_queue;
-    FixedRing<OffloadTask *, nullptr> *task_done_queue;
+    FixedRing<OffloadTask *> *d2h_pending_queue;
+    FixedRing<OffloadTask *> *task_done_queue;
     struct ev_async *task_done_watcher;
     char _reserved2[64]; // to prevent false-sharing
diff --git a/src/engines/cuda/computecontext.cc b/src/engines/cuda/computecontext.cc
index 0aa1ec0..54ad901 100644
--- a/src/engines/cuda/computecontext.cc
+++ b/src/engines/cuda/computecontext.cc
@@ -23,9 +23,10 @@ CUDAComputeContext::CUDAComputeContext(unsigned ctx_id, ComputeDevice *mother)
     type_name = "cuda";
     size_t io_base_size = ALIGN_CEIL(IO_BASE_SIZE, getpagesize()); // TODO: read from config
     cutilSafeCall(cudaStreamCreateWithFlags(&_stream, cudaStreamNonBlocking));
-    io_base_ring.init(NBA_MAX_IO_BASES, node_id, io_base_ring_buf);
+    NEW(node_id, io_base_ring, FixedRing<unsigned>,
+        NBA_MAX_IO_BASES, node_id);
     for (unsigned i = 0; i < NBA_MAX_IO_BASES; i++) {
-        io_base_ring.push_back(i);
+        io_base_ring->push_back(i);
         _cuda_mempool_in[i].init(io_base_size);
         _cuda_mempool_out[i].init(io_base_size);
 #ifdef USE_PHYS_CONT_MEMORY
@@ -85,9 +86,9 @@ CUDAComputeContext::~CUDAComputeContext()
 
 io_base_t CUDAComputeContext::alloc_io_base()
 {
-    if (io_base_ring.empty()) return INVALID_IO_BASE;
-    unsigned i = io_base_ring.front();
-    io_base_ring.pop_front();
+    if (io_base_ring->empty()) return INVALID_IO_BASE;
+    unsigned i = io_base_ring->front();
+    io_base_ring->pop_front();
     return (io_base_t) i;
 }
 
@@ -144,7 +145,7 @@ void CUDAComputeContext::clear_io_buffers(io_base_t io_base)
     _cpu_mempool_out[i].reset();
     _cuda_mempool_in[i].reset();
     _cuda_mempool_out[i].reset();
-    io_base_ring.push_back(i);
+    io_base_ring->push_back(i);
 }
 
 int CUDAComputeContext::enqueue_memwrite_op(void *host_buf, memory_t dev_buf, size_t offset, size_t size)
diff --git a/src/engines/dummy/computecontext.cc b/src/engines/dummy/computecontext.cc
index 9936c6f..6a2eb77 100644
--- a/src/engines/dummy/computecontext.cc
+++ b/src/engines/dummy/computecontext.cc
@@ -9,7 +9,8 @@ DummyComputeContext::DummyComputeContext(unsigned ctx_id, ComputeDevice *mother_
 {
     type_name = "dummy";
     size_t io_base_size = 5 * 1024 * 1024; // TODO: read from config
-    io_base_ring.init(NBA_MAX_IO_BASES, node_id, io_base_ring_buf);
+    NEW(node_id, io_base_ring, FixedRing<unsigned>,
+        NBA_MAX_IO_BASES, node_id);
     for (unsigned i = 0; i < NBA_MAX_IO_BASES; i++) {
         _dev_mempool_in[i].init(io_base_size);
         _dev_mempool_out[i].init(io_base_size);
@@ -30,9 +31,9 @@ DummyComputeContext::~DummyComputeContext()
 
 io_base_t DummyComputeContext::alloc_io_base()
 {
-    if (io_base_ring.empty()) return INVALID_IO_BASE;
-    unsigned i = io_base_ring.front();
-    io_base_ring.pop_front();
+    if (io_base_ring->empty()) return INVALID_IO_BASE;
+    unsigned i = io_base_ring->front();
+    io_base_ring->pop_front();
     return (io_base_t) i;
 }
 
@@ -90,7 +91,7 @@ void DummyComputeContext::clear_io_buffers(io_base_t io_base)
     _cpu_mempool_out[i].reset();
     _dev_mempool_in[i].reset();
     _dev_mempool_out[i].reset();
-    io_base_ring.push_back(i);
+    io_base_ring->push_back(i);
 }
 
 int DummyComputeContext::enqueue_memwrite_op(void *host_buf, memory_t dev_buf, size_t offset, size_t size)
diff --git a/src/lib/coprocessor.cc b/src/lib/coprocessor.cc
index 9c35792..b03d3c1 100644
--- a/src/lib/coprocessor.cc
+++ b/src/lib/coprocessor.cc
@@ -161,8 +161,8 @@ void *coproc_loop(void *arg)
 #endif
 
     /* Initialize task queues. */
-    ctx->d2h_pending_queue = new FixedRing<OffloadTask *, nullptr>(256, ctx->loc.node_id);
-    ctx->task_done_queue = new FixedRing<OffloadTask *, nullptr>(256, ctx->loc.node_id);
+    ctx->d2h_pending_queue = new FixedRing<OffloadTask *>(256, ctx->loc.node_id);
+    ctx->task_done_queue = new FixedRing<OffloadTask *>(256, ctx->loc.node_id);
 
     /* Initialize the event loop. */
     ctx->loop = ev_loop_new(EVFLAG_AUTO | EVFLAG_NOSIGMASK);
diff --git a/src/lib/element.cc b/src/lib/element.cc
index e4edd61..b863f1e 100644
--- a/src/lib/element.cc
+++ b/src/lib/element.cc
@@ -256,16 +256,16 @@ int OffloadableElement::offload(ElementGraph *mother, PacketBatch *batch, int in
 
 int OffloadableElement::enqueue_batch(PacketBatch *batch)
 {
-    finished_batches.push_back(batch);
+    finished_batches->push_back(batch);
     return 0;
 }
 
 int OffloadableElement::dispatch(uint64_t loop_count, PacketBatch*& out_batch, uint64_t &next_delay)
 {
     /* Retrieve out_batch from the internal completion_queue. */
-    if (finished_batches.size() > 0) {
-        out_batch = finished_batches.front();
-        finished_batches.pop_front();
+    if (finished_batches->size() > 0) {
+        out_batch = finished_batches->front();
+        finished_batches->pop_front();
     } else {
         out_batch = nullptr;
     }
diff --git a/src/lib/elementgraph.cc b/src/lib/elementgraph.cc
index fb60787..93a74e2 100644
--- a/src/lib/elementgraph.cc
+++ b/src/lib/elementgraph.cc
@@ -66,7 +66,7 @@ void ElementGraph::send_offload_task_to_device(OffloadTask *task)
     /* Start offloading! */
     // TODO: create multiple cctx_list and access them via dev_idx for hetero-device systems.
     const int dev_idx = 0;
-    ComputeContext *cctx = ctx->cctx_list.front();
+    ComputeContext *cctx = ctx->cctx_list->front();
     assert(cctx != nullptr);
 #ifdef USE_NVPROF
     nvtxRangePush("offl_prepare");
@@ -726,7 +726,7 @@ int ElementGraph::validate()
     return 0;
 }
 
-const FixedRing<Element*, nullptr>& ElementGraph::get_elements() const
+const FixedRing<Element*>& ElementGraph::get_elements() const
 {
     return elements;
 }
diff --git a/src/main.cc b/src/main.cc
index 13ae52a..c328cca 100644
--- a/src/main.cc
+++ b/src/main.cc
@@ -755,18 +755,20 @@ int main(int argc, char **argv)
                 ctx->datablock_registry[dbid]->set_id(dbid);
                 RTE_LOG(DEBUG, MAIN, " [%u] %s\n", dbid, ctx->datablock_registry[dbid]->name());
             }
-            new (&ctx->cctx_list) FixedRing<ComputeContext *, nullptr>(2 * NBA_MAX_COPROCESSOR_TYPES, ctx->loc.node_id);
+            NEW(ctx->loc.node_id, ctx->cctx_list, FixedRing<ComputeContext *>,
+                2 * NBA_MAX_COPROCESSOR_TYPES, ctx->loc.node_id);
             for (unsigned k = 0, k_max = system_params["COPROC_CTX_PER_COMPTHREAD"]; k < k_max; k++) {
                 ComputeContext *cctx = nullptr;
                 cctx = device->get_available_context();
                 assert(cctx != nullptr);
                 assert(cctx->state == ComputeContext::READY);
-                ctx->cctx_list.push_back(cctx);
+                ctx->cctx_list->push_back(cctx);
             }
         }
     } else {
-        new (&ctx->cctx_list) FixedRing<ComputeContext *, nullptr>(2 * NBA_MAX_COPROCESSOR_TYPES, ctx->loc.node_id);
-        assert(ctx->cctx_list.empty());
+        NEW(ctx->loc.node_id, ctx->cctx_list, FixedRing<ComputeContext *>,
+            2 * NBA_MAX_COPROCESSOR_TYPES, ctx->loc.node_id);
+        assert(ctx->cctx_list->empty());
         ctx->task_completion_queue = NULL;
         ctx->task_completion_watcher = NULL;
         ctx->coproc_ctx = NULL;
diff --git a/tests/test_core_fixedarray.cc b/tests/test_core_fixedarray.cc
index 0bafa13..9e39f91 100644
--- a/tests/test_core_fixedarray.cc
+++ b/tests/test_core_fixedarray.cc
@@ -25,49 +25,35 @@ TEST(CoreFixedArrayTest, Initialization) {
 
 TEST(CoreFixedRingTest, PushBack) {
     int buf[3];
-    FixedRing<int, 0> A;
-    A.init(3, 0, buf);
+    FixedRing<int> A(3, buf);
     EXPECT_EQ(0, A.size());
     EXPECT_TRUE(A.empty());
     A.push_back(1);
     EXPECT_EQ(1, A.front());
-    EXPECT_EQ(1, A[0]);
     EXPECT_EQ(1, A.size());
     EXPECT_FALSE(A.empty());
     A.push_back(2);
     EXPECT_EQ(1, A.front());
-    EXPECT_EQ(1, A[0]);
-    EXPECT_EQ(2, A[1]);
     EXPECT_EQ(2, A.size());
     A.push_back(3);
     EXPECT_EQ(1, A.front());
-    EXPECT_EQ(1, A[0]);
-    EXPECT_EQ(2, A[1]);
-    EXPECT_EQ(3, A[2]);
     EXPECT_EQ(3, A.size());
 }
 
 TEST(CoreFixedRingTest, PushFront) {
     int buf[3];
-    FixedRing<int, 0> B;
-    B.init(3, 0, buf);
+    FixedRing<int> B(3, buf);
     EXPECT_EQ(0, B.size());
     EXPECT_TRUE(B.empty());
     B.push_front(1);
     EXPECT_EQ(1, B.front());
-    EXPECT_EQ(1, B[0]);
     EXPECT_EQ(1, B.size());
     EXPECT_FALSE(B.empty());
     B.push_front(2);
     EXPECT_EQ(2, B.front());
-    EXPECT_EQ(2, B[0]);
-    EXPECT_EQ(1, B[1]);
     EXPECT_EQ(2, B.size());
     B.push_front(3);
     EXPECT_EQ(3, B.front());
-    EXPECT_EQ(3, B[0]);
-    EXPECT_EQ(2, B[1]);
-    EXPECT_EQ(1, B[2]);
     EXPECT_EQ(3, B.size());
     int correctB[] = {3, 2, 1};
     for (auto&& p : enumerate(B))
@@ -76,16 +62,12 @@ TEST(CoreFixedRingTest, PushFront) {
 
 TEST(CoreFixedRingTest, MixedPushBackFront) {
     int buf[3];
-    FixedRing<int, 0> C;
-    C.init(3, 0, buf);
+    FixedRing<int> C(3, buf);
     EXPECT_EQ(0, C.size());
     EXPECT_TRUE(C.empty());
     C.push_back(1);
     C.push_back(2);
     C.push_front(3);
-    EXPECT_EQ(3, C[0]);
-    EXPECT_EQ(1, C[1]);
-    EXPECT_EQ(2, C[2]);
     EXPECT_EQ(3, C.size());
     EXPECT_FALSE(C.empty());
     int correctC[] = {3, 1, 2};
@@ -95,8 +77,7 @@ TEST(CoreFixedRingTest, MixedPushPop) {
     int buf[3];
-    FixedRing<int, 0> D;
-    D.init(3, 0, buf);
+    FixedRing<int> D(3, buf);
     EXPECT_EQ(0, D.size());
     EXPECT_TRUE(D.empty());
     D.push_back(1);
@@ -111,23 +92,23 @@ TEST(CoreFixedRingTest, MixedPushPop) {
     // Unbalance push_back() and push_front()
     // so that internal pop_idx/push_idx goes
     // beyond the boundaries.
-    D.push_back(2);
-    EXPECT_EQ(2, D.front());
-    D.push_front(3);
-    EXPECT_EQ(3, D.front());
-    D.push_back(4);
-    EXPECT_EQ(3, D.front());
-    int correctD[] = {3, 2, 4};
+    D.push_back(x + 2);
+    EXPECT_EQ(x + 2, D.front());
+    D.push_front(x + 3);
+    EXPECT_EQ(x + 3, D.front());
+    D.push_back(x + 4);
+    EXPECT_EQ(x + 3, D.front());
+    int correctD[] = {x + 3, x + 2, x + 4};
     for (auto&& p : enumerate(D))
         EXPECT_EQ(correctD[p.first], p.second);
     D.pop_front();
     EXPECT_EQ(2, D.size());
     EXPECT_FALSE(D.empty());
-    EXPECT_EQ(2, D.front());
+    EXPECT_EQ(x + 2, D.front());
     D.pop_front();
     EXPECT_EQ(1, D.size());
     EXPECT_FALSE(D.empty());
-    EXPECT_EQ(4, D.front());
+    EXPECT_EQ(x + 4, D.front());
     D.pop_front();
     EXPECT_EQ(0, D.size());
     EXPECT_TRUE(D.empty());
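
The NEW() macro added in include/nba/core/intrinsic.hh pairs a NUMA-aware, cache-line-aligned rte_malloc_socket() allocation with a placement-new call, so heap-style objects such as the refactored FixedRing instances land on the caller's NUMA node; tearing such an object down would require an explicit destructor call followed by rte_free(). The following stand-alone sketch shows only that allocate-then-construct pattern, not NBA itself: std::aligned_alloc stands in for rte_malloc_socket(), the Ring struct is made up for illustration, and CACHE_LINE_SIZE is assumed to be 64.

// Sketch of the pattern wrapped by NEW(node_id, ptr, cls, ...): aligned raw
// allocation, placement new, then explicit destruction and free.
#include <cassert>
#include <cstddef>
#include <cstdlib>
#include <new>

struct Ring {
    explicit Ring(std::size_t cap) : cap(cap), used(0) {}
    std::size_t cap;
    std::size_t used;
};

int main()
{
    constexpr std::size_t cache_line = 64;  // stand-in for CACHE_LINE_SIZE
    // std::aligned_alloc requires the size to be a multiple of the alignment.
    const std::size_t bytes =
        (sizeof(Ring) + cache_line - 1) / cache_line * cache_line;

    void *raw = std::aligned_alloc(cache_line, bytes);  // rte_malloc_socket() in NBA
    assert(raw != nullptr);

    Ring *ring = new (raw) Ring(3);  // roughly what NEW(node, ring, Ring, 3) expands to
    assert(ring->cap == 3);

    ring->~Ring();   // placement-new'd objects need an explicit destructor call
    std::free(raw);  // rte_free() in NBA
    return 0;
}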