Skip to content

Commit

Permalink
Support replaying multiple bags
Browse files Browse the repository at this point in the history
Signed-off-by: Christophe Bedard <[email protected]>
  • Loading branch information
christophebedard committed Nov 1, 2024
1 parent ee7f68a commit 8315454
Show file tree
Hide file tree
Showing 15 changed files with 823 additions and 95 deletions.
87 changes: 84 additions & 3 deletions ros2bag/ros2bag/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import os
from typing import Any
from typing import Dict
from typing import List
from typing import Optional

from rclpy.duration import Duration
Expand Down Expand Up @@ -157,14 +158,94 @@ def check_not_negative_int(arg: str) -> int:
raise ArgumentTypeError('{} is not the valid type (int)'.format(value))


def add_standard_reader_args(parser: ArgumentParser) -> None:
def add_standard_reader_args(parser: ArgumentParser, multi: bool = False) -> None:
"""
Add arguments for one of multiple input bags.
:param parser: the parser
:param multi: whether this is part of a verb that takes in multiple input bags
"""
# If multi, let user provide an input bag path using an optional positional arg, but require
# them to use --input to provide an input bag with a specific storage ID

reader_choices = rosbag2_py.get_registered_readers()
bag_help_multi_str = ''
storage_help_multi_str = ''
if multi:
bag_help_multi_str = ' Use --input to provide an input bag with a specific storage ID.'
storage_help_multi_str = \
' (deprecated: use --input to provide an input bag with a specific storage ID)'
parser.add_argument(
'bag_path', type=check_path_exists, help='Bag to open')
'bag_path',
nargs='?' if multi else None,
type=check_path_exists,
help='Bag to open.' + bag_help_multi_str)
parser.add_argument(
'-s', '--storage', default='', choices=reader_choices,
help='Storage implementation of bag. '
'By default attempts to detect automatically - use this argument to override.')
'By default attempts to detect automatically - use this argument to override.'
+ storage_help_multi_str)

if multi:
add_multi_bag_input_arg(parser, required=False)


def add_multi_bag_input_arg(parser: ArgumentParser, required: bool = False) -> None:
"""
Add option for list of input bags.
:param parser: the parser
:param required: whether this option should be required
"""
reader_choices = ', '.join(rosbag2_py.get_registered_readers())
parser.add_argument(
'-i', '--input',
required=required,
action='append', nargs='+',
metavar=('uri', 'storage_id'),
help='URI (and optional storage ID) of an input bag. '
'May be provided more than once for multiple input bags. '
f'Storage ID options are: {reader_choices}.')


def input_bag_arg_to_storage_options(
input_arg: List[List[str]],
storage_config_file: Optional[str] = None,
) -> List[rosbag2_py.StorageOptions]:
"""
Convert input bag argument value(s) to list of StorageOptions.
Raises ValueError if validation fails, including:
1. Bag path existence
2. Storage ID
3. Storage config file existence
:param input_arg: the values of the input argument
:param storage_config_file: the storage config file, if any
"""
if storage_config_file and not os.path.exists(storage_config_file):
raise ValueError(f"File '{storage_config_file}' does not exist!")
storage_id_options = rosbag2_py.get_registered_readers()
storage_options = []
for input_bag_info in input_arg:
if len(input_bag_info) > 2:
raise ValueError(
f'--input expects 1 or 2 arguments, {len(input_bag_info)} provided')
bag_path = input_bag_info[0]
if not os.path.exists(bag_path):
raise ValueError(f"Bag path '{bag_path}' does not exist!")
storage_id = input_bag_info[1] if len(input_bag_info) > 1 else ''
if storage_id and storage_id not in storage_id_options:
raise ValueError(
f"Unknown storage ID '{storage_id}', options are: {', '.join(storage_id_options)}")
options = rosbag2_py.StorageOptions(
uri=bag_path,
storage_id=storage_id,
)
if storage_config_file:
options.storage_config_uri = storage_config_file
storage_options.append(options)
return storage_options


def _parse_cli_storage_plugin():
Expand Down
22 changes: 4 additions & 18 deletions ros2bag/ros2bag/verb/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse

from ros2bag.api import add_multi_bag_input_arg
from ros2bag.api import input_bag_arg_to_storage_options
from ros2bag.verb import VerbExtension
from rosbag2_py import bag_rewrite
from rosbag2_py import StorageOptions


class ConvertVerb(VerbExtension):
"""Given an input bag, write out a new bag with different settings."""

def add_arguments(self, parser, cli_name):
parser.add_argument(
'-i', '--input',
required=True,
action='append', nargs='+',
metavar=('uri', 'storage_id'),
help='URI (and optional storage ID) of an input bag. May be provided more than once')
add_multi_bag_input_arg(parser, required=True)
parser.add_argument(
'-o', '--output-options',
type=str, required=True,
Expand All @@ -37,14 +31,6 @@ def add_arguments(self, parser, cli_name):
'objects. See README.md for some examples.')

def main(self, *, args):
input_options = []
for input_bag in args.input:
if len(input_bag) > 2:
raise argparse.ArgumentTypeError(
f'--input expects 1 or 2 arguments, {len(input_bag)} provided')
storage_options = StorageOptions(uri=input_bag[0])
if len(input_bag) > 1:
storage_options.storage_id = input_bag[1]
input_options.append(storage_options)
input_options = input_bag_arg_to_storage_options(args.input)

bag_rewrite(input_options, args.output_options)
39 changes: 33 additions & 6 deletions ros2bag/ros2bag/verb/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
from ros2bag.api import check_positive_float
from ros2bag.api import convert_service_to_service_event_topic
from ros2bag.api import convert_yaml_to_qos_profile
from ros2bag.api import input_bag_arg_to_storage_options
from ros2bag.api import print_error
from ros2bag.api import print_warn
from ros2bag.verb import VerbExtension
from ros2cli.node import NODE_NAME_PREFIX
from rosbag2_py import Player
Expand All @@ -42,7 +44,7 @@ class PlayVerb(VerbExtension):
"""Play back ROS data from a bag."""

def add_arguments(self, parser, cli_name): # noqa: D102
add_standard_reader_args(parser)
add_standard_reader_args(parser, multi=True)
parser.add_argument(
'--read-ahead-queue-size', type=int, default=1000,
help='size of message queue rosbag tries to hold in memory to help deterministic '
Expand Down Expand Up @@ -196,11 +198,36 @@ def main(self, *, args): # noqa: D102
topic_remapping.append('--remap')
topic_remapping.append(remap_rule)

storage_options = StorageOptions(
uri=args.bag_path,
storage_id=args.storage,
storage_config_uri=storage_config_file,
)
# Add bag from optional positional arg, then bag(s) from optional flag
storage_options = []
if args.bag_path:
storage_options.append(StorageOptions(
uri=args.bag_path,
storage_id=args.storage,
storage_config_uri=storage_config_file,
))
if args.storage:
print(print_warn('--storage option is deprecated, use --input to '
'provide an input bag with a specific storage ID'))
try:
storage_options.extend(
input_bag_arg_to_storage_options(args.input or [], storage_config_file))
except ValueError as e:
return print_error(str(e))
if not storage_options:
return print_error('no input bags were provided')

# Users can currently only provide one storage config file, which is storage
# implementation-specific. Since we can replay bags from different storage
# implementations, this may lead to errors. For now, just warn if input bags have
# different explicit storage IDs and a storage config file is provided.
storage_ids = {options.storage_id for options in storage_options if options.storage_id}
if storage_config_file and len(storage_ids) > 1:
print(
print_warn('a global --storage-config-file was provided, but --input bags are '
'using different explicit storage IDs, which may lead to errors with '
f'replay: {storage_ids}'))

play_options = PlayOptions()
play_options.read_ahead_queue_size = args.read_ahead_queue_size
play_options.node_prefix = NODE_NAME_PREFIX
Expand Down
27 changes: 27 additions & 0 deletions ros2bag/test/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from pathlib import Path
import unittest

from rclpy.qos import DurabilityPolicy
from rclpy.qos import HistoryPolicy
from rclpy.qos import ReliabilityPolicy
from ros2bag.api import convert_yaml_to_qos_profile
from ros2bag.api import dict_to_duration
from ros2bag.api import input_bag_arg_to_storage_options
from ros2bag.api import interpret_dict_as_qos_profile


RESOURCES_PATH = Path(__file__).parent / 'resources'


class TestRos2BagRecord(unittest.TestCase):

def test_dict_to_duration_valid(self):
Expand Down Expand Up @@ -84,3 +89,25 @@ def test_interpret_dict_as_qos_profile_negative(self):
qos_dict = {'history': 'keep_all', 'liveliness_lease_duration': {'sec': -1, 'nsec': -1}}
with self.assertRaises(ValueError):
interpret_dict_as_qos_profile(qos_dict)

def test_input_bag_arg_to_storage_options(self):
bag_path = (RESOURCES_PATH / 'empty_bag').as_posix()
# Just use a file that exists; the content does not matter
storage_config_file = (RESOURCES_PATH / 'qos_profile.yaml').as_posix()

with self.assertRaises(ValueError):
input_bag_arg_to_storage_options([['path1', 'id1'], ['path2', 'id2', 'extra']])
with self.assertRaises(ValueError):
input_bag_arg_to_storage_options([['path-does-not-exist']])
with self.assertRaises(ValueError):
input_bag_arg_to_storage_options([[bag_path, 'id-does-not-exist']])
with self.assertRaises(ValueError):
input_bag_arg_to_storage_options([[bag_path, 'mcap']], 'config-file-doesnt-exist')

self.assertEqual([], input_bag_arg_to_storage_options([], None))
storage_options = input_bag_arg_to_storage_options(
[[bag_path, 'mcap']], storage_config_file)
self.assertEqual(1, len(storage_options))
self.assertEqual(bag_path, storage_options[0].uri)
self.assertEqual('mcap', storage_options[0].storage_id)
self.assertEqual(storage_config_file, storage_options[0].storage_config_uri)
3 changes: 3 additions & 0 deletions rosbag2_py/rosbag2_py/_transport.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ class Player:
def burst(self, storage_options: rosbag2_py._storage.StorageOptions, play_options: PlayOptions, num_messages: int) -> None: ...
@staticmethod
def cancel() -> None: ...
@overload
def play(self, storage_options: rosbag2_py._storage.StorageOptions, play_options: PlayOptions) -> None: ...
@overload
def play(self, storage_options: List[rosbag2_py._storage.StorageOptions], play_options: PlayOptions) -> None: ...

class RecordOptions:
all_services: bool
Expand Down
31 changes: 26 additions & 5 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,13 @@ class Player
void play(
const rosbag2_storage::StorageOptions & storage_options,
PlayOptions & play_options)
{
play_impl(std::vector{storage_options}, play_options, false);
}

void play(
const std::vector<rosbag2_storage::StorageOptions> & storage_options,
PlayOptions & play_options)
{
play_impl(storage_options, play_options, false);
}
Expand All @@ -192,7 +199,7 @@ class Player
PlayOptions & play_options,
size_t num_messages)
{
play_impl(storage_options, play_options, true, num_messages);
play_impl(std::vector{storage_options}, play_options, true, num_messages);
}

protected:
Expand Down Expand Up @@ -240,14 +247,17 @@ class Player
}

void play_impl(
const rosbag2_storage::StorageOptions & storage_options,
const std::vector<rosbag2_storage::StorageOptions> & storage_options,
PlayOptions & play_options,
bool burst = false,
size_t burst_num_messages = 0)
{
install_signal_handlers();
try {
auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options);
std::vector<rosbag2_transport::Player::ReaderStorageOptionsPair> bags{};
for (const auto & options : storage_options) {
bags.emplace_back(rosbag2_transport::ReaderWriterFactory::make_reader(options), options);
}
std::shared_ptr<KeyboardHandler> keyboard_handler;
if (!play_options.disable_keyboard_controls) {
#ifndef _WIN32
Expand All @@ -260,7 +270,7 @@ class Player
#endif
}
auto player = std::make_shared<rosbag2_transport::Player>(
std::move(reader), std::move(keyboard_handler), storage_options, play_options);
std::move(bags), std::move(keyboard_handler), play_options);

rclcpp::executors::SingleThreadedExecutor exec;
exec.add_node(player);
Expand Down Expand Up @@ -591,7 +601,18 @@ PYBIND11_MODULE(_transport, m) {
py::class_<rosbag2_py::Player>(m, "Player")
.def(py::init<>())
.def(py::init<const std::string &>())
.def("play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg("play_options"))
.def(
"play",
py::overload_cast<const rosbag2_storage::StorageOptions &, PlayOptions &>(
&rosbag2_py::Player::play),
py::arg("storage_options"),
py::arg("play_options"))
.def(
"play",
py::overload_cast<const std::vector<rosbag2_storage::StorageOptions> &, PlayOptions &>(
&rosbag2_py::Player::play),
py::arg("storage_options"),
py::arg("play_options"))
.def(
"burst", &rosbag2_py::Player::burst, py::arg("storage_options"), py::arg("play_options"),
py::arg("num_messages"))
Expand Down
12 changes: 12 additions & 0 deletions rosbag2_transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ find_package(keyboard_handler REQUIRED)
find_package(rcl REQUIRED)
find_package(rclcpp REQUIRED)
find_package(rclcpp_components REQUIRED)
find_package(rcpputils REQUIRED)
find_package(rcutils REQUIRED)
find_package(rmw REQUIRED)
find_package(rosbag2_compression REQUIRED)
Expand Down Expand Up @@ -65,6 +66,7 @@ target_link_libraries(${PROJECT_NAME} PUBLIC
target_link_libraries(${PROJECT_NAME} PRIVATE
rcl::rcl
rclcpp_components::component
rcpputils::rcpputils
rcutils::rcutils
rmw::rmw
rosbag2_compression::rosbag2_compression
Expand Down Expand Up @@ -113,6 +115,7 @@ if(BUILD_TESTING)
add_definitions(-D_SRC_RESOURCES_DIR_PATH="${CMAKE_CURRENT_SOURCE_DIR}/test/resources")

find_package(ament_cmake_gmock REQUIRED)
find_package(ament_cmake_gtest REQUIRED)
find_package(ament_index_cpp REQUIRED)
find_package(ament_lint_auto REQUIRED)
find_package(composition_interfaces REQUIRED)
Expand Down Expand Up @@ -534,6 +537,15 @@ if(BUILD_TESTING)
rcpputils::rcpputils
rosbag2_test_common::rosbag2_test_common
)

ament_add_gtest(test_meta_priority_queue
test/rosbag2_transport/test_meta_priority_queue.cpp)
target_include_directories(test_meta_priority_queue PRIVATE
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/src/rosbag2_transport>)
target_link_libraries(test_meta_priority_queue
rcpputils::rcpputils
shared_queues_vendor::singleproducerconsumer
)
endif()

ament_package()
Loading

0 comments on commit 8315454

Please sign in to comment.