Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[jazzy] Add PlayerClock::wakeup() to interrupt sleeping (backport #1869) #1875

Merged
merged 2 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rosbag2_cpp/include/rosbag2_cpp/clocks/player_clock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class PlayerClock
ROSBAG2_CPP_PUBLIC
virtual bool sleep_until(rclcpp::Time time) = 0;


/**
* Change the rate of the flow of time for the clock.
* \param rate new rate of clock playback
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,19 @@ class TimeControllerClock : public PlayerClock
ROSBAG2_CPP_PUBLIC
bool sleep_until(rclcpp::Time until) override;

/**
* \return whether the clock is currently sleeping.
*/
ROSBAG2_CPP_PUBLIC
bool is_sleeping();

/**
* \brief Wake up the clock if it is sleeping.
* \note This will wake any waiting `sleep_until`.
*/
ROSBAG2_CPP_PUBLIC
void wakeup();

/**
* Change the rate of the flow of time for the clock.
*
Expand Down
16 changes: 16 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class TimeControllerClockImpl
std::condition_variable cv RCPPUTILS_TSA_GUARDED_BY(state_mutex);
double rate RCPPUTILS_TSA_GUARDED_BY(state_mutex) = 1.0;
bool paused RCPPUTILS_TSA_GUARDED_BY(state_mutex) = false;
bool is_sleeping RCPPUTILS_TSA_GUARDED_BY(state_mutex) = false;
TimeReference reference RCPPUTILS_TSA_GUARDED_BY(state_mutex);
};

Expand Down Expand Up @@ -165,12 +166,16 @@ bool TimeControllerClock::sleep_until(rcutils_time_point_value_t until)
{
rcpputils::unique_lock<std::mutex> lock(impl_->state_mutex);
if (impl_->paused) {
impl_->is_sleeping = true;
impl_->cv.wait_for(lock, impl_->sleep_time_while_paused);
impl_->is_sleeping = false;
} else {
const auto steady_until = impl_->ros_to_steady(until);
// wait only if necessary for performance
if (steady_until > impl_->now_fn()) {
impl_->is_sleeping = true;
impl_->cv.wait_until(lock, steady_until);
impl_->is_sleeping = false;
}
}
if (impl_->paused) {
Expand All @@ -187,6 +192,17 @@ bool TimeControllerClock::sleep_until(rclcpp::Time until)
return sleep_until(until.nanoseconds());
}

bool TimeControllerClock::is_sleeping()
{
std::lock_guard<std::mutex> lock(impl_->state_mutex);
return impl_->is_sleeping;
}

void TimeControllerClock::wakeup()
{
impl_->cv.notify_all();
}

bool TimeControllerClock::set_rate(double rate)
{
if (rate <= 0) {
Expand Down
26 changes: 26 additions & 0 deletions rosbag2_cpp/test/rosbag2_cpp/test_time_controller_clock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,32 @@ TEST_F(TimeControllerClockTest, interrupted_sleep_returns_false_immediately)
EXPECT_FALSE(thread_sleep_result);
}

TEST_F(TimeControllerClockTest, wakeup_sleep_returns_immediately)
{
// Use a bigger value for sleep_time_while_paused
auto sleep_time_while_paused = std::chrono::seconds(30);
rosbag2_cpp::TimeControllerClock clock(ros_start_time, now_fn, sleep_time_while_paused);
clock.pause();
EXPECT_FALSE(clock.is_sleeping());
auto steady_start = std::chrono::steady_clock::now();
std::atomic_bool thread_sleep_result{true};
auto sleep_long_thread = std::thread(
[&clock, &thread_sleep_result]() {
bool sleep_result = clock.sleep_until(clock.now() + RCUTILS_S_TO_NS(10));
thread_sleep_result.store(sleep_result);
});
// Wait for the thread to be sleeping for the purposes of this test
while (!clock.is_sleeping()) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
clock.wakeup();
sleep_long_thread.join();
EXPECT_FALSE(clock.is_sleeping());
// Check that the clock slept less than the sleep_time_while_paused value
EXPECT_LT(std::chrono::steady_clock::now() - steady_start, sleep_time_while_paused);
EXPECT_FALSE(thread_sleep_result);
}

TEST_F(TimeControllerClockTest, resumes_at_correct_ros_time)
{
rosbag2_cpp::TimeControllerClock clock(ros_start_time, now_fn);
Expand Down
17 changes: 7 additions & 10 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ class PlayerImpl
mutable std::future<void> storage_loading_future_;
std::atomic_bool load_storage_content_{true};
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides_;
std::unique_ptr<rosbag2_cpp::PlayerClock> clock_;
std::unique_ptr<rosbag2_cpp::TimeControllerClock> clock_;
std::shared_ptr<rclcpp::TimerBase> clock_publish_timer_;
std::mutex skip_message_in_main_play_loop_mutex_;
bool skip_message_in_main_play_loop_ RCPPUTILS_TSA_GUARDED_BY(
Expand Down Expand Up @@ -641,8 +641,8 @@ void PlayerImpl::stop()
}

if (clock_->is_paused()) {
clock_->resume(); // Temporary resume clock to force wakeup in clock_->sleep_until(time)
clock_->pause(); // Return in pause mode to preserve original state of the player
// Wake up the clock in case it's in a sleep_until(time) call
clock_->wakeup();
}
// Note: Don't clean up message queue here. It will be cleaned up automatically in
// playback thread after finishing play_messages_from_queue();
Expand Down Expand Up @@ -739,10 +739,8 @@ bool PlayerImpl::play_next()

// Wait for play next to be done, and then return the result
std::unique_lock<std::mutex> lk(finished_play_next_mutex_);
// Temporarily resume clock to force wakeup in clock_->sleep_until(time),
// then return in pause mode to preserve original state of the player
clock_->resume();
clock_->pause();
// Wake up the clock in case it's in a sleep_until(time) call
clock_->wakeup();
finished_play_next_ = false;
finished_play_next_cv_.wait(lk, [this] {return finished_play_next_.load();});
play_next_ = false;
Expand Down Expand Up @@ -1044,9 +1042,8 @@ void PlayerImpl::play_messages_from_queue()
while (!stop_playback_ && is_paused() && !play_next_.load() && rclcpp::ok()) {
clock_->sleep_until(clock_->now());
}
// If we ran out of messages and are not in pause state, it means we're done playing,
// unless play_next() is resuming and pausing the clock in order to wake us up
if (!is_paused() && !play_next_.load()) {
// If we ran out of messages and are not in pause state, it means we're done playing
if (!is_paused()) {
break;
}

Expand Down
Loading