Skip to content

Commit

Permalink
Fixes 17413: Fix one sided tests for columnValueLengthsToBeBetween an…
Browse files Browse the repository at this point in the history
…d columnValuesToBeBetween (open-metadata#17423)

* mysql integration tests

* fix(data-quality): accept between with no bounds

Add "between" filters only when the bounds are defined; if they are not (i.e. they resolve to infinite values), do not add any filters.

* format

* consolidated ingestion_config

* format

* fixed handling of date and time columns

* fixed tests
  • Loading branch information
sushi30 authored Aug 19, 2024
1 parent ad8b271 commit 4c08f82
Show file tree
Hide file tree
Showing 15 changed files with 415 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
"""
Validator for column value length to be between test case
"""


import math
from typing import Optional

from sqlalchemy import Column, inspect
Expand Down Expand Up @@ -65,14 +64,16 @@ def compute_row_count(self, column: Column, min_bound: int, max_bound: int):
NotImplementedError:
"""
row_count = self._compute_row_count(self.runner, column)
filters = []
if min_bound > -math.inf:
filters.append((LenFn(column), "lt", min_bound))
if max_bound < math.inf:
filters.append((LenFn(column), "gt", max_bound))
failed_rows = self._compute_row_count_between(
self.runner,
column,
{
"filters": [
(LenFn(column), "gt", max_bound),
(LenFn(column), "lt", min_bound),
],
"filters": filters,
"or_filter": True,
},
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"""
Validator for column values to be between test case
"""

import math
from typing import Optional

from sqlalchemy import Column, inspect
Expand Down Expand Up @@ -63,11 +63,16 @@ def compute_row_count(self, column: Column, min_bound: int, max_bound: int):
NotImplementedError:
"""
row_count = self._compute_row_count(self.runner, column)
filters = []
if not isinstance(min_bound, (int, float)) or min_bound > -math.inf:
filters.append((column, "lt", min_bound))
if not isinstance(min_bound, (int, float)) or max_bound < math.inf:
filters.append((column, "gt", max_bound))
failed_rows = self._compute_row_count_between(
self.runner,
column,
{
"filters": [(column, "gt", max_bound), (column, "lt", min_bound)],
"filters": filters,
"or_filter": True,
},
)
Expand Down
28 changes: 28 additions & 0 deletions ingestion/tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,20 @@ def create_service_request(scope="module"):

@pytest.fixture()
def patch_passwords_for_db_services(db_service, unmask_password, monkeypatch):
"""Patch the password for all db services returned by the metadata service.
Usage:
def test_my_test(db_service, patch_passwords_for_db_services):
...
OR
@pytest.usefixtures("patch_passwords_for_db_services")
def test_my_test(db_service):
...
"""

def override_password(getter):
def inner(*args, **kwargs):
result = getter(*args, **kwargs)
Expand Down Expand Up @@ -187,3 +201,17 @@ def inner(entity_type: Type[Entity], fqn: str):
entity = metadata.get_by_name(etype, fqn, fields=["*"])
if entity:
metadata.delete(etype, entity.id, recursive=True, hard_delete=True)


@pytest.fixture(scope="module")
def ingestion_config(db_service, metadata, workflow_config, sink_config):
    """Module-scoped metadata-ingestion workflow config for the db service."""
    connection = db_service.connection
    source = {
        "type": connection.config.type.value.lower(),
        "serviceName": db_service.fullyQualifiedName.root,
        "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
        "serviceConnection": connection.model_dump(),
    }
    return {
        "source": source,
        "sink": sink_config,
        "workflowConfig": workflow_config,
    }
Empty file.
73 changes: 73 additions & 0 deletions ingestion/tests/integration/mysql/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import os
from subprocess import CalledProcessError

import pytest
from sqlalchemy import create_engine
from testcontainers.mysql import MySqlContainer

from _openmetadata_testutils.helpers.docker import try_bind
from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceRequest,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)


@pytest.fixture(scope="module")
def mysql_container(tmp_path_factory):
    """Start a MySQL container loaded with the `employees` test database.

    The test_db dump (see data/mysql/README.md) is copied into the container
    and imported, the `test` user is granted read access, and a `last_update`
    TIMESTAMP column is added to the `employees` table so time-based test
    cases have data to work with.

    Yields:
        The running MySqlContainer.
    """
    test_db_tar_path = os.path.join(
        os.path.dirname(__file__), "data", "mysql", "test_db-1.0.7.tar.gz"
    )
    container = MySqlContainer(dbname="employees")
    # Outside CI, pin the container to host port 3307 for easier local
    # debugging; in CI let testcontainers pick a free port.
    with (
        try_bind(container, 3306, 3307) if not os.getenv("CI") else container
    ) as container:
        docker_container = container.get_wrapped_container()
        docker_container.exec_run(["mkdir", "-p", "/data"])
        # Close the tarball handle deterministically instead of leaking the
        # file object passed to put_archive.
        with open(test_db_tar_path, "rb") as tarball:
            docker_container.put_archive("/data", tarball)
        for command in (
            [
                "sh",
                "-c",
                f"cd /data/test_db && mysql -uroot -p{container.password} < employees.sql",
            ],
            [
                "sh",
                "-c",
                f'mysql -uroot -p{container.password} -e \'GRANT SELECT ON employees.* TO "test"@"%";\'',
            ],
        ):
            # exec_run returns (exit_code, output); fail fast on any error.
            res = docker_container.exec_run(command)
            if res[0] != 0:
                # `cmd` is the command that failed (the original passed the
                # whole result tuple here).
                raise CalledProcessError(
                    returncode=res[0], cmd=command, output=res[1].decode("utf-8")
                )
        engine = create_engine(container.get_connection_url())
        engine.execute(
            "ALTER TABLE employees ADD COLUMN last_update TIMESTAMP DEFAULT CURRENT_TIMESTAMP"
        )
        engine.execute(
            "UPDATE employees SET last_update = hire_date + INTERVAL FLOOR(1 + RAND() * 500000) SECOND"
        )
        yield container


@pytest.fixture(scope="module")
def create_service_request(mysql_container, tmp_path_factory):
    """Build the request that registers the MySQL container as a db service."""
    host_port = "localhost:" + mysql_container.get_exposed_port(
        mysql_container.port
    )
    connection_config = {
        "username": mysql_container.username,
        "authType": {"password": mysql_container.password},
        "hostPort": host_port,
        "databaseSchema": mysql_container.dbname,
    }
    return CreateDatabaseServiceRequest.model_validate(
        {
            "name": "docker_test_" + tmp_path_factory.mktemp("mysql").name,
            "serviceType": DatabaseServiceType.Mysql.value,
            "connection": {"config": connection_config},
        }
    )
3 changes: 3 additions & 0 deletions ingestion/tests/integration/mysql/data/mysql/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# MySQL test db

https://github.com/datacharmer/test_db
Binary file not shown.
222 changes: 222 additions & 0 deletions ingestion/tests/integration/mysql/test_data_quality.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
import sys
from dataclasses import dataclass
from datetime import datetime
from typing import List

import pytest

from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects
from metadata.data_quality.api.models import TestCaseDefinition
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
TestSuiteConfigType,
TestSuitePipeline,
)
from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus
from metadata.generated.schema.tests.testCase import TestCase
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.data_quality import TestSuiteWorkflow
from metadata.workflow.metadata import MetadataWorkflow

# Skip the whole module on interpreters older than 3.9 — presumably one of
# the imports above needs 3.9+; TODO(review): confirm the exact dependency.
if not sys.version_info >= (3, 9):
    pytest.skip("requires python 3.9+", allow_module_level=True)


@pytest.fixture()
def get_test_suite_config(workflow_config, sink_config):
    """Factory fixture building a test-suite workflow configuration.

    Returns a callable taking the target entity FQN and the test case
    definitions to execute against it.
    """

    def inner(entity_fqn: str, test_case_definitions: List[TestCaseDefinition]):
        source = {
            "type": TestSuiteConfigType.TestSuite.value,
            "serviceName": "MyTestSuite",
            "sourceConfig": {
                "config": TestSuitePipeline(
                    type=TestSuiteConfigType.TestSuite,
                    entityFullyQualifiedName=entity_fqn,
                )
            },
        }
        processor = {
            "type": "orm-test-runner",
            "config": {
                "testCases": [obj.model_dump() for obj in test_case_definitions]
            },
        }
        return {
            "source": source,
            "processor": processor,
            "sink": sink_config,
            "workflowConfig": workflow_config,
        }

    return inner


@dataclass
class TestColumnParameter:
    """One column-level data-quality case together with its expected result.

    NOTE(review): the ``Test`` name prefix makes pytest attempt to collect
    this class during discovery; consider a name without the prefix.
    """

    # Entity FQN template; the "{database_service_fqn}" placeholder is
    # filled in by the `parameters` fixture.
    entity_fqn: str
    # Definition of the test case to run against the column.
    test_case_definition: TestCaseDefinition
    # Result (status) the executed test case is expected to produce.
    expected_result: TestCaseResult


@pytest.fixture(
    # Each entry is one column test case against `employees` plus its
    # expected outcome.
    params=[
        TestColumnParameter(
            entity_fqn="{database_service_fqn}.default.employees.employees",
            test_case_definition=TestCaseDefinition(
                name="first_name_includes_tom_and_jerry_wo_enum",
                testDefinitionName="columnValuesToBeInSet",
                computePassedFailedRowCount=True,
                columnName="first_name",
                parameterValues=[
                    {"name": "allowedValues", "value": "['Tom', 'Jerry']"}
                ],
            ),
            expected_result=TestCaseResult(
                testCaseStatus=TestCaseStatus.Failed,
            ),
        ),
        # Two-sided and one-sided columnValueLengthsToBeBetween cases.
        TestColumnParameter(
            entity_fqn="{database_service_fqn}.default.employees.employees",
            test_case_definition=TestCaseDefinition(
                name="value_lengths_between_3_and_5",
                testDefinitionName="columnValueLengthsToBeBetween",
                computePassedFailedRowCount=True,
                columnName="first_name",
                parameterValues=[
                    {"name": "minLength", "value": "3"},
                    {"name": "maxLength", "value": "5"},
                ],
            ),
            expected_result=TestCaseResult(
                testCaseStatus=TestCaseStatus.Failed,
            ),
        ),
        TestColumnParameter(
            entity_fqn="{database_service_fqn}.default.employees.employees",
            test_case_definition=TestCaseDefinition(
                name="value_lengths_at_most_5",
                testDefinitionName="columnValueLengthsToBeBetween",
                columnName="first_name",
                computePassedFailedRowCount=True,
                parameterValues=[
                    {"name": "maxLength", "value": "5"},
                ],
            ),
            expected_result=TestCaseResult(
                testCaseStatus=TestCaseStatus.Failed,
            ),
        ),
        TestColumnParameter(
            entity_fqn="{database_service_fqn}.default.employees.employees",
            test_case_definition=TestCaseDefinition(
                name="value_lengths_at_least_3",
                testDefinitionName="columnValueLengthsToBeBetween",
                columnName="first_name",
                computePassedFailedRowCount=True,
                parameterValues=[
                    {"name": "minLength", "value": "3"},
                ],
            ),
            expected_result=TestCaseResult(
                testCaseStatus=TestCaseStatus.Success,
            ),
        ),
        # One-sided / unbounded columnValuesToBeBetween on a numeric column.
        TestColumnParameter(
            entity_fqn="{database_service_fqn}.default.employees.employees",
            test_case_definition=TestCaseDefinition(
                name="id_at_least_0",
                testDefinitionName="columnValuesToBeBetween",
                columnName="emp_no",
                computePassedFailedRowCount=True,
                parameterValues=[
                    {"name": "minValue", "value": "0"},
                ],
            ),
            expected_result=TestCaseResult(
                testCaseStatus=TestCaseStatus.Success,
            ),
        ),
        TestColumnParameter(
            entity_fqn="{database_service_fqn}.default.employees.employees",
            test_case_definition=TestCaseDefinition(
                name="id_no_bounds",
                testDefinitionName="columnValuesToBeBetween",
                columnName="emp_no",
                computePassedFailedRowCount=True,
                parameterValues=[],
            ),
            expected_result=TestCaseResult(
                testCaseStatus=TestCaseStatus.Success,
            ),
        ),
        # Date/timestamp columns: minValue is passed as an integer unix epoch.
        TestColumnParameter(
            entity_fqn="{database_service_fqn}.default.employees.employees",
            test_case_definition=TestCaseDefinition(
                name="values_between_date",
                testDefinitionName="columnValuesToBeBetween",
                columnName="hire_date",
                computePassedFailedRowCount=True,
                parameterValues=[
                    {
                        "name": "minValue",
                        "value": str(int(datetime(1960, 1, 1).timestamp())),
                    },
                ],
            ),
            expected_result=TestCaseResult(
                testCaseStatus=TestCaseStatus.Success,
            ),
        ),
        TestColumnParameter(
            entity_fqn="{database_service_fqn}.default.employees.employees",
            test_case_definition=TestCaseDefinition(
                name="value_between_timestamp",
                testDefinitionName="columnValuesToBeBetween",
                columnName="last_update",
                computePassedFailedRowCount=True,
                parameterValues=[
                    {
                        "name": "minValue",
                        "value": str(int(datetime(2000, 1, 1).timestamp())),
                    },
                ],
            ),
            expected_result=TestCaseResult(
                testCaseStatus=TestCaseStatus.Failed,
            ),
        ),
    ],
    ids=lambda x: x.test_case_definition.name,
)
def parameters(request, db_service):
    """Resolve the parametrized case's FQN template for the current service.

    NOTE(review): this mutates the shared parameter object in place; the
    second `.format()` call is a no-op once the placeholder is gone, but
    `dataclasses.replace` would avoid the mutation entirely.
    """
    request.param.entity_fqn = request.param.entity_fqn.format(
        database_service_fqn=db_service.fullyQualifiedName.root
    )
    return request.param


def test_column_test_cases(
    patch_passwords_for_db_services,
    run_workflow,
    ingestion_config,
    db_service: DatabaseService,
    metadata: OpenMetadata,
    parameters: TestColumnParameter,
    get_test_suite_config,
    cleanup_fqns,
):
    """End-to-end check of a single column data-quality test case.

    Ingests the database metadata, runs the parametrized test suite, then
    compares the stored test-case result with the expected one.
    """
    # Ingest metadata first so the target table exists in OpenMetadata.
    run_workflow(MetadataWorkflow, ingestion_config)

    suite_config = get_test_suite_config(
        parameters.entity_fqn, [parameters.test_case_definition]
    )
    run_workflow(TestSuiteWorkflow, suite_config)

    case_fqn = f"{parameters.entity_fqn}.{parameters.test_case_definition.columnName}.{parameters.test_case_definition.name}"
    fetched_case: TestCase = metadata.get_by_name(
        TestCase, case_fqn, fields=["*"], nullable=False
    )
    # Register for cleanup before asserting so failures don't leak entities.
    cleanup_fqns(TestCase, fetched_case.fullyQualifiedName.root)
    assert_equal_pydantic_objects(
        parameters.expected_result, fetched_case.testCaseResult
    )
Loading

0 comments on commit 4c08f82

Please sign in to comment.