Skip to content

Commit

Permalink
Switch to local.conf-based config of multithreading
Browse files Browse the repository at this point in the history
  • Loading branch information
joseph-henry committed Aug 20, 2024
1 parent 8283a6d commit b1a30ae
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 184 deletions.
135 changes: 70 additions & 65 deletions node/IncomingPacket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -794,71 +794,68 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar
{
Metrics::pkt_frame_in++;
int32_t _flowId = ZT_QOS_NO_FLOW;
//if (peer->flowHashingSupported()) {
if (size() > ZT_PROTO_VERB_EXT_FRAME_IDX_PAYLOAD) {
const unsigned int etherType = at<uint16_t>(ZT_PROTO_VERB_FRAME_IDX_ETHERTYPE);
const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;
const uint8_t *const frameData = reinterpret_cast<const uint8_t *>(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;

if (etherType == ZT_ETHERTYPE_IPV4 && (frameLen >= 20)) {
uint16_t srcPort = 0;
uint16_t dstPort = 0;
uint8_t proto = (reinterpret_cast<const uint8_t *>(frameData)[9]);
const unsigned int headerLen = 4 * (reinterpret_cast<const uint8_t *>(frameData)[0] & 0xf);
switch(proto) {
case 0x01: // ICMP
//flowId = 0x01;
break;
// All these start with 16-bit source and destination port in that order
case 0x06: // TCP
case 0x11: // UDP
case 0x84: // SCTP
case 0x88: // UDPLite
if (frameLen > (headerLen + 4)) {
unsigned int pos = headerLen + 0;
srcPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
srcPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
pos++;
dstPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
dstPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
_flowId = dstPort ^ srcPort ^ proto;
}
break;
}
}

if (etherType == ZT_ETHERTYPE_IPV6 && (frameLen >= 40)) {
uint16_t srcPort = 0;
uint16_t dstPort = 0;
unsigned int pos;
unsigned int proto;
_ipv6GetPayload((const uint8_t *)frameData, frameLen, pos, proto);
switch(proto) {
case 0x3A: // ICMPv6
//flowId = 0x3A;
break;
// All these start with 16-bit source and destination port in that order
case 0x06: // TCP
case 0x11: // UDP
case 0x84: // SCTP
case 0x88: // UDPLite
if (frameLen > (pos + 4)) {
srcPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
srcPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
pos++;
dstPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
dstPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
_flowId = dstPort ^ srcPort ^ proto;
}
break;
default:
break;
}
if (size() > ZT_PROTO_VERB_EXT_FRAME_IDX_PAYLOAD) {
const unsigned int etherType = at<uint16_t>(ZT_PROTO_VERB_FRAME_IDX_ETHERTYPE);
const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;
const uint8_t *const frameData = reinterpret_cast<const uint8_t *>(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;

if (etherType == ZT_ETHERTYPE_IPV4 && (frameLen >= 20)) {
uint16_t srcPort = 0;
uint16_t dstPort = 0;
uint8_t proto = (reinterpret_cast<const uint8_t *>(frameData)[9]);
const unsigned int headerLen = 4 * (reinterpret_cast<const uint8_t *>(frameData)[0] & 0xf);
switch(proto) {
case 0x01: // ICMP
//flowId = 0x01;
break;
// All these start with 16-bit source and destination port in that order
case 0x06: // TCP
case 0x11: // UDP
case 0x84: // SCTP
case 0x88: // UDPLite
if (frameLen > (headerLen + 4)) {
unsigned int pos = headerLen + 0;
srcPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
srcPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
pos++;
dstPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
dstPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
_flowId = dstPort ^ srcPort ^ proto;
}
break;
}
}
//}

//fprintf(stderr, "IncomingPacket::_doFRAME: flowId=%d\n", _flowId);
if (etherType == ZT_ETHERTYPE_IPV6 && (frameLen >= 40)) {
uint16_t srcPort = 0;
uint16_t dstPort = 0;
unsigned int pos;
unsigned int proto;
_ipv6GetPayload((const uint8_t *)frameData, frameLen, pos, proto);
switch(proto) {
case 0x3A: // ICMPv6
//flowId = 0x3A;
break;
// All these start with 16-bit source and destination port in that order
case 0x06: // TCP
case 0x11: // UDP
case 0x84: // SCTP
case 0x88: // UDPLite
if (frameLen > (pos + 4)) {
srcPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
srcPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
pos++;
dstPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
dstPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
_flowId = dstPort ^ srcPort ^ proto;
}
break;
default:
break;
}
}
}

const uint64_t nwid = at<uint64_t>(ZT_PROTO_VERB_FRAME_IDX_NETWORK_ID);
const SharedPtr<Network> network(RR->node->network(nwid));
Expand All @@ -872,8 +869,12 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar
const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;
const uint8_t *const frameData = reinterpret_cast<const uint8_t *>(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;
if (network->filterIncomingPacket(tPtr,peer,RR->identity.address(),sourceMac,network->mac(),frameData,frameLen,etherType,0) > 0) {
//RR->node->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen);
RR->pm->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen, _flowId);
if (RR->node->getMultithreadingEnabled()) {
RR->pm->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen, _flowId);
}
else {
RR->node->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen);
}
}
}
} else {
Expand Down Expand Up @@ -946,8 +947,12 @@ bool IncomingPacket::_doEXT_FRAME(const RuntimeEnvironment *RR,void *tPtr,const
}
// fall through -- 2 means accept regardless of bridging checks or other restrictions
case 2:
//RR->node->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen);
RR->pm->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen, flowId);
if (RR->node->getMultithreadingEnabled()) {
RR->pm->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen, flowId);
}
else {
RR->node->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen);
}
break;
}
}
Expand Down
6 changes: 6 additions & 0 deletions node/Node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,12 @@ ZT_ResultCode Node::processVirtualNetworkFrame(
}
}

/**
 * Configure multithreaded post-decode packet processing for this node.
 *
 * Records whether multithreading is enabled and then asks the packet
 * multiplexer (RR->pm) to set up its receive queues and worker threads.
 *
 * @param isEnabled True to route received frames through the packet multiplexer
 * @param concurrency Requested number of worker threads/queues; 0 is treated as 1
 * @param cpuPinningEnabled Forwarded unchanged to the packet multiplexer
 */
void Node::initMultithreading(bool isEnabled, unsigned int concurrency, bool cpuPinningEnabled)
{
	// The flag must be assigned before the call below: the multiplexer reads it
	// back through getMultithreadingEnabled() to decide whether to do anything.
	_multithreadingEnabled = isEnabled;

	// The multiplexer stores this count and later selects a queue with
	// "flowId % count". A count of zero would therefore be a modulo by zero
	// (undefined behavior) with no queues allocated, so never pass less than one.
	const unsigned int workerCount = (concurrency > 0) ? concurrency : 1;
	RR->pm->setUpPostDecodeReceiveThreads(workerCount, cpuPinningEnabled);
}

// Closure used to ping upstream and active/online peers
class _PingPeersThatNeedPing
{
Expand Down
9 changes: 9 additions & 0 deletions node/Node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,14 @@ class Node : public NetworkController::Sender
return _lowBandwidthMode;
}

/**
 * @return Current value of the _multithreadingEnabled flag, which is assigned by initMultithreading()
 */
inline bool getMultithreadingEnabled()
{
return _multithreadingEnabled;
}

void initMultithreading(bool isEnabled, unsigned int concurrency, bool cpuPinningEnabled);


public:
RuntimeEnvironment _RR;
RuntimeEnvironment *RR;
Expand Down Expand Up @@ -331,6 +339,7 @@ class Node : public NetworkController::Sender
volatile int64_t _prngState[2];
bool _online;
bool _lowBandwidthMode;
bool _multithreadingEnabled;
};

} // namespace ZeroTier
Expand Down
45 changes: 15 additions & 30 deletions node/PacketMultiplexer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@

namespace ZeroTier {

/**
 * Construct a multiplexer bound to a runtime environment.
 *
 * Construction only remembers the environment pointer; the receive queues and
 * worker threads are created separately by setUpPostDecodeReceiveThreads().
 *
 * @param renv Runtime environment pointer stored in RR
 */
PacketMultiplexer::PacketMultiplexer(const RuntimeEnvironment* renv) : RR(renv)
{
}

void PacketMultiplexer::putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId)
{
PacketRecord* packet;
Expand All @@ -46,46 +51,26 @@ void PacketMultiplexer::putFrame(void* tPtr, uint64_t nwid, void** nuptr, const
memcpy(packet->data, data, len);

int bucket = flowId % _concurrency;
//fprintf(stderr, "bucket=%d\n", bucket);
_rxPacketQueues[bucket]->postLimit(packet, 2048);
_rxPacketQueues[bucket]->postLimit(packet, 256);
}

PacketMultiplexer::PacketMultiplexer(const RuntimeEnvironment* renv)
void PacketMultiplexer::setUpPostDecodeReceiveThreads(unsigned int concurrency, bool cpuPinningEnabled)
{
RR = renv;
bool _enablePinning = false;
char* pinningVar = std::getenv("ZT_CPU_PINNING");
if (pinningVar) {
int tmp = atoi(pinningVar);
if (tmp > 0) {
_enablePinning = true;
}
}

_concurrency = 1;
char* concurrencyVar = std::getenv("ZT_PACKET_PROCESSING_CONCURRENCY");
if (concurrencyVar) {
int tmp = atoi(concurrencyVar);
if (tmp > 0) {
_concurrency = tmp;
}
else {
_concurrency = std::max((unsigned int)1, std::thread::hardware_concurrency() / 2);
}
}
else {
_concurrency = std::max((unsigned int)1, std::thread::hardware_concurrency() / 2);
if (! RR->node->getMultithreadingEnabled()) {
return;
}
_concurrency = concurrency;
bool _enablePinning = cpuPinningEnabled;

for (unsigned int i = 0; i < _concurrency; ++i) {
fprintf(stderr, "reserved queue for thread %d\n", i);
fprintf(stderr, "Reserved queue for thread %d\n", i);
_rxPacketQueues.push_back(new BlockingQueue<PacketRecord*>());
}

// Each thread picks from its own queue to feed into the core
for (unsigned int i = 0; i < _concurrency; ++i) {
_rxThreads.push_back(std::thread([this, i, _enablePinning]() {
fprintf(stderr, "created post-decode packet ingestion thread %d\n", i);
fprintf(stderr, "Created post-decode packet ingestion thread %d\n", i);

PacketRecord* packet = nullptr;
for (;;) {
Expand All @@ -96,7 +81,7 @@ PacketMultiplexer::PacketMultiplexer(const RuntimeEnvironment* renv)
break;
}

//fprintf(stderr, "popped packet from queue %d\n", i);
// fprintf(stderr, "popped packet from queue %d\n", i);

MAC sourceMac = MAC(packet->source);
MAC destMac = MAC(packet->dest);
Expand All @@ -120,6 +105,6 @@ PacketMultiplexer::PacketMultiplexer(const RuntimeEnvironment* renv)
}
}));
}
};
}

} // namespace ZeroTier
4 changes: 3 additions & 1 deletion node/PacketMultiplexer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ class PacketMultiplexer {

PacketMultiplexer(const RuntimeEnvironment* renv);

void setUpPostDecodeReceiveThreads(unsigned int concurrency, bool cpuPinningEnabled);

void putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId);

std::vector<BlockingQueue<PacketRecord*>*> _rxPacketQueues;

unsigned int _concurrency;
unsigned int _concurrency;
// pool
std::vector<PacketRecord*> _rxPacketVector;
std::vector<std::thread> _rxPacketThreads;
Expand Down
Loading

0 comments on commit b1a30ae

Please sign in to comment.