From 41bf4c7376fde7847e0fca5aeae6d1e510febdca Mon Sep 17 00:00:00 2001 From: Anton Ivanov Date: Wed, 7 Oct 2020 08:20:05 +0100 Subject: [PATCH 1/2] Copy metadata when making a packet copy Copy was copying only data, losing all metadata in the process. This resulted in very erratic behaviour of Replicate in a pipeline which relies on metadata to identify packets. --- core/packet.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/packet.cc b/core/packet.cc index a3656b524..a56f85bb1 100644 --- a/core/packet.cc +++ b/core/packet.cc @@ -58,7 +58,9 @@ Packet *Packet::copy(const Packet *src) { bess::utils::CopyInlined(dst->append(src->total_len()), src->head_data(), src->total_len(), true); - + bess::utils::CopyInlined(&dst->metadata_, &src->metadata_, + SNBUF_METADATA, true); + return dst; } From c357added3bafe9271cf7ac3393384e9a2cddf89 Mon Sep 17 00:00:00 2001 From: Anton Ivanov Date: Wed, 7 Oct 2020 12:25:29 +0100 Subject: [PATCH 2/2] RPF (no reflection) check This is a set of basic functionality which is present in all switching and forwarding frameworks. They all set/keep packet origin in the packet metadata. The reason is that knowing your "in" interface is essential for most forwarding decisions. Example - you cannot implement even a basic hub without knowing your "in" interface, because you are not supposed to send packets to the interface which originated them. There are similar requirements for forwarding, security, acls, etc. This changeset adds the following functionality: 1. Each port gets an ifIndex similar to ifIndex in SNMP assigned on creation. 2. PortInc now always sets an integer metadata called ifIndex with the ifIndex value. 3. PortOut now can optionally check if the packet is being sent out from its arrival port and drop it. The default behavious is rpf check off - to emulate previous BESS behaviour. Signed-off-by: Anton Ivanov --- core/modules/port_inc.cc | 46 +++++++++++++++++++---- core/modules/port_inc.h | 22 +++++++++++ core/modules/port_out.cc | 79 ++++++++++++++++++++++++++++++--------- core/modules/port_out.h | 4 ++ core/port.cc | 22 +++++++++++ core/port.h | 3 ++ protobuf/module_msg.proto | 1 + 7 files changed, 152 insertions(+), 25 deletions(-) diff --git a/core/modules/port_inc.cc b/core/modules/port_inc.cc index acdb6b1b6..57fcbc623 100644 --- a/core/modules/port_inc.cc +++ b/core/modules/port_inc.cc @@ -44,6 +44,9 @@ CommandResponse PortInc::Init(const bess::pb::PortIncArg &arg) { int ret; CommandResponse err; placement_constraint placement; + value_t value; + mask_t mask = {0}; + std::string name = "ifName"; burst_ = bess::PacketBatch::kMaxBurst; @@ -86,6 +89,22 @@ CommandResponse PortInc::Init(const bess::pb::PortIncArg &arg) { return CommandFailure(-ret); } + ret = AddMetadataAttr("ifIndex", sizeof(uint64_t), + bess::metadata::Attribute::AccessMode::kWrite); + + uint64_t port_ifindex = port_->get_ifIndex(); + + std::memcpy(&value, &port_ifindex, sizeof(uint64_t)); + + /* We may add more "metadata at source" here later */ + attrs_.push_back({.name = name, + .value = value, + .mask = mask, + .offset = -1, + .size = sizeof(uint64_t), + .do_mask = false, + .shift = 0}); + return CommandSuccess(); } @@ -136,18 +155,29 @@ struct task_result PortInc::RunTask(Context *ctx, bess::PacketBatch *batch, return {.block = true, .packets = 0, .bits = 0}; } - // NOTE: we cannot skip this step since it might be used by scheduler. - if (prefetch_) { - for (uint32_t i = 0; i < cnt; i++) { - received_bytes += batch->pkts()[i]->total_len(); - rte_prefetch0(batch->pkts()[i]->head_data()); + for (uint32_t i = 0; i < cnt; i++) { + received_bytes += batch->pkts()[i]->total_len(); + if (prefetch_) { + rte_prefetch0(batch->pkts()[i]->head_data()); } - } else { - for (uint32_t i = 0; i < cnt; i++) { - received_bytes += batch->pkts()[i]->total_len(); + bess::Packet *snb = batch->pkts()[i]; + for (size_t k = 0; k < attrs_.size(); k++) { + const struct Attr *attr = &attrs_[k]; + mt_offset_t mt_offset = attr_offset(k); + + if (!bess::metadata::IsValidOffset(mt_offset)) { + continue; + } + + void *mt_ptr = _ptr_attr_with_offset(mt_offset, snb); + const void *val_ptr = &attr->value; + + bess::utils::CopySmall(mt_ptr, val_ptr, attr->size); } } + // NOTE: we cannot skip this step since it might be used by scheduler. + if (!(p->GetFlags() & DRIVER_FLAG_SELF_INC_STATS)) { p->queue_stats[PACKET_DIR_INC][qid].packets += cnt; p->queue_stats[PACKET_DIR_INC][qid].bytes += received_bytes; diff --git a/core/modules/port_inc.h b/core/modules/port_inc.h index d28737420..384196e6f 100644 --- a/core/modules/port_inc.h +++ b/core/modules/port_inc.h @@ -35,6 +35,27 @@ #include "../pb/module_msg.pb.h" #include "../port.h" + +using bess::metadata::kMetadataAttrMaxSize; +using bess::metadata::mt_offset_t; + +typedef struct { + uint8_t bytes[kMetadataAttrMaxSize]; +} value_t; +typedef struct { + uint8_t bytes[kMetadataAttrMaxSize]; +} mask_t; + +struct Attr { + std::string name; + value_t value; + mask_t mask; + int offset; + size_t size; + bool do_mask; + int shift; // in bytes for now +}; + class PortInc final : public Module { public: static const gate_idx_t kNumIGates = 0; @@ -63,6 +84,7 @@ class PortInc final : public Module { Port *port_; int prefetch_; int burst_; + std::vector attrs_; }; #endif // BESS_MODULES_PORTINC_H_ diff --git a/core/modules/port_out.cc b/core/modules/port_out.cc index d798a71a3..7939a2080 100644 --- a/core/modules/port_out.cc +++ b/core/modules/port_out.cc @@ -65,16 +65,27 @@ CommandResponse PortOut::Init(const bess::pb::PortOutArg &arg) { mcs_lock_init(&queue_locks_[i]); } + if (arg.rpfcheck()) { + rpfcheck_ = 1; + } else { + rpfcheck_ = 0; + } + if (ret < 0) { return CommandFailure(-ret); } + std::string attr_name = "ifIndex"; + using AccessMode = bess::metadata::Attribute::AccessMode; + attr_id_ = AddMetadataAttr(attr_name, sizeof(uint64_t), AccessMode::kRead); + return CommandSuccess(); } CommandResponse PortOut::GetInitialArg(const bess::pb::EmptyArg &) { bess::pb::PortOutArg arg; arg.set_port(port_->name()); + arg.set_rpfcheck(rpfcheck_); return CommandSuccess(arg); } @@ -90,42 +101,76 @@ std::string PortOut::GetDesc() const { port_->port_builder()->class_name().c_str()); } -static inline int SendBatch(bess::PacketBatch *batch, Port *p, queue_t qid) { +int PortOut::SendBatch(bess::PacketBatch *batch, queue_t qid) { uint64_t sent_bytes = 0; int sent_pkts = 0; - - if (p->conf().admin_up) { - sent_pkts = p->SendPackets(qid, batch->pkts(), batch->cnt()); + int dropped = 0; + int cnt = batch->cnt(); + + if (port_->conf().admin_up) { + if ((!rpfcheck_) || (attr_id_ == -1)) { + /* no RPF check - send everything at once */ + sent_pkts = port_->SendPackets(qid, batch->pkts(), batch->cnt()); + } else { + bool need_flush = true; + for (int i = 0; i < cnt; i++) { + uint64_t ifIndex = 0; + ifIndex = get_attr(this, attr_id_, batch->pkts()[i]); + if ((ifIndex > 0) && (ifIndex == port_->get_ifIndex())) { + /* need a drop - send all packets till now, + * free this packet */ + if (i - sent_pkts > 0) { + sent_pkts += + port_->SendPackets(qid, &batch->pkts()[sent_pkts], i - sent_pkts); + } + if (sent_pkts == i) { + /* send sucessful, drop packet failing rpf check*/ + bess::Packet::Free(batch->pkts()[i]); + /* Add dropped packet to "sent" count. We will subtract it later. + */ + sent_pkts++; + dropped++; + /* move to next packet */ + } else { + /* remove from the total the length of packets we failed to send */ + for (int k = sent_pkts; k < i; k++) { + sent_bytes -= batch->pkts()[k]->total_len(); + } + need_flush = false; + break; + } + } else { + sent_bytes += batch->pkts()[i]->total_len(); + } + } + if (need_flush && (sent_pkts < cnt)) { + sent_pkts += port_->SendPackets(qid, &batch->pkts()[sent_pkts], cnt - sent_pkts); + } + } } - if (!(p->GetFlags() & DRIVER_FLAG_SELF_OUT_STATS)) { + if (!(port_->GetFlags() & DRIVER_FLAG_SELF_OUT_STATS)) { const packet_dir_t dir = PACKET_DIR_OUT; - - for (int i = 0; i < sent_pkts; i++) { - sent_bytes += batch->pkts()[i]->total_len(); - } - - p->queue_stats[dir][qid].packets += sent_pkts; - p->queue_stats[dir][qid].dropped += (batch->cnt() - sent_pkts); - p->queue_stats[dir][qid].bytes += sent_bytes; + port_->queue_stats[dir][qid].packets += sent_pkts - dropped; + port_->queue_stats[dir][qid].dropped += cnt - sent_pkts + dropped; + port_->queue_stats[dir][qid].bytes += sent_bytes; } - return sent_pkts; } void PortOut::ProcessBatch(Context *ctx, bess::PacketBatch *batch) { - Port *p = port_; CHECK(worker_queues_[ctx->wid] >= 0); queue_t qid = worker_queues_[ctx->wid]; int sent_pkts = 0; + if (queue_users_[qid] == 1) { - sent_pkts = SendBatch(batch, p, qid); + sent_pkts = SendBatch(batch, qid); } else { mcslock_node_t me; mcs_lock(&queue_locks_[qid], &me); - sent_pkts = SendBatch(batch, p, qid); + sent_pkts = SendBatch(batch, qid); mcs_unlock(&queue_locks_[qid], &me); } diff --git a/core/modules/port_out.h b/core/modules/port_out.h index 6017e7695..343e6f547 100644 --- a/core/modules/port_out.h +++ b/core/modules/port_out.h @@ -62,6 +62,8 @@ class PortOut final : public Module { private: Port *port_; + int attr_id_; + bool rpfcheck_; int worker_queues_[Worker::kMaxWorkers]; @@ -69,6 +71,8 @@ class PortOut final : public Module { int queue_users_[MAX_QUEUES_PER_DIR]; mcslock_t queue_locks_[MAX_QUEUES_PER_DIR]; + + int SendBatch(bess::PacketBatch *batch, queue_t qid); }; #endif // BESS_MODULES_PORTOUT_H_ diff --git a/core/port.cc b/core/port.cc index 347ebf81f..c832ff92a 100644 --- a/core/port.cc +++ b/core/port.cc @@ -45,9 +45,31 @@ std::map PortBuilder::all_ports_; +static uint64_t cur_ifIndex = 1; +static bool rolled_over = false; + Port *PortBuilder::CreatePort(const std::string &name) const { Port *p = port_generator_(); p->set_name(name); + if (rolled_over) { + for (p->ifIndex = 1; p->ifIndex < UINT64_MAX; p->ifIndex++) { + bool found = false; + for (auto const &pi : all_ports_) { + if (pi.second->ifIndex == p->ifIndex) { + found = true; + break; + } + } + if (!found) { + break; + } + } + } else { + p->ifIndex = ++cur_ifIndex; + if (cur_ifIndex == UINT64_MAX) { + rolled_over = true; + } + } p->set_port_builder(this); return p; } diff --git a/core/port.h b/core/port.h index 193d13105..2bf900c9d 100644 --- a/core/port.h +++ b/core/port.h @@ -293,6 +293,8 @@ class Port { const PortBuilder *port_builder() const { return port_builder_; } + uint64_t get_ifIndex() const { return ifIndex; } + protected: friend class PortBuilder; @@ -301,6 +303,7 @@ class Port { // Current configuration Conf conf_; + uint64_t ifIndex; private: static const size_t kDefaultIncQueueSize = 1024; diff --git a/protobuf/module_msg.proto b/protobuf/module_msg.proto index e00a463a3..73518e626 100644 --- a/protobuf/module_msg.proto +++ b/protobuf/module_msg.proto @@ -770,6 +770,7 @@ message PortIncArg { */ message PortOutArg { string port = 1; /// The portname to connect to. + bool rpfcheck = 2; /// Allow reflection of packets to source port } /**