-
Notifications
You must be signed in to change notification settings - Fork 227
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Zhen Ju <[email protected]>
- Loading branch information
1 parent
42d95ea
commit cbe3290
Showing
8 changed files
with
336 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
# Copyright 2020 Open Source Robotics Foundation, Inc. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
from rclpy.node import Node | ||
|
||
|
||
class TestFoo(Node): | ||
def __init__(self, node_name='test_foo', **kwargs): | ||
super().__init__(node_name, **kwargs) |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,198 @@ | ||
# Copyright 2020 Open Source Robotics Foundation, Inc. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import unittest | ||
import rclpy | ||
from multiprocessing import Process | ||
from rclpy.client import Client | ||
from rclpy.executors import SingleThreadedExecutor, MultiThreadedExecutor | ||
from rclpy.node import Node | ||
from composition_interfaces.srv import ListNodes, LoadNode, UnloadNode | ||
from rclpy_components.component_manager import ComponentManager | ||
|
||
TEST_COMPOSITION = 'test_composition' | ||
TEST_COMPOSITION_FOO = 'test_composition::TestFoo' | ||
TEST_COMPOSITION_NODE_NAME = '/testfoo' | ||
|
||
|
||
def run_container(container_name, multi_thread): | ||
rclpy.init() | ||
if multi_thread: | ||
executor = MultiThreadedExecutor() | ||
else: | ||
executor = SingleThreadedExecutor() | ||
|
||
component_manager = ComponentManager(executor, container_name) | ||
executor.add_node(component_manager) | ||
try: | ||
executor.spin() | ||
except Exception as e: | ||
print(e) | ||
pass | ||
|
||
component_manager.destroy_node() | ||
rclpy.shutdown() | ||
|
||
|
||
class TestComponentManager(unittest.TestCase): | ||
helper_node: Node = None | ||
container_process: Process = None | ||
|
||
container_name = 'TestComponentManager' | ||
# service names & clients will be generated with container_name | ||
load_node_svc_name = "" | ||
unload_node_svc_name = "" | ||
list_node_svc_name = "" | ||
load_cli: Client = None | ||
unload_cli: Client = None | ||
list_cli: Client = None | ||
|
||
use_multi_threaded_executor = False | ||
|
||
@classmethod | ||
def setUpClass(cls): | ||
cls.load_node_svc_name = f'{cls.container_name}/_container/load_node' | ||
cls.unload_node_svc_name = f'{cls.container_name}/_container/unload_node' | ||
cls.list_node_svc_name = f'{cls.container_name}/_container/list_nodes' | ||
|
||
# Start the test component manager in the background | ||
cls.container_process = Process(target=run_container, | ||
args=(cls.container_name, cls.use_multi_threaded_executor)) | ||
cls.container_process.start() | ||
|
||
# Setup the helper_node, which will help create client and test the services | ||
rclpy.init() | ||
cls.helper_node = rclpy.create_node('helper_node') | ||
cls.load_cli = cls.helper_node.create_client(LoadNode, cls.load_node_svc_name) | ||
cls.unload_cli = cls.helper_node.create_client(UnloadNode, cls.unload_node_svc_name) | ||
cls.list_cli = cls.helper_node.create_client(ListNodes, cls.list_node_svc_name) | ||
|
||
@classmethod | ||
def tearDownClass(cls): | ||
cls.container_process.terminate() | ||
cls.load_cli.destroy() | ||
cls.unload_cli.destroy() | ||
cls.list_cli.destroy() | ||
cls.helper_node.destroy_node() | ||
rclpy.shutdown() | ||
|
||
@classmethod | ||
def load_node(cls, package_name, plugin_name, node_name="", node_namespace=""): | ||
if not cls.load_cli.wait_for_service(timeout_sec=5.0): | ||
raise RuntimeError(f'No load service found in /{cls.container_name}') | ||
|
||
load_req = LoadNode.Request() | ||
load_req.package_name = package_name | ||
load_req.plugin_name = plugin_name | ||
|
||
if node_name != "": | ||
load_req.node_name = node_name | ||
if node_namespace != "": | ||
load_req.node_namespace = node_namespace | ||
|
||
future = cls.load_cli.call_async(load_req) | ||
rclpy.spin_until_future_complete(cls.helper_node, future) | ||
return future.result() | ||
|
||
@classmethod | ||
def unload_node(cls, unique_id): | ||
if not cls.unload_cli.wait_for_service(timeout_sec=5.0): | ||
raise RuntimeError(f'No unload service found in /{cls.container_name}') | ||
|
||
unload_req = UnloadNode.Request() | ||
unload_req.unique_id = unique_id | ||
|
||
future = cls.unload_cli.call_async(unload_req) | ||
rclpy.spin_until_future_complete(cls.helper_node, future) | ||
return future.result() | ||
|
||
@classmethod | ||
def list_nodes(cls): | ||
if not cls.list_cli.wait_for_service(timeout_sec=5.0): | ||
raise RuntimeError(f'No list service found in {cls.container_name}') | ||
list_req = ListNodes.Request() | ||
future = cls.list_cli.call_async(list_req) | ||
rclpy.spin_until_future_complete(cls.helper_node, future) | ||
return future.result() | ||
|
||
def load_node_test(self): | ||
load_res = self.__class__.load_node(TEST_COMPOSITION, TEST_COMPOSITION_FOO) | ||
assert load_res.success is True | ||
assert load_res.error_message == "" | ||
assert load_res.unique_id == 1 | ||
assert load_res.full_node_name == TEST_COMPOSITION_NODE_NAME | ||
|
||
node_name = "renamed_node" | ||
node_ns = 'testing_ns' | ||
remap_load_res = self.__class__.load_node( | ||
TEST_COMPOSITION, TEST_COMPOSITION_FOO, node_name=node_name, | ||
node_namespace=node_ns) | ||
assert remap_load_res.success is True | ||
assert remap_load_res.error_message == "" | ||
assert remap_load_res.unique_id == 2 | ||
assert remap_load_res.full_node_name == f'/{node_ns}/{node_name}' | ||
|
||
list_res: ListNodes.Response = self.__class__.list_nodes() | ||
assert len(list_res.unique_ids) == len(list_res.full_node_names) == 2 | ||
|
||
def unload_node_test(self): | ||
if not self.__class__.unload_cli.wait_for_service(timeout_sec=5.0): | ||
raise RuntimeError(f'no unload service found in {self.__class__.container_name}') | ||
|
||
unload_res: UnloadNode.Response = self.__class__.unload_node(1) | ||
assert unload_res.success is True | ||
assert unload_res.error_message == "" | ||
|
||
# Should be only 1 node left | ||
list_res: ListNodes.Response = self.__class__.list_nodes() | ||
assert len(list_res.full_node_names) == len(list_res.unique_ids) == 1 | ||
|
||
# The index definitely won't exist | ||
unload_res: UnloadNode.Response = self.__class__.unload_node(1000) | ||
assert unload_res.error_message != "" | ||
assert unload_res.success is False | ||
list_res: ListNodes.Response = self.__class__.list_nodes() | ||
assert len(list_res.full_node_names) == len(list_res.unique_ids) == 1 | ||
|
||
# Unload the last node | ||
unload_req = UnloadNode.Request() | ||
unload_req.unique_id = 2 | ||
future = self.__class__.unload_cli.call_async(unload_req) | ||
rclpy.spin_until_future_complete(self.__class__.helper_node, future) | ||
unload_res: UnloadNode.Response = future.result() | ||
assert unload_res.success is True | ||
assert unload_res.error_message == "" | ||
|
||
# Won't be any node in the container | ||
list_res: ListNodes.Response = self.__class__.list_nodes() | ||
assert len(list_res.full_node_names) == len(list_res.unique_ids) == 0 | ||
|
||
def list_nodes_test(self): | ||
container_name = self.__class__.container_name | ||
print(f'{container_name}: list_nodes tested within test_load_node and test_unload_node') | ||
|
||
def test_composition_api(self): | ||
# Control the order of test | ||
self.load_node_test() | ||
self.unload_node_test() | ||
self.list_nodes_test() | ||
|
||
|
||
class TestComponentManagerMT(TestComponentManager): | ||
use_multi_threaded_executor = True | ||
container_name = 'TestComponentManagerMT' | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
# Copyright 2020 Open Source Robotics Foundation, Inc. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import unittest | ||
import rclpy | ||
from composition_interfaces.srv import ListNodes, LoadNode, UnloadNode | ||
from rclpy.executors import Executor, SingleThreadedExecutor, MultiThreadedExecutor | ||
from rclpy_components.component_manager import ComponentManager | ||
|
||
TEST_COMPOSITION = 'test_composition' | ||
TEST_COMPOSITION_FOO = 'test_composition::TestFoo' | ||
|
||
|
||
class TestComponentManagerUT(unittest.TestCase): | ||
executor: Executor | ||
component_manager: ComponentManager | ||
use_multi_threaded_executor: bool = False | ||
container_name: str = "ut_container" | ||
|
||
@classmethod | ||
def setUpClass(cls) -> None: | ||
rclpy.init() | ||
|
||
if cls.use_multi_threaded_executor: | ||
cls.executor = MultiThreadedExecutor() | ||
else: | ||
cls.executor = SingleThreadedExecutor() | ||
|
||
cls.executor = SingleThreadedExecutor() | ||
cls.component_manager = ComponentManager(cls.executor, cls.container_name) | ||
cls.executor.add_node(cls.component_manager) | ||
|
||
@classmethod | ||
def tearDownClass(cls) -> None: | ||
cls.executor.remove_node(cls.component_manager) | ||
cls.executor.shutdown() | ||
rclpy.shutdown() | ||
|
||
def list_nodes(self): | ||
res = ListNodes.Response() | ||
req = ListNodes.Request() | ||
self.__class__.component_manager.on_list_node(req, res) | ||
|
||
return res.unique_ids, res.full_node_names | ||
|
||
def test_gen_unique_id(self): | ||
current_index = self.component_manager.gen_unique_id() | ||
assert current_index == 1 # The unique id start from 1 | ||
|
||
def test_list_node(self): | ||
unique_ids, full_node_names = self.list_nodes() | ||
assert len(full_node_names) == 0 | ||
assert len(unique_ids) == 0 | ||
|
||
def test_load_node(self): | ||
mock_res = LoadNode.Response() | ||
mock_req = LoadNode.Request() | ||
mock_req.package_name = TEST_COMPOSITION | ||
mock_req.plugin_name = TEST_COMPOSITION_FOO | ||
self.component_manager.on_load_node(mock_req, mock_res) | ||
|
||
print(mock_res.success, mock_res.error_message) | ||
|
||
unique_ids, full_node_names = self.list_nodes() | ||
assert len(unique_ids) == 1 | ||
assert len(full_node_names) == 1 | ||
|
||
def test_unload_node(self): | ||
mock_res = UnloadNode.Response() | ||
mock_req = UnloadNode.Request() | ||
|
||
# Unload a non-existing node | ||
mock_req.unique_id = 0 | ||
self.component_manager.on_unload_node(mock_req, mock_res) | ||
assert mock_res.error_message | ||
assert (not mock_res.success) | ||
|
||
# Unload the first node | ||
ids, node_names = self.list_nodes() | ||
mock_req.unique_id = ids[0] | ||
# Don't forget to remove the previous test results | ||
mock_res = UnloadNode.Response() | ||
self.component_manager.on_unload_node(mock_req, mock_res) | ||
assert mock_res.success | ||
assert (not mock_res.error_message) | ||
|
||
# There should be (n-1) nodes left | ||
unique_ids, full_node_names = self.list_nodes() | ||
assert len(unique_ids) == len(ids) - 1 | ||
assert len(full_node_names) == len(node_names) - 1 | ||
|
||
|
||
class TestComponentManagerUTMT(TestComponentManagerUT): | ||
use_multi_threaded_executor = True | ||
container_name = 'ut_component_mt' | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters