Enable type checks (MyPy) in the CI (#510)
We had to disable the MyPy checks in the pre-commit hook because we were
exceeding a size quota of the pre-commit service:
```
build of https://github.com/pre-commit/mirrors-mypy:types-PyYAML,types-attrs,attrs,types-requests,types-python-dateutil,[email protected] for python@python3 exceeds tier max size 250MiB: 262.2MiB
```

As observed in #482.

This PR re-enables the MyPy checks, but runs them in the CI instead.

Bonus: Fix OpenLineage import error handling
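
For context on the bonus item: the fix nests the fallback OpenLineage import in its own `try`/`except`, so a failure of the fallback import is also caught, logged, and downgraded to `is_openlineage_available = False` instead of propagating at import time. A minimal, self-contained sketch of the corrected pattern, mirroring the new code in `cosmos/operators/local.py` below (warning text paraphrased):

```python
import logging

logger = logging.getLogger(__name__)
is_openlineage_available = True

try:
    # Preferred: the OpenLineage support built into Airflow 2.7+
    from airflow.providers.openlineage.extractors.base import OperatorLineage
except (ImportError, ModuleNotFoundError):
    try:
        # Fallback: the standalone openlineage-airflow integration
        from openlineage.airflow.extractors.base import OperatorLineage
    except (ImportError, ModuleNotFoundError) as error:
        # Neither integration is installed: warn and disable lineage emission
        logger.warning(
            "To emit OpenLineage events, upgrade to Airflow 2.7 or install astronomer-cosmos[openlineage]."
        )
        logger.exception(error)
        is_openlineage_available = False
```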
tatiana authored Sep 6, 2023
1 parent c7a203a commit 6c3c003
Showing 9 changed files with 78 additions and 33 deletions.
16 changes: 16 additions & 0 deletions .github/workflows/test.yml
@@ -21,6 +21,22 @@ jobs:
    steps:
      - run: true

  Type-Check:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v3
        with:
          python-version: '3.10'
          architecture: 'x64'
      - uses: actions/cache@v3
        with:
          path: |
            ~/.cache/pip
          key: ${{ runner.os }}-${{ hashFiles('pyproject.toml') }}
      - run: pip3 install hatch mypy
      - run: hatch run tests.py3.9-2.7:type-check

  Run-Unit-Tests:
    runs-on: ubuntu-latest
    strategy:
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -70,7 +70,7 @@ repos:
rev: 'v1.5.1'
hooks:
- id: mypy
name: mypy-python-sdk
name: mypy-python
additional_dependencies: [types-PyYAML, types-attrs, attrs, types-requests, types-python-dateutil, apache-airflow]
files: ^cosmos

2 changes: 2 additions & 0 deletions cosmos/constants.py
@@ -12,6 +12,8 @@
DBT_TARGET_DIR_NAME = "target"
DBT_LOG_FILENAME = "dbt.log"
DBT_BINARY_NAME = "dbt"

DEFAULT_OPENLINEAGE_NAMESPACE = "cosmos"
OPENLINEAGE_PRODUCER = "https://github.com/astronomer/astronomer-cosmos/"


10 changes: 5 additions & 5 deletions cosmos/operators/docker.py
@@ -20,13 +20,13 @@
)


class DbtDockerBaseOperator(DockerOperator, DbtBaseOperator): # type: ignore[misc] # ignores subclass MyPy error
class DbtDockerBaseOperator(DockerOperator, DbtBaseOperator): # type: ignore
"""
Executes a dbt core cli command in a Docker container.
"""

template_fields: Sequence[str] = DbtBaseOperator.template_fields + DockerOperator.template_fields
template_fields: Sequence[str] = tuple(list(DbtBaseOperator.template_fields) + list(DockerOperator.template_fields))

intercept_flag = False

@@ -39,7 +39,7 @@ def __init__(

def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> Any:
self.build_command(context, cmd_flags)
self.log.info(f"Running command: {self.command}") # type: ignore[has-type]
self.log.info(f"Running command: {self.command}")
result = super().execute(context)
logger.info(result)

@@ -50,8 +50,8 @@ def build_command(self, context: Context, cmd_flags: list[str] | None = None) ->
self.dbt_executable_path = "dbt"
dbt_cmd, env_vars = self.build_cmd(context=context, cmd_flags=cmd_flags)
# set env vars
self.environment = {**env_vars, **self.environment} # type: ignore[has-type]
self.command = dbt_cmd
self.environment: dict[str, Any] = {**env_vars, **self.environment}
self.command: list[str] = dbt_cmd

def execute(self, context: Context) -> None:
self.build_and_run_cmd(context=context)
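An aside on the `template_fields` change above (and the matching one in `kubernetes.py` below): the operators type `template_fields` as `Sequence[str]`, and strict MyPy rejects `+` on `Sequence` because that protocol does not define `__add__`. A minimal illustration, using hypothetical field tuples rather than the real operator attributes:

```python
from typing import Sequence

base_fields: Sequence[str] = ("env", "vars")            # illustrative stand-in for DbtBaseOperator.template_fields
operator_fields: Sequence[str] = ("image", "command")   # illustrative stand-in for DockerOperator.template_fields

# Under strict MyPy, `base_fields + operator_fields` is rejected:
# typing.Sequence does not define __add__, even though tuples and lists do.
# Converting both sides to list and the result back to tuple keeps the
# declared Sequence[str] type while satisfying the checker.
template_fields: Sequence[str] = tuple(list(base_fields) + list(operator_fields))

print(template_fields)  # ('env', 'vars', 'image', 'command')
```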
12 changes: 7 additions & 5 deletions cosmos/operators/kubernetes.py
@@ -26,13 +26,15 @@
)


class DbtKubernetesBaseOperator(KubernetesPodOperator, DbtBaseOperator): # type: ignore[misc]
class DbtKubernetesBaseOperator(KubernetesPodOperator, DbtBaseOperator): # type: ignore
"""
Executes a dbt core cli command in a Kubernetes Pod.
"""

template_fields: Sequence[str] = DbtBaseOperator.template_fields + KubernetesPodOperator.template_fields
template_fields: Sequence[str] = tuple(
list(DbtBaseOperator.template_fields) + list(KubernetesPodOperator.template_fields)
)

intercept_flag = False

@@ -41,12 +43,12 @@ def __init__(self, profile_config: ProfileConfig | None = None, **kwargs: Any) -
super().__init__(**kwargs)

def build_env_args(self, env: dict[str, str | bytes | PathLike[Any]]) -> None:
env_vars_dict = dict()
env_vars_dict: dict[str, str] = dict()

for env_var in self.env_vars: # type: ignore[has-type]
for env_var in self.env_vars:
env_vars_dict[env_var.name] = env_var.value

self.env_vars = convert_env_vars({**env, **env_vars_dict})
self.env_vars: list[Any] = convert_env_vars({**env, **env_vars_dict})

def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> Any:
self.build_kube_args(context, cmd_flags)
27 changes: 18 additions & 9 deletions cosmos/operators/local.py
@@ -7,6 +7,7 @@
from pathlib import Path
from typing import Any, Callable, Literal, Sequence, TYPE_CHECKING

import airflow
import yaml
from airflow import DAG
from airflow.compat.functools import cached_property
@@ -30,7 +31,7 @@

from sqlalchemy.orm import Session

from cosmos.constants import OPENLINEAGE_PRODUCER
from cosmos.constants import DEFAULT_OPENLINEAGE_NAMESPACE, OPENLINEAGE_PRODUCER
from cosmos.config import ProfileConfig
from cosmos.log import get_logger
from cosmos.operators.base import DbtBaseOperator
@@ -43,18 +44,20 @@
)

logger = get_logger(__name__)
lineage_namespace = conf.get("openlineage", "namespace", fallback=os.getenv("OPENLINEAGE_NAMESPACE", "default"))


try:
from airflow.providers.openlineage.extractors.base import OperatorLineage
except (ImportError, ModuleNotFoundError):
from openlineage.airflow.extractors.base import OperatorLineage
except (ImportError, ModuleNotFoundError):
logger.warning(
"To enable emitting Openlineage events, please, upgrade to Airflow 2.7 or install `openlineage-airflow`."
)
is_openlineage_available = False
try:
from openlineage.airflow.extractors.base import OperatorLineage
except (ImportError, ModuleNotFoundError) as error:
logger.warning(
"To enable emitting Openlineage events. In order to use openlineage, upgrade to Airflow 2.7 or "
"install astronomer-cosmos[openlineage]."
)
logger.exception(error)
is_openlineage_available = False


class DbtLocalBaseOperator(DbtBaseOperator):
@@ -248,6 +251,12 @@ def calculate_openlineage_events_completes(
for key, value in env.items():
os.environ[key] = str(value)

lineage_namespace = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE)
try:
lineage_namespace = conf.get("openlineage", "namespace")
except airflow.exceptions.AirflowConfigException:
pass

openlineage_processor = DbtLocalArtifactProcessor(
producer=OPENLINEAGE_PRODUCER,
job_namespace=lineage_namespace,
@@ -261,7 +270,7 @@ def calculate_openlineage_events_completes(
try:
events = openlineage_processor.parse()
self.openlineage_events_completes = events.completes
except FileNotFoundError as error:
except (FileNotFoundError, NotImplementedError) as error:
logger.exception(error)

def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]:
18 changes: 13 additions & 5 deletions docs/configuration/lineage.rst
@@ -10,7 +10,7 @@ and virtualenv execution methods (read `execution modes <../getting_started/exec
To emit lineage events, Cosmos can use one of the following:

1. Airflow 2.7 `built-in support to OpenLineage <https://airflow.apache.org/docs/apache-airflow-providers-openlineage/1.0.2/guides/user.html>`_, or
2. The `openlineage-airflow <https://openlineage.io/docs/integrations/airflow/>`_ package
2. `Additional libraries <https://openlineage.io/docs/integrations/airflow/>`_.

No change to the user DAG files is required to use OpenLineage.

@@ -20,14 +20,22 @@ Installation

If using Airflow 2.7, no other dependency is required.

Otherwise, install the Python package ``openlineage-airflow``.
Otherwise, install Cosmos using ``astronomer-cosmos[openlineage]``.


Namespace configuration
-----------------------
Configuration
-------------

If using Airflow 2.7, follow `these instructions <https://airflow.apache.org/docs/apache-airflow-providers-openlineage/1.0.2/guides/user.html>`_ on how to configure OpenLineage.

Otherwise, follow `these instructions <https://openlineage.io/docs/integrations/airflow/>`_.


Namespace
.........

Cosmos will use the Airflow ``[openlineage]`` ``namespace`` property as a namespace, `if available <https://airflow.apache.org/docs/apache-airflow-providers-openlineage/1.0.2/guides/user.html>`_.

Otherwise, it attempts to use the environment variable ``OPENLINEAGE_NAMESPACE`` as the namespace.

If not defined, it uses ``"default"`` as the namespace.
Finally, if neither are defined, it uses ``"cosmos"`` as the namespace.
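
A short sketch of the resolution order described above, mirroring the logic this commit adds to `cosmos/operators/local.py` (it assumes Airflow is installed; `DEFAULT_OPENLINEAGE_NAMESPACE`, with the value `"cosmos"`, is the constant added to `cosmos/constants.py` in this diff):

```python
import os

import airflow
from airflow.configuration import conf

from cosmos.constants import DEFAULT_OPENLINEAGE_NAMESPACE  # "cosmos"

# Start from the OPENLINEAGE_NAMESPACE environment variable, falling back to "cosmos" ...
lineage_namespace = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE)

try:
    # ... and prefer the Airflow [openlineage] "namespace" option whenever it is set.
    lineage_namespace = conf.get("openlineage", "namespace")
except airflow.exceptions.AirflowConfigException:
    # Option (or section) not configured: keep the value resolved above.
    pass
```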
23 changes: 15 additions & 8 deletions pyproject.toml
@@ -42,7 +42,6 @@ dependencies = [
"typing-extensions; python_version < '3.8'",
"virtualenv",
"openlineage-integration-common",
"openlineage-airflow"
]

[project.optional-dependencies]
@@ -76,12 +75,12 @@ dbt-snowflake = [
dbt-spark = [
"dbt-spark<=1.5.4",
]
lineage = [
openlineage = [
"openlineage-airflow",
]
all = [
"astronomer-cosmos[dbt-all]",
"astronomer-cosmos[lineage]"
"astronomer-cosmos[openlineage]"
]
docs =[
"sphinx",
@@ -132,7 +131,12 @@ dependencies = [
"astronomer-cosmos[tests]",
"apache-airflow-providers-docker>=3.5.0",
"apache-airflow-providers-cncf-kubernetes>=5.1.1,<7.3.0",
"openlineage-airflow"
"types-PyYAML",
"types-attrs",
"attrs",
"types-requests",
"types-python-dateutil",
"apache-airflow"
]

[[tool.hatch.envs.tests.matrix]]
@@ -150,22 +154,23 @@ matrix.airflow.dependencies = [

[tool.hatch.envs.tests.scripts]
freeze = "pip freeze"
type-check = "mypy cosmos"
test = 'pytest -vv --durations=0 . -m "not integration" --ignore=tests/test_example_dags.py --ignore=tests/test_example_dags_no_connections.py'
test-cov = 'pytest -vv --cov=cosmos --cov-report=term-missing --cov-report=xml --durations=0 -m "not integration" --ignore=tests/test_example_dags.py --ignore=tests/test_example_dags_no_connections.py'
test-cov = """pytest -vv --cov=cosmos --cov-report=term-missing --cov-report=xml --durations=0 -m "not integration" --ignore=tests/test_example_dags.py --ignore=tests/test_example_dags_no_connections.py"""
# we install using the following workaround to overcome installation conflicts, such as:
# apache-airflow 2.3.0 and dbt-core [0.13.0 - 1.5.2] and jinja2>=3.0.0 because these package versions have conflicting dependencies
test-integration-setup = """pip uninstall dbt-postgres dbt-databricks; \
rm -rf airflow.*; \
airflow db init; \
pip install 'dbt-postgres<=1.5.4' 'dbt-databricks<=1.5.4'"""
pip install 'dbt-core==1.5.4' 'dbt-databricks<=1.5.4' 'dbt-postgres<=1.5.4' 'openlineage-airflow'"""
test-integration = """rm -rf dbt/jaffle_shop/dbt_packages;
pytest -vv \
--cov=cosmos \
--cov-report=term-missing \
--cov-report=xml \
--durations=0 \
-m integration \
-k 'not (example_cosmos_python_models or example_cosmos_python_models or example_virtualenv)'
-k 'not (example_cosmos_python_models or example_virtualenv)'
"""
test-integration-expensive = """rm -rf dbt/jaffle_shop/dbt_packages;
pytest -vv \
Expand All @@ -174,7 +179,7 @@ pytest -vv \
--cov-report=xml \
--durations=0 \
-m integration \
-k 'example_cosmos_python_models or example_cosmos_python_models or example_virtualenv'"""
-k 'example_cosmos_python_models or example_virtualenv'"""

[tool.pytest.ini_options]
filterwarnings = [
@@ -215,6 +220,8 @@ known_third_party = ["airflow", "jinja2"]

[tool.mypy]
strict = true
ignore_missing_imports = true
no_warn_unused_ignores = true

[tool.ruff]
line-length = 120
1 change: 1 addition & 0 deletions tests/operators/test_local.py
@@ -172,6 +172,7 @@ def test_run_operator_dataset_inlets_and_outlets():
assert test_operator.outlets == []


@pytest.mark.integration
def test_run_operator_emits_events():
class MockRun:
facets = {"c": 3}
