Skip to content

Commit

Permalink
renamed methods and variable to be more generic
Browse files Browse the repository at this point in the history
  • Loading branch information
saikishor committed Jul 17, 2024
1 parent abc9d12 commit e2b0924
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 68 deletions.
119 changes: 60 additions & 59 deletions include/realtime_tools/async_function_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,38 +47,38 @@ class AsyncFunctionHandler
public:
AsyncFunctionHandler() = default;

~AsyncFunctionHandler() { stop_async_update(); }
~AsyncFunctionHandler() { stop_thread(); }

/// Initialize the AsyncFunctionHandler with the async_function and thread_priority
/// Initialize the AsyncFunctionHandler with the callback and thread_priority
/**
* @param async_function Function that will be called asynchronously
* @param callback Function that will be called asynchronously
* If the AsyncFunctionHandler is already initialized and is running, it will throw a runtime
* error.
* If the parsed functions are not valid, it will throw a runtime error.
*/
void init(
std::function<T(const rclcpp::Time &, const rclcpp::Duration &)> async_function,
std::function<T(const rclcpp::Time &, const rclcpp::Duration &)> callback,
int thread_priority = 50)
{
if (async_function == nullptr) {
if (callback == nullptr) {
throw std::runtime_error(
"AsyncFunctionHandler: parsed function to call asynchronously is not valid!");
}
if (thread_.joinable()) {
throw std::runtime_error(
"AsyncFunctionHandler: Cannot reinitialize while the thread is "
"running. Please stop the async update first!");
"running. Please stop the async callback first!");
}
async_function_ = async_function;
async_function_ = callback;
thread_priority_ = thread_priority;
}

/// Initialize the AsyncFunctionHandler with the async_function and trigger_predicate
/// Initialize the AsyncFunctionHandler with the callback, trigger_predicate and thread_priority
/**
* @param async_function Function that will be called asynchronously
* @param trigger_predicate Predicate function that will be called to check if the async update
* method should be triggered.
* @param thread_priority Priority of the async update thread
* @param callback Function that will be called asynchronously.
* @param trigger_predicate Predicate function that will be called to check if the async callback
* method should be triggered or not.
* @param thread_priority Priority of the async worker thread.
*
* \note The parsed trigger_predicate should be free from any concurrency issues. It is expected
* to be both thread-safe and reentrant.
Expand All @@ -88,59 +88,59 @@ class AsyncFunctionHandler
* If the parsed functions are not valid, it will throw a runtime error.
*/
void init(
std::function<T(const rclcpp::Time &, const rclcpp::Duration &)> async_function,
std::function<T(const rclcpp::Time &, const rclcpp::Duration &)> callback,
std::function<bool()> trigger_predicate, int thread_priority = 50)
{
if (trigger_predicate == nullptr) {
throw std::runtime_error("AsyncFunctionHandler: parsed trigger predicate is not valid!");
}
init(async_function, thread_priority);
init(callback, thread_priority);
trigger_predicate_ = trigger_predicate;
}

/// Triggers the async update method cycle
/// Triggers the async callback method cycle
/**
* @param time Current time
* @param period Current period
* @return A pair with the first element being a boolean indicating if the async update method was
* @return A pair with the first element being a boolean indicating if the async callback method was
* triggered and the second element being the last return value of the async function.
* If the AsyncFunctionHandler is not initialized properly, it will throw a runtime error.
* If the async update method is waiting for the trigger, it will notify the async thread to start
* the update cycle.
* If the async update method is still running, it will return the last return value of the async
* function.
* If the callback method is waiting for the trigger, it will notify the async thread to start
* the callback.
* If the async callback method is still running, it will return the last return value from the
* last trigger cycle.
*
* \note In the case of controllers, The controller manager is responsible
* for triggering and maintaining the controller's update rate, as it should be only acting as a
* scheduler. Same applies to the resource manager when handling the hardware components.
*/
std::pair<bool, T> trigger_async_update(
std::pair<bool, T> trigger_async_callback(
const rclcpp::Time & time, const rclcpp::Duration & period)
{
if (!is_initialized()) {
throw std::runtime_error("AsyncFunctionHandler: need to be initialized first!");
}
if (!is_running()) {
throw std::runtime_error(
"AsyncFunctionHandler: need to start the async update thread first before triggering!");
"AsyncFunctionHandler: need to start the async callback thread first before triggering!");
}
std::unique_lock<std::mutex> lock(async_mtx_, std::try_to_lock);
bool trigger_status = false;
if (lock.owns_lock() && !trigger_in_progress_ && trigger_predicate_()) {
{
std::unique_lock<std::mutex> scoped_lock(std::move(lock));
trigger_in_progress_ = true;
current_update_time_ = time;
current_update_period_ = period;
current_callback_time_ = time;
current_callback_period_ = period;
}
async_update_condition_.notify_one();
async_callback_condition_.notify_one();
trigger_status = true;
}
const T return_value = async_update_return_;
const T return_value = async_callback_return_;
return std::make_pair(trigger_status, return_value);
}

/// Waits until the current async update method cycle to finish
/// Waits until the current async callback method trigger cycle is finished
/**
* If the async method is running, it will wait for the current async method call to finish.
*/
Expand All @@ -158,74 +158,74 @@ class AsyncFunctionHandler
*/
bool is_initialized() const { return async_function_ && trigger_predicate_; }

/// Join the async update thread
/// Join the async callback thread
/**
* If the async method is running, it will join the async thread.
* If the async method is not running, it will return immediately.
*/
void join_async_update_thread()
void join_async_callback_thread()
{
if (is_running()) {
thread_.join();
}
}

/// Check if the async update thread is running
/// Check if the async worker thread is running
/**
* @return True if the async update thread is running, false otherwise
* @return True if the async worker thread is running, false otherwise
*/
bool is_running() const { return thread_.joinable(); }

/// Check if the async update is triggered to stop the cycle
/// Check if the async callback is triggered to stop the cycle
/**
* @return True if the async update is stopped, false otherwise
* @return True if the async callback is requested to be stopped, false otherwise
*/
bool is_stopped() const { return async_update_stop_; }
bool is_stopped() const { return stop_async_callback_; }

/// Get the async update thread
/// Get the async worker thread
/**
* @return The async update thread
* @return The async callback thread
*/
std::thread & get_async_thread() { return thread_; }
std::thread & get_thread() { return thread_; }

/// Check if the async update method is in progress
/// Check if the async callback method is in progress
/**
* @return True if the async update method is in progress, false otherwise
* @return True if the async callback method is in progress, false otherwise
*/
bool is_trigger_cycle_in_progress() const { return trigger_in_progress_; }

/// Stops the async update thread
/// Stops the callback thread
/**
* If the async method is running, it will notify the async thread to stop and then joins the
* async thread.
*/
void stop_async_update()
void stop_thread()
{
if (is_running()) {
{
std::unique_lock<std::mutex> lock(async_mtx_);
async_update_stop_ = true;
stop_async_callback_ = true;
}
async_update_condition_.notify_one();
async_callback_condition_.notify_one();
thread_.join();
}
}

/// Initialize the async update thread
/// Initializes and starts the callback thread
/**
* If the async update thread is not running, it will start the async update thread.
* If the async update thread is already configured and running, does nothing and return
* If the worker thread is not running, it will start the async callback thread.
* If the worker thread is already configured and running, does nothing and returns
* immediately.
*/
void start_async_update_thread()
void start_thread()
{
if (!is_initialized()) {
throw std::runtime_error("AsyncFunctionHandler: need to be initialized first!");
}
if (!thread_.joinable()) {
async_update_stop_ = false;
stop_async_callback_ = false;
trigger_in_progress_ = false;
async_update_return_ = T();
async_callback_return_ = T();
thread_ = std::thread([this]() -> void {
if (!realtime_tools::configure_sched_fifo(thread_priority_)) {
RCLCPP_WARN(
Expand All @@ -237,13 +237,14 @@ class AsyncFunctionHandler
"for details.");
}

while (!async_update_stop_.load(std::memory_order_relaxed)) {
while (!stop_async_callback_.load(std::memory_order_relaxed)) {
{
std::unique_lock<std::mutex> lock(async_mtx_);
async_update_condition_.wait(
lock, [this] { return trigger_in_progress_ || async_update_stop_; });
if (!async_update_stop_) {
async_update_return_ = async_function_(current_update_time_, current_update_period_);
async_callback_condition_.wait(
lock, [this] { return trigger_in_progress_ || stop_async_callback_; });
if (!stop_async_callback_) {
async_callback_return_ =
async_function_(current_callback_time_, current_callback_period_);
}
trigger_in_progress_ = false;
}
Expand All @@ -254,19 +255,19 @@ class AsyncFunctionHandler
}

private:
rclcpp::Time current_update_time_;
rclcpp::Duration current_update_period_{0, 0};
rclcpp::Time current_callback_time_;
rclcpp::Duration current_callback_period_{0, 0};

std::function<T(const rclcpp::Time &, const rclcpp::Duration &)> async_function_;
std::function<bool()> trigger_predicate_ = []() { return true; };

// Async related variables
std::thread thread_;
int thread_priority_ = std::numeric_limits<int>::quiet_NaN();
std::atomic_bool async_update_stop_{false};
std::atomic_bool stop_async_callback_{false};
std::atomic_bool trigger_in_progress_{false};
std::atomic<T> async_update_return_;
std::condition_variable async_update_condition_;
std::atomic<T> async_callback_return_;
std::condition_variable async_callback_condition_;
std::condition_variable cycle_end_condition_;
std::mutex async_mtx_;
};
Expand Down
18 changes: 9 additions & 9 deletions test/test_async_function_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ void TestAsyncFunctionHandler::initialize()

std::pair<bool, return_type> TestAsyncFunctionHandler::trigger()
{
return handler_.trigger_async_update(last_callback_time_, last_callback_period_);
return handler_.trigger_async_callback(last_callback_time_, last_callback_period_);
}

return_type TestAsyncFunctionHandler::update(
Expand Down Expand Up @@ -88,7 +88,7 @@ TEST_F(AsyncFunctionHandlerTest, check_initialization)
ASSERT_FALSE(async_class.get_handler().is_stopped());

// Once initialized, it should not be possible to initialize again
async_class.get_handler().start_async_update_thread();
async_class.get_handler().start_thread();
auto trigger_status = async_class.trigger();
ASSERT_TRUE(trigger_status.first);
ASSERT_EQ(realtime_tools::return_type::OK, trigger_status.second);
Expand All @@ -114,7 +114,7 @@ TEST_F(AsyncFunctionHandlerTest, check_triggering)
ASSERT_FALSE(async_class.get_handler().is_stopped());
// It shouldn't be possible to trigger without starting the thread
ASSERT_THROW(async_class.trigger(), std::runtime_error);
async_class.get_handler().start_async_update_thread();
async_class.get_handler().start_thread();

EXPECT_EQ(async_class.get_state().id(), lifecycle_msgs::msg::State::PRIMARY_STATE_ACTIVE);
auto trigger_status = async_class.trigger();
Expand All @@ -123,7 +123,7 @@ TEST_F(AsyncFunctionHandlerTest, check_triggering)
ASSERT_TRUE(async_class.get_handler().is_initialized());
ASSERT_TRUE(async_class.get_handler().is_running());
ASSERT_FALSE(async_class.get_handler().is_stopped());
ASSERT_TRUE(async_class.get_handler().get_async_thread().joinable());
ASSERT_TRUE(async_class.get_handler().get_thread().joinable());
ASSERT_TRUE(async_class.get_handler().is_trigger_cycle_in_progress());
async_class.get_handler().wait_for_trigger_cycle_to_finish();
ASSERT_FALSE(async_class.get_handler().is_trigger_cycle_in_progress());
Expand All @@ -136,7 +136,7 @@ TEST_F(AsyncFunctionHandlerTest, check_triggering)
ASSERT_TRUE(async_class.get_handler().is_initialized());
ASSERT_TRUE(async_class.get_handler().is_running());
ASSERT_FALSE(async_class.get_handler().is_stopped());
async_class.get_handler().stop_async_update();
async_class.get_handler().stop_thread();
ASSERT_LE(async_class.get_counter(), 2);

// now the async update should be preempted
Expand All @@ -153,7 +153,7 @@ TEST_F(AsyncFunctionHandlerTest, trigger_for_several_cycles)
ASSERT_TRUE(async_class.get_handler().is_initialized());
ASSERT_FALSE(async_class.get_handler().is_running());
ASSERT_FALSE(async_class.get_handler().is_stopped());
async_class.get_handler().start_async_update_thread();
async_class.get_handler().start_thread();

EXPECT_EQ(async_class.get_state().id(), lifecycle_msgs::msg::State::PRIMARY_STATE_ACTIVE);

Expand All @@ -176,7 +176,7 @@ TEST_F(AsyncFunctionHandlerTest, trigger_for_several_cycles)
// Make sure that the failed triggers are less than 0.1%
ASSERT_LT(missed_triggers, static_cast<int>(0.001 * total_cycles))
<< "The missed triggers cannot be more than 0.1%!";
async_class.get_handler().stop_async_update();
async_class.get_handler().stop_thread();

// now the async update should be preempted
ASSERT_FALSE(async_class.get_handler().is_running());
Expand All @@ -195,7 +195,7 @@ TEST_F(AsyncFunctionHandlerTest, test_with_deactivate_and_activate_cycles)
ASSERT_TRUE(async_class.get_handler().is_initialized());
ASSERT_FALSE(async_class.get_handler().is_running());
ASSERT_FALSE(async_class.get_handler().is_stopped());
async_class.get_handler().start_async_update_thread();
async_class.get_handler().start_thread();
ASSERT_TRUE(async_class.get_handler().is_running());
ASSERT_FALSE(async_class.get_handler().is_stopped());
EXPECT_EQ(async_class.get_state().id(), lifecycle_msgs::msg::State::PRIMARY_STATE_INACTIVE);
Expand Down Expand Up @@ -252,7 +252,7 @@ TEST_F(AsyncFunctionHandlerTest, test_with_deactivate_and_activate_cycles)
ASSERT_FALSE(async_class.get_handler().is_stopped());

// now the async update should be preempted
async_class.get_handler().stop_async_update();
async_class.get_handler().stop_thread();
ASSERT_FALSE(async_class.get_handler().is_running());
ASSERT_TRUE(async_class.get_handler().is_stopped());
}

0 comments on commit e2b0924

Please sign in to comment.