Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for replaying multiple bags #1848

Merged
merged 1 commit into from
Dec 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ jobs:
rosbag2_test_common
rosbag2_tests
rosbag2_transport
shared_queues_vendor

ament_lint_cpp: # Linters applicable to C++ packages
name: ament_${{ matrix.linter }}
Expand Down Expand Up @@ -62,7 +61,6 @@ jobs:
rosbag2_test_common
rosbag2_tests
rosbag2_transport
shared_queues_vendor

ament_lint_clang_format: # Linters applicable to C++ packages formatted with clang-format
name: ament_${{ matrix.linter }}
Expand Down
2 changes: 0 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ jobs:
"rosbag2_test_msgdefs",
"rosbag2_tests",
"rosbag2_transport",
"shared_queues_vendor",
"sqlite3_vendor",
"zstd_vendor"
]
Expand All @@ -74,7 +73,6 @@ jobs:
"rosbag2_test_msgdefs",
"rosbag2_tests",
"rosbag2_transport",
"shared_queues_vendor",
"sqlite3_vendor",
"zstd_vendor"
]
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,14 @@ $ ros2 bag play <bag>
The bag argument can be a directory containing `metadata.yaml` and one or more storage files, or to a single storage file such as `.mcap` or `.db3`.
The Player will automatically detect which storage implementation to use for playing.

To play back multiple bags:

```
$ ros2 bag play <bag1> -i <bag2> -i <bag3>
```

Messages from all provided bags will be played in order, based on their original recording reception timestamps.

#### Controlling playback via services

The Rosbag2 player provides the following services for remote control, which can be called via `ros2 service` commandline or from your nodes,
Expand Down
92 changes: 91 additions & 1 deletion ros2bag/ros2bag/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import os
from typing import Any
from typing import Dict
from typing import List
from typing import Optional

from rclpy.duration import Duration
Expand Down Expand Up @@ -158,15 +159,104 @@ def check_not_negative_int(arg: str) -> int:


def add_standard_reader_args(parser: ArgumentParser) -> None:
"""
Add arguments for one input bag.

:param parser: the parser
"""
reader_choices = rosbag2_py.get_registered_readers()
parser.add_argument(
'bag_path', type=check_path_exists, help='Bag to open')
'bag_path',
nargs=None,
type=check_path_exists,
help='Bag to open.')
parser.add_argument(
'-s', '--storage', default='', choices=reader_choices,
help='Storage implementation of bag. '
'By default attempts to detect automatically - use this argument to override.')


def add_standard_multi_reader_args(parser: ArgumentParser) -> None:
"""
Add arguments for multiple input bags.

:param parser: the parser
"""
# Let user provide an input bag path using an optional positional arg, but require them to use
# --input to provide an input bag with a specific storage ID
reader_choices = rosbag2_py.get_registered_readers()
parser.add_argument(
'bag_path',
nargs='?',
type=check_path_exists,
help='Bag to open. '
'Use --input instead to provide an input bag with a specific storage ID.')
parser.add_argument(
'-s', '--storage', default='', choices=reader_choices,
help='Storage implementation of bag. '
'By default attempts to detect automatically - use this argument to override.'
' (deprecated: use --input to provide an input bag with a specific storage ID)')
add_multi_bag_input_arg(parser, required=False)


def add_multi_bag_input_arg(parser: ArgumentParser, required: bool = False) -> None:
"""
Add option for list of input bags.

:param parser: the parser
:param required: whether this option should be required
"""
reader_choices = ', '.join(rosbag2_py.get_registered_readers())
parser.add_argument(
'-i', '--input',
required=required,
action='append', nargs='+',
metavar=('uri', 'storage_id'),
help='URI (and optional storage ID) of an input bag. '
'May be provided more than once for multiple input bags. '
f'Storage ID options are: {reader_choices}.')


def input_bag_arg_to_storage_options(
input_arg: List[List[str]],
storage_config_file: Optional[str] = None,
) -> List[rosbag2_py.StorageOptions]:
"""
Convert input bag argument value(s) to list of StorageOptions.

Raises ValueError if validation fails, including:
1. Bag path existence
2. Storage ID
3. Storage config file existence

:param input_arg: the values of the input argument
:param storage_config_file: the storage config file, if any
"""
if storage_config_file and not os.path.exists(storage_config_file):
raise ValueError(f"File '{storage_config_file}' does not exist!")
storage_id_options = rosbag2_py.get_registered_readers()
storage_options = []
for input_bag_info in input_arg:
if len(input_bag_info) > 2:
raise ValueError(
f'--input expects 1 or 2 arguments, {len(input_bag_info)} provided')
bag_path = input_bag_info[0]
if not os.path.exists(bag_path):
raise ValueError(f"Bag path '{bag_path}' does not exist!")
storage_id = input_bag_info[1] if len(input_bag_info) > 1 else ''
if storage_id and storage_id not in storage_id_options:
raise ValueError(
f"Unknown storage ID '{storage_id}', options are: {', '.join(storage_id_options)}")
options = rosbag2_py.StorageOptions(
uri=bag_path,
storage_id=storage_id,
)
if storage_config_file:
options.storage_config_uri = storage_config_file
storage_options.append(options)
return storage_options


def _parse_cli_storage_plugin():
plugin_choices = set(rosbag2_py.get_registered_writers())
default_storage = rosbag2_py.get_default_storage_id()
Expand Down
22 changes: 4 additions & 18 deletions ros2bag/ros2bag/verb/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse

from ros2bag.api import add_multi_bag_input_arg
from ros2bag.api import input_bag_arg_to_storage_options
from ros2bag.verb import VerbExtension
from rosbag2_py import bag_rewrite
from rosbag2_py import StorageOptions


class ConvertVerb(VerbExtension):
"""Given an input bag, write out a new bag with different settings."""

def add_arguments(self, parser, cli_name):
parser.add_argument(
'-i', '--input',
required=True,
action='append', nargs='+',
metavar=('uri', 'storage_id'),
help='URI (and optional storage ID) of an input bag. May be provided more than once')
add_multi_bag_input_arg(parser, required=True)
parser.add_argument(
'-o', '--output-options',
type=str, required=True,
Expand All @@ -37,14 +31,6 @@ def add_arguments(self, parser, cli_name):
'objects. See README.md for some examples.')

def main(self, *, args):
input_options = []
for input_bag in args.input:
if len(input_bag) > 2:
raise argparse.ArgumentTypeError(
f'--input expects 1 or 2 arguments, {len(input_bag)} provided')
storage_options = StorageOptions(uri=input_bag[0])
if len(input_bag) > 1:
storage_options.storage_id = input_bag[1]
input_options.append(storage_options)
input_options = input_bag_arg_to_storage_options(args.input)

bag_rewrite(input_options, args.output_options)
46 changes: 39 additions & 7 deletions ros2bag/ros2bag/verb/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
from argparse import FileType

from rclpy.qos import InvalidQoSProfileException
from ros2bag.api import add_standard_reader_args
from ros2bag.api import add_standard_multi_reader_args
from ros2bag.api import check_not_negative_float
from ros2bag.api import check_not_negative_int
from ros2bag.api import check_positive_float
from ros2bag.api import convert_service_to_service_event_topic
from ros2bag.api import convert_yaml_to_qos_profile
from ros2bag.api import input_bag_arg_to_storage_options
from ros2bag.api import print_error
from ros2bag.api import print_warn
from ros2bag.verb import VerbExtension
from ros2cli.node import NODE_NAME_PREFIX
from rosbag2_py import Player
Expand All @@ -42,7 +44,7 @@ class PlayVerb(VerbExtension):
"""Play back ROS data from a bag."""

def add_arguments(self, parser, cli_name): # noqa: D102
add_standard_reader_args(parser)
add_standard_multi_reader_args(parser)
parser.add_argument(
'--read-ahead-queue-size', type=int, default=1000,
help='size of message queue rosbag tries to hold in memory to help deterministic '
Expand Down Expand Up @@ -196,11 +198,41 @@ def main(self, *, args): # noqa: D102
topic_remapping.append('--remap')
topic_remapping.append(remap_rule)

storage_options = StorageOptions(
uri=args.bag_path,
storage_id=args.storage,
storage_config_uri=storage_config_file,
)
# Do not allow using both positional arg and --input option for input bags
if args.bag_path and args.input:
return print_error(
'do not mix the [bag_path] positional argument and the -i,--input option; '
'for multiple input bags, use -i,--input multiple times')
# Add bag from optional positional arg, then bag(s) from optional flag
MichaelOrlov marked this conversation as resolved.
Show resolved Hide resolved
storage_options = []
if args.bag_path:
storage_options.append(StorageOptions(
uri=args.bag_path,
storage_id=args.storage,
storage_config_uri=storage_config_file,
))
if args.storage:
print(print_warn('--storage option is deprecated, use -i,--input to '
'provide an input bag with a specific storage ID'))
try:
storage_options.extend(
input_bag_arg_to_storage_options(args.input or [], storage_config_file))
except ValueError as e:
return print_error(str(e))
if not storage_options:
return print_error('no input bags were provided')

# Users can currently only provide one storage config file, which is storage
# implementation-specific. Since we can replay bags from different storage
# implementations, this may lead to errors. For now, just warn if input bags have
# different explicit storage IDs and a storage config file is provided.
storage_ids = {options.storage_id for options in storage_options if options.storage_id}
if storage_config_file and len(storage_ids) > 1:
print(
print_warn('a global --storage-config-file was provided, but -i,--input bags are '
'using different explicit storage IDs, which may lead to errors with '
f'replay: {storage_ids}'))

play_options = PlayOptions()
play_options.read_ahead_queue_size = args.read_ahead_queue_size
play_options.node_prefix = NODE_NAME_PREFIX
Expand Down
27 changes: 27 additions & 0 deletions ros2bag/test/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from pathlib import Path
import unittest

from rclpy.qos import DurabilityPolicy
from rclpy.qos import HistoryPolicy
from rclpy.qos import ReliabilityPolicy
from ros2bag.api import convert_yaml_to_qos_profile
from ros2bag.api import dict_to_duration
from ros2bag.api import input_bag_arg_to_storage_options
from ros2bag.api import interpret_dict_as_qos_profile


RESOURCES_PATH = Path(__file__).parent / 'resources'


class TestRos2BagRecord(unittest.TestCase):

def test_dict_to_duration_valid(self):
Expand Down Expand Up @@ -84,3 +89,25 @@ def test_interpret_dict_as_qos_profile_negative(self):
qos_dict = {'history': 'keep_all', 'liveliness_lease_duration': {'sec': -1, 'nsec': -1}}
with self.assertRaises(ValueError):
interpret_dict_as_qos_profile(qos_dict)

def test_input_bag_arg_to_storage_options(self):
bag_path = (RESOURCES_PATH / 'empty_bag').as_posix()
# Just use a file that exists; the content does not matter
storage_config_file = (RESOURCES_PATH / 'qos_profile.yaml').as_posix()

with self.assertRaises(ValueError):
input_bag_arg_to_storage_options([['path1', 'id1'], ['path2', 'id2', 'extra']])
with self.assertRaises(ValueError):
input_bag_arg_to_storage_options([['path-does-not-exist']])
with self.assertRaises(ValueError):
input_bag_arg_to_storage_options([[bag_path, 'id-does-not-exist']])
with self.assertRaises(ValueError):
input_bag_arg_to_storage_options([[bag_path, 'mcap']], 'config-file-doesnt-exist')

self.assertEqual([], input_bag_arg_to_storage_options([], None))
storage_options = input_bag_arg_to_storage_options(
[[bag_path, 'mcap']], storage_config_file)
self.assertEqual(1, len(storage_options))
self.assertEqual(bag_path, storage_options[0].uri)
self.assertEqual('mcap', storage_options[0].storage_id)
self.assertEqual(storage_config_file, storage_options[0].storage_config_uri)
1 change: 0 additions & 1 deletion rosbag2/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
<exec_depend>rosbag2_py</exec_depend>
<exec_depend>rosbag2_storage</exec_depend>
<exec_depend>rosbag2_transport</exec_depend>
<exec_depend>shared_queues_vendor</exec_depend>

<!-- Default plugins -->
<exec_depend>rosbag2_compression_zstd</exec_depend>
Expand Down
1 change: 0 additions & 1 deletion rosbag2_cpp/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
<depend>rosidl_runtime_cpp</depend>
<depend>rosidl_typesupport_cpp</depend>
<depend>rosidl_typesupport_introspection_cpp</depend>
<depend>shared_queues_vendor</depend>

<test_depend>rosbag2_storage_default_plugins</test_depend>
<test_depend>ament_cmake_gmock</test_depend>
Expand Down
3 changes: 3 additions & 0 deletions rosbag2_py/rosbag2_py/_transport.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ class Player:
def burst(self, storage_options: rosbag2_py._storage.StorageOptions, play_options: PlayOptions, num_messages: int) -> None: ...
@staticmethod
def cancel() -> None: ...
@overload
def play(self, storage_options: rosbag2_py._storage.StorageOptions, play_options: PlayOptions) -> None: ...
@overload
def play(self, storage_options: List[rosbag2_py._storage.StorageOptions], play_options: PlayOptions) -> None: ...

class RecordOptions:
all_services: bool
Expand Down
Loading
Loading