Skip to content

Commit

Permalink
Add support for replaying based on publication timestamp
Browse files Browse the repository at this point in the history
Signed-off-by: Christophe Bedard <[email protected]>
  • Loading branch information
christophebedard committed Dec 5, 2024
1 parent c8feaea commit e4ddb48
Show file tree
Hide file tree
Showing 10 changed files with 234 additions and 33 deletions.
15 changes: 15 additions & 0 deletions ros2bag/ros2bag/verb/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from ros2bag.api import print_warn
from ros2bag.verb import VerbExtension
from ros2cli.node import NODE_NAME_PREFIX
from rosbag2_py import MessageOrder
from rosbag2_py import Player
from rosbag2_py import PlayOptions
from rosbag2_py import ServiceRequestsSource
Expand Down Expand Up @@ -166,6 +167,15 @@ def add_arguments(self, parser, cli_name): # noqa: D102
help='Determine the source of the service requests to be replayed. This option only '
'makes sense if the "--publish-service-requests" option is set. By default,'
' the service requests replaying from recorded service introspection message.')
parser.add_argument(
'--message-order', default='recv',
choices=['recv', 'send'],
help='The reference to use for bag message chronological ordering. Choices: reception '
'timestamp, publication timestamp. Default: reception timestamp. '
'If messages are significantly disordered (within a single bag or across '
'multiple bags), replayed messages may not be correctly ordered. A possible '
'solution could be to increase the read_ahead_queue_size value to buffer (and '
'order) more messages.')
parser.add_argument(
'--log-level', type=str, default='info',
choices=['debug', 'info', 'warn', 'error', 'fatal'],
Expand Down Expand Up @@ -275,6 +285,11 @@ def main(self, *, args): # noqa: D102
play_options.service_requests_source = ServiceRequestsSource.SERVICE_INTROSPECTION
else:
play_options.service_requests_source = ServiceRequestsSource.CLIENT_INTROSPECTION
# argparse makes sure that we get a valid arg value
play_options.message_order = {
'recv': MessageOrder.RECV_TIMESTAMP,
'send': MessageOrder.SEND_TIMESTAMP,
}.get(args.message_order)

player = Player(args.log_level)
try:
Expand Down
2 changes: 2 additions & 0 deletions rosbag2_py/rosbag2_py/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
Info,
)
from rosbag2_py._transport import (
MessageOrder,
Player,
PlayOptions,
ServiceRequestsSource,
Expand Down Expand Up @@ -98,6 +99,7 @@
'TopicInformation',
'BagMetadata',
'MessageDefinition',
'MessageOrder',
'MetadataIo',
'Info',
'Player',
Expand Down
4 changes: 2 additions & 2 deletions rosbag2_py/rosbag2_py/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ from rosbag2_py._message_definitions import LocalMessageDefinitionSource as Loca
from rosbag2_py._reader import SequentialCompressionReader as SequentialCompressionReader, SequentialReader as SequentialReader, get_registered_readers as get_registered_readers
from rosbag2_py._reindexer import Reindexer as Reindexer
from rosbag2_py._storage import BagMetadata as BagMetadata, ConverterOptions as ConverterOptions, FileInformation as FileInformation, MessageDefinition as MessageDefinition, MetadataIo as MetadataIo, ReadOrder as ReadOrder, ReadOrderSortBy as ReadOrderSortBy, StorageFilter as StorageFilter, StorageOptions as StorageOptions, TopicInformation as TopicInformation, TopicMetadata as TopicMetadata, convert_rclcpp_qos_to_rclpy_qos as convert_rclcpp_qos_to_rclpy_qos, get_default_storage_id as get_default_storage_id, to_rclcpp_qos_vector as to_rclcpp_qos_vector
from rosbag2_py._transport import PlayOptions as PlayOptions, Player as Player, RecordOptions as RecordOptions, Recorder as Recorder, ServiceRequestsSource as ServiceRequestsSource, bag_rewrite as bag_rewrite
from rosbag2_py._transport import MessageOrder as MessageOrder, PlayOptions as PlayOptions, Player as Player, RecordOptions as RecordOptions, Recorder as Recorder, ServiceRequestsSource as ServiceRequestsSource, bag_rewrite as bag_rewrite
from rosbag2_py._writer import SequentialCompressionWriter as SequentialCompressionWriter, SequentialWriter as SequentialWriter, get_registered_compressors as get_registered_compressors, get_registered_serializers as get_registered_serializers, get_registered_writers as get_registered_writers

__all__ = ['bag_rewrite', 'convert_rclcpp_qos_to_rclpy_qos', 'CompressionMode', 'CompressionOptions', 'compression_mode_from_string', 'compression_mode_to_string', 'ConverterOptions', 'FileInformation', 'get_default_storage_id', 'get_registered_readers', 'get_registered_writers', 'get_registered_compressors', 'get_registered_serializers', 'to_rclcpp_qos_vector', 'ReadOrder', 'ReadOrderSortBy', 'Reindexer', 'SequentialCompressionReader', 'SequentialCompressionWriter', 'SequentialReader', 'SequentialWriter', 'StorageFilter', 'StorageOptions', 'TopicMetadata', 'TopicInformation', 'BagMetadata', 'MessageDefinition', 'MetadataIo', 'Info', 'Player', 'PlayOptions', 'ServiceRequestsSource', 'Recorder', 'RecordOptions', 'LocalMessageDefinitionSource']
__all__ = ['bag_rewrite', 'convert_rclcpp_qos_to_rclpy_qos', 'CompressionMode', 'CompressionOptions', 'compression_mode_from_string', 'compression_mode_to_string', 'ConverterOptions', 'FileInformation', 'get_default_storage_id', 'get_registered_readers', 'get_registered_writers', 'get_registered_compressors', 'get_registered_serializers', 'to_rclcpp_qos_vector', 'ReadOrder', 'ReadOrderSortBy', 'Reindexer', 'SequentialCompressionReader', 'SequentialCompressionWriter', 'SequentialReader', 'SequentialWriter', 'StorageFilter', 'StorageOptions', 'TopicMetadata', 'TopicInformation', 'BagMetadata', 'MessageDefinition', 'MessageOrder', 'MetadataIo', 'Info', 'Player', 'PlayOptions', 'ServiceRequestsSource', 'Recorder', 'RecordOptions', 'LocalMessageDefinitionSource']
17 changes: 17 additions & 0 deletions rosbag2_py/rosbag2_py/_transport.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,22 @@ import rosbag2_py._storage
from _typeshed import Incomplete
from typing import ClassVar, List, overload

class MessageOrder:
__members__: ClassVar[dict] = ... # read-only
RECV_TIMESTAMP: ClassVar[MessageOrder] = ...
SEND_TIMESTAMP: ClassVar[MessageOrder] = ...
__entries: ClassVar[dict] = ...
def __init__(self, value: int) -> None: ...
def __eq__(self, other: object) -> bool: ...
def __hash__(self) -> int: ...
def __index__(self) -> int: ...
def __int__(self) -> int: ...
def __ne__(self, other: object) -> bool: ...
@property
def name(self) -> str: ...
@property
def value(self) -> int: ...

class PlayOptions:
clock_publish_frequency: float
clock_publish_on_topic_publish: bool
Expand All @@ -14,6 +30,7 @@ class PlayOptions:
exclude_service_events_to_filter: List[str]
exclude_topics_to_filter: List[str]
loop: bool
message_order: Incomplete
node_prefix: str
playback_duration: float
playback_until_timestamp: int
Expand Down
6 changes: 6 additions & 0 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -560,13 +560,19 @@ PYBIND11_MODULE(_transport, m) {
.def_readwrite("disable_loan_message", &PlayOptions::disable_loan_message)
.def_readwrite("publish_service_requests", &PlayOptions::publish_service_requests)
.def_readwrite("service_requests_source", &PlayOptions::service_requests_source)
.def_readwrite("message_order", &PlayOptions::message_order)
;

py::enum_<rosbag2_transport::ServiceRequestsSource>(m, "ServiceRequestsSource")
.value("SERVICE_INTROSPECTION", rosbag2_transport::ServiceRequestsSource::SERVICE_INTROSPECTION)
.value("CLIENT_INTROSPECTION", rosbag2_transport::ServiceRequestsSource::CLIENT_INTROSPECTION)
;

py::enum_<rosbag2_transport::MessageOrder>(m, "MessageOrder")
.value("RECV_TIMESTAMP", rosbag2_transport::MessageOrder::RECV_TIMESTAMP)
.value("SEND_TIMESTAMP", rosbag2_transport::MessageOrder::SEND_TIMESTAMP)
;

py::class_<RecordOptions>(m, "RecordOptions")
.def(py::init<>())
.def_readwrite("all_topics", &RecordOptions::all_topics)
Expand Down
14 changes: 14 additions & 0 deletions rosbag2_transport/include/rosbag2_transport/play_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ enum class ServiceRequestsSource : int8_t
CLIENT_INTROSPECTION = 1
};

enum class MessageOrder : std::uint8_t
{
// Order chronologically by message reception timestamp
RECV_TIMESTAMP = 0,
// Order chronologically by message publication timestamp
SEND_TIMESTAMP = 1
};

struct PlayOptions
{
public:
Expand Down Expand Up @@ -123,6 +131,12 @@ struct PlayOptions

// The source of the service request
ServiceRequestsSource service_requests_source = ServiceRequestsSource::SERVICE_INTROSPECTION;

// The reference to use for bag message chronological ordering.
// If messages are significantly disordered (within a single bag or across multiple bags),
// replayed messages may not be correctly ordered. A possible solution could be to increase the
// read_ahead_queue_size value to buffer (and order) more messages.
MessageOrder message_order = MessageOrder::RECV_TIMESTAMP;
};

} // namespace rosbag2_transport
Expand Down
62 changes: 56 additions & 6 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,8 @@ class PlayerImpl
void create_control_services();
void configure_play_until_timestamp();
bool shall_stop_at_timestamp(const rcutils_time_point_value_t & msg_timestamp) const;
rcutils_time_point_value_t get_message_order_timestamp(
const rosbag2_storage::SerializedBagMessageSharedPtr & message) const;

static constexpr double read_ahead_lower_bound_percentage_ = 0.9;
static const std::chrono::milliseconds queue_read_wait_period_;
Expand All @@ -320,7 +322,7 @@ class PlayerImpl
rosbag2_transport::PlayOptions play_options_;
rcutils_time_point_value_t play_until_timestamp_ = -1;
using BagMessageComparator = std::function<
int(
bool(
const rosbag2_storage::SerializedBagMessageSharedPtr &,
const rosbag2_storage::SerializedBagMessageSharedPtr &)>;
LockedPriorityQueue<
Expand Down Expand Up @@ -372,6 +374,8 @@ class PlayerImpl

std::shared_ptr<PlayerServiceClientManager> player_service_client_manager_;

static BagMessageComparator get_bag_message_comparator(const MessageOrder & order);

/// Comparator for SerializedBagMessageSharedPtr to order chronologically by recv_timestamp.
struct
{
Expand All @@ -381,9 +385,39 @@ class PlayerImpl
{
return l->recv_timestamp > r->recv_timestamp;
}
} bag_message_chronological_recv_timestamp_comparator;
} static bag_message_chronological_recv_timestamp_comparator;

/// Comparator for SerializedBagMessageSharedPtr to order chronologically by send_timestamp.
struct
{
bool operator()(
const rosbag2_storage::SerializedBagMessageSharedPtr & l,
const rosbag2_storage::SerializedBagMessageSharedPtr & r) const
{
return l->send_timestamp > r->send_timestamp;
}
} static bag_message_chronological_send_timestamp_comparator;
};

decltype(PlayerImpl::bag_message_chronological_recv_timestamp_comparator)
PlayerImpl::bag_message_chronological_recv_timestamp_comparator;
decltype(PlayerImpl::bag_message_chronological_send_timestamp_comparator)
PlayerImpl::bag_message_chronological_send_timestamp_comparator;

PlayerImpl::BagMessageComparator PlayerImpl::get_bag_message_comparator(const MessageOrder & order)
{
switch (order) {
case MessageOrder::RECV_TIMESTAMP:
return bag_message_chronological_recv_timestamp_comparator;
case MessageOrder::SEND_TIMESTAMP:
return bag_message_chronological_send_timestamp_comparator;
default:
throw std::runtime_error(
"unknown MessageOrder: " +
std::to_string(static_cast<std::underlying_type_t<MessageOrder>>(order)));
}
}

PlayerImpl::PlayerImpl(
Player * owner,
std::vector<reader_storage_options_pair_t> && readers_with_options,
Expand All @@ -392,7 +426,7 @@ PlayerImpl::PlayerImpl(
: readers_with_options_(std::move(readers_with_options)),
owner_(owner),
play_options_(play_options),
message_queue_(bag_message_chronological_recv_timestamp_comparator),
message_queue_(get_bag_message_comparator(play_options_.message_order)),
keyboard_handler_(std::move(keyboard_handler)),
player_service_client_manager_(std::make_shared<PlayerServiceClientManager>())
{
Expand Down Expand Up @@ -984,13 +1018,13 @@ void PlayerImpl::play_messages_from_queue()
while (rclcpp::ok() && !stop_playback_) {
// While there's a message to play and we haven't reached the end timestamp yet
while (rclcpp::ok() && !stop_playback_ &&
message_ptr != nullptr && !shall_stop_at_timestamp(message_ptr->recv_timestamp))
message_ptr != nullptr && !shall_stop_at_timestamp(get_message_order_timestamp(message_ptr)))
{
// Sleep until the message's replay time, do not move on until sleep_until returns true
// It will always sleep, so this is not a tight busy loop on pause
// However, skip sleeping if we're trying to play the next message
while (rclcpp::ok() && !stop_playback_ && !play_next_.load() &&
!clock_->sleep_until(message_ptr->recv_timestamp))
!clock_->sleep_until(get_message_order_timestamp(message_ptr)))
{
// Stop sleeping if cancelled
if (std::atomic_exchange(&cancel_wait_for_next_message_, false)) {
Expand All @@ -1012,7 +1046,7 @@ void PlayerImpl::play_messages_from_queue()
const bool message_published = publish_message(message_ptr);
// If we tried to publish because of play_next(), jump the clock
if (play_next_.load()) {
clock_->jump(message_ptr->recv_timestamp);
clock_->jump(get_message_order_timestamp(message_ptr));
// If we successfully played next, notify that we're done, otherwise keep trying
if (message_published) {
play_next_ = false;
Expand Down Expand Up @@ -1058,6 +1092,22 @@ void PlayerImpl::play_messages_from_queue()
}
}

rcutils_time_point_value_t PlayerImpl::get_message_order_timestamp(
const rosbag2_storage::SerializedBagMessageSharedPtr & message) const
{
switch (play_options_.message_order) {
case MessageOrder::RECV_TIMESTAMP:
return message->recv_timestamp;
case MessageOrder::SEND_TIMESTAMP:
return message->send_timestamp;
default:
throw std::runtime_error(
"unknown MessageOrder: " +
std::to_string(
static_cast<std::underlying_type_t<MessageOrder>>(play_options_.message_order)));
}
}

namespace
{
bool allow_topic(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,42 @@ class Rosbag2TransportTestFixture : public Test
std::shared_ptr<rosbag2_storage::SerializedBagMessage>
serialize_test_message(
const std::string & topic,
int64_t milliseconds,
int64_t milliseconds_recv,
std::shared_ptr<MessageT> message)
{
return serialize_test_message(topic, milliseconds_recv, 0, message);
}

template<typename MessageT>
std::shared_ptr<rosbag2_storage::SerializedBagMessage>
serialize_test_message(
const std::string & topic,
int64_t milliseconds_recv,
int64_t milliseconds_send,
std::shared_ptr<MessageT> message)
{
auto bag_msg = std::make_shared<rosbag2_storage::SerializedBagMessage>();
bag_msg->serialized_data = memory_management_.serialize_message(message);
bag_msg->recv_timestamp = milliseconds * 1000000;
bag_msg->recv_timestamp = milliseconds_recv * 1000000;
bag_msg->send_timestamp = milliseconds_send * 1000000;
bag_msg->topic_name = topic;

return bag_msg;
}

static std::string format_message_order(
const TestParamInfo<rosbag2_transport::MessageOrder> & info)
{
switch (info.param) {
case rosbag2_transport::MessageOrder::RECV_TIMESTAMP:
return "recv_timestamp";
case rosbag2_transport::MessageOrder::SEND_TIMESTAMP:
return "send_timestamp";
default:
throw std::runtime_error("unknown value");
}
}

MemoryManagement memory_management_;

rosbag2_storage::StorageOptions storage_options_;
Expand Down
Loading

0 comments on commit e4ddb48

Please sign in to comment.