Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unit tests #6

Merged
merged 7 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4,042 changes: 3,883 additions & 159 deletions poetry.lock

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,20 @@ classifiers = [
packages = [{include = "airflow", from = "src"}]

[tool.poetry.dependencies]
python = "^3.8"
python = ">=3.8,<3.12"
anomalo = "^0.17.0"
importlib-resources = "^6.1.1"

[tool.pytest.ini_options]
addopts = [
"--import-mode=importlib",
]

[tool.poetry.group.dev.dependencies]
pre-commit = "^3.5.0"
ruff = "^0.1.7"
apache-airflow = "^2.8.1"
pytest = "^8.0.0"

[build-system]
requires = ["poetry-core>=1.0.0"]
Expand Down
4 changes: 1 addition & 3 deletions src/airflow/providers/anomalo/operators/anomalo.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from datetime import date, timedelta
from typing import Callable, Mapping, Optional

from airflow import AirflowException
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.anomalo.hooks.anomalo import AnomaloHook

Expand Down Expand Up @@ -113,8 +113,6 @@ class AnomaloRunCheckOperator(BaseOperator):
:param anomalo_conn_id: (Optional) The connection ID used to connect to Anomalo.
"""

# todo cache? better name?

def __init__(
self,
table_name,
Expand Down
2 changes: 1 addition & 1 deletion src/airflow/providers/anomalo/sensors/anomalo.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from airflow import AirflowException
from airflow.exceptions import AirflowException
from airflow.providers.anomalo.hooks.anomalo import AnomaloHook
from airflow.sensors.base import BaseSensorOperator

Expand Down
Empty file added tests/__init__.py
Empty file.
18 changes: 18 additions & 0 deletions tests/test_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import pytest

from airflow.providers.anomalo.hooks.anomalo import AnomaloHook
from airflow.models.connection import Connection


def test_get_conn(mocker):
hook = AnomaloHook()
get_connection = mocker.patch.object(
hook,
"get_connection",
return_value=Connection(host="foo", extra={"api_token": "bar"}),
)
mock_client = mocker.patch("airflow.providers.anomalo.hooks.anomalo.Client")

hook.get_conn()
get_connection.assert_called()
mock_client.assert_called_with(api_token="bar", host="foo")
102 changes: 102 additions & 0 deletions tests/test_operators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from typing import List, Mapping, Tuple
import pytest

from datetime import date

from airflow.exceptions import AirflowException
from airflow.providers.anomalo.operators.anomalo import (
AnomaloPassFailOperator,
AnomaloRunCheckOperator,
AnomaloCheckRunResultOperator,
)


def test_run_check_operator(mocker):
mock_hook = mocker.patch("airflow.providers.anomalo.operators.anomalo.AnomaloHook")
mock_client = mock_hook.return_value.get_client.return_value = mocker.MagicMock()

mock_client.get_table_information.return_value = {"id": "foo_id"}
mock_client.run_checks.return_value = {"run_checks_job_id": "anomalo_job_id"}

run_check = AnomaloRunCheckOperator(
table_name="foo", run_date=date(2023, 2, 1), task_id="bar"
)

assert run_check.execute(context=None) == "anomalo_job_id"
mock_client.get_table_information.assert_called_with(table_name="foo")
mock_client.run_checks.assert_called_with(
table_id="foo_id", interval_id="2023-02-01", check_ids=None
)


@pytest.mark.parametrize(
"status_checker, expect_passes", [(lambda x: True, True), (lambda x: False, False)]
)
def test_check_run_result_operator(mocker, status_checker, expect_passes):
mock_hook = mocker.patch("airflow.providers.anomalo.operators.anomalo.AnomaloHook")
mock_client = mock_hook.return_value.get_client.return_value = mocker.MagicMock()

mock_client.get_table_information.return_value = {"id": "foo_id"}
mock_client.get_check_intervals.return_value = [
{"latest_run_checks_job_id": "anomalo_job_id"}
]
run_result_mock = mocker.MagicMock()
mock_client.get_run_result.return_value = run_result_mock

check_run = AnomaloCheckRunResultOperator(
table_name="foo",
run_date=date(2023, 2, 1),
task_id="bar",
status_checker=status_checker,
)

try:
results = check_run.execute(context=None)
actual_passes = True
except AirflowException:
actual_passes = False

mock_client.get_table_information.assert_called_with(table_name="foo")
mock_client.get_check_intervals.assert_called_with(
table_id="foo_id", start="2023-02-01", end=None
)
mock_client.get_run_result.assert_called_with(job_id="anomalo_job_id")

assert actual_passes == expect_passes

if actual_passes:
assert results == run_result_mock


def create_mock_run_result(type_to_success: List[Tuple[str, bool]]) -> Mapping:
runs = []
for check_type, success in type_to_success:
runs.append(
{
"run_config": {"_metadata": {"check_type": check_type}},
"results": {"success": success},
}
)
return {"check_runs": runs}


@pytest.mark.parametrize(
"results, must_pass, expect_passes",
[
(create_mock_run_result([("rule", True)]), ["rule"], True),
(create_mock_run_result([("rule", True), ("rule", False)]), ["rule"], False),
(create_mock_run_result([("rule", True), ("metric", False)]), ["rule"], True),
(
create_mock_run_result([("rule", True), ("metric", False)]),
["rule", "metric"],
False,
),
],
)
def test_check_pass_fail_operator(mocker, results, must_pass, expect_passes):
check_pass = AnomaloPassFailOperator(
table_name="foo", run_date=date(2023, 2, 1), must_pass=must_pass, task_id="bar"
)

actual_passes = check_pass.status_checker(results=results)
assert actual_passes == expect_passes
35 changes: 35 additions & 0 deletions tests/test_sensors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import pytest

from airflow.providers.anomalo.sensors.anomalo import AnomaloJobCompleteSensor


@pytest.mark.parametrize(
"run_result, complete",
[
({"check_runs": [{"results_pending": False}]}, True),
({"check_runs": [{"results_pending": True}]}, False),
],
)
def test_job_complete_sensor_true(mocker, run_result, complete):
mock_hook = mocker.patch("airflow.providers.anomalo.sensors.anomalo.AnomaloHook")
mock_client = mock_hook.return_value.get_client.return_value = mocker.MagicMock()

sensor = AnomaloJobCompleteSensor(job_id="foo", task_id="bar")
mock_client.get_run_result.return_value = run_result
assert sensor.poke(context=None) == complete

mock_client.get_run_result.assert_called_with("foo")


def test_job_complete_sensor_xcom(mocker):
mock_hook = mocker.patch("airflow.providers.anomalo.sensors.anomalo.AnomaloHook")
mock_client = mock_hook.return_value.get_client.return_value = mocker.MagicMock()
mock_context = mocker.MagicMock()

sensor = AnomaloJobCompleteSensor(xcom_job_id_task="foo_task", task_id="bar")
xcom_pull = mocker.patch.object(sensor, "xcom_pull", return_value="foo")

sensor.poke(context=mock_context)

xcom_pull.assert_called()
mock_client.get_run_result.assert_called_with("foo")