Skip to content

Commit

Permalink
Support replaying multiple bags (#1848)
Browse files Browse the repository at this point in the history
Signed-off-by: Christophe Bedard <[email protected]>
  • Loading branch information
christophebedard authored Dec 1, 2024
1 parent 694c8af commit 125db50
Show file tree
Hide file tree
Showing 24 changed files with 754 additions and 422 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ jobs:
rosbag2_test_common
rosbag2_tests
rosbag2_transport
shared_queues_vendor
ament_lint_cpp: # Linters applicable to C++ packages
name: ament_${{ matrix.linter }}
Expand Down Expand Up @@ -62,7 +61,6 @@ jobs:
rosbag2_test_common
rosbag2_tests
rosbag2_transport
shared_queues_vendor
ament_lint_clang_format: # Linters applicable to C++ packages formatted with clang-format
name: ament_${{ matrix.linter }}
Expand Down
2 changes: 0 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ jobs:
"rosbag2_test_msgdefs",
"rosbag2_tests",
"rosbag2_transport",
"shared_queues_vendor",
"sqlite3_vendor",
"zstd_vendor"
]
Expand All @@ -74,7 +73,6 @@ jobs:
"rosbag2_test_msgdefs",
"rosbag2_tests",
"rosbag2_transport",
"shared_queues_vendor",
"sqlite3_vendor",
"zstd_vendor"
]
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,14 @@ $ ros2 bag play <bag>
The bag argument can be a directory containing `metadata.yaml` and one or more storage files, or to a single storage file such as `.mcap` or `.db3`.
The Player will automatically detect which storage implementation to use for playing.

To play back multiple bags:

```
$ ros2 bag play <bag1> -i <bag2> -i <bag3>
```

Messages from all provided bags will be played in order, based on their original recording reception timestamps.

#### Controlling playback via services

The Rosbag2 player provides the following services for remote control, which can be called via `ros2 service` commandline or from your nodes,
Expand Down
92 changes: 91 additions & 1 deletion ros2bag/ros2bag/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import os
from typing import Any
from typing import Dict
from typing import List
from typing import Optional

from rclpy.duration import Duration
Expand Down Expand Up @@ -158,15 +159,104 @@ def check_not_negative_int(arg: str) -> int:


def add_standard_reader_args(parser: ArgumentParser) -> None:
"""
Add arguments for one input bag.
:param parser: the parser
"""
reader_choices = rosbag2_py.get_registered_readers()
parser.add_argument(
'bag_path', type=check_path_exists, help='Bag to open')
'bag_path',
nargs=None,
type=check_path_exists,
help='Bag to open.')
parser.add_argument(
'-s', '--storage', default='', choices=reader_choices,
help='Storage implementation of bag. '
'By default attempts to detect automatically - use this argument to override.')


def add_standard_multi_reader_args(parser: ArgumentParser) -> None:
"""
Add arguments for multiple input bags.
:param parser: the parser
"""
# Let user provide an input bag path using an optional positional arg, but require them to use
# --input to provide an input bag with a specific storage ID
reader_choices = rosbag2_py.get_registered_readers()
parser.add_argument(
'bag_path',
nargs='?',
type=check_path_exists,
help='Bag to open. '
'Use --input instead to provide an input bag with a specific storage ID.')
parser.add_argument(
'-s', '--storage', default='', choices=reader_choices,
help='Storage implementation of bag. '
'By default attempts to detect automatically - use this argument to override.'
' (deprecated: use --input to provide an input bag with a specific storage ID)')
add_multi_bag_input_arg(parser, required=False)


def add_multi_bag_input_arg(parser: ArgumentParser, required: bool = False) -> None:
"""
Add option for list of input bags.
:param parser: the parser
:param required: whether this option should be required
"""
reader_choices = ', '.join(rosbag2_py.get_registered_readers())
parser.add_argument(
'-i', '--input',
required=required,
action='append', nargs='+',
metavar=('uri', 'storage_id'),
help='URI (and optional storage ID) of an input bag. '
'May be provided more than once for multiple input bags. '
f'Storage ID options are: {reader_choices}.')


def input_bag_arg_to_storage_options(
input_arg: List[List[str]],
storage_config_file: Optional[str] = None,
) -> List[rosbag2_py.StorageOptions]:
"""
Convert input bag argument value(s) to list of StorageOptions.
Raises ValueError if validation fails, including:
1. Bag path existence
2. Storage ID
3. Storage config file existence
:param input_arg: the values of the input argument
:param storage_config_file: the storage config file, if any
"""
if storage_config_file and not os.path.exists(storage_config_file):
raise ValueError(f"File '{storage_config_file}' does not exist!")
storage_id_options = rosbag2_py.get_registered_readers()
storage_options = []
for input_bag_info in input_arg:
if len(input_bag_info) > 2:
raise ValueError(
f'--input expects 1 or 2 arguments, {len(input_bag_info)} provided')
bag_path = input_bag_info[0]
if not os.path.exists(bag_path):
raise ValueError(f"Bag path '{bag_path}' does not exist!")
storage_id = input_bag_info[1] if len(input_bag_info) > 1 else ''
if storage_id and storage_id not in storage_id_options:
raise ValueError(
f"Unknown storage ID '{storage_id}', options are: {', '.join(storage_id_options)}")
options = rosbag2_py.StorageOptions(
uri=bag_path,
storage_id=storage_id,
)
if storage_config_file:
options.storage_config_uri = storage_config_file
storage_options.append(options)
return storage_options


def _parse_cli_storage_plugin():
plugin_choices = set(rosbag2_py.get_registered_writers())
default_storage = rosbag2_py.get_default_storage_id()
Expand Down
22 changes: 4 additions & 18 deletions ros2bag/ros2bag/verb/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse

from ros2bag.api import add_multi_bag_input_arg
from ros2bag.api import input_bag_arg_to_storage_options
from ros2bag.verb import VerbExtension
from rosbag2_py import bag_rewrite
from rosbag2_py import StorageOptions


class ConvertVerb(VerbExtension):
"""Given an input bag, write out a new bag with different settings."""

def add_arguments(self, parser, cli_name):
parser.add_argument(
'-i', '--input',
required=True,
action='append', nargs='+',
metavar=('uri', 'storage_id'),
help='URI (and optional storage ID) of an input bag. May be provided more than once')
add_multi_bag_input_arg(parser, required=True)
parser.add_argument(
'-o', '--output-options',
type=str, required=True,
Expand All @@ -37,14 +31,6 @@ def add_arguments(self, parser, cli_name):
'objects. See README.md for some examples.')

def main(self, *, args):
input_options = []
for input_bag in args.input:
if len(input_bag) > 2:
raise argparse.ArgumentTypeError(
f'--input expects 1 or 2 arguments, {len(input_bag)} provided')
storage_options = StorageOptions(uri=input_bag[0])
if len(input_bag) > 1:
storage_options.storage_id = input_bag[1]
input_options.append(storage_options)
input_options = input_bag_arg_to_storage_options(args.input)

bag_rewrite(input_options, args.output_options)
46 changes: 39 additions & 7 deletions ros2bag/ros2bag/verb/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
from argparse import FileType

from rclpy.qos import InvalidQoSProfileException
from ros2bag.api import add_standard_reader_args
from ros2bag.api import add_standard_multi_reader_args
from ros2bag.api import check_not_negative_float
from ros2bag.api import check_not_negative_int
from ros2bag.api import check_positive_float
from ros2bag.api import convert_service_to_service_event_topic
from ros2bag.api import convert_yaml_to_qos_profile
from ros2bag.api import input_bag_arg_to_storage_options
from ros2bag.api import print_error
from ros2bag.api import print_warn
from ros2bag.verb import VerbExtension
from ros2cli.node import NODE_NAME_PREFIX
from rosbag2_py import Player
Expand All @@ -42,7 +44,7 @@ class PlayVerb(VerbExtension):
"""Play back ROS data from a bag."""

def add_arguments(self, parser, cli_name): # noqa: D102
add_standard_reader_args(parser)
add_standard_multi_reader_args(parser)
parser.add_argument(
'--read-ahead-queue-size', type=int, default=1000,
help='size of message queue rosbag tries to hold in memory to help deterministic '
Expand Down Expand Up @@ -196,11 +198,41 @@ def main(self, *, args): # noqa: D102
topic_remapping.append('--remap')
topic_remapping.append(remap_rule)

storage_options = StorageOptions(
uri=args.bag_path,
storage_id=args.storage,
storage_config_uri=storage_config_file,
)
# Do not allow using both positional arg and --input option for input bags
if args.bag_path and args.input:
return print_error(
'do not mix the [bag_path] positional argument and the -i,--input option; '
'for multiple input bags, use -i,--input multiple times')
# Add bag from optional positional arg, then bag(s) from optional flag
storage_options = []
if args.bag_path:
storage_options.append(StorageOptions(
uri=args.bag_path,
storage_id=args.storage,
storage_config_uri=storage_config_file,
))
if args.storage:
print(print_warn('--storage option is deprecated, use -i,--input to '
'provide an input bag with a specific storage ID'))
try:
storage_options.extend(
input_bag_arg_to_storage_options(args.input or [], storage_config_file))
except ValueError as e:
return print_error(str(e))
if not storage_options:
return print_error('no input bags were provided')

# Users can currently only provide one storage config file, which is storage
# implementation-specific. Since we can replay bags from different storage
# implementations, this may lead to errors. For now, just warn if input bags have
# different explicit storage IDs and a storage config file is provided.
storage_ids = {options.storage_id for options in storage_options if options.storage_id}
if storage_config_file and len(storage_ids) > 1:
print(
print_warn('a global --storage-config-file was provided, but -i,--input bags are '
'using different explicit storage IDs, which may lead to errors with '
f'replay: {storage_ids}'))

play_options = PlayOptions()
play_options.read_ahead_queue_size = args.read_ahead_queue_size
play_options.node_prefix = NODE_NAME_PREFIX
Expand Down
27 changes: 27 additions & 0 deletions ros2bag/test/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from pathlib import Path
import unittest

from rclpy.qos import DurabilityPolicy
from rclpy.qos import HistoryPolicy
from rclpy.qos import ReliabilityPolicy
from ros2bag.api import convert_yaml_to_qos_profile
from ros2bag.api import dict_to_duration
from ros2bag.api import input_bag_arg_to_storage_options
from ros2bag.api import interpret_dict_as_qos_profile


RESOURCES_PATH = Path(__file__).parent / 'resources'


class TestRos2BagRecord(unittest.TestCase):

def test_dict_to_duration_valid(self):
Expand Down Expand Up @@ -84,3 +89,25 @@ def test_interpret_dict_as_qos_profile_negative(self):
qos_dict = {'history': 'keep_all', 'liveliness_lease_duration': {'sec': -1, 'nsec': -1}}
with self.assertRaises(ValueError):
interpret_dict_as_qos_profile(qos_dict)

def test_input_bag_arg_to_storage_options(self):
bag_path = (RESOURCES_PATH / 'empty_bag').as_posix()
# Just use a file that exists; the content does not matter
storage_config_file = (RESOURCES_PATH / 'qos_profile.yaml').as_posix()

with self.assertRaises(ValueError):
input_bag_arg_to_storage_options([['path1', 'id1'], ['path2', 'id2', 'extra']])
with self.assertRaises(ValueError):
input_bag_arg_to_storage_options([['path-does-not-exist']])
with self.assertRaises(ValueError):
input_bag_arg_to_storage_options([[bag_path, 'id-does-not-exist']])
with self.assertRaises(ValueError):
input_bag_arg_to_storage_options([[bag_path, 'mcap']], 'config-file-doesnt-exist')

self.assertEqual([], input_bag_arg_to_storage_options([], None))
storage_options = input_bag_arg_to_storage_options(
[[bag_path, 'mcap']], storage_config_file)
self.assertEqual(1, len(storage_options))
self.assertEqual(bag_path, storage_options[0].uri)
self.assertEqual('mcap', storage_options[0].storage_id)
self.assertEqual(storage_config_file, storage_options[0].storage_config_uri)
1 change: 0 additions & 1 deletion rosbag2/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
<exec_depend>rosbag2_py</exec_depend>
<exec_depend>rosbag2_storage</exec_depend>
<exec_depend>rosbag2_transport</exec_depend>
<exec_depend>shared_queues_vendor</exec_depend>

<!-- Default plugins -->
<exec_depend>rosbag2_compression_zstd</exec_depend>
Expand Down
1 change: 0 additions & 1 deletion rosbag2_cpp/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
<depend>rosidl_runtime_cpp</depend>
<depend>rosidl_typesupport_cpp</depend>
<depend>rosidl_typesupport_introspection_cpp</depend>
<depend>shared_queues_vendor</depend>

<test_depend>rosbag2_storage_default_plugins</test_depend>
<test_depend>ament_cmake_gmock</test_depend>
Expand Down
3 changes: 3 additions & 0 deletions rosbag2_py/rosbag2_py/_transport.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ class Player:
def burst(self, storage_options: rosbag2_py._storage.StorageOptions, play_options: PlayOptions, num_messages: int) -> None: ...
@staticmethod
def cancel() -> None: ...
@overload
def play(self, storage_options: rosbag2_py._storage.StorageOptions, play_options: PlayOptions) -> None: ...
@overload
def play(self, storage_options: List[rosbag2_py._storage.StorageOptions], play_options: PlayOptions) -> None: ...

class RecordOptions:
all_services: bool
Expand Down
Loading

0 comments on commit 125db50

Please sign in to comment.