diff --git a/configs/l2fwd-echo-branch-lv1.click b/configs/l2fwd-echo-branch-lv1.click index 7e7d1f6..0700865 100644 --- a/configs/l2fwd-echo-branch-lv1.click +++ b/configs/l2fwd-echo-branch-lv1.click @@ -1,3 +1,6 @@ -FromInput -> lv1_head :: RandomWeightedBranch({0}, {1}); -lv1_head[0] -> L2Forward(method {2}) -> ToOutput(); -lv1_head[1] -> L2Forward(method {2}) -> ToOutput(); +//FromInput -> lv1_head :: RandomWeightedBranch({0}, {1}); +//lv1_head[0] -> L2Forward(method {2}) -> ToOutput(); +//lv1_head[1] -> L2Forward(method {2}) -> ToOutput(); +FromInput -> lv1_head :: RandomWeightedBranch(0.3, 0.7); +lv1_head[0] -> L2Forward(method echoback) -> ToOutput(); +lv1_head[1] -> L2Forward(method echoback) -> ToOutput(); diff --git a/elements/ip/IPRouterVec.cc b/elements/ip/IPRouterVec.cc index a081673..9248c0d 100644 --- a/elements/ip/IPRouterVec.cc +++ b/elements/ip/IPRouterVec.cc @@ -9,7 +9,6 @@ int IPRouterVec::process_vector(int input_port, vec_mask_arg_t mask) { // A temporary scalar no-op implementation. 
- // TODO: implement for (int i = 0; i < NBA_VECTOR_WIDTH; i++) if (mask.m[i]) output(0).push(pkt_vec[i]); diff --git a/elements/ipsec/IPsecAES.cc b/elements/ipsec/IPsecAES.cc index b86373c..07863a6 100644 --- a/elements/ipsec/IPsecAES.cc +++ b/elements/ipsec/IPsecAES.cc @@ -170,9 +170,11 @@ int IPsecAES::process(int input_port, Packet *pkt) AES_ctr128_encrypt(encrypt_ptr, encrypt_ptr, enc_size, &sa_entry->aes_key_t, esph->esp_iv, ecount_buf, &mode); #endif } else { - return DROP; + pkt->kill(); + return 0; } + output(0).push(pkt); return 0; } @@ -222,6 +224,7 @@ size_t IPsecAES::get_desired_workgroup_size(const char *device_name) const int IPsecAES::postproc(int input_port, void *custom_output, Packet *pkt) { + output(0).push(pkt); return 0; } diff --git a/elements/ipsec/IPsecAuthHMACSHA1.cc b/elements/ipsec/IPsecAuthHMACSHA1.cc index 72e5787..9dce95c 100644 --- a/elements/ipsec/IPsecAuthHMACSHA1.cc +++ b/elements/ipsec/IPsecAuthHMACSHA1.cc @@ -166,8 +166,10 @@ int IPsecAuthHMACSHA1::process(int input_port, Packet *pkt) SHA1(hmac_buf, 64 + SHA_DIGEST_LENGTH, payload_out + payload_len); // TODO: correctness check.. } else { - return DROP; + pkt->kill(); + return 0; } + output(0).push(pkt); return 0; } @@ -213,6 +215,7 @@ size_t IPsecAuthHMACSHA1::get_desired_workgroup_size(const char *device_name) co int IPsecAuthHMACSHA1::postproc(int input_port, void *custom_output, Packet *pkt) { + output(0).push(pkt); return 0; } diff --git a/elements/ipsec/IPsecESPencap.cc b/elements/ipsec/IPsecESPencap.cc index 041d061..f7aaa48 100644 --- a/elements/ipsec/IPsecESPencap.cc +++ b/elements/ipsec/IPsecESPencap.cc @@ -64,8 +64,10 @@ int IPsecESPencap::process(int input_port, Packet *pkt) // TODO: Set src & dest of encaped pkt to ip addrs from configuration. 
struct ether_hdr *ethh = (struct ether_hdr *) pkt->data(); - if (ntohs(ethh->ether_type) != ETHER_TYPE_IPv4) - return DROP; + if (ntohs(ethh->ether_type) != ETHER_TYPE_IPv4) { + pkt->kill(); + return 0; + } struct iphdr *iph = (struct iphdr *) (ethh + 1); struct ipaddr_pair pair; @@ -119,6 +121,7 @@ int IPsecESPencap::process(int input_port, Packet *pkt) iph->protocol = 0x32; // mark that this packet contains a secured payload. iph->check = 0; // ignoring previous checksum. iph->check = ip_fast_csum(iph, iph->ihl); + output(0).push(pkt); return 0; } diff --git a/elements/ipv6/CheckIP6Header.cc b/elements/ipv6/CheckIP6Header.cc index c8de9e2..77c5422 100644 --- a/elements/ipv6/CheckIP6Header.cc +++ b/elements/ipv6/CheckIP6Header.cc @@ -26,14 +26,17 @@ int CheckIP6Header::process(int input_port, Packet *pkt) // Validate the packet header. if (ntohs(ethh->ether_type) != ETHER_TYPE_IPv6) { //RTE_LOG(DEBUG, ELEM, "CheckIP6Header: invalid packet type - %x\n", ntohs(ethh->ether_type)); - return DROP; + pkt->kill(); + return 0; } - if ((iph->ip6_vfc & 0xf0) >> 4 != 6) // get the first 4 bits. + if ((iph->ip6_vfc & 0xf0) >> 4 != 6) { // get the first 4 bits. + pkt->kill(); return SLOWPATH; + } // TODO: Discard illegal source addresses. - + output(0).push(pkt); return 0; // output port number: 0 } diff --git a/elements/ipv6/DecIP6HLIM.cc b/elements/ipv6/DecIP6HLIM.cc index 0ac9576..89ac691 100644 --- a/elements/ipv6/DecIP6HLIM.cc +++ b/elements/ipv6/DecIP6HLIM.cc @@ -23,12 +23,14 @@ int DecIP6HLIM::process(int input_port, Packet *pkt) uint32_t checksum; if (iph->ip6_hlim <= 1) { - return DROP; + pkt->kill(); + return 0; } // Decrement TTL. 
iph->ip6_hlim = htons(ntohs(iph->ip6_hlim) - 1); + output(0).push(pkt); return 0; } diff --git a/elements/ipv6/LookupIP6Route.cc b/elements/ipv6/LookupIP6Route.cc index e9be187..6d56949 100644 --- a/elements/ipv6/LookupIP6Route.cc +++ b/elements/ipv6/LookupIP6Route.cc @@ -122,23 +122,29 @@ int LookupIP6Route::process(int input_port, Packet *pkt) lookup_result = _table_ptr->lookup((reinterpret_cast(&dest_addr))); //rte_rwlock_read_unlock(_rwlock_ptr); - if (lookup_result == 0xffff) + if (lookup_result == 0xffff) { /* Could not find destination. Use the second output for "error" packets. */ - return DROP; + pkt->kill(); + return 0; + } rr_port = (rr_port + 1) % num_tx_ports; anno_set(&pkt->anno, NBA_ANNO_IFACE_OUT, rr_port); + output(0).push(pkt); return 0; } int LookupIP6Route::postproc(int input_port, void *custom_output, Packet *pkt) { uint16_t lookup_result = *((uint16_t *)custom_output); - if (lookup_result == 0xffff) + if (lookup_result == 0xffff) { /* Could not find destination. Use the second output for "error" packets. 
*/ - return DROP; + pkt->kill(); + return 0; + } rr_port = (rr_port + 1) % num_tx_ports; anno_set(&pkt->anno, NBA_ANNO_IFACE_OUT, rr_port); + output(0).push(pkt); return 0; } diff --git a/elements/standards/RandomWeightedBranch.cc b/elements/standards/RandomWeightedBranch.cc index d69ab6c..1d83d98 100644 --- a/elements/standards/RandomWeightedBranch.cc +++ b/elements/standards/RandomWeightedBranch.cc @@ -43,11 +43,14 @@ int RandomWeightedBranch::process(int input_port, Packet *pkt) float x = uniform_dist(random_generator); int idx = 0; for (auto cur = out_probs.begin(); cur != out_probs.end(); cur++) { - if(x < *cur) - return idx; + if(x < *cur) { + output(idx).push(pkt); + return 0; + } idx++; } - return idx-1; + output(idx - 1).push(pkt); + return 0; } // vim: ts=8 sts=4 sw=4 et diff --git a/include/nba/element/element.hh b/include/nba/element/element.hh index e25c994..823f90a 100644 --- a/include/nba/element/element.hh +++ b/include/nba/element/element.hh @@ -55,6 +55,8 @@ struct element_info { class Element : public GraphMetaData { private: + friend class Packet; + class OutputPort { /** A simple utility class to emulate Click's output port. */ @@ -73,8 +75,7 @@ private: /* We allow a packet to be pushed only once inside the process * handler. If you want to push the same packet multiple times * to different outputs, you MUST clone it. */ - assert(pkt->output == -1); - pkt->output = my_idx; + pkt->mother->results[pkt->bidx] = my_idx; if (pkt->cloned) { /* Store the cloned packet separately. */ elem->output_cloned_packets[my_idx][elem->output_counts[my_idx]] = pkt; diff --git a/include/nba/element/packet.hh b/include/nba/element/packet.hh index d3ddd89..bd57eaa 100644 --- a/include/nba/element/packet.hh +++ b/include/nba/element/packet.hh @@ -24,6 +24,8 @@ enum PacketDisposition { }; class PacketBatch; +class Element; +class VectorElement; /* We have to manage two memory pools: * first for the original memory pool that our mbuf is allocated from. 
@@ -36,11 +38,13 @@ private: #ifdef DEBUG uint32_t magic; #endif + PacketBatch *mother; struct rte_mbuf *base; bool cloned; - int output; + int bidx; friend class Element; + friend class VectorElement; friend class DataBlock; public: @@ -80,7 +84,7 @@ public: #ifdef DEBUG magic(NBA_PACKET_MAGIC), #endif - base((struct rte_mbuf *) base), cloned(false), output(PacketDisposition::DROP) + mother(mother), base((struct rte_mbuf *) base), cloned(false), bidx(-1) { } ~Packet() { @@ -89,7 +93,7 @@ public: } } - inline void kill() { this->output = PacketDisposition::DROP; } + void kill(); inline unsigned char *data() { return rte_pktmbuf_mtod(base, unsigned char *); } inline uint32_t length() { return rte_pktmbuf_data_len(base); } @@ -117,7 +121,7 @@ public: } Packet *uniqueify() { - return nullptr; + return this; } Packet *push(uint32_t len) { diff --git a/include/nba/element/packetbatch.hh b/include/nba/element/packetbatch.hh index 1543992..b56ec5c 100644 --- a/include/nba/element/packetbatch.hh +++ b/include/nba/element/packetbatch.hh @@ -12,6 +12,10 @@ #include #include +extern "C" { +struct rte_ring; +} + namespace nba { class Element; @@ -24,9 +28,9 @@ enum BatchDisposition { class PacketBatch { public: PacketBatch() - : count(0), datablock_states(nullptr), recv_timestamp(0), + : count(0), drop_count(0), datablock_states(nullptr), recv_timestamp(0), generation(0), batch_id(0), element(nullptr), input_port(0), has_results(false), - delay_start(0), compute_time(0) + has_dropped(false), delay_start(0), compute_time(0) { #ifdef DEBUG memset(&results[0], 0xdd, sizeof(int) * NBA_MAX_COMP_BATCH_SIZE); @@ -39,7 +43,30 @@ public: { } + /** + * Moves excluded packets to the end of batches, by swapping them + * with the tail packets, to reduce branching overheads when iterating + * over the packet batch in many places. + * (We assume that this "in-batch" reordering does not incur performance + * overheads for transport layers.) 
+ * It stores the number of dropped packets to drop_count member + * variable. Later, ElementGraph refers to this value to actually free + * the excluded packets. + * + * This should only be called right after doing Element::_process_batch() + * or moving packets to other batches in ElementGraph. + * This may be called multiple times until reaching the next element. + */ + void collect_excluded_packets(); + + /** + * Moves the collected excluded packets at the tail to drop_queue, + * and resets drop_count to zero. + */ + void clean_drops(struct rte_ring *drop_queue); + unsigned count; + unsigned drop_count; struct datablock_tracker *datablock_states; uint64_t recv_timestamp; uint64_t generation; @@ -47,6 +74,7 @@ public: Element* element; int input_port; bool has_results; + bool has_dropped; uint64_t delay_start; uint64_t delay_time; double compute_time; diff --git a/src/lib/datablock.cc b/src/lib/datablock.cc index b665b76..b84afd6 100644 --- a/src/lib/datablock.cc +++ b/src/lib/datablock.cc @@ -263,8 +263,8 @@ void DataBlock::postprocess(OffloadableElement *elem, int input_port, PacketBatc case WRITE_WHOLE_PACKET: { /* Update the packets and run postprocessing. 
*/ + batch->has_dropped = false; for (unsigned p = 0; p < batch->count; p++) { - if (batch->excluded[p]) continue; size_t elemsz = bitselect(write_roi.type == WRITE_PARTIAL_PACKET, t->aligned_item_sizes.size, t->aligned_item_sizes.sizes[p]); @@ -275,11 +275,11 @@ void DataBlock::postprocess(OffloadableElement *elem, int input_port, PacketBatc (char*) host_out_ptr + offset, elemsz); Packet *pkt = Packet::from_base(batch->packets[p]); - pkt->output = -1; + pkt->bidx = p; elem->postproc(input_port, nullptr, pkt); - batch->results[p] = pkt->output; - batch->excluded[p] = (batch->results[p] == DROP); } + if (batch->has_dropped) + batch->collect_excluded_packets(); batch->has_results = true; break; } @@ -290,16 +290,16 @@ void DataBlock::postprocess(OffloadableElement *elem, int input_port, PacketBatc case WRITE_FIXED_SEGMENTS: { /* Run postporcessing only. */ + batch->has_dropped = false; for (unsigned p = 0; p < batch->count; p++) { - if (batch->excluded[p]) continue; uintptr_t elemsz = t->aligned_item_sizes.size; uintptr_t offset = elemsz * p; Packet *pkt = Packet::from_base(batch->packets[p]); - pkt->output = -1; + pkt->bidx = p; elem->postproc(input_port, (char*) host_out_ptr + offset, pkt); - batch->results[p] = pkt->output; - batch->excluded[p] = (batch->results[p] == DROP); } + if (batch->has_dropped) + batch->collect_excluded_packets(); batch->has_results = true; break; } diff --git a/src/lib/element.cc b/src/lib/element.cc index a1a62ae..efa76c8 100644 --- a/src/lib/element.cc +++ b/src/lib/element.cc @@ -38,50 +38,93 @@ Element::~Element() int Element::_process_batch(int input_port, PacketBatch *batch) { memset(output_counts, 0, sizeof(uint16_t) * ElementGraph::num_max_outputs); + batch->has_dropped = false; + batch->drop_count = 0; +//#define NBA_LOOP_UNROLLING +#ifdef NBA_LOOP_UNROLLING + #define NBA_UNROLL_STRIDE (4) + unsigned stride; + for (stride = 0; stride < batch->count; stride += NBA_UNROLL_STRIDE) { + unsigned idx1 = stride + 0; + unsigned idx2 = 
stride + 1; + unsigned idx3 = stride + 2; + unsigned idx4 = stride + 3; + Packet *pkt1 = Packet::from_base(batch->packets[idx1]); + Packet *pkt2 = Packet::from_base(batch->packets[idx2]); + Packet *pkt3 = Packet::from_base(batch->packets[idx3]); + Packet *pkt4 = Packet::from_base(batch->packets[idx4]); + pkt1->bidx = idx1; + pkt2->bidx = idx2; + pkt3->bidx = idx3; + pkt4->bidx = idx4; + this->process(input_port, pkt1); + this->process(input_port, pkt2); + this->process(input_port, pkt3); + this->process(input_port, pkt4); + } + for (unsigned p = stride - NBA_UNROLL_STRIDE; p < batch->count; p++) { + Packet *pkt = Packet::from_base(batch->packets[p]); + pkt->bidx = p; + this->process(input_port, pkt); + } + #undef NBA_UNROLL_STRIDE +#else for (unsigned p = 0; p < batch->count; p++) { if (likely(!batch->excluded[p])) { Packet *pkt = Packet::from_base(batch->packets[p]); - pkt->output = -1; + pkt->bidx = p; this->process(input_port, pkt); - batch->results[p] = pkt->output; } } +#endif + if (batch->has_dropped) + batch->collect_excluded_packets(); batch->has_results = true; return 0; // this value will be ignored. 
} int VectorElement::_process_batch(int input_port, PacketBatch *batch) { + batch->has_dropped = false; + batch->drop_count = 0; unsigned stride = 0; for (stride = 0; stride < batch->count; stride += NBA_VECTOR_WIDTH) { - vec_mask_t mask = _mm256_set1_epi64x(0); - vec_mask_arg_t mask_arg = {0,}; - // TODO: vectorize: !batch->excluded[p] - Packet *pkt_vec[NBA_VECTOR_WIDTH] = {nullptr,}; // TODO: call Packet::from_base(batch->packets[p]); - // TODO: vectorize: pkt->output = -1; - // TODO: copy mask to mask_arg + vec_mask_t mask = _mm256_set1_epi64x(1); + vec_mask_arg_t mask_arg; + _mm256_storeu_si256((__m256i *) &mask_arg, mask); + Packet *pkt_vec[NBA_VECTOR_WIDTH] = { + Packet::from_base(batch->packets[stride + 0]), + Packet::from_base(batch->packets[stride + 1]), + Packet::from_base(batch->packets[stride + 2]), + Packet::from_base(batch->packets[stride + 3]), + Packet::from_base(batch->packets[stride + 4]), + Packet::from_base(batch->packets[stride + 5]), + Packet::from_base(batch->packets[stride + 6]), + Packet::from_base(batch->packets[stride + 7]), + }; + // TODO: vectorize? 
+ pkt_vec[0]->bidx = stride + 0; + pkt_vec[1]->bidx = stride + 1; + pkt_vec[2]->bidx = stride + 2; + pkt_vec[3]->bidx = stride + 3; + pkt_vec[4]->bidx = stride + 4; + pkt_vec[5]->bidx = stride + 5; + pkt_vec[6]->bidx = stride + 6; + pkt_vec[7]->bidx = stride + 7; this->process_vector(input_port, pkt_vec, mask_arg); - // TODO: vectorize: batch->results[p] = pkt->output; - for (unsigned i = 0; i < NBA_VECTOR_WIDTH; i++) { - unsigned idx = stride + i; - if (!batch->excluded[idx]) - batch->results[idx] = 0; - } } { - vec_mask_t mask = _mm256_set1_epi64x(0); vec_mask_arg_t mask_arg = {0,}; - // TODO: vectorize: !batch->excluded[p] - Packet *pkt_vec[NBA_VECTOR_WIDTH] = {nullptr,}; // TODO: Packet::from_base(batch->packets[p]); - // TODO: vectorize: pkt->output = -1; - // TODO: copy mask to mask_arg - this->process_vector(input_port, pkt_vec, mask_arg); - // TODO: vectorize: batch->results[p] = pkt->output; - for (unsigned idx = stride; idx < batch->count; idx++) { - if (!batch->excluded[idx]) - batch->results[idx] = 0; + Packet *pkt_vec[NBA_VECTOR_WIDTH] = {nullptr,}; + for (unsigned i = stride - NBA_VECTOR_WIDTH, j = 0; i < batch->count; i++, j++) { + pkt_vec[j] = Packet::from_base(batch->packets[i]); + pkt_vec[j]->bidx = i; + mask_arg.m[j] = 1; } + this->process_vector(input_port, pkt_vec, mask_arg); } + if (batch->has_dropped) + batch->collect_excluded_packets(); batch->has_results = true; return 0; } diff --git a/src/lib/elementgraph.cc b/src/lib/elementgraph.cc index 78e5ead..db58605 100644 --- a/src/lib/elementgraph.cc +++ b/src/lib/elementgraph.cc @@ -133,10 +133,12 @@ void ElementGraph::flush_delayed_batches() void ElementGraph::free_batch(PacketBatch *batch, bool free_pkts) { if (free_pkts) { - for (unsigned p = 0; p < batch->count; p++) - if (!batch->excluded[p] && batch->packets[p] != nullptr) - rte_ring_enqueue(ctx->io_ctx->drop_queue, (void *) batch->packets[p]); // only for separate comp/io threads - //rte_pktmbuf_free(batch->packets[p]); + 
rte_ring_enqueue_bulk(ctx->io_ctx->drop_queue, + (void **) &batch->packets[0], + batch->count); + rte_ring_enqueue_bulk(ctx->io_ctx->drop_queue, + (void **) &batch->packets[batch->count], + batch->drop_count); } rte_mempool_put(ctx->batch_pool, (void *) batch); } @@ -221,8 +223,7 @@ void ElementGraph::run(PacketBatch *batch, Element *start_elem, int input_port) task->input_ports.push_back(input_port); task->num_pkts += batch->count; for (unsigned p = 0; p < batch->count; p++) { - if (!batch->excluded[p]) - task->num_bytes += Packet::from_base(batch->packets[p])->length(); + task->num_bytes += Packet::from_base(batch->packets[p])->length(); } #ifdef USE_NVPROF nvtxMarkA("add_batch"); @@ -303,30 +304,29 @@ void ElementGraph::run(PacketBatch *batch, Element *start_elem, int input_port) * batches. Just reuse the given one. */ if (0 == (current_elem->get_type() & ELEMTYPE_PER_BATCH)) { const int *const results = batch->results; + batch->has_dropped = false; for (unsigned p = 0; p < batch->count; p++) { - if (batch->excluded[p]) continue; int o = results[p]; switch (o) { case 0: // pass break; - case DROP: - if (ctx->inspector) ctx->inspector->drop_pkt_count += batch->count; - rte_ring_enqueue(ctx->io_ctx->drop_queue, (void *) batch->packets[p]); - batch->excluded[p] = true; - batch->packets[p] = nullptr; - break; case PENDING: // remove from PacketBatch, but don't free.. // They are stored in io_thread_ctx::pended_pkt_queue. batch->excluded[p] = true; batch->packets[p] = nullptr; + batch->has_dropped = true; break; case SLOWPATH: rte_panic("SLOWPATH is not supported yet. (element: %s)\n", current_elem->class_name()); break; } } + if (batch->has_dropped) + batch->collect_excluded_packets(); + if (ctx->inspector) ctx->inspector->drop_pkt_count += batch->drop_count; + batch->clean_drops(ctx->io_ctx->drop_queue); } if (current_elem->next_elems[0]->get_type() & ELEMTYPE_OUTPUT) { /* We are at the end leaf of the pipeline. 
@@ -335,9 +335,7 @@ void ElementGraph::run(PacketBatch *batch, Element *start_elem, int input_port) ctx->inspector->tx_batch_count ++;; ctx->inspector->tx_pkt_count += batch->count; } - //printf("txtx? (%u)\n", rte_mempool_count(ctx->io_ctx->emul_rx_packet_pool)); io_tx_batch(ctx->io_ctx, batch); - //printf("txtx! (%u)\n", rte_mempool_count(ctx->io_ctx->emul_rx_packet_pool)); free_batch(batch, false); continue; } else { @@ -385,8 +383,8 @@ void ElementGraph::run(PacketBatch *batch, Element *start_elem, int input_port) out_batches[predicted_output] = batch; /* Classify packets into copy-batches. */ + batch->has_dropped = false; for (unsigned p = 0; p < batch->count; p++) { - if (batch->excluded[p]) continue; int o = results[p]; assert(o < (signed) num_outputs); @@ -413,20 +411,14 @@ void ElementGraph::run(PacketBatch *batch, Element *start_elem, int input_port) /* Exclude it from the batch. */ out_batches[predicted_output]->excluded[p] = true; out_batches[predicted_output]->packets[p] = nullptr; - break; } - case DROP: { - /* Let the IO loop free the packet. */ - if (ctx->inspector) ctx->inspector->drop_pkt_count += 1; - rte_ring_enqueue(ctx->io_ctx->drop_queue, (void *) out_batches[predicted_output]->packets[p]); - /* Exclude it from the batch. */ - out_batches[predicted_output]->excluded[p] = true; - out_batches[predicted_output]->packets[p] = nullptr; + out_batches[predicted_output]->has_dropped = true; break; } case PENDING: { /* The packet is stored in io_thread_ctx::pended_pkt_queue. */ /* Exclude it from the batch. */ out_batches[predicted_output]->excluded[p] = true; out_batches[predicted_output]->packets[p] = nullptr; + out_batches[predicted_output]->has_dropped = true; break; } case SLOWPATH: assert(0); // Not implemented yet. 
@@ -436,7 +428,13 @@ void ElementGraph::run(PacketBatch *batch, Element *start_elem, int input_port) } current_elem->branch_total++; current_elem->branch_count[o]++; - } + } /* endfor(batch->count) */ + + // NOTE: out_batches[predicted_output] == batch + if (batch->has_dropped) + batch->collect_excluded_packets(); + if (ctx->inspector) ctx->inspector->drop_pkt_count += batch->drop_count; + batch->clean_drops(ctx->io_ctx->drop_queue); if (current_elem->branch_total & BRANCH_TRUNC_LIMIT) { //double percentage = ((double)(current_elem->branch_total-current_elem->branch_miss) / (double)current_elem->branch_total); @@ -508,18 +506,15 @@ void ElementGraph::run(PacketBatch *batch, Element *start_elem, int input_port) /* Classify packets into copy-batches. */ for (unsigned p = 0; p < batch->count; p++) { - if (batch->excluded[p]) - continue; int o = results[p]; - assert(o < (signed) num_outputs); + if (o >= (signed) num_outputs || o < 0) + printf("o=%d, num_outputs=%u, %u/%u/%u\n", o, num_outputs, p, batch->count, batch->drop_count); + assert(o < (signed) num_outputs && o >= 0); if (o >= 0) { int cnt = out_batches[o]->count ++; out_batches[o]->packets[cnt] = batch->packets[p]; out_batches[o]->excluded[cnt] = false; - } else if (o == DROP) { - if (ctx->inspector) ctx->inspector->drop_pkt_count += batch->count; - rte_ring_enqueue(ctx->io_ctx->drop_queue, (void *) batch->packets[p]); } else if (o == PENDING) { // remove from PacketBatch, but don't free.. // They are stored in io_thread_ctx::pended_pkt_queue. @@ -528,11 +523,11 @@ void ElementGraph::run(PacketBatch *batch, Element *start_elem, int input_port) } else { rte_panic("Invalid packet disposition value. (element: %s, value: %d)\n", current_elem->class_name(), o); } - - // Packets are excluded from original batch in all cases. + /* Packets are excluded from original batch in ALL cases. */ + /* Therefore, we do not have to collect_excluded_packets() + * nor clean_drops()! 
*/ batch->excluded[p] = true; - batch->packets[p] = nullptr; - } + } /* endfor(batch->count) */ /* Recurse into the element subgraph starting from each * output port using copy-batches. */ diff --git a/src/lib/io.cc b/src/lib/io.cc index f8200e9..93f01ce 100644 --- a/src/lib/io.cc +++ b/src/lib/io.cc @@ -523,20 +523,16 @@ void io_tx_batch(struct io_thread_context *ctx, PacketBatch *batch) // just transmit as requested for (unsigned p = 0; p < batch->count; p++) { Packet *pkt = Packet::from_base(batch->packets[p]); - if (batch->excluded[p] == false && anno_isset(&pkt->anno, NBA_ANNO_IFACE_OUT)) { - struct ether_hdr *ethh = rte_pktmbuf_mtod(batch->packets[p], struct ether_hdr *); - uint64_t o = anno_get(&pkt->anno, NBA_ANNO_IFACE_OUT); + struct ether_hdr *ethh = rte_pktmbuf_mtod(batch->packets[p], struct ether_hdr *); + uint64_t o = anno_get(&pkt->anno, NBA_ANNO_IFACE_OUT); - /* Update source/dest MAC addresses. */ - ether_addr_copy(ðh->s_addr, ðh->d_addr); - ether_addr_copy(&ctx->tx_ports[o].addr, ðh->s_addr); + /* Update source/dest MAC addresses. */ + ether_addr_copy(ðh->s_addr, ðh->d_addr); + ether_addr_copy(&ctx->tx_ports[o].addr, ðh->s_addr); - /* Append to the corresponding output batch. */ - int cnt = out_batches[o].count ++; - out_batches[o].packets[cnt] = batch->packets[p]; - } else { - assert(batch->packets[p] == nullptr); - } + /* Append to the corresponding output batch. */ + int cnt = out_batches[o].count ++; + out_batches[o].packets[cnt] = batch->packets[p]; } unsigned tx_tries = 0; @@ -987,7 +983,7 @@ int io_loop(void *arg) while (!rte_ring_empty(ctx->drop_queue)) { int n = rte_ring_dequeue_burst(ctx->drop_queue, (void**) drop_pkts, ctx->num_iobatch_size); - assert(n >= 0); + // TODO: add nullcheck? 
rte_mempool_put_bulk(ctx->emul_rx_packet_pool, (void **) drop_pkts, n); ctx->port_stats[0].num_sw_drop_pkts += n; } @@ -995,9 +991,9 @@ int io_loop(void *arg) while (!rte_ring_empty(ctx->drop_queue)) { int n = rte_ring_dequeue_burst(ctx->drop_queue, (void**) drop_pkts, ctx->num_iobatch_size); - assert(n >= 0); for (int p = 0; p < n; p++) - rte_pktmbuf_free(drop_pkts[p]); + if (drop_pkts[p] != nullptr) + rte_pktmbuf_free(drop_pkts[p]); ctx->port_stats[0].num_sw_drop_pkts += n; } } diff --git a/src/lib/packet.cc b/src/lib/packet.cc index 44a448b..c42401e 100644 --- a/src/lib/packet.cc +++ b/src/lib/packet.cc @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -26,4 +27,13 @@ struct rte_mempool *packet_create_mempool(size_t size, int node_id, int core_id) return packet_pool; } +void Packet::kill() +{ + mother->results[bidx] = PacketDisposition::DROP; + mother->excluded[bidx] = true; + mother->has_dropped = true; +} + } + +// vim: ts=8 sts=4 sw=4 et diff --git a/src/lib/packetbatch.cc b/src/lib/packetbatch.cc index 7254024..cc629b6 100644 --- a/src/lib/packetbatch.cc +++ b/src/lib/packetbatch.cc @@ -5,13 +5,42 @@ #include #include - using namespace std; using namespace nba; /* empty currently */ namespace nba { +void PacketBatch::collect_excluded_packets() +{ + unsigned dropped_cnt = 0; + for (unsigned p = 0; p < this->count - dropped_cnt; p++) { + if (unlikely(this->excluded[p])) { + unsigned q = this->count - dropped_cnt - 1; + struct rte_mbuf *t = this->packets[p]; + this->packets[p] = this->packets[q]; + this->packets[q] = t; + this->excluded[p] = false; + this->excluded[q] = true; + this->results[p] = this->results[q]; + this->results[q] = -1; + dropped_cnt ++; + } + } + this->count -= dropped_cnt; + this->drop_count += dropped_cnt; /* will be zeroed by ElementGraph. 
*/ +} + +void PacketBatch::clean_drops(struct rte_ring *drop_queue) +{ + if (this->drop_count > 0) { + rte_ring_enqueue_bulk(drop_queue, + (void **) &this->packets[this->count], + this->drop_count); + this->drop_count = 0; + } +} + } // vim: ts=8 sts=4 sw=4 et