Skip to content

Commit

Permalink
refs #22: Implement continuous batches and skeleton for VectorElement.
Browse files Browse the repository at this point in the history
 * "Continuous batches"

   - Assumption: reordering inside a batch does not incur perf.overheads
     in higher-layer applications (e.g., TCP).

   - We move excluded packets to the end of batches by swapping them
     with the tail packets.
     This way, we can eliminate checks for "excluded" on iteration over
     packet batches everywhere!

   - This change improves the performance a little bit:
     5.7 Gbps => 5.9 Gbps for a single-core IPv4 64 B packets forwarding
     63 Gbps => 64 Gbps for a dual-node IPv4 64 B packets forwarding

 * Applied changes of output port selection API to IPv6/IPsec elements
   as well.

 * Implemented a (non-vectorized) version of VectorElement loop.
  • Loading branch information
achimnol committed Sep 16, 2015
1 parent 9fd00e0 commit 591fe18
Show file tree
Hide file tree
Showing 18 changed files with 241 additions and 110 deletions.
9 changes: 6 additions & 3 deletions configs/l2fwd-echo-branch-lv1.click
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
FromInput -> lv1_head :: RandomWeightedBranch({0}, {1});
lv1_head[0] -> L2Forward(method {2}) -> ToOutput();
lv1_head[1] -> L2Forward(method {2}) -> ToOutput();
//FromInput -> lv1_head :: RandomWeightedBranch({0}, {1});
//lv1_head[0] -> L2Forward(method {2}) -> ToOutput();
//lv1_head[1] -> L2Forward(method {2}) -> ToOutput();
FromInput -> lv1_head :: RandomWeightedBranch(0.3, 0.7);
lv1_head[0] -> L2Forward(method echoback) -> ToOutput();
lv1_head[1] -> L2Forward(method echoback) -> ToOutput();
1 change: 0 additions & 1 deletion elements/ip/IPRouterVec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ int IPRouterVec::process_vector(int input_port,
vec_mask_arg_t mask)
{
// A temporary scalar no-op implementation.
// TODO: implement
for (int i = 0; i < NBA_VECTOR_WIDTH; i++)
if (mask.m[i])
output(0).push(pkt_vec[i]);
Expand Down
5 changes: 4 additions & 1 deletion elements/ipsec/IPsecAES.cc
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,11 @@ int IPsecAES::process(int input_port, Packet *pkt)
AES_ctr128_encrypt(encrypt_ptr, encrypt_ptr, enc_size, &sa_entry->aes_key_t, esph->esp_iv, ecount_buf, &mode);
#endif
} else {
return DROP;
pkt->kill();
return 0;
}

output(0).push(pkt);
return 0;
}

Expand Down Expand Up @@ -222,6 +224,7 @@ size_t IPsecAES::get_desired_workgroup_size(const char *device_name) const

int IPsecAES::postproc(int input_port, void *custom_output, Packet *pkt)
{
output(0).push(pkt);
return 0;
}

Expand Down
5 changes: 4 additions & 1 deletion elements/ipsec/IPsecAuthHMACSHA1.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,10 @@ int IPsecAuthHMACSHA1::process(int input_port, Packet *pkt)
SHA1(hmac_buf, 64 + SHA_DIGEST_LENGTH, payload_out + payload_len);
// TODO: correctness check..
} else {
return DROP;
pkt->kill();
return 0;
}
output(0).push(pkt);
return 0;
}

Expand Down Expand Up @@ -213,6 +215,7 @@ size_t IPsecAuthHMACSHA1::get_desired_workgroup_size(const char *device_name) co

int IPsecAuthHMACSHA1::postproc(int input_port, void *custom_output, Packet *pkt)
{
output(0).push(pkt);
return 0;
}

Expand Down
7 changes: 5 additions & 2 deletions elements/ipsec/IPsecESPencap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ int IPsecESPencap::process(int input_port, Packet *pkt)
// TODO: Set src & dest of encaped pkt to ip addrs from configuration.

struct ether_hdr *ethh = (struct ether_hdr *) pkt->data();
if (ntohs(ethh->ether_type) != ETHER_TYPE_IPv4)
return DROP;
if (ntohs(ethh->ether_type) != ETHER_TYPE_IPv4) {
pkt->kill();
return 0;
}
struct iphdr *iph = (struct iphdr *) (ethh + 1);

struct ipaddr_pair pair;
Expand Down Expand Up @@ -119,6 +121,7 @@ int IPsecESPencap::process(int input_port, Packet *pkt)
iph->protocol = 0x32; // mark that this packet contains a secured payload.
iph->check = 0; // ignoring previous checksum.
iph->check = ip_fast_csum(iph, iph->ihl);
output(0).push(pkt);
return 0;
}

Expand Down
9 changes: 6 additions & 3 deletions elements/ipv6/CheckIP6Header.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,17 @@ int CheckIP6Header::process(int input_port, Packet *pkt)
// Validate the packet header.
if (ntohs(ethh->ether_type) != ETHER_TYPE_IPv6) {
//RTE_LOG(DEBUG, ELEM, "CheckIP6Header: invalid packet type - %x\n", ntohs(ethh->ether_type));
return DROP;
pkt->kill();
return 0;
}

if ((iph->ip6_vfc & 0xf0) >> 4 != 6) // get the first 4 bits.
if ((iph->ip6_vfc & 0xf0) >> 4 != 6) { // get the first 4 bits.
pkt->kill();
return SLOWPATH;
}

// TODO: Discard illegal source addresses.

output(0).push(pkt);
return 0; // output port number: 0
}

Expand Down
4 changes: 3 additions & 1 deletion elements/ipv6/DecIP6HLIM.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ int DecIP6HLIM::process(int input_port, Packet *pkt)
uint32_t checksum;

if (iph->ip6_hlim <= 1) {
return DROP;
pkt->kill();
return 0;
}

// Decrement TTL.
iph->ip6_hlim = htons(ntohs(iph->ip6_hlim) - 1);

output(0).push(pkt);
return 0;
}

Expand Down
14 changes: 10 additions & 4 deletions elements/ipv6/LookupIP6Route.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,23 +122,29 @@ int LookupIP6Route::process(int input_port, Packet *pkt)
lookup_result = _table_ptr->lookup((reinterpret_cast<uint128_t*>(&dest_addr)));
//rte_rwlock_read_unlock(_rwlock_ptr);

if (lookup_result == 0xffff)
if (lookup_result == 0xffff) {
/* Could not find destination. Use the second output for "error" packets. */
return DROP;
pkt->kill();
return 0;
}

rr_port = (rr_port + 1) % num_tx_ports;
anno_set(&pkt->anno, NBA_ANNO_IFACE_OUT, rr_port);
output(0).push(pkt);
return 0;
}

int LookupIP6Route::postproc(int input_port, void *custom_output, Packet *pkt)
{
uint16_t lookup_result = *((uint16_t *)custom_output);
if (lookup_result == 0xffff)
if (lookup_result == 0xffff) {
/* Could not find destination. Use the second output for "error" packets. */
return DROP;
pkt->kill();
return 0;
}
rr_port = (rr_port + 1) % num_tx_ports;
anno_set(&pkt->anno, NBA_ANNO_IFACE_OUT, rr_port);
output(0).push(pkt);
return 0;
}

Expand Down
9 changes: 6 additions & 3 deletions elements/standards/RandomWeightedBranch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,14 @@ int RandomWeightedBranch::process(int input_port, Packet *pkt)
float x = uniform_dist(random_generator);
int idx = 0;
for (auto cur = out_probs.begin(); cur != out_probs.end(); cur++) {
if(x < *cur)
return idx;
if(x < *cur) {
output(idx).push(pkt);
return 0;
}
idx++;
}
return idx-1;
output(idx - 1).push(pkt);
return 0;
}

// vim: ts=8 sts=4 sw=4 et
5 changes: 3 additions & 2 deletions include/nba/element/element.hh
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ struct element_info {

class Element : public GraphMetaData {
private:
friend class Packet;

class OutputPort {
/** A simple utility class to emulate Click's output port. */

Expand All @@ -73,8 +75,7 @@ private:
/* We allow a packet to be pushed only once inside the process
* handler. If you want to push the same packet multiple times
* to different outputs, you MUST clone it. */
assert(pkt->output == -1);
pkt->output = my_idx;
pkt->mother->results[pkt->bidx] = my_idx;
if (pkt->cloned) {
/* Store the cloned packet separately. */
elem->output_cloned_packets[my_idx][elem->output_counts[my_idx]] = pkt;
Expand Down
12 changes: 8 additions & 4 deletions include/nba/element/packet.hh
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ enum PacketDisposition {
};

class PacketBatch;
class Element;
class VectorElement;

/* We have to manage two memory pools:
* first for the original memory pool that our mbuf is allocated from.
Expand All @@ -36,11 +38,13 @@ private:
#ifdef DEBUG
uint32_t magic;
#endif
PacketBatch *mother;
struct rte_mbuf *base;
bool cloned;
int output;
int bidx;

friend class Element;
friend class VectorElement;
friend class DataBlock;

public:
Expand Down Expand Up @@ -80,7 +84,7 @@ public:
#ifdef DEBUG
magic(NBA_PACKET_MAGIC),
#endif
base((struct rte_mbuf *) base), cloned(false), output(PacketDisposition::DROP)
mother(mother), base((struct rte_mbuf *) base), cloned(false), bidx(-1)
{ }

~Packet() {
Expand All @@ -89,7 +93,7 @@ public:
}
}

inline void kill() { this->output = PacketDisposition::DROP; }
void kill();

inline unsigned char *data() { return rte_pktmbuf_mtod(base, unsigned char *); }
inline uint32_t length() { return rte_pktmbuf_data_len(base); }
Expand Down Expand Up @@ -117,7 +121,7 @@ public:
}

Packet *uniqueify() {
return nullptr;
return this;
}

Packet *push(uint32_t len) {
Expand Down
32 changes: 30 additions & 2 deletions include/nba/element/packetbatch.hh
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
#include <rte_mempool.h>
#include <rte_mbuf.h>

extern "C" {
struct rte_ring;
}

namespace nba {

class Element;
Expand All @@ -24,9 +28,9 @@ enum BatchDisposition {
class PacketBatch {
public:
PacketBatch()
: count(0), datablock_states(nullptr), recv_timestamp(0),
: count(0), drop_count(0), datablock_states(nullptr), recv_timestamp(0),
generation(0), batch_id(0), element(nullptr), input_port(0), has_results(false),
delay_start(0), compute_time(0)
has_dropped(false), delay_start(0), compute_time(0)
{
#ifdef DEBUG
memset(&results[0], 0xdd, sizeof(int) * NBA_MAX_COMP_BATCH_SIZE);
Expand All @@ -39,14 +43,38 @@ public:
{
}

/**
* Moves excluded packets to the end of batches, by swapping them
* with the tail packets, to reduce branching overheads when iterating
* over the packet batch in many places.
* (We assume that this "in-batch" reordering does not incur performance
* overheads for transport layers.)
* It stores the number of dropped packets to drop_count member
* variable. Later, ElementGraph refer this value to actually free
* the excluded packets.
*
* This should only be called right after doing Element::_process_batch()
* or moving packets to other batches in ElementGraph.
* This may be called multiple times until reaching the next element.
*/
void collect_excluded_packets();

/**
* Moves the collected excluded packets at the tail to drop_queue,
* and resets drop_count to zero.
*/
void clean_drops(struct rte_ring *drop_queue);

unsigned count;
unsigned drop_count;
struct datablock_tracker *datablock_states;
uint64_t recv_timestamp;
uint64_t generation;
uint64_t batch_id;
Element* element;
int input_port;
bool has_results;
bool has_dropped;
uint64_t delay_start;
uint64_t delay_time;
double compute_time;
Expand Down
16 changes: 8 additions & 8 deletions src/lib/datablock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,8 @@ void DataBlock::postprocess(OffloadableElement *elem, int input_port, PacketBatc
case WRITE_WHOLE_PACKET: {

/* Update the packets and run postprocessing. */
batch->has_dropped = false;
for (unsigned p = 0; p < batch->count; p++) {
if (batch->excluded[p]) continue;
size_t elemsz = bitselect<size_t>(write_roi.type == WRITE_PARTIAL_PACKET,
t->aligned_item_sizes.size,
t->aligned_item_sizes.sizes[p]);
Expand All @@ -275,11 +275,11 @@ void DataBlock::postprocess(OffloadableElement *elem, int input_port, PacketBatc
(char*) host_out_ptr + offset,
elemsz);
Packet *pkt = Packet::from_base(batch->packets[p]);
pkt->output = -1;
pkt->bidx = p;
elem->postproc(input_port, nullptr, pkt);
batch->results[p] = pkt->output;
batch->excluded[p] = (batch->results[p] == DROP);
}
if (batch->has_dropped)
batch->collect_excluded_packets();
batch->has_results = true;

break; }
Expand All @@ -290,16 +290,16 @@ void DataBlock::postprocess(OffloadableElement *elem, int input_port, PacketBatc
case WRITE_FIXED_SEGMENTS: {

/* Run postporcessing only. */
batch->has_dropped = false;
for (unsigned p = 0; p < batch->count; p++) {
if (batch->excluded[p]) continue;
uintptr_t elemsz = t->aligned_item_sizes.size;
uintptr_t offset = elemsz * p;
Packet *pkt = Packet::from_base(batch->packets[p]);
pkt->output = -1;
pkt->bidx = p;
elem->postproc(input_port, (char*) host_out_ptr + offset, pkt);
batch->results[p] = pkt->output;
batch->excluded[p] = (batch->results[p] == DROP);
}
if (batch->has_dropped)
batch->collect_excluded_packets();
batch->has_results = true;

break; }
Expand Down
Loading

0 comments on commit 591fe18

Please sign in to comment.