diff --git a/.github/workflows/workflow-engine-unit-tests.yml b/.github/workflows/workflow-engine-unit-tests.yml new file mode 100644 index 0000000000..bdeea0f185 --- /dev/null +++ b/.github/workflows/workflow-engine-unit-tests.yml @@ -0,0 +1,35 @@ +name: Workflow engine unit tests + +on: + workflow_dispatch: + pull_request: + paths: + - '.github/workflows/workflow-engine-unit-tests.yml' + - 'deployability/modules/workflow_engine/**' + +jobs: + build: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ['3.10'] + env: + PYTHONPATH: /home/runner/work/wazuh-qa/wazuh-qa/deployability/modules + steps: + - uses: actions/checkout@v3 + + - name: Setup Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + cache: 'pip' + cache-dependency-path: 'deployability/modules/workflow_engine/requirements-dev.txt' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip wheel + pip install -r deployability/modules/workflow_engine/requirements-dev.txt --no-build-isolation + + - name: Run workflow_engine tests + run: python -m pytest deployability/modules/workflow_engine \ No newline at end of file diff --git a/deployability/modules/workflow_engine/requirements-dev.txt b/deployability/modules/workflow_engine/requirements-dev.txt new file mode 100644 index 0000000000..05a93e2895 --- /dev/null +++ b/deployability/modules/workflow_engine/requirements-dev.txt @@ -0,0 +1,2 @@ +-r ../../deps/requirements.txt +-r ../../deps/remote_requirements.txt \ No newline at end of file diff --git a/deployability/modules/workflow_engine/tests/TESTING-README.md b/deployability/modules/workflow_engine/tests/TESTING-README.md new file mode 100644 index 0000000000..9144e08dc3 --- /dev/null +++ b/deployability/modules/workflow_engine/tests/TESTING-README.md @@ -0,0 +1,167 @@ +# Workflow engine Unit Testing using Pytest + +The workflow_engine module includes pytest unit tests. + +## Requirements + +- Make sure you have Python installed on your system. You can download it from + [python.org](https://www.python.org/downloads/). +- Clone the wazuh-qa repository in your local environment. +- Install the necessary dependencies by running: +```bash +git clone https://github.com/wazuh/wazuh-qa.git -b [your-branch] +cd wazuh-qa +pip install -r deployability/modules/workflow_engine/requirements-dev.txt +``` +- Configure the `PYTHONPATH` variable with the full path to the directory `deployability/modules`, for example if you've +cloned the `wazuh-qa` repository into `/wazuh/wazuh-qa`, configure the `PYTHONPATH` in this way: +```bash +> pwd +/wazuh/wazuh-qa +> export PYTHONPATH=$PYTHONPATH:$PWD/deployability/modules +> echo $PYTHONPATH +/wazuh/wazuh-qa/deployability/modules +``` + +## Test Structure +The directory `deployability/modules/workflow_engine/tests/` contains the unit test files for the `workflow_engine` +module. + +## Running Tests +To run the tests, make sure that your system meets the requirements by executing the following command from the project +root: + +```bash +pytest -vv deployability/modules/workflow_engine +``` +This command will run all tests in the `tests/` directory. Using additional arguments, You can also run specific tests +or directories. 
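+For example, you can target a single test file or filter tests by keyword (the selections below are only
+illustrative):
+```bash
+# Run only the DAG unit tests
+pytest -vv deployability/modules/workflow_engine/tests/test_dag.py
+
+# Run the tests whose names match a keyword expression
+pytest -vv deployability/modules/workflow_engine -k "set_status"
+```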
The output of this command looks like this: +```bash +pytest -vv deployability/modules/workflow_engine +============================================================================================== test session starts ============================================================================================== +platform linux -- Python 3.10.13, pytest-7.1.2, pluggy-1.3.0 -- /usr/local/bin/python3 +cachedir: .pytest_cache +metadata: {'Python': '3.10.13', 'Platform': 'Linux-5.15.146.1-microsoft-standard-WSL2-x86_64-with-glibc2.31', 'Packages': {'pytest': '7.1.2', 'pluggy': '1.3.0'}, 'Plugins': {'anyio': '4.2.0', 'testinfra': '5.0.0', 'metadata': '3.0.0', 'html': '3.1.1'}} +rootdir: /home/marcelo/wazuh/wazuh-qa/deployability/modules +plugins: anyio-4.2.0, testinfra-5.0.0, metadata-3.0.0, html-3.1.1 +collected 92 items + +deployability/modules/workflow_engine/tests/test_dag.py::test_dag_constructor[True] PASSED [ 1%] +deployability/modules/workflow_engine/tests/test_dag.py::test_dag_constructor[False] PASSED [ 2%] +deployability/modules/workflow_engine/tests/test_dag.py::test_dag_is_active[True-dag0] PASSED [ 3%] +deployability/modules/workflow_engine/tests/test_dag.py::test_dag_is_active[True-dag1] PASSED [ 4%] +deployability/modules/workflow_engine/tests/test_dag.py::test_dag_is_active[False-dag0] PASSED [ 5%] +deployability/modules/workflow_engine/tests/test_dag.py::test_dag_is_active[False-dag1] PASSED [ 6%] +deployability/modules/workflow_engine/tests/test_dag.py::test_get_execution_plan[dag0] PASSED [ 7%] +deployability/modules/workflow_engine/tests/test_dag.py::test_set_status[task1-failed-dag0] PASSED [ 8%] +deployability/modules/workflow_engine/tests/test_dag.py::test_set_status[task1-canceled-dag0] PASSED [ 9%] +deployability/modules/workflow_engine/tests/test_dag.py::test_set_status[task1-successful-dag0] PASSED [ 10%] +deployability/modules/workflow_engine/tests/test_dag.py::test_set_status[task1-non_existing_status-dag0] FAILED [ 11%] +deployability/modules/workflow_engine/tests/test_dag.py::test_set_status[non_existing_task-successful-dag0] PASSED [ 13%] +deployability/modules/workflow_engine/tests/test_dag.py::test_set_status[non_existing_task-non_existing_status-dag0] FAILED [ 14%] +deployability/modules/workflow_engine/tests/test_dag.py::test_should_be_canceled[True-dag0] PASSED [ 15%] +deployability/modules/workflow_engine/tests/test_dag.py::test_should_be_canceled[False-dag0] PASSED [ 16%] +deployability/modules/workflow_engine/tests/test_dag.py::test_build_dag[dag0] PASSED [ 17%] +deployability/modules/workflow_engine/tests/test_dag.py::test_build_dag[dag1] PASSED [ 18%] +deployability/modules/workflow_engine/tests/test_dag.py::test_cancel_dependant_tasks[task1-abort-all-to_be_canceled0-dag0] PASSED [ 19%] +deployability/modules/workflow_engine/tests/test_dag.py::test_cancel_dependant_tasks[task1-abort-all-to_be_canceled0-dag1] PASSED [ 20%] +deployability/modules/workflow_engine/tests/test_dag.py::test_cancel_dependant_tasks[task1-abort-related-flows-to_be_canceled1-dag0] FAILED [ 21%] +deployability/modules/workflow_engine/tests/test_dag.py::test_cancel_dependant_tasks[task1-abort-related-flows-to_be_canceled1-dag1] FAILED [ 22%] +deployability/modules/workflow_engine/tests/test_dag.py::test_cancel_dependant_tasks[task1-continue-to_be_canceled2-dag0] FAILED [ 23%] +deployability/modules/workflow_engine/tests/test_dag.py::test_cancel_dependant_tasks[task1-continue-to_be_canceled2-dag1] FAILED [ 25%] 
+deployability/modules/workflow_engine/tests/test_dag.py::test_cancel_dependant_tasks[task2-abort-all-to_be_canceled3-dag0] FAILED [ 26%] +deployability/modules/workflow_engine/tests/test_dag.py::test_cancel_dependant_tasks[task2-abort-all-to_be_canceled3-dag1] FAILED [ 27%] +deployability/modules/workflow_engine/tests/test_dag.py::test_cancel_dependant_tasks[task2-abort-related-flows-to_be_canceled4-dag0] FAILED [ 28%] +deployability/modules/workflow_engine/tests/test_dag.py::test_cancel_dependant_tasks[task2-abort-related-flows-to_be_canceled4-dag1] FAILED [ 29%] +deployability/modules/workflow_engine/tests/test_dag.py::test_cancel_dependant_tasks[task2-continue-to_be_canceled5-dag0] FAILED [ 30%] +deployability/modules/workflow_engine/tests/test_dag.py::test_cancel_dependant_tasks[task2-continue-to_be_canceled5-dag1] FAILED [ 31%] +deployability/modules/workflow_engine/tests/test_dag.py::test_cancel_dependant_tasks[task5-abort-all-to_be_canceled6-dag0] PASSED [ 32%] +deployability/modules/workflow_engine/tests/test_dag.py::test_cancel_dependant_tasks[task5-abort-all-to_be_canceled6-dag1] PASSED [ 33%] +deployability/modules/workflow_engine/tests/test_dag.py::test_cancel_dependant_tasks[task5-abort-related-flows-to_be_canceled7-dag0] PASSED [ 34%] +deployability/modules/workflow_engine/tests/test_dag.py::test_cancel_dependant_tasks[task5-abort-related-flows-to_be_canceled7-dag1] PASSED [ 35%] +deployability/modules/workflow_engine/tests/test_dag.py::test_cancel_dependant_tasks[task5-continue-to_be_canceled8-dag0] FAILED [ 36%] +deployability/modules/workflow_engine/tests/test_dag.py::test_cancel_dependant_tasks[task5-continue-to_be_canceled8-dag1] FAILED [ 38%] +deployability/modules/workflow_engine/tests/test_dag.py::test_create_execution_plan[dag0-exec_plan0] PASSED [ 39%] +deployability/modules/workflow_engine/tests/test_dag.py::test_create_execution_plan[dag1-exec_plan1] PASSED [ 40%] +deployability/modules/workflow_engine/tests/test_schema_validator.py::test_schema_validator_constructor[logger_mock0] PASSED [ 41%] +deployability/modules/workflow_engine/tests/test_schema_validator.py::test_schema_validator_constructor_ko PASSED [ 42%] +deployability/modules/workflow_engine/tests/test_schema_validator.py::test_preprocess_data PASSED [ 43%] +deployability/modules/workflow_engine/tests/test_schema_validator.py::test_preprocess_data_ko[wf-ko-no-path-on-do.yaml-Missing required properties in 'with' for task: {'task': 'run-agent-tests-{agent}'] PASSED [ 44%] +deployability/modules/workflow_engine/tests/test_schema_validator.py::test_preprocess_data_ko[wf-ko-no-path-on-cleanup.yaml-Missing required properties in 'with' for task: {'task': 'allocate-manager'] PASSED [ 45%] +deployability/modules/workflow_engine/tests/test_schema_validator.py::test_validate_schema PASSED [ 46%] +deployability/modules/workflow_engine/tests/test_schema_validator.py::test_validate_schema_ko[logger_mock0] PASSED [ 47%] +deployability/modules/workflow_engine/tests/test_task.py::test_process_task_constructor[task0] PASSED [ 48%] +deployability/modules/workflow_engine/tests/test_task.py::test_process_task_execute[task0] PASSED [ 50%] +deployability/modules/workflow_engine/tests/test_task.py::test_process_task_execute[task1] PASSED [ 51%] +deployability/modules/workflow_engine/tests/test_task.py::test_process_task_execute[task2] PASSED [ 52%] +deployability/modules/workflow_engine/tests/test_task.py::test_process_task_execute[task3] PASSED [ 53%] 
+deployability/modules/workflow_engine/tests/test_task.py::test_process_task_execute[task4] PASSED [ 54%] +deployability/modules/workflow_engine/tests/test_task.py::test_process_task_execute_ko[subproc_run_exc0-1-task0] PASSED [ 55%] +deployability/modules/workflow_engine/tests/test_task.py::test_process_task_execute_ko[subproc_run_exc0-0-task0] PASSED [ 56%] +deployability/modules/workflow_engine/tests/test_task.py::test_process_task_execute_ko[subproc_run_exc1-1-task0] PASSED [ 57%] +deployability/modules/workflow_engine/tests/test_task.py::test_process_task_execute_ko[subproc_run_exc1-0-task0] PASSED [ 58%] +deployability/modules/workflow_engine/tests/test_workflow_file.py::test_workflow_file_constructor PASSED [ 59%] +deployability/modules/workflow_engine/tests/test_workflow_file.py::test_workflow_file_validate_schema[logger_mock0] PASSED [ 60%] +deployability/modules/workflow_engine/tests/test_workflow_file.py::test_workflow_file_validate_schema_ko[logger_mock0] PASSED [ 61%] +deployability/modules/workflow_engine/tests/test_workflow_file.py::test_workflow_file_load_workflow[logger_mock0] PASSED [ 63%] +deployability/modules/workflow_engine/tests/test_workflow_file.py::test_workflow_file_load_workflow_ko[logger_mock0] PASSED [ 64%] +deployability/modules/workflow_engine/tests/test_workflow_file.py::test_workflow_file_process_workflow[logger_mock0] PASSED [ 65%] +deployability/modules/workflow_engine/tests/test_workflow_file.py::test_workflow_file_process_workflow_ok[logger_mock0] PASSED [ 66%] +deployability/modules/workflow_engine/tests/test_workflow_file.py::test_workflow_file_replace_placeholder[element0-values0-return_value0] PASSED [ 67%] +deployability/modules/workflow_engine/tests/test_workflow_file.py::test_workflow_file_replace_placeholder[element1-values1-return_value1] PASSED [ 68%] +deployability/modules/workflow_engine/tests/test_workflow_file.py::test_workflow_file_replace_placeholder[string_element {value}-values2-string_element value] PASSED [ 69%] +deployability/modules/workflow_engine/tests/test_workflow_file.py::test_workflow_file_replace_placeholder[element3-None-return_value3] PASSED [ 70%] +deployability/modules/workflow_engine/tests/test_workflow_file.py::test_workflow_file_expand_task[task0-return_value0-variables0] PASSED [ 71%] +deployability/modules/workflow_engine/tests/test_workflow_file.py::test_workflow_file_expand_task[task1-return_value1-variables1] PASSED [ 72%] +deployability/modules/workflow_engine/tests/test_workflow_file.py::test_workflow_file_static_workflow_validation PASSED [ 73%] +deployability/modules/workflow_engine/tests/test_workflow_file.py::test_workflow_file_static_workflow_validation_ko[task_collection0-Duplicated task names: task 1] PASSED [ 75%] +deployability/modules/workflow_engine/tests/test_workflow_file.py::test_workflow_file_static_workflow_validation_ko[task_collection1-Tasks do not exist: task 3, task 4] PASSED [ 76%] +deployability/modules/workflow_engine/tests/test_workflow_processor.py::test_workflow_processor_constructor[workflow.yaml-False-1-info-schema.yaml] PASSED [ 77%] +deployability/modules/workflow_engine/tests/test_workflow_processor.py::test_workflow_processor_constructor[workflow.yaml-True-1-debug-schema.yaml] PASSED [ 78%] +deployability/modules/workflow_engine/tests/test_workflow_processor.py::test_workflow_processor_constructor[workflow.yaml-True-1-debug-None] PASSED [ 79%] 
+deployability/modules/workflow_engine/tests/test_workflow_processor.py::test_execute_task[logger_mock0-w_processor0-dag0-custom_action-True] PASSED [ 80%]
+deployability/modules/workflow_engine/tests/test_workflow_processor.py::test_execute_task[logger_mock1-w_processor1-dag1-custom_action-False] PASSED [ 81%]
+deployability/modules/workflow_engine/tests/test_workflow_processor.py::test_execute_task_ko[logger_mock0-w_processor0-dag0-KeyboardInterrupt-None] PASSED [ 82%]
+deployability/modules/workflow_engine/tests/test_workflow_processor.py::test_execute_task_ko[logger_mock0-w_processor0-dag0-KeyboardInterrupt-abort-all] PASSED [ 83%]
+deployability/modules/workflow_engine/tests/test_workflow_processor.py::test_execute_task_ko[logger_mock1-w_processor1-dag1-Exception-None] PASSED [ 84%]
+deployability/modules/workflow_engine/tests/test_workflow_processor.py::test_execute_task_ko[logger_mock1-w_processor1-dag1-Exception-abort-all] PASSED [ 85%]
+deployability/modules/workflow_engine/tests/test_workflow_processor.py::test_create_task_object[w_processor0-process] PASSED [ 86%]
+deployability/modules/workflow_engine/tests/test_workflow_processor.py::test_create_task_object[w_processor0-dummy] PASSED [ 88%]
+deployability/modules/workflow_engine/tests/test_workflow_processor.py::test_create_task_object[w_processor0-dummy-random] PASSED [ 89%]
+deployability/modules/workflow_engine/tests/test_workflow_processor.py::test_create_task_object_ko[w_processor0] PASSED [ 90%]
+deployability/modules/workflow_engine/tests/test_workflow_processor.py::test_execute_tasks_parallel[logger_mock0-w_processor0-dag0-False] PASSED [ 91%]
+deployability/modules/workflow_engine/tests/test_workflow_processor.py::test_execute_tasks_parallel[logger_mock0-w_processor0-dag0-True] PASSED [ 92%]
+deployability/modules/workflow_engine/tests/test_workflow_processor.py::test_execute_tasks_parallel_ko[logger_mock0-w_processor0-dag0-False] PASSED [ 93%]
+deployability/modules/workflow_engine/tests/test_workflow_processor.py::test_execute_tasks_parallel_ko[logger_mock0-w_processor0-dag0-True] PASSED [ 94%]
+deployability/modules/workflow_engine/tests/test_workflow_processor.py::test_generate_futures[w_processor0] PASSED [ 95%]
+deployability/modules/workflow_engine/tests/test_workflow_processor.py::test_generate_futures_reverse[w_processor0] PASSED [ 96%]
+deployability/modules/workflow_engine/tests/test_workflow_processor.py::test_run[logger_mock0-w_processor0-False] PASSED [ 97%]
+deployability/modules/workflow_engine/tests/test_workflow_processor.py::test_run[logger_mock0-w_processor0-True] PASSED [ 98%]
+deployability/modules/workflow_engine/tests/test_workflow_processor.py::test_handle_interrupt[logger_mock0-w_processor0] PASSED [100%]
+
+=================================================================================================== FAILURES ====================================================================================================
+```
+
+The `.github/workflows/workflow-engine-unit-tests.yml` workflow automatically runs these unit tests in the GitHub
+environment. The run results are shown in the `Checks` tab of your GitHub pull request.
+
+## Relevant Files
+- `tests/test_[test name].py`: all the unit test files start with a `test_` prefix. There is one unit test file for
+  each tested class.
+- `tests/conftest.py`: contains the fixtures used throughout the unit tests.
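+
+For reference, a typical unit test in this module combines a `conftest.py` fixture with `@pytest.mark.parametrize`.
+The sketch below is a condensed copy of `test_dag_is_active` from `tests/test_dag.py`; the `dag` fixture is defined in
+`tests/conftest.py` and receives its parameters through indirect parametrization:
+```python
+from unittest.mock import patch
+
+import pytest
+
+from workflow_engine.workflow_processor import DAG
+
+
+@pytest.mark.parametrize('dag', [{'reverse': True}, {'reverse': False}], indirect=True)
+@pytest.mark.parametrize('is_active', [True, False])
+def test_dag_is_active(is_active: bool, dag: DAG):
+    """Check that DAG.is_active returns the value reported by the underlying topological sorter."""
+    with patch.object(dag.dag, 'is_active', return_value=is_active) as is_active_mock:
+        assert dag.is_active() == is_active
+        is_active_mock.assert_called_once()
+```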
+
+## Unit test development guidelines and recommendations
+- Follow standard Python style when developing unit tests: snake case for all variable and function names, a maximum
+  line length of 120 characters, two empty lines between functions, type hints for all function parameters and return
+  values, and NumPy-style docstrings for every function.
+- Develop unit tests for each function or method of the module.
+- Error flows are usually covered by a second unit test with the `_ko` suffix. For example, `test_process_task_execute`
+  in `deployability/modules/workflow_engine/tests/test_task.py` is the unit test for the normal flow of the
+  `ProcessTask.execute` method, while `test_process_task_execute_ko` implements the error flow.
+- Use pytest's `@pytest.mark.parametrize` decorator to implement several test cases for the same unit test.
+- Mock the object instances and functions used by the tested function with the `unittest.mock.patch` and
+  `unittest.mock.patch.object` functions or decorators.
+- Try to factor shared testing code into pytest fixtures. The shared fixtures are defined in the `conftest.py` file. In
+  many unit tests of this project, the fixtures use the `request` object to receive parameters from
+  `pytest.mark.parametrize` (indirect parametrization).
diff --git a/deployability/modules/workflow_engine/tests/conftest.py b/deployability/modules/workflow_engine/tests/conftest.py new file mode 100644 index 0000000000..5de92b5222 --- /dev/null +++ b/deployability/modules/workflow_engine/tests/conftest.py @@ -0,0 +1,75 @@
+# Copyright (C) 2015, Wazuh Inc.
+# Created by Wazuh, Inc. .
+# This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2
+"""Common unit test fixtures."""
+import graphlib
+
+from unittest.mock import patch, MagicMock
+import pytest
+
+from workflow_engine.workflow_processor import DAG, WorkflowProcessor
+
+DEFAULT_TASK_COLLECTION = [
+    {'task': 'task1', 'path': '/cmd1', 'args': [{"param1": "value1"}]},
+    {'task': 'task2', 'path': '/cmd2', 'args': [{"param1": "value1"}]},
+    {'task': 'task3', 'path': '/cmd3', 'args': [{"param1": "value1"}]},
+]
+
+
+@pytest.fixture
+def logger_mock(request) -> MagicMock:
+    """Fixture to mock common logger methods."""
+    logger_to_patch = request.param.get('logger_to_patch', "workflow_engine.workflow_processor.logger")
+    with patch(logger_to_patch) as l_mock:
+        patch.object(l_mock, 'warning')
+        patch.object(l_mock, 'info')
+        patch.object(l_mock, 'debug')
+        patch.object(l_mock, 'error')
+        yield l_mock
+
+
+@pytest.fixture
+def dag(request) -> DAG:
+    """Create a mocked DAG instance."""
+    ret_dag: DAG
+    reverse = request.param.get('reverse', False)
+    task_collection = request.param.get('task_collection', DEFAULT_TASK_COLLECTION)
+    if request.param.get('patch', True):
+        execution_plan_dict = request.param.get('execution_plan_dict', {})
+        gl_dag = graphlib.TopologicalSorter()
+        dep_dict = {'task1': 'task2'}
+        with patch.object(gl_dag, 'prepare'), \
+             patch('workflow_engine.workflow_processor.DAG._DAG__build_dag',
+                   return_value=(gl_dag, dep_dict)), \
+             patch('workflow_engine.workflow_processor.DAG._DAG__create_execution_plan',
+                   return_value=execution_plan_dict):
+            ret_dag = DAG(task_collection=task_collection, reverse=reverse)
+    else:
+        ret_dag = DAG(task_collection=task_collection, reverse=reverse)
+
+    if finished_task_status := request.param.get('finished_task_status', False):
+        ret_dag.finished_tasks_status = finished_task_status
+
+    
return ret_dag + + +@pytest.fixture +def w_processor(request) -> WorkflowProcessor: + """Create a mocked WorkflowProcessor instance.""" + + workflow_file = request.param.get('workflow_file', 'workflow.yaml') + dry_run = request.param.get('dry_run', False) + threads = request.param.get('threads', 1) + log_level = request.param.get('log_level', 'info') + schema_file = request.param.get('schema_file', 'schema.yaml') + with patch("workflow_engine.workflow_processor.WorkflowFile") as file_mock: + workflow_file_instance = file_mock.return_value + workflow_file_instance.task_collection = request.param.get('task_collection', DEFAULT_TASK_COLLECTION) + if request.param.get('patch', True): + with patch('workflow_engine.workflow_processor.logger.setLevel'): + processor = WorkflowProcessor(workflow_file, dry_run, threads, + log_level, schema_file) + else: + processor = WorkflowProcessor(workflow_file, dry_run, + threads, log_level, schema_file) + return processor diff --git a/deployability/modules/workflow_engine/tests/data/wf-ko-no-path-on-cleanup.yaml b/deployability/modules/workflow_engine/tests/data/wf-ko-no-path-on-cleanup.yaml new file mode 100644 index 0000000000..a637e4184f --- /dev/null +++ b/deployability/modules/workflow_engine/tests/data/wf-ko-no-path-on-cleanup.yaml @@ -0,0 +1,169 @@ +# Copyright (C) 2015, Wazuh Inc. +# Created by Wazuh, Inc. . +# This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2 +version: 0.1 +description: This workflow is used to test agents deployment with a single manager. +variables: + agents-os: + - linux-ubuntu-22.04-amd64 + manager-os: linux-ubuntu-22.04-amd64 + infra-provider: vagrant + working-dir: /tmp/dtt1 + +tasks: + # Generic agent test task + - task: "run-agent-tests-{agent}" + description: "Run tests uninstall for the {agent} agent." + do: + this: process + with: + path: python3 + args: + - modules/testing/main.py + - inventory: "{working-dir}/agent-{agent}/inventory.yaml" + - dependencies: + - manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - agent: "{working-dir}/agent-{agent}/inventory.yaml" + - tests: "install,register,stop" + - component: "agent" + - wazuh-version: "4.7.1" + - wazuh-revision: "40709" + depends-on: + - "provision-install-{agent}" + - "provision-manager" + foreach: + - variable: agents-os + as: agent + + # Generic agent test task + - task: "run-agent-tests-uninstall-{agent}" + description: "Run tests uninstall for the {agent} agent." + do: + this: process + with: + path: python3 + args: + - modules/testing/main.py + - inventory: "{working-dir}/agent-{agent}/inventory.yaml" + - dependencies: + - manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - tests: "uninstall" + - component: "agent" + - wazuh-version: "4.7.1" + - wazuh-revision: "40709" + depends-on: + - "run-agent-tests-{agent}" + - "provision-uninstall-{agent}" + foreach: + - variable: agents-os + as: agent + + # Unique manager provision task + - task: "provision-manager" + description: "Provision the manager." + do: + this: process + with: + path: python3 + args: + - modules/provision/main.py + - inventory-manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - install: + - component: wazuh-manager + type: aio + version: "4.7.0" + depends-on: + - "allocate-manager" + + # Unique manager allocate task + - task: "allocate-manager" + description: "Allocate resources for the manager." 
+ do: + this: process + with: + path: python3 + args: + - modules/allocation/main.py + - action: create + - provider: "{infra-provider}" + - size: large + - composite-name: "{manager-os}" + - inventory-output: "{working-dir}/manager-{manager-os}/inventory.yaml" + - track-output: "{working-dir}/manager-{manager-os}/track.yaml" + cleanup: + this: process + with: + args: + - modules/allocation/main.py + - action: delete + - track-output: "{working-dir}/manager-{manager-os}/track.yaml" + + # Generic agent provision task + - task: "provision-install-{agent}" + description: "Provision resources for the {agent} agent." + do: + this: process + with: + path: python3 + args: + - modules/provision/main.py + - inventory-agent: "{working-dir}/agent-{agent}/inventory.yaml" + - inventory-manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - install: + - component: wazuh-agent + type: aio + version: "4.8.0" + live: False + depends-on: + - "allocate-{agent}" + - "provision-manager" + foreach: + - variable: agents-os + as: agent + + # Generic agent provision task + - task: "provision-uninstall-{agent}" + description: "Provision resources for the {agent} agent." + do: + this: process + with: + path: python3 + args: + - modules/provision/main.py + - inventory-agent: "{working-dir}/agent-{agent}/inventory.yaml" + - inventory-manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - uninstall: + - component: wazuh-agent + type: package + depends-on: + - "provision-install-{agent}" + foreach: + - variable: agents-os + as: agent + + # Generic agent allocate task + - task: "allocate-{agent}" + description: "Allocate resources for the {agent} agent." + do: + this: process + with: + path: python3 + args: + - modules/allocation/main.py + - action: create + - provider: "{infra-provider}" + - size: small + - composite-name: "{agent}" + - inventory-output: "{working-dir}/agent-{agent}/inventory.yaml" + - track-output: "{working-dir}/agent-{agent}/track.yaml" + cleanup: + this: process + with: + path: python3 + args: + - modules/allocation/main.py + - action: delete + - track-output: "{working-dir}/agent-{agent}/track.yaml" + foreach: + - variable: agents-os + as: agent \ No newline at end of file diff --git a/deployability/modules/workflow_engine/tests/data/wf-ko-no-path-on-do.yaml b/deployability/modules/workflow_engine/tests/data/wf-ko-no-path-on-do.yaml new file mode 100644 index 0000000000..6b2d2512d0 --- /dev/null +++ b/deployability/modules/workflow_engine/tests/data/wf-ko-no-path-on-do.yaml @@ -0,0 +1,169 @@ +# Copyright (C) 2015, Wazuh Inc. +# Created by Wazuh, Inc. . +# This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2 +version: 0.1 +description: This workflow is used to test agents deployment with a single manager. +variables: + agents-os: + - linux-ubuntu-22.04-amd64 + manager-os: linux-ubuntu-22.04-amd64 + infra-provider: vagrant + working-dir: /tmp/dtt1 + +tasks: + # Generic agent test task + - task: "run-agent-tests-{agent}" + description: "Run tests uninstall for the {agent} agent." 
+ do: + this: process + with: + args: + - modules/testing/main.py + - inventory: "{working-dir}/agent-{agent}/inventory.yaml" + - dependencies: + - manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - agent: "{working-dir}/agent-{agent}/inventory.yaml" + - tests: "install,register,stop" + - component: "agent" + - wazuh-version: "4.7.1" + - wazuh-revision: "40709" + depends-on: + - "provision-install-{agent}" + - "provision-manager" + foreach: + - variable: agents-os + as: agent + + # Generic agent test task + - task: "run-agent-tests-uninstall-{agent}" + description: "Run tests uninstall for the {agent} agent." + do: + this: process + with: + path: python3 + args: + - modules/testing/main.py + - inventory: "{working-dir}/agent-{agent}/inventory.yaml" + - dependencies: + - manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - tests: "uninstall" + - component: "agent" + - wazuh-version: "4.7.1" + - wazuh-revision: "40709" + depends-on: + - "run-agent-tests-{agent}" + - "provision-uninstall-{agent}" + foreach: + - variable: agents-os + as: agent + + # Unique manager provision task + - task: "provision-manager" + description: "Provision the manager." + do: + this: process + with: + path: python3 + args: + - modules/provision/main.py + - inventory-manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - install: + - component: wazuh-manager + type: aio + version: "4.7.0" + depends-on: + - "allocate-manager" + + # Unique manager allocate task + - task: "allocate-manager" + description: "Allocate resources for the manager." + do: + this: process + with: + path: python3 + args: + - modules/allocation/main.py + - action: create + - provider: "{infra-provider}" + - size: large + - composite-name: "{manager-os}" + - inventory-output: "{working-dir}/manager-{manager-os}/inventory.yaml" + - track-output: "{working-dir}/manager-{manager-os}/track.yaml" + cleanup: + this: process + with: + path: python3 + args: + - modules/allocation/main.py + - action: delete + - track-output: "{working-dir}/manager-{manager-os}/track.yaml" + + # Generic agent provision task + - task: "provision-install-{agent}" + description: "Provision resources for the {agent} agent." + do: + this: process + with: + path: python3 + args: + - modules/provision/main.py + - inventory-agent: "{working-dir}/agent-{agent}/inventory.yaml" + - inventory-manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - install: + - component: wazuh-agent + type: aio + version: "4.8.0" + live: False + depends-on: + - "allocate-{agent}" + - "provision-manager" + foreach: + - variable: agents-os + as: agent + + # Generic agent provision task + - task: "provision-uninstall-{agent}" + description: "Provision resources for the {agent} agent." + do: + this: process + with: + path: python3 + args: + - modules/provision/main.py + - inventory-agent: "{working-dir}/agent-{agent}/inventory.yaml" + - inventory-manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - uninstall: + - component: wazuh-agent + type: package + depends-on: + - "provision-install-{agent}" + foreach: + - variable: agents-os + as: agent + + # Generic agent allocate task + - task: "allocate-{agent}" + description: "Allocate resources for the {agent} agent." 
+ do: + this: process + with: + path: python3 + args: + - modules/allocation/main.py + - action: create + - provider: "{infra-provider}" + - size: small + - composite-name: "{agent}" + - inventory-output: "{working-dir}/agent-{agent}/inventory.yaml" + - track-output: "{working-dir}/agent-{agent}/track.yaml" + cleanup: + this: process + with: + path: python3 + args: + - modules/allocation/main.py + - action: delete + - track-output: "{working-dir}/agent-{agent}/track.yaml" + foreach: + - variable: agents-os + as: agent \ No newline at end of file diff --git a/deployability/modules/workflow_engine/tests/data/wf-ko-schema-error.yaml b/deployability/modules/workflow_engine/tests/data/wf-ko-schema-error.yaml new file mode 100644 index 0000000000..62c130f64b --- /dev/null +++ b/deployability/modules/workflow_engine/tests/data/wf-ko-schema-error.yaml @@ -0,0 +1,156 @@ +# Copyright (C) 2015, Wazuh Inc. +# Created by Wazuh, Inc. . +# This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2 +version: 0.1 +description: This workflow is used to test agents deployment with a single manager. +variables: + agents-os: + - linux-ubuntu-22.04-amd64 + manager-os: linux-ubuntu-22.04-amd64 + infra-provider: vagrant + working-dir: /tmp/dtt1 + +tasks: + # Generic agent test task + - task: "run-agent-tests-{agent}" + description: "Run tests uninstall for the {agent} agent." + depends-on: + - "provision-install-{agent}" + - "provision-manager" + foreach: + - variable: agents-os + as: agent + + # Generic agent test task + - task: "run-agent-tests-uninstall-{agent}" + description: "Run tests uninstall for the {agent} agent." + do: + this: process + with: + path: python3 + args: + - modules/testing/main.py + - inventory: "{working-dir}/agent-{agent}/inventory.yaml" + - dependencies: + - manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - tests: "uninstall" + - component: "agent" + - wazuh-version: "4.7.1" + - wazuh-revision: "40709" + depends-on: + - "run-agent-tests-{agent}" + - "provision-uninstall-{agent}" + foreach: + - variable: agents-os + as: agent + + # Unique manager provision task + - task: "provision-manager" + description: "Provision the manager." + do: + this: process + with: + path: python3 + args: + - modules/provision/main.py + - inventory-manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - install: + - component: wazuh-manager + type: aio + version: "4.7.0" + depends-on: + - "allocate-manager" + + # Unique manager allocate task + - task: "allocate-manager" + description: "Allocate resources for the manager." + do: + this: process + with: + path: python3 + args: + - modules/allocation/main.py + - action: create + - provider: "{infra-provider}" + - size: large + - composite-name: "{manager-os}" + - inventory-output: "{working-dir}/manager-{manager-os}/inventory.yaml" + - track-output: "{working-dir}/manager-{manager-os}/track.yaml" + cleanup: + this: process + with: + path: python3 + args: + - modules/allocation/main.py + - action: delete + - track-output: "{working-dir}/manager-{manager-os}/track.yaml" + + # Generic agent provision task + - task: "provision-install-{agent}" + description: "Provision resources for the {agent} agent." 
+ do: + this: process + with: + path: python3 + args: + - modules/provision/main.py + - inventory-agent: "{working-dir}/agent-{agent}/inventory.yaml" + - inventory-manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - install: + - component: wazuh-agent + type: aio + version: "4.8.0" + live: False + depends-on: + - "allocate-{agent}" + - "provision-manager" + foreach: + - variable: agents-os + as: agent + + # Generic agent provision task + - task: "provision-uninstall-{agent}" + description: "Provision resources for the {agent} agent." + do: + this: process + with: + path: python3 + args: + - modules/provision/main.py + - inventory-agent: "{working-dir}/agent-{agent}/inventory.yaml" + - inventory-manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - uninstall: + - component: wazuh-agent + type: package + depends-on: + - "provision-install-{agent}" + foreach: + - variable: agents-os + as: agent + + # Generic agent allocate task + - task: "allocate-{agent}" + description: "Allocate resources for the {agent} agent." + do: + this: process + with: + path: python3 + args: + - modules/allocation/main.py + - action: create + - provider: "{infra-provider}" + - size: small + - composite-name: "{agent}" + - inventory-output: "{working-dir}/agent-{agent}/inventory.yaml" + - track-output: "{working-dir}/agent-{agent}/track.yaml" + cleanup: + this: process + with: + path: python3 + args: + - modules/allocation/main.py + - action: delete + - track-output: "{working-dir}/agent-{agent}/track.yaml" + foreach: + - variable: agents-os + as: agent \ No newline at end of file diff --git a/deployability/modules/workflow_engine/tests/data/wf-ok.yaml b/deployability/modules/workflow_engine/tests/data/wf-ok.yaml new file mode 100644 index 0000000000..fa979a6f81 --- /dev/null +++ b/deployability/modules/workflow_engine/tests/data/wf-ok.yaml @@ -0,0 +1,170 @@ +# Copyright (C) 2015, Wazuh Inc. +# Created by Wazuh, Inc. . +# This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2 +version: 0.1 +description: This workflow is used to test agents deployment with a single manager. +variables: + agents-os: + - linux-ubuntu-22.04-amd64 + manager-os: linux-ubuntu-22.04-amd64 + infra-provider: vagrant + working-dir: /tmp/dtt1 + +tasks: + # Generic agent test task + - task: "run-agent-tests-{agent}" + description: "Run tests uninstall for the {agent} agent." + do: + this: process + with: + path: python3 + args: + - modules/testing/main.py + - inventory: "{working-dir}/agent-{agent}/inventory.yaml" + - dependencies: + - manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - agent: "{working-dir}/agent-{agent}/inventory.yaml" + - tests: "install,register,stop" + - component: "agent" + - wazuh-version: "4.7.1" + - wazuh-revision: "40709" + depends-on: + - "provision-install-{agent}" + - "provision-manager" + foreach: + - variable: agents-os + as: agent + + # Generic agent test task + - task: "run-agent-tests-uninstall-{agent}" + description: "Run tests uninstall for the {agent} agent." 
+ do: + this: process + with: + path: python3 + args: + - modules/testing/main.py + - inventory: "{working-dir}/agent-{agent}/inventory.yaml" + - dependencies: + - manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - tests: "uninstall" + - component: "agent" + - wazuh-version: "4.7.1" + - wazuh-revision: "40709" + depends-on: + - "run-agent-tests-{agent}" + - "provision-uninstall-{agent}" + foreach: + - variable: agents-os + as: agent + + # Unique manager provision task + - task: "provision-manager" + description: "Provision the manager." + do: + this: process + with: + path: python3 + args: + - modules/provision/main.py + - inventory-manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - install: + - component: wazuh-manager + type: aio + version: "4.7.0" + depends-on: + - "allocate-manager" + + # Unique manager allocate task + - task: "allocate-manager" + description: "Allocate resources for the manager." + do: + this: process + with: + path: python3 + args: + - modules/allocation/main.py + - action: create + - provider: "{infra-provider}" + - size: large + - composite-name: "{manager-os}" + - inventory-output: "{working-dir}/manager-{manager-os}/inventory.yaml" + - track-output: "{working-dir}/manager-{manager-os}/track.yaml" + cleanup: + this: process + with: + path: python3 + args: + - modules/allocation/main.py + - action: delete + - track-output: "{working-dir}/manager-{manager-os}/track.yaml" + + # Generic agent provision task + - task: "provision-install-{agent}" + description: "Provision resources for the {agent} agent." + do: + this: process + with: + path: python3 + args: + - modules/provision/main.py + - inventory-agent: "{working-dir}/agent-{agent}/inventory.yaml" + - inventory-manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - install: + - component: wazuh-agent + type: aio + version: "4.8.0" + live: False + depends-on: + - "allocate-{agent}" + - "provision-manager" + foreach: + - variable: agents-os + as: agent + + # Generic agent provision task + - task: "provision-uninstall-{agent}" + description: "Provision resources for the {agent} agent." + do: + this: process + with: + path: python3 + args: + - modules/provision/main.py + - inventory-agent: "{working-dir}/agent-{agent}/inventory.yaml" + - inventory-manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - uninstall: + - component: wazuh-agent + type: package + depends-on: + - "provision-install-{agent}" + foreach: + - variable: agents-os + as: agent + + # Generic agent allocate task + - task: "allocate-{agent}" + description: "Allocate resources for the {agent} agent." + do: + this: process + with: + path: python3 + args: + - modules/allocation/main.py + - action: create + - provider: "{infra-provider}" + - size: small + - composite-name: "{agent}" + - inventory-output: "{working-dir}/agent-{agent}/inventory.yaml" + - track-output: "{working-dir}/agent-{agent}/track.yaml" + cleanup: + this: process + with: + path: python3 + args: + - modules/allocation/main.py + - action: delete + - track-output: "{working-dir}/agent-{agent}/track.yaml" + foreach: + - variable: agents-os + as: agent \ No newline at end of file diff --git a/deployability/modules/workflow_engine/tests/test_dag.py b/deployability/modules/workflow_engine/tests/test_dag.py new file mode 100644 index 0000000000..b3c6ec181e --- /dev/null +++ b/deployability/modules/workflow_engine/tests/test_dag.py @@ -0,0 +1,294 @@ +# Copyright (C) 2015, Wazuh Inc. +# Created by Wazuh, Inc. . 
+# This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2 +import graphlib + +from unittest.mock import patch, MagicMock, call +import pytest + +from workflow_engine.workflow_processor import DAG + + +@pytest.mark.parametrize("reverse", [True, False]) +@patch("workflow_engine.workflow_processor.DAG._DAG__build_dag") +@patch("workflow_engine.workflow_processor.DAG._DAG__create_execution_plan") +def test_dag_constructor(create_exec_plan_mock: MagicMock, build_dag_mock: MagicMock, reverse: bool): + """Test ProcessTask constructor + Check all the dag object state after initialization and if the private dag methods are called during the instance + construction. + + Parameters + ---------- + create_exec_plan_mock : MagicMock + Patch of the DAG.__create_execution_plan method. + build_dag_mock : MagicMock + Patch of the DAG.__build_dag_ method. + reverse : bool + Parametrized value used by the DAG constructor. + """ + task_collection = [ + {'task': 'task1', 'path': '/cmd1', 'args': [{"param1": "value1"}]}, + {'task': 'task2', 'path': '/cmd2', 'args': [{"param1": "value1"}]}, + {'task': 'task3', 'path': '/cmd3', 'args': [{"param1": "value1"}]}, + ] + gl_dag = graphlib.TopologicalSorter() + + dep_dict = {'task1': 'task2'} + build_dag_mock.return_value = (gl_dag, dep_dict) + plan_dict = {'task1', 'task2'} + create_exec_plan_mock.return_value = plan_dict + with patch.object(gl_dag, 'prepare') as prepare_mock: + dag = DAG(task_collection=task_collection, reverse=reverse) + + assert dag.task_collection == task_collection + assert dag.reverse == reverse + assert dag.dag == gl_dag + assert dag.dependency_tree == dep_dict + assert isinstance(dag.to_be_canceled, set) and not dag.to_be_canceled + assert dag.finished_tasks_status == { + 'failed': set(), + 'canceled': set(), + 'successful': set(), + } + assert dag.execution_plan == plan_dict + build_dag_mock.assert_called_once() + create_exec_plan_mock.assert_called_once_with(dep_dict) + prepare_mock.assert_called_once() + + +@pytest.mark.parametrize('dag', + [{'reverse': True}, {'reverse': False}], + indirect=True) +@pytest.mark.parametrize('is_active', [True, False]) +def test_dag_is_active(is_active: bool, dag: DAG): + """Test DAG.is_active method. + Check if dag.is_active method returns the value of the dag.dag.is_active() method. + + Parameters + ---------- + is_active : bool + Parametrized value returned by dag.dag.is_active + dag : DAG + DAG fixture defined in conftest.py. + """ + with patch.object(dag.dag, 'is_active', return_value=is_active) as is_active_mock: + assert dag.is_active() == is_active + is_active_mock.assert_called_once() + + +@pytest.mark.parametrize('dag', + [{'execution_plan_dict': {'task1', 'task2'} }], indirect=True) +def test_get_execution_plan(dag: DAG): + """Test DAG.get_execution_plan method. + Check if the dag.get_execution_plan returns the dag.execution_plan instance + + Parameters + ---------- + dag : DAG + DAG fixture defined in conftest.py. + """ + assert dag.get_execution_plan() == dag.execution_plan + + +@pytest.mark.parametrize('dag', [{}], indirect=True) +@pytest.mark.parametrize('task_name, status', [ + ('task1', 'failed'), + ('task1', 'canceled'), + ('task1', 'successful'), +]) +def test_set_status(task_name:str, status:str, dag: DAG): + """Test DAG.set_status method. + Check if the dag.dag.done mode is properly called and that the task is in the failed, canceled or + successful set. 
+ + Parameters + ---------- + task_name : str + Parameterized value for the task name passed to dag.set_status method. + status : str + Parameterized value for the task name passed to dag.set_status method. + dag : DAG + DAG fixture defined in conftest.py. + """ + with patch.object(dag.dag, "done") as done_mock: + dag.set_status(task_name=task_name, status=status) + assert task_name in dag.finished_tasks_status[status] + done_mock.assert_called_once_with(task_name) + + +@pytest.mark.parametrize('dag', [{}], indirect=True) +@pytest.mark.parametrize('in_cancel', [True, False]) +def test_should_be_canceled(in_cancel:bool, dag: DAG): + """Test DAG.should_be_canceled method. + Check if dag.should_be_canceled returns True or False if the task is in the dab.to_be_canceled set. + + Parameters + ---------- + in_cancel : bool + Parameterized value to test the method dag.should_be_canceled with 'task1' + in the dag.to_be_canceled set or not. + dag : DAG + DAG fixture defined in conftest.py. + """ + if in_cancel: + dag.to_be_canceled.add('task1') + else: + if 'task1' in dag.to_be_canceled: + dag.to_be_canceled.remove('task1') + + assert dag.should_be_canceled(task_name='task1') == in_cancel + + +@pytest.mark.parametrize('dag', + [{ + 'task_collection': [ + {'task': 'task1', }, + {'task': 'task2', 'depends-on': ['task1']}, + {'task': 'task3', 'depends-on': ['task1']}, + {'task': 'task4', 'depends-on': ['task1']}, + {'task': 'task5', 'depends-on': ['task2', 'task3', 'task4']} + ] + }, + {'task_collection': [ + {'task': 'task1', }, + {'task': 'task2', 'depends-on': ['task1']}, + {'task': 'task3', 'depends-on': ['task1']}, + {'task': 'task4', 'depends-on': ['task1']}, + {'task': 'task5', 'depends-on': ['task2', 'task3', 'task4']}], + 'reverse': True + } + ], + indirect=True) +def test_build_dag(dag: DAG): + """Test DAG.__build_dag method. + The test uses a task collection and checks the calls to the graphlib.TopologicalSorter.add. + The function calls depend on the dag.reverse instance variable, that it is also parameterized. + + Parameters + ---------- + dag : DAG + DAG fixture defined in conftest.py with task_collection parameterized. 
+ """ + with patch('workflow_engine.workflow_processor.graphlib.TopologicalSorter.add') as mock_add: + res_dag, res_dependency_dict = dag._DAG__build_dag() + assert isinstance(res_dag, graphlib.TopologicalSorter) + call_list = [] + dependency_dict = {} + for task in dag.task_collection: + dependencies = task.get('depends-on', []) + task_name = task['task'] + if dag.reverse: + for dependency in dependencies: + call_list.append(call(dependency, task_name)) + else: + call_list.append(call(task_name, *dependencies)) + dependency_dict[task_name] = dependencies + + assert res_dependency_dict == dependency_dict + mock_add.assert_has_calls(call_list, any_order=True) + + +@pytest.mark.parametrize('dag', + [{ + 'task_collection': [ + {'task': 'task1', }, + {'task': 'task2', 'depends-on': ['task1']}, + {'task': 'task3', 'depends-on': []}, + {'task': 'task4', 'depends-on': []}, + {'task': 'task5', 'depends-on': ['task2', 'task3', 'task4']} + ], + 'patch': False + }, + {'task_collection': [ + {'task': 'task1', }, + {'task': 'task2', 'depends-on': ['task1']}, + {'task': 'task3', 'depends-on': []}, + {'task': 'task4', 'depends-on': []}, + {'task': 'task5', 'depends-on': ['task2', 'task3', 'task4']}], + 'reverse': True, + 'patch': False, + 'finished_task_status': { + 'failed': set(), + 'canceled': set(), + 'successful': set()} + }, + ], + indirect=True) +@pytest.mark.parametrize('task, cancel_policy, to_be_canceled', + [('task1', 'abort-all', {'task4', 'task3', 'task2', 'task5', 'task1'}), + ('task2', 'abort-all', {'task4', 'task3', 'task2', 'task5', 'task1'}), + ('task5', 'abort-all', {'task4', 'task3', 'task2', 'task5', 'task1'}), + ('task1', 'abort-related-flows', {'task4', 'task3', 'task2', 'task5', 'task1'}), + ('task2', 'abort-related-flows', {'task4', 'task3', 'task2', 'task5', 'task1'}), + ('task5', 'abort-related-flows', {'task4', 'task3', 'task2', 'task5', 'task1'}), + ('task1', 'continue', set()), + ('task2', 'continue', set()), + ('task5', 'continue', set()), + ]) +def test_cancel_dependant_tasks(task: str, cancel_policy: str, to_be_canceled: set, dag: DAG): + """Test DAG.cancel_dependant_tasks method. + Check the to_be_canceled set after calling the cancel_dependant_tasks method with a parameterized task_collection + in reverse True and False test cases. + + Parameters + ---------- + task : str + Parameterized task name. + cancel_policy : str + Parameterized cancel policy using valid values (abort-all, abort-related-flows, continue). + to_be_canceled : set + [description] + dag : DAG + DAG fixture defined in conftest.py parameterized with complete object state + (task_collection, reverse, finished_task_status sets). 
The patch false parameter avoids patching the + DAG.__build_dag' and DAG.__create_execution_plan methods + """ + dag.cancel_dependant_tasks(task, cancel_policy=cancel_policy) + assert dag.to_be_canceled == to_be_canceled + + +@pytest.mark.parametrize('dag, exec_plan', + [( + {'task_collection': [ + {'task': 'task1', }, + {'task': 'task2', 'depends-on': ['task1']}, + {'task': 'task3', 'depends-on': ['task1']}, + {'task': 'task4', 'depends-on': ['task1']}, + {'task': 'task5', 'depends-on': ['task2', 'task3', 'task4']} + ], + 'patch': False}, + {"task5": {"task2": {"task1": {}}, + "task3": {"task1": {}}, + "task4": {"task1": {}}}} + ), + ( + { + 'task_collection': [ + {'task': 'task1', }, + {'task': 'task2', 'depends-on': ['task1']}, + {'task': 'task3', 'depends-on': ['task1']}, + {'task': 'task4', 'depends-on': ['task1']}, + {'task': 'task5', 'depends-on': ['task2', 'task3', 'task4']}, + {'task': 'task6', 'depends-on': ['task5']} + ], + 'patch': False + }, + {"task6": {"task5": {"task2": {"task1": {}}, + "task3": {"task1": {}}, + "task4": {"task1": {}}}}} + ) + ], + indirect=['dag']) +def test_create_execution_plan(exec_plan: dict, dag: DAG): + """Test DAG._create_execution_plan method. + This private method is executed by the constructor. In this Test, + the results are left in the execution_plan instance variable. + + Parameters + ---------- + exec_plan : dict + execution plan. + dag : DAG + DAG fixture defined in conftest.py. + """ + assert dag.execution_plan == exec_plan diff --git a/deployability/modules/workflow_engine/tests/test_schema_validator.py b/deployability/modules/workflow_engine/tests/test_schema_validator.py new file mode 100644 index 0000000000..25a45db95a --- /dev/null +++ b/deployability/modules/workflow_engine/tests/test_schema_validator.py @@ -0,0 +1,119 @@ +# Copyright (C) 2015-2021, Wazuh Inc. +# Created by Wazuh, Inc. . +# This program is free software; you can redistribute it and/or modify it under the terms of GPLv2 +"""SchemaValidator unit tests.""" +import uuid +import random +from pathlib import Path +from unittest.mock import MagicMock, call, patch +import json +from ruamel.yaml import YAML +import pytest +from jsonschema.exceptions import ValidationError, UnknownType + +from workflow_engine.schema_validator import SchemaValidator + +@pytest.mark.parametrize('logger_mock', + [{'logger_to_patch':'workflow_engine.schema_validator.logger'}], + indirect=True) +def test_schema_validator_constructor(logger_mock: MagicMock): + """Test SchemaValidator constructor normal flow. + Check the state of the SchemaValidator instance variables after creation. + + Parameters + ---------- + logger_mock : MagicMock + logger fixture to check debug calls + """ + schema_path = Path(__file__).parent.parent / 'schemas' / 'schema_v1.json' + with open(schema_path, 'r') as schema_file: + schema_data = json.load(schema_file) + + wf_file_path = Path(__file__).parent / 'data' / 'wf-ok.yaml' + with open(wf_file_path, 'r') as file: + yaml = YAML(typ='safe', pure=True) + yaml_data = yaml.load(file) + + validator = SchemaValidator(schema_path, wf_file_path) + assert validator.schema_data == schema_data + assert validator.yaml_data == yaml_data + calls = [call(f"Loading schema file: {schema_path}"), + call(f"Loading yaml file: {wf_file_path}")] + logger_mock.debug.assert_has_calls(calls) + + +def test_schema_validator_constructor_ko(): + """"Test SchemaValidator constructor error flows. + Check if the FileNotFoundError is raisen with a random file name. 
+ """ + schema_path = str(uuid.UUID(int=random.randint(0, 2^32))) + with pytest.raises(FileNotFoundError, match=f'File "{schema_path}" not found.'): + SchemaValidator(schema_path, schema_path) + + +def test_preprocess_data(): + """Test SchemaValidator preprocess_data. + Check if the preprocess_data method does not raise exceptions with a valid file. + """ + schema_path = Path(__file__).parent.parent / 'schemas' / 'schema_v1.json' + wf_file_path = Path(__file__).parent / 'data' / 'wf-ok.yaml' + validator = SchemaValidator(schema_path, wf_file_path) + validator.preprocess_data() + + +@pytest.mark.parametrize('workflow_file, error_msg', + [('wf-ko-no-path-on-do.yaml', + "Missing required properties in 'with' for task: {'task': 'run-agent-tests-{agent}'"), + ('wf-ko-no-path-on-cleanup.yaml', + "Missing required properties in 'with' for task: {'task': 'allocate-manager'"),]) +def test_preprocess_data_ko(workflow_file: str, error_msg: str): + """Test SchemaValidator preprocess_data error flow. + Check the ValidationError generated by invalid yml files. + + Parameters + ---------- + workflow_file : str + workflow yml file name. + error_msg : str + Error message to check + """ + schema_path = Path(__file__).parent.parent / 'schemas' / 'schema_v1.json' + wf_file_path = Path(__file__).parent / 'data' / workflow_file + validator = SchemaValidator(schema_path, wf_file_path) + with pytest.raises(ValidationError, match=error_msg): + validator.preprocess_data() + + +def test_validate_schema(): + """Test SchemaValidator validate_schema with a valid yml file.""" + schema_path = Path(__file__).parent.parent / 'schemas' / 'schema_v1.json' + wf_file_path = Path(__file__).parent / 'data' / 'wf-ok.yaml' + validator = SchemaValidator(schema_path, wf_file_path) + validator.validateSchema() + + +@pytest.mark.parametrize('logger_mock', + [{'logger_to_patch':'workflow_engine.schema_validator.logger'}], + indirect=True) +def test_validate_schema_ko(logger_mock: MagicMock): + """Test SchemaValidator validate_schema error flows. + Check the messages sent to the log when an invalid workflow yml file is used. + + Parameters + ---------- + logger_mock : MagicMock + logger fixture defined in conftest.py + """ + schema_path = Path(__file__).parent.parent / 'schemas' / 'schema_v1.json' + wf_file_path = Path(__file__).parent / 'data' / 'wf-ko-schema-error.yaml' + validator = SchemaValidator(schema_path, wf_file_path) + validator.validateSchema() + logger_mock.error.assert_called_once() + assert 'Schema validation error:' in logger_mock.error.call_args[0][0] + + logger_mock.error.reset_mock() + validator = SchemaValidator(schema_path, wf_file_path) + with patch('workflow_engine.schema_validator.jsonschema.validate', side_effect=UnknownType): + validator.validateSchema() + logger_mock.error.assert_called_once() + assert 'Unexpected error at schema validation:' in logger_mock.error.call_args[0][0] diff --git a/deployability/modules/workflow_engine/tests/test_task.py b/deployability/modules/workflow_engine/tests/test_task.py new file mode 100644 index 0000000000..604c8e2926 --- /dev/null +++ b/deployability/modules/workflow_engine/tests/test_task.py @@ -0,0 +1,114 @@ +# Copyright (C) 2015, Wazuh Inc. +# Created by Wazuh, Inc. . 
+# This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2 +from typing import List, Tuple +from subprocess import CompletedProcess, CalledProcessError +from unittest.mock import patch, MagicMock, call +import pytest + +from workflow_engine.task import ProcessTask + +@pytest.fixture +def task(request) -> ProcessTask: + """Shared fixture to create task.""" + task_name, task_parms = request.param + return ProcessTask(task_name=task_name, task_parameters=task_parms) + + +@pytest.mark.parametrize("task", [('task1', {"param1": "value1"})], indirect=True) +def test_process_task_constructor(task: ProcessTask): + """Test ProcessTask constructor. + Check the task instance varialbes after constructing the ProcessTask. + + Parameters + ---------- + task : ProcessTask + The task fixture. + """ + assert task.task_name == 'task1' + assert task.task_parameters == {"param1": "value1"} + + +@pytest.mark.parametrize("task", [('task1', {"path": "/mypath", + "args": [{"param1": "value1"}]}), + ('task2', {"path": "/mypath", + "args": ["param1"]}), + ('task3', {"path": "/mypath", + "args": ["param1", "param2"]}), + ('task4', {"path": "/mypath", + "args": ["param1", {"param2": "value2"}]}), + ('task5', {"path": "/mypath", + "args": [{"param1": "value1"}, {"param2": "value2"}]}) + ], indirect=True) +@patch("workflow_engine.task.logger") +def test_process_task_execute(logger_mock: MagicMock, task: ProcessTask): + """Test ProcessTask.execute method normal flow. + Check that ProcessTask.execute calls subprocess.run to run commands with the defined parameters. The + task mock in conftest.py is used to thy diferent command argument formats. + + Parameters + ---------- + logger_mock : MagicMock + The logger mock defined in conftest.py + task : ProcessTask + The task fixture. + """ + results = {} + results["task1"] = {"parm_list": [task.task_parameters['path'], "--param1=value1"]} + results["task2"] = {"parm_list": [task.task_parameters['path'], "param1"]} + results["task3"] = {"parm_list": [task.task_parameters['path'], "param1", "param2"]} + results["task4"] = {"parm_list": [task.task_parameters['path'], "param1", + "--param2=value2"]} + results["task5"] = {"parm_list": [task.task_parameters['path'], "--param1=value1", + "--param2=value2"]} + result = CompletedProcess(args=results[task.task_name]["parm_list"][1:], + returncode=0, stdout="command output", + stderr="") + debug_calls = [call(f'Running task "{task.task_name}" with arguments: ' + f'{results[task.task_name]["parm_list"][1:]}')] + with patch("workflow_engine.task.subprocess.run", return_value=result) as proc_run_mock, \ + patch.object(logger_mock, "debug") as logger_debug_mock: + debug_calls.append(call(f'Finished task "{task.task_name}" execution ' + f'with result:\n{str(result.stdout)}')) + task.execute() + + logger_debug_mock.assert_has_calls(debug_calls) + proc_run_mock.assert_called_once_with(results[task.task_name]['parm_list'], check=True, + capture_output=True, text=True) + + +@pytest.mark.parametrize("task", [('task1', {"path": "/mypath", + "args": [{"param1": "value1"}]}), + ], indirect=True) +@pytest.mark.parametrize("subproc_retval", [1, 0]) +@pytest.mark.parametrize("subproc_run_exc", [(True, KeyboardInterrupt, "KeyboardInterrupt error"), + (True, Exception, "Other Error")]) +def test_process_task_execute_ko(subproc_retval: int, subproc_run_exc: List[Tuple], task: ProcessTask): + """Test ProcessTask.execute method exception flows. + Check ProcessTask.execute flow when the subprocess.run returns errors. 
+
+    Parameters
+    ----------
+    subproc_retval : int
+        Return code for the mocked subprocess.run.
+    subproc_run_exc : Tuple
+        Tuple with the raise flag, the expected exception type and the stderr content.
+    task : ProcessTask
+        The task fixture.
+    """
+    raise_exc, exception_type, stderr = subproc_run_exc
+    if exception_type is Exception:
+        match = f"Error executing process task {stderr}"
+    else:
+        match = "Error executing process task with keyboard interrupt."
+    result = CompletedProcess(args=["--param1=value1"],
+                              returncode=subproc_retval, stdout="command output",
+                              stderr=stderr)
+    with patch("workflow_engine.task.subprocess.run", return_value=result) as proc_run_mock:
+        if raise_exc:
+            proc_run_mock.side_effect = CalledProcessError(returncode=1,
+                                                           cmd=task.task_parameters['path'],
+                                                           stderr=stderr)
+
+        with pytest.raises(exception_type, match=match):
+            task.execute()
diff --git a/deployability/modules/workflow_engine/tests/test_workflow_file.py b/deployability/modules/workflow_engine/tests/test_workflow_file.py
new file mode 100644
index 0000000000..a3b411899e
--- /dev/null
+++ b/deployability/modules/workflow_engine/tests/test_workflow_file.py
@@ -0,0 +1,271 @@
+# Copyright (C) 2015-2021, Wazuh Inc.
+# Created by Wazuh, Inc. .
+# This program is free software; you can redistribute it and/or modify it under the terms of GPLv2
+"""WorkflowFile unit tests."""
+from typing import Any, List
+from unittest.mock import patch, MagicMock, call, mock_open
+import pytest
+
+from workflow_engine.workflow_processor import WorkflowFile
+
+
+def test_workflow_file_constructor():
+    """Test WorkflowFile constructor.
+    Check the function calls and instance variables after object creation."""
+    with patch("workflow_engine.workflow_processor.WorkflowFile._WorkflowFile__validate_schema") as validate_mock, \
+         patch("workflow_engine.workflow_processor.WorkflowFile._WorkflowFile__load_workflow",
+               return_value={'data': 'data'}) as load_mock, \
+         patch("workflow_engine.workflow_processor.WorkflowFile._WorkflowFile__process_workflow") as process_mock, \
+         patch("workflow_engine.workflow_processor.WorkflowFile._WorkflowFile__static_workflow_validation") \
+             as static_validation_mock:
+        wf = WorkflowFile(workflow_file_path='my_file.yaml', schema_path='my_schema.yaml')
+        assert wf.schema_path == 'my_schema.yaml'
+        validate_mock.assert_called_once_with('my_file.yaml')
+        load_mock.assert_called_once_with('my_file.yaml')
+        assert wf.workflow_raw_data == {'data': 'data'}
+        process_mock.assert_called_once()
+        static_validation_mock.assert_called_once()
+
+
+@pytest.mark.parametrize('logger_mock', [{}], indirect=True)
+def test_workflow_file_validate_schema(logger_mock: MagicMock):
+    """Test WorkflowFile.__validate_schema.
+    Check the debug messages and the functions called by the method.
+
+    Parameters
+    ----------
+    logger_mock : MagicMock
+        The logger fixture defined in conftest.py.
+    """
+    wf = MagicMock()
+    wf.schema_path = 'my_schema_path.yaml'
+    workflow_file = 'my_file_path.yaml'
+    schema_validator = MagicMock()
+    with patch('workflow_engine.workflow_processor.SchemaValidator',
+               return_value=schema_validator) as schema_validator_mock:
+        with patch.object(schema_validator, 'preprocess_data') as preprocess_mock, \
+             patch.object(schema_validator, 'validateSchema') as validate_schema_mock:
+            WorkflowFile._WorkflowFile__validate_schema(self=wf, workflow_file=workflow_file)
+
+    logger_mock.debug.assert_called_once_with(f"Validating input file: {workflow_file}")
+    schema_validator_mock.assert_called_once_with(wf.schema_path, workflow_file)
+    preprocess_mock.assert_called_once()
+    validate_schema_mock.assert_called_once()
+
+
+@pytest.mark.parametrize('logger_mock', [{}], indirect=True)
+def test_workflow_file_validate_schema_ko(logger_mock: MagicMock):
+    """Test WorkflowFile.__validate_schema error flow.
+    Check the logged messages and function calls of the method.
+
+    Parameters
+    ----------
+    logger_mock : MagicMock
+        The logger fixture defined in conftest.py.
+    """
+    wf = MagicMock()
+    wf.schema_path = 'my_schema_path.yaml'
+    workflow_file = 'my_file_path.yaml'
+    file_exc = FileNotFoundError()
+    with patch('workflow_engine.workflow_processor.SchemaValidator', side_effect=file_exc) as schema_validator_mock, \
+         pytest.raises(FileNotFoundError):
+        WorkflowFile._WorkflowFile__validate_schema(self=wf, workflow_file=workflow_file)
+
+    logger_mock.debug.assert_called_once_with(f"Validating input file: {workflow_file}")
+    schema_validator_mock.assert_called_once_with(wf.schema_path, workflow_file)
+    logger_mock.error.assert_called_once_with("Error while validating schema [%s] with error: %s",
+                                              wf.schema_path,
+                                              file_exc)
+
+
+@pytest.mark.parametrize('logger_mock', [{}], indirect=True)
+@patch('builtins.open', new_callable=mock_open, read_data='YAML content')
+def test_workflow_file_load_workflow(mock_open: MagicMock, logger_mock: MagicMock):
+    """Test WorkflowFile.__load_workflow.
+    Check the logged messages and function calls of the method.
+
+    Parameters
+    ----------
+    mock_open : MagicMock
+        The patched built-in open function.
+    logger_mock : MagicMock
+        The logger fixture defined in conftest.py.
+    """
+    wf = MagicMock()
+    wf.schema_path = 'my_schema_path.yaml'
+    workflow_file = 'my_file_path.yaml'
+    mock_open.return_value.__enter__.return_value = mock_open
+    with patch('workflow_engine.workflow_processor.os.path.exists', return_value=True) as path_exists_mock, \
+         patch('workflow_engine.workflow_processor.yaml.safe_load') as safe_load_mock:
+        WorkflowFile._WorkflowFile__load_workflow(self=wf, file_path=workflow_file)
+
+    path_exists_mock.assert_called_once_with(workflow_file)
+    logger_mock.debug.assert_called_once_with(f"Loading workflow file: {workflow_file}")
+    mock_open.assert_called_once_with(workflow_file, 'r', encoding='utf-8')
+    safe_load_mock.assert_called_once_with(mock_open)
+
+
+@pytest.mark.parametrize('logger_mock', [{}], indirect=True)
+@patch('builtins.open', new_callable=mock_open, read_data='YAML content')
+def test_workflow_file_load_workflow_ko(mock_open: MagicMock, logger_mock: MagicMock):
+    """Test WorkflowFile.__load_workflow error flow.
+    Check if the FileNotFoundError exception is raised by the method.
+
+    Parameters
+    ----------
+    mock_open : MagicMock
+        unittest mock of the open function
+    logger_mock : MagicMock
+        The logger fixture defined in conftest.py
+    """
+    wf = MagicMock()
+    wf.schema_path = 'my_schema_path.yaml'
+    workflow_file = 'my_file_path.yaml'
+    mock_open.return_value.__enter__.return_value = mock_open
+    with patch('workflow_engine.workflow_processor.os.path.exists', return_value=False) as path_exists_mock, \
+         pytest.raises(FileNotFoundError, match=f'File "{workflow_file}" not found.') as file_exc:
+        WorkflowFile._WorkflowFile__load_workflow(self=wf, file_path=workflow_file)
+
+
+@pytest.mark.parametrize('logger_mock', [{}], indirect=True)
+def test_workflow_file_process_workflow(logger_mock: MagicMock):
+    """Test WorkflowFile.__process_workflow.
+    Check that the method calls __expand_task for each task, using a lambda as the side effect.
+
+    Parameters
+    ----------
+    logger_mock : MagicMock
+        The logger fixture defined in conftest.py
+    """
+    variable_list = {'variable_1': 'value_1', 'variable_2': 'value_2'}
+    task_list = [{'task': 'task1'}, {'task': 'task2'}, {'task': 'task3'}]
+    expanded_task_list = [{'task': 'task3_1'}, {'task': 'task3_2'}]
+    wf = MagicMock()
+    wf.workflow_raw_data = {'tasks': task_list, 'variables': variable_list}
+    wf._WorkflowFile__expand_task.side_effect = lambda task, variables: [task] + \
+        (expanded_task_list if task['task'] == 'task3' else [])
+    tasks = WorkflowFile._WorkflowFile__process_workflow(wf)
+
+    logger_mock.debug.assert_called_once_with("Process workflow.")
+    calls = [call(task, variable_list) for task in task_list]
+    wf._WorkflowFile__expand_task.assert_has_calls(calls)
+    task_list.extend(expanded_task_list)
+    assert tasks == task_list
+
+
+@pytest.mark.parametrize('logger_mock', [{}], indirect=True)
+def test_workflow_file_process_workflow_ko(logger_mock: MagicMock):
+    """Test WorkflowFile.__process_workflow error flow.
+    Check that a ValueError is raised when no tasks are found in the workflow.
+
+    Parameters
+    ----------
+    logger_mock : MagicMock
+        The logger fixture defined in conftest.py
+    """
+    wf = MagicMock()
+    wf.workflow_raw_data = {
+        'tasks': []
+    }
+    wf._WorkflowFile__expand_task.return_value = []
+    with pytest.raises(ValueError, match="No tasks found in the workflow."):
+        tasks = WorkflowFile._WorkflowFile__process_workflow(self=wf)
+
+    logger_mock.debug.assert_called_once_with("Process workflow.")
+
+
+@pytest.mark.parametrize('element, values, return_value',
+                         [({'key_1': 'key_1 {value_1}', 'key_2': 'key_2 {value_2}'},
+                           {'value_1': 'value_1', 'value_2': 'value_2'},
+                           {'key_1': 'key_1 value_1', 'key_2': 'key_2 value_2'}),
+                          (['element_1 {value_1}', 'element_2 {value_2}', 'element_3 {value_3}'],
+                           {'value_1': 'value_1', 'value_2': 'value_2', 'value_3': 'value_3'},
+                           ['element_1 value_1', 'element_2 value_2', 'element_3 value_3']),
+                          ('string_element {value}', {'value': 'value'}, 'string_element value'),
+                          ({1, 2}, None, {1, 2})])
+def test_workflow_file_replace_placeholder(element: Any, values: dict, return_value: Any):
+    """Test WorkflowFile.__replace_placeholders."""
+    wf = MagicMock()
+    wf._WorkflowFile__replace_placeholders.side_effect = \
+        lambda s, e, v: WorkflowFile._WorkflowFile__replace_placeholders(wf, s, e, v)
+    result = WorkflowFile._WorkflowFile__replace_placeholders(self=wf, element=element, values=values)
+    assert result == return_value
+
+
+@pytest.mark.parametrize('task, return_value, variables',
+                         [({'task': 'task: {as_variable_1}', 'param': '{as_variable_2}',
+                            'foreach': [{'variable': 'variable_1', 'as': 'as_variable_1'},
+                                        {'variable': 'variable_2', 'as': 'as_variable_2'}]},
+                           [{"task": "task: value_1_1", 'param': 'value_2_1',
+                             "foreach": [{"variable": "variable_1", "as": "as_variable_1"},
+                                         {"variable": "variable_2", "as": "as_variable_2"}]},
+                            {"task": "task: value_1_1", 'param': 'value_2_2',
+                             "foreach": [{"variable": "variable_1", "as": "as_variable_1"},
+                                         {"variable": "variable_2", "as": "as_variable_2"}]},
+                            {"task": "task: value_1_2", 'param': 'value_2_1',
+                             "foreach": [{"variable": "variable_1", "as": "as_variable_1"},
+                                         {"variable": "variable_2", "as": "as_variable_2"}]},
+                            {"task": "task: value_1_2", 'param': 'value_2_2',
+                             "foreach": [{"variable": "variable_1", "as": "as_variable_1"},
+                                         {"variable": "variable_2", "as": "as_variable_2"}]}],
+                           {'variable_1': ['value_1_1', 'value_1_2'],
+                            'variable_2': ['value_2_1', 'value_2_2']}),
+                          ({'task': 'task1', 'placeholder': 'placeholder {variable_1}'},
+                           [{'task': 'task1', 'placeholder': 'placeholder value_1'}],
+                           {'variable_1': 'value_1'})
+                          ])
+def test_workflow_file_expand_task(task: dict, return_value: dict, variables: dict):
+    """Test WorkflowFile.__expand_task.
+    Check that the dictionary returned by expand_task matches the expected value.
+
+    Parameters
+    ----------
+    task : dict
+        A task dictionary used as the input parameter for the expand_task method.
+    return_value : dict
+        The expected return value.
+    variables : dict
+        The variables dictionary used as the input parameter for the expand_task method.
+    """
+    def side_effect(s, e, v=None):
+        return WorkflowFile._WorkflowFile__replace_placeholders(wf, s, e, v)
+    wf = MagicMock()
+    wf._WorkflowFile__replace_placeholders.side_effect = side_effect
+
+    tasks = WorkflowFile._WorkflowFile__expand_task(wf, task, variables)
+    assert tasks == return_value
+
+
+def test_workflow_file_static_workflow_validation():
+    """Test WorkflowFile.__static_workflow_validation.
+    Check if no exception is raised with a valid task_collection."""
+    wf = MagicMock()
+    wf.task_collection = [{"task": "task 1", "param": "1"},
+                          {"task": "task 2", "param": "2", 'depends-on': ['task 1']}
+                          ]
+    WorkflowFile._WorkflowFile__static_workflow_validation(wf)
+
+
+@pytest.mark.parametrize('task_collection, error_msg', [
+    ([{"task": "task 1", "param": "1"},
+      {"task": "task 1", "param": "2", 'depends-on': ['task 1']}],
+     'Duplicated task names: task 1'),
+    ([{"task": "task 1", "param": "1", 'depends-on': ['task 3', 'task 4']},
+      {"task": "task 2", "param": "2", 'depends-on': ['task 3']}],
+     'Tasks do not exist: task 3, task 4')
+])
+def test_workflow_file_static_workflow_validation_ko(task_collection: List[dict], error_msg: str):
+    """Test WorkflowFile.__static_workflow_validation error flow.
+    Check if the validation raises ValueError exceptions with invalid task collections.
+
+    Parameters
+    ----------
+    task_collection : List[dict]
+        List of tasks.
+    error_msg : str
+        Expected exception error message.
+    """
+    wf = MagicMock()
+    wf.task_collection = task_collection
+    with pytest.raises(ValueError, match=error_msg):
+        WorkflowFile._WorkflowFile__static_workflow_validation(wf)
diff --git a/deployability/modules/workflow_engine/tests/test_workflow_processor.py b/deployability/modules/workflow_engine/tests/test_workflow_processor.py
new file mode 100644
index 0000000000..a0639c6ea0
--- /dev/null
+++ b/deployability/modules/workflow_engine/tests/test_workflow_processor.py
@@ -0,0 +1,385 @@
+# Copyright (C) 2015-2021, Wazuh Inc.
+# Created by Wazuh, Inc. .
+# This program is free software; you can redistribute it and/or modify it under the terms of GPLv2
+"""WorkflowProcessor unit tests."""
+import time
+import json
+from concurrent.futures import Future
+from unittest.mock import patch, MagicMock, call
+import pytest
+
+from workflow_engine.workflow_processor import WorkflowProcessor, DAG
+from workflow_engine.task import ProcessTask, TASKS_HANDLERS
+
+
+@pytest.mark.parametrize('workflow_file, dry_run, threads, log_level, schema_file',
+                         [('workflow.yaml', False, 1, 'info', 'schema.yaml'),
+                          ('workflow.yaml', True, 1, 'debug', 'schema.yaml'),
+                          ('workflow.yaml', True, 1, 'debug', None),
+                          ])
+@patch("workflow_engine.workflow_processor.logger")
+@patch("workflow_engine.workflow_processor.WorkflowFile")
+def test_workflow_processor_constructor(file_mock: MagicMock, logger_mock: MagicMock,
+                                        workflow_file: str, dry_run: bool, threads: int, log_level: str,
+                                        schema_file: str):
+    """Test WorkflowProcessor constructor.
+    Check the WorkflowProcessor instance variables after construction.
+
+    Parameters
+    ----------
+    file_mock : MagicMock
+        Mock of the WorkflowFile constructor.
+    logger_mock : MagicMock
+        The logger fixture defined in conftest.py.
+    workflow_file : str
+        Path to the workflow YAML file.
+    dry_run : bool
+        Whether to run the workflow tasks or only generate the execution plan.
+    threads : int
+        Number of threads.
+    log_level : str
+        Log level string.
+    schema_file : str
+        Path to the schema.yml file.
+    """
+    task_collection = [
+        {'task': 'task1', 'path': '/cmd1', 'args': [{"param1": "value1"}]},
+        {'task': 'task2', 'path': '/cmd2', 'args': [{"param1": "value1"}]},
+        {'task': 'task3', 'path': '/cmd3', 'args': [{"param1": "value1"}]},
+    ]
+    workflow_file_instance = file_mock.return_value
+    workflow_file_instance.task_collection = task_collection
+    with patch.object(logger_mock, 'setLevel') as set_level_mock:
+        processor = WorkflowProcessor(workflow_file, dry_run, threads, log_level, schema_file)
+        set_level_mock.assert_called_once_with(log_level)
+        file_mock.assert_called_once_with(workflow_file, schema_file)
+        assert processor.task_collection == task_collection
+        assert processor.dry_run == dry_run
+        assert processor.threads == threads
+
+
+@pytest.mark.parametrize('logger_mock, w_processor, dag, action, should_be_canceled',
+                         [({}, {}, {}, 'custom_action', True),
+                          ({}, {}, {}, 'custom_action', False),],
+                         indirect=["dag", "w_processor", "logger_mock"])
+def test_execute_task(logger_mock: MagicMock, w_processor: WorkflowProcessor, dag: DAG, action: str,
+                      should_be_canceled: bool):
+    """Test WorkflowProcessor.execute_task normal flow.
+    Check the logged messages and function calls when the should_be_canceled return value
+    is True or False.
+
+    Parameters
+    ----------
+    logger_mock : MagicMock
+        The logger fixture defined in conftest.py.
+    w_processor : WorkflowProcessor
+        The workflow processor fixture defined in conftest.py.
+    dag : DAG
+        The dag fixture defined in conftest.py.
+    action : str
+        Action name.
+    should_be_canceled : bool
+        should_be_canceled method patched return value.
+    """
+    start_time = time.time()
+    elapsed_time = 10
+    def time_side_effect():
+        nonlocal start_time
+        start_time = start_time + elapsed_time
+        return start_time
+
+    task = {'task': 'task1'}
+    p_task = ProcessTask('task1', {})
+    with patch.object(dag, 'should_be_canceled', return_value=should_be_canceled) as should_be_canceled_mock, \
+         patch.object(w_processor, 'create_task_object', return_value=p_task) as create_task_mock, \
+         patch.object(dag, 'set_status') as set_status_mock, \
+         patch.object(p_task, 'execute') as exec_mock, \
+         patch('workflow_engine.workflow_processor.time') as time_mock:
+        time_mock.time = MagicMock(side_effect=time_side_effect)
+        w_processor.execute_task(dag=dag, task=task, action=action)
+        should_be_canceled_mock.assert_called_once_with(task['task'])
+        if should_be_canceled:
+            logger_mock.warning.assert_called_once_with(
+                "[%s] Skipping task due to dependency failure.", task['task'])
+            set_status_mock.assert_called_once_with(task['task'], 'canceled')
+        else:
+            create_task_mock.assert_called_once_with(task, action)
+            exec_mock.assert_called_once()
+            logger_mock.info.assert_has_calls([
+                call("[%s] Starting task.", task['task']),
+                call("[%s] Finished task in %.2f seconds.", task['task'], elapsed_time)
+                ]
+            )
+            set_status_mock.assert_called_once_with(task['task'], 'successful')
+
+
+@pytest.mark.parametrize('on_error', [None, 'abort-all'])
+@pytest.mark.parametrize('logger_mock, w_processor, dag, exception',
+                         [({}, {}, {}, KeyboardInterrupt),
+                          ({}, {}, {}, Exception)],
+                         indirect=["dag", "w_processor", "logger_mock"])
+def test_execute_task_ko(logger_mock: MagicMock, w_processor: WorkflowProcessor, dag: DAG, exception,
+                         on_error: str):
+    """Test WorkflowProcessor.execute_task error flows.
+    Check the logged messages, the set_status call and cancel_dependant_tasks in the failure flow.
+
+    Parameters
+    ----------
+    logger_mock : MagicMock
+        The logger fixture defined in conftest.py.
+    w_processor : WorkflowProcessor
+        The workflow processor fixture defined in conftest.py.
+    dag : DAG
+        The dag fixture defined in conftest.py.
+    exception : Exception
+        Expected exception.
+    on_error : str
+        The on-error value set for the task.
+    """
+    task = {'task': 'task1'}
+    task.update({'on-error': on_error} if on_error else {})
+    p_task = ProcessTask('task1', {})
+    exc = exception()
+    with patch.object(dag, 'should_be_canceled', return_value=False), \
+         patch.object(w_processor, 'create_task_object', return_value=p_task), \
+         patch.object(dag, 'set_status') as set_status_mock, \
+         patch.object(p_task, 'execute', side_effect=exc), \
+         patch('workflow_engine.workflow_processor.time'), \
+         patch.object(dag, 'cancel_dependant_tasks') as cancel_mock, \
+         pytest.raises(expected_exception=exception):
+        w_processor.execute_task(dag=dag, task=task, action='action')
+
+    logger_mock.error.assert_called_once_with("[%s] Task failed with error: %s.", task['task'], exc)
+    set_status_mock.assert_called_once_with(task['task'], 'failed')
+    cancel_mock.assert_called_once_with(task['task'], on_error if on_error else 'abort-related-flows')
+
+
+@pytest.mark.parametrize('task_type', ['process', 'dummy', 'dummy-random'])
+@pytest.mark.parametrize('w_processor', [{}], indirect=True)
+def test_create_task_object(w_processor: WorkflowProcessor, task_type: str):
+    """Test WorkflowProcessor.create_task_object normal flow.
+    Check the task type returned by the method.
+
+    Parameters
+    ----------
+    w_processor : WorkflowProcessor
+        The workflow processor fixture defined in conftest.py.
+    task_type : str
+        Type of the task.
+    """
+    task_dict = {'task': 'task1', 'action': {'this': task_type, 'with': {'param'}}}
+    task = w_processor.create_task_object(task_dict, 'action')
+    assert isinstance(task, TASKS_HANDLERS.get(task_type))
+
+
+@pytest.mark.parametrize('w_processor', [{}], indirect=True)
+def test_create_task_object_ko(w_processor: WorkflowProcessor):
+    """Test WorkflowProcessor.create_task_object error flow.
+    Check that create_task_object raises a ValueError exception for invalid task types.
+
+    Parameters
+    ----------
+    w_processor : WorkflowProcessor
+        The workflow processor fixture defined in conftest.py.
+    """
+    task_type = 'unknown'
+    task_dict = {'task': 'task1', 'action': {'this': task_type, 'with': {'param'}}}
+    with pytest.raises(ValueError, match=f"Unknown task type '{task_type}'."):
+        w_processor.create_task_object(task_dict, 'action')
+
+
+@pytest.mark.parametrize('reverse', [False, True])
+@pytest.mark.parametrize('logger_mock, w_processor, dag', [({}, {}, {})],
+                         indirect=["dag", "w_processor", "logger_mock"])
+@patch('workflow_engine.workflow_processor.concurrent.futures.ThreadPoolExecutor')
+def test_execute_tasks_parallel(executor_mock: MagicMock, logger_mock: MagicMock, w_processor: WorkflowProcessor,
+                                dag: DAG, reverse: bool):
+    """Test WorkflowProcessor.execute_tasks_parallel normal flow.
+    Check the logged messages and function calls of the method for the reverse True and False cases.
+
+    Parameters
+    ----------
+    executor_mock : MagicMock
+        Mock of the ThreadPoolExecutor.
+    logger_mock : MagicMock
+        The logger fixture defined in conftest.py.
+    w_processor : WorkflowProcessor
+        The workflow processor fixture defined in conftest.py.
+    dag : DAG
+        The dag fixture defined in conftest.py.
+    reverse : bool
+        Parameterized value for the execute_tasks_parallel reverse parameter.
+    """
+    futures = MagicMock()
+    futures.values = MagicMock(return_value=(x := MagicMock()))
+    y = MagicMock()
+    y.__enter__ = MagicMock(return_value=y)
+    executor_mock.return_value = y
+    with patch('workflow_engine.workflow_processor.concurrent.futures.wait') as wait_mock, \
+         patch.object(w_processor, 'generate_futures', return_value=futures) as gen_futures_mock:
+        w_processor.execute_tasks_parallel(dag, reverse=reverse)
+        logger_mock.info.assert_called_once_with("Executing tasks in parallel.")
+        executor_mock.assert_called_once_with(max_workers=w_processor.threads)
+        wait_mock.assert_called_once_with(x)
+        gen_futures_mock.assert_called_once_with(dag, y, reverse)
+
+
+@pytest.mark.parametrize('reverse', [False, True])
+@pytest.mark.parametrize('logger_mock, w_processor, dag', [({}, {}, {})],
+                         indirect=["dag", "w_processor", "logger_mock"])
+@patch('workflow_engine.workflow_processor.concurrent.futures.ThreadPoolExecutor')
+def test_execute_tasks_parallel_ko(executor_mock: MagicMock, logger_mock: MagicMock, w_processor: WorkflowProcessor,
+                                   dag: DAG, reverse: bool):
+    """Test WorkflowProcessor.execute_tasks_parallel error flow.
+    Check the logged messages and function calls when a KeyboardInterrupt is raised while waiting for the
+    tasks to finish execution.
+
+    Parameters
+    ----------
+    executor_mock : MagicMock
+        Not used, just patched.
+    logger_mock : MagicMock
+        The logger fixture defined in conftest.py.
+    w_processor : WorkflowProcessor
+        The workflow processor fixture defined in conftest.py.
+    dag : DAG
+        The dag fixture defined in conftest.py.
+    reverse : bool
+        Parameterized value for the execute_tasks_parallel reverse parameter.
+    """
+    execute_parallel_mock = MagicMock()
+    def patch_recursive_and_return_exception(_):
+        w_processor.execute_tasks_parallel = execute_parallel_mock
+        raise KeyboardInterrupt()
+
+    with patch('workflow_engine.workflow_processor.concurrent.futures.wait',
+               side_effect=patch_recursive_and_return_exception), \
+         patch.object(w_processor, 'generate_futures'):
+        w_processor.execute_tasks_parallel(dag, reverse=reverse)
+        logger_mock.info.assert_called_once_with("Executing tasks in parallel.")
+        logger_mock.error.assert_called_once_with("User interrupt detected. Aborting execution...")
+        execute_parallel_mock.assert_called_once_with(dag, reverse=True)
+
+
+@pytest.mark.parametrize('w_processor',
+                         [{'task_collection': [
+                             {'task': 'task1'},
+                             {'task': 'task2', 'depends-on': ['task1']},
+                             {'task': 'task3', 'depends-on': ['task1']},
+                             {'task': 'task4', 'depends-on': ['task1']},
+                             {'task': 'task5', 'depends-on': ['task2', 'task3', 'task4']}],},
+                          ],
+                         indirect=True)
+def test_generate_futures(w_processor: WorkflowProcessor):
+    """Test WorkflowProcessor.generate_futures without reverse.
+    Check the futures returned by the method.
+
+    Parameters
+    ----------
+    w_processor : WorkflowProcessor
+        The workflow processor fixture defined in conftest.py.
+    """
+    def submit_execute_task_side_effect(_, dag: DAG, task, __):
+        dag.set_status(task['task'], 'successful')
+        return Future()
+
+    executor = MagicMock()
+    executor.submit.side_effect = submit_execute_task_side_effect
+    dag = DAG(task_collection=w_processor.task_collection)
+    futures = w_processor.generate_futures(dag, executor=executor)
+    assert len(futures) == len(w_processor.task_collection) and \
+        all(isinstance(element, Future) for element in futures.values())
+
+
+@pytest.mark.parametrize('w_processor',
+                         [{'task_collection': [
+                             {'task': 'task1'},
+                             {'task': 'task2', 'depends-on': ['task1']},
+                             {'task': 'task3', 'depends-on': ['task1']},
+                             {'task': 'task4', 'depends-on': ['task1']},
+                             {'task': 'task5', 'depends-on': ['task2', 'task3', 'task4']}],},
+                          ],
+                         indirect=True)
def test_generate_futures_reverse(w_processor: WorkflowProcessor):
+    """Test WorkflowProcessor.generate_futures with reverse True.
+    Check that set_status with 'successful' is called for the tasks.
+
+    Parameters
+    ----------
+    w_processor : WorkflowProcessor
+        The workflow processor fixture defined in conftest.py.
+    """
+
+    def set_status_side_effect(task, status):
+        dag.finished_tasks_status[status].add(task)
+        dag.dag.done(task)
+
+    executor = MagicMock()
+    dag = DAG(task_collection=w_processor.task_collection, reverse=True)
+    with patch.object(dag, 'set_status', side_effect=set_status_side_effect) as set_status_mock:
+        futures = w_processor.generate_futures(dag, executor=executor, reverse=True)
+        calls = [call(task['task'], 'successful') for task in w_processor.task_collection]
+        set_status_mock.assert_has_calls(calls, any_order=True)
+
+
+@pytest.mark.parametrize('dry_run', [False, True])
+@pytest.mark.parametrize('logger_mock, w_processor',
+                         [({}, {
+                             'task_collection': [
+                                 {'task': 'task1'},
+                                 {'task': 'task2', 'depends-on': ['task1']},
+                                 {'task': 'task3', 'depends-on': ['task1']},
+                                 {'task': 'task4', 'depends-on': ['task1']},
+                                 {'task': 'task5', 'depends-on': ['task2', 'task3', 'task4']}],})],
+                         indirect=True)
+def test_run(logger_mock: MagicMock, w_processor: WorkflowProcessor, dry_run: bool):
+    """Test WorkflowProcessor.run function.
+    Check the logged messages and the execute_tasks_parallel calls.
+
+    Parameters
+    ----------
+    logger_mock : MagicMock
+        The logger fixture defined in conftest.py.
+    w_processor : WorkflowProcessor
+        The workflow processor fixture defined in conftest.py.
+    dry_run : bool
+        Parameterized value to test the run method.
+    """
+    def dag_constructor(_, reverse=False):
+        return reverse_dag if reverse else dag
+
+    w_processor.dry_run = dry_run
+    dag = DAG(w_processor.task_collection)
+    reverse_dag = DAG(w_processor.task_collection, reverse=True)
+    with patch.object(w_processor, 'execute_tasks_parallel') as exec_tasks_mock, \
+         patch('workflow_engine.workflow_processor.DAG', side_effect=dag_constructor) as dag_mock:
+        w_processor.run()
+        if dry_run:
+            dag_mock.assert_called_once_with(w_processor.task_collection)
+            logger_mock.info.assert_called_once_with("Execution plan: %s", json.dumps(dag.get_execution_plan(), indent=2))
+        else:
+            logger_mock.info.assert_has_calls([call("Executing DAG tasks."), call("Executing Reverse DAG tasks.")])
+            exec_tasks_mock.assert_has_calls([call(dag), call(reverse_dag, reverse=True)])
+            dag_mock.assert_has_calls([call(w_processor.task_collection), call(w_processor.task_collection, reverse=True)])
+
+
+@pytest.mark.parametrize('logger_mock, w_processor', [({}, {})], indirect=['logger_mock', 'w_processor'])
+def test_handle_interrupt(logger_mock: MagicMock, w_processor: WorkflowProcessor):
+    """Test WorkflowProcessor.handle_interrupt function.
+    Check the logged message when handle_interrupt is called.
+
+    Parameters
+    ----------
+    logger_mock : MagicMock
+        The logger fixture defined in conftest.py.
+    w_processor : WorkflowProcessor
+        The workflow processor fixture defined in conftest.py.
+    """
+    with pytest.raises(KeyboardInterrupt, match="User interrupt detected. End process..."):
+        w_processor.handle_interrupt(0, 0)
+    logger_mock.error.assert_called_once_with("User interrupt detected. End process...")
diff --git a/deployability/modules/workflow_engine/workflow_processor.py b/deployability/modules/workflow_engine/workflow_processor.py
index bd4b19ef7f..794ef6cbaa 100755
--- a/deployability/modules/workflow_engine/workflow_processor.py
+++ b/deployability/modules/workflow_engine/workflow_processor.py
@@ -331,7 +331,7 @@ def execute_tasks_parallel(self, dag: DAG, reverse: bool = False) -> None:
             logger.error("User interrupt detected. Aborting execution...")
             self.execute_tasks_parallel(dag, reverse=True)
 
-    def generate_futures(self, dag, executor, reverse: bool = False):
+    def generate_futures(self, dag: DAG, executor, reverse: bool = False):
         futures = {}
         while True:
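
Note: the tests above indirectly parametrize `logger_mock`, `dag`, and `w_processor` fixtures that live in the module's `tests/conftest.py`, which is not part of this patch. As a rough orientation only, a conftest wiring fixtures of this kind could look like the sketch below; the fixture names and the `WorkflowProcessor`/`DAG` call signatures are taken from how the tests use them, while the patch targets and default values are assumptions that must be checked against the real `deployability/modules/workflow_engine/tests/conftest.py`.

```python
# Illustrative sketch only: this is NOT the conftest.py shipped with this patch.
# It shows how indirectly parametrized fixtures could be wired, based solely on
# how the tests above call them.
from unittest.mock import MagicMock, patch

import pytest

from workflow_engine.workflow_processor import DAG, WorkflowProcessor

DEFAULT_TASK_COLLECTION = [{'task': 'task1'}]


@pytest.fixture
def logger_mock(request) -> MagicMock:
    """Patch the workflow_processor logger; request.param could carry extra options."""
    with patch('workflow_engine.workflow_processor.logger') as logger:
        yield logger


@pytest.fixture
def w_processor(request) -> WorkflowProcessor:
    """Build a WorkflowProcessor whose WorkflowFile is mocked away."""
    params = getattr(request, 'param', {}) or {}
    task_collection = params.get('task_collection', DEFAULT_TASK_COLLECTION)
    with patch('workflow_engine.workflow_processor.WorkflowFile') as file_mock:
        file_mock.return_value.task_collection = task_collection
        # Positional arguments mirror the constructor test:
        # (workflow_file, dry_run, threads, log_level, schema_file).
        yield WorkflowProcessor('workflow.yaml', False, 1, 'info', 'schema.yaml')


@pytest.fixture
def dag(request, w_processor) -> DAG:
    """Create a DAG from the processor's task collection."""
    params = getattr(request, 'param', {}) or {}
    return DAG(task_collection=w_processor.task_collection,
               reverse=params.get('reverse', False))
```

With fixtures shaped like these, a decorator such as `@pytest.mark.parametrize('w_processor', [{...}], indirect=True)` delivers each parameter dictionary to the fixture through `request.param`, which is how the tests above select per-case task collections.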