Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/280/output dds #329

Open
wants to merge 24 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions perftest_qos_profiles.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1091,5 +1091,21 @@ _KeepDurationUsec variable name corresponds to the command-line option -keepDura
</durability>
</datawriter_qos>
</qos_profile>

<qos_profile name="LoggingQos">

<datawriter_qos>
<reliability>
<kind>BEST_EFFORT_RELIABILITY_QOS</kind>
</reliability>

<resource_limits>
<max_samples>10</max_samples>
<initial_samples>1</initial_samples>
<max_samples_per_instance>1</max_samples_per_instance>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix indentation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

</resource_limits>
</datawriter_qos>
</qos_profile>

</qos_library>
</dds>
311 changes: 311 additions & 0 deletions srcCpp/RTIDDSImpl.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -3842,6 +3842,317 @@ double RTIDDSImpl_FlatData<T>::obtain_dds_deserialize_time_cost_override(
}
#endif

void PerftestDDSPrinter::initialize(ParameterManager *_PM)
{
PerftestPrinter::initialize(_PM);
this->_PM = _PM;
topicName = std::string("PerftestInfo");
}

void PerftestDDSPrinter::initialize_dds_entities()
{
DDS_ReturnCode_t retcode;

DDS_DomainParticipantFactoryQos factory_qos;
DDS_DomainParticipantQos dpQos;
DDS_PublisherQos publisherQos;
DDS_DataWriterQos dwQos;

#ifndef RTI_MICRO
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we check what is the behavior when using micro? We might need to make this differently.

retcode = DDSTheParticipantFactory->get_qos(factory_qos);
if (retcode != DDS_RETCODE_OK) {
fprintf(stderr,
"PerftestDDSPrinter::initialize: "
"DDSTheParticipantFactory->get_qos(factory_qos)\n");
return;
}

if (!_PM->get<bool>("noXmlQos")) {
factory_qos.profile.url_profile.ensure_length(1, 1);
factory_qos.profile.url_profile[0] =
DDS_String_dup(_PM->get<std::string>("qosFile").c_str());
} else {
factory_qos.profile.string_profile.from_array(
PERFTEST_QOS_STRING,
PERFTEST_QOS_STRING_SIZE);
}

retcode = DDSTheParticipantFactory->set_qos(factory_qos);
if (retcode != DDS_RETCODE_OK) {
fprintf(stderr,
"PerftestDDSPrinter::initialize: "
"DDSTheParticipantFactory->set_qos(factory_qos) failed\n");
return;
}

retcode = DDSTheParticipantFactory->reload_profiles();
if ( retcode != DDS_RETCODE_OK) {
fprintf(stderr,
"PerftestDDSPrinter::initialize: "
"Problem opening QOS profiles file %s.\n",
_PM->get<std::string>("qosFile").c_str());
return;
}

retcode = DDSTheParticipantFactory->get_participant_qos_from_profile(
dpQos,
_PM->get<std::string>("qosLibrary").c_str(),
"LoggingQos");
if (retcode != DDS_RETCODE_OK) {
fprintf(stderr,
"PerftestDDSPrinter::initialize: "
"Problem setting QoS Library \"%s::LoggingQos\" "
"for participant_qos.\n",
_PM->get<std::string>("qosLibrary").c_str());
return;
}

retcode = DDSTheParticipantFactory->get_publisher_qos_from_profile(
publisherQos,
_PM->get<std::string>("qosLibrary").c_str(),
"LoggingQos");
if (retcode != DDS_RETCODE_OK) {
fprintf(stderr,
"PerftestDDSPrinter::initialize: "
"Problem setting QoS Library for publisherQos.\n");
}

retcode = DDSTheParticipantFactory->get_datawriter_qos_from_profile(
dwQos,
_PM->get<std::string>("qosLibrary").c_str(),
"LoggingQos");
if (retcode != DDS_RETCODE_OK) {
fprintf(stderr,
"PerftestDDSPrinter::initialize: "
"Problem setting QoS Library for dwQos.\n");
}
#endif

//TODO: Decide if we want to use the same domain or + 1 or what.
participant = DDSTheParticipantFactory->create_participant(
(DDS_DomainId_t) (_PM->get<int>("domain") + 1),
dpQos,
NULL,
DDS_STATUS_MASK_NONE);
if (participant == NULL) {
fprintf(stderr, "PerftestDDSPrinter Problem creating participant.\n");
finalize();
}

publisher = participant->create_publisher(
publisherQos,
NULL,
DDS_STATUS_MASK_NONE);
if (publisher == NULL) {
fprintf(stderr,
"PerftestDDSPrinter::initialize: "
"PerftestDDSPrinter create_publisher error\n");
finalize();
}

retcode = perftestInfo::TypeSupport::register_type(
participant,
perftestInfo::TypeSupport::get_type_name());
if (retcode != DDS_RETCODE_OK) {
fprintf(stderr,
"PerftestDDSPrinter::initialize: "
"PerftestDDSPrinter register_type error %d\n",
retcode);
finalize();
}

topic = participant->create_topic(
topicName.c_str(),
perftestInfo::TypeSupport::get_type_name(),
DDS_TOPIC_QOS_DEFAULT,
NULL,
DDS_STATUS_MASK_NONE);
if (topic == NULL) {
fprintf(stderr,
"PerftestDDSPrinter::initialize: "
"PerftestDDSPrinter create_topic error\n");
finalize();
}

DDSDataWriter *writer = publisher->create_datawriter(
topic, dwQos, NULL,
DDS_STATUS_MASK_NONE);
if (writer == NULL) {
fprintf(stderr,
"PerftestDDSPrinter::initialize: "
"PerftestDDSPrinter create_datawriter error\n");
finalize();
}

ptInfoWriter = perftestInfo::DataWriter::narrow(writer);
if (ptInfoWriter == NULL) {
fprintf(stderr,
"PerftestDDSPrinter::initialize: "
"DataWriter narrow error\n");
finalize();
}

ptInfo = perftestInfo::TypeSupport::create_data();
if (ptInfo == NULL) {
fprintf(stderr,
"PerftestDDSPrinter::initialize: "
"testTypeSupport::create_data error\n");
finalize();
}

ptInfo->pLatencyInfo = perftestLatencyInfo::TypeSupport::create_data();
if (ptInfo->pLatencyInfo == NULL) {
fprintf(stderr,
"PerftestDDSPrinter::initialize: "
"testTypeSupport::create_data error\n");
finalize();
}
ptInfo->pThroughputInfo = perftestThroughputInfo::TypeSupport::create_data();
if (ptInfo->pThroughputInfo == NULL) {
fprintf(stderr,
"PerftestDDSPrinter::initialize: "
"testTypeSupport::create_data error\n");
finalize();
}

ptInfo->appId = _PM->get<bool>("pub") ? _PM->get<int>("pidMultiPubTest")
: _PM->get<int>("sidMultiSubTest");

fprintf(stderr,
"[Info] Publishing latency/throughput information via DDS\n");
}

void PerftestDDSPrinter::finalize()
{
DDS_ReturnCode_t retcode;

deleteDataSample();
retcode = perftestInfo::TypeSupport::delete_data(ptInfo);
if (retcode != DDS_RETCODE_OK) {
fprintf(stderr, "perftestInfo::TypeSupport::delete_data error %d\n", retcode);
}

if (participant != NULL) {
retcode = participant->delete_contained_entities();
if (retcode != DDS_RETCODE_OK) {
fprintf(stderr, "delete_contained_entities error %d\n", retcode);
}
retcode = DDSTheParticipantFactory->delete_participant(participant);
if (retcode != DDS_RETCODE_OK) {
fprintf(stderr, "delete_participant error %d\n", retcode);
}
}
}

void PerftestDDSPrinter::print_latency_interval(LatencyInfo latInfo)
{
this->dataWrapperLatency(latInfo);
DDS_ReturnCode_t retcode = ptInfoWriter->write(*ptInfo, DDS_HANDLE_NIL);
if (retcode != DDS_RETCODE_OK) {
fprintf(stderr,
"PerftestDDSPrinter::print_latency_interval"
"write error %d\n",
retcode);
}
}

void PerftestDDSPrinter::print_throughput_interval(ThroughputInfo thInfo)
{
DDS_ReturnCode_t retcode;
this->dataWrapperThroughput(thInfo);
retcode = ptInfoWriter->write(*ptInfo, DDS_HANDLE_NIL);
if (retcode != DDS_RETCODE_OK) {
fprintf(stderr,
"PerftestDDSPrinter::print_latency_interval"
"write error %d\n",
retcode);
}
}

void PerftestDDSPrinter::deleteDataSample()
{
ptInfo->outputCpu = NULL;
ptInfo->pLatencyInfo->latency = NULL;
ptInfo->pLatencyInfo->ave = NULL;
ptInfo->pLatencyInfo->std = NULL;
ptInfo->pLatencyInfo->min = NULL;
ptInfo->pLatencyInfo->max = NULL;
ptInfo->pLatencyInfo->h50 = NULL;
ptInfo->pLatencyInfo->h90 = NULL;
ptInfo->pLatencyInfo->h99 = NULL;
ptInfo->pLatencyInfo->h9999 = NULL;
ptInfo->pLatencyInfo->h999999 = NULL;
ptInfo->pLatencyInfo->serialize = NULL;
ptInfo->pLatencyInfo->deserialize = NULL;
ptInfo->pLatencyInfo->total = NULL;
ptInfo->pThroughputInfo->packets = NULL;
ptInfo->pThroughputInfo->packetsAve = NULL;
ptInfo->pThroughputInfo->mbps = NULL;
ptInfo->pThroughputInfo->mbpsAve = NULL;
ptInfo->pThroughputInfo->lost = NULL;
ptInfo->pThroughputInfo->lostPercent = NULL;
ptInfo->pThroughputInfo->packetsS = NULL;
}

void PerftestDDSPrinter::dataWrapperLatency(LatencyInfo latInfo)
{
if (this->_dataLength != ptInfo->dataLength) {
ptInfo->dataLength = _dataLength;
}

if (this->_showCPU) {
ptInfo->outputCpu = &latInfo.outputCpu;
}
ptInfo->pLatencyInfo->latency = &latInfo.latency;
ptInfo->pLatencyInfo->ave = &latInfo.ave;
ptInfo->pLatencyInfo->std = &latInfo.std;
ptInfo->pLatencyInfo->min = &latInfo.min;
ptInfo->pLatencyInfo->max = &latInfo.max;
// summary part
if (!latInfo.interval) {
ptInfo->pLatencyInfo->h50 = &latInfo.h50;
ptInfo->pLatencyInfo->h90 = &latInfo.h90;
ptInfo->pLatencyInfo->h99 = &latInfo.h99;
ptInfo->pLatencyInfo->h9999 = &latInfo.h9999;
ptInfo->pLatencyInfo->h999999 = &latInfo.h999999;
if (_printSerialization) {
ptInfo->pLatencyInfo->serialize = &latInfo.serialize;
ptInfo->pLatencyInfo->deserialize = &latInfo.deserialize;
ptInfo->pLatencyInfo->total = &latInfo.total;
}
} else {
ptInfo->pLatencyInfo->h50 = NULL;
ptInfo->pLatencyInfo->h90 = NULL;
ptInfo->pLatencyInfo->h99 = NULL;
ptInfo->pLatencyInfo->h9999 = NULL;
ptInfo->pLatencyInfo->h999999 = NULL;
ptInfo->pLatencyInfo->serialize = NULL;
ptInfo->pLatencyInfo->deserialize = NULL;
ptInfo->pLatencyInfo->total = NULL;
}
}

void PerftestDDSPrinter::dataWrapperThroughput(ThroughputInfo thInfo)
{
if (this->_dataLength != ptInfo->dataLength) {
ptInfo->dataLength = _dataLength;
}
if (this->_showCPU) {
ptInfo->outputCpu = &thInfo.outputCpu;
}
ptInfo->pThroughputInfo->packets = &thInfo.packets;
ptInfo->pThroughputInfo->packetsAve = &thInfo.pAve;
ptInfo->pThroughputInfo->mbps = &thInfo.mbps;
ptInfo->pThroughputInfo->mbpsAve = &thInfo.mbpsAve;
ptInfo->pThroughputInfo->lost = &thInfo.lost;
ptInfo->pThroughputInfo->lostPercent = &thInfo.lostPercent;
if (thInfo.interval) {
ptInfo->pThroughputInfo->packetsS = &thInfo.packetsS;
} else {
ptInfo->pThroughputInfo->packetsS = NULL;
}
}

/*
* This instantiation is to avoid a undefined reference of a templated static
* function of obtain_dds_de/serialize_time_costs.
Expand Down
47 changes: 47 additions & 0 deletions srcCpp/RTIDDSImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "perftestSupport.h"
#include "PerftestTransport.h"
#include "Infrastructure_common.h"
#include "PerftestPrinter.h"

#ifdef RTI_ZEROCOPY_AVAILABLE
#include "perftest_ZeroCopySupport.h"
Expand Down Expand Up @@ -262,4 +263,50 @@ class RTIDDSImpl_FlatData: public RTIDDSImpl<TestData_t> {
};
#endif // RTI_FLATDATA_AVAILABLE

class PerftestDDSPrinter: public PerftestPrinter {

int domain;
std::string topicName;

ParameterManager *_PM;
DDSDomainParticipant *participant;
DDSPublisher *publisher;
DDSTopic *topic;
perftestInfoDataWriter *ptInfoWriter;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

infoDataWriter

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

perftestInfo *ptInfo;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perftestInfo

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok


~PerftestDDSPrinter() {};

void initialize(ParameterManager *_PM);
void initialize_dds_entities();
void finalize();

void print_initial_output()
{
initialize_dds_entities();
};
void print_final_output()
{
finalize();
};

void print_latency_header() {};
void print_latency_interval(LatencyInfo latInfo);
void print_latency_summary(LatencyInfo latInfo)
{
print_latency_interval(latInfo);
};

void print_throughput_header() {};
void print_throughput_interval(ThroughputInfo thInfo);
void print_throughput_summary(ThroughputInfo thInfo)
{
print_throughput_interval(thInfo);
};

void dataWrapperLatency(LatencyInfo latInfo);
void dataWrapperThroughput(ThroughputInfo thInfo);
void deleteDataSample();
};

#endif // __RTIDDSIMPL_H__
Loading