From a82f2597188a67cb11752815481d130fec18a9bb Mon Sep 17 00:00:00 2001 From: Christophe Bedard Date: Wed, 16 Oct 2024 11:23:41 -0700 Subject: [PATCH] Support replaying multiple bags Signed-off-by: Christophe Bedard --- .github/workflows/lint.yml | 2 - .github/workflows/test.yml | 2 - README.md | 8 + ros2bag/ros2bag/api/__init__.py | 91 +++- ros2bag/ros2bag/verb/convert.py | 22 +- ros2bag/ros2bag/verb/play.py | 46 +- ros2bag/test/test_api.py | 27 ++ rosbag2/package.xml | 1 - rosbag2_cpp/package.xml | 1 - rosbag2_py/rosbag2_py/_transport.pyi | 3 + rosbag2_py/src/rosbag2_py/_transport.cpp | 33 +- rosbag2_transport/CMakeLists.txt | 4 +- .../include/rosbag2_transport/player.hpp | 62 ++- rosbag2_transport/package.xml | 3 +- .../locked_priority_queue.hpp | 110 +++++ .../src/rosbag2_transport/player.cpp | 417 ++++++++++++------ .../test/rosbag2_transport/mock_player.hpp | 9 +- .../test_composable_player.cpp | 21 +- .../test/rosbag2_transport/test_play.cpp | 60 +++ .../test/rosbag2_transport/test_play_next.cpp | 4 + .../test/rosbag2_transport/test_play_seek.cpp | 8 +- shared_queues_vendor/CHANGELOG.rst | 178 -------- shared_queues_vendor/CMakeLists.txt | 44 -- shared_queues_vendor/package.xml | 19 - 24 files changed, 753 insertions(+), 422 deletions(-) create mode 100644 rosbag2_transport/src/rosbag2_transport/locked_priority_queue.hpp delete mode 100644 shared_queues_vendor/CHANGELOG.rst delete mode 100644 shared_queues_vendor/CMakeLists.txt delete mode 100644 shared_queues_vendor/package.xml diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 23a1dd4704..3f6577d2b1 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -33,7 +33,6 @@ jobs: rosbag2_test_common rosbag2_tests rosbag2_transport - shared_queues_vendor ament_lint_cpp: # Linters applicable to C++ packages name: ament_${{ matrix.linter }} @@ -62,7 +61,6 @@ jobs: rosbag2_test_common rosbag2_tests rosbag2_transport - shared_queues_vendor ament_lint_clang_format: # Linters applicable to C++ packages formatted with clang-format name: ament_${{ matrix.linter }} diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b5aa16f483..719328cc75 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -48,7 +48,6 @@ jobs: "rosbag2_test_msgdefs", "rosbag2_tests", "rosbag2_transport", - "shared_queues_vendor", "sqlite3_vendor", "zstd_vendor" ] @@ -74,7 +73,6 @@ jobs: "rosbag2_test_msgdefs", "rosbag2_tests", "rosbag2_transport", - "shared_queues_vendor", "sqlite3_vendor", "zstd_vendor" ] diff --git a/README.md b/README.md index 6dde2dc93a..7d733c73cd 100644 --- a/README.md +++ b/README.md @@ -138,6 +138,14 @@ $ ros2 bag play The bag argument can be a directory containing `metadata.yaml` and one or more storage files, or to a single storage file such as `.mcap` or `.db3`. The Player will automatically detect which storage implementation to use for playing. +To play back multiple bags: + +``` +$ ros2 bag play -i -i +``` + +Messages from all provided bags will be played in order, based on their original recording reception timestamps. + #### Controlling playback via services The Rosbag2 player provides the following services for remote control, which can be called via `ros2 service` commandline or from your nodes, diff --git a/ros2bag/ros2bag/api/__init__.py b/ros2bag/ros2bag/api/__init__.py index 565cae52aa..bab50b35db 100644 --- a/ros2bag/ros2bag/api/__init__.py +++ b/ros2bag/ros2bag/api/__init__.py @@ -21,6 +21,7 @@ import os from typing import Any from typing import Dict +from typing import List from typing import Optional from rclpy.duration import Duration @@ -158,15 +159,103 @@ def check_not_negative_int(arg: str) -> int: def add_standard_reader_args(parser: ArgumentParser) -> None: + """ + Add arguments for one input bag. + + :param parser: the parser + """ reader_choices = rosbag2_py.get_registered_readers() parser.add_argument( - 'bag_path', type=check_path_exists, help='Bag to open') + 'bag_path', + nargs=None, + type=check_path_exists, + help='Bag to open.') parser.add_argument( '-s', '--storage', default='', choices=reader_choices, help='Storage implementation of bag. ' 'By default attempts to detect automatically - use this argument to override.') +def add_standard_multi_reader_args(parser: ArgumentParser) -> None: + """ + Add arguments for multiple input bags. + + :param parser: the parser + """ + # Let user provide an input bag path using an optional positional arg, but require them to use + # --input to provide an input bag with a specific storage ID + reader_choices = rosbag2_py.get_registered_readers() + parser.add_argument( + 'bag_path', + nargs='?', + type=check_path_exists, + help='Bag to open. Use --input instead to provide an input bag with a specific storage ID.') + parser.add_argument( + '-s', '--storage', default='', choices=reader_choices, + help='Storage implementation of bag. ' + 'By default attempts to detect automatically - use this argument to override.' + ' (deprecated: use --input to provide an input bag with a specific storage ID)') + add_multi_bag_input_arg(parser, required=False) + + +def add_multi_bag_input_arg(parser: ArgumentParser, required: bool = False) -> None: + """ + Add option for list of input bags. + + :param parser: the parser + :param required: whether this option should be required + """ + reader_choices = ', '.join(rosbag2_py.get_registered_readers()) + parser.add_argument( + '-i', '--input', + required=required, + action='append', nargs='+', + metavar=('uri', 'storage_id'), + help='URI (and optional storage ID) of an input bag. ' + 'May be provided more than once for multiple input bags. ' + f'Storage ID options are: {reader_choices}.') + + +def input_bag_arg_to_storage_options( + input_arg: List[List[str]], + storage_config_file: Optional[str] = None, +) -> List[rosbag2_py.StorageOptions]: + """ + Convert input bag argument value(s) to list of StorageOptions. + + Raises ValueError if validation fails, including: + 1. Bag path existence + 2. Storage ID + 3. Storage config file existence + + :param input_arg: the values of the input argument + :param storage_config_file: the storage config file, if any + """ + if storage_config_file and not os.path.exists(storage_config_file): + raise ValueError(f"File '{storage_config_file}' does not exist!") + storage_id_options = rosbag2_py.get_registered_readers() + storage_options = [] + for input_bag_info in input_arg: + if len(input_bag_info) > 2: + raise ValueError( + f'--input expects 1 or 2 arguments, {len(input_bag_info)} provided') + bag_path = input_bag_info[0] + if not os.path.exists(bag_path): + raise ValueError(f"Bag path '{bag_path}' does not exist!") + storage_id = input_bag_info[1] if len(input_bag_info) > 1 else '' + if storage_id and storage_id not in storage_id_options: + raise ValueError( + f"Unknown storage ID '{storage_id}', options are: {', '.join(storage_id_options)}") + options = rosbag2_py.StorageOptions( + uri=bag_path, + storage_id=storage_id, + ) + if storage_config_file: + options.storage_config_uri = storage_config_file + storage_options.append(options) + return storage_options + + def _parse_cli_storage_plugin(): plugin_choices = set(rosbag2_py.get_registered_writers()) default_storage = rosbag2_py.get_default_storage_id() diff --git a/ros2bag/ros2bag/verb/convert.py b/ros2bag/ros2bag/verb/convert.py index 080bb80c9d..78e87502c0 100644 --- a/ros2bag/ros2bag/verb/convert.py +++ b/ros2bag/ros2bag/verb/convert.py @@ -12,23 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -import argparse - +from ros2bag.api import add_multi_bag_input_arg +from ros2bag.api import input_bag_arg_to_storage_options from ros2bag.verb import VerbExtension from rosbag2_py import bag_rewrite -from rosbag2_py import StorageOptions class ConvertVerb(VerbExtension): """Given an input bag, write out a new bag with different settings.""" def add_arguments(self, parser, cli_name): - parser.add_argument( - '-i', '--input', - required=True, - action='append', nargs='+', - metavar=('uri', 'storage_id'), - help='URI (and optional storage ID) of an input bag. May be provided more than once') + add_multi_bag_input_arg(parser, required=True) parser.add_argument( '-o', '--output-options', type=str, required=True, @@ -37,14 +31,6 @@ def add_arguments(self, parser, cli_name): 'objects. See README.md for some examples.') def main(self, *, args): - input_options = [] - for input_bag in args.input: - if len(input_bag) > 2: - raise argparse.ArgumentTypeError( - f'--input expects 1 or 2 arguments, {len(input_bag)} provided') - storage_options = StorageOptions(uri=input_bag[0]) - if len(input_bag) > 1: - storage_options.storage_id = input_bag[1] - input_options.append(storage_options) + input_options = input_bag_arg_to_storage_options(args.input) bag_rewrite(input_options, args.output_options) diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index 363d43d59f..2c8bf723a1 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -15,13 +15,15 @@ from argparse import FileType from rclpy.qos import InvalidQoSProfileException -from ros2bag.api import add_standard_reader_args +from ros2bag.api import add_standard_multi_reader_args from ros2bag.api import check_not_negative_float from ros2bag.api import check_not_negative_int from ros2bag.api import check_positive_float from ros2bag.api import convert_service_to_service_event_topic from ros2bag.api import convert_yaml_to_qos_profile +from ros2bag.api import input_bag_arg_to_storage_options from ros2bag.api import print_error +from ros2bag.api import print_warn from ros2bag.verb import VerbExtension from ros2cli.node import NODE_NAME_PREFIX from rosbag2_py import Player @@ -42,7 +44,7 @@ class PlayVerb(VerbExtension): """Play back ROS data from a bag.""" def add_arguments(self, parser, cli_name): # noqa: D102 - add_standard_reader_args(parser) + add_standard_multi_reader_args(parser) parser.add_argument( '--read-ahead-queue-size', type=int, default=1000, help='size of message queue rosbag tries to hold in memory to help deterministic ' @@ -196,11 +198,41 @@ def main(self, *, args): # noqa: D102 topic_remapping.append('--remap') topic_remapping.append(remap_rule) - storage_options = StorageOptions( - uri=args.bag_path, - storage_id=args.storage, - storage_config_uri=storage_config_file, - ) + # Do not allow using both positional arg and --input option for input bags + if args.bag_path and args.input: + return print_error( + 'do not mix the [bag_path] positional argument and the -i,--input option; ' + 'for multiple input bags, use -i,--input multiple times') + # Add bag from optional positional arg, then bag(s) from optional flag + storage_options = [] + if args.bag_path: + storage_options.append(StorageOptions( + uri=args.bag_path, + storage_id=args.storage, + storage_config_uri=storage_config_file, + )) + if args.storage: + print(print_warn('--storage option is deprecated, use -i,--input to ' + 'provide an input bag with a specific storage ID')) + try: + storage_options.extend( + input_bag_arg_to_storage_options(args.input or [], storage_config_file)) + except ValueError as e: + return print_error(str(e)) + if not storage_options: + return print_error('no input bags were provided') + + # Users can currently only provide one storage config file, which is storage + # implementation-specific. Since we can replay bags from different storage + # implementations, this may lead to errors. For now, just warn if input bags have + # different explicit storage IDs and a storage config file is provided. + storage_ids = {options.storage_id for options in storage_options if options.storage_id} + if storage_config_file and len(storage_ids) > 1: + print( + print_warn('a global --storage-config-file was provided, but -i,--input bags are ' + 'using different explicit storage IDs, which may lead to errors with ' + f'replay: {storage_ids}')) + play_options = PlayOptions() play_options.read_ahead_queue_size = args.read_ahead_queue_size play_options.node_prefix = NODE_NAME_PREFIX diff --git a/ros2bag/test/test_api.py b/ros2bag/test/test_api.py index a711a48275..f8f8083056 100644 --- a/ros2bag/test/test_api.py +++ b/ros2bag/test/test_api.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from pathlib import Path import unittest from rclpy.qos import DurabilityPolicy @@ -19,9 +20,13 @@ from rclpy.qos import ReliabilityPolicy from ros2bag.api import convert_yaml_to_qos_profile from ros2bag.api import dict_to_duration +from ros2bag.api import input_bag_arg_to_storage_options from ros2bag.api import interpret_dict_as_qos_profile +RESOURCES_PATH = Path(__file__).parent / 'resources' + + class TestRos2BagRecord(unittest.TestCase): def test_dict_to_duration_valid(self): @@ -84,3 +89,25 @@ def test_interpret_dict_as_qos_profile_negative(self): qos_dict = {'history': 'keep_all', 'liveliness_lease_duration': {'sec': -1, 'nsec': -1}} with self.assertRaises(ValueError): interpret_dict_as_qos_profile(qos_dict) + + def test_input_bag_arg_to_storage_options(self): + bag_path = (RESOURCES_PATH / 'empty_bag').as_posix() + # Just use a file that exists; the content does not matter + storage_config_file = (RESOURCES_PATH / 'qos_profile.yaml').as_posix() + + with self.assertRaises(ValueError): + input_bag_arg_to_storage_options([['path1', 'id1'], ['path2', 'id2', 'extra']]) + with self.assertRaises(ValueError): + input_bag_arg_to_storage_options([['path-does-not-exist']]) + with self.assertRaises(ValueError): + input_bag_arg_to_storage_options([[bag_path, 'id-does-not-exist']]) + with self.assertRaises(ValueError): + input_bag_arg_to_storage_options([[bag_path, 'mcap']], 'config-file-doesnt-exist') + + self.assertEqual([], input_bag_arg_to_storage_options([], None)) + storage_options = input_bag_arg_to_storage_options( + [[bag_path, 'mcap']], storage_config_file) + self.assertEqual(1, len(storage_options)) + self.assertEqual(bag_path, storage_options[0].uri) + self.assertEqual('mcap', storage_options[0].storage_id) + self.assertEqual(storage_config_file, storage_options[0].storage_config_uri) diff --git a/rosbag2/package.xml b/rosbag2/package.xml index 6819077015..70c4d62527 100644 --- a/rosbag2/package.xml +++ b/rosbag2/package.xml @@ -18,7 +18,6 @@ rosbag2_py rosbag2_storage rosbag2_transport - shared_queues_vendor rosbag2_compression_zstd diff --git a/rosbag2_cpp/package.xml b/rosbag2_cpp/package.xml index d48755fe80..f1a3321201 100644 --- a/rosbag2_cpp/package.xml +++ b/rosbag2_cpp/package.xml @@ -24,7 +24,6 @@ rosidl_runtime_cpp rosidl_typesupport_cpp rosidl_typesupport_introspection_cpp - shared_queues_vendor rosbag2_storage_default_plugins ament_cmake_gmock diff --git a/rosbag2_py/rosbag2_py/_transport.pyi b/rosbag2_py/rosbag2_py/_transport.pyi index fdd8f07802..c2512f483b 100644 --- a/rosbag2_py/rosbag2_py/_transport.pyi +++ b/rosbag2_py/rosbag2_py/_transport.pyi @@ -39,7 +39,10 @@ class Player: def burst(self, storage_options: rosbag2_py._storage.StorageOptions, play_options: PlayOptions, num_messages: int) -> None: ... @staticmethod def cancel() -> None: ... + @overload def play(self, storage_options: rosbag2_py._storage.StorageOptions, play_options: PlayOptions) -> None: ... + @overload + def play(self, storage_options: List[rosbag2_py._storage.StorageOptions], play_options: PlayOptions) -> None: ... class RecordOptions: all_services: bool diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 9d15e82b15..afe813b2f5 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -183,6 +183,13 @@ class Player void play( const rosbag2_storage::StorageOptions & storage_options, PlayOptions & play_options) + { + play_impl(std::vector{storage_options}, play_options, false); + } + + void play( + const std::vector & storage_options, + PlayOptions & play_options) { play_impl(storage_options, play_options, false); } @@ -192,7 +199,7 @@ class Player PlayOptions & play_options, size_t num_messages) { - play_impl(storage_options, play_options, true, num_messages); + play_impl(std::vector{storage_options}, play_options, true, num_messages); } protected: @@ -240,14 +247,19 @@ class Player } void play_impl( - const rosbag2_storage::StorageOptions & storage_options, + const std::vector & storage_options, PlayOptions & play_options, bool burst = false, size_t burst_num_messages = 0) { install_signal_handlers(); try { - auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options); + std::vector readers_with_options{}; + readers_with_options.reserve(storage_options.size()); + for (const auto & options : storage_options) { + readers_with_options.emplace_back( + rosbag2_transport::ReaderWriterFactory::make_reader(options), options); + } std::shared_ptr keyboard_handler; if (!play_options.disable_keyboard_controls) { #ifndef _WIN32 @@ -260,7 +272,7 @@ class Player #endif } auto player = std::make_shared( - std::move(reader), std::move(keyboard_handler), storage_options, play_options); + std::move(readers_with_options), std::move(keyboard_handler), play_options); rclcpp::executors::SingleThreadedExecutor exec; exec.add_node(player); @@ -591,7 +603,18 @@ PYBIND11_MODULE(_transport, m) { py::class_(m, "Player") .def(py::init<>()) .def(py::init()) - .def("play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg("play_options")) + .def( + "play", + py::overload_cast( + &rosbag2_py::Player::play), + py::arg("storage_options"), + py::arg("play_options")) + .def( + "play", + py::overload_cast &, PlayOptions &>( + &rosbag2_py::Player::play), + py::arg("storage_options"), + py::arg("play_options")) .def( "burst", &rosbag2_py::Player::burst, py::arg("storage_options"), py::arg("play_options"), py::arg("num_messages")) diff --git a/rosbag2_transport/CMakeLists.txt b/rosbag2_transport/CMakeLists.txt index 2d120b3db5..3183672554 100644 --- a/rosbag2_transport/CMakeLists.txt +++ b/rosbag2_transport/CMakeLists.txt @@ -31,6 +31,7 @@ find_package(keyboard_handler REQUIRED) find_package(rcl REQUIRED) find_package(rclcpp REQUIRED) find_package(rclcpp_components REQUIRED) +find_package(rcpputils REQUIRED) find_package(rcutils REQUIRED) find_package(rmw REQUIRED) find_package(rosbag2_compression REQUIRED) @@ -38,7 +39,6 @@ find_package(rosbag2_cpp REQUIRED) find_package(rosbag2_interfaces REQUIRED) find_package(rosbag2_storage REQUIRED) find_package(rmw_implementation_cmake REQUIRED) -find_package(shared_queues_vendor REQUIRED) find_package(yaml_cpp_vendor REQUIRED) find_package(yaml-cpp REQUIRED) @@ -65,10 +65,10 @@ target_link_libraries(${PROJECT_NAME} PUBLIC target_link_libraries(${PROJECT_NAME} PRIVATE rcl::rcl rclcpp_components::component + rcpputils::rcpputils rcutils::rcutils rmw::rmw rosbag2_compression::rosbag2_compression - shared_queues_vendor::singleproducerconsumer yaml-cpp::yaml-cpp ) diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index d83bb69d23..d240d72761 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -74,6 +74,10 @@ class Player : public rclcpp::Node /// delete_on_play_message_callback. using callback_handle_t = uint64_t; + /// \brief Utility type for a pair of rosbag2_cpp::Reader and rosbag2_storage::StorageOptions. + using reader_storage_options_pair_t = \ + std::pair, rosbag2_storage::StorageOptions>; + /// \brief Const describing invalid value for callback_handle. ROSBAG2_TRANSPORT_PUBLIC static constexpr callback_handle_t invalid_callback_handle = 0; @@ -151,6 +155,55 @@ class Player : public rclcpp::Node const std::string & node_name = "rosbag2_player", const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions()); + /// \brief Constructor which will construct Player class with multiple readers and provided + /// storage options for each reader. + /// \note The KeyboardHandler class will be initialized with parameter which is disabling + /// signal handlers in it. + /// \param storage_options Storage options which will each be applied to a rosbag2_cpp::reader + /// after construction. + /// \param play_options Playback settings for Player class. + /// \param node_name Name for the underlying node. + /// \param node_options Node options which will be used during construction of the underlying + /// node. + ROSBAG2_TRANSPORT_PUBLIC + Player( + const std::vector & storage_options, + const rosbag2_transport::PlayOptions & play_options, + const std::string & node_name = "rosbag2_player", + const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions()); + + /// \brief Constructor which will construct Player class with provided parameters. + /// \param readers_with_options Vector of pairs of unique pointer to the rosbag2_cpp::Reader class + /// (which will be moved to the internal instance of the Player class during construction) and + /// storage options (which will be applied to the rosbag2_cpp::reader when opening it). + /// \param play_options Playback settings for Player class. + /// \param node_name Name for the underlying node. + /// \param node_options Node options which will be used during construction of the underlying + /// node. + ROSBAG2_TRANSPORT_PUBLIC + Player( + std::vector && readers_with_options, + const rosbag2_transport::PlayOptions & play_options, + const std::string & node_name = "rosbag2_player", + const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions()); + + /// \brief Constructor which will construct Player class with provided parameters. + /// \param readers_with_options Vector of pairs of unique pointer to the rosbag2_cpp::Reader class + /// (which will be moved to the internal instance of the Player class during construction) and + /// storage options (which will be applied to the rosbag2_cpp::reader when opening it). + /// \param keyboard_handler Keyboard handler class uses to handle user input from keyboard. + /// \param play_options Playback settings for Player class. + /// \param node_name Name for the underlying node. + /// \param node_options Node options which will be used during construction of the underlying + /// node. + ROSBAG2_TRANSPORT_PUBLIC + Player( + std::vector && readers_with_options, + std::shared_ptr keyboard_handler, + const rosbag2_transport::PlayOptions & play_options, + const std::string & node_name = "rosbag2_player", + const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions()); + /// \brief Default destructor. ROSBAG2_TRANSPORT_PUBLIC virtual ~Player(); @@ -298,14 +351,19 @@ class Player : public rclcpp::Node ROSBAG2_TRANSPORT_PUBLIC size_t get_number_of_registered_on_play_msg_post_callbacks(); + /// \brief Getter for the first of the currently stored storage options + /// \return Reference to the first item in the StorageOptions vector ROSBAG2_TRANSPORT_PUBLIC - /// \brief Getter for the currently stored storage options - /// \return Copy of the currently stored storage options const rosbag2_storage::StorageOptions & get_storage_options(); + /// \brief Getter for the currently stored storage options + /// \return Copy of the currently stored storage options ROSBAG2_TRANSPORT_PUBLIC + std::vector get_all_storage_options(); + /// \brief Getter for the currently stored play options /// \return Copy of the currently stored play options + ROSBAG2_TRANSPORT_PUBLIC const rosbag2_transport::PlayOptions & get_play_options(); private: diff --git a/rosbag2_transport/package.xml b/rosbag2_transport/package.xml index 53033b1c6b..4f2b00d60b 100644 --- a/rosbag2_transport/package.xml +++ b/rosbag2_transport/package.xml @@ -15,12 +15,13 @@ rclcpp rclcpp_components + rcpputils + rcutils rmw rosbag2_compression rosbag2_cpp rosbag2_interfaces rosbag2_storage - shared_queues_vendor yaml_cpp_vendor keyboard_handler diff --git a/rosbag2_transport/src/rosbag2_transport/locked_priority_queue.hpp b/rosbag2_transport/src/rosbag2_transport/locked_priority_queue.hpp new file mode 100644 index 0000000000..514730a2ab --- /dev/null +++ b/rosbag2_transport/src/rosbag2_transport/locked_priority_queue.hpp @@ -0,0 +1,110 @@ +// Copyright 2024 Apex.AI, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_TRANSPORT__LOCKED_PRIORITY_QUEUE_HPP_ +#define ROSBAG2_TRANSPORT__LOCKED_PRIORITY_QUEUE_HPP_ + +#include +#include +#include +#include +#include + +#include "rcpputils/thread_safety_annotations.hpp" +#include "rcpputils/unique_lock.hpp" + +/// \brief `std::priority_queue` wrapper with locks. +/// \tparam T the element type +/// \tparam Container the underlying container type +/// \tparam Compare the comparator +/// \see std::priority_queue +template< + typename T, + typename Container = std::vector, + typename Compare = std::less +> +class LockedPriorityQueue +{ +public: + /// \brief Constructor. + /// \param compare the comparator object + explicit LockedPriorityQueue(const Compare & compare) + : queue_(compare) + {} + + LockedPriorityQueue() = delete; + LockedPriorityQueue(const LockedPriorityQueue &) = delete; + LockedPriorityQueue & operator=(const LockedPriorityQueue &) = delete; + LockedPriorityQueue(const LockedPriorityQueue &&) = delete; + LockedPriorityQueue & operator=(const LockedPriorityQueue &&) = delete; + + /// \brief Insert element and sort queue. + /// \param element the element + void push(const T & element) + { + rcpputils::unique_lock lk(queue_mutex_); + queue_.push(element); + } + + /// \brief Remove the top element. + void pop() + { + rcpputils::unique_lock lk(queue_mutex_); + queue_.pop(); + } + + /// \brief Check if the queue is empty. + /// \return whether the queue is empty + [[nodiscard]] bool empty() const + { + rcpputils::unique_lock lk(queue_mutex_); + return queue_.empty(); + } + + /// \brief Get the number of elements. + /// \return the number of elements + [[nodiscard]] std::size_t size() const + { + rcpputils::unique_lock lk(queue_mutex_); + return queue_.size(); + } + + /// Remove all elements from the queue. + void purge() + { + rcpputils::unique_lock lk(queue_mutex_); + while (!queue_.empty()) { + queue_.pop(); + } + } + + /// \brief Try to take the top element from the queue. + /// \return the top element, or `std::nullopt` if the queue is empty + [[nodiscard]] std::optional take() + { + rcpputils::unique_lock lk(queue_mutex_); + if (queue_.empty()) { + return std::nullopt; + } + T e = queue_.top(); + queue_.pop(); + return e; + } + +private: + mutable std::mutex queue_mutex_; + std::priority_queue queue_ RCPPUTILS_TSA_GUARDED_BY(queue_mutex_); +}; + +#endif // ROSBAG2_TRANSPORT__LOCKED_PRIORITY_QUEUE_HPP_ diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 74a911f1d8..2f3ba90dee 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -24,8 +25,6 @@ #include #include -#include "moodycamel/readerwriterqueue.h" - #include "rcl/graph.h" #include "rclcpp/rclcpp.hpp" @@ -43,6 +42,7 @@ #include "rosbag2_transport/reader_writer_factory.hpp" #include "logging.hpp" +#include "locked_priority_queue.hpp" namespace { @@ -87,12 +87,12 @@ class PlayerImpl public: using callback_handle_t = Player::callback_handle_t; using play_msg_callback_t = Player::play_msg_callback_t; + using reader_storage_options_pair_t = Player::reader_storage_options_pair_t; PlayerImpl( Player * owner, - std::unique_ptr reader, + std::vector && readers_with_options, std::shared_ptr keyboard_handler, - const rosbag2_storage::StorageOptions & storage_options, const rosbag2_transport::PlayOptions & play_options); virtual ~PlayerImpl(); @@ -215,9 +215,13 @@ class PlayerImpl /// \return Number of registered on_play_msg_post_callbacks size_t get_number_of_registered_on_play_msg_post_callbacks(); + /// \brief Getter for the first of the currently stored storage options + /// \return Copy of the first of the currently stored storage options + const rosbag2_storage::StorageOptions & get_storage_options(); + /// \brief Getter for the currently stored storage options /// \return Copy of the currently stored storage options - const rosbag2_storage::StorageOptions & get_storage_options(); + std::vector get_all_storage_options(); /// \brief Getter for the currently stored play options /// \return Copy of the currently stored play options @@ -280,10 +284,12 @@ class PlayerImpl std::unordered_map service_clients_; private: - rosbag2_storage::SerializedBagMessageSharedPtr peek_next_message_from_queue(); + rosbag2_storage::SerializedBagMessageSharedPtr take_next_message_from_queue(); void load_storage_content(); bool is_storage_completely_loaded() const; - void enqueue_up_to_boundary(size_t boundary) RCPPUTILS_TSA_REQUIRES(reader_mutex_); + void enqueue_up_to_boundary( + const size_t boundary, + const size_t message_queue_size) RCPPUTILS_TSA_REQUIRES(reader_mutex_); void wait_for_filled_queue() const; void play_messages_from_queue(); void prepare_publishers(); @@ -304,16 +310,23 @@ class PlayerImpl std::atomic_bool stop_playback_{false}; std::mutex reader_mutex_; - std::unique_ptr reader_ RCPPUTILS_TSA_GUARDED_BY(reader_mutex_); + std::vector readers_with_options_ RCPPUTILS_TSA_GUARDED_BY( + reader_mutex_); void publish_clock_update(); void publish_clock_update(const rclcpp::Time & time); Player * owner_; - rosbag2_storage::StorageOptions storage_options_; rosbag2_transport::PlayOptions play_options_; rcutils_time_point_value_t play_until_timestamp_ = -1; - moodycamel::ReaderWriterQueue message_queue_; + using BagMessageComparator = std::function< + int( + const rosbag2_storage::SerializedBagMessageSharedPtr &, + const rosbag2_storage::SerializedBagMessageSharedPtr &)>; + LockedPriorityQueue< + rosbag2_storage::SerializedBagMessageSharedPtr, + std::vector, + BagMessageComparator> message_queue_; mutable std::future storage_loading_future_; std::atomic_bool load_storage_content_{true}; std::unordered_map topic_qos_profile_overrides_; @@ -327,6 +340,15 @@ class PlayerImpl std::thread playback_thread_; std::condition_variable playback_finished_cv_; + // Request to play next + std::atomic_bool play_next_{false}; + // Whether we're done trying to play next + std::atomic_bool finished_play_next_{false}; + std::mutex finished_play_next_mutex_; + std::condition_variable finished_play_next_cv_; + // Whether we successfully played next + std::atomic_bool play_next_result_{false}; + rcutils_time_point_value_t starting_time_; // control services @@ -349,52 +371,69 @@ class PlayerImpl std::vector keyboard_callbacks_; std::shared_ptr player_service_client_manager_; + + /// Comparator for SerializedBagMessageSharedPtr to order chronologically by recv_timestamp. + struct + { + bool operator()( + const rosbag2_storage::SerializedBagMessageSharedPtr & l, + const rosbag2_storage::SerializedBagMessageSharedPtr & r) const + { + return l->recv_timestamp > r->recv_timestamp; + } + } bag_message_chronological_recv_timestamp_comparator; }; PlayerImpl::PlayerImpl( Player * owner, - std::unique_ptr reader, + std::vector && readers_with_options, std::shared_ptr keyboard_handler, - const rosbag2_storage::StorageOptions & storage_options, const rosbag2_transport::PlayOptions & play_options) -: owner_(owner), - storage_options_(storage_options), +: readers_with_options_(std::move(readers_with_options)), + owner_(owner), play_options_(play_options), + message_queue_(bag_message_chronological_recv_timestamp_comparator), keyboard_handler_(std::move(keyboard_handler)), player_service_client_manager_(std::make_shared()) { for (auto & topic : play_options_.topics_to_filter) { topic = rclcpp::expand_topic_or_service_name( - topic, owner->get_name(), - owner->get_namespace(), false); + topic, owner_->get_name(), + owner_->get_namespace(), false); } for (auto & exclude_topic : play_options_.exclude_topics_to_filter) { exclude_topic = rclcpp::expand_topic_or_service_name( - exclude_topic, owner->get_name(), - owner->get_namespace(), false); + exclude_topic, owner_->get_name(), + owner_->get_namespace(), false); } for (auto & service_event_topic : play_options_.services_to_filter) { service_event_topic = rclcpp::expand_topic_or_service_name( - service_event_topic, owner->get_name(), - owner->get_namespace(), false); + service_event_topic, owner_->get_name(), + owner_->get_namespace(), false); } for (auto & exclude_service_event_topic : play_options_.exclude_services_to_filter) { exclude_service_event_topic = rclcpp::expand_topic_or_service_name( - exclude_service_event_topic, owner->get_name(), - owner->get_namespace(), false); + exclude_service_event_topic, owner_->get_name(), + owner_->get_namespace(), false); } { std::lock_guard lk(reader_mutex_); - reader_ = std::move(reader); - // keep reader open until player is destroyed - reader_->open(storage_options_, {"", rmw_get_serialization_format()}); - auto metadata = reader_->get_metadata(); - starting_time_ = std::chrono::duration_cast( - metadata.starting_time.time_since_epoch()).count(); + starting_time_ = std::numeric_limits::max(); + for (const auto & [reader, storage_options] : readers_with_options_) { + // keep readers open until player is destroyed + reader->open(storage_options, {"", rmw_get_serialization_format()}); + // Find earliest starting time + const auto metadata = reader->get_metadata(); + const auto metadata_starting_time = std::chrono::duration_cast( + metadata.starting_time.time_since_epoch()).count(); + if (metadata_starting_time < starting_time_) { + starting_time_ = metadata_starting_time; + } + } // If a non-default (positive) starting time offset is provided in PlayOptions, // then add the offset to the starting time obtained from reader metadata if (play_options_.start_offset < 0) { @@ -431,10 +470,12 @@ PlayerImpl::~PlayerImpl() keyboard_handler_->delete_key_press_callback(cb_handle); } } - // closes reader + // closes readers std::lock_guard lk(reader_mutex_); - if (reader_) { - reader_->close(); + for (const auto & [reader, _] : readers_with_options_) { + if (reader) { + reader->close(); + } } } @@ -491,7 +532,9 @@ bool PlayerImpl::play() } { std::lock_guard lk(reader_mutex_); - reader_->seek(starting_time_); + for (const auto & [reader, _] : readers_with_options_) { + reader->seek(starting_time_); + } clock_->jump(starting_time_); } load_storage_content_ = true; @@ -504,7 +547,7 @@ bool PlayerImpl::play() load_storage_content_ = false; if (storage_loading_future_.valid()) {storage_loading_future_.get();} - while (message_queue_.pop()) {} // cleanup queue + message_queue_.purge(); { std::lock_guard lk(ready_to_play_from_queue_mutex_); is_ready_to_play_from_queue_ = false; @@ -515,7 +558,7 @@ bool PlayerImpl::play() RCLCPP_ERROR(owner_->get_logger(), "Failed to play: %s", e.what()); load_storage_content_ = false; if (storage_loading_future_.valid()) {storage_loading_future_.get();} - while (message_queue_.pop()) {} // cleanup queue + message_queue_.purge(); } { @@ -552,6 +595,15 @@ bool PlayerImpl::play() is_in_playback_ = false; playback_finished_cv_.notify_all(); } + + // If we get here and still have/just got a play next request, make sure to notify + // After that, requests will be automatically rejected since is_in_playback_ is false + if (play_next_.exchange(false)) { + std::lock_guard lk(finished_play_next_mutex_); + finished_play_next_ = true; + play_next_result_ = false; + finished_play_next_cv_.notify_all(); + } }); return true; } @@ -643,10 +695,10 @@ bool PlayerImpl::set_rate(double rate) return ok; } -rosbag2_storage::SerializedBagMessageSharedPtr PlayerImpl::peek_next_message_from_queue() +rosbag2_storage::SerializedBagMessageSharedPtr PlayerImpl::take_next_message_from_queue() { - rosbag2_storage::SerializedBagMessageSharedPtr * message_ptr_ptr = message_queue_.peek(); - while (!stop_playback_ && message_ptr_ptr == nullptr && + // Wait until there's a message in the queue, unless there are no new messages or we have to stop + while (!stop_playback_ && message_queue_.empty() && !is_storage_completely_loaded() && rclcpp::ok()) { RCLCPP_WARN_THROTTLE( @@ -655,25 +707,18 @@ rosbag2_storage::SerializedBagMessageSharedPtr PlayerImpl::peek_next_message_fro 1000, "Message queue starved. Messages will be delayed. Consider " "increasing the --read-ahead-queue-size option."); - std::this_thread::sleep_for(std::chrono::microseconds(100)); - message_ptr_ptr = message_queue_.peek(); } - - // Workaround for race condition between peek and is_storage_completely_loaded() - // Don't sync with mutex for the sake of the performance - if (message_ptr_ptr == nullptr) { - message_ptr_ptr = message_queue_.peek(); - } - - if (message_ptr_ptr != nullptr) { - return *message_ptr_ptr; - } - return nullptr; + // Note: this only returns nullptr if no more messages + return message_queue_.take().value_or(nullptr); } bool PlayerImpl::play_next() { + if (!is_in_playback_) { + RCLCPP_WARN_STREAM(owner_->get_logger(), "Called play next, but player is not playing."); + return false; + } if (!clock_->is_paused()) { RCLCPP_WARN_STREAM(owner_->get_logger(), "Called play next, but not in paused state."); return false; @@ -682,15 +727,6 @@ bool PlayerImpl::play_next() // Use RCLCPP_DEBUG_STREAM to avoid delays in the burst mode RCLCPP_DEBUG_STREAM(owner_->get_logger(), "Playing next message."); - // Temporary take over playback from play_messages_from_queue() - std::lock_guard main_play_loop_lk(skip_message_in_main_play_loop_mutex_); - // Check one more time that we are in pause mode after waiting on mutex. Someone could call - // resume() or stop() from another thread while we were waiting on mutex. - if (!clock_->is_paused()) { - RCLCPP_WARN_STREAM(owner_->get_logger(), "Called play next, but not in paused state."); - return false; - } - skip_message_in_main_play_loop_ = true; // Wait for player to be ready for playback messages from queue i.e. wait for Player:play() to // be called if not yet and queue to be filled with messages. { @@ -698,19 +734,19 @@ bool PlayerImpl::play_next() ready_to_play_from_queue_cv_.wait(lk, [this] {return is_ready_to_play_from_queue_;}); } - rosbag2_storage::SerializedBagMessageSharedPtr message_ptr = peek_next_message_from_queue(); + // Request to play next + play_next_ = true; - bool next_message_published = false; - while (rclcpp::ok() && !next_message_published && !stop_playback_ && - message_ptr != nullptr && !shall_stop_at_timestamp(message_ptr->recv_timestamp)) - { - next_message_published = publish_message(message_ptr); - clock_->jump(message_ptr->recv_timestamp); - - message_queue_.pop(); - message_ptr = peek_next_message_from_queue(); - } - return next_message_published; + // Wait for play next to be done, and then return the result + std::unique_lock lk(finished_play_next_mutex_); + // Temporarily resume clock to force wakeup in clock_->sleep_until(time), + // then return in pause mode to preserve original state of the player + clock_->resume(); + clock_->pause(); + finished_play_next_ = false; + finished_play_next_cv_.wait(lk, [this] {return finished_play_next_.load();}); + play_next_ = false; + return play_next_result_.exchange(false); } size_t PlayerImpl::burst(const size_t num_messages) @@ -754,8 +790,10 @@ void PlayerImpl::seek(rcutils_time_point_value_t time_point) { std::lock_guard lk(reader_mutex_); // Purge current messages in queue. - while (message_queue_.pop()) {} - reader_->seek(time_point); + message_queue_.purge(); + for (const auto & [reader, _] : readers_with_options_) { + reader->seek(time_point); + } clock_->jump(time_point); // Restart queuing thread if it has finished running (previously reached end of bag), // otherwise, queueing should continue automatically after releasing mutex @@ -886,7 +924,7 @@ Player::callback_handle_t PlayerImpl::get_new_on_play_msg_callback_handle() void PlayerImpl::wait_for_filled_queue() const { while ( - message_queue_.size_approx() < play_options_.read_ahead_queue_size && + message_queue_.size() < play_options_.read_ahead_queue_size && !is_storage_completely_loaded() && rclcpp::ok() && !stop_playback_) { std::this_thread::sleep_for(queue_read_wait_period_); @@ -901,10 +939,18 @@ void PlayerImpl::load_storage_content() while (rclcpp::ok() && load_storage_content_ && !stop_playback_) { rcpputils::unique_lock lk(reader_mutex_); - if (!reader_->has_next()) {break;} + const bool no_messages = std::all_of( + readers_with_options_.cbegin(), + readers_with_options_.cend(), + [](const auto & reader_options) {return !reader_options.first->has_next();}); + if (no_messages) { + break; + } - if (message_queue_.size_approx() < queue_lower_boundary) { - enqueue_up_to_boundary(queue_upper_boundary); + // The message queue size may get smaller after this, but that's OK + const size_t message_queue_size = message_queue_.size(); + if (message_queue_size < queue_lower_boundary) { + enqueue_up_to_boundary(queue_upper_boundary, message_queue_size); } else { lk.unlock(); std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -912,58 +958,106 @@ void PlayerImpl::load_storage_content() } } -void PlayerImpl::enqueue_up_to_boundary(size_t boundary) +void PlayerImpl::enqueue_up_to_boundary(const size_t boundary, const size_t message_queue_size) { - rosbag2_storage::SerializedBagMessageSharedPtr message; - for (size_t i = message_queue_.size_approx(); i < boundary; i++) { - if (!reader_->has_next()) { - break; + // Read messages from input bags in a round robin way + size_t input_bag_index = 0u; + for (size_t i = message_queue_size; i < boundary; i++) { + const auto & reader = readers_with_options_[input_bag_index].first; + // We are supposed to have at least one bag with messages to read + if (reader->has_next()) { + message_queue_.push(reader->read_next()); } - message = reader_->read_next(); - message_queue_.enqueue(message); + input_bag_index = (input_bag_index + 1) % readers_with_options_.size(); } } void PlayerImpl::play_messages_from_queue() { - // Note: We need to use message_queue_.peek() instead of message_queue_.try_dequeue(message) - // to support play_next() API logic. - rosbag2_storage::SerializedBagMessageSharedPtr message_ptr = peek_next_message_from_queue(); - { // Notify play_next() that we are ready for playback - // Note: We should do notification that we are ready for playback after peeking pointer to - // the next message. message_queue_.peek() is not allowed to be called from more than one - // thread concurrently. + { // Notify play_next()/seek() that we are ready for playback std::lock_guard lk(ready_to_play_from_queue_mutex_); is_ready_to_play_from_queue_ = true; ready_to_play_from_queue_cv_.notify_all(); } - while (rclcpp::ok() && !stop_playback_ && - message_ptr != nullptr && !shall_stop_at_timestamp(message_ptr->recv_timestamp)) - { - // Do not move on until sleep_until returns true - // It will always sleep, so this is not a tight busy loop on pause - while (rclcpp::ok() && !clock_->sleep_until(message_ptr->recv_timestamp)) { - if (std::atomic_exchange(&cancel_wait_for_next_message_, false)) { - break; + rosbag2_storage::SerializedBagMessageSharedPtr message_ptr = take_next_message_from_queue(); + + // While we haven't stopped playing, try to play messages and wait for a potential request to play + // the next message + while (rclcpp::ok() && !stop_playback_) { + // While there's a message to play and we haven't reached the end timestamp yet + while (rclcpp::ok() && !stop_playback_ && + message_ptr != nullptr && !shall_stop_at_timestamp(message_ptr->recv_timestamp)) + { + // Sleep until the message's replay time, do not move on until sleep_until returns true + // It will always sleep, so this is not a tight busy loop on pause + // However, skip sleeping if we're trying to play the next message + while (rclcpp::ok() && !stop_playback_ && !play_next_.load() && + !clock_->sleep_until(message_ptr->recv_timestamp)) + { + // Stop sleeping if cancelled + if (std::atomic_exchange(&cancel_wait_for_next_message_, false)) { + break; + } } - } - std::lock_guard lk(skip_message_in_main_play_loop_mutex_); - if (rclcpp::ok()) { - if (skip_message_in_main_play_loop_) { - skip_message_in_main_play_loop_ = false; - cancel_wait_for_next_message_ = false; - message_ptr = peek_next_message_from_queue(); - continue; + + std::lock_guard lk_skip_message(skip_message_in_main_play_loop_mutex_); + if (rclcpp::ok()) { + // This means that the message we took from the queue's top was invalidated after a seek(), + // so we need to take a fresh element from the top of the queue, unless we have to stop + if (skip_message_in_main_play_loop_) { + skip_message_in_main_play_loop_ = false; + cancel_wait_for_next_message_ = false; + message_ptr = take_next_message_from_queue(); + continue; + } + + const bool message_published = publish_message(message_ptr); + // If we tried to publish because of play_next(), jump the clock + if (play_next_.load()) { + clock_->jump(message_ptr->recv_timestamp); + // If we successfully played next, notify that we're done, otherwise keep trying + if (message_published) { + play_next_ = false; + std::lock_guard lk(finished_play_next_mutex_); + finished_play_next_ = true; + play_next_result_ = true; + finished_play_next_cv_.notify_all(); + } + } } - publish_message(message_ptr); + message_ptr = take_next_message_from_queue(); + } + + // At this point, we're at the end of the playback round, there are no more messages to play + // If we're still trying to play next or just got a request, we did not succeed + if (play_next_.exchange(false)) { + std::lock_guard lk(finished_play_next_mutex_); + finished_play_next_ = true; + play_next_result_ = false; + finished_play_next_cv_.notify_all(); + } + + // While we're in a pause state, make sure we don't return if we happen to be at the end of the + // queue or playback round. However, if we get a request for play next during sleep_until(...), + // we need to stop waiting here and proceed to handle the play next request by doing another + // loop of this while(). + while (!stop_playback_ && is_paused() && !play_next_.load() && rclcpp::ok()) { + clock_->sleep_until(clock_->now()); + } + // If we ran out of messages and are not in pause state, it means we're done playing, + // unless play_next() is resuming and pausing the clock in order to wake us up + if (!is_paused() && !play_next_.load()) { + break; + } + + // If we had run out of messages before but are starting to play next again, e.g., after a + // seek(), we need to take + if (play_next_.load()) { + std::lock_guard lk(skip_message_in_main_play_loop_mutex_); + skip_message_in_main_play_loop_ = false; + cancel_wait_for_next_message_ = false; + message_ptr = take_next_message_from_queue(); } - message_queue_.pop(); - message_ptr = peek_next_message_from_queue(); - } - // while we're in pause state, make sure we don't return - // if we happen to be at the end of queue - while (!stop_playback_ && is_paused() && rclcpp::ok()) { - clock_->sleep_until(clock_->now()); } } @@ -1055,7 +1149,9 @@ void PlayerImpl::prepare_publishers() storage_filter.regex_to_exclude = play_options_.exclude_regex_to_filter; storage_filter.exclude_topics = play_options_.exclude_topics_to_filter; storage_filter.exclude_service_events = play_options_.exclude_services_to_filter; - reader_->set_filter(storage_filter); + for (const auto & [reader, _] : readers_with_options_) { + reader->set_filter(storage_filter); + } // Create /clock publisher if (play_options_.clock_publish_frequency > 0.f || play_options_.clock_publish_on_topic_publish) { @@ -1092,7 +1188,12 @@ void PlayerImpl::prepare_publishers() } // Create topic publishers - auto topics = reader_->get_all_topics_and_types(); + // We could have duplicate topic names here, but we correctly handle it when creating publishers + std::vector topics{}; + for (const auto & [reader, _] : readers_with_options_) { + auto bag_topics = reader->get_all_topics_and_types(); + topics.insert(topics.end(), bag_topics.begin(), bag_topics.end()); + } std::string topic_without_support_acked; for (const auto & topic : topics) { const bool is_service_event_topic = rosbag2_cpp::is_service_event_topic(topic.name, topic.type); @@ -1180,7 +1281,9 @@ void PlayerImpl::prepare_publishers() message.node_name = owner_->get_fully_qualified_name(); split_event_pub_->publish(message); }; - reader_->add_event_callbacks(callbacks); + for (const auto & [reader, _] : readers_with_options_) { + reader->add_event_callbacks(callbacks); + } } void PlayerImpl::run_play_msg_pre_callbacks( @@ -1519,7 +1622,17 @@ void PlayerImpl::publish_clock_update(const rclcpp::Time & time) const rosbag2_storage::StorageOptions & PlayerImpl::get_storage_options() { - return storage_options_; + return readers_with_options_[0].second; +} + +std::vector PlayerImpl::get_all_storage_options() +{ + std::vector storage_options{}; + storage_options.reserve(readers_with_options_.size()); + for (const auto & [_, options] : readers_with_options_) { + storage_options.push_back(options); + } + return storage_options; } const rosbag2_transport::PlayOptions & PlayerImpl::get_play_options() @@ -1546,8 +1659,10 @@ Player::Player(const std::string & node_name, const rclcpp::NodeOptions & node_o auto reader = ReaderWriterFactory::make_reader(storage_options); + std::vector readers_with_options{}; + readers_with_options.emplace_back(std::move(reader), storage_options); pimpl_ = std::make_unique( - this, std::move(reader), keyboard_handler, storage_options, play_options); + this, std::move(readers_with_options), keyboard_handler, play_options); pimpl_->play(); } @@ -1556,10 +1671,31 @@ Player::Player( const rosbag2_transport::PlayOptions & play_options, const std::string & node_name, const rclcpp::NodeOptions & node_options) -: Player(std::make_unique(), - storage_options, play_options, node_name, node_options) +: Player(std::vector{storage_options}, play_options, node_name, node_options) {} +Player::Player( + const std::vector & storage_options, + const rosbag2_transport::PlayOptions & play_options, + const std::string & node_name, + const rclcpp::NodeOptions & node_options) +: rclcpp::Node( + node_name, + rclcpp::NodeOptions(node_options).arguments(play_options.topic_remapping_options)) +{ + std::shared_ptr keyboard_handler; + if (!play_options.disable_keyboard_controls) { + keyboard_handler = std::make_shared(); + } + + std::vector readers_with_options{}; + for (const auto & options : storage_options) { + readers_with_options.emplace_back(ReaderWriterFactory::make_reader(options), options); + } + pimpl_ = std::make_unique( + this, std::move(readers_with_options), keyboard_handler, play_options); +} + Player::Player( std::unique_ptr reader, const rosbag2_storage::StorageOptions & storage_options, @@ -1578,12 +1714,40 @@ Player::Player( const rosbag2_transport::PlayOptions & play_options, const std::string & node_name, const rclcpp::NodeOptions & node_options) +: rclcpp::Node( + node_name, + rclcpp::NodeOptions(node_options).arguments(play_options.topic_remapping_options)) +{ + std::vector readers_with_options{}; + readers_with_options.emplace_back(std::move(reader), storage_options); + pimpl_ = std::make_unique( + this, std::move(readers_with_options), keyboard_handler, play_options); +} + +Player::Player( + std::vector && readers_with_options, + const rosbag2_transport::PlayOptions & play_options, + const std::string & node_name, + const rclcpp::NodeOptions & node_options) +: Player( + std::move(readers_with_options), + play_options.disable_keyboard_controls ? nullptr : std::make_shared(), + play_options, + node_name, + node_options) +{} + +Player::Player( + std::vector && readers_with_options, + std::shared_ptr keyboard_handler, + const rosbag2_transport::PlayOptions & play_options, + const std::string & node_name, + const rclcpp::NodeOptions & node_options) : rclcpp::Node( node_name, rclcpp::NodeOptions(node_options).arguments(play_options.topic_remapping_options)), pimpl_(std::make_unique( - this, std::move(reader), std::move(keyboard_handler), - storage_options, play_options)) + this, std::move(readers_with_options), std::move(keyboard_handler), play_options)) {} Player::~Player() = default; @@ -1708,6 +1872,11 @@ const rosbag2_storage::StorageOptions & Player::get_storage_options() return pimpl_->get_storage_options(); } +std::vector Player::get_all_storage_options() +{ + return pimpl_->get_all_storage_options(); +} + const rosbag2_transport::PlayOptions & Player::get_play_options() { return pimpl_->get_play_options(); diff --git a/rosbag2_transport/test/rosbag2_transport/mock_player.hpp b/rosbag2_transport/test/rosbag2_transport/mock_player.hpp index 9220679b0b..4df421a3e7 100644 --- a/rosbag2_transport/test/rosbag2_transport/mock_player.hpp +++ b/rosbag2_transport/test/rosbag2_transport/mock_player.hpp @@ -35,6 +35,13 @@ class MockPlayer : public rosbag2_transport::Player : Player(std::move(reader), storage_options, play_options, node_name) {} + MockPlayer( + std::vector && input_bags, + const rosbag2_transport::PlayOptions & play_options, + const std::string & node_name = "rosbag2_mock_player") + : Player(std::move(input_bags), play_options, node_name) + {} + explicit MockPlayer( const std::string & node_name = "rosbag2_mock_composable_player", const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions()) @@ -79,7 +86,7 @@ class MockPlayer : public rosbag2_transport::Player } using rosbag2_transport::Player::wait_for_playback_to_start; - using rosbag2_transport::Player::get_storage_options; + using rosbag2_transport::Player::get_all_storage_options; using rosbag2_transport::Player::get_play_options; }; diff --git a/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp b/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp index 3875a0b1b0..eadd6fbd0a 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp @@ -136,7 +136,7 @@ TEST_P(ComposablePlayerTests, player_can_parse_parameters_from_file) { auto player = std::make_shared("player_params_node", opts); auto play_options = player->get_play_options(); - auto storage_options = player->get_storage_options(); + auto storage_options = player->get_all_storage_options(); EXPECT_EQ(play_options.read_ahead_queue_size, 3); EXPECT_EQ(play_options.node_prefix, "test"); @@ -178,19 +178,20 @@ TEST_P(ComposablePlayerTests, player_can_parse_parameters_from_file) { play_options.service_requests_source, rosbag2_transport::ServiceRequestsSource::CLIENT_INTROSPECTION); - EXPECT_EQ(storage_options.uri, uri_str); - EXPECT_EQ(storage_options.storage_id, GetParam()); - EXPECT_EQ(storage_options.storage_config_uri, ""); - EXPECT_EQ(storage_options.max_bagfile_size, 12345); - EXPECT_EQ(storage_options.max_bagfile_duration, 54321); - EXPECT_EQ(storage_options.max_cache_size, 9898); - EXPECT_EQ(storage_options.storage_preset_profile, "resilient"); - EXPECT_EQ(storage_options.snapshot_mode, false); + ASSERT_EQ(1, storage_options.size()); + EXPECT_EQ(storage_options[0].uri, uri_str); + EXPECT_EQ(storage_options[0].storage_id, GetParam()); + EXPECT_EQ(storage_options[0].storage_config_uri, ""); + EXPECT_EQ(storage_options[0].max_bagfile_size, 12345); + EXPECT_EQ(storage_options[0].max_bagfile_duration, 54321); + EXPECT_EQ(storage_options[0].max_cache_size, 9898); + EXPECT_EQ(storage_options[0].storage_preset_profile, "resilient"); + EXPECT_EQ(storage_options[0].snapshot_mode, false); std::unordered_map custom_data{ std::pair{"key1", "value1"}, std::pair{"key2", "value2"} }; - EXPECT_EQ(storage_options.custom_data, custom_data); + EXPECT_EQ(storage_options[0].custom_data, custom_data); } TEST_P(ComposablePlayerIntegrationTests, player_can_automatically_play_file_after_composition) { diff --git a/rosbag2_transport/test/rosbag2_transport/test_play.cpp b/rosbag2_transport/test/rosbag2_transport/test_play.cpp index 8b7b40f509..64952fe39f 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play.cpp @@ -174,6 +174,66 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics) ElementsAre(40.0f, 2.0f, 0.0f))))); } +TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics_from_three_bags) +{ + auto msg = get_messages_basic_types()[0]; + msg->int32_value = 42; + + auto topic_types = std::vector{ + {1u, "topic1", "test_msgs/msg/BasicTypes", "", {}, ""}, + {2u, "topic2", "test_msgs/msg/BasicTypes", "", {}, ""}, + }; + + // Make sure each reader's/bag's messages are ordered by time + // However, do interlace messages across bags + std::vector>> messages_list{}; + messages_list.emplace_back(std::vector>{ + serialize_test_message("topic1", 1, msg), + serialize_test_message("topic2", 5, msg), + serialize_test_message("topic1", 8, msg), + serialize_test_message("topic2", 10, msg), + serialize_test_message("topic1", 13, msg), + serialize_test_message("topic2", 14, msg)}); + messages_list.emplace_back(std::vector>{ + serialize_test_message("topic1", 2, msg), + serialize_test_message("topic2", 3, msg), + serialize_test_message("topic1", 6, msg), + serialize_test_message("topic2", 10, msg), + serialize_test_message("topic1", 12, msg), + serialize_test_message("topic2", 16, msg)}); + messages_list.emplace_back(std::vector>{ + serialize_test_message("topic1", 1, msg), + serialize_test_message("topic2", 4, msg), + serialize_test_message("topic1", 7, msg), + serialize_test_message("topic2", 9, msg), + serialize_test_message("topic1", 11, msg), + serialize_test_message("topic2", 15, msg)}); + std::vector bags{}; + std::size_t total_messages = 0u; + for (std::size_t i = 0u; i < messages_list.size(); i++) { + auto prepared_mock_reader = std::make_unique(); + total_messages += messages_list[i].size(); + prepared_mock_reader->prepare(messages_list[i], topic_types); + bags.emplace_back( + std::make_unique(std::move(prepared_mock_reader)), storage_options_); + } + ASSERT_GT(total_messages, 0u); + + auto player = std::make_shared(std::move(bags), play_options_); + std::size_t num_played_messages = 0u; + rcutils_time_point_value_t last_timetamp = 0; + const auto callback = [&](std::shared_ptr msg) { + // Make sure messages are played in order + EXPECT_LE(last_timetamp, msg->recv_timestamp); + last_timetamp = msg->recv_timestamp; + num_played_messages++; + }; + player->add_on_play_message_pre_callback(callback); + player->play(); + player->wait_for_playback_to_finish(); + EXPECT_EQ(total_messages, num_played_messages); +} + TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_services) { const std::string service_name1 = "/test_service1"; diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_next.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_next.cpp index 81af30549f..c9b6e2a2f1 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_next.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_next.cpp @@ -49,6 +49,7 @@ TEST_F(RosBag2PlayTestFixture, play_next_with_false_preconditions) { ASSERT_FALSE(player->play_next()); player->pause(); ASSERT_TRUE(player->is_paused()); + ASSERT_FALSE(player->play_next()); } TEST_F(RosBag2PlayTestFixture, play_next_playing_all_messages_without_delays) { @@ -99,6 +100,9 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_all_messages_without_delays) { ASSERT_TRUE(player->is_paused()); player->resume(); player->wait_for_playback_to_finish(); + ASSERT_FALSE(player->play_next()); + player->resume(); + ASSERT_FALSE(player->play_next()); await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages("/topic1"); diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_seek.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_seek.cpp index 65359904e1..347a3e67cd 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_seek.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_seek.cpp @@ -103,7 +103,7 @@ class RosBag2PlaySeekTestFixture }; TEST_P(RosBag2PlaySeekTestFixture, seek_back_in_time) { - const size_t expected_number_of_messages = num_msgs_in_bag_ + 2; + const size_t expected_number_of_messages = num_msgs_in_bag_ + 4; auto player = std::make_shared(std::move(reader_), storage_options_, play_options_); sub_ = std::make_shared(); @@ -212,7 +212,7 @@ TEST_P(RosBag2PlaySeekTestFixture, seek_forward) { await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages("/topic1"); - EXPECT_THAT(replayed_topic1, SizeIs(expected_number_of_messages)); + ASSERT_THAT(replayed_topic1, SizeIs(expected_number_of_messages)); EXPECT_EQ(replayed_topic1[0]->int32_value, 1); EXPECT_EQ(replayed_topic1[1]->int32_value, 3); @@ -254,7 +254,7 @@ TEST_P(RosBag2PlaySeekTestFixture, seek_back_in_time_from_the_end_of_the_bag) { await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages("/topic1"); - EXPECT_THAT(replayed_topic1, SizeIs(expected_number_of_messages)); + ASSERT_THAT(replayed_topic1, SizeIs(expected_number_of_messages)); for (size_t i = 0; i < num_msgs_in_bag_; i++) { EXPECT_EQ(replayed_topic1[i]->int32_value, static_cast(i + 1)) << "i=" << i; @@ -300,7 +300,7 @@ TEST_P(RosBag2PlaySeekTestFixture, seek_forward_from_the_end_of_the_bag) { await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages("/topic1"); - EXPECT_THAT(replayed_topic1, SizeIs(expected_number_of_messages)); + ASSERT_THAT(replayed_topic1, SizeIs(expected_number_of_messages)); for (size_t i = 0; i < replayed_topic1.size(); i++) { EXPECT_EQ(replayed_topic1[i]->int32_value, static_cast(i + 1)) << "i=" << i; diff --git a/shared_queues_vendor/CHANGELOG.rst b/shared_queues_vendor/CHANGELOG.rst deleted file mode 100644 index e0d6eb0699..0000000000 --- a/shared_queues_vendor/CHANGELOG.rst +++ /dev/null @@ -1,178 +0,0 @@ -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -Changelog for package shared_queues_vendor -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -0.30.0 (2024-11-26) -------------------- - -0.29.0 (2024-09-03) -------------------- - -0.28.0 (2024-06-17) -------------------- - -0.27.0 (2024-04-30) -------------------- - -0.26.1 (2024-04-17) -------------------- - -0.26.0 (2024-04-16) -------------------- - -0.25.0 (2024-03-27) -------------------- -* Remove unused concurrentqueue implementation. (`#1465 `_) - rosbag2 only depends on the readerwriter queue. -* Contributors: Chris Lalancette - -0.24.0 (2023-07-11) -------------------- - -0.23.0 (2023-04-28) -------------------- - -0.22.0 (2023-04-18) -------------------- - -0.21.0 (2023-04-12) -------------------- - -0.20.0 (2023-02-14) -------------------- - -0.19.0 (2023-01-13) -------------------- -* Add Michael Orlov as maintainer in rosbag2 packages (`#1215 `_) -* Contributors: Michael Orlov - -0.18.0 (2022-11-15) -------------------- -* Fixes policy CMP0135 warning for CMake >= 3.24 (`#1084 `_) -* Contributors: Cristóbal Arroyo - -0.17.0 (2022-07-30) -------------------- - -0.16.0 (2022-05-11) -------------------- - -0.15.1 (2022-04-06) -------------------- - -0.15.0 (2022-04-05) -------------------- - -0.14.1 (2022-03-29) -------------------- -* Bump version number to avoid conflict -* Contributors: Chris Lalancette - -0.14.0 (2022-03-29) -------------------- - -0.13.0 (2022-01-13) -------------------- - -0.12.0 (2021-12-17) -------------------- - -0.11.0 (2021-11-08) -------------------- -* Update package maintainers (`#899 `_) -* Contributors: Michel Hidalgo - -0.10.1 (2021-10-22) -------------------- - -0.10.0 (2021-10-19) -------------------- - -0.9.0 (2021-05-17) ------------------- - -0.8.0 (2021-04-19) ------------------- -* Explicitly add emersonknapp as maintainer (`#692 `_) -* Contributors: Emerson Knapp - -0.7.0 (2021-03-18) ------------------- - -0.6.0 (2021-02-01) ------------------- - -0.5.0 (2020-12-02) ------------------- - -0.4.0 (2020-11-19) ------------------- -* Update the package.xml files with the latest Open Robotics maintainers (`#535 `_) -* Contributors: Michael Jeronimo - -0.3.2 (2020-06-03) ------------------- - -0.3.1 (2020-06-01) ------------------- - -0.3.0 (2020-05-26) ------------------- -* Export targets (`#403 `_) -* Contributors: Karsten Knese - -0.2.8 (2020-05-18) ------------------- - -0.2.7 (2020-05-12) ------------------- - -0.2.6 (2020-05-07) ------------------- - -0.2.5 (2020-04-30) ------------------- -* make ros tooling working group maintainer (`#211 `_) -* Contributors: Karsten Knese - -0.2.4 (2019-11-18) ------------------- - -0.2.3 (2019-11-18) ------------------- - -0.2.2 (2019-11-13) ------------------- - -0.2.1 (2019-10-23) ------------------- - -0.2.0 (2019-09-26) ------------------- - -0.1.2 (2019-05-20) ------------------- - -0.1.1 (2019-05-09) ------------------- - -0.1.0 (2019-05-08) ------------------- - -0.0.5 (2018-12-27) ------------------- - -0.0.4 (2018-12-19) ------------------- -* 0.0.3 -* Contributors: Karsten Knese - -0.0.2 (2018-12-12) ------------------- -* update maintainer email -* Contributors: Karsten Knese - -0.0.1 (2018-12-11) ------------------- -* Add correct timing behaviour for rosbag play (`#32 `_) -* Contributors: Martin Idel diff --git a/shared_queues_vendor/CMakeLists.txt b/shared_queues_vendor/CMakeLists.txt deleted file mode 100644 index 0c95c8bde3..0000000000 --- a/shared_queues_vendor/CMakeLists.txt +++ /dev/null @@ -1,44 +0,0 @@ -cmake_minimum_required(VERSION 3.5) -project(shared_queues_vendor) - -find_package(ament_cmake REQUIRED) - -# Avoid DOWNLOAD_EXTRACT_TIMESTAMP warning for CMake >= 3.24 -if (POLICY CMP0135) - cmake_policy(SET CMP0135 NEW) -endif() - -include(ExternalProject) -# Single producer single consumer queue by moodycamel - header only, don't build, install -ExternalProject_Add(ext-singleproducerconsumer - PREFIX singleproducerconsumer - DOWNLOAD_DIR ${CMAKE_CURRENT_BINARY_DIR}/download - URL https://github.com/cameron314/readerwriterqueue/archive/ef7dfbf553288064347d51b8ac335f1ca489032a.zip - URL_MD5 64c673dd381b8fae9254053ad7b2be4d - TIMEOUT 60 - INSTALL_DIR ${CMAKE_CURRENT_BINARY_DIR} - CONFIGURE_COMMAND "" - BUILD_COMMAND "" - INSTALL_COMMAND "" - ) - -add_library(singleproducerconsumer INTERFACE) -target_include_directories(singleproducerconsumer INTERFACE $) - -install( - TARGETS singleproducerconsumer - EXPORT export_${PROJECT_NAME} - INCLUDES DESTINATION include -) - -# Install headers -install( - FILES - "${CMAKE_CURRENT_BINARY_DIR}/singleproducerconsumer/src/ext-singleproducerconsumer/atomicops.h" - "${CMAKE_CURRENT_BINARY_DIR}/singleproducerconsumer/src/ext-singleproducerconsumer/readerwriterqueue.h" - DESTINATION ${CMAKE_INSTALL_PREFIX}/include/moodycamel -) - -ament_export_include_directories(include) -ament_export_targets(export_${PROJECT_NAME}) -ament_package() diff --git a/shared_queues_vendor/package.xml b/shared_queues_vendor/package.xml deleted file mode 100644 index fa60963514..0000000000 --- a/shared_queues_vendor/package.xml +++ /dev/null @@ -1,19 +0,0 @@ - - - - shared_queues_vendor - 0.30.0 - Vendor package for concurrent queues from moodycamel - Michael Orlov - Geoffrey Biggs - Michel Hidalgo - Emerson Knapp - ROS Tooling Working Group - Apache License 2.0 - - ament_cmake - - - ament_cmake - -