Skip to content

Commit

Permalink
Replace spin_until_future_complete with spin_until_complete, add spin…
Browse files Browse the repository at this point in the history
…_for method

Co-authored-by: Hubert Liberacki <[email protected]>
Signed-off-by: Hubert Liberacki <[email protected]>
Signed-off-by: Christophe Bedard <[email protected]>
  • Loading branch information
christophebedard and hliberacki committed Apr 2, 2024
1 parent d2fd8fe commit fa1df4d
Show file tree
Hide file tree
Showing 16 changed files with 301 additions and 122 deletions.
60 changes: 53 additions & 7 deletions rclpy/rclpy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,20 @@
After a node is created, items of work can be done (e.g. subscription callbacks) by *spinning* on
the node.
The following functions can be used to process work that is waiting to be executed: :func:`spin`,
:func:`spin_once`, and :func:`spin_until_future_complete`.
:func:`spin_once`, and :func:`spin_until_complete`.
When finished with a previously initialized :class:`.Context` (ie. done using
all ROS nodes associated with the context), the :func:`shutdown` function should be called.
This will invalidate all entities derived from the context.
"""

from typing import Callable
from typing import List
from typing import Optional
from typing import TYPE_CHECKING
from typing import Union

import warnings

from rclpy.context import Context
from rclpy.parameter import Parameter
Expand Down Expand Up @@ -242,6 +246,52 @@ def spin(node: 'Node', executor: Optional['Executor'] = None) -> None:
executor.remove_node(node)


def spin_for(node: 'Node', executor: 'Executor' = None, duration_sec: float = None) -> None:
"""
Execute block until the context associated with the executor is shutdown or time duration pass.
Callbacks will be executed by the provided executor.
This function blocks.
:param node: A node to add to the executor to check for work.
:param executor: The executor to use, or the global executor if ``None``.
:param timeout_sec: Seconds to wait.
"""
executor = get_global_executor() if executor is None else executor
try:
executor.add_node(node)
executor.spin_for(duration_sec)
finally:
executor.remove_node(node)


def spin_until_complete(
node: 'Node',
condition: Union[Future, Callable[[], bool]],
executor: Optional['Executor'] = None,
timeout_sec: Optional[float] = None,
) -> None:
"""
Execute work until the condition is complete.
Callbacks and other work will be executed by the provided executor until ``condition()`` or
``future.done()`` returns ``True`` or the context associated with the executor is shutdown.
:param node: A node to add to the executor to check for work.
:param condition: The callable or future object to wait on.
:param executor: The executor to use, or the global executor if ``None``.
:param timeout_sec: Seconds to wait. Block until the condition is complete
if ``None`` or negative. Don't wait if 0.
"""
executor = get_global_executor() if executor is None else executor
try:
executor.add_node(node)
executor.spin_until_complete(condition, timeout_sec)
finally:
executor.remove_node(node)


def spin_until_future_complete(
node: 'Node',
future: Future,
Expand All @@ -260,9 +310,5 @@ def spin_until_future_complete(
:param timeout_sec: Seconds to wait. Block until the future is complete
if ``None`` or negative. Don't wait if 0.
"""
executor = get_global_executor() if executor is None else executor
try:
executor.add_node(node)
executor.spin_until_future_complete(future, timeout_sec)
finally:
executor.remove_node(node)
warnings.warn('Deprecated in favor of spin_until_complete.')
spin_until_complete(node, future, executor, timeout_sec)
79 changes: 67 additions & 12 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,32 +307,53 @@ def spin(self) -> None:
while self._context.ok() and not self._is_shutdown:
self.spin_once()

def spin_until_future_complete(
def spin_for(self, duration_sec: Optional[float] = None) -> None:
"""Execute callbacks until shutdown, or timeout."""
self.spin_until_complete(lambda: False, duration_sec)

def spin_until_complete(
self,
future: Future,
timeout_sec: Optional[float] = None
condition: Union[Future, Callable[[], bool]],
timeout_sec: Optional[float] = None,
) -> None:
"""Execute callbacks until a given future is done or a timeout occurs."""
# Make sure the future wakes this executor when it is done
future.add_done_callback(lambda x: self.wake())
"""Execute callbacks until a given condition is done or a timeout occurs."""
# Common conditon for safisfying both Callable and Future
finish_condition = None
if (isinstance(condition, Future)):
# Make sure the future wakes this executor when it is done
condition.add_done_callback(lambda x: self.wake())
def finish_condition(): return condition.done()
elif (callable(condition)):
def finish_condition(): return condition()
else:
raise TypeError('Condition has to be of Future or Callable type')

if timeout_sec is None or timeout_sec < 0:
while self._context.ok() and not future.done() and not self._is_shutdown:
self.spin_once_until_future_complete(future, timeout_sec)
while self._context.ok() and not finish_condition() and not self._is_shutdown:
self.spin_once_until_complete(condition, timeout_sec)
else:
start = time.monotonic()
end = start + timeout_sec
timeout_left = TimeoutObject(timeout_sec)

while self._context.ok() and not future.done() and not self._is_shutdown:
self.spin_once_until_future_complete(future, timeout_left)
while self._context.ok() and not finish_condition() and not self._is_shutdown:
self.spin_once_until_complete(condition, timeout_left)
now = time.monotonic()

if now >= end:
return

timeout_left.timeout = end - now

def spin_until_future_complete(
self,
future: Future,
timeout_sec: Optional[float] = None,
) -> None:
"""Execute callbacks until a given future is done or a timeout occurs."""
warnings.warn('Deprecated in favor of spin_until_complete.')
self.spin_until_complete(future, timeout_sec)

def spin_once(self, timeout_sec: Optional[float] = None) -> None:
"""
Wait for and execute a single callback.
Expand All @@ -346,6 +367,23 @@ def spin_once(self, timeout_sec: Optional[float] = None) -> None:
"""
raise NotImplementedError()

def spin_once_until_complete(
self,
condition: Union[Future, Callable[[], bool]],
timeout_sec: Optional[Union[float, TimeoutObject]] = None,
) -> None:
"""
Wait for and execute a single callback.
This should behave in the same way as :meth:`spin_once`.
If needed by the implementation, it should awake other threads waiting.
:param condition: The executor will wait until this condition is done.
:param timeout_sec: Maximum seconds to wait. Block forever if ``None`` or negative.
Don't wait if 0.
"""
raise NotImplementedError()

def spin_once_until_future_complete(
self,
future: Future,
Expand All @@ -361,6 +399,7 @@ def spin_once_until_future_complete(
:param timeout_sec: Maximum seconds to wait. Block forever if ``None`` or negative.
Don't wait if 0.
"""
warnings.warn('Deprecated in favor of spin_once_until_complete.')
raise NotImplementedError()

def _take_timer(self, tmr):
Expand Down Expand Up @@ -826,13 +865,21 @@ def _spin_once_impl(
def spin_once(self, timeout_sec: Optional[float] = None) -> None:
self._spin_once_impl(timeout_sec)

def spin_once_until_complete(
self,
condition: Callable[[], bool],
timeout_sec: Optional[Union[float, TimeoutObject]] = None,
) -> None:
self._spin_once_impl(timeout_sec, condition)

def spin_once_until_future_complete(
self,
future: Future,
timeout_sec: Optional[Union[float, TimeoutObject]] = None
) -> None:
warnings.warn('Deprecated in favor of spin_once_until_complete.')
future.add_done_callback(lambda x: self.wake())
self._spin_once_impl(timeout_sec, future.done)
self.spin_once_until_complete(future.done, timeout_sec)


class MultiThreadedExecutor(Executor):
Expand Down Expand Up @@ -898,10 +945,18 @@ def _spin_once_impl(
def spin_once(self, timeout_sec: Optional[float] = None) -> None:
self._spin_once_impl(timeout_sec)

def spin_once_until_complete(
self,
condition: Callable[[], bool],
timeout_sec: float = None,
) -> None:
self._spin_once_impl(timeout_sec, condition)

def spin_once_until_future_complete(
self,
future: Future,
timeout_sec: Optional[Union[float, TimeoutObject]] = None
) -> None:
warnings.warn('Deprecated in favor of spin_once_until_complete.')
future.add_done_callback(lambda x: self.wake())
self._spin_once_impl(timeout_sec, future.done)
self.spin_once_until_complete(future.done, timeout_sec)
2 changes: 1 addition & 1 deletion rclpy/rclpy/parameter_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def __init__(
Parameter('int_param', Parameter.Type.INTEGER, 88),
Parameter('string/param', Parameter.Type.STRING, 'hello world').to_parameter_msg(),
])
self.executor.spin_until_future_complete(future)
self.executor.spin_until_complete(future)
results = future.result() # rcl_interfaces.srv.SetParameters.Response
For more on service names, see: `ROS 2 docs`_.
Expand Down
30 changes: 15 additions & 15 deletions rclpy/test/test_action_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def test_send_goal_async(self):
try:
self.assertTrue(ac.wait_for_server(timeout_sec=2.0))
future = ac.send_goal_async(Fibonacci.Goal())
rclpy.spin_until_future_complete(self.node, future, self.executor)
rclpy.spin_until_complete(self.node, future, self.executor)
self.assertTrue(future.done())
goal_handle = future.result()
self.assertTrue(goal_handle.accepted)
Expand All @@ -177,7 +177,7 @@ def test_send_goal_async_with_feedback_after_goal(self):
Fibonacci.Goal(),
feedback_callback=self.feedback_callback,
goal_uuid=goal_uuid)
rclpy.spin_until_future_complete(self.node, future, self.executor)
rclpy.spin_until_complete(self.node, future, self.executor)

# Publish feedback after goal has been accepted
self.mock_action_server.publish_feedback(goal_uuid)
Expand All @@ -202,7 +202,7 @@ def test_send_goal_async_with_feedback_before_goal(self):
Fibonacci.Goal(),
feedback_callback=self.feedback_callback,
goal_uuid=goal_uuid)
rclpy.spin_until_future_complete(self.node, future, self.executor)
rclpy.spin_until_complete(self.node, future, self.executor)

# Check the feedback was not received
self.assertEqual(self.feedback, None)
Expand All @@ -220,12 +220,12 @@ def test_send_goal_async_with_feedback_after_goal_result_requested(self):
Fibonacci.Goal(),
feedback_callback=self.feedback_callback,
goal_uuid=goal_uuid)
rclpy.spin_until_future_complete(self.node, goal_future, self.executor)
rclpy.spin_until_complete(self.node, goal_future, self.executor)
self.assertTrue(goal_future.done())
# Then request result
goal_handle = goal_future.result()
result_future = goal_handle.get_result_async()
rclpy.spin_until_future_complete(self.node, result_future, self.executor)
rclpy.spin_until_complete(self.node, result_future, self.executor)
self.assertTrue(result_future.done())

# Publish feedback after goal result is requested
Expand All @@ -246,14 +246,14 @@ def test_send_goal_async_with_feedback_for_another_goal(self):
Fibonacci.Goal(),
feedback_callback=self.feedback_callback,
goal_uuid=first_goal_uuid)
rclpy.spin_until_future_complete(self.node, future, self.executor)
rclpy.spin_until_complete(self.node, future, self.executor)

# Send another goal, but without a feedback callback
second_goal_uuid = UUID(uuid=list(uuid.uuid4().bytes))
future = ac.send_goal_async(
Fibonacci.Goal(),
goal_uuid=second_goal_uuid)
rclpy.spin_until_future_complete(self.node, future, self.executor)
rclpy.spin_until_complete(self.node, future, self.executor)

# Publish feedback for the second goal
self.mock_action_server.publish_feedback(second_goal_uuid)
Expand All @@ -277,7 +277,7 @@ def test_send_goal_async_with_feedback_for_not_a_goal(self):
Fibonacci.Goal(),
feedback_callback=self.feedback_callback,
goal_uuid=goal_uuid)
rclpy.spin_until_future_complete(self.node, future, self.executor)
rclpy.spin_until_complete(self.node, future, self.executor)

# Publish feedback for a non-existent goal ID
self.mock_action_server.publish_feedback(UUID(uuid=list(uuid.uuid4().bytes)))
Expand All @@ -298,9 +298,9 @@ def test_send_goal_multiple(self):
future_0 = ac.send_goal_async(Fibonacci.Goal())
future_1 = ac.send_goal_async(Fibonacci.Goal())
future_2 = ac.send_goal_async(Fibonacci.Goal())
rclpy.spin_until_future_complete(self.node, future_0, executor)
rclpy.spin_until_future_complete(self.node, future_1, executor)
rclpy.spin_until_future_complete(self.node, future_2, executor)
rclpy.spin_until_complete(self.node, future_0, executor)
rclpy.spin_until_complete(self.node, future_1, executor)
rclpy.spin_until_complete(self.node, future_2, executor)
self.assertTrue(future_0.done())
self.assertTrue(future_1.done())
self.assertTrue(future_2.done())
Expand All @@ -326,13 +326,13 @@ def test_send_cancel_async(self):

# Send a goal
goal_future = ac.send_goal_async(Fibonacci.Goal())
rclpy.spin_until_future_complete(self.node, goal_future, self.executor)
rclpy.spin_until_complete(self.node, goal_future, self.executor)
self.assertTrue(goal_future.done())
goal_handle = goal_future.result()

# Cancel the goal
cancel_future = goal_handle.cancel_goal_async()
rclpy.spin_until_future_complete(self.node, cancel_future, self.executor)
rclpy.spin_until_complete(self.node, cancel_future, self.executor)
self.assertTrue(cancel_future.done())
self.assertEqual(
cancel_future.result().goals_canceling[0].goal_id,
Expand All @@ -347,13 +347,13 @@ def test_get_result_async(self):

# Send a goal
goal_future = ac.send_goal_async(Fibonacci.Goal())
rclpy.spin_until_future_complete(self.node, goal_future, self.executor)
rclpy.spin_until_complete(self.node, goal_future, self.executor)
self.assertTrue(goal_future.done())
goal_handle = goal_future.result()

# Get the goal result
result_future = goal_handle.get_result_async()
rclpy.spin_until_future_complete(self.node, result_future, self.executor)
rclpy.spin_until_complete(self.node, result_future, self.executor)
self.assertTrue(result_future.done())
finally:
ac.destroy()
Expand Down
Loading

0 comments on commit fa1df4d

Please sign in to comment.