Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/rolling' into ahcorde/rolling/wi…
Browse files Browse the repository at this point in the history
…ndows_support
  • Loading branch information
ahcorde committed Sep 4, 2024
2 parents 31b712e + 60b72f0 commit 263f4eb
Show file tree
Hide file tree
Showing 8 changed files with 363 additions and 220 deletions.
48 changes: 33 additions & 15 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,45 @@ on:
push:
branches: [ rolling ]
workflow_dispatch:
schedule:
# Run every morning to detect flakiness and broken dependencies
- cron: '03 5 * * *'
defaults:
run:
shell: bash
jobs:
test:
runs-on: ubuntu-latest
build_and_test:
name: build_and_test_${{ matrix.BUILD_TYPE }}_${{ matrix.ROS_DISTRO }}
strategy:
fail-fast: false
matrix:
distro: ['iron', 'jazzy', 'rolling']
include:
# Rolling (source)
- ROS_DISTRO: rolling
BUILD_TYPE: source
# Jazzy (binary)
- ROS_DISTRO: jazzy
BUILD_TYPE: binary
# Iron (binary)
- ROS_DISTRO: iron
BUILD_TYPE: binary
env:
ROS2_REPOS_FILE_URL: 'https://raw.githubusercontent.com/ros2/ros2/${{ matrix.ROS_DISTRO }}/ros2.repos'
runs-on: ubuntu-latest
container:
image: ros:${{ matrix.distro }}-ros-base
timeout-minutes: 30
image: ${{ matrix.BUILD_TYPE == 'binary' && format('ros:{0}-ros-base', matrix.ROS_DISTRO) || 'ubuntu:noble' }}
steps:
- name: Deps
run: |
apt update && apt install -y curl
- uses: actions/checkout@v4
- name: rosdep
run: |
rosdep update
rosdep install --from-paths . -yir
- name: build
run: /ros_entrypoint.sh colcon build
- uses: ros-tooling/[email protected]
if: ${{ matrix.BUILD_TYPE == 'source' }}
- name: Install Coverage Tools
if: ${{ matrix.BUILD_TYPE == 'binary' }}
run: sudo apt update && sudo apt install -y python3-colcon-coveragepy-result python3-colcon-lcov-result lcov
- name: Build and run tests
id: action-ros-ci
uses: ros-tooling/[email protected]
with:
package-name: |
rmw_zenoh_cpp
zenoh_c_vendor
target-ros2-distro: ${{ matrix.ROS_DISTRO }}
vcs-repo-file-url: ${{ matrix.BUILD_TYPE == 'source' && env.ROS2_REPOS_FILE_URL || '' }}
5 changes: 3 additions & 2 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,11 @@ install(
)

add_executable(rmw_zenohd
src/zenohd/main.cpp
src/detail/zenoh_config.cpp
src/detail/liveliness_utils.cpp
src/detail/logging.cpp
src/detail/qos.cpp
src/detail/zenoh_config.cpp
src/zenohd/main.cpp
)

target_link_libraries(rmw_zenohd
Expand Down
32 changes: 30 additions & 2 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ void GraphCache::parse_put(
if (ignore_from_current_session && is_entity_local(*entity)) {
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"Ignoring parse_put for %s from the same session.\n", entity->keyexpr().c_str());
"Ignoring parse_put for %s from the same session.\n", entity->liveliness_keyexpr().c_str());
return;
}

Expand Down Expand Up @@ -403,6 +403,20 @@ void GraphCache::parse_put(
// Otherwise, the entity represents a node that already exists in the graph.
// Update topic info if required below.
update_topic_maps_for_put(node_it->second, entity);

// If the newly added entity is a publisher with transient_local qos durability,
// we trigger any registered querying subscriber callbacks.
if (entity->type() == liveliness::EntityType::Publisher &&
entity->topic_info().has_value() &&
entity->topic_info()->qos_.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL)
{
auto sub_cbs_it = querying_subs_cbs_.find(entity->topic_info()->topic_keyexpr_);
if (sub_cbs_it != querying_subs_cbs_.end()) {
for (const auto & cb : sub_cbs_it->second) {
cb(entity->zid());
}
}
}
}

///=============================================================================
Expand Down Expand Up @@ -559,7 +573,7 @@ void GraphCache::parse_del(
if (ignore_from_current_session && is_entity_local(*entity)) {
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"Ignoring parse_del for %s from the same session.\n", entity->keyexpr().c_str());
"Ignoring parse_del for %s from the same session.\n", entity->liveliness_keyexpr().c_str());
return;
}
// Lock the graph mutex before accessing the graph.
Expand Down Expand Up @@ -1315,4 +1329,18 @@ std::unique_ptr<rmw_zenoh_event_status_t> GraphCache::take_event_status(
status_to_take.current_count_change = 0;
return result;
}

///=============================================================================
void GraphCache::set_querying_subscriber_callback(
const std::string & keyexpr,
QueryingSubscriberCallback cb)
{
auto cb_it = querying_subs_cbs_.find(keyexpr);
if (cb_it == querying_subs_cbs_.end()) {
querying_subs_cbs_[keyexpr] = std::move(std::vector<QueryingSubscriberCallback>{});
cb_it = querying_subs_cbs_.find(keyexpr);
}
cb_it->second.push_back(std::move(cb));
}

} // namespace rmw_zenoh_cpp
16 changes: 12 additions & 4 deletions rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ using GraphNodePtr = std::shared_ptr<GraphNode>;
class GraphCache final
{
public:
/// @brief Signature for a function that will be invoked by the GraphCache when a QoS
/// event is detected.
using GraphCacheEventCallback = std::function<void (std::unique_ptr<rmw_zenoh_event_status_t>)>;
/// Callback to be triggered when a publication cache is detected in the ROS Graph.
using QueryingSubscriberCallback = std::function<void (const std::string & queryable_prefix)>;

/// @brief Constructor
/// @param id The id of the zenoh session that is building the graph cache.
/// This is used to infer which entities originated from the current session
Expand Down Expand Up @@ -169,10 +175,6 @@ class GraphCache final
const char * service_type,
bool * is_available) const;

/// @brief Signature for a function that will be invoked by the GraphCache when a QoS
/// event is detected.
using GraphCacheEventCallback = std::function<void (std::unique_ptr<rmw_zenoh_event_status_t>)>;

/// Set a qos event callback for an entity from the current session.
/// @note The callback will be removed when the entity is removed from the graph.
void set_qos_event_callback(
Expand All @@ -183,6 +185,10 @@ class GraphCache final
/// Returns true if the entity is a publisher or client. False otherwise.
static bool is_entity_pub(const liveliness::Entity & entity);

void set_querying_subscriber_callback(
const std::string & keyexpr,
QueryingSubscriberCallback cb);

private:
// Helper function to convert an Entity into a GraphNode.
// Note: this will update bookkeeping variables in GraphCache.
Expand Down Expand Up @@ -281,6 +287,8 @@ class GraphCache final
using GraphEventCallbackMap = std::unordered_map<liveliness::ConstEntityPtr, GraphEventCallbacks>;
// EventCallbackMap for each type of event we support in rmw_zenoh_cpp.
GraphEventCallbackMap event_callbacks_;
// Map keyexpressions to QueryingSubscriberCallback.
std::unordered_map<std::string, std::vector<QueryingSubscriberCallback>> querying_subs_cbs_;
// Counters to track changes to event statues for each topic.
std::unordered_map<std::string,
std::array<rmw_zenoh_event_status_t, ZENOH_EVENT_ID_MAX + 1>> event_statuses_;
Expand Down
Loading

0 comments on commit 263f4eb

Please sign in to comment.