Skip to content

Commit

Permalink
Replace EventsBase inheritance with DataCallbackManager and EventsMan…
Browse files Browse the repository at this point in the history
…ager objects

Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Feb 26, 2024
1 parent a5b5acf commit 29f3a79
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 69 deletions.
24 changes: 12 additions & 12 deletions rmw_zenoh_cpp/src/detail/event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@


///=============================================================================
void EventsBase::set_user_callback(
void DataCallbackManager::set_callback(
const void * user_data, rmw_event_callback_t callback)
{
std::lock_guard<std::recursive_mutex> lock_mutex(event_mutex_);
std::lock_guard<std::mutex> lock_mutex(event_mutex_);

if (callback) {
// Push events arrived before setting the the executor callback.
Expand All @@ -42,10 +42,10 @@ void EventsBase::set_user_callback(
}

///=============================================================================
void EventsBase::trigger_user_callback()
void DataCallbackManager::trigger_callback()
{
// Trigger the user provided event callback if available.
std::lock_guard<std::recursive_mutex> lock_mutex(event_mutex_);
std::lock_guard<std::mutex> lock_mutex(event_mutex_);
if (callback_ != nullptr) {
callback_(user_data_, 1);
} else {
Expand All @@ -54,7 +54,7 @@ void EventsBase::trigger_user_callback()
}

///=============================================================================
void EventsBase::event_set_callback(
void EventsManager::event_set_callback(
rmw_zenoh_event_type_t event_id,
rmw_event_callback_t callback,
const void * user_data)
Expand Down Expand Up @@ -82,7 +82,7 @@ void EventsBase::event_set_callback(
}

///=============================================================================
void EventsBase::trigger_event_callback(rmw_zenoh_event_type_t event_id)
void EventsManager::trigger_event_callback(rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
Expand All @@ -103,7 +103,7 @@ void EventsBase::trigger_event_callback(rmw_zenoh_event_type_t event_id)
}

///=============================================================================
bool EventsBase::event_queue_is_empty(rmw_zenoh_event_type_t event_id) const
bool EventsManager::event_queue_is_empty(rmw_zenoh_event_type_t event_id) const
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
Expand All @@ -119,7 +119,7 @@ bool EventsBase::event_queue_is_empty(rmw_zenoh_event_type_t event_id) const
}

///=============================================================================
std::unique_ptr<rmw_zenoh_event_status_t> EventsBase::pop_next_event(
std::unique_ptr<rmw_zenoh_event_status_t> EventsManager::pop_next_event(
rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
Expand All @@ -145,7 +145,7 @@ std::unique_ptr<rmw_zenoh_event_status_t> EventsBase::pop_next_event(
}

///=============================================================================
void EventsBase::add_new_event(
void EventsManager::add_new_event(
rmw_zenoh_event_type_t event_id,
std::unique_ptr<rmw_zenoh_event_status_t> event)
{
Expand Down Expand Up @@ -180,7 +180,7 @@ void EventsBase::add_new_event(
}

///=============================================================================
void EventsBase::attach_event_condition(
void EventsManager::attach_event_condition(
rmw_zenoh_event_type_t event_id,
std::condition_variable * condition_variable)
{
Expand All @@ -197,7 +197,7 @@ void EventsBase::attach_event_condition(
}

///=============================================================================
void EventsBase::detach_event_condition(rmw_zenoh_event_type_t event_id)
void EventsManager::detach_event_condition(rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
Expand All @@ -212,7 +212,7 @@ void EventsBase::detach_event_condition(rmw_zenoh_event_type_t event_id)
}

///=============================================================================
void EventsBase::notify_event(rmw_zenoh_event_type_t event_id)
void EventsManager::notify_event(rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
Expand Down
25 changes: 20 additions & 5 deletions rmw_zenoh_cpp/src/detail/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,19 +82,34 @@ struct rmw_zenoh_event_status_t
};

///=============================================================================
/// Base class to be inherited by entities that support events.
class EventsBase
/// A class that manages callbacks that should be triggered when a new
/// message/request/response is received by an entity.
class DataCallbackManager
{
public:
/// @brief Set the user defined callback that should be called when
/// a new message/response/request is received.
/// @param user_data the data that should be passed to the callback.
/// @param callback the callback to be set.
void set_user_callback(const void * user_data, rmw_event_callback_t callback);
void set_callback(const void * user_data, rmw_event_callback_t callback);

/// Trigger the user callback.
void trigger_user_callback();
void trigger_callback();

private:
std::mutex event_mutex_;
/// User callback that can be set via set_callback().
rmw_event_callback_t callback_ {nullptr};
/// User data that should be passed to the user callback.
const void * user_data_ {nullptr};
/// number of trigger requests made before the callback was set.
size_t unread_count_ {0};
};

/// Base class to be inherited by entities that support events.
class EventsManager
{
public:
/// @brief Set the callback to be triggered when the relevant event is triggered.
/// @param event_id the id of the event
/// @param callback the callback to trigger for this event.
Expand Down Expand Up @@ -143,7 +158,7 @@ class EventsBase
mutable std::mutex event_condition_mutex_;
/// Condition variable to attach for event notifications.
std::condition_variable * event_conditions_[ZENOH_EVENT_ID_MAX + 1]{nullptr};
/// User callback that can be set via set_user_callback().
/// User callback that can be set via data_callback_mgr.set_callback().
rmw_event_callback_t callback_ {nullptr};
/// User data that should be passed to the user callback.
const void * user_data_ {nullptr};
Expand Down
26 changes: 1 addition & 25 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,31 +154,7 @@ void GraphCache::update_topic_map_for_put(
topic_info.type_);
if (topic_type_map_it == topic_map_it->second.end()) {
// First time this topic type is added.

// Check for and report an *_INCOMPATIBLE_TYPE event if a different type for the same
// topic exists.
// if (report_events) {
// for (const auto & [topic_type, qos_map] : topic_map_it->second) {
// for (const auto & [qos_type, topic_data_ptr] : qos_map) {
// // If the entity is a publisher but the graph contains a sub,
// // report ZENOH_EVENT_PUBLISHER_INCOMPATIBLE_TYPE or
// // ZENOH_EVENT_PUBLISHER_INCOMPATIBLE_TYPE if vice versa.
// if ((is_pub && topic_data_ptr->stats_.sub_count_ > 0) ||
// (!is_pub && topic_data_ptr->stats_.pub_count_ > 0))
// {
// goto incompatible_type_event_found;
// }
// }
// }
// incompatible_type_event_found:
// // TODO(Yadunund): Retrieve total count from global counters.
// auto event_type = is_pub ? ZENOH_EVENT_PUBLISHER_INCOMPATIBLE_TYPE :
// ZENOH_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE;
// auto event = std::make_unique<rmw_zenoh_event_status_t>();
// event->total_count = 1;
// event->total_count_change = 1;
// detected_events[event_type] = std::move(event);
// }
// TODO(Yadunund) Check for and report an *_INCOMPATIBLE_TYPE events.

topic_map_it->second.insert(
std::make_pair(
Expand Down
6 changes: 3 additions & 3 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ void rmw_subscription_data_t::add_new_message(
message_queue_.emplace_back(std::move(msg));

// Since we added new data, trigger user callback and guard condition if they are available
trigger_user_callback();
data_callback_mgr.trigger_callback();
notify();
}

Expand Down Expand Up @@ -176,7 +176,7 @@ void rmw_service_data_t::add_new_query(std::unique_ptr<ZenohQuery> query)
query_queue_.emplace_back(std::move(query));

// Since we added new data, trigger user callback and guard condition if they are available
trigger_user_callback();
data_callback_mgr.trigger_callback();
notify();
}

Expand Down Expand Up @@ -225,7 +225,7 @@ void rmw_client_data_t::add_new_reply(std::unique_ptr<ZenohReply> reply)
reply_queue_.emplace_back(std::move(reply));

// Since we added new data, trigger user callback and guard condition if they are available
trigger_user_callback();
data_callback_mgr.trigger_callback();
notify();
}

Expand Down
17 changes: 13 additions & 4 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ struct rmw_node_data_t
};

///=============================================================================
class rmw_publisher_data_t : public EventsBase
class rmw_publisher_data_t
{
public:
// The Entity generated for the publisher.
Expand All @@ -108,6 +108,8 @@ class rmw_publisher_data_t : public EventsBase

// Context for memory allocation for messages.
rmw_context_t * context;

EventsManager events_mgr;
};

///=============================================================================
Expand Down Expand Up @@ -135,7 +137,7 @@ struct saved_msg_data
};

///=============================================================================
class rmw_subscription_data_t : public EventsBase
class rmw_subscription_data_t
{
public:
// The Entity generated for the subscription.
Expand Down Expand Up @@ -165,6 +167,9 @@ class rmw_subscription_data_t : public EventsBase

void add_new_message(std::unique_ptr<saved_msg_data> msg, const std::string & topic_name);

DataCallbackManager data_callback_mgr;
EventsManager events_mgr;

private:
std::deque<std::unique_ptr<saved_msg_data>> message_queue_;
mutable std::mutex message_queue_mutex_;
Expand Down Expand Up @@ -197,7 +202,7 @@ class ZenohQuery final
};

///=============================================================================
class rmw_service_data_t : public EventsBase
class rmw_service_data_t
{
public:
// The Entity generated for the service.
Expand Down Expand Up @@ -235,6 +240,8 @@ class rmw_service_data_t : public EventsBase

std::unique_ptr<ZenohQuery> take_from_query_map(int64_t sequence_number);

DataCallbackManager data_callback_mgr;

private:
void notify();

Expand Down Expand Up @@ -265,7 +272,7 @@ class ZenohReply final
};

///=============================================================================
class rmw_client_data_t : public EventsBase
class rmw_client_data_t
{
public:
// The Entity generated for the client.
Expand Down Expand Up @@ -303,6 +310,8 @@ class rmw_client_data_t : public EventsBase

std::unique_ptr<ZenohReply> pop_next_reply();

DataCallbackManager data_callback_mgr;

private:
void notify();

Expand Down
19 changes: 6 additions & 13 deletions rmw_zenoh_cpp/src/rmw_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ rmw_publisher_event_init(
}

rmw_event->implementation_identifier = publisher->implementation_identifier;
rmw_event->data = pub_data;
rmw_event->data = &pub_data->events_mgr;
rmw_event->event_type = event_type;

// Register the event with graph cache.
Expand All @@ -67,15 +67,12 @@ rmw_publisher_event_init(
if (pub_data == nullptr) {
return;
}
pub_data->add_new_event(
pub_data->events_mgr.add_new_event(
event_id,
std::move(zenoh_event));
}
);

// printf(
// "[rmw_publisher_event_init] created new rmw_event_type_t %s for %s\n",
// std::to_string(event_type).c_str(), pub_data->entity->keyexpr().c_str());
return RMW_RET_OK;
}

Expand Down Expand Up @@ -109,7 +106,7 @@ rmw_subscription_event_init(
}

rmw_event->implementation_identifier = subscription->implementation_identifier;
rmw_event->data = sub_data;
rmw_event->data = &sub_data->events_mgr;
rmw_event->event_type = event_type;

// Register the event with graph cache if the event is not ZENOH_EVENT_MESSAGE_LOST
Expand All @@ -127,15 +124,12 @@ rmw_subscription_event_init(
if (sub_data == nullptr) {
return;
}
sub_data->add_new_event(
sub_data->events_mgr.add_new_event(
event_id,
std::move(zenoh_event));
}
);

// printf(
// "[rmw_subscription_event_init] created new rmw_event_type_t %s for %s\n",
// std::to_string(event_type).c_str(), sub_data->entity->keyexpr().c_str());
return RMW_RET_OK;
}

Expand All @@ -159,7 +153,7 @@ rmw_event_set_callback(
}

// Both rmw_subscription_data_t and rmw_publisher_data_t inherit EventsBase.
EventsBase * event_data = static_cast<EventsBase *>(rmw_event->data);
EventsManager * event_data = static_cast<EventsManager *>(rmw_event->data);
RMW_CHECK_ARGUMENT_FOR_NULL(event_data, RMW_RET_INVALID_ARGUMENT);
event_data->event_set_callback(
zenoh_event_it->second,
Expand Down Expand Up @@ -196,8 +190,7 @@ rmw_take_event(
return RMW_RET_ERROR;
}

// Both rmw_subscription_data_t and rmw_publisher_data_t inherit EventsBase.
EventsBase * event_data = static_cast<EventsBase *>(event_handle->data);
EventsManager * event_data = static_cast<EventsManager *>(event_handle->data);
RMW_CHECK_ARGUMENT_FOR_NULL(event_data, RMW_RET_INVALID_ARGUMENT);
std::unique_ptr<rmw_zenoh_event_status_t> st = event_data->pop_next_event(
zenoh_event_it->second);
Expand Down
Loading

0 comments on commit 29f3a79

Please sign in to comment.