From e4ddb48c4509146df071053b259a3f24b7360b6f Mon Sep 17 00:00:00 2001 From: Christophe Bedard Date: Tue, 3 Dec 2024 15:28:43 -0800 Subject: [PATCH 01/12] Add support for replaying based on publication timestamp Signed-off-by: Christophe Bedard --- ros2bag/ros2bag/verb/play.py | 15 ++++ rosbag2_py/rosbag2_py/__init__.py | 2 + rosbag2_py/rosbag2_py/__init__.pyi | 4 +- rosbag2_py/rosbag2_py/_transport.pyi | 17 +++++ rosbag2_py/src/rosbag2_py/_transport.cpp | 6 ++ .../rosbag2_transport/play_options.hpp | 14 ++++ .../src/rosbag2_transport/player.cpp | 62 +++++++++++++-- .../rosbag2_transport_test_fixture.hpp | 29 ++++++- .../test/rosbag2_transport/test_play.cpp | 76 +++++++++++++------ .../rosbag2_transport/test_play_until.cpp | 42 ++++++++++ 10 files changed, 234 insertions(+), 33 deletions(-) diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index 2c8bf723a1..3df54bdb77 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -26,6 +26,7 @@ from ros2bag.api import print_warn from ros2bag.verb import VerbExtension from ros2cli.node import NODE_NAME_PREFIX +from rosbag2_py import MessageOrder from rosbag2_py import Player from rosbag2_py import PlayOptions from rosbag2_py import ServiceRequestsSource @@ -166,6 +167,15 @@ def add_arguments(self, parser, cli_name): # noqa: D102 help='Determine the source of the service requests to be replayed. This option only ' 'makes sense if the "--publish-service-requests" option is set. By default,' ' the service requests replaying from recorded service introspection message.') + parser.add_argument( + '--message-order', default='recv', + choices=['recv', 'send'], + help='The reference to use for bag message chronological ordering. Choices: reception ' + 'timestamp, publication timestamp. Default: reception timestamp. ' + 'If messages are significantly disordered (within a single bag or across ' + 'multiple bags), replayed messages may not be correctly ordered. A possible ' + 'solution could be to increase the read_ahead_queue_size value to buffer (and ' + 'order) more messages.') parser.add_argument( '--log-level', type=str, default='info', choices=['debug', 'info', 'warn', 'error', 'fatal'], @@ -275,6 +285,11 @@ def main(self, *, args): # noqa: D102 play_options.service_requests_source = ServiceRequestsSource.SERVICE_INTROSPECTION else: play_options.service_requests_source = ServiceRequestsSource.CLIENT_INTROSPECTION + # argparse makes sure that we get a valid arg value + play_options.message_order = { + 'recv': MessageOrder.RECV_TIMESTAMP, + 'send': MessageOrder.SEND_TIMESTAMP, + }.get(args.message_order) player = Player(args.log_level) try: diff --git a/rosbag2_py/rosbag2_py/__init__.py b/rosbag2_py/rosbag2_py/__init__.py index f682509976..46354dbc7a 100644 --- a/rosbag2_py/rosbag2_py/__init__.py +++ b/rosbag2_py/rosbag2_py/__init__.py @@ -59,6 +59,7 @@ Info, ) from rosbag2_py._transport import ( + MessageOrder, Player, PlayOptions, ServiceRequestsSource, @@ -98,6 +99,7 @@ 'TopicInformation', 'BagMetadata', 'MessageDefinition', + 'MessageOrder', 'MetadataIo', 'Info', 'Player', diff --git a/rosbag2_py/rosbag2_py/__init__.pyi b/rosbag2_py/rosbag2_py/__init__.pyi index c38eaa3383..7fdbfe63d7 100644 --- a/rosbag2_py/rosbag2_py/__init__.pyi +++ b/rosbag2_py/rosbag2_py/__init__.pyi @@ -4,7 +4,7 @@ from rosbag2_py._message_definitions import LocalMessageDefinitionSource as Loca from rosbag2_py._reader import SequentialCompressionReader as SequentialCompressionReader, SequentialReader as SequentialReader, get_registered_readers as get_registered_readers from rosbag2_py._reindexer import Reindexer as Reindexer from rosbag2_py._storage import BagMetadata as BagMetadata, ConverterOptions as ConverterOptions, FileInformation as FileInformation, MessageDefinition as MessageDefinition, MetadataIo as MetadataIo, ReadOrder as ReadOrder, ReadOrderSortBy as ReadOrderSortBy, StorageFilter as StorageFilter, StorageOptions as StorageOptions, TopicInformation as TopicInformation, TopicMetadata as TopicMetadata, convert_rclcpp_qos_to_rclpy_qos as convert_rclcpp_qos_to_rclpy_qos, get_default_storage_id as get_default_storage_id, to_rclcpp_qos_vector as to_rclcpp_qos_vector -from rosbag2_py._transport import PlayOptions as PlayOptions, Player as Player, RecordOptions as RecordOptions, Recorder as Recorder, ServiceRequestsSource as ServiceRequestsSource, bag_rewrite as bag_rewrite +from rosbag2_py._transport import MessageOrder as MessageOrder, PlayOptions as PlayOptions, Player as Player, RecordOptions as RecordOptions, Recorder as Recorder, ServiceRequestsSource as ServiceRequestsSource, bag_rewrite as bag_rewrite from rosbag2_py._writer import SequentialCompressionWriter as SequentialCompressionWriter, SequentialWriter as SequentialWriter, get_registered_compressors as get_registered_compressors, get_registered_serializers as get_registered_serializers, get_registered_writers as get_registered_writers -__all__ = ['bag_rewrite', 'convert_rclcpp_qos_to_rclpy_qos', 'CompressionMode', 'CompressionOptions', 'compression_mode_from_string', 'compression_mode_to_string', 'ConverterOptions', 'FileInformation', 'get_default_storage_id', 'get_registered_readers', 'get_registered_writers', 'get_registered_compressors', 'get_registered_serializers', 'to_rclcpp_qos_vector', 'ReadOrder', 'ReadOrderSortBy', 'Reindexer', 'SequentialCompressionReader', 'SequentialCompressionWriter', 'SequentialReader', 'SequentialWriter', 'StorageFilter', 'StorageOptions', 'TopicMetadata', 'TopicInformation', 'BagMetadata', 'MessageDefinition', 'MetadataIo', 'Info', 'Player', 'PlayOptions', 'ServiceRequestsSource', 'Recorder', 'RecordOptions', 'LocalMessageDefinitionSource'] +__all__ = ['bag_rewrite', 'convert_rclcpp_qos_to_rclpy_qos', 'CompressionMode', 'CompressionOptions', 'compression_mode_from_string', 'compression_mode_to_string', 'ConverterOptions', 'FileInformation', 'get_default_storage_id', 'get_registered_readers', 'get_registered_writers', 'get_registered_compressors', 'get_registered_serializers', 'to_rclcpp_qos_vector', 'ReadOrder', 'ReadOrderSortBy', 'Reindexer', 'SequentialCompressionReader', 'SequentialCompressionWriter', 'SequentialReader', 'SequentialWriter', 'StorageFilter', 'StorageOptions', 'TopicMetadata', 'TopicInformation', 'BagMetadata', 'MessageDefinition', 'MessageOrder', 'MetadataIo', 'Info', 'Player', 'PlayOptions', 'ServiceRequestsSource', 'Recorder', 'RecordOptions', 'LocalMessageDefinitionSource'] diff --git a/rosbag2_py/rosbag2_py/_transport.pyi b/rosbag2_py/rosbag2_py/_transport.pyi index c2512f483b..94dc502a6e 100644 --- a/rosbag2_py/rosbag2_py/_transport.pyi +++ b/rosbag2_py/rosbag2_py/_transport.pyi @@ -3,6 +3,22 @@ import rosbag2_py._storage from _typeshed import Incomplete from typing import ClassVar, List, overload +class MessageOrder: + __members__: ClassVar[dict] = ... # read-only + RECV_TIMESTAMP: ClassVar[MessageOrder] = ... + SEND_TIMESTAMP: ClassVar[MessageOrder] = ... + __entries: ClassVar[dict] = ... + def __init__(self, value: int) -> None: ... + def __eq__(self, other: object) -> bool: ... + def __hash__(self) -> int: ... + def __index__(self) -> int: ... + def __int__(self) -> int: ... + def __ne__(self, other: object) -> bool: ... + @property + def name(self) -> str: ... + @property + def value(self) -> int: ... + class PlayOptions: clock_publish_frequency: float clock_publish_on_topic_publish: bool @@ -14,6 +30,7 @@ class PlayOptions: exclude_service_events_to_filter: List[str] exclude_topics_to_filter: List[str] loop: bool + message_order: Incomplete node_prefix: str playback_duration: float playback_until_timestamp: int diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index afe813b2f5..09023f8727 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -560,6 +560,7 @@ PYBIND11_MODULE(_transport, m) { .def_readwrite("disable_loan_message", &PlayOptions::disable_loan_message) .def_readwrite("publish_service_requests", &PlayOptions::publish_service_requests) .def_readwrite("service_requests_source", &PlayOptions::service_requests_source) + .def_readwrite("message_order", &PlayOptions::message_order) ; py::enum_(m, "ServiceRequestsSource") @@ -567,6 +568,11 @@ PYBIND11_MODULE(_transport, m) { .value("CLIENT_INTROSPECTION", rosbag2_transport::ServiceRequestsSource::CLIENT_INTROSPECTION) ; + py::enum_(m, "MessageOrder") + .value("RECV_TIMESTAMP", rosbag2_transport::MessageOrder::RECV_TIMESTAMP) + .value("SEND_TIMESTAMP", rosbag2_transport::MessageOrder::SEND_TIMESTAMP) + ; + py::class_(m, "RecordOptions") .def(py::init<>()) .def_readwrite("all_topics", &RecordOptions::all_topics) diff --git a/rosbag2_transport/include/rosbag2_transport/play_options.hpp b/rosbag2_transport/include/rosbag2_transport/play_options.hpp index e5029c923a..7489c9825c 100644 --- a/rosbag2_transport/include/rosbag2_transport/play_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/play_options.hpp @@ -34,6 +34,14 @@ enum class ServiceRequestsSource : int8_t CLIENT_INTROSPECTION = 1 }; +enum class MessageOrder : std::uint8_t +{ + // Order chronologically by message reception timestamp + RECV_TIMESTAMP = 0, + // Order chronologically by message publication timestamp + SEND_TIMESTAMP = 1 +}; + struct PlayOptions { public: @@ -123,6 +131,12 @@ struct PlayOptions // The source of the service request ServiceRequestsSource service_requests_source = ServiceRequestsSource::SERVICE_INTROSPECTION; + + // The reference to use for bag message chronological ordering. + // If messages are significantly disordered (within a single bag or across multiple bags), + // replayed messages may not be correctly ordered. A possible solution could be to increase the + // read_ahead_queue_size value to buffer (and order) more messages. + MessageOrder message_order = MessageOrder::RECV_TIMESTAMP; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index d3b322ee5f..8f0d0e0bb5 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -303,6 +303,8 @@ class PlayerImpl void create_control_services(); void configure_play_until_timestamp(); bool shall_stop_at_timestamp(const rcutils_time_point_value_t & msg_timestamp) const; + rcutils_time_point_value_t get_message_order_timestamp( + const rosbag2_storage::SerializedBagMessageSharedPtr & message) const; static constexpr double read_ahead_lower_bound_percentage_ = 0.9; static const std::chrono::milliseconds queue_read_wait_period_; @@ -320,7 +322,7 @@ class PlayerImpl rosbag2_transport::PlayOptions play_options_; rcutils_time_point_value_t play_until_timestamp_ = -1; using BagMessageComparator = std::function< - int( + bool( const rosbag2_storage::SerializedBagMessageSharedPtr &, const rosbag2_storage::SerializedBagMessageSharedPtr &)>; LockedPriorityQueue< @@ -372,6 +374,8 @@ class PlayerImpl std::shared_ptr player_service_client_manager_; + static BagMessageComparator get_bag_message_comparator(const MessageOrder & order); + /// Comparator for SerializedBagMessageSharedPtr to order chronologically by recv_timestamp. struct { @@ -381,9 +385,39 @@ class PlayerImpl { return l->recv_timestamp > r->recv_timestamp; } - } bag_message_chronological_recv_timestamp_comparator; + } static bag_message_chronological_recv_timestamp_comparator; + + /// Comparator for SerializedBagMessageSharedPtr to order chronologically by send_timestamp. + struct + { + bool operator()( + const rosbag2_storage::SerializedBagMessageSharedPtr & l, + const rosbag2_storage::SerializedBagMessageSharedPtr & r) const + { + return l->send_timestamp > r->send_timestamp; + } + } static bag_message_chronological_send_timestamp_comparator; }; +decltype(PlayerImpl::bag_message_chronological_recv_timestamp_comparator) +PlayerImpl::bag_message_chronological_recv_timestamp_comparator; +decltype(PlayerImpl::bag_message_chronological_send_timestamp_comparator) +PlayerImpl::bag_message_chronological_send_timestamp_comparator; + +PlayerImpl::BagMessageComparator PlayerImpl::get_bag_message_comparator(const MessageOrder & order) +{ + switch (order) { + case MessageOrder::RECV_TIMESTAMP: + return bag_message_chronological_recv_timestamp_comparator; + case MessageOrder::SEND_TIMESTAMP: + return bag_message_chronological_send_timestamp_comparator; + default: + throw std::runtime_error( + "unknown MessageOrder: " + + std::to_string(static_cast>(order))); + } +} + PlayerImpl::PlayerImpl( Player * owner, std::vector && readers_with_options, @@ -392,7 +426,7 @@ PlayerImpl::PlayerImpl( : readers_with_options_(std::move(readers_with_options)), owner_(owner), play_options_(play_options), - message_queue_(bag_message_chronological_recv_timestamp_comparator), + message_queue_(get_bag_message_comparator(play_options_.message_order)), keyboard_handler_(std::move(keyboard_handler)), player_service_client_manager_(std::make_shared()) { @@ -984,13 +1018,13 @@ void PlayerImpl::play_messages_from_queue() while (rclcpp::ok() && !stop_playback_) { // While there's a message to play and we haven't reached the end timestamp yet while (rclcpp::ok() && !stop_playback_ && - message_ptr != nullptr && !shall_stop_at_timestamp(message_ptr->recv_timestamp)) + message_ptr != nullptr && !shall_stop_at_timestamp(get_message_order_timestamp(message_ptr))) { // Sleep until the message's replay time, do not move on until sleep_until returns true // It will always sleep, so this is not a tight busy loop on pause // However, skip sleeping if we're trying to play the next message while (rclcpp::ok() && !stop_playback_ && !play_next_.load() && - !clock_->sleep_until(message_ptr->recv_timestamp)) + !clock_->sleep_until(get_message_order_timestamp(message_ptr))) { // Stop sleeping if cancelled if (std::atomic_exchange(&cancel_wait_for_next_message_, false)) { @@ -1012,7 +1046,7 @@ void PlayerImpl::play_messages_from_queue() const bool message_published = publish_message(message_ptr); // If we tried to publish because of play_next(), jump the clock if (play_next_.load()) { - clock_->jump(message_ptr->recv_timestamp); + clock_->jump(get_message_order_timestamp(message_ptr)); // If we successfully played next, notify that we're done, otherwise keep trying if (message_published) { play_next_ = false; @@ -1058,6 +1092,22 @@ void PlayerImpl::play_messages_from_queue() } } +rcutils_time_point_value_t PlayerImpl::get_message_order_timestamp( + const rosbag2_storage::SerializedBagMessageSharedPtr & message) const +{ + switch (play_options_.message_order) { + case MessageOrder::RECV_TIMESTAMP: + return message->recv_timestamp; + case MessageOrder::SEND_TIMESTAMP: + return message->send_timestamp; + default: + throw std::runtime_error( + "unknown MessageOrder: " + + std::to_string( + static_cast>(play_options_.message_order))); + } +} + namespace { bool allow_topic( diff --git a/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp b/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp index 594c1a92a3..81699d2452 100644 --- a/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp +++ b/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp @@ -55,17 +55,42 @@ class Rosbag2TransportTestFixture : public Test std::shared_ptr serialize_test_message( const std::string & topic, - int64_t milliseconds, + int64_t milliseconds_recv, + std::shared_ptr message) + { + return serialize_test_message(topic, milliseconds_recv, 0, message); + } + + template + std::shared_ptr + serialize_test_message( + const std::string & topic, + int64_t milliseconds_recv, + int64_t milliseconds_send, std::shared_ptr message) { auto bag_msg = std::make_shared(); bag_msg->serialized_data = memory_management_.serialize_message(message); - bag_msg->recv_timestamp = milliseconds * 1000000; + bag_msg->recv_timestamp = milliseconds_recv * 1000000; + bag_msg->send_timestamp = milliseconds_send * 1000000; bag_msg->topic_name = topic; return bag_msg; } + static std::string format_message_order( + const TestParamInfo & info) + { + switch (info.param) { + case rosbag2_transport::MessageOrder::RECV_TIMESTAMP: + return "recv_timestamp"; + case rosbag2_transport::MessageOrder::SEND_TIMESTAMP: + return "send_timestamp"; + default: + throw std::runtime_error("unknown value"); + } + } + MemoryManagement memory_management_; rosbag2_storage::StorageOptions storage_options_; diff --git a/rosbag2_transport/test/rosbag2_transport/test_play.cpp b/rosbag2_transport/test/rosbag2_transport/test_play.cpp index 64952fe39f..84bdd80738 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play.cpp @@ -109,6 +109,10 @@ void spin_thread_and_wait_for_sent_service_requests_to_finish( } } // namespace +class RosBag2PlayTestFixtureMessageOrder + : public RosBag2PlayTestFixture, public WithParamInterface +{}; + TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics) { auto primitive_message1 = get_messages_basic_types()[0]; @@ -174,7 +178,8 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics) ElementsAre(40.0f, 2.0f, 0.0f))))); } -TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics_from_three_bags) +TEST_P( + RosBag2PlayTestFixtureMessageOrder, recorded_messages_are_played_for_all_topics_from_three_bags) { auto msg = get_messages_basic_types()[0]; msg->int32_value = 42; @@ -184,30 +189,31 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics_from_ {2u, "topic2", "test_msgs/msg/BasicTypes", "", {}, ""}, }; - // Make sure each reader's/bag's messages are ordered by time - // However, do interlace messages across bags + // Make sure each reader's/bag's messages are ordered by recv_timestamp + // However, do interlace messages based on recv_timestamp across bags and based on send_timestamp + // within a bag and across bags std::vector>> messages_list{}; messages_list.emplace_back(std::vector>{ - serialize_test_message("topic1", 1, msg), - serialize_test_message("topic2", 5, msg), - serialize_test_message("topic1", 8, msg), - serialize_test_message("topic2", 10, msg), - serialize_test_message("topic1", 13, msg), - serialize_test_message("topic2", 14, msg)}); + serialize_test_message("topic1", 1, 1, msg), + serialize_test_message("topic2", 5, 2, msg), + serialize_test_message("topic1", 8, 4, msg), + serialize_test_message("topic2", 10, 8, msg), + serialize_test_message("topic1", 13, 7, msg), + serialize_test_message("topic2", 14, 15, msg)}); messages_list.emplace_back(std::vector>{ - serialize_test_message("topic1", 2, msg), - serialize_test_message("topic2", 3, msg), - serialize_test_message("topic1", 6, msg), - serialize_test_message("topic2", 10, msg), - serialize_test_message("topic1", 12, msg), - serialize_test_message("topic2", 16, msg)}); + serialize_test_message("topic1", 2, 1, msg), + serialize_test_message("topic2", 3, 2, msg), + serialize_test_message("topic1", 6, 5, msg), + serialize_test_message("topic2", 10, 8, msg), + serialize_test_message("topic1", 12, 7, msg), + serialize_test_message("topic2", 16, 14, msg)}); messages_list.emplace_back(std::vector>{ - serialize_test_message("topic1", 1, msg), - serialize_test_message("topic2", 4, msg), - serialize_test_message("topic1", 7, msg), - serialize_test_message("topic2", 9, msg), - serialize_test_message("topic1", 11, msg), - serialize_test_message("topic2", 15, msg)}); + serialize_test_message("topic1", 1, 1, msg), + serialize_test_message("topic2", 4, 3, msg), + serialize_test_message("topic1", 7, 2, msg), + serialize_test_message("topic2", 9, 9, msg), + serialize_test_message("topic1", 11, 8, msg), + serialize_test_message("topic2", 15, 7, msg)}); std::vector bags{}; std::size_t total_messages = 0u; for (std::size_t i = 0u; i < messages_list.size(); i++) { @@ -219,13 +225,27 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics_from_ } ASSERT_GT(total_messages, 0u); + const rosbag2_transport::MessageOrder message_order = GetParam(); + play_options_.message_order = message_order; auto player = std::make_shared(std::move(bags), play_options_); std::size_t num_played_messages = 0u; rcutils_time_point_value_t last_timetamp = 0; + const auto get_timestamp = + [message_order](std::shared_ptr msg) { + switch (message_order) { + case rosbag2_transport::MessageOrder::RECV_TIMESTAMP: + return msg->recv_timestamp; + case rosbag2_transport::MessageOrder::SEND_TIMESTAMP: + return msg->send_timestamp; + default: + throw std::runtime_error("unknown rosbag2_transport::MessageOrder value"); + } + }; const auto callback = [&](std::shared_ptr msg) { // Make sure messages are played in order - EXPECT_LE(last_timetamp, msg->recv_timestamp); - last_timetamp = msg->recv_timestamp; + const auto timestamp = get_timestamp(msg); + EXPECT_LE(last_timetamp, timestamp); + last_timetamp = timestamp; num_played_messages++; }; player->add_on_play_message_pre_callback(callback); @@ -234,6 +254,16 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics_from_ EXPECT_EQ(total_messages, num_played_messages); } +INSTANTIATE_TEST_SUITE_P( + ParametrizedPlayTests, + RosBag2PlayTestFixtureMessageOrder, + Values( + rosbag2_transport::MessageOrder::RECV_TIMESTAMP, + rosbag2_transport::MessageOrder::SEND_TIMESTAMP + ), + Rosbag2TransportTestFixture::format_message_order +); + TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_services) { const std::string service_name1 = "/test_service1"; diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp index 8e12da5ebf..5ab903b6db 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp @@ -154,6 +154,48 @@ TEST_F(RosBag2PlayUntilTestFixture, play_until_less_than_the_total_duration) EXPECT_EQ(replayed_topic1[1]->int32_value, 1); } +TEST_F(RosBag2PlayUntilTestFixture, play_until_less_than_the_total_duration_message_order) +{ + auto primitive_message1 = get_messages_basic_types()[0]; + auto primitive_message2 = get_messages_basic_types()[0]; + primitive_message1->int32_value = 1; + primitive_message2->int32_value = 2; + + auto topic_types = std::vector{ + {1u, kTopic1Name_, "test_msgs/BasicTypes", "", {}, ""}}; + + std::vector> messages = + { + serialize_test_message(kTopic1Name_, 10, 5, primitive_message1), + serialize_test_message(kTopic1Name_, 50, 45, primitive_message2), + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + // Expect to receive 2 messages from play() due to the send_timestamp order + sub_->add_subscription(kTopic1_, 2u); + play_options_.playback_until_timestamp = RCL_MS_TO_NS(50) - 1; + play_options_.message_order = MessageOrder::SEND_TIMESTAMP; + + std::shared_ptr player_ = std::make_shared( + std::move(reader), storage_options_, play_options_); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE(sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), 5s)); + + auto await_received_messages = sub_->spin_subscriptions(); + ASSERT_TRUE(player_->play()); + player_->wait_for_playback_to_finish(); + + await_received_messages.get(); + auto replayed_topic1 = sub_->get_received_messages(kTopic1_); + EXPECT_THAT(replayed_topic1, SizeIs(2)); + EXPECT_EQ(replayed_topic1[0]->int32_value, 1); + EXPECT_EQ(replayed_topic1[1]->int32_value, 2); +} + TEST_F( RosBag2PlayUntilTestFixture, play_until_intermediate_duration_recorded_messages_with_filtered_topics) From 80a03242c55ed4a628aef16032331bdab1267b87 Mon Sep 17 00:00:00 2001 From: Christophe Bedard Date: Thu, 5 Dec 2024 13:40:14 -0500 Subject: [PATCH 02/12] Address minor style feedback Signed-off-by: Christophe Bedard --- rosbag2_transport/test/rosbag2_transport/test_play.cpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/rosbag2_transport/test/rosbag2_transport/test_play.cpp b/rosbag2_transport/test/rosbag2_transport/test_play.cpp index 84bdd80738..c90b8a037f 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play.cpp @@ -110,8 +110,7 @@ void spin_thread_and_wait_for_sent_service_requests_to_finish( } // namespace class RosBag2PlayTestFixtureMessageOrder - : public RosBag2PlayTestFixture, public WithParamInterface -{}; + : public RosBag2PlayTestFixture, public WithParamInterface {}; TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics) { @@ -178,8 +177,7 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics) ElementsAre(40.0f, 2.0f, 0.0f))))); } -TEST_P( - RosBag2PlayTestFixtureMessageOrder, recorded_messages_are_played_for_all_topics_from_three_bags) +TEST_P(RosBag2PlayTestFixtureMessageOrder, recorded_msgs_are_played_for_all_topics_from_three_bags) { auto msg = get_messages_basic_types()[0]; msg->int32_value = 42; From 93ba439ec8a93559187e15a76aee46750c71319b Mon Sep 17 00:00:00 2001 From: Christophe Bedard Date: Thu, 5 Dec 2024 15:24:02 -0500 Subject: [PATCH 03/12] Rename recv to received Signed-off-by: Christophe Bedard --- ros2bag/ros2bag/verb/play.py | 6 +++--- rosbag2_py/rosbag2_py/_transport.pyi | 2 +- rosbag2_py/src/rosbag2_py/_transport.cpp | 2 +- .../include/rosbag2_transport/play_options.hpp | 4 ++-- rosbag2_transport/src/rosbag2_transport/player.cpp | 4 ++-- .../rosbag2_transport/rosbag2_transport_test_fixture.hpp | 2 +- rosbag2_transport/test/rosbag2_transport/test_play.cpp | 4 ++-- 7 files changed, 12 insertions(+), 12 deletions(-) diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index 3df54bdb77..e4b7a847f1 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -168,8 +168,8 @@ def add_arguments(self, parser, cli_name): # noqa: D102 'makes sense if the "--publish-service-requests" option is set. By default,' ' the service requests replaying from recorded service introspection message.') parser.add_argument( - '--message-order', default='recv', - choices=['recv', 'send'], + '--message-order', default='received', + choices=['received', 'send'], help='The reference to use for bag message chronological ordering. Choices: reception ' 'timestamp, publication timestamp. Default: reception timestamp. ' 'If messages are significantly disordered (within a single bag or across ' @@ -287,7 +287,7 @@ def main(self, *, args): # noqa: D102 play_options.service_requests_source = ServiceRequestsSource.CLIENT_INTROSPECTION # argparse makes sure that we get a valid arg value play_options.message_order = { - 'recv': MessageOrder.RECV_TIMESTAMP, + 'received': MessageOrder.RECEIVED_TIMESTAMP, 'send': MessageOrder.SEND_TIMESTAMP, }.get(args.message_order) diff --git a/rosbag2_py/rosbag2_py/_transport.pyi b/rosbag2_py/rosbag2_py/_transport.pyi index 94dc502a6e..a5c92ac2ab 100644 --- a/rosbag2_py/rosbag2_py/_transport.pyi +++ b/rosbag2_py/rosbag2_py/_transport.pyi @@ -5,7 +5,7 @@ from typing import ClassVar, List, overload class MessageOrder: __members__: ClassVar[dict] = ... # read-only - RECV_TIMESTAMP: ClassVar[MessageOrder] = ... + RECEIVED_TIMESTAMP: ClassVar[MessageOrder] = ... SEND_TIMESTAMP: ClassVar[MessageOrder] = ... __entries: ClassVar[dict] = ... def __init__(self, value: int) -> None: ... diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 09023f8727..6bd95c68f1 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -569,7 +569,7 @@ PYBIND11_MODULE(_transport, m) { ; py::enum_(m, "MessageOrder") - .value("RECV_TIMESTAMP", rosbag2_transport::MessageOrder::RECV_TIMESTAMP) + .value("RECEIVED_TIMESTAMP", rosbag2_transport::MessageOrder::RECEIVED_TIMESTAMP) .value("SEND_TIMESTAMP", rosbag2_transport::MessageOrder::SEND_TIMESTAMP) ; diff --git a/rosbag2_transport/include/rosbag2_transport/play_options.hpp b/rosbag2_transport/include/rosbag2_transport/play_options.hpp index 7489c9825c..fd80d50ccb 100644 --- a/rosbag2_transport/include/rosbag2_transport/play_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/play_options.hpp @@ -37,7 +37,7 @@ enum class ServiceRequestsSource : int8_t enum class MessageOrder : std::uint8_t { // Order chronologically by message reception timestamp - RECV_TIMESTAMP = 0, + RECEIVED_TIMESTAMP = 0, // Order chronologically by message publication timestamp SEND_TIMESTAMP = 1 }; @@ -136,7 +136,7 @@ struct PlayOptions // If messages are significantly disordered (within a single bag or across multiple bags), // replayed messages may not be correctly ordered. A possible solution could be to increase the // read_ahead_queue_size value to buffer (and order) more messages. - MessageOrder message_order = MessageOrder::RECV_TIMESTAMP; + MessageOrder message_order = MessageOrder::RECEIVED_TIMESTAMP; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 8f0d0e0bb5..fc9c00ce72 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -407,7 +407,7 @@ PlayerImpl::bag_message_chronological_send_timestamp_comparator; PlayerImpl::BagMessageComparator PlayerImpl::get_bag_message_comparator(const MessageOrder & order) { switch (order) { - case MessageOrder::RECV_TIMESTAMP: + case MessageOrder::RECEIVED_TIMESTAMP: return bag_message_chronological_recv_timestamp_comparator; case MessageOrder::SEND_TIMESTAMP: return bag_message_chronological_send_timestamp_comparator; @@ -1096,7 +1096,7 @@ rcutils_time_point_value_t PlayerImpl::get_message_order_timestamp( const rosbag2_storage::SerializedBagMessageSharedPtr & message) const { switch (play_options_.message_order) { - case MessageOrder::RECV_TIMESTAMP: + case MessageOrder::RECEIVED_TIMESTAMP: return message->recv_timestamp; case MessageOrder::SEND_TIMESTAMP: return message->send_timestamp; diff --git a/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp b/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp index 81699d2452..f275a74b9a 100644 --- a/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp +++ b/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp @@ -82,7 +82,7 @@ class Rosbag2TransportTestFixture : public Test const TestParamInfo & info) { switch (info.param) { - case rosbag2_transport::MessageOrder::RECV_TIMESTAMP: + case rosbag2_transport::MessageOrder::RECEIVED_TIMESTAMP: return "recv_timestamp"; case rosbag2_transport::MessageOrder::SEND_TIMESTAMP: return "send_timestamp"; diff --git a/rosbag2_transport/test/rosbag2_transport/test_play.cpp b/rosbag2_transport/test/rosbag2_transport/test_play.cpp index c90b8a037f..6662774197 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play.cpp @@ -231,7 +231,7 @@ TEST_P(RosBag2PlayTestFixtureMessageOrder, recorded_msgs_are_played_for_all_topi const auto get_timestamp = [message_order](std::shared_ptr msg) { switch (message_order) { - case rosbag2_transport::MessageOrder::RECV_TIMESTAMP: + case rosbag2_transport::MessageOrder::RECEIVED_TIMESTAMP: return msg->recv_timestamp; case rosbag2_transport::MessageOrder::SEND_TIMESTAMP: return msg->send_timestamp; @@ -256,7 +256,7 @@ INSTANTIATE_TEST_SUITE_P( ParametrizedPlayTests, RosBag2PlayTestFixtureMessageOrder, Values( - rosbag2_transport::MessageOrder::RECV_TIMESTAMP, + rosbag2_transport::MessageOrder::RECEIVED_TIMESTAMP, rosbag2_transport::MessageOrder::SEND_TIMESTAMP ), Rosbag2TransportTestFixture::format_message_order From c6f02bfe22f98c283adaa4b044a8658ef7d6b8a3 Mon Sep 17 00:00:00 2001 From: Christophe Bedard Date: Thu, 5 Dec 2024 18:51:37 -0500 Subject: [PATCH 04/12] Update get_play_options_from_node_params() and test Signed-off-by: Christophe Bedard --- .../config_options_from_node_params.cpp | 15 +++++++++++++++ .../test/resources/player_node_params.yaml | 1 + .../rosbag2_transport/test_composable_player.cpp | 1 + 3 files changed, 17 insertions(+) diff --git a/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp b/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp index ef6408db7c..783bbe9bd8 100644 --- a/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp +++ b/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp @@ -237,6 +237,21 @@ PlayOptions get_play_options_from_node_params(rclcpp::Node & node) play_options.publish_service_requests = node.declare_parameter("play.publish_service_request", false); + auto message_order = + node.declare_parameter("play.message_order", "RECEIVED_TIMESTAMP"); + if (message_order == "RECEIVED_TIMESTAMP") { + play_options.message_order = MessageOrder::RECEIVED_TIMESTAMP; + } else if (message_order == "SEND_TIMESTAMP") { + play_options.message_order = MessageOrder::SEND_TIMESTAMP; + } else { + play_options.message_order = MessageOrder::RECEIVED_TIMESTAMP; + RCLCPP_ERROR( + node.get_logger(), + "play.message_order doesn't support %s. It must be one of RECEIVED_TIMESTAMP" + " and SEND_TIMESTAMP. Changed it to default value RECEIVED_TIMESTAMP.", + message_order.c_str()); + } + return play_options; } diff --git a/rosbag2_transport/test/resources/player_node_params.yaml b/rosbag2_transport/test/resources/player_node_params.yaml index 79e1b47a93..cc432a7ba7 100644 --- a/rosbag2_transport/test/resources/player_node_params.yaml +++ b/rosbag2_transport/test/resources/player_node_params.yaml @@ -39,6 +39,7 @@ player_params_node: disable_loan_message: false publish_service_requests: false service_requests_source: "CLIENT_INTROSPECTION" # SERVICE_INTROSPECTION or CLIENT_INTROSPECTION + message_order: "SEND_TIMESTAMP" # RECEIVED_TIMESTAMP or SEND_TIMESTAMP storage: uri: "path/to/some_bag" diff --git a/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp b/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp index eadd6fbd0a..3aca12e568 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp @@ -177,6 +177,7 @@ TEST_P(ComposablePlayerTests, player_can_parse_parameters_from_file) { EXPECT_EQ( play_options.service_requests_source, rosbag2_transport::ServiceRequestsSource::CLIENT_INTROSPECTION); + EXPECT_EQ(play_options.message_order, rosbag2_transport::MessageOrder::SEND_TIMESTAMP); ASSERT_EQ(1, storage_options.size()); EXPECT_EQ(storage_options[0].uri, uri_str); From a1b2b070474cc5c75c957c27fd026808280135f8 Mon Sep 17 00:00:00 2001 From: Christophe Bedard Date: Thu, 5 Dec 2024 20:21:29 -0500 Subject: [PATCH 05/12] Try deleting static anon structs definition Signed-off-by: Christophe Bedard --- rosbag2_transport/src/rosbag2_transport/player.cpp | 5 ----- 1 file changed, 5 deletions(-) diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index fc9c00ce72..61ac3943a6 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -399,11 +399,6 @@ class PlayerImpl } static bag_message_chronological_send_timestamp_comparator; }; -decltype(PlayerImpl::bag_message_chronological_recv_timestamp_comparator) -PlayerImpl::bag_message_chronological_recv_timestamp_comparator; -decltype(PlayerImpl::bag_message_chronological_send_timestamp_comparator) -PlayerImpl::bag_message_chronological_send_timestamp_comparator; - PlayerImpl::BagMessageComparator PlayerImpl::get_bag_message_comparator(const MessageOrder & order) { switch (order) { From de9cf6b7e53351eaf8d4a5a142a4c5d2757b1309 Mon Sep 17 00:00:00 2001 From: Christophe Bedard Date: Fri, 6 Dec 2024 10:52:53 -0500 Subject: [PATCH 06/12] Revert "Try deleting static anon structs definition" This reverts commit a1b2b070474cc5c75c957c27fd026808280135f8. Signed-off-by: Christophe Bedard --- rosbag2_transport/src/rosbag2_transport/player.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 61ac3943a6..fc9c00ce72 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -399,6 +399,11 @@ class PlayerImpl } static bag_message_chronological_send_timestamp_comparator; }; +decltype(PlayerImpl::bag_message_chronological_recv_timestamp_comparator) +PlayerImpl::bag_message_chronological_recv_timestamp_comparator; +decltype(PlayerImpl::bag_message_chronological_send_timestamp_comparator) +PlayerImpl::bag_message_chronological_send_timestamp_comparator; + PlayerImpl::BagMessageComparator PlayerImpl::get_bag_message_comparator(const MessageOrder & order) { switch (order) { From 36b99c2416819060eb5b7b5c37440f36bb325509 Mon Sep 17 00:00:00 2001 From: Christophe Bedard Date: Fri, 6 Dec 2024 10:57:38 -0500 Subject: [PATCH 07/12] Change to non-static anon comparator structs Signed-off-by: Christophe Bedard --- rosbag2_transport/src/rosbag2_transport/player.cpp | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index fc9c00ce72..d976f1b2d7 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -374,7 +374,7 @@ class PlayerImpl std::shared_ptr player_service_client_manager_; - static BagMessageComparator get_bag_message_comparator(const MessageOrder & order); + BagMessageComparator get_bag_message_comparator(const MessageOrder & order); /// Comparator for SerializedBagMessageSharedPtr to order chronologically by recv_timestamp. struct @@ -385,7 +385,7 @@ class PlayerImpl { return l->recv_timestamp > r->recv_timestamp; } - } static bag_message_chronological_recv_timestamp_comparator; + } bag_message_chronological_recv_timestamp_comparator; /// Comparator for SerializedBagMessageSharedPtr to order chronologically by send_timestamp. struct @@ -396,14 +396,9 @@ class PlayerImpl { return l->send_timestamp > r->send_timestamp; } - } static bag_message_chronological_send_timestamp_comparator; + } bag_message_chronological_send_timestamp_comparator; }; -decltype(PlayerImpl::bag_message_chronological_recv_timestamp_comparator) -PlayerImpl::bag_message_chronological_recv_timestamp_comparator; -decltype(PlayerImpl::bag_message_chronological_send_timestamp_comparator) -PlayerImpl::bag_message_chronological_send_timestamp_comparator; - PlayerImpl::BagMessageComparator PlayerImpl::get_bag_message_comparator(const MessageOrder & order) { switch (order) { From 9f23136562d51e20ff1ad4f5b62df4e43c7e9f16 Mon Sep 17 00:00:00 2001 From: Christophe Bedard Date: Fri, 6 Dec 2024 11:01:04 -0500 Subject: [PATCH 08/12] Address minor review feedback Signed-off-by: Christophe Bedard --- rosbag2_transport/test/rosbag2_transport/test_play_until.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp index 5ab903b6db..75459ca302 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp @@ -162,7 +162,7 @@ TEST_F(RosBag2PlayUntilTestFixture, play_until_less_than_the_total_duration_mess primitive_message2->int32_value = 2; auto topic_types = std::vector{ - {1u, kTopic1Name_, "test_msgs/BasicTypes", "", {}, ""}}; + {1u, kTopic1Name_, "test_msgs/msg/BasicTypes", "", {}, ""}}; std::vector> messages = { From d78f6f15fddad21d12b695591586c22fcf54e50c Mon Sep 17 00:00:00 2001 From: Christophe Bedard Date: Fri, 6 Dec 2024 11:07:29 -0500 Subject: [PATCH 09/12] Change timestamp order in test Signed-off-by: Christophe Bedard --- .../test/rosbag2_transport/test_play_until.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp index 75459ca302..0162ea8c0a 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp @@ -166,8 +166,8 @@ TEST_F(RosBag2PlayUntilTestFixture, play_until_less_than_the_total_duration_mess std::vector> messages = { - serialize_test_message(kTopic1Name_, 10, 5, primitive_message1), - serialize_test_message(kTopic1Name_, 50, 45, primitive_message2), + serialize_test_message(kTopic1Name_, 46, 45, primitive_message1), + serialize_test_message(kTopic1Name_, 50, 5, primitive_message2), }; auto prepared_mock_reader = std::make_unique(); @@ -192,8 +192,8 @@ TEST_F(RosBag2PlayUntilTestFixture, play_until_less_than_the_total_duration_mess await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages(kTopic1_); EXPECT_THAT(replayed_topic1, SizeIs(2)); - EXPECT_EQ(replayed_topic1[0]->int32_value, 1); - EXPECT_EQ(replayed_topic1[1]->int32_value, 2); + EXPECT_EQ(replayed_topic1[0]->int32_value, 2); + EXPECT_EQ(replayed_topic1[1]->int32_value, 1); } TEST_F( From 57970c0e0d4ee6d6ba4450363ed0607c86005ed2 Mon Sep 17 00:00:00 2001 From: Christophe Bedard Date: Fri, 6 Dec 2024 11:26:17 -0500 Subject: [PATCH 10/12] Change send -> sent in new (user-facing) code Signed-off-by: Christophe Bedard --- ros2bag/ros2bag/verb/play.py | 4 ++-- rosbag2_py/rosbag2_py/_transport.pyi | 2 +- rosbag2_py/src/rosbag2_py/_transport.cpp | 2 +- .../include/rosbag2_transport/play_options.hpp | 2 +- .../rosbag2_transport/config_options_from_node_params.cpp | 6 +++--- rosbag2_transport/src/rosbag2_transport/player.cpp | 4 ++-- rosbag2_transport/test/resources/player_node_params.yaml | 2 +- .../rosbag2_transport/rosbag2_transport_test_fixture.hpp | 4 ++-- .../test/rosbag2_transport/test_composable_player.cpp | 2 +- rosbag2_transport/test/rosbag2_transport/test_play.cpp | 4 ++-- .../test/rosbag2_transport/test_play_until.cpp | 2 +- 11 files changed, 17 insertions(+), 17 deletions(-) diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index e4b7a847f1..4b4039be4e 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -169,7 +169,7 @@ def add_arguments(self, parser, cli_name): # noqa: D102 ' the service requests replaying from recorded service introspection message.') parser.add_argument( '--message-order', default='received', - choices=['received', 'send'], + choices=['received', 'sent'], help='The reference to use for bag message chronological ordering. Choices: reception ' 'timestamp, publication timestamp. Default: reception timestamp. ' 'If messages are significantly disordered (within a single bag or across ' @@ -288,7 +288,7 @@ def main(self, *, args): # noqa: D102 # argparse makes sure that we get a valid arg value play_options.message_order = { 'received': MessageOrder.RECEIVED_TIMESTAMP, - 'send': MessageOrder.SEND_TIMESTAMP, + 'sent': MessageOrder.SENT_TIMESTAMP, }.get(args.message_order) player = Player(args.log_level) diff --git a/rosbag2_py/rosbag2_py/_transport.pyi b/rosbag2_py/rosbag2_py/_transport.pyi index a5c92ac2ab..c9fc3a23fc 100644 --- a/rosbag2_py/rosbag2_py/_transport.pyi +++ b/rosbag2_py/rosbag2_py/_transport.pyi @@ -6,7 +6,7 @@ from typing import ClassVar, List, overload class MessageOrder: __members__: ClassVar[dict] = ... # read-only RECEIVED_TIMESTAMP: ClassVar[MessageOrder] = ... - SEND_TIMESTAMP: ClassVar[MessageOrder] = ... + SENT_TIMESTAMP: ClassVar[MessageOrder] = ... __entries: ClassVar[dict] = ... def __init__(self, value: int) -> None: ... def __eq__(self, other: object) -> bool: ... diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 6bd95c68f1..efb7fc536d 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -570,7 +570,7 @@ PYBIND11_MODULE(_transport, m) { py::enum_(m, "MessageOrder") .value("RECEIVED_TIMESTAMP", rosbag2_transport::MessageOrder::RECEIVED_TIMESTAMP) - .value("SEND_TIMESTAMP", rosbag2_transport::MessageOrder::SEND_TIMESTAMP) + .value("SENT_TIMESTAMP", rosbag2_transport::MessageOrder::SENT_TIMESTAMP) ; py::class_(m, "RecordOptions") diff --git a/rosbag2_transport/include/rosbag2_transport/play_options.hpp b/rosbag2_transport/include/rosbag2_transport/play_options.hpp index fd80d50ccb..3fe90b1413 100644 --- a/rosbag2_transport/include/rosbag2_transport/play_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/play_options.hpp @@ -39,7 +39,7 @@ enum class MessageOrder : std::uint8_t // Order chronologically by message reception timestamp RECEIVED_TIMESTAMP = 0, // Order chronologically by message publication timestamp - SEND_TIMESTAMP = 1 + SENT_TIMESTAMP = 1 }; struct PlayOptions diff --git a/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp b/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp index 783bbe9bd8..3a29a418e2 100644 --- a/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp +++ b/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp @@ -241,14 +241,14 @@ PlayOptions get_play_options_from_node_params(rclcpp::Node & node) node.declare_parameter("play.message_order", "RECEIVED_TIMESTAMP"); if (message_order == "RECEIVED_TIMESTAMP") { play_options.message_order = MessageOrder::RECEIVED_TIMESTAMP; - } else if (message_order == "SEND_TIMESTAMP") { - play_options.message_order = MessageOrder::SEND_TIMESTAMP; + } else if (message_order == "SENT_TIMESTAMP") { + play_options.message_order = MessageOrder::SENT_TIMESTAMP; } else { play_options.message_order = MessageOrder::RECEIVED_TIMESTAMP; RCLCPP_ERROR( node.get_logger(), "play.message_order doesn't support %s. It must be one of RECEIVED_TIMESTAMP" - " and SEND_TIMESTAMP. Changed it to default value RECEIVED_TIMESTAMP.", + " and SENT_TIMESTAMP. Changed it to default value RECEIVED_TIMESTAMP.", message_order.c_str()); } diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index d976f1b2d7..56e51454ed 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -404,7 +404,7 @@ PlayerImpl::BagMessageComparator PlayerImpl::get_bag_message_comparator(const Me switch (order) { case MessageOrder::RECEIVED_TIMESTAMP: return bag_message_chronological_recv_timestamp_comparator; - case MessageOrder::SEND_TIMESTAMP: + case MessageOrder::SENT_TIMESTAMP: return bag_message_chronological_send_timestamp_comparator; default: throw std::runtime_error( @@ -1093,7 +1093,7 @@ rcutils_time_point_value_t PlayerImpl::get_message_order_timestamp( switch (play_options_.message_order) { case MessageOrder::RECEIVED_TIMESTAMP: return message->recv_timestamp; - case MessageOrder::SEND_TIMESTAMP: + case MessageOrder::SENT_TIMESTAMP: return message->send_timestamp; default: throw std::runtime_error( diff --git a/rosbag2_transport/test/resources/player_node_params.yaml b/rosbag2_transport/test/resources/player_node_params.yaml index cc432a7ba7..f471d6cb91 100644 --- a/rosbag2_transport/test/resources/player_node_params.yaml +++ b/rosbag2_transport/test/resources/player_node_params.yaml @@ -39,7 +39,7 @@ player_params_node: disable_loan_message: false publish_service_requests: false service_requests_source: "CLIENT_INTROSPECTION" # SERVICE_INTROSPECTION or CLIENT_INTROSPECTION - message_order: "SEND_TIMESTAMP" # RECEIVED_TIMESTAMP or SEND_TIMESTAMP + message_order: "SENT_TIMESTAMP" # RECEIVED_TIMESTAMP or SENT_TIMESTAMP storage: uri: "path/to/some_bag" diff --git a/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp b/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp index f275a74b9a..bec884665b 100644 --- a/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp +++ b/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp @@ -84,8 +84,8 @@ class Rosbag2TransportTestFixture : public Test switch (info.param) { case rosbag2_transport::MessageOrder::RECEIVED_TIMESTAMP: return "recv_timestamp"; - case rosbag2_transport::MessageOrder::SEND_TIMESTAMP: - return "send_timestamp"; + case rosbag2_transport::MessageOrder::SENT_TIMESTAMP: + return "sent_timestamp"; default: throw std::runtime_error("unknown value"); } diff --git a/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp b/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp index 3aca12e568..c9d7fd710c 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp @@ -177,7 +177,7 @@ TEST_P(ComposablePlayerTests, player_can_parse_parameters_from_file) { EXPECT_EQ( play_options.service_requests_source, rosbag2_transport::ServiceRequestsSource::CLIENT_INTROSPECTION); - EXPECT_EQ(play_options.message_order, rosbag2_transport::MessageOrder::SEND_TIMESTAMP); + EXPECT_EQ(play_options.message_order, rosbag2_transport::MessageOrder::SENT_TIMESTAMP); ASSERT_EQ(1, storage_options.size()); EXPECT_EQ(storage_options[0].uri, uri_str); diff --git a/rosbag2_transport/test/rosbag2_transport/test_play.cpp b/rosbag2_transport/test/rosbag2_transport/test_play.cpp index 6662774197..401ad9b77f 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play.cpp @@ -233,7 +233,7 @@ TEST_P(RosBag2PlayTestFixtureMessageOrder, recorded_msgs_are_played_for_all_topi switch (message_order) { case rosbag2_transport::MessageOrder::RECEIVED_TIMESTAMP: return msg->recv_timestamp; - case rosbag2_transport::MessageOrder::SEND_TIMESTAMP: + case rosbag2_transport::MessageOrder::SENT_TIMESTAMP: return msg->send_timestamp; default: throw std::runtime_error("unknown rosbag2_transport::MessageOrder value"); @@ -257,7 +257,7 @@ INSTANTIATE_TEST_SUITE_P( RosBag2PlayTestFixtureMessageOrder, Values( rosbag2_transport::MessageOrder::RECEIVED_TIMESTAMP, - rosbag2_transport::MessageOrder::SEND_TIMESTAMP + rosbag2_transport::MessageOrder::SENT_TIMESTAMP ), Rosbag2TransportTestFixture::format_message_order ); diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp index 0162ea8c0a..af64009cb3 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp @@ -177,7 +177,7 @@ TEST_F(RosBag2PlayUntilTestFixture, play_until_less_than_the_total_duration_mess // Expect to receive 2 messages from play() due to the send_timestamp order sub_->add_subscription(kTopic1_, 2u); play_options_.playback_until_timestamp = RCL_MS_TO_NS(50) - 1; - play_options_.message_order = MessageOrder::SEND_TIMESTAMP; + play_options_.message_order = MessageOrder::SENT_TIMESTAMP; std::shared_ptr player_ = std::make_shared( std::move(reader), storage_options_, play_options_); From 72207add6203f39fe2fd3c27b80e19cf890e3b8f Mon Sep 17 00:00:00 2001 From: Christophe Bedard Date: Fri, 6 Dec 2024 16:39:49 -0500 Subject: [PATCH 11/12] Change recv_timestamp -> received_timestamp Signed-off-by: Christophe Bedard --- .../test/rosbag2_transport/rosbag2_transport_test_fixture.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp b/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp index bec884665b..fa27e58f47 100644 --- a/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp +++ b/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp @@ -83,7 +83,7 @@ class Rosbag2TransportTestFixture : public Test { switch (info.param) { case rosbag2_transport::MessageOrder::RECEIVED_TIMESTAMP: - return "recv_timestamp"; + return "received_timestamp"; case rosbag2_transport::MessageOrder::SENT_TIMESTAMP: return "sent_timestamp"; default: From 71474caafa648af1975c39744a8ce9b7053801d4 Mon Sep 17 00:00:00 2001 From: Christophe Bedard Date: Sat, 7 Dec 2024 15:18:36 -0500 Subject: [PATCH 12/12] Document some 'ros2 bag play' options Signed-off-by: Christophe Bedard --- README.md | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 7d733c73cd..16499038f6 100644 --- a/README.md +++ b/README.md @@ -29,11 +29,11 @@ rosbag2 is part of the ROS 2 command line interface as `ros2 bag`. These verbs are available for `ros2 bag`: * `ros2 bag burst` -* `ros2 bag convert` -* `ros2 bag info` +* [`ros2 bag convert`](#converting-bags) +* [`ros2 bag info`](#analyzing-data) * `ros2 bag list` -* `ros2 bag play` -* `ros2 bag record` +* [`ros2 bag play`](#replaying-data) +* [`ros2 bag record`](#recording-data) * `ros2 bag reindex` For up-to-date information on the available options for each, use `ros2 bag --help`. @@ -146,6 +146,27 @@ $ ros2 bag play -i -i Messages from all provided bags will be played in order, based on their original recording reception timestamps. +Options: + +* `--topics`: + Space-delimited list of topics to play. +* `--services`: + Space-delimited list of services to play. +* `-e,--regex`: + Play only topics and services matches with regular expression. +* `-x,--exclude-regex`: + Regular expressions to exclude topics and services from replay. +* `--exclude-topics`: + Space-delimited list of topics not to play. +* `--exclude-services`: + Space-delimited list of services not to play. +* `--message-order {received,sent}`: + The reference to use for bag message chronological ordering. + Choices: reception timestamp (`received`), publication timestamp (`sent`). + Default: reception timestamp. + +For more options, run with `--help`. + #### Controlling playback via services The Rosbag2 player provides the following services for remote control, which can be called via `ros2 service` commandline or from your nodes,