Skip to content

Commit

Permalink
Add spin_until_complete
Browse files Browse the repository at this point in the history
Co-authored-by: Hubert Liberacki <[email protected]>
Co-authored-by: Tomoya Fujita <[email protected]>
Signed-off-by: Hubert Liberacki <[email protected]>
Signed-off-by: Tomoya Fujita <[email protected]>
Signed-off-by: Christophe Bedard <[email protected]>
  • Loading branch information
3 people committed Apr 3, 2024
1 parent f9c4894 commit 449c07a
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 22 deletions.
71 changes: 49 additions & 22 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,32 +350,22 @@ class Executor
virtual void
spin_once(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

/// Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted.
/// Spin (blocking) until the condition is complete, it times out waiting, or rclcpp is
/// interrupted.
/**
* \param[in] future The future to wait on. If this function returns SUCCESS, the future can be
* accessed without blocking (though it may still throw an exception).
* \param[in] condition The callable condition to wait on.
* \param[in] timeout Optional timeout parameter, which gets passed to Executor::spin_node_once.
* `-1` is block forever, `0` is non-blocking.
* If the time spent inside the blocking loop exceeds this timeout, return a TIMEOUT return
* code.
* \return The return code, one of `SUCCESS`, `INTERRUPTED`, or `TIMEOUT`.
*/
template<typename FutureT, typename TimeRepT = int64_t, typename TimeT = std::milli>
template<typename DurationT = std::chrono::milliseconds>
FutureReturnCode
spin_until_future_complete(
const FutureT & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
spin_until_complete(
const std::function<bool(void)> condition,
DurationT timeout = DurationT(-1))
{
// TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
// inside a callback executed by an executor.

// Check the future before entering the while loop.
// If the future is already complete, don't try to spin.
std::future_status status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
return FutureReturnCode::SUCCESS;
}

auto end_time = std::chrono::steady_clock::now();
std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
timeout);
Expand All @@ -384,17 +374,20 @@ class Executor
}
std::chrono::nanoseconds timeout_left = timeout_ns;

// Preliminary check, finish if condition is done already.
if (condition()) {
return FutureReturnCode::SUCCESS;
}

if (spinning.exchange(true)) {
throw std::runtime_error("spin_until_future_complete() called while already spinning");
throw std::runtime_error("spin_until_complete() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
while (rclcpp::ok(this->context_) && spinning.load()) {
// Do one item of work.
spin_once_impl(timeout_left);

// Check if the future is set, return SUCCESS if it is.
status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
if (condition()) {
return FutureReturnCode::SUCCESS;
}
// If the original timeout is < 0, then this is blocking, never TIMEOUT.
Expand All @@ -410,10 +403,44 @@ class Executor
timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
}

// The future did not complete before ok() returned false, return INTERRUPTED.
// The condition did not pass before ok() returned false, return INTERRUPTED.
return FutureReturnCode::INTERRUPTED;
}

/// Spin (blocking) for at least the given amount of duration.
/**
* \param[in] duration gets passed to Executor::spin_node_once,
* spins the executor for given duration.
*/
template<typename DurationT>
void
spin_for(DurationT duration)
{
(void)spin_until_complete([]() {return false;}, duration);
}

/// Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted.
/**
* \param[in] future The future to wait on. If this function returns SUCCESS, the future can be
* accessed without blocking (though it may still throw an exception).
* \param[in] timeout Optional timeout parameter, which gets passed to Executor::spin_node_once.
* `-1` is block forever, `0` is non-blocking.
* If the time spent inside the blocking loop exceeds this timeout, return a TIMEOUT return
* code.
* \return The return code, one of `SUCCESS`, `INTERRUPTED`, or `TIMEOUT`.
*/
template<typename FutureT, typename TimeRepT = int64_t, typename TimeT = std::milli>
FutureReturnCode
spin_until_future_complete(
const FutureT & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
const auto condition = [&future]() {
return future.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
};
return spin_until_complete(condition, timeout);
}

/// Cancel any running spin* function, causing it to return.
/**
* This function can be called asynchonously from any thread.
Expand Down
63 changes: 63 additions & 0 deletions rclcpp/include/rclcpp/executors.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,48 @@ namespace executors
using rclcpp::executors::MultiThreadedExecutor;
using rclcpp::executors::SingleThreadedExecutor;

/// Spin (blocking) until the conditon is complete, it times out waiting, or rclcpp is interrupted.
/**
* \param[in] executor The executor which will spin the node.
* \param[in] node_ptr The node to spin.
* \param[in] condition The callable condition to wait on.
* \param[in] timeout Optional timeout parameter, which gets passed to
* Executor::spin_node_once.
* `-1` is block forever, `0` is non-blocking.
* If the time spent inside the blocking loop exceeds this timeout, return a `TIMEOUT` return code.
* \return The return code, one of `SUCCESS`, `INTERRUPTED`, or `TIMEOUT`.
*/
template<typename DurationT = std::chrono::milliseconds>
rclcpp::FutureReturnCode
spin_node_until_complete(
rclcpp::Executor & executor,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
const std::function<bool(void)> condition,
DurationT timeout = DurationT(-1))
{
// TODO(wjwwood): does not work recursively; can't call spin_node_until_complete
// inside a callback executed by an executor.
executor.add_node(node_ptr);
auto retcode = executor.spin_until_complete(condition, timeout);
executor.remove_node(node_ptr);
return retcode;
}

template<typename NodeT = rclcpp::Node, typename DurationT = std::chrono::milliseconds>
rclcpp::FutureReturnCode
spin_node_until_complete(
rclcpp::Executor & executor,
std::shared_ptr<NodeT> node_ptr,
const std::function<bool(void)> & condition,
DurationT timeout = DurationT(-1))
{
return rclcpp::executors::spin_node_until_complete(
executor,
node_ptr->get_node_base_interface(),
condition,
timeout);
}

/// Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted.
/**
* \param[in] executor The executor which will spin the node.
Expand Down Expand Up @@ -113,6 +155,27 @@ spin_node_until_future_complete(

} // namespace executors

template<typename DurationT = std::chrono::milliseconds>
rclcpp::FutureReturnCode
spin_until_complete(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
const std::function<bool(void)> condition,
DurationT timeout = DurationT(-1))
{
rclcpp::executors::SingleThreadedExecutor executor;
return executors::spin_node_until_complete(executor, node_ptr, condition, timeout);
}

template<typename NodeT = rclcpp::Node, typename DurationT = std::chrono::milliseconds>
rclcpp::FutureReturnCode
spin_until_complete(
std::shared_ptr<NodeT> node_ptr,
const std::function<bool(void)> condition,
DurationT timeout = DurationT(-1))
{
return rclcpp::spin_until_complete(node_ptr->get_node_base_interface(), condition, timeout);
}

template<typename FutureT, typename TimeRepT = int64_t, typename TimeT = std::milli>
rclcpp::FutureReturnCode
spin_until_future_complete(
Expand Down
1 change: 1 addition & 0 deletions rclcpp/include/rclcpp/rclcpp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
* - Executors (responsible for execution of callbacks through a blocking spin):
* - rclcpp::spin()
* - rclcpp::spin_some()
* - rclcpp::spin_until_complete()
* - rclcpp::spin_until_future_complete()
* - rclcpp::executors::SingleThreadedExecutor
* - rclcpp::executors::SingleThreadedExecutor::add_node()
Expand Down
20 changes: 20 additions & 0 deletions rclcpp/test/rclcpp/executors/test_executors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,26 @@ TYPED_TEST(TestExecutors, testSpinUntilFutureComplete)
EXPECT_EQ(rclcpp::FutureReturnCode::SUCCESS, ret);
}

// Check executor exits immediately if condition is complete.
TYPED_TEST(TestExecutors, testSpinUntilCompleteCallable)
{
using ExecutorType = TypeParam;
ExecutorType executor;
executor.add_node(this->node);

// test success of an immediately completed condition
auto condition = []() {return true;};

// spin_until_complete is expected to exit immediately, but would block up until its
// timeout if the future is not checked before spin_once_impl.
auto start = std::chrono::steady_clock::now();
auto ret = executor.spin_until_complete(condition, 1s);
executor.remove_node(this->node, true);
// Check it didn't reach timeout
EXPECT_GT(500ms, (std::chrono::steady_clock::now() - start));
EXPECT_EQ(rclcpp::FutureReturnCode::SUCCESS, ret);
}

// Same test, but uses a shared future.
TYPED_TEST(TestExecutors, testSpinUntilSharedFutureComplete)
{
Expand Down
40 changes: 40 additions & 0 deletions rclcpp/test/rclcpp/test_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,37 @@ TEST_F(TestExecutor, spin_some_elapsed) {
ASSERT_TRUE(timer_called);
}

TEST_F(TestExecutor, spin_for_duration) {
DummyExecutor dummy;
auto node = std::make_shared<rclcpp::Node>("node", "ns");
bool timer_called = false;
auto timer =
node->create_wall_timer(
std::chrono::milliseconds(0), [&]() {
timer_called = true;
});
dummy.add_node(node);
// Wait for the wall timer to have expired.
dummy.spin_for(std::chrono::milliseconds(0));

ASSERT_TRUE(timer_called);
}

TEST_F(TestExecutor, spin_for_longer_timer) {
DummyExecutor dummy;
auto node = std::make_shared<rclcpp::Node>("node", "ns");
bool timer_called = false;
auto timer =
node->create_wall_timer(
std::chrono::seconds(10), [&]() {
timer_called = true;
});
dummy.add_node(node);
dummy.spin_for(std::chrono::milliseconds(5));

ASSERT_FALSE(timer_called);
}

TEST_F(TestExecutor, spin_once_in_spin_once) {
DummyExecutor dummy;
auto node = std::make_shared<rclcpp::Node>("node", "ns");
Expand Down Expand Up @@ -488,6 +519,15 @@ TEST_F(TestExecutor, spin_until_future_complete_future_already_complete) {
dummy.spin_until_future_complete(future, std::chrono::milliseconds(1)));
}

TEST_F(TestExecutor, spin_until_complete_condition_already_complete) {
DummyExecutor dummy;
auto node = std::make_shared<rclcpp::Node>("node", "ns");
auto condition = []() {return true;};
EXPECT_EQ(
rclcpp::FutureReturnCode::SUCCESS,
dummy.spin_until_complete(condition, std::chrono::milliseconds(1)));
}

TEST_F(TestExecutor, is_spinning) {
DummyExecutor dummy;
ASSERT_FALSE(dummy.is_spinning());
Expand Down

0 comments on commit 449c07a

Please sign in to comment.