diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp index b9e5d8b26..4d6d5ce8e 100644 --- a/node/IncomingPacket.cpp +++ b/node/IncomingPacket.cpp @@ -794,71 +794,68 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar { Metrics::pkt_frame_in++; int32_t _flowId = ZT_QOS_NO_FLOW; - //if (peer->flowHashingSupported()) { - if (size() > ZT_PROTO_VERB_EXT_FRAME_IDX_PAYLOAD) { - const unsigned int etherType = at(ZT_PROTO_VERB_FRAME_IDX_ETHERTYPE); - const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; - const uint8_t *const frameData = reinterpret_cast(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; - - if (etherType == ZT_ETHERTYPE_IPV4 && (frameLen >= 20)) { - uint16_t srcPort = 0; - uint16_t dstPort = 0; - uint8_t proto = (reinterpret_cast(frameData)[9]); - const unsigned int headerLen = 4 * (reinterpret_cast(frameData)[0] & 0xf); - switch(proto) { - case 0x01: // ICMP - //flowId = 0x01; - break; - // All these start with 16-bit source and destination port in that order - case 0x06: // TCP - case 0x11: // UDP - case 0x84: // SCTP - case 0x88: // UDPLite - if (frameLen > (headerLen + 4)) { - unsigned int pos = headerLen + 0; - srcPort = (reinterpret_cast(frameData)[pos++]) << 8; - srcPort |= (reinterpret_cast(frameData)[pos]); - pos++; - dstPort = (reinterpret_cast(frameData)[pos++]) << 8; - dstPort |= (reinterpret_cast(frameData)[pos]); - _flowId = dstPort ^ srcPort ^ proto; - } - break; - } - } - if (etherType == ZT_ETHERTYPE_IPV6 && (frameLen >= 40)) { - uint16_t srcPort = 0; - uint16_t dstPort = 0; - unsigned int pos; - unsigned int proto; - _ipv6GetPayload((const uint8_t *)frameData, frameLen, pos, proto); - switch(proto) { - case 0x3A: // ICMPv6 - //flowId = 0x3A; - break; - // All these start with 16-bit source and destination port in that order - case 0x06: // TCP - case 0x11: // UDP - case 0x84: // SCTP - case 0x88: // UDPLite - if (frameLen > (pos + 4)) { - srcPort = (reinterpret_cast(frameData)[pos++]) << 8; - srcPort |= (reinterpret_cast(frameData)[pos]); - pos++; - dstPort = (reinterpret_cast(frameData)[pos++]) << 8; - dstPort |= (reinterpret_cast(frameData)[pos]); - _flowId = dstPort ^ srcPort ^ proto; - } - break; - default: - break; - } + if (size() > ZT_PROTO_VERB_EXT_FRAME_IDX_PAYLOAD) { + const unsigned int etherType = at(ZT_PROTO_VERB_FRAME_IDX_ETHERTYPE); + const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; + const uint8_t *const frameData = reinterpret_cast(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; + + if (etherType == ZT_ETHERTYPE_IPV4 && (frameLen >= 20)) { + uint16_t srcPort = 0; + uint16_t dstPort = 0; + uint8_t proto = (reinterpret_cast(frameData)[9]); + const unsigned int headerLen = 4 * (reinterpret_cast(frameData)[0] & 0xf); + switch(proto) { + case 0x01: // ICMP + //flowId = 0x01; + break; + // All these start with 16-bit source and destination port in that order + case 0x06: // TCP + case 0x11: // UDP + case 0x84: // SCTP + case 0x88: // UDPLite + if (frameLen > (headerLen + 4)) { + unsigned int pos = headerLen + 0; + srcPort = (reinterpret_cast(frameData)[pos++]) << 8; + srcPort |= (reinterpret_cast(frameData)[pos]); + pos++; + dstPort = (reinterpret_cast(frameData)[pos++]) << 8; + dstPort |= (reinterpret_cast(frameData)[pos]); + _flowId = dstPort ^ srcPort ^ proto; + } + break; } } - //} - //fprintf(stderr, "IncomingPacket::_doFRAME: flowId=%d\n", _flowId); + if (etherType == ZT_ETHERTYPE_IPV6 && (frameLen >= 40)) { + uint16_t srcPort = 0; + uint16_t dstPort = 0; + unsigned int pos; + unsigned int proto; + _ipv6GetPayload((const uint8_t *)frameData, frameLen, pos, proto); + switch(proto) { + case 0x3A: // ICMPv6 + //flowId = 0x3A; + break; + // All these start with 16-bit source and destination port in that order + case 0x06: // TCP + case 0x11: // UDP + case 0x84: // SCTP + case 0x88: // UDPLite + if (frameLen > (pos + 4)) { + srcPort = (reinterpret_cast(frameData)[pos++]) << 8; + srcPort |= (reinterpret_cast(frameData)[pos]); + pos++; + dstPort = (reinterpret_cast(frameData)[pos++]) << 8; + dstPort |= (reinterpret_cast(frameData)[pos]); + _flowId = dstPort ^ srcPort ^ proto; + } + break; + default: + break; + } + } + } const uint64_t nwid = at(ZT_PROTO_VERB_FRAME_IDX_NETWORK_ID); const SharedPtr network(RR->node->network(nwid)); @@ -872,8 +869,12 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; const uint8_t *const frameData = reinterpret_cast(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; if (network->filterIncomingPacket(tPtr,peer,RR->identity.address(),sourceMac,network->mac(),frameData,frameLen,etherType,0) > 0) { - //RR->node->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen); - RR->pm->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen, _flowId); + if (RR->node->getMultithreadingEnabled()) { + RR->pm->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen, _flowId); + } + else { + RR->node->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen); + } } } } else { @@ -946,8 +947,12 @@ bool IncomingPacket::_doEXT_FRAME(const RuntimeEnvironment *RR,void *tPtr,const } // fall through -- 2 means accept regardless of bridging checks or other restrictions case 2: - //RR->node->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen); - RR->pm->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen, flowId); + if (RR->node->getMultithreadingEnabled()) { + RR->pm->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen, flowId); + } + else { + RR->node->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen); + } break; } } diff --git a/node/Node.cpp b/node/Node.cpp index 50a3c3ff5..c7cb37df5 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -240,6 +240,12 @@ ZT_ResultCode Node::processVirtualNetworkFrame( } } +void Node::initMultithreading(bool isEnabled, unsigned int concurrency, bool cpuPinningEnabled) +{ + _multithreadingEnabled = isEnabled; + RR->pm->setUpPostDecodeReceiveThreads(concurrency, cpuPinningEnabled); +} + // Closure used to ping upstream and active/online peers class _PingPeersThatNeedPing { diff --git a/node/Node.hpp b/node/Node.hpp index 86d1aee57..da2d5427f 100644 --- a/node/Node.hpp +++ b/node/Node.hpp @@ -283,6 +283,14 @@ class Node : public NetworkController::Sender return _lowBandwidthMode; } + inline bool getMultithreadingEnabled() + { + return _multithreadingEnabled; + } + + void initMultithreading(bool isEnabled, unsigned int concurrency, bool cpuPinningEnabled); + + public: RuntimeEnvironment _RR; RuntimeEnvironment *RR; @@ -331,6 +339,7 @@ class Node : public NetworkController::Sender volatile int64_t _prngState[2]; bool _online; bool _lowBandwidthMode; + bool _multithreadingEnabled; }; } // namespace ZeroTier diff --git a/node/PacketMultiplexer.cpp b/node/PacketMultiplexer.cpp index 6f559c4b8..401e1db9f 100644 --- a/node/PacketMultiplexer.cpp +++ b/node/PacketMultiplexer.cpp @@ -21,6 +21,11 @@ namespace ZeroTier { +PacketMultiplexer::PacketMultiplexer(const RuntimeEnvironment* renv) +{ + RR = renv; +}; + void PacketMultiplexer::putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId) { PacketRecord* packet; @@ -46,46 +51,26 @@ void PacketMultiplexer::putFrame(void* tPtr, uint64_t nwid, void** nuptr, const memcpy(packet->data, data, len); int bucket = flowId % _concurrency; - //fprintf(stderr, "bucket=%d\n", bucket); - _rxPacketQueues[bucket]->postLimit(packet, 2048); + _rxPacketQueues[bucket]->postLimit(packet, 256); } -PacketMultiplexer::PacketMultiplexer(const RuntimeEnvironment* renv) +void PacketMultiplexer::setUpPostDecodeReceiveThreads(unsigned int concurrency, bool cpuPinningEnabled) { - RR = renv; - bool _enablePinning = false; - char* pinningVar = std::getenv("ZT_CPU_PINNING"); - if (pinningVar) { - int tmp = atoi(pinningVar); - if (tmp > 0) { - _enablePinning = true; - } - } - - _concurrency = 1; - char* concurrencyVar = std::getenv("ZT_PACKET_PROCESSING_CONCURRENCY"); - if (concurrencyVar) { - int tmp = atoi(concurrencyVar); - if (tmp > 0) { - _concurrency = tmp; - } - else { - _concurrency = std::max((unsigned int)1, std::thread::hardware_concurrency() / 2); - } - } - else { - _concurrency = std::max((unsigned int)1, std::thread::hardware_concurrency() / 2); + if (! RR->node->getMultithreadingEnabled()) { + return; } + _concurrency = concurrency; + bool _enablePinning = cpuPinningEnabled; for (unsigned int i = 0; i < _concurrency; ++i) { - fprintf(stderr, "reserved queue for thread %d\n", i); + fprintf(stderr, "Reserved queue for thread %d\n", i); _rxPacketQueues.push_back(new BlockingQueue()); } // Each thread picks from its own queue to feed into the core for (unsigned int i = 0; i < _concurrency; ++i) { _rxThreads.push_back(std::thread([this, i, _enablePinning]() { - fprintf(stderr, "created post-decode packet ingestion thread %d\n", i); + fprintf(stderr, "Created post-decode packet ingestion thread %d\n", i); PacketRecord* packet = nullptr; for (;;) { @@ -96,7 +81,7 @@ PacketMultiplexer::PacketMultiplexer(const RuntimeEnvironment* renv) break; } - //fprintf(stderr, "popped packet from queue %d\n", i); + // fprintf(stderr, "popped packet from queue %d\n", i); MAC sourceMac = MAC(packet->source); MAC destMac = MAC(packet->dest); @@ -120,6 +105,6 @@ PacketMultiplexer::PacketMultiplexer(const RuntimeEnvironment* renv) } })); } -}; +} } // namespace ZeroTier \ No newline at end of file diff --git a/node/PacketMultiplexer.hpp b/node/PacketMultiplexer.hpp index 152bda320..8cd592a10 100644 --- a/node/PacketMultiplexer.hpp +++ b/node/PacketMultiplexer.hpp @@ -43,11 +43,13 @@ class PacketMultiplexer { PacketMultiplexer(const RuntimeEnvironment* renv); + void setUpPostDecodeReceiveThreads(unsigned int concurrency, bool cpuPinningEnabled); + void putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId); std::vector*> _rxPacketQueues; - unsigned int _concurrency; + unsigned int _concurrency; // pool std::vector _rxPacketVector; std::vector _rxPacketThreads; diff --git a/service/OneService.cpp b/service/OneService.cpp index c783f01cd..799f31412 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -798,12 +798,13 @@ class OneServiceImpl : public OneService bool _serverThreadRunning; bool _serverThreadRunningV6; - unsigned int _rxThreadCount; BlockingQueue _rxPacketQueue; std::vector _rxPacketVector; std::vector _rxPacketThreads; Mutex _rxPacketVector_m,_rxPacketThreads_m; - bool _enableMulticore; + bool _multicoreEnabled; + bool _cpuPinningEnabled; + unsigned int _concurrency; bool _allowTcpFallbackRelay; bool _forceTcpRelay; @@ -938,89 +939,6 @@ class OneServiceImpl : public OneService _ports[1] = 0; _ports[2] = 0; - _enableMulticore = false; - char* multicoreVar = std::getenv("ZT_ENABLE_MULTICORE"); - if (multicoreVar) { - int tmp = atoi(multicoreVar); - if (tmp > 0) { - _enableMulticore = true; - } - } - if (_enableMulticore) { - bool _enablePinning = false; - char* pinningVar = std::getenv("ZT_CORE_PINNING"); - if (pinningVar) { - int tmp = atoi(pinningVar); - if (tmp > 0) { - _enablePinning = true; - } - } - char* concurrencyVar = std::getenv("ZT_CONCURRENCY"); - if (concurrencyVar) { - int tmp = atoi(concurrencyVar); - if (tmp > 0) { - _rxThreadCount = tmp; - } - else { - _rxThreadCount = std::thread::hardware_concurrency() >= 4 ? 2 : 1; - } - } - else { - _rxThreadCount = std::thread::hardware_concurrency() >= 4 ? 2 : 1; - } - fprintf(stderr, "using %d rx threads\n", _rxThreadCount); - for (unsigned int i = 0; i < _rxThreadCount; ++i) { - _rxPacketThreads.push_back(std::thread([this, i, _enablePinning]() { - - if (_enablePinning) { -#if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */ - int pinCore = i % _rxThreadCount; - fprintf(stderr, "pinning thread %d to core %d\n", i, pinCore); - pthread_t self = pthread_self(); - cpu_set_t cpuset; - CPU_ZERO(&cpuset); - CPU_SET(pinCore, &cpuset); -#endif -#ifdef __LINUX__ - int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); -#elif __FreeBSD__ - int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); -#endif -#if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */ - if (rc != 0) - { - fprintf(stderr, "failed to pin rx thread %d to core %d: %s\n", i, pinCore, strerror(errno)); - exit(1); - } -#endif - } - PacketRecord* packet = nullptr; - for (;;) { - if (! _rxPacketQueue.get(packet)) { - break; - } - if (! packet) { - break; - } - const ZT_ResultCode err = _node->processWirePacket(nullptr, packet->now, packet->sock, &(packet->from), packet->data, packet->size, &_nextBackgroundTaskDeadline); - { - Mutex::Lock l(_rxPacketVector_m); - _rxPacketVector.push_back(packet); - } - if (ZT_ResultCode_isFatal(err)) { - char tmp[256]; - OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err); - Mutex::Lock _l(_termReason_m); - _termReason = ONE_UNRECOVERABLE_ERROR; - _fatalErrorMessage = tmp; - this->terminate(); - break; - } - } - })); - } - } - prometheus::simpleapi::saver.set_registry(prometheus::simpleapi::registry_ptr); prometheus::simpleapi::saver.set_delay(std::chrono::seconds(5)); prometheus::simpleapi::saver.set_out_file(_homePath + ZT_PATH_SEPARATOR + "metrics.prom"); @@ -1071,6 +989,64 @@ class OneServiceImpl : public OneService delete _rc; } + void setUpMultithreading() + { + _node->initMultithreading(true, _concurrency, _cpuPinningEnabled); + bool pinning = _cpuPinningEnabled; + + fprintf(stderr, "Starting %d RX threads\n", _concurrency); + for (unsigned int i = 0; i < _concurrency; ++i) { + _rxPacketThreads.push_back(std::thread([this, i, pinning]() { + + if (pinning) { +#if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */ + int pinCore = i % _concurrency; + fprintf(stderr, "CPU Pinning enabled. Pinning thread %d to core %d\n", i, pinCore); + pthread_t self = pthread_self(); + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(pinCore, &cpuset); +#endif +#ifdef __LINUX__ + int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); +#elif __FreeBSD__ + int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); +#endif +#if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */ + if (rc != 0) + { + fprintf(stderr, "failed to pin rx thread %d to core %d: %s\n", i, pinCore, strerror(errno)); + exit(1); + } +#endif + } + PacketRecord* packet = nullptr; + for (;;) { + if (! _rxPacketQueue.get(packet)) { + break; + } + if (! packet) { + break; + } + const ZT_ResultCode err = _node->processWirePacket(nullptr, packet->now, packet->sock, &(packet->from), packet->data, packet->size, &_nextBackgroundTaskDeadline); + { + Mutex::Lock l(_rxPacketVector_m); + _rxPacketVector.push_back(packet); + } + if (ZT_ResultCode_isFatal(err)) { + char tmp[256]; + OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err); + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = tmp; + this->terminate(); + break; + } + } + })); + } + } + virtual ReasonForTermination run() { try { @@ -2672,7 +2648,18 @@ class OneServiceImpl : public OneService fprintf(stderr,"WARNING: using manually-specified secondary and/or tertiary ports. This can cause NAT issues." ZT_EOL_S); } _portMappingEnabled = OSUtils::jsonBool(settings["portMappingEnabled"],true); - _node->setLowBandwidthMode(OSUtils::jsonBool(settings["lowBandwidthMode"],false)); + _multicoreEnabled = OSUtils::jsonBool(settings["multicoreEnabled"],false); + _concurrency = OSUtils::jsonInt(settings["concurrency"],0); + _cpuPinningEnabled = OSUtils::jsonBool(settings["cpuPinningEnabled"],false); + if (_multicoreEnabled) { + unsigned int maxConcurrency = std::thread::hardware_concurrency(); + if (_concurrency <= 1 || _concurrency >= maxConcurrency) { + unsigned int conservativeDefault = (std::thread::hardware_concurrency() >= 4 ? 2 : 1); + fprintf(stderr, "Concurrency level provided (%d) is invalid, assigning conservative default value of (%d)\n", _concurrency, conservativeDefault); + _concurrency = conservativeDefault; + } + setUpMultithreading(); + } #ifndef ZT_SDK const std::string up(OSUtils::jsonString(settings["softwareUpdate"],ZT_SOFTWARE_UPDATE_DEFAULT)); @@ -3001,7 +2988,7 @@ class OneServiceImpl : public OneService _lastDirectReceiveFromGlobal = now; } - if (_enableMulticore) { + if (_multicoreEnabled) { PacketRecord* packet; _rxPacketVector_m.lock(); if (_rxPacketVector.empty()) { @@ -3018,7 +3005,7 @@ class OneServiceImpl : public OneService memcpy(&(packet->from), from, sizeof(struct sockaddr_storage)); packet->size = (unsigned int)len; memcpy(packet->data, data, len); - _rxPacketQueue.postLimit(packet, 256 * _rxThreadCount); + _rxPacketQueue.postLimit(packet, 256 * _concurrency); } else { const ZT_ResultCode rc = _node->processWirePacket(nullptr,now,reinterpret_cast(sock),reinterpret_cast(from),data,len,&_nextBackgroundTaskDeadline);