Skip to content

Commit

Permalink
Replace spin_until_future_complete with spin_until_complete, add spin…
Browse files Browse the repository at this point in the history
…_for method

* Deprecate spin_until_future_complete
* Add spin_until_complete
* Add spin_for method
* Udpdate unit tests

Signed-off-by: Hubert Liberacki <[email protected]>
  • Loading branch information
hliberacki committed Apr 3, 2022
1 parent 8e8c978 commit cf96887
Show file tree
Hide file tree
Showing 8 changed files with 246 additions and 75 deletions.
57 changes: 50 additions & 7 deletions rclpy/rclpy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
After a node is created, items of work can be done (e.g. subscription callbacks) by *spinning* on
the node.
The following functions can be used to process work that is waiting to be executed: :func:`spin`,
:func:`spin_once`, and :func:`spin_until_future_complete`.
:func:`spin_once`, and :func:`spin_until_complete`.
When finished with a previously initialized :class:`.Context` (ie. done using
all ROS nodes associated with the context), the :func:`shutdown` function should be called.
Expand Down Expand Up @@ -224,6 +224,52 @@ def spin(node: 'Node', executor: 'Executor' = None) -> None:
executor.remove_node(node)


def spin_for(node: 'Node', executor: 'Executor' = None, duration_sec: float = None) -> None:
"""
Execute block until the context associated with the executor is shutdown or time duration pass.
Callbacks will be executed by the provided executor.
This function blocks.
:param node: A node to add to the executor to check for work.
:param executor: The executor to use, or the global executor if ``None``.
:param timeout_sec: Seconds to wait.
"""
executor = get_global_executor() if executor is None else executor
try:
executor.add_node(node)
executor.spin_once(duration_sec)
finally:
executor.remove_node(node)


def spin_until_complete(
node: 'Node',
condition,
executor: 'Executor' = None,
timeout_sec: float = None
) -> None:
"""
Execute work until the condition is complete.
Callbacks and other work will be executed by the provided executor until ``condition()`` or
``future.done()`` returns ``True`` or the context associated with the executor is shutdown.
:param node: A node to add to the executor to check for work.
:param condition: The callable or future object to wait on.
:param executor: The executor to use, or the global executor if ``None``.
:param timeout_sec: Seconds to wait. Block until the condition is complete
if ``None`` or negative. Don't wait if 0.
"""
executor = get_global_executor() if executor is None else executor
try:
executor.add_node(node)
executor.spin_until_complete(condition, timeout_sec)
finally:
executor.remove_node(node)


def spin_until_future_complete(
node: 'Node',
future: Future,
Expand All @@ -241,10 +287,7 @@ def spin_until_future_complete(
:param executor: The executor to use, or the global executor if ``None``.
:param timeout_sec: Seconds to wait. Block until the future is complete
if ``None`` or negative. Don't wait if 0.
Deprecated in favor of spin_until_complete
"""
executor = get_global_executor() if executor is None else executor
try:
executor.add_node(node)
executor.spin_until_future_complete(future, timeout_sec)
finally:
executor.remove_node(node)
spin_until_complete(node, future, executor, timeout_sec)
70 changes: 60 additions & 10 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,28 +278,52 @@ def spin(self) -> None:
while self._context.ok() and not self._is_shutdown:
self.spin_once()

def spin_until_future_complete(self, future: Future, timeout_sec: float = None) -> None:
"""Execute callbacks until a given future is done or a timeout occurs."""
# Make sure the future wakes this executor when it is done
future.add_done_callback(lambda x: self.wake())
def spin_for(self, duration_sec: float = None) -> None:
"""Execute callbacks until shutdown, or timeout."""
self.spin_until_complete(lambda: False, duration_sec)

def spin_until_complete(self, condition, timeout_sec: float = None) -> None:
"""
Execute callbacks until a given condition is done or a timeout occurs.
Deprecated in favor of spin_until_complete.
"""
# Common conditon for safisfying both Callable and Future
finish_condition = None
if (isinstance(condition, Future)):
# Make sure the future wakes this executor when it is done
condition.add_done_callback(lambda x: self.wake())
def finish_condition(): return condition.done()
elif (callable(condition)):
def finish_condition(): return condition()
else:
raise TypeError('Condition has to be of Future or Callable type')

if timeout_sec is None or timeout_sec < 0:
while self._context.ok() and not future.done() and not self._is_shutdown:
self.spin_once_until_future_complete(future, timeout_sec)
while self._context.ok() and not finish_condition() and not self._is_shutdown:
self.spin_once_until_complete(condition, timeout_sec)
else:
start = time.monotonic()
end = start + timeout_sec
timeout_left = timeout_sec

while self._context.ok() and not future.done() and not self._is_shutdown:
self.spin_once_until_future_complete(future, timeout_left)
while self._context.ok() and not finish_condition() and not self._is_shutdown:
self.spin_once_until_complete(condition, timeout_left)
now = time.monotonic()

if now >= end:
return

timeout_left = end - now

def spin_until_future_complete(self, future: Future, timeout_sec: float = None) -> None:
"""
Execute callbacks until a given future is done or a timeout occurs.
Deprecated in favor of spin_until_complete.
"""
self.spin_until_complete(future, timeout_sec)

def spin_once(self, timeout_sec: float = None) -> None:
"""
Wait for and execute a single callback.
Expand All @@ -311,6 +335,19 @@ def spin_once(self, timeout_sec: float = None) -> None:
"""
raise NotImplementedError()

def spin_once_until_complete(self, condition, timeout_sec: float = None) -> None:
"""
Wait for and execute a single callback.
This should behave in the same way as :meth:`spin_once`.
If needed by the implementation, it should awake other threads waiting.
:param condition: The executor will wait until this condition is done.
:param timeout_sec: Maximum seconds to wait. Block forever if ``None`` or negative.
Don't wait if 0.
"""
raise NotImplementedError()

def spin_once_until_future_complete(self, future: Future, timeout_sec: float = None) -> None:
"""
Wait for and execute a single callback.
Expand All @@ -321,6 +358,8 @@ def spin_once_until_future_complete(self, future: Future, timeout_sec: float = N
:param future: The executor will wait until this future is done.
:param timeout_sec: Maximum seconds to wait. Block forever if ``None`` or negative.
Don't wait if 0.
Deprecated in favor of spin_once_until_complete.
"""
raise NotImplementedError()

Expand Down Expand Up @@ -711,9 +750,14 @@ def spin_once(self, timeout_sec: float = None) -> None:
if handler.exception() is not None:
raise handler.exception()

def spin_once_until_future_complete(self, future: Future, timeout_sec: float = None) -> None:
def spin_once_until_complete(self, condition, timeout_sec: float = None) -> None:
self.spin_once(timeout_sec)

"""Deprecated in favor of spin_once_until_complete"""

def spin_once_until_future_complete(self, future: Future, timeout_sec: float = None) -> None:
self.spin_once_until_complete(timeout_sec)


class MultiThreadedExecutor(Executor):
"""
Expand Down Expand Up @@ -756,5 +800,11 @@ def _spin_once_impl(
def spin_once(self, timeout_sec: float = None) -> None:
self._spin_once_impl(timeout_sec)

def spin_once_until_complete(self, condition, timeout_sec: float = None) -> None:
self._spin_once_impl(timeout_sec, condition if callable(
condition) else condition.done)

"""Deprecated in favor of spin_once_until_complete"""

def spin_once_until_future_complete(self, future: Future, timeout_sec: float = None) -> None:
self._spin_once_impl(timeout_sec, future.done)
self.spin_once_until_complete(timeout_sec, future.done)
26 changes: 13 additions & 13 deletions rclpy/test/test_action_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def test_send_goal_async(self):
try:
self.assertTrue(ac.wait_for_server(timeout_sec=2.0))
future = ac.send_goal_async(Fibonacci.Goal())
rclpy.spin_until_future_complete(self.node, future, self.executor)
rclpy.spin_until_complete(self.node, future, self.executor)
self.assertTrue(future.done())
goal_handle = future.result()
self.assertTrue(goal_handle.accepted)
Expand All @@ -177,7 +177,7 @@ def test_send_goal_async_with_feedback_after_goal(self):
Fibonacci.Goal(),
feedback_callback=self.feedback_callback,
goal_uuid=goal_uuid)
rclpy.spin_until_future_complete(self.node, future, self.executor)
rclpy.spin_until_complete(self.node, future, self.executor)

# Publish feedback after goal has been accepted
self.mock_action_server.publish_feedback(goal_uuid)
Expand All @@ -202,7 +202,7 @@ def test_send_goal_async_with_feedback_before_goal(self):
Fibonacci.Goal(),
feedback_callback=self.feedback_callback,
goal_uuid=goal_uuid)
rclpy.spin_until_future_complete(self.node, future, self.executor)
rclpy.spin_until_complete(self.node, future, self.executor)

# Check the feedback was not received
self.assertEqual(self.feedback, None)
Expand All @@ -220,14 +220,14 @@ def test_send_goal_async_with_feedback_for_another_goal(self):
Fibonacci.Goal(),
feedback_callback=self.feedback_callback,
goal_uuid=first_goal_uuid)
rclpy.spin_until_future_complete(self.node, future, self.executor)
rclpy.spin_until_complete(self.node, future, self.executor)

# Send another goal, but without a feedback callback
second_goal_uuid = UUID(uuid=list(uuid.uuid4().bytes))
future = ac.send_goal_async(
Fibonacci.Goal(),
goal_uuid=second_goal_uuid)
rclpy.spin_until_future_complete(self.node, future, self.executor)
rclpy.spin_until_complete(self.node, future, self.executor)

# Publish feedback for the second goal
self.mock_action_server.publish_feedback(second_goal_uuid)
Expand All @@ -251,7 +251,7 @@ def test_send_goal_async_with_feedback_for_not_a_goal(self):
Fibonacci.Goal(),
feedback_callback=self.feedback_callback,
goal_uuid=goal_uuid)
rclpy.spin_until_future_complete(self.node, future, self.executor)
rclpy.spin_until_complete(self.node, future, self.executor)

# Publish feedback for a non-existent goal ID
self.mock_action_server.publish_feedback(UUID(uuid=list(uuid.uuid4().bytes)))
Expand All @@ -272,9 +272,9 @@ def test_send_goal_multiple(self):
future_0 = ac.send_goal_async(Fibonacci.Goal())
future_1 = ac.send_goal_async(Fibonacci.Goal())
future_2 = ac.send_goal_async(Fibonacci.Goal())
rclpy.spin_until_future_complete(self.node, future_0, executor)
rclpy.spin_until_future_complete(self.node, future_1, executor)
rclpy.spin_until_future_complete(self.node, future_2, executor)
rclpy.spin_until_complete(self.node, future_0, executor)
rclpy.spin_until_complete(self.node, future_1, executor)
rclpy.spin_until_complete(self.node, future_2, executor)
self.assertTrue(future_0.done())
self.assertTrue(future_1.done())
self.assertTrue(future_2.done())
Expand All @@ -300,13 +300,13 @@ def test_send_cancel_async(self):

# Send a goal
goal_future = ac.send_goal_async(Fibonacci.Goal())
rclpy.spin_until_future_complete(self.node, goal_future, self.executor)
rclpy.spin_until_complete(self.node, goal_future, self.executor)
self.assertTrue(goal_future.done())
goal_handle = goal_future.result()

# Cancel the goal
cancel_future = goal_handle.cancel_goal_async()
rclpy.spin_until_future_complete(self.node, cancel_future, self.executor)
rclpy.spin_until_complete(self.node, cancel_future, self.executor)
self.assertTrue(cancel_future.done())
self.assertEqual(
cancel_future.result().goals_canceling[0].goal_id,
Expand All @@ -321,13 +321,13 @@ def test_get_result_async(self):

# Send a goal
goal_future = ac.send_goal_async(Fibonacci.Goal())
rclpy.spin_until_future_complete(self.node, goal_future, self.executor)
rclpy.spin_until_complete(self.node, goal_future, self.executor)
self.assertTrue(goal_future.done())
goal_handle = goal_future.result()

# Get the goal result
result_future = goal_handle.get_result_async()
rclpy.spin_until_future_complete(self.node, result_future, self.executor)
rclpy.spin_until_complete(self.node, result_future, self.executor)
self.assertTrue(result_future.done())
finally:
ac.destroy()
Expand Down
Loading

0 comments on commit cf96887

Please sign in to comment.