Reduce McCabe code complexity to 8 #738

Merged: 6 commits, Dec 5, 2023
13 changes: 12 additions & 1 deletion cosmos/airflow/graph.py
@@ -18,6 +18,7 @@
from cosmos.core.graph.entities import Task as TaskMetadata
from cosmos.dbt.graph import DbtNode
from cosmos.log import get_logger
from typing import Union


logger = get_logger(__name__)
@@ -271,7 +272,17 @@ def build_airflow_graph(
for leaf_node_id in leaves_ids:
tasks_map[leaf_node_id] >> test_task

# Create the Airflow task dependencies between non-test nodes
create_airflow_task_dependencies(nodes, tasks_map)


def create_airflow_task_dependencies(
nodes: dict[str, DbtNode], tasks_map: dict[str, Union[TaskGroup, BaseOperator]]
) -> None:
"""
Create the Airflow task dependencies between non-test nodes.
:param nodes: Dictionary mapping dbt nodes (node.unique_id to node)
:param tasks_map: Dictionary mapping dbt nodes (node.unique_id to Airflow task)
"""
for node_id, node in nodes.items():
for parent_node_id in node.depends_on:
# depending on the node type, it will not have mapped 1:1 to tasks_map
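The new `create_airflow_task_dependencies` helper walks each node's `depends_on` list and presumably wires the parent task or task group to the child with Airflow's `>>` operator (the loop body is collapsed in this view). The sketch below shows that wiring pattern in isolation; the `FakeTask` class and the example node layout are illustrative stand-ins rather than Cosmos code, and the real helper also has to skip parents that did not map 1:1 to entries in `tasks_map`.

```python
from dataclasses import dataclass, field


@dataclass
class FakeTask:
    """Stand-in for an Airflow task or TaskGroup; records upstream links set via >>."""
    name: str
    upstream: list = field(default_factory=list)

    def __rshift__(self, other: "FakeTask") -> "FakeTask":
        # parent >> child, mirroring how Airflow operators chain dependencies
        other.upstream.append(self.name)
        return other


# node id -> list of parent ids, mimicking DbtNode.depends_on
nodes = {"model.a": [], "model.b": ["model.a"], "model.c": ["model.a", "model.b"]}
tasks_map = {node_id: FakeTask(node_id) for node_id in nodes}

for node_id, parents in nodes.items():
    for parent_id in parents:
        if parent_id in tasks_map:  # a parent may not have mapped 1:1 to a task
            tasks_map[parent_id] >> tasks_map[node_id]

print(tasks_map["model.c"].upstream)  # ['model.a', 'model.b']
```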
28 changes: 17 additions & 11 deletions cosmos/converter.py
@@ -3,9 +3,9 @@

from __future__ import annotations

+import copy
import inspect
from typing import Any, Callable
-import copy

from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup
@@ -21,6 +21,18 @@
logger = get_logger(__name__)


def migrate_to_new_interface(
execution_config: ExecutionConfig, project_config: ProjectConfig, render_config: RenderConfig
):
# We copy the configuration so the change does not affect other DAGs or TaskGroups
# that may reuse the same original configuration
render_config = copy.deepcopy(render_config)
execution_config = copy.deepcopy(execution_config)
render_config.project_path = project_config.dbt_project_path
execution_config.project_path = project_config.dbt_project_path
return execution_config, render_config


def specific_kwargs(**kwargs: dict[str, Any]) -> dict[str, Any]:
"""
Extract kwargs specific to the cosmos.converter.DbtToAirflowConverter class initialization method.
@@ -166,22 +178,16 @@ def __init__(
) -> None:
project_config.validate_project()

-if not execution_config:
-execution_config = ExecutionConfig()
-if not render_config:
-render_config = RenderConfig()
+execution_config = execution_config or ExecutionConfig()
+render_config = render_config or RenderConfig()
operator_args = operator_args or {}

validate_initial_user_config(execution_config, profile_config, project_config, render_config)

# If we are using the old interface, we should migrate it to the new interface
# This is safe to do now since we have validated which config interface we're using
if project_config.dbt_project_path:
-# We copy the configuration so the change does not affect other DAGs or TaskGroups
-# that may reuse the same original configuration
-render_config = copy.deepcopy(render_config)
-execution_config = copy.deepcopy(execution_config)
-render_config.project_path = project_config.dbt_project_path
-execution_config.project_path = project_config.dbt_project_path
+execution_config, render_config = migrate_to_new_interface(execution_config, project_config, render_config)

validate_adapted_user_config(execution_config, project_config, render_config)

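The converter change extracts the deep-copy logic into `migrate_to_new_interface` so that setting `project_path` cannot leak into a configuration object shared by several DAGs or TaskGroups. A small self-contained sketch of why the `copy.deepcopy` matters is below; `DummyConfig` and the paths are made-up stand-ins for Cosmos' `RenderConfig`/`ExecutionConfig`, used only for illustration.

```python
import copy
from dataclasses import dataclass
from typing import Optional


@dataclass
class DummyConfig:
    """Illustrative stand-in for a Cosmos config object."""
    project_path: Optional[str] = None


shared = DummyConfig()  # one config object reused by two DAGs

# Without a copy, setting project_path for DAG A also changes DAG B's view:
dag_a_config = shared
dag_a_config.project_path = "/dags/dbt/jaffle_shop"
assert shared.project_path == "/dags/dbt/jaffle_shop"  # mutation leaked into the shared object

# The PR's helper deep-copies first, so the mutation stays local:
shared = DummyConfig()
dag_b_config = copy.deepcopy(shared)
dag_b_config.project_path = "/dags/dbt/jaffle_shop"
assert shared.project_path is None  # original is untouched
```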
12 changes: 4 additions & 8 deletions cosmos/dbt/parser/project.py
@@ -278,14 +278,10 @@ def __post_init__(self) -> None:
"""
Initializes the parser.
"""
-if self.dbt_root_path is None:
-self.dbt_root_path = "/usr/local/airflow/dags/dbt"
-if self.dbt_models_dir is None:
-self.dbt_models_dir = "models"
-if self.dbt_snapshots_dir is None:
-self.dbt_snapshots_dir = "snapshots"
-if self.dbt_seeds_dir is None:
-self.dbt_seeds_dir = "seeds"
+self.dbt_root_path = self.dbt_root_path or "/usr/local/airflow/dags/dbt"
+self.dbt_models_dir = self.dbt_models_dir or "models"
+self.dbt_snapshots_dir = self.dbt_snapshots_dir or "snapshots"
+self.dbt_seeds_dir = self.dbt_seeds_dir or "seeds"

# set the project and model dirs
self.project_dir = Path(os.path.join(self.dbt_root_path, self.project_name))
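The parser change swaps four `if ... is None` blocks for the `value = value or default` idiom. A quick sketch of the idiom and its one caveat (any falsy value, not just `None`, triggers the default) follows; `resolve_models_dir` is a hypothetical helper used only for illustration, not part of Cosmos.

```python
from typing import Optional


def resolve_models_dir(dbt_models_dir: Optional[str]) -> str:
    # `or` collapses the old `if dbt_models_dir is None:` block into one line;
    # note that any falsy value (e.g. "") also falls back to the default.
    return dbt_models_dir or "models"


assert resolve_models_dir(None) == "models"
assert resolve_models_dir("marts") == "marts"
assert resolve_models_dir("") == "models"  # falsy string also falls back
```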
110 changes: 68 additions & 42 deletions cosmos/dbt/selector.py
@@ -210,27 +210,39 @@

for item in items:
if item.startswith(PATH_SELECTOR):
-index = len(PATH_SELECTOR)
-if self.project_dir:
-self.paths.append(self.project_dir / Path(item[index:]))
-else:
-self.paths.append(Path(item[index:]))
+self._parse_path_selector(item)
elif item.startswith(TAG_SELECTOR):
-index = len(TAG_SELECTOR)
-self.tags.append(item[index:])
+self._parse_tag_selector(item)
elif item.startswith(CONFIG_SELECTOR):
-index = len(CONFIG_SELECTOR)
-key, value = item[index:].split(":")
-if key in SUPPORTED_CONFIG:
-self.config[key] = value
+self._parse_config_selector(item)
else:
-if item:
-graph_selector = GraphSelector.parse(item)
-if graph_selector is not None:
-self.graph_selectors.append(graph_selector)
-else:
-self.other.append(item)
-logger.warning("Unsupported select statement: %s", item)
+self._parse_unknown_selector(item)

def _parse_unknown_selector(self, item: str) -> None:
if item:
graph_selector = GraphSelector.parse(item)
if graph_selector is not None:
self.graph_selectors.append(graph_selector)
else:
self.other.append(item)
logger.warning("Unsupported select statement: %s", item)

Codecov / codecov/patch warning: added lines cosmos/dbt/selector.py#L227-L228 were not covered by tests.

def _parse_config_selector(self, item: str) -> None:
index = len(CONFIG_SELECTOR)
key, value = item[index:].split(":")
if key in SUPPORTED_CONFIG:
self.config[key] = value

def _parse_tag_selector(self, item: str) -> None:
index = len(TAG_SELECTOR)
self.tags.append(item[index:])

def _parse_path_selector(self, item: str) -> None:
index = len(PATH_SELECTOR)
if self.project_dir:
self.paths.append(self.project_dir / Path(item[index:]))
else:
self.paths.append(Path(item[index:]))

Codecov / codecov/patch warning: added line cosmos/dbt/selector.py#L245 was not covered by tests.

def __repr__(self) -> str:
return f"SelectorConfig(paths={self.paths}, tags={self.tags}, config={self.config}, other={self.other}, graph_selectors={self.graph_selectors})"
@@ -388,7 +400,44 @@
if not select and not exclude:
return nodes

# validates select and exclude filters
validate_filters(exclude, select)
subset_ids = apply_select_filter(nodes, project_dir, select)
if select:
nodes = get_nodes_from_subset(nodes, subset_ids)
exclude_ids = apply_exclude_filter(nodes, project_dir, exclude)
subset_ids = set(nodes.keys()) - exclude_ids

return get_nodes_from_subset(nodes, subset_ids)


def get_nodes_from_subset(nodes: dict[str, DbtNode], subset_ids: set[str]) -> dict[str, DbtNode]:
nodes = {id_: nodes[id_] for id_ in subset_ids}
return nodes


def apply_exclude_filter(nodes: dict[str, DbtNode], project_dir: Path | None, exclude: list[str]) -> set[str]:
exclude_ids: set[str] = set()
for statement in exclude:
config = SelectorConfig(project_dir, statement)
node_selector = NodeSelector(nodes, config)
exclude_ids.update(node_selector.select_nodes_ids_by_intersection)
return exclude_ids


def apply_select_filter(nodes: dict[str, DbtNode], project_dir: Path | None, select: list[str]) -> set[str]:
subset_ids: set[str] = set()
for statement in select:
config = SelectorConfig(project_dir, statement)
node_selector = NodeSelector(nodes, config)
select_ids = node_selector.select_nodes_ids_by_intersection
subset_ids.update(select_ids)
return subset_ids


def validate_filters(exclude: list[str], select: list[str]) -> None:
"""
Validate select and exclude filters.
"""
filters = [["select", select], ["exclude", exclude]]
for filter_type, filter in filters:
for filter_parameter in filter:
@@ -401,26 +450,3 @@
continue
elif ":" in filter_parameter:
raise CosmosValueError(f"Invalid {filter_type} filter: {filter_parameter}")

-subset_ids: set[str] = set()
-
-for statement in select:
-config = SelectorConfig(project_dir, statement)
-node_selector = NodeSelector(nodes, config)
-
-select_ids = node_selector.select_nodes_ids_by_intersection
-subset_ids.update(set(select_ids))
-
-if select:
-nodes = {id_: nodes[id_] for id_ in subset_ids}
-
-nodes_ids = set(nodes.keys())
-
-exclude_ids: set[str] = set()
-for statement in exclude:
-config = SelectorConfig(project_dir, statement)
-node_selector = NodeSelector(nodes, config)
-exclude_ids.update(set(node_selector.select_nodes_ids_by_intersection))
-subset_ids = set(nodes_ids) - set(exclude_ids)
-
-return {id_: nodes[id_] for id_ in subset_ids}
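After the refactor, `select_nodes` reads as a short pipeline: validate the filters, apply the select statements, then subtract the excluded ids. The toy function below keeps only that select-then-exclude shape, using plain sets in place of `SelectorConfig`/`NodeSelector`; the node ids and values are invented for illustration.

```python
def toy_select_nodes(nodes: dict[str, str], select_ids: set[str], exclude_ids: set[str]) -> dict[str, str]:
    """Same select-then-exclude shape as the refactored select_nodes, minus the dbt specifics."""
    if not select_ids and not exclude_ids:
        return nodes
    if select_ids:
        # narrow to the selected subset first
        nodes = {id_: nodes[id_] for id_ in select_ids if id_ in nodes}
    kept = set(nodes) - exclude_ids  # then drop anything excluded
    return {id_: nodes[id_] for id_ in kept}


nodes = {"model.a": "a", "model.b": "b", "model.c": "c"}
print(toy_select_nodes(nodes, {"model.a", "model.b"}, {"model.b"}))  # {'model.a': 'a'}
```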
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -260,5 +260,5 @@ line-length = 120
universal = true

[tool.flake8]
-max-complexity = 10
+max-complexity = 8
select = "C"