Skip to content

Commit

Permalink
refs #27: Refactor FixedRing.
Browse files Browse the repository at this point in the history
 * Removes the unnecessary "default_value" template argument of
   FixedRing.

 * Forbids implicit misuse of the default constructor of FixedRing.

 * Adds a "NEW" macro that allocates a memory block using NUMA-aware
   malloc and then constructs the object in place in that block.
  • Loading branch information
achimnol committed Jan 22, 2016
1 parent 0adf111 commit bab412b
Show file tree
Hide file tree
Showing 16 changed files with 107 additions and 132 deletions.
8 changes: 5 additions & 3 deletions elements/loadbalancers/LoadBalanceThruput.hh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ namespace nba {

class LoadBalanceThruput : public SchedulableElement, PerBatchElement {
public:
LoadBalanceThruput() : SchedulableElement(), PerBatchElement(), direction(0), thruput_history()
LoadBalanceThruput() : SchedulableElement(), PerBatchElement(),
direction(0), thruput_history(nullptr)
{ }

virtual ~LoadBalanceThruput()
Expand All @@ -38,7 +39,8 @@ public:
rte_atomic64_set(&cpu_ratio, 1000);
rte_atomic64_set(&update_count, 0);

thruput_history.init(3, ctx->loc.node_id);
NEW(ctx->loc.node_id, thruput_history, FixedRing<uint64_t>,
3, ctx->loc.node_id);
last_count = 0;
last_thruput = 0;
last_cpu_ratio_update = 0.1;
Expand Down Expand Up @@ -166,7 +168,7 @@ private:
double last_thruput;

uint64_t last_direction_changed;
FixedRing<uint64_t, 0> thruput_history;
FixedRing<uint64_t> *thruput_history;

static rte_atomic64_t cpu_ratio __rte_cache_aligned;
static rte_atomic64_t update_flag;
Expand Down
4 changes: 2 additions & 2 deletions elements/standards/Queue.hh
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public:
int get_type() const { return SchedulableElement::get_type() | PerBatchElement::get_type(); }

int initialize() {
queue = new FixedRing<PacketBatch*, nullptr>(max_size, ctx->loc.node_id);
queue = new FixedRing<PacketBatch*>(max_size, ctx->loc.node_id);
return 0;
};
int initialize_global() { return 0; };
Expand Down Expand Up @@ -61,7 +61,7 @@ public:

private:
size_t max_size;
FixedRing<PacketBatch*, nullptr> *queue;
FixedRing<PacketBatch*> *queue;
};

EXPORT_ELEMENT(Queue);
Expand Down
5 changes: 5 additions & 0 deletions include/nba/core/intrinsic.hh
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@

#define ALIGN_CEIL(x,a) (((x)+(a)-1)&~((a)-1))

/**
 * NEW(node_id, ptr, cls, ...) -- allocate and construct an object of type cls.
 *
 * Requests sizeof(cls) bytes aligned to CACHE_LINE_SIZE from
 * rte_malloc_socket(), passing node_id as its last argument, stores the
 * returned address in ptr, and then constructs a cls in place at that
 * address, forwarding the remaining macro arguments to the constructor.
 *
 * If the allocation fails, ptr is left as nullptr and no constructor is
 * run (placement-new on a null address would be undefined behaviour), so
 * callers can detect the failure by checking ptr.
 *
 * The body is wrapped in do { ... } while (0) so that `NEW(...);` is exactly
 * one statement and can be used as an unbraced if/else branch.
 *
 * Note: ptr is expanded more than once; pass a plain lvalue without side
 * effects.
 */
#define NEW(node_id, ptr, cls, ...) do { \
    (ptr) = static_cast<cls*>(rte_malloc_socket(nullptr, sizeof(cls), CACHE_LINE_SIZE, (node_id))); \
    if ((ptr) != nullptr) \
        new (ptr) cls(__VA_ARGS__); \
} while (0)

namespace nba {

template<typename T>
Expand Down
100 changes: 42 additions & 58 deletions include/nba/core/queue.hh
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
#ifndef __NBA_QUEUE_HH__
#define __NBA_QUEUE_HH__

#include <vector>
#include <cassert>

#include <nba/core/intrinsic.hh>
#include <rte_malloc.h>

namespace nba {
Expand Down Expand Up @@ -108,13 +107,13 @@ private:
T values[max_size];
};

template<class T, T const default_value>
template<class T>
class FixedRing;

template<class T, T const default_value>
template<class T>
class FixedRingIterator
{
using ContainerType = FixedRing<T, default_value>;
using ContainerType = FixedRing<T>;

const ContainerType *_p_ring;
unsigned _pos;
Expand Down Expand Up @@ -148,49 +147,62 @@ public:

T operator* () const
{
return _p_ring->get(_pos);
return _p_ring->at(_pos);
}
};

template<class T, T const default_value>
template<class T>
class FixedRing
{
using IterType = FixedRingIterator<T, default_value>;
using IterType = FixedRingIterator<T>;

public:
private:
// Disallow implicit default construction.
FixedRing()
: v_(nullptr), is_external(false), push_idx(0), pop_idx(0), count(0), max_size(0)
{
/* Default constructor. You must explicitly call init() to use the instance. */
}
: v_(nullptr), is_external(false), push_idx(0), pop_idx(0),
count(0), max_size(0)
{ }

FixedRing(size_t max_size, int numa_node = 0, T *xmem = nullptr)
: v_(nullptr), is_external(false), push_idx(0), pop_idx(0), count(0), max_size(max_size)
// Index-access is only accessible by iterator.
T at(unsigned i) const
{
init(max_size, numa_node, xmem);
return v_[(pop_idx + i) % max_size];
}

virtual ~FixedRing()
T *v_;
bool is_external;
size_t push_idx;
size_t pop_idx;
size_t count;
size_t max_size;

friend IterType;

public:
FixedRing(size_t max_size, unsigned numa_node)
: v_(nullptr), is_external(false), push_idx(0), pop_idx(0),
count(0), max_size(max_size)
{
if (v_ != nullptr && !is_external)
rte_free(v_);
assert(max_size > 0);
v_ = (T*) rte_malloc_socket("fixedring", sizeof(T) * max_size,
CACHE_LINE_SIZE, numa_node);
assert(v_ != nullptr);
}

void init(size_t max_size, int numa_node = 0, T *xmem = nullptr)
FixedRing(size_t max_size, T *xmem)
: v_(xmem), is_external(true), push_idx(0), pop_idx(0),
count(0), max_size(max_size)
{
assert(max_size > 0);
this->count = 0;
this->max_size = max_size;
if (xmem == nullptr) {
v_ = (T*) rte_malloc_socket("fixedring", sizeof(T) * max_size, 64, numa_node);
is_external = false;
} else {
v_ = xmem;
is_external = true;
}
assert(v_ != nullptr);
}

virtual ~FixedRing()
{
if (v_ != nullptr && !is_external)
rte_free(v_);
}

void push_back(T t)
{
assert(count < max_size);
Expand All @@ -210,26 +222,7 @@ public:

T front() const
{
if (!empty())
return v_[pop_idx];
return default_value;
}

T at(unsigned i) const
{
if (i >= count)
return default_value;
return v_[(pop_idx + i) % max_size];
}

T get(unsigned i) const
{
return at(i);
}

T operator[](const unsigned& i) const
{
return at(i);
return v_[pop_idx];
}

IterType begin() const
Expand All @@ -245,7 +238,6 @@ public:
void pop_front()
{
if (!empty()) {
v_[pop_idx] = default_value;
pop_idx = (pop_idx + 1) % max_size;
count --;
}
Expand All @@ -260,14 +252,6 @@ public:
{
return count;
}

private:
T *v_;
bool is_external;
size_t push_idx;
size_t pop_idx;
size_t count;
size_t max_size;
};

} /* endns(nba) */
Expand Down
5 changes: 3 additions & 2 deletions include/nba/element/element.hh
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ public:
};
offload_compute_handlers.insert({{"dummy", ch},});
}
finished_batches.init(MAX_FINBATCH_QLEN, -1, finished_batches_arrbuf);
NEW(0, finished_batches, FixedRing<PacketBatch*>,
MAX_FINBATCH_QLEN, finished_batches_arrbuf);
memset(tasks, 0, sizeof(OffloadTask *) * NBA_MAX_COPROCESSOR_TYPES);
}
virtual ~OffloadableElement() {}
Expand Down Expand Up @@ -256,7 +257,7 @@ public:

private:
OffloadTask *tasks[NBA_MAX_COPROCESSOR_TYPES];
FixedRing<PacketBatch*, nullptr> finished_batches;
FixedRing<PacketBatch*> *finished_batches;
PacketBatch *finished_batches_arrbuf[MAX_FINBATCH_QLEN];
void dummy_compute_handler(ComputeContext *ctx, struct resource_param *res);
};
Expand Down
3 changes: 1 addition & 2 deletions include/nba/engines/cuda/computecontext.hh
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ private:
size_t num_kernel_args;
struct kernel_arg kernel_args[CUDA_MAX_KERNEL_ARGS];

FixedRing<unsigned, 0> io_base_ring;
unsigned io_base_ring_buf[NBA_MAX_IO_BASES];
FixedRing<unsigned> *io_base_ring;
};

}
Expand Down
3 changes: 1 addition & 2 deletions include/nba/engines/dummy/computecontext.hh
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ private:
DummyCPUMemoryPool _cpu_mempool_in[NBA_MAX_IO_BASES];
DummyCPUMemoryPool _cpu_mempool_out[NBA_MAX_IO_BASES];

FixedRing<unsigned, 0> io_base_ring;
unsigned io_base_ring_buf[NBA_MAX_IO_BASES];
FixedRing<unsigned> *io_base_ring;
};

}
Expand Down
10 changes: 5 additions & 5 deletions include/nba/framework/elementgraph.hh
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public:
/**
* Returns the list of all elements.
*/
const FixedRing<Element*, nullptr>& get_elements() const;
const FixedRing<Element*>& get_elements() const;

/**
* Free a packet batch.
Expand All @@ -106,20 +106,20 @@ private:
/**
* Used to book-keep element objects.
*/
FixedRing<Element *, nullptr> elements;
FixedRing<Element *> elements;

/**
* Book-keepers to avoid dynamic_cast and runtime type checks.
*/
FixedRing<SchedulableElement *, nullptr> sched_elements;
FixedRing<OffloadableElement *, nullptr> offl_elements;
FixedRing<SchedulableElement *> sched_elements;
FixedRing<OffloadableElement *> offl_elements;

/**
* Used to pass context objects when calling element handlers.
*/
comp_thread_context *ctx;

FixedRing<void *, nullptr> queue;
FixedRing<void *> queue;

/* Executes the element graph for the given batch and free it after
* processing. Internally it manages a queue to handle diverged paths
Expand Down
6 changes: 3 additions & 3 deletions include/nba/framework/threadcontext.hh
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public:
struct rte_mempool *packet_pool;
ElementGraph *elem_graph;
SystemInspector *inspector;
FixedRing<ComputeContext *, nullptr> cctx_list;
FixedRing<ComputeContext *> *cctx_list;
PacketBatch *input_batch;
DataBlock *datablock_registry[NBA_MAX_DATABLOCKS];

Expand Down Expand Up @@ -208,8 +208,8 @@ struct coproc_thread_context {
ComputeDevice *device;

struct ev_async *task_d2h_watcher;
FixedRing<OffloadTask *, nullptr> *d2h_pending_queue;
FixedRing<OffloadTask *, nullptr> *task_done_queue;
FixedRing<OffloadTask *> *d2h_pending_queue;
FixedRing<OffloadTask *> *task_done_queue;
struct ev_async *task_done_watcher;

char _reserved2[64]; // to prevent false-sharing
Expand Down
13 changes: 7 additions & 6 deletions src/engines/cuda/computecontext.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ CUDAComputeContext::CUDAComputeContext(unsigned ctx_id, ComputeDevice *mother)
type_name = "cuda";
size_t io_base_size = ALIGN_CEIL(IO_BASE_SIZE, getpagesize()); // TODO: read from config
cutilSafeCall(cudaStreamCreateWithFlags(&_stream, cudaStreamNonBlocking));
io_base_ring.init(NBA_MAX_IO_BASES, node_id, io_base_ring_buf);
NEW(node_id, io_base_ring, FixedRing<unsigned>,
NBA_MAX_IO_BASES, node_id);
for (unsigned i = 0; i < NBA_MAX_IO_BASES; i++) {
io_base_ring.push_back(i);
io_base_ring->push_back(i);
_cuda_mempool_in[i].init(io_base_size);
_cuda_mempool_out[i].init(io_base_size);
#ifdef USE_PHYS_CONT_MEMORY
Expand Down Expand Up @@ -85,9 +86,9 @@ CUDAComputeContext::~CUDAComputeContext()

io_base_t CUDAComputeContext::alloc_io_base()
{
if (io_base_ring.empty()) return INVALID_IO_BASE;
unsigned i = io_base_ring.front();
io_base_ring.pop_front();
if (io_base_ring->empty()) return INVALID_IO_BASE;
unsigned i = io_base_ring->front();
io_base_ring->pop_front();
return (io_base_t) i;
}

Expand Down Expand Up @@ -144,7 +145,7 @@ void CUDAComputeContext::clear_io_buffers(io_base_t io_base)
_cpu_mempool_out[i].reset();
_cuda_mempool_in[i].reset();
_cuda_mempool_out[i].reset();
io_base_ring.push_back(i);
io_base_ring->push_back(i);
}

int CUDAComputeContext::enqueue_memwrite_op(void *host_buf, memory_t dev_buf, size_t offset, size_t size)
Expand Down
11 changes: 6 additions & 5 deletions src/engines/dummy/computecontext.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ DummyComputeContext::DummyComputeContext(unsigned ctx_id, ComputeDevice *mother_
{
type_name = "dummy";
size_t io_base_size = 5 * 1024 * 1024; // TODO: read from config
io_base_ring.init(NBA_MAX_IO_BASES, node_id, io_base_ring_buf);
NEW(node_id, io_base_ring, FixedRing<unsigned>,
NBA_MAX_IO_BASES, node_id);
for (unsigned i = 0; i < NBA_MAX_IO_BASES; i++) {
_dev_mempool_in[i].init(io_base_size);
_dev_mempool_out[i].init(io_base_size);
Expand All @@ -30,9 +31,9 @@ DummyComputeContext::~DummyComputeContext()

io_base_t DummyComputeContext::alloc_io_base()
{
if (io_base_ring.empty()) return INVALID_IO_BASE;
unsigned i = io_base_ring.front();
io_base_ring.pop_front();
if (io_base_ring->empty()) return INVALID_IO_BASE;
unsigned i = io_base_ring->front();
io_base_ring->pop_front();
return (io_base_t) i;
}

Expand Down Expand Up @@ -90,7 +91,7 @@ void DummyComputeContext::clear_io_buffers(io_base_t io_base)
_cpu_mempool_out[i].reset();
_dev_mempool_in[i].reset();
_dev_mempool_out[i].reset();
io_base_ring.push_back(i);
io_base_ring->push_back(i);
}

int DummyComputeContext::enqueue_memwrite_op(void *host_buf, memory_t dev_buf, size_t offset, size_t size)
Expand Down
4 changes: 2 additions & 2 deletions src/lib/coprocessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ void *coproc_loop(void *arg)
#endif

/* Initialize task queues. */
ctx->d2h_pending_queue = new FixedRing<OffloadTask *, nullptr>(256, ctx->loc.node_id);
ctx->task_done_queue = new FixedRing<OffloadTask *, nullptr>(256, ctx->loc.node_id);
ctx->d2h_pending_queue = new FixedRing<OffloadTask *>(256, ctx->loc.node_id);
ctx->task_done_queue = new FixedRing<OffloadTask *>(256, ctx->loc.node_id);

/* Initialize the event loop. */
ctx->loop = ev_loop_new(EVFLAG_AUTO | EVFLAG_NOSIGMASK);
Expand Down
Loading

0 comments on commit bab412b

Please sign in to comment.