Skip to content

Commit

Permalink
Address more minor feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Christophe Bedard <[email protected]>
  • Loading branch information
christophebedard committed Nov 14, 2024
1 parent a9ed239 commit 4ddc5fb
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 27 deletions.
7 changes: 4 additions & 3 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,10 @@ class Player
{
install_signal_handlers();
try {
std::vector<rosbag2_transport::Player::ReaderStorageOptionsPair> readers{};
std::vector<rosbag2_transport::Player::ReaderStorageOptionsPair> readers_with_options{};
for (const auto & options : storage_options) {
readers.emplace_back(rosbag2_transport::ReaderWriterFactory::make_reader(options), options);
readers_with_options.emplace_back(
rosbag2_transport::ReaderWriterFactory::make_reader(options), options);
}
std::shared_ptr<KeyboardHandler> keyboard_handler;
if (!play_options.disable_keyboard_controls) {
Expand All @@ -270,7 +271,7 @@ class Player
#endif
}
auto player = std::make_shared<rosbag2_transport::Player>(
std::move(readers), std::move(keyboard_handler), play_options);
std::move(readers_with_options), std::move(keyboard_handler), play_options);

rclcpp::executors::SingleThreadedExecutor exec;
exec.add_node(player);
Expand Down
47 changes: 23 additions & 24 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,7 @@ class PlayerImpl
rosbag2_storage::SerializedBagMessageSharedPtr peek_next_message_from_queue();
void load_storage_content();
bool is_storage_completely_loaded() const;
void enqueue_up_to_boundary(
const size_t boundary,
const size_t bag_index) RCPPUTILS_TSA_REQUIRES(reader_mutex_);
void enqueue_up_to_boundary(const size_t boundary) RCPPUTILS_TSA_REQUIRES(reader_mutex_);
void wait_for_filled_queue() const;
void play_messages_from_queue();
void prepare_publishers();
Expand Down Expand Up @@ -904,7 +902,8 @@ Player::callback_handle_t PlayerImpl::get_new_on_play_msg_callback_handle()

void PlayerImpl::wait_for_filled_queue() const
{
const auto read_ahead_queues_filled = [this]() -> bool {
const auto read_ahead_queues_filled =
[this]() {
for (std::size_t i = 0; i < input_bags_.size(); i++) {
if (message_queue_.size_approx(i) < play_options_.read_ahead_queue_size) {
return false;
Expand All @@ -922,9 +921,10 @@ void PlayerImpl::wait_for_filled_queue() const

void PlayerImpl::load_storage_content()
{
auto queue_lower_boundary =
static_cast<size_t>(play_options_.read_ahead_queue_size * read_ahead_lower_bound_percentage_);
auto queue_upper_boundary = play_options_.read_ahead_queue_size;
// Multiply the boundary parameters by the number of input bags
auto queue_lower_boundary = static_cast<size_t>(
input_bags_.size() * play_options_.read_ahead_queue_size * read_ahead_lower_bound_percentage_);
auto queue_upper_boundary = input_bags_.size() * play_options_.read_ahead_queue_size;

while (rclcpp::ok() && load_storage_content_ && !stop_playback_) {
rcpputils::unique_lock lk(reader_mutex_);
Expand All @@ -935,30 +935,28 @@ void PlayerImpl::load_storage_content()
if (no_messages) {
break;
}
for (size_t bag_index = 0; bag_index < input_bags_.size(); bag_index++) {
if (!input_bags_[bag_index].first->has_next()) {
continue;
}

if (message_queue_.size_approx(bag_index) < queue_lower_boundary) {
enqueue_up_to_boundary(queue_upper_boundary, bag_index);
} else {
lk.unlock();
}
if (message_queue_.size_approx() < queue_lower_boundary) {
enqueue_up_to_boundary(queue_upper_boundary);
} else {
lk.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}

void PlayerImpl::enqueue_up_to_boundary(const size_t boundary, const size_t bag_index)
void PlayerImpl::enqueue_up_to_boundary(const size_t boundary)
{
rosbag2_storage::SerializedBagMessageSharedPtr message;
for (size_t i = message_queue_.size_approx(bag_index); i < boundary; i++) {
if (!input_bags_[bag_index].first->has_next()) {
break;
// Read messages from input bags in a round robin way
size_t input_bag_index = 0u;
for (size_t i = message_queue_.size_approx(); i < boundary; i++) {
const auto & reader = input_bags_[input_bag_index].first;
// We are supposed to have at least one bag with messages to read
if (reader->has_next()) {
message = reader->read_next();
message_queue_.enqueue(message, input_bag_index);
}
message = input_bags_[bag_index].first->read_next();
message_queue_.enqueue(message, bag_index);
input_bag_index = (input_bag_index + 1) % input_bags_.size();
}
}

Expand Down Expand Up @@ -1132,6 +1130,7 @@ void PlayerImpl::prepare_publishers()
}

// Create topic publishers
// We could have duplicate topic names here, but we correctly handle it when creating publishers
std::vector<rosbag2_storage::TopicMetadata> topics{};
for (const auto & [reader, _] : input_bags_) {
auto bag_topics = reader->get_all_topics_and_types();
Expand Down

0 comments on commit 4ddc5fb

Please sign in to comment.