diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index 2c8bf723a..3df54bdb7 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -26,6 +26,7 @@ from ros2bag.api import print_warn from ros2bag.verb import VerbExtension from ros2cli.node import NODE_NAME_PREFIX +from rosbag2_py import MessageOrder from rosbag2_py import Player from rosbag2_py import PlayOptions from rosbag2_py import ServiceRequestsSource @@ -166,6 +167,15 @@ def add_arguments(self, parser, cli_name): # noqa: D102 help='Determine the source of the service requests to be replayed. This option only ' 'makes sense if the "--publish-service-requests" option is set. By default,' ' the service requests replaying from recorded service introspection message.') + parser.add_argument( + '--message-order', default='recv', + choices=['recv', 'send'], + help='The reference to use for bag message chronological ordering. Choices: reception ' + 'timestamp, publication timestamp. Default: reception timestamp. ' + 'If messages are significantly disordered (within a single bag or across ' + 'multiple bags), replayed messages may not be correctly ordered. A possible ' + 'solution could be to increase the read_ahead_queue_size value to buffer (and ' + 'order) more messages.') parser.add_argument( '--log-level', type=str, default='info', choices=['debug', 'info', 'warn', 'error', 'fatal'], @@ -275,6 +285,11 @@ def main(self, *, args): # noqa: D102 play_options.service_requests_source = ServiceRequestsSource.SERVICE_INTROSPECTION else: play_options.service_requests_source = ServiceRequestsSource.CLIENT_INTROSPECTION + # argparse makes sure that we get a valid arg value + play_options.message_order = { + 'recv': MessageOrder.RECV_TIMESTAMP, + 'send': MessageOrder.SEND_TIMESTAMP, + }.get(args.message_order) player = Player(args.log_level) try: diff --git a/rosbag2_py/rosbag2_py/__init__.py b/rosbag2_py/rosbag2_py/__init__.py index f68250997..46354dbc7 100644 --- a/rosbag2_py/rosbag2_py/__init__.py +++ b/rosbag2_py/rosbag2_py/__init__.py @@ -59,6 +59,7 @@ Info, ) from rosbag2_py._transport import ( + MessageOrder, Player, PlayOptions, ServiceRequestsSource, @@ -98,6 +99,7 @@ 'TopicInformation', 'BagMetadata', 'MessageDefinition', + 'MessageOrder', 'MetadataIo', 'Info', 'Player', diff --git a/rosbag2_py/rosbag2_py/__init__.pyi b/rosbag2_py/rosbag2_py/__init__.pyi index c38eaa338..7fdbfe63d 100644 --- a/rosbag2_py/rosbag2_py/__init__.pyi +++ b/rosbag2_py/rosbag2_py/__init__.pyi @@ -4,7 +4,7 @@ from rosbag2_py._message_definitions import LocalMessageDefinitionSource as Loca from rosbag2_py._reader import SequentialCompressionReader as SequentialCompressionReader, SequentialReader as SequentialReader, get_registered_readers as get_registered_readers from rosbag2_py._reindexer import Reindexer as Reindexer from rosbag2_py._storage import BagMetadata as BagMetadata, ConverterOptions as ConverterOptions, FileInformation as FileInformation, MessageDefinition as MessageDefinition, MetadataIo as MetadataIo, ReadOrder as ReadOrder, ReadOrderSortBy as ReadOrderSortBy, StorageFilter as StorageFilter, StorageOptions as StorageOptions, TopicInformation as TopicInformation, TopicMetadata as TopicMetadata, convert_rclcpp_qos_to_rclpy_qos as convert_rclcpp_qos_to_rclpy_qos, get_default_storage_id as get_default_storage_id, to_rclcpp_qos_vector as to_rclcpp_qos_vector -from rosbag2_py._transport import PlayOptions as PlayOptions, Player as Player, RecordOptions as RecordOptions, Recorder as Recorder, ServiceRequestsSource as ServiceRequestsSource, bag_rewrite as bag_rewrite +from rosbag2_py._transport import MessageOrder as MessageOrder, PlayOptions as PlayOptions, Player as Player, RecordOptions as RecordOptions, Recorder as Recorder, ServiceRequestsSource as ServiceRequestsSource, bag_rewrite as bag_rewrite from rosbag2_py._writer import SequentialCompressionWriter as SequentialCompressionWriter, SequentialWriter as SequentialWriter, get_registered_compressors as get_registered_compressors, get_registered_serializers as get_registered_serializers, get_registered_writers as get_registered_writers -__all__ = ['bag_rewrite', 'convert_rclcpp_qos_to_rclpy_qos', 'CompressionMode', 'CompressionOptions', 'compression_mode_from_string', 'compression_mode_to_string', 'ConverterOptions', 'FileInformation', 'get_default_storage_id', 'get_registered_readers', 'get_registered_writers', 'get_registered_compressors', 'get_registered_serializers', 'to_rclcpp_qos_vector', 'ReadOrder', 'ReadOrderSortBy', 'Reindexer', 'SequentialCompressionReader', 'SequentialCompressionWriter', 'SequentialReader', 'SequentialWriter', 'StorageFilter', 'StorageOptions', 'TopicMetadata', 'TopicInformation', 'BagMetadata', 'MessageDefinition', 'MetadataIo', 'Info', 'Player', 'PlayOptions', 'ServiceRequestsSource', 'Recorder', 'RecordOptions', 'LocalMessageDefinitionSource'] +__all__ = ['bag_rewrite', 'convert_rclcpp_qos_to_rclpy_qos', 'CompressionMode', 'CompressionOptions', 'compression_mode_from_string', 'compression_mode_to_string', 'ConverterOptions', 'FileInformation', 'get_default_storage_id', 'get_registered_readers', 'get_registered_writers', 'get_registered_compressors', 'get_registered_serializers', 'to_rclcpp_qos_vector', 'ReadOrder', 'ReadOrderSortBy', 'Reindexer', 'SequentialCompressionReader', 'SequentialCompressionWriter', 'SequentialReader', 'SequentialWriter', 'StorageFilter', 'StorageOptions', 'TopicMetadata', 'TopicInformation', 'BagMetadata', 'MessageDefinition', 'MessageOrder', 'MetadataIo', 'Info', 'Player', 'PlayOptions', 'ServiceRequestsSource', 'Recorder', 'RecordOptions', 'LocalMessageDefinitionSource'] diff --git a/rosbag2_py/rosbag2_py/_transport.pyi b/rosbag2_py/rosbag2_py/_transport.pyi index c2512f483..94dc502a6 100644 --- a/rosbag2_py/rosbag2_py/_transport.pyi +++ b/rosbag2_py/rosbag2_py/_transport.pyi @@ -3,6 +3,22 @@ import rosbag2_py._storage from _typeshed import Incomplete from typing import ClassVar, List, overload +class MessageOrder: + __members__: ClassVar[dict] = ... # read-only + RECV_TIMESTAMP: ClassVar[MessageOrder] = ... + SEND_TIMESTAMP: ClassVar[MessageOrder] = ... + __entries: ClassVar[dict] = ... + def __init__(self, value: int) -> None: ... + def __eq__(self, other: object) -> bool: ... + def __hash__(self) -> int: ... + def __index__(self) -> int: ... + def __int__(self) -> int: ... + def __ne__(self, other: object) -> bool: ... + @property + def name(self) -> str: ... + @property + def value(self) -> int: ... + class PlayOptions: clock_publish_frequency: float clock_publish_on_topic_publish: bool @@ -14,6 +30,7 @@ class PlayOptions: exclude_service_events_to_filter: List[str] exclude_topics_to_filter: List[str] loop: bool + message_order: Incomplete node_prefix: str playback_duration: float playback_until_timestamp: int diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index afe813b2f..09023f872 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -560,6 +560,7 @@ PYBIND11_MODULE(_transport, m) { .def_readwrite("disable_loan_message", &PlayOptions::disable_loan_message) .def_readwrite("publish_service_requests", &PlayOptions::publish_service_requests) .def_readwrite("service_requests_source", &PlayOptions::service_requests_source) + .def_readwrite("message_order", &PlayOptions::message_order) ; py::enum_(m, "ServiceRequestsSource") @@ -567,6 +568,11 @@ PYBIND11_MODULE(_transport, m) { .value("CLIENT_INTROSPECTION", rosbag2_transport::ServiceRequestsSource::CLIENT_INTROSPECTION) ; + py::enum_(m, "MessageOrder") + .value("RECV_TIMESTAMP", rosbag2_transport::MessageOrder::RECV_TIMESTAMP) + .value("SEND_TIMESTAMP", rosbag2_transport::MessageOrder::SEND_TIMESTAMP) + ; + py::class_(m, "RecordOptions") .def(py::init<>()) .def_readwrite("all_topics", &RecordOptions::all_topics) diff --git a/rosbag2_transport/include/rosbag2_transport/play_options.hpp b/rosbag2_transport/include/rosbag2_transport/play_options.hpp index e5029c923..7489c9825 100644 --- a/rosbag2_transport/include/rosbag2_transport/play_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/play_options.hpp @@ -34,6 +34,14 @@ enum class ServiceRequestsSource : int8_t CLIENT_INTROSPECTION = 1 }; +enum class MessageOrder : std::uint8_t +{ + // Order chronologically by message reception timestamp + RECV_TIMESTAMP = 0, + // Order chronologically by message publication timestamp + SEND_TIMESTAMP = 1 +}; + struct PlayOptions { public: @@ -123,6 +131,12 @@ struct PlayOptions // The source of the service request ServiceRequestsSource service_requests_source = ServiceRequestsSource::SERVICE_INTROSPECTION; + + // The reference to use for bag message chronological ordering. + // If messages are significantly disordered (within a single bag or across multiple bags), + // replayed messages may not be correctly ordered. A possible solution could be to increase the + // read_ahead_queue_size value to buffer (and order) more messages. + MessageOrder message_order = MessageOrder::RECV_TIMESTAMP; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index d3b322ee5..8f0d0e0bb 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -303,6 +303,8 @@ class PlayerImpl void create_control_services(); void configure_play_until_timestamp(); bool shall_stop_at_timestamp(const rcutils_time_point_value_t & msg_timestamp) const; + rcutils_time_point_value_t get_message_order_timestamp( + const rosbag2_storage::SerializedBagMessageSharedPtr & message) const; static constexpr double read_ahead_lower_bound_percentage_ = 0.9; static const std::chrono::milliseconds queue_read_wait_period_; @@ -320,7 +322,7 @@ class PlayerImpl rosbag2_transport::PlayOptions play_options_; rcutils_time_point_value_t play_until_timestamp_ = -1; using BagMessageComparator = std::function< - int( + bool( const rosbag2_storage::SerializedBagMessageSharedPtr &, const rosbag2_storage::SerializedBagMessageSharedPtr &)>; LockedPriorityQueue< @@ -372,6 +374,8 @@ class PlayerImpl std::shared_ptr player_service_client_manager_; + static BagMessageComparator get_bag_message_comparator(const MessageOrder & order); + /// Comparator for SerializedBagMessageSharedPtr to order chronologically by recv_timestamp. struct { @@ -381,9 +385,39 @@ class PlayerImpl { return l->recv_timestamp > r->recv_timestamp; } - } bag_message_chronological_recv_timestamp_comparator; + } static bag_message_chronological_recv_timestamp_comparator; + + /// Comparator for SerializedBagMessageSharedPtr to order chronologically by send_timestamp. + struct + { + bool operator()( + const rosbag2_storage::SerializedBagMessageSharedPtr & l, + const rosbag2_storage::SerializedBagMessageSharedPtr & r) const + { + return l->send_timestamp > r->send_timestamp; + } + } static bag_message_chronological_send_timestamp_comparator; }; +decltype(PlayerImpl::bag_message_chronological_recv_timestamp_comparator) +PlayerImpl::bag_message_chronological_recv_timestamp_comparator; +decltype(PlayerImpl::bag_message_chronological_send_timestamp_comparator) +PlayerImpl::bag_message_chronological_send_timestamp_comparator; + +PlayerImpl::BagMessageComparator PlayerImpl::get_bag_message_comparator(const MessageOrder & order) +{ + switch (order) { + case MessageOrder::RECV_TIMESTAMP: + return bag_message_chronological_recv_timestamp_comparator; + case MessageOrder::SEND_TIMESTAMP: + return bag_message_chronological_send_timestamp_comparator; + default: + throw std::runtime_error( + "unknown MessageOrder: " + + std::to_string(static_cast>(order))); + } +} + PlayerImpl::PlayerImpl( Player * owner, std::vector && readers_with_options, @@ -392,7 +426,7 @@ PlayerImpl::PlayerImpl( : readers_with_options_(std::move(readers_with_options)), owner_(owner), play_options_(play_options), - message_queue_(bag_message_chronological_recv_timestamp_comparator), + message_queue_(get_bag_message_comparator(play_options_.message_order)), keyboard_handler_(std::move(keyboard_handler)), player_service_client_manager_(std::make_shared()) { @@ -984,13 +1018,13 @@ void PlayerImpl::play_messages_from_queue() while (rclcpp::ok() && !stop_playback_) { // While there's a message to play and we haven't reached the end timestamp yet while (rclcpp::ok() && !stop_playback_ && - message_ptr != nullptr && !shall_stop_at_timestamp(message_ptr->recv_timestamp)) + message_ptr != nullptr && !shall_stop_at_timestamp(get_message_order_timestamp(message_ptr))) { // Sleep until the message's replay time, do not move on until sleep_until returns true // It will always sleep, so this is not a tight busy loop on pause // However, skip sleeping if we're trying to play the next message while (rclcpp::ok() && !stop_playback_ && !play_next_.load() && - !clock_->sleep_until(message_ptr->recv_timestamp)) + !clock_->sleep_until(get_message_order_timestamp(message_ptr))) { // Stop sleeping if cancelled if (std::atomic_exchange(&cancel_wait_for_next_message_, false)) { @@ -1012,7 +1046,7 @@ void PlayerImpl::play_messages_from_queue() const bool message_published = publish_message(message_ptr); // If we tried to publish because of play_next(), jump the clock if (play_next_.load()) { - clock_->jump(message_ptr->recv_timestamp); + clock_->jump(get_message_order_timestamp(message_ptr)); // If we successfully played next, notify that we're done, otherwise keep trying if (message_published) { play_next_ = false; @@ -1058,6 +1092,22 @@ void PlayerImpl::play_messages_from_queue() } } +rcutils_time_point_value_t PlayerImpl::get_message_order_timestamp( + const rosbag2_storage::SerializedBagMessageSharedPtr & message) const +{ + switch (play_options_.message_order) { + case MessageOrder::RECV_TIMESTAMP: + return message->recv_timestamp; + case MessageOrder::SEND_TIMESTAMP: + return message->send_timestamp; + default: + throw std::runtime_error( + "unknown MessageOrder: " + + std::to_string( + static_cast>(play_options_.message_order))); + } +} + namespace { bool allow_topic( diff --git a/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp b/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp index 594c1a92a..81699d245 100644 --- a/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp +++ b/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp @@ -55,17 +55,42 @@ class Rosbag2TransportTestFixture : public Test std::shared_ptr serialize_test_message( const std::string & topic, - int64_t milliseconds, + int64_t milliseconds_recv, + std::shared_ptr message) + { + return serialize_test_message(topic, milliseconds_recv, 0, message); + } + + template + std::shared_ptr + serialize_test_message( + const std::string & topic, + int64_t milliseconds_recv, + int64_t milliseconds_send, std::shared_ptr message) { auto bag_msg = std::make_shared(); bag_msg->serialized_data = memory_management_.serialize_message(message); - bag_msg->recv_timestamp = milliseconds * 1000000; + bag_msg->recv_timestamp = milliseconds_recv * 1000000; + bag_msg->send_timestamp = milliseconds_send * 1000000; bag_msg->topic_name = topic; return bag_msg; } + static std::string format_message_order( + const TestParamInfo & info) + { + switch (info.param) { + case rosbag2_transport::MessageOrder::RECV_TIMESTAMP: + return "recv_timestamp"; + case rosbag2_transport::MessageOrder::SEND_TIMESTAMP: + return "send_timestamp"; + default: + throw std::runtime_error("unknown value"); + } + } + MemoryManagement memory_management_; rosbag2_storage::StorageOptions storage_options_; diff --git a/rosbag2_transport/test/rosbag2_transport/test_play.cpp b/rosbag2_transport/test/rosbag2_transport/test_play.cpp index 64952fe39..84bdd8073 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play.cpp @@ -109,6 +109,10 @@ void spin_thread_and_wait_for_sent_service_requests_to_finish( } } // namespace +class RosBag2PlayTestFixtureMessageOrder + : public RosBag2PlayTestFixture, public WithParamInterface +{}; + TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics) { auto primitive_message1 = get_messages_basic_types()[0]; @@ -174,7 +178,8 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics) ElementsAre(40.0f, 2.0f, 0.0f))))); } -TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics_from_three_bags) +TEST_P( + RosBag2PlayTestFixtureMessageOrder, recorded_messages_are_played_for_all_topics_from_three_bags) { auto msg = get_messages_basic_types()[0]; msg->int32_value = 42; @@ -184,30 +189,31 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics_from_ {2u, "topic2", "test_msgs/msg/BasicTypes", "", {}, ""}, }; - // Make sure each reader's/bag's messages are ordered by time - // However, do interlace messages across bags + // Make sure each reader's/bag's messages are ordered by recv_timestamp + // However, do interlace messages based on recv_timestamp across bags and based on send_timestamp + // within a bag and across bags std::vector>> messages_list{}; messages_list.emplace_back(std::vector>{ - serialize_test_message("topic1", 1, msg), - serialize_test_message("topic2", 5, msg), - serialize_test_message("topic1", 8, msg), - serialize_test_message("topic2", 10, msg), - serialize_test_message("topic1", 13, msg), - serialize_test_message("topic2", 14, msg)}); + serialize_test_message("topic1", 1, 1, msg), + serialize_test_message("topic2", 5, 2, msg), + serialize_test_message("topic1", 8, 4, msg), + serialize_test_message("topic2", 10, 8, msg), + serialize_test_message("topic1", 13, 7, msg), + serialize_test_message("topic2", 14, 15, msg)}); messages_list.emplace_back(std::vector>{ - serialize_test_message("topic1", 2, msg), - serialize_test_message("topic2", 3, msg), - serialize_test_message("topic1", 6, msg), - serialize_test_message("topic2", 10, msg), - serialize_test_message("topic1", 12, msg), - serialize_test_message("topic2", 16, msg)}); + serialize_test_message("topic1", 2, 1, msg), + serialize_test_message("topic2", 3, 2, msg), + serialize_test_message("topic1", 6, 5, msg), + serialize_test_message("topic2", 10, 8, msg), + serialize_test_message("topic1", 12, 7, msg), + serialize_test_message("topic2", 16, 14, msg)}); messages_list.emplace_back(std::vector>{ - serialize_test_message("topic1", 1, msg), - serialize_test_message("topic2", 4, msg), - serialize_test_message("topic1", 7, msg), - serialize_test_message("topic2", 9, msg), - serialize_test_message("topic1", 11, msg), - serialize_test_message("topic2", 15, msg)}); + serialize_test_message("topic1", 1, 1, msg), + serialize_test_message("topic2", 4, 3, msg), + serialize_test_message("topic1", 7, 2, msg), + serialize_test_message("topic2", 9, 9, msg), + serialize_test_message("topic1", 11, 8, msg), + serialize_test_message("topic2", 15, 7, msg)}); std::vector bags{}; std::size_t total_messages = 0u; for (std::size_t i = 0u; i < messages_list.size(); i++) { @@ -219,13 +225,27 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics_from_ } ASSERT_GT(total_messages, 0u); + const rosbag2_transport::MessageOrder message_order = GetParam(); + play_options_.message_order = message_order; auto player = std::make_shared(std::move(bags), play_options_); std::size_t num_played_messages = 0u; rcutils_time_point_value_t last_timetamp = 0; + const auto get_timestamp = + [message_order](std::shared_ptr msg) { + switch (message_order) { + case rosbag2_transport::MessageOrder::RECV_TIMESTAMP: + return msg->recv_timestamp; + case rosbag2_transport::MessageOrder::SEND_TIMESTAMP: + return msg->send_timestamp; + default: + throw std::runtime_error("unknown rosbag2_transport::MessageOrder value"); + } + }; const auto callback = [&](std::shared_ptr msg) { // Make sure messages are played in order - EXPECT_LE(last_timetamp, msg->recv_timestamp); - last_timetamp = msg->recv_timestamp; + const auto timestamp = get_timestamp(msg); + EXPECT_LE(last_timetamp, timestamp); + last_timetamp = timestamp; num_played_messages++; }; player->add_on_play_message_pre_callback(callback); @@ -234,6 +254,16 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics_from_ EXPECT_EQ(total_messages, num_played_messages); } +INSTANTIATE_TEST_SUITE_P( + ParametrizedPlayTests, + RosBag2PlayTestFixtureMessageOrder, + Values( + rosbag2_transport::MessageOrder::RECV_TIMESTAMP, + rosbag2_transport::MessageOrder::SEND_TIMESTAMP + ), + Rosbag2TransportTestFixture::format_message_order +); + TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_services) { const std::string service_name1 = "/test_service1"; diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp index 8e12da5eb..5ab903b6d 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp @@ -154,6 +154,48 @@ TEST_F(RosBag2PlayUntilTestFixture, play_until_less_than_the_total_duration) EXPECT_EQ(replayed_topic1[1]->int32_value, 1); } +TEST_F(RosBag2PlayUntilTestFixture, play_until_less_than_the_total_duration_message_order) +{ + auto primitive_message1 = get_messages_basic_types()[0]; + auto primitive_message2 = get_messages_basic_types()[0]; + primitive_message1->int32_value = 1; + primitive_message2->int32_value = 2; + + auto topic_types = std::vector{ + {1u, kTopic1Name_, "test_msgs/BasicTypes", "", {}, ""}}; + + std::vector> messages = + { + serialize_test_message(kTopic1Name_, 10, 5, primitive_message1), + serialize_test_message(kTopic1Name_, 50, 45, primitive_message2), + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + // Expect to receive 2 messages from play() due to the send_timestamp order + sub_->add_subscription(kTopic1_, 2u); + play_options_.playback_until_timestamp = RCL_MS_TO_NS(50) - 1; + play_options_.message_order = MessageOrder::SEND_TIMESTAMP; + + std::shared_ptr player_ = std::make_shared( + std::move(reader), storage_options_, play_options_); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE(sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), 5s)); + + auto await_received_messages = sub_->spin_subscriptions(); + ASSERT_TRUE(player_->play()); + player_->wait_for_playback_to_finish(); + + await_received_messages.get(); + auto replayed_topic1 = sub_->get_received_messages(kTopic1_); + EXPECT_THAT(replayed_topic1, SizeIs(2)); + EXPECT_EQ(replayed_topic1[0]->int32_value, 1); + EXPECT_EQ(replayed_topic1[1]->int32_value, 2); +} + TEST_F( RosBag2PlayUntilTestFixture, play_until_intermediate_duration_recorded_messages_with_filtered_topics)