From 7c946d51819a24559505f013208e5305caa3b10d Mon Sep 17 00:00:00 2001 From: Tobias Wolf Date: Tue, 26 Mar 2024 11:42:45 +0100 Subject: [PATCH] Add initial testing foundation Signed-off-by: Tobias Wolf --- .github/workflows/pre-commit.yml | 1 + .pre-commit-config.yaml | 1 + pyproject.toml | 3 + setup.cfg | 4 + tests/mock_ceph.py | 29 +++++++ tests/mock_k8s.py | 23 ++++++ tests/mock_ssh_server.py | 136 +++++++++++++++++++++++++++++++ tests/modules/__init__.py | 0 tests/modules/test_example.py | 11 +++ tests/test_mock_ceph.py | 22 +++++ tests/test_mock_k8s.py | 22 +++++ tests/test_mock_ssh_server.py | 23 ++++++ 12 files changed, 275 insertions(+) create mode 100644 tests/mock_ceph.py create mode 100644 tests/mock_k8s.py create mode 100644 tests/mock_ssh_server.py create mode 100644 tests/modules/__init__.py create mode 100644 tests/modules/test_example.py create mode 100644 tests/test_mock_ceph.py create mode 100644 tests/test_mock_k8s.py create mode 100644 tests/test_mock_ssh_server.py diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml index 2b11178..f17e487 100644 --- a/.github/workflows/pre-commit.yml +++ b/.github/workflows/pre-commit.yml @@ -11,4 +11,5 @@ jobs: steps: - uses: actions/checkout@v3 - uses: actions/setup-python@v3 + - run: python -m pip install .[tests] - uses: pre-commit/action@v3.0.1 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 898981e..baeef2d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -22,3 +22,4 @@ repos: args: [--strict, --ignore-missing-imports, --check-untyped-defs] additional_dependencies: - types-PyYAML + - types-paramiko==3.4.0.* diff --git a/pyproject.toml b/pyproject.toml index f841c8a..97da775 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,3 +24,6 @@ classifiers = [ [project.urls] Homepage = "https://scs.community" + +[tool.pytest.ini_options] +pythonpath = [ "src" ] diff --git a/setup.cfg b/setup.cfg index b1254e4..d1edfe8 100644 --- a/setup.cfg +++ b/setup.cfg @@ -3,3 +3,7 @@ platforms = any [options] install_requires=file:requirements.txt + +[options.extras_require] +tests = + pytest==8.0.2 diff --git a/tests/mock_ceph.py b/tests/mock_ceph.py new file mode 100644 index 0000000..1f0d647 --- /dev/null +++ b/tests/mock_ceph.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- + +import json +from collections.abc import Callable +from rookify.modules.module import ModuleException +from typing import Any, Dict, List, Tuple + + +class MockCeph(object): + def __init__( + self, + config: Dict[str, Any], + _callable: Callable[[str, bytes], Tuple[int, bytes, str]], + ): + if not callable(_callable): + raise RuntimeError("Handler function given is invalid") + + self._callback_handler = _callable + + def mon_command( + self, command: str, inbuf: bytes, **kwargs: Any + ) -> Dict[str, Any] | List[Any]: + ret, outbuf, outstr = self._callback_handler(command, inbuf, **kwargs) + if ret != 0: + raise ModuleException("Ceph did return an error: {0!r}".format(outbuf)) + + data = json.loads(outbuf) + assert isinstance(data, dict) or isinstance(data, list) + return data diff --git a/tests/mock_k8s.py b/tests/mock_k8s.py new file mode 100644 index 0000000..548412b --- /dev/null +++ b/tests/mock_k8s.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- + +from collections.abc import Callable +from typing import Any, Dict, List + + +class MockK8s(object): + def __init__(self, _callable: Callable[[str], Any], name: str = "") -> None: + if not callable(_callable): + raise RuntimeError("Handler function given is invalid") + + self._callback_handler = _callable + self._attr_name = name + + def __call__(self, *args: List[Any], **kwargs: Dict[Any, Any]) -> Any: + return self._callback_handler(self._attr_name, *args, **kwargs) + + def __getattr__(self, name: str) -> Any: + attr_name = ( + name if self._attr_name == "" else "{0}.{1}".format(self._attr_name, name) + ) + + return MockK8s(self._callback_handler, attr_name) diff --git a/tests/mock_ssh_server.py b/tests/mock_ssh_server.py new file mode 100644 index 0000000..7127108 --- /dev/null +++ b/tests/mock_ssh_server.py @@ -0,0 +1,136 @@ +# -*- coding: utf-8 -*- + + +from collections.abc import Callable +from socket import AF_INET, IPPROTO_TCP, SO_REUSEADDR, SOCK_STREAM, SOL_SOCKET, socket +from threading import Event, RLock +from typing import Any, Optional + +from paramiko import ( # type: ignore[attr-defined] + AUTH_FAILED, + AUTH_SUCCESSFUL, + OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED, + OPEN_SUCCEEDED, + AutoAddPolicy, + Channel, + PKey, + RSAKey, + ServerInterface, + SSHClient, + Transport, +) + + +class MockSSHServer(ServerInterface): + """An ssh server accepting the pre-generated key.""" + + ssh_username = "pytest" + ssh_key = RSAKey.generate(4096) + + def __init__(self, _callable: Callable[[bytes, Channel], None]) -> None: + if not callable(_callable): + raise RuntimeError("Handler function given is invalid") + + ServerInterface.__init__(self) + + self._callback_handler = _callable + self._channel: Any = None + self._client: Optional[SSHClient] = None + self._command: Optional[bytes] = None + self.event = Event() + self._server_transport: Optional[Transport] = None + self._thread_lock = RLock() + + def __del__(self) -> None: + self.close() + + @property + def client(self) -> SSHClient: + with self._thread_lock: + if self._client is None: + connection_event = Event() + + server_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) + server_socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) + server_socket.bind(("127.0.0.1", 0)) + server_socket.listen() + + server_address = server_socket.getsockname() + + client_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) + client_socket.connect(server_address) + + (transport_socket, _) = server_socket.accept() + + self._server_transport = Transport(transport_socket) + self._server_transport.add_server_key(self.__class__.ssh_key) + self._server_transport.start_server(connection_event, self) + + self._client = SSHClient() + self._client.set_missing_host_key_policy(AutoAddPolicy()) + + self._client.connect( + server_address[0], + server_address[1], + username=self.__class__.ssh_username, + pkey=self.__class__.ssh_key, + sock=client_socket, + ) + + connection_event.wait() + + return self._client + + def check_channel_request(self, kind: str, chanid: int) -> int: + if kind == "session": + return OPEN_SUCCEEDED # type: ignore[no-any-return] + return OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED # type: ignore[no-any-return] + + def check_auth_password(self, username: str, password: str) -> int: + return AUTH_FAILED # type: ignore[no-any-return] + + def check_auth_publickey(self, username: str, key: PKey) -> int: + if username == self.__class__.ssh_username and key == self.__class__.ssh_key: + return AUTH_SUCCESSFUL # type: ignore[no-any-return] + return AUTH_FAILED # type: ignore[no-any-return] + + def check_channel_exec_request(self, channel: Channel, command: bytes) -> bool: + if self.event.is_set(): + return False + + self.event.set() + + with self._thread_lock: + self._channel = channel + self._command = command + + if self._callback_handler is not None: + self.handle_exec_request(self._callback_handler) + + return True + + def close(self) -> None: + if self._server_transport is not None: + self._server_transport.close() + self._server_transport = None + + def get_allowed_auths(self, username: str) -> str: + if username == self.__class__.ssh_username: + return "publickey" + return "" + + def handle_exec_request(self, _callable: Callable[[bytes, Channel], None]) -> None: + if not callable(_callable): + raise RuntimeError("Handler function given is invalid") + + _callable(self._command, self._channel) # type: ignore[arg-type] + + if self._channel.recv_ready() is not True: + self._channel.send( + bytes("Command {0!r} invalid\n".format(self._command), "utf-8") + ) + + self._channel = None + self._client = None + + self.event.clear() diff --git a/tests/modules/__init__.py b/tests/modules/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/modules/test_example.py b/tests/modules/test_example.py new file mode 100644 index 0000000..1ff294a --- /dev/null +++ b/tests/modules/test_example.py @@ -0,0 +1,11 @@ +# -*- coding: utf-8 -*- + +import pytest + +from rookify.modules.example.main import ExampleHandler +from rookify.modules.module import ModuleException + + +def test_preflight() -> None: + with pytest.raises(ModuleException): + ExampleHandler({}, {}, "").preflight() diff --git a/tests/test_mock_ceph.py b/tests/test_mock_ceph.py new file mode 100644 index 0000000..4cb136b --- /dev/null +++ b/tests/test_mock_ceph.py @@ -0,0 +1,22 @@ +# -*- coding: utf-8 -*- + +from typing import Any, Dict, Tuple +from unittest import TestCase + +from .mock_ceph import MockCeph + + +class TestMockCeph(TestCase): + def setUp(self) -> None: + self.ceph = MockCeph({}, self._command_callback) + + def _command_callback( + self, command: str, inbuf: bytes, **kwargs: Dict[Any, Any] + ) -> Tuple[int, bytes, str]: + if command == "test": + return 0, b'["ok"]', "" + return -1, b'["Command not found"]', "" + + def test_self(self) -> None: + res = self.ceph.mon_command("test", b"") + self.assertEqual(res, ["ok"]) diff --git a/tests/test_mock_k8s.py b/tests/test_mock_k8s.py new file mode 100644 index 0000000..9273bb3 --- /dev/null +++ b/tests/test_mock_k8s.py @@ -0,0 +1,22 @@ +# -*- coding: utf-8 -*- + +from typing import Any, Dict, List +from unittest import TestCase + +from .mock_k8s import MockK8s + + +class TestMockK8s(TestCase): + def setUp(self) -> None: + self.k8s = MockK8s(self._request_callback) + + def _request_callback( + self, method: str, *args: List[Any], **kwargs: Dict[Any, Any] + ) -> Any: + if method == "core_v1_api.test": + return True + return None + + def test_self(self) -> None: + res = self.k8s.core_v1_api.test() + self.assertEqual(res, True) diff --git a/tests/test_mock_ssh_server.py b/tests/test_mock_ssh_server.py new file mode 100644 index 0000000..b9bdcec --- /dev/null +++ b/tests/test_mock_ssh_server.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- + +from paramiko import Channel +from unittest import TestCase + +from .mock_ssh_server import MockSSHServer + + +class TestMockSSHServer(TestCase): + def setUp(self) -> None: + self.ssh_server = MockSSHServer(self._command_callback) + self.ssh_client = self.ssh_server.client + + def tearDown(self) -> None: + self.ssh_server.close() + + def _command_callback(self, command: bytes, channel: Channel) -> None: + if command == b"test": + channel.send(b"ok\n") + + def test_self(self) -> None: + _, stdout, _ = self.ssh_client.exec_command("test") + self.assertEqual(stdout.readline(), "ok\n")