Skip to content

Commit

Permalink
Reduce McCabe code complexity to 8 (astronomer#738)
Browse files Browse the repository at this point in the history
Reduced the McCabe complexity threshold from 10 to 8 and refactored the code flagged as too complex.

I removed flake8, since we can use Ruff's McCabe linter.

The following functions have been addressed:
```shell
> pre-commit run flake8 --all-files
flake8...................................................................Failed
- hook id: flake8
- exit code: 1

cosmos/airflow/graph.py:196:1: C901 'build_airflow_graph' is too complex (9)
cosmos/converter.py:101:5: C901 'DbtToAirflowConverter.__init__' is too complex (9)
cosmos/dbt/parser/project.py:277:5: C901 'LegacyDbtProject.__post_init__' is too complex (10)
cosmos/dbt/selector.py:197:1: C901 'select_nodes' is too complex (9)

```
I picked out the functions that stood out to me as complex. If we want
to reduce the threshold to 6, there are plenty more functions to tackle. For some
of those functions I would consider it unnecessary to get the score down to 6.
```shell
cosmos/config.py:108:9: C901 `__init__` is too complex (7 > 6)
cosmos/dbt/parser/project.py:63:9: C901 `_config_selector_ooo` is too complex (7 > 6)
cosmos/dbt/parser/project.py:98:5: C901 `extract_python_file_upstream_requirements` is too complex (7 > 6)
cosmos/dbt/parser/project.py:165:9: C901 `extract_sql_file_requirements` is too complex (7 > 6)
cosmos/dbt/selector.py:52:9: C901 `load_from_statement` is too complex (7 > 6)
cosmos/dbt/selector.py:119:9: C901 `_should_include_node` is too complex (7 > 6)
cosmos/hooks/subprocess.py:34:9: C901 `run_command` is too complex (8 > 6)
cosmos/operators/base.py:137:9: C901 `get_env` is too complex (7 > 6)
cosmos/operators/base.py:186:9: C901 `add_global_flags` is too complex (7 > 6)
cosmos/operators/local.py:136:9: C901 `store_compiled_sql` is too complex (7 > 6)
cosmos/profiles/base.py:183:9: C901 `get_dbt_value` is too complex (8 > 6)
Found 11 errors.
```

This PR continues the amazing work of @jbandoro in astronomer#525.
Related issue astronomer#641
  • Loading branch information
joppevos authored and arojasb3 committed Jul 14, 2024
1 parent 4380faa commit f5274b1
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 63 deletions.
13 changes: 12 additions & 1 deletion cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from cosmos.core.graph.entities import Task as TaskMetadata
from cosmos.dbt.graph import DbtNode
from cosmos.log import get_logger
from typing import Union


logger = get_logger(__name__)
Expand Down Expand Up @@ -284,7 +285,17 @@ def build_airflow_graph(
for leaf_node_id in leaves_ids:
tasks_map[leaf_node_id] >> test_task

# Create the Airflow task dependencies between non-test nodes
create_airflow_task_dependencies(nodes, tasks_map)


def create_airflow_task_dependencies(
nodes: dict[str, DbtNode], tasks_map: dict[str, Union[TaskGroup, BaseOperator]]
) -> None:
"""
Create the Airflow task dependencies between non-test nodes.
:param nodes: Dictionary mapping dbt nodes (node.unique_id to node)
:param tasks_map: Dictionary mapping dbt nodes (node.unique_id to Airflow task)
"""
for node_id, node in nodes.items():
for parent_node_id in node.depends_on:
# depending on the node type, it will not have mapped 1:1 to tasks_map
Expand Down
28 changes: 17 additions & 11 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

from __future__ import annotations

import copy
import inspect
from typing import Any, Callable
import copy

from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup
Expand All @@ -21,6 +21,18 @@
logger = get_logger(__name__)


def migrate_to_new_interface(
    execution_config: ExecutionConfig, project_config: ProjectConfig, render_config: RenderConfig
) -> tuple[ExecutionConfig, RenderConfig]:
    """
    Migrate configuration supplied through the old interface to the new interface.

    The dbt project path held by ``project_config`` is written to deep copies of ``execution_config`` and
    ``render_config``. The objects passed in are not modified.

    :param execution_config: Execution configuration to copy and update
    :param project_config: Project configuration providing ``dbt_project_path``
    :param render_config: Render configuration to copy and update
    :returns: Tuple ``(execution_config, render_config)`` containing the updated copies
    """
    # We copy the configuration so the change does not affect other DAGs or TaskGroups
    # that may reuse the same original configuration
    render_config = copy.deepcopy(render_config)
    execution_config = copy.deepcopy(execution_config)
    render_config.project_path = project_config.dbt_project_path
    execution_config.project_path = project_config.dbt_project_path
    return execution_config, render_config


def specific_kwargs(**kwargs: dict[str, Any]) -> dict[str, Any]:
"""
Extract kwargs specific to the cosmos.converter.DbtToAirflowConverter class initialization method.
Expand Down Expand Up @@ -166,22 +178,16 @@ def __init__(
) -> None:
project_config.validate_project()

if not execution_config:
execution_config = ExecutionConfig()
if not render_config:
render_config = RenderConfig()
execution_config = execution_config or ExecutionConfig()
render_config = render_config or RenderConfig()
operator_args = operator_args or {}

validate_initial_user_config(execution_config, profile_config, project_config, render_config)

# If we are using the old interface, we should migrate it to the new interface
# This is safe to do now since we have validated which config interface we're using
if project_config.dbt_project_path:
# We copy the configuration so the change does not affect other DAGs or TaskGroups
# that may reuse the same original configuration
render_config = copy.deepcopy(render_config)
execution_config = copy.deepcopy(execution_config)
render_config.project_path = project_config.dbt_project_path
execution_config.project_path = project_config.dbt_project_path
execution_config, render_config = migrate_to_new_interface(execution_config, project_config, render_config)

validate_adapted_user_config(execution_config, project_config, render_config)

Expand Down
12 changes: 4 additions & 8 deletions cosmos/dbt/parser/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,14 +278,10 @@ def __post_init__(self) -> None:
"""
Initializes the parser.
"""
if self.dbt_root_path is None:
self.dbt_root_path = "/usr/local/airflow/dags/dbt"
if self.dbt_models_dir is None:
self.dbt_models_dir = "models"
if self.dbt_snapshots_dir is None:
self.dbt_snapshots_dir = "snapshots"
if self.dbt_seeds_dir is None:
self.dbt_seeds_dir = "seeds"
self.dbt_root_path = self.dbt_root_path or "/usr/local/airflow/dags/dbt"
self.dbt_models_dir = self.dbt_models_dir or "models"
self.dbt_snapshots_dir = self.dbt_snapshots_dir or "snapshots"
self.dbt_seeds_dir = self.dbt_seeds_dir or "seeds"

# set the project and model dirs
self.project_dir = Path(os.path.join(self.dbt_root_path, self.project_name))
Expand Down
110 changes: 68 additions & 42 deletions cosmos/dbt/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,27 +210,39 @@ def load_from_statement(self, statement: str) -> None:

for item in items:
if item.startswith(PATH_SELECTOR):
index = len(PATH_SELECTOR)
if self.project_dir:
self.paths.append(self.project_dir / Path(item[index:]))
else:
self.paths.append(Path(item[index:]))
self._parse_path_selector(item)
elif item.startswith(TAG_SELECTOR):
index = len(TAG_SELECTOR)
self.tags.append(item[index:])
self._parse_tag_selector(item)
elif item.startswith(CONFIG_SELECTOR):
index = len(CONFIG_SELECTOR)
key, value = item[index:].split(":")
if key in SUPPORTED_CONFIG:
self.config[key] = value
self._parse_config_selector(item)
else:
if item:
graph_selector = GraphSelector.parse(item)
if graph_selector is not None:
self.graph_selectors.append(graph_selector)
else:
self.other.append(item)
logger.warning("Unsupported select statement: %s", item)
self._parse_unknown_selector(item)

def _parse_unknown_selector(self, item: str) -> None:
    """
    Handle a selector item that carries none of the known selector prefixes.

    Empty items are ignored. Otherwise the item is handed to ``GraphSelector.parse``: a parsed result is
    stored in ``self.graph_selectors``; when parsing yields ``None`` the raw item is kept in ``self.other``
    and a warning is logged.

    :param item: Single selector item taken from the statement
    """
    if not item:
        return

    parsed = GraphSelector.parse(item)
    if parsed is None:
        self.other.append(item)
        logger.warning("Unsupported select statement: %s", item)
        return

    self.graph_selectors.append(parsed)

def _parse_config_selector(self, item: str) -> None:
    """
    Record a config selector item in ``self.config``.

    The text after the ``CONFIG_SELECTOR`` prefix is split on ``":"`` into exactly one key and one value;
    any other number of parts raises ``ValueError`` from the unpacking. Keys that are not listed in
    ``SUPPORTED_CONFIG`` are ignored.

    :param item: Selector item starting with the config selector prefix
    """
    key, value = item[len(CONFIG_SELECTOR) :].split(":")
    if key not in SUPPORTED_CONFIG:
        return
    self.config[key] = value

def _parse_tag_selector(self, item: str) -> None:
    """
    Record a tag selector item in ``self.tags``.

    :param item: Selector item starting with the tag selector prefix; everything after the prefix is stored
    """
    tag = item[len(TAG_SELECTOR) :]
    self.tags.append(tag)

def _parse_path_selector(self, item: str) -> None:
    """
    Record a path selector item in ``self.paths``.

    The text after the ``PATH_SELECTOR`` prefix is turned into a ``Path``. When ``self.project_dir`` is set,
    the path is joined onto it; otherwise it is stored as given.

    :param item: Selector item starting with the path selector prefix
    """
    selected_path = Path(item[len(PATH_SELECTOR) :])
    self.paths.append(self.project_dir / selected_path if self.project_dir else selected_path)

def __repr__(self) -> str:
return f"SelectorConfig(paths={self.paths}, tags={self.tags}, config={self.config}, other={self.other}, graph_selectors={self.graph_selectors})"
Expand Down Expand Up @@ -388,7 +400,44 @@ def select_nodes(
if not select and not exclude:
return nodes

# validates select and exclude filters
validate_filters(exclude, select)
subset_ids = apply_select_filter(nodes, project_dir, select)
if select:
nodes = get_nodes_from_subset(nodes, subset_ids)
exclude_ids = apply_exclude_filter(nodes, project_dir, exclude)
subset_ids = set(nodes.keys()) - exclude_ids

return get_nodes_from_subset(nodes, subset_ids)


def get_nodes_from_subset(nodes: dict[str, DbtNode], subset_ids: set[str]) -> dict[str, DbtNode]:
    """
    Build a new dictionary holding only the nodes whose ids are listed in ``subset_ids``.

    :param nodes: Dictionary mapping node ids to nodes; it is not modified
    :param subset_ids: Ids of the nodes to keep; an id missing from ``nodes`` raises ``KeyError``
    :returns: Dictionary mapping each id in ``subset_ids`` to its node
    """
    return {node_id: nodes[node_id] for node_id in subset_ids}


def apply_exclude_filter(nodes: dict[str, DbtNode], project_dir: Path | None, exclude: list[str]) -> set[str]:
    """
    Collect the ids of the nodes matched by any of the exclude statements.

    Each statement is wrapped in a ``SelectorConfig`` and evaluated against ``nodes`` with a ``NodeSelector``;
    the ids reported by ``select_nodes_ids_by_intersection`` are merged across all statements.

    :param nodes: Dictionary mapping node ids to nodes
    :param project_dir: Project directory handed to each ``SelectorConfig`` (may be ``None``)
    :param exclude: Exclude statements to evaluate
    :returns: Union of the ids matched by the statements; an empty set when ``exclude`` is empty
    """
    matches_per_statement = (
        NodeSelector(nodes, SelectorConfig(project_dir, statement)).select_nodes_ids_by_intersection
        for statement in exclude
    )
    excluded: set[str] = set()
    return excluded.union(*matches_per_statement)


def apply_select_filter(nodes: dict[str, DbtNode], project_dir: Path | None, select: list[str]) -> set[str]:
    """
    Collect the ids of the nodes matched by any of the select statements.

    Each statement is wrapped in a ``SelectorConfig`` and evaluated against ``nodes`` with a ``NodeSelector``;
    the ids reported by ``select_nodes_ids_by_intersection`` are merged across all statements.

    :param nodes: Dictionary mapping node ids to nodes
    :param project_dir: Project directory handed to each ``SelectorConfig`` (may be ``None``)
    :param select: Select statements to evaluate
    :returns: Union of the ids matched by the statements; an empty set when ``select`` is empty
    """
    matches_per_statement = (
        NodeSelector(nodes, SelectorConfig(project_dir, statement)).select_nodes_ids_by_intersection
        for statement in select
    )
    selected: set[str] = set()
    return selected.union(*matches_per_statement)


def validate_filters(exclude: list[str], select: list[str]) -> None:
"""
Validate select and exclude filters.
"""
filters = [["select", select], ["exclude", exclude]]
for filter_type, filter in filters:
for filter_parameter in filter:
Expand All @@ -401,26 +450,3 @@ def select_nodes(
continue
elif ":" in filter_parameter:
raise CosmosValueError(f"Invalid {filter_type} filter: {filter_parameter}")

subset_ids: set[str] = set()

for statement in select:
config = SelectorConfig(project_dir, statement)
node_selector = NodeSelector(nodes, config)

select_ids = node_selector.select_nodes_ids_by_intersection
subset_ids.update(set(select_ids))

if select:
nodes = {id_: nodes[id_] for id_ in subset_ids}

nodes_ids = set(nodes.keys())

exclude_ids: set[str] = set()
for statement in exclude:
config = SelectorConfig(project_dir, statement)
node_selector = NodeSelector(nodes, config)
exclude_ids.update(set(node_selector.select_nodes_ids_by_intersection))
subset_ids = set(nodes_ids) - set(exclude_ids)

return {id_: nodes[id_] for id_ in subset_ids}
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -260,5 +260,5 @@ line-length = 120
universal = true

[tool.flake8]
max-complexity = 10
max-complexity = 8
select = "C"

0 comments on commit f5274b1

Please sign in to comment.