Skip to content

Commit

Permalink
Decouple transport receivers creation using unique network flows (#5583)
Browse files Browse the repository at this point in the history
* Refs #22519. Add regression test

Signed-off-by: Juan Lopez Fernandez <[email protected]>

* Refs #22519. Decouple transport receivers creation using unique network flows

Signed-off-by: Juan Lopez Fernandez <[email protected]>

* Refs #22519. Add comment for future developers

Signed-off-by: Juan Lopez Fernandez <[email protected]>

* Refs #22519. Apply suggestion

Signed-off-by: Juan Lopez Fernandez <[email protected]>

* Refs #22519. Reuse unique ports for locators of same kind in a reader

Signed-off-by: Juan Lopez Fernandez <[email protected]>

---------

Signed-off-by: Juan Lopez Fernandez <[email protected]>
(cherry picked from commit e6e918f)
  • Loading branch information
juanlofer-eprosima authored and cferreiragonz committed Jan 22, 2025
1 parent 604a27d commit 78bf8c4
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 17 deletions.
62 changes: 45 additions & 17 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <algorithm>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <sstream>
Expand Down Expand Up @@ -1837,42 +1838,69 @@ bool RTPSParticipantImpl::createAndAssociateReceiverswithEndpoint(
if (unique_flows)
{
attributes.multicastLocatorList.clear();
attributes.unicastLocatorList = m_att.defaultUnicastLocatorList;
attributes.unicastLocatorList.clear();
attributes.external_unicast_locators.clear();

uint16_t port = initial_unique_port;
while (port < final_unique_port)
// Register created resources to distinguish the case where a receiver was created in this same function call
// (and can be reused for other locators of the same kind in this reader), and that in which it was already
// created before for other reader in this same participant.
std::map<int32_t, int16_t> created_resources;

// Create unique flows for unicast locators
LocatorList_t input_locator_list = m_att.defaultUnicastLocatorList;
for (Locator_t& loc : input_locator_list)
{
// Set port on unicast locators
for (Locator_t& loc : attributes.unicastLocatorList)
uint16_t port = created_resources.count(loc.kind) ? created_resources[loc.kind] : initial_unique_port;
while (port < final_unique_port)
{
// Set logical port only TCP locators
if (LOCATOR_KIND_TCPv4 == loc.kind || LOCATOR_KIND_TCPv6 == loc.kind)
{
// Due to current implementation limitations only one physical port (actual socket receiver)
// is allowed when using TCP tranport. All we can do for now is to create a unique "logical" flow.
// TODO(juanlofer): create a unique dedicated TCP communication channel once this limitation is removed.
IPLocator::setLogicalPort(loc, port);
}
else
{
loc.port = port;
}

// Try creating receiver for this locator
LocatorList_t aux_locator_list;
aux_locator_list.push_back(loc);
if (createReceiverResources(aux_locator_list, false, true, false))
{
created_resources[loc.kind] = port;
}

// Locator will be present in the list if receiver was created, or was already created
// Continue if receiver not created for this reader (might exist but created for other reader in this same participant)
if (!aux_locator_list.empty() &&
created_resources.count(loc.kind) && (created_resources[loc.kind] == port))
{
break;
}

// Try with next port
++port;
}

// Try creating receiver resources
LocatorList_t aux_locator_list = attributes.unicastLocatorList;
if (createReceiverResources(aux_locator_list, false, true, false))
// Fail when unique ports are exhausted
if (port >= final_unique_port)
{
break;
EPROSIMA_LOG_WARNING(RTPS_PARTICIPANT, "Unique flows requested but exhausted. Port range: "
<< initial_unique_port << "-" << final_unique_port << ". Discarding locator: " << loc);
}
else
{
attributes.unicastLocatorList.push_back(loc);
}

// Try with next port
++port;
}

// Fail when unique ports are exhausted
if (port >= final_unique_port)
if (attributes.unicastLocatorList.empty())
{
EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, "Unique flows requested but exhausted. Port range: "
<< initial_unique_port << "-" << final_unique_port);
EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, "No unicast locators to create unique flows");
return false;
}
}
Expand Down Expand Up @@ -1983,7 +2011,7 @@ bool RTPSParticipantImpl::createReceiverResources(
bool ret_val = input_list.empty();

#if HAVE_SECURITY
// An auxilary buffer is needed in the ReceiverResource to to decrypt the message,
// An auxilary buffer is needed in the ReceiverResource to decrypt the message,
// that imposes a limit in the received messages size even if the transport allows (uint32_t) messages size.
uint32_t max_receiver_buffer_size =
is_secure() ? std::numeric_limits<uint16_t>::max() : (std::numeric_limits<uint32_t>::max)();
Expand Down
1 change: 1 addition & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,7 @@ class RTPSParticipantImpl
* @param ApplyMutation - True if we want to create a Resource with a "similar" locator if the one we provide is unavailable
* @param RegisterReceiver - True if we want the receiver to be registered. Useful for receivers created after participant is enabled.
* @param log_when_creation_fails - True if a log warning shall be issued for each locator when a receiver resource cannot be created.
* @return True if a receiver resource was created for at least a locator in the list, false otherwise.
*/
bool createReceiverResources(
LocatorList_t& Locator_list,
Expand Down
102 changes: 102 additions & 0 deletions test/blackbox/common/BlackboxTestsNetworkConf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "PubSubParticipant.hpp"

#include <fastrtps/rtps/common/Locator.h>
#include <fastdds/rtps/transport/shared_mem/SharedMemTransportDescriptor.h>
#include <fastrtps/utils/IPFinder.h>

using namespace eprosima::fastrtps;
Expand Down Expand Up @@ -165,6 +166,107 @@ TEST_P(NetworkConfig, sub_unique_network_flows)
}
}

// Regression test for redmine issue #22519 to check that readers using unique network flows cannot share locators
// with other readers. The mentioned issue referred to the case where TCP + builtin transports are present.
// In that concrete scenario, the problem was that while the TCP (and UDP) transports rightly were able
// to create a receiver in the dedicated "unique flow" port, shared memory failed for that same port as the other
// process (or participant) is already listening on it. However this was not being handled properly, so once matched,
// the publisher attempts to send data to the wrongfully announced shared memory locator.
// Note that the underlying problem is that, when creating unique network flows, all transports are requested to
// create a receiver for a specific port all together. This is, the creation of unique flow receivers is only
// considered to fail when it fails for all transports, instead of decoupling them and keep trying for alternative
// ports when the creation of a specific transport receiver fails.
// In this test a similar scenario is presented, but using instead UDP and shared memory transports. In the first
// participant, only shared memory is used (which should create a SHM receiver in the first "unique" port attempted).
// In the second participant both UDP and shared memory are used (which should create a UDP receiver in the first
// "unique" port attempted, and a shared memory receiver in the second "unique" port attempted, as the first one is
// already being used by the first participant). As a result, the listening shared memory locators of each data
// reader should be different. Finally, a third data reader is created in the second participant, and it is verified
// that its listening locators are different from those of the other reader created in the same participant, as well as
// from the (SHM) one of the reader created in the first participant.
TEST_P(NetworkConfig, sub_unique_network_flows_multiple_locators)
{
// Enable unique network flows feature
PropertyPolicy properties;
properties.properties().emplace_back("fastdds.unique_network_flows", "");

// First participant
PubSubParticipant<HelloWorldPubSubType> participant(0, 1, 0, 0);

participant.sub_topic_name(TEST_TOPIC_NAME).sub_property_policy(properties);

std::shared_ptr<eprosima::fastdds::rtps::SharedMemTransportDescriptor> shm_descriptor =
std::make_shared<eprosima::fastdds::rtps::SharedMemTransportDescriptor>();
// Use only SHM transport in the first participant
participant.disable_builtin_transport().add_user_transport_to_pparams(shm_descriptor);

ASSERT_TRUE(participant.init_participant());
ASSERT_TRUE(participant.init_subscriber(0));

LocatorList_t locators;

participant.get_native_reader(0).get_listening_locators(locators);
ASSERT_EQ(locators.size(), 1u);
ASSERT_EQ((*locators.begin()).kind, LOCATOR_KIND_SHM);

// Second participant
PubSubParticipant<HelloWorldPubSubType> participant2(0, 2, 0, 0);

participant2.sub_topic_name(TEST_TOPIC_NAME).sub_property_policy(properties);

// Use both UDP and SHM in the second participant
if (!use_udpv4)
{
participant2.disable_builtin_transport().add_user_transport_to_pparams(descriptor_).
add_user_transport_to_pparams(shm_descriptor);
}

ASSERT_TRUE(participant2.init_participant());
ASSERT_TRUE(participant2.init_subscriber(0));

LocatorList_t locators2_1;

participant2.get_native_reader(0).get_listening_locators(locators2_1);
ASSERT_TRUE(locators2_1.size() >= 2u); // There should be at least two locators, one for SHM and N(#interfaces) for UDP

// Check SHM locator is different from the one in the first participant
for (const Locator_t& loc : locators2_1)
{
if (LOCATOR_KIND_SHM == loc.kind)
{
// Ports should be different (expected second and first values of the unique network flows port range)
ASSERT_FALSE(loc == *locators.begin());
}
}

// Now create a second reader in the second participant
ASSERT_TRUE(participant2.init_subscriber(1));

LocatorList_t locators2_2;

participant2.get_native_reader(1).get_listening_locators(locators2_2);
ASSERT_TRUE(locators2_2.size() >= 2u); // There should be at least two locators, one for SHM and N(#interfaces) for UDP

// Check SHM locator is different from the one in the first participant
for (const Locator_t& loc : locators2_2)
{
if (LOCATOR_KIND_SHM == loc.kind)
{
// Ports should be different (expected third and first values of the unique network flows port range)
ASSERT_FALSE(loc == *locators.begin());
}
}

// Now check no locators are shared between the two readers in the second participant
for (const Locator_t& loc_1 : locators2_1)
{
for (const Locator_t& loc_2 : locators2_2)
{
ASSERT_FALSE(loc_1 == loc_2);
}
}
}

//Verify that outLocatorList is used to select the desired output channel
TEST_P(NetworkConfig, PubSubOutLocatorSelection)
{
Expand Down

0 comments on commit 78bf8c4

Please sign in to comment.