Skip to content

Commit

Permalink
Refs eProsima#2821. Fixing error storing history record
Browse files Browse the repository at this point in the history
  • Loading branch information
richiware committed Jun 13, 2018
1 parent df18029 commit 8ebd4d9
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 21 deletions.
2 changes: 1 addition & 1 deletion include/fastrtps/rtps/reader/WriterProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ namespace eprosima
* Set initial value for last acked sequence number.
* @param[in] seqNum last acked sequence number.
*/
void loaded_from_storage(const SequenceNumber_t& seqNum);
void loaded_from_storage_nts(const SequenceNumber_t& seqNum);

/**
* Get the maximum sequenceNumber received from this Writer.
Expand Down
8 changes: 8 additions & 0 deletions src/cpp/rtps/builtin/discovery/endpoint/EDPSimple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ void EDPSimple::assignRemoteEndpoints(const ParticipantProxyData& pdata)
RemoteWriterAttributes watt(pdata.m_VendorId);
watt.guid.guidPrefix = pdata.m_guid.guidPrefix;
watt.guid.entityId = c_EntityId_SEDPPubWriter;
watt.endpoint.persistence_guid = watt.guid;
watt.endpoint.unicastLocatorList = pdata.m_metatrafficUnicastLocatorList;
watt.endpoint.multicastLocatorList = pdata.m_metatrafficMulticastLocatorList;
watt.endpoint.reliabilityKind = RELIABLE;
Expand Down Expand Up @@ -704,6 +705,7 @@ void EDPSimple::assignRemoteEndpoints(const ParticipantProxyData& pdata)
RemoteWriterAttributes watt(pdata.m_VendorId);
watt.guid.guidPrefix = pdata.m_guid.guidPrefix;
watt.guid.entityId = c_EntityId_SEDPSubWriter;
watt.endpoint.persistence_guid = watt.guid;
watt.endpoint.unicastLocatorList = pdata.m_metatrafficUnicastLocatorList;
watt.endpoint.multicastLocatorList = pdata.m_metatrafficMulticastLocatorList;
watt.endpoint.reliabilityKind = RELIABLE;
Expand Down Expand Up @@ -738,6 +740,7 @@ void EDPSimple::assignRemoteEndpoints(const ParticipantProxyData& pdata)
WriterProxyData watt;
watt.guid().guidPrefix = pdata.m_guid.guidPrefix;
watt.guid().entityId = sedp_builtin_publications_secure_writer;
watt.persistence_guid(watt.guid());
watt.unicastLocatorList(pdata.m_metatrafficUnicastLocatorList);
watt.multicastLocatorList(pdata.m_metatrafficMulticastLocatorList);
watt.m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;
Expand Down Expand Up @@ -783,6 +786,7 @@ void EDPSimple::assignRemoteEndpoints(const ParticipantProxyData& pdata)
WriterProxyData watt;
watt.guid().guidPrefix = pdata.m_guid.guidPrefix;
watt.guid().entityId = sedp_builtin_subscriptions_secure_writer;
watt.persistence_guid(watt.guid());
watt.unicastLocatorList(pdata.m_metatrafficUnicastLocatorList);
watt.multicastLocatorList(pdata.m_metatrafficMulticastLocatorList);
watt.m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;
Expand Down Expand Up @@ -837,6 +841,7 @@ void EDPSimple::removeRemoteEndpoints(ParticipantProxyData* pdata)
RemoteWriterAttributes watt;
watt.guid.guidPrefix = pdata->m_guid.guidPrefix;
watt.guid.entityId = c_EntityId_SEDPPubWriter;
watt.endpoint.persistence_guid = watt.guid;
watt.endpoint.unicastLocatorList = pdata->m_metatrafficUnicastLocatorList;
watt.endpoint.multicastLocatorList = pdata->m_metatrafficMulticastLocatorList;
watt.endpoint.reliabilityKind = RELIABLE;
Expand Down Expand Up @@ -869,6 +874,7 @@ void EDPSimple::removeRemoteEndpoints(ParticipantProxyData* pdata)
RemoteWriterAttributes watt;
watt.guid.guidPrefix = pdata->m_guid.guidPrefix;
watt.guid.entityId = c_EntityId_SEDPSubWriter;
watt.endpoint.persistence_guid = watt.guid;
watt.endpoint.unicastLocatorList = pdata->m_metatrafficUnicastLocatorList;
watt.endpoint.multicastLocatorList = pdata->m_metatrafficMulticastLocatorList;
watt.endpoint.reliabilityKind = RELIABLE;
Expand Down Expand Up @@ -903,6 +909,7 @@ void EDPSimple::removeRemoteEndpoints(ParticipantProxyData* pdata)
RemoteWriterAttributes watt;
watt.guid.guidPrefix = pdata->m_guid.guidPrefix;
watt.guid.entityId = sedp_builtin_publications_secure_writer;
watt.endpoint.persistence_guid = watt.guid;
watt.endpoint.unicastLocatorList = pdata->m_metatrafficUnicastLocatorList;
watt.endpoint.multicastLocatorList = pdata->m_metatrafficMulticastLocatorList;
watt.endpoint.reliabilityKind = RELIABLE;
Expand Down Expand Up @@ -944,6 +951,7 @@ void EDPSimple::removeRemoteEndpoints(ParticipantProxyData* pdata)
RemoteWriterAttributes watt;
watt.guid.guidPrefix = pdata->m_guid.guidPrefix;
watt.guid.entityId = sedp_builtin_subscriptions_secure_writer;
watt.endpoint.persistence_guid = watt.guid;
watt.endpoint.unicastLocatorList = pdata->m_metatrafficUnicastLocatorList;
watt.endpoint.multicastLocatorList = pdata->m_metatrafficMulticastLocatorList;
watt.endpoint.reliabilityKind = RELIABLE;
Expand Down
2 changes: 2 additions & 0 deletions src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,7 @@ void PDPSimple::assignRemoteEndpoints(ParticipantProxyData* pdata)
RemoteWriterAttributes watt(pdata->m_VendorId);
watt.guid.guidPrefix = pdata->m_guid.guidPrefix;
watt.guid.entityId = c_EntityId_SPDPWriter;
watt.endpoint.persistence_guid = watt.guid;
watt.endpoint.unicastLocatorList = pdata->m_metatrafficUnicastLocatorList;
watt.endpoint.multicastLocatorList = pdata->m_metatrafficMulticastLocatorList;
watt.endpoint.reliabilityKind = BEST_EFFORT;
Expand Down Expand Up @@ -657,6 +658,7 @@ void PDPSimple::removeRemoteEndpoints(ParticipantProxyData* pdata)
RemoteWriterAttributes watt;
watt.guid.guidPrefix = pdata->m_guid.guidPrefix;
watt.guid.entityId = c_EntityId_SPDPWriter;
watt.endpoint.persistence_guid = watt.guid;
watt.endpoint.unicastLocatorList = pdata->m_metatrafficUnicastLocatorList;
watt.endpoint.multicastLocatorList = pdata->m_metatrafficMulticastLocatorList;
watt.endpoint.reliabilityKind = BEST_EFFORT;
Expand Down
2 changes: 2 additions & 0 deletions src/cpp/rtps/builtin/liveliness/WLP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ bool WLP::assignRemoteEndpoints(const ParticipantProxyData& pdata)
RemoteWriterAttributes watt(pdata.m_VendorId);
watt.guid.guidPrefix = pdata.m_guid.guidPrefix;
watt.guid.entityId = c_EntityId_WriterLiveliness;
watt.endpoint.persistence_guid = watt.guid;
watt.endpoint.unicastLocatorList = pdata.m_metatrafficUnicastLocatorList;
watt.endpoint.multicastLocatorList = pdata.m_metatrafficMulticastLocatorList;
watt.endpoint.topicKind = WITH_KEY;
Expand Down Expand Up @@ -211,6 +212,7 @@ void WLP::removeRemoteEndpoints(ParticipantProxyData* pdata)
RemoteWriterAttributes watt;
watt.guid.guidPrefix = pdata->m_guid.guidPrefix;
watt.guid.entityId = c_EntityId_WriterLiveliness;
watt.endpoint.persistence_guid = watt.guid;
watt.endpoint.unicastLocatorList = pdata->m_metatrafficUnicastLocatorList;
watt.endpoint.multicastLocatorList = pdata->m_metatrafficMulticastLocatorList;
watt.endpoint.topicKind = WITH_KEY;
Expand Down
38 changes: 22 additions & 16 deletions src/cpp/rtps/reader/RTPSReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,38 +125,44 @@ void RTPSReader::remove_persistence_guid(const RemoteWriterAttributes& wdata)

SequenceNumber_t RTPSReader::update_last_notified(const GUID_t& guid, const SequenceNumber_t& seq)
{
SequenceNumber_t ret_val(0, 0);
SequenceNumber_t ret_val;
std::lock_guard<std::recursive_mutex> guard(*mp_mutex);
GUID_t guid_to_look = guid;
auto p_guid = persistence_guid_map_.find(guid);
if (p_guid != persistence_guid_map_.end())
{
auto p_seq = history_record_.find(p_guid->second);
if (p_seq != history_record_.end())
{
ret_val = p_seq->second;
}
guid_to_look = p_guid->second;
}

if (ret_val < seq)
{
set_last_notified(p_guid->second, seq);
}
auto p_seq = history_record_.find(guid_to_look);
if (p_seq != history_record_.end())
{
ret_val = p_seq->second;
}

if (ret_val < seq)
{
set_last_notified(guid_to_look, seq);
}

return ret_val;
}

SequenceNumber_t RTPSReader::get_last_notified(const GUID_t& guid) const
{
SequenceNumber_t ret_val(0, 0);
SequenceNumber_t ret_val;
std::lock_guard<std::recursive_mutex> guard(*mp_mutex);
GUID_t guid_to_look = guid;
auto p_guid = persistence_guid_map_.find(guid);
if (p_guid != persistence_guid_map_.end())
{
auto p_seq = history_record_.find(p_guid->second);
if (p_seq != history_record_.end())
{
ret_val = p_seq->second;
}
guid_to_look = p_guid->second;
}

auto p_seq = history_record_.find(guid_to_look);
if (p_seq != history_record_.end())
{
ret_val = p_seq->second;
}

return ret_val;
Expand Down
6 changes: 3 additions & 3 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ bool StatefulReader::matched_writer_add(const RemoteWriterAttributes& wdata)
}
}
WriterProxy* wp = new WriterProxy(wdata, this);

wp->mp_initialAcknack->restart_timer();

matched_writers.push_back(wp);
add_persistence_guid(wdata);
wp->loaded_from_storage(get_last_notified(wdata.guid));
wp->loaded_from_storage_nts(get_last_notified(wdata.guid));
matched_writers.push_back(wp);
logInfo(RTPS_READER,"Writer Proxy " <<wp->m_att.guid <<" added to " <<m_guid.entityId);
return true;
}
Expand Down
3 changes: 2 additions & 1 deletion src/cpp/rtps/reader/WriterProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,10 @@ WriterProxy::WriterProxy(const RemoteWriterAttributes& watt,
logInfo(RTPS_READER,"Writer Proxy created in reader: "<<mp_SFR->getGuid().entityId);
}

void WriterProxy::loaded_from_storage(const SequenceNumber_t& seqNum)
void WriterProxy::loaded_from_storage_nts(const SequenceNumber_t& seqNum)
{
lastNotified_ = seqNum;
changesFromWLowMark_ = seqNum;
}

void WriterProxy::missing_changes_update(const SequenceNumber_t& seqNum)
Expand Down
4 changes: 4 additions & 0 deletions src/cpp/rtps/security/SecurityManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1628,6 +1628,7 @@ void SecurityManager::match_builtin_endpoints(const ParticipantProxyData& partic
RemoteWriterAttributes watt(participant_data.m_VendorId);
watt.guid.guidPrefix = participant_data.m_guid.guidPrefix;
watt.guid.entityId = participant_stateless_message_writer_entity_id;
watt.endpoint.persistence_guid = watt.guid;
watt.endpoint.unicastLocatorList = participant_data.m_metatrafficUnicastLocatorList;
watt.endpoint.reliabilityKind = BEST_EFFORT;
participant_stateless_message_reader_->matched_writer_add(watt);
Expand All @@ -1651,6 +1652,7 @@ void SecurityManager::match_builtin_endpoints(const ParticipantProxyData& partic
RemoteWriterAttributes watt(participant_data.m_VendorId);
watt.guid.guidPrefix = participant_data.m_guid.guidPrefix;
watt.guid.entityId = participant_volatile_message_secure_writer_entity_id;
watt.endpoint.persistence_guid = watt.guid;
watt.endpoint.unicastLocatorList = participant_data.m_metatrafficUnicastLocatorList;
watt.endpoint.reliabilityKind = RELIABLE;
watt.endpoint.durabilityKind = VOLATILE;
Expand Down Expand Up @@ -1681,6 +1683,7 @@ void SecurityManager::unmatch_builtin_endpoints(const ParticipantProxyData& part
RemoteWriterAttributes watt;
watt.guid.guidPrefix = participant_data.m_guid.guidPrefix;
watt.guid.entityId = participant_stateless_message_writer_entity_id;
watt.endpoint.persistence_guid = watt.guid;
watt.endpoint.unicastLocatorList = participant_data.m_metatrafficUnicastLocatorList;
watt.endpoint.reliabilityKind = BEST_EFFORT;
participant_stateless_message_reader_->matched_writer_remove(watt);
Expand All @@ -1704,6 +1707,7 @@ void SecurityManager::unmatch_builtin_endpoints(const ParticipantProxyData& part
RemoteWriterAttributes watt;
watt.guid.guidPrefix = participant_data.m_guid.guidPrefix;
watt.guid.entityId = participant_volatile_message_secure_writer_entity_id;
watt.endpoint.persistence_guid = watt.guid;
watt.endpoint.unicastLocatorList = participant_data.m_metatrafficUnicastLocatorList;
watt.endpoint.reliabilityKind = RELIABLE;
watt.endpoint.durabilityKind = VOLATILE;
Expand Down

0 comments on commit 8ebd4d9

Please sign in to comment.