Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ifIndex and RPF checks #1022

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 38 additions & 8 deletions core/modules/port_inc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ CommandResponse PortInc::Init(const bess::pb::PortIncArg &arg) {
int ret;
CommandResponse err;
placement_constraint placement;
value_t value;
mask_t mask = {0};
std::string name = "ifName";

burst_ = bess::PacketBatch::kMaxBurst;

Expand Down Expand Up @@ -86,6 +89,22 @@ CommandResponse PortInc::Init(const bess::pb::PortIncArg &arg) {
return CommandFailure(-ret);
}

ret = AddMetadataAttr("ifIndex", sizeof(uint64_t),
bess::metadata::Attribute::AccessMode::kWrite);

uint64_t port_ifindex = port_->get_ifIndex();

std::memcpy(&value, &port_ifindex, sizeof(uint64_t));

/* We may add more "metadata at source" here later */
attrs_.push_back({.name = name,
.value = value,
.mask = mask,
.offset = -1,
.size = sizeof(uint64_t),
.do_mask = false,
.shift = 0});

return CommandSuccess();
}

Expand Down Expand Up @@ -136,18 +155,29 @@ struct task_result PortInc::RunTask(Context *ctx, bess::PacketBatch *batch,
return {.block = true, .packets = 0, .bits = 0};
}

// NOTE: we cannot skip this step since it might be used by scheduler.
if (prefetch_) {
for (uint32_t i = 0; i < cnt; i++) {
received_bytes += batch->pkts()[i]->total_len();
rte_prefetch0(batch->pkts()[i]->head_data());
for (uint32_t i = 0; i < cnt; i++) {
received_bytes += batch->pkts()[i]->total_len();
if (prefetch_) {
rte_prefetch0(batch->pkts()[i]->head_data());
}
} else {
for (uint32_t i = 0; i < cnt; i++) {
received_bytes += batch->pkts()[i]->total_len();
bess::Packet *snb = batch->pkts()[i];
for (size_t k = 0; k < attrs_.size(); k++) {
const struct Attr *attr = &attrs_[k];
mt_offset_t mt_offset = attr_offset(k);

if (!bess::metadata::IsValidOffset(mt_offset)) {
continue;
}

void *mt_ptr = _ptr_attr_with_offset<value_t>(mt_offset, snb);
const void *val_ptr = &attr->value;

bess::utils::CopySmall(mt_ptr, val_ptr, attr->size);
}
}

// NOTE: we cannot skip this step since it might be used by scheduler.

if (!(p->GetFlags() & DRIVER_FLAG_SELF_INC_STATS)) {
p->queue_stats[PACKET_DIR_INC][qid].packets += cnt;
p->queue_stats[PACKET_DIR_INC][qid].bytes += received_bytes;
Expand Down
22 changes: 22 additions & 0 deletions core/modules/port_inc.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,27 @@
#include "../pb/module_msg.pb.h"
#include "../port.h"


using bess::metadata::kMetadataAttrMaxSize;
using bess::metadata::mt_offset_t;

typedef struct {
uint8_t bytes[kMetadataAttrMaxSize];
} value_t;
typedef struct {
uint8_t bytes[kMetadataAttrMaxSize];
} mask_t;

struct Attr {
std::string name;
value_t value;
mask_t mask;
int offset;
size_t size;
bool do_mask;
int shift; // in bytes for now
};

class PortInc final : public Module {
public:
static const gate_idx_t kNumIGates = 0;
Expand Down Expand Up @@ -63,6 +84,7 @@ class PortInc final : public Module {
Port *port_;
int prefetch_;
int burst_;
std::vector<struct Attr> attrs_;
};

#endif // BESS_MODULES_PORTINC_H_
79 changes: 62 additions & 17 deletions core/modules/port_out.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,27 @@ CommandResponse PortOut::Init(const bess::pb::PortOutArg &arg) {
mcs_lock_init(&queue_locks_[i]);
}

if (arg.rpfcheck()) {
rpfcheck_ = 1;
} else {
rpfcheck_ = 0;
}

if (ret < 0) {
return CommandFailure(-ret);
}

std::string attr_name = "ifIndex";
using AccessMode = bess::metadata::Attribute::AccessMode;
attr_id_ = AddMetadataAttr(attr_name, sizeof(uint64_t), AccessMode::kRead);

return CommandSuccess();
}

CommandResponse PortOut::GetInitialArg(const bess::pb::EmptyArg &) {
bess::pb::PortOutArg arg;
arg.set_port(port_->name());
arg.set_rpfcheck(rpfcheck_);
return CommandSuccess(arg);
}

Expand All @@ -90,42 +101,76 @@ std::string PortOut::GetDesc() const {
port_->port_builder()->class_name().c_str());
}

static inline int SendBatch(bess::PacketBatch *batch, Port *p, queue_t qid) {
int PortOut::SendBatch(bess::PacketBatch *batch, queue_t qid) {
uint64_t sent_bytes = 0;
int sent_pkts = 0;

if (p->conf().admin_up) {
sent_pkts = p->SendPackets(qid, batch->pkts(), batch->cnt());
int dropped = 0;
int cnt = batch->cnt();

if (port_->conf().admin_up) {
if ((!rpfcheck_) || (attr_id_ == -1)) {
/* no RPF check - send everything at once */
sent_pkts = port_->SendPackets(qid, batch->pkts(), batch->cnt());
} else {
bool need_flush = true;
for (int i = 0; i < cnt; i++) {
uint64_t ifIndex = 0;
ifIndex = get_attr<uint64_t>(this, attr_id_, batch->pkts()[i]);
if ((ifIndex > 0) && (ifIndex == port_->get_ifIndex())) {
/* need a drop - send all packets till now,
* free this packet */
if (i - sent_pkts > 0) {
sent_pkts +=
port_->SendPackets(qid, &batch->pkts()[sent_pkts], i - sent_pkts);
}
if (sent_pkts == i) {
/* send sucessful, drop packet failing rpf check*/
bess::Packet::Free(batch->pkts()[i]);
/* Add dropped packet to "sent" count. We will subtract it later.
*/
sent_pkts++;
dropped++;
/* move to next packet */
} else {
/* remove from the total the length of packets we failed to send */
for (int k = sent_pkts; k < i; k++) {
sent_bytes -= batch->pkts()[k]->total_len();
}
need_flush = false;
break;
}
} else {
sent_bytes += batch->pkts()[i]->total_len();
}
}
if (need_flush && (sent_pkts < cnt)) {
sent_pkts += port_->SendPackets(qid, &batch->pkts()[sent_pkts], cnt - sent_pkts);
}
}
}

if (!(p->GetFlags() & DRIVER_FLAG_SELF_OUT_STATS)) {
if (!(port_->GetFlags() & DRIVER_FLAG_SELF_OUT_STATS)) {
const packet_dir_t dir = PACKET_DIR_OUT;

for (int i = 0; i < sent_pkts; i++) {
sent_bytes += batch->pkts()[i]->total_len();
}

p->queue_stats[dir][qid].packets += sent_pkts;
p->queue_stats[dir][qid].dropped += (batch->cnt() - sent_pkts);
p->queue_stats[dir][qid].bytes += sent_bytes;
port_->queue_stats[dir][qid].packets += sent_pkts - dropped;
port_->queue_stats[dir][qid].dropped += cnt - sent_pkts + dropped;
port_->queue_stats[dir][qid].bytes += sent_bytes;
}

return sent_pkts;
}

void PortOut::ProcessBatch(Context *ctx, bess::PacketBatch *batch) {
Port *p = port_;

CHECK(worker_queues_[ctx->wid] >= 0);
queue_t qid = worker_queues_[ctx->wid];
int sent_pkts = 0;


if (queue_users_[qid] == 1) {
sent_pkts = SendBatch(batch, p, qid);
sent_pkts = SendBatch(batch, qid);
} else {
mcslock_node_t me;
mcs_lock(&queue_locks_[qid], &me);
sent_pkts = SendBatch(batch, p, qid);
sent_pkts = SendBatch(batch, qid);
mcs_unlock(&queue_locks_[qid], &me);
}

Expand Down
4 changes: 4 additions & 0 deletions core/modules/port_out.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,17 @@ class PortOut final : public Module {

private:
Port *port_;
int attr_id_;
bool rpfcheck_;

int worker_queues_[Worker::kMaxWorkers];

// Number of workers mapped to a given queue. Indexed by queue number
int queue_users_[MAX_QUEUES_PER_DIR];

mcslock_t queue_locks_[MAX_QUEUES_PER_DIR];

int SendBatch(bess::PacketBatch *batch, queue_t qid);
};

#endif // BESS_MODULES_PORTOUT_H_
4 changes: 3 additions & 1 deletion core/packet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ Packet *Packet::copy(const Packet *src) {

bess::utils::CopyInlined(dst->append(src->total_len()), src->head_data(),
src->total_len(), true);

bess::utils::CopyInlined(&dst->metadata_, &src->metadata_,
SNBUF_METADATA, true);

return dst;
}

Expand Down
22 changes: 22 additions & 0 deletions core/port.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,31 @@

std::map<std::string, Port *> PortBuilder::all_ports_;

static uint64_t cur_ifIndex = 1;
static bool rolled_over = false;

Port *PortBuilder::CreatePort(const std::string &name) const {
Port *p = port_generator_();
p->set_name(name);
if (rolled_over) {
for (p->ifIndex = 1; p->ifIndex < UINT64_MAX; p->ifIndex++) {
bool found = false;
for (auto const &pi : all_ports_) {
if (pi.second->ifIndex == p->ifIndex) {
found = true;
break;
}
}
if (!found) {
break;
}
}
} else {
p->ifIndex = ++cur_ifIndex;
if (cur_ifIndex == UINT64_MAX) {
rolled_over = true;
}
}
p->set_port_builder(this);
return p;
}
Expand Down
3 changes: 3 additions & 0 deletions core/port.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ class Port {

const PortBuilder *port_builder() const { return port_builder_; }

uint64_t get_ifIndex() const { return ifIndex; }

protected:
friend class PortBuilder;

Expand All @@ -301,6 +303,7 @@ class Port {

// Current configuration
Conf conf_;
uint64_t ifIndex;

private:
static const size_t kDefaultIncQueueSize = 1024;
Expand Down
1 change: 1 addition & 0 deletions protobuf/module_msg.proto
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,7 @@ message PortIncArg {
*/
message PortOutArg {
string port = 1; /// The portname to connect to.
bool rpfcheck = 2; /// Allow reflection of packets to source port
}

/**
Expand Down