Skip to content
This repository has been archived by the owner on Nov 6, 2023. It is now read-only.

feat(pg): lineage between schemas #233

Merged
merged 1 commit into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 40 additions & 6 deletions odd_collector/adapters/postgresql/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
from odd_models.models import DataEntityList
from oddrn_generator import PostgresqlGenerator

from odd_collector.adapters.postgresql.models import Table
from odd_collector.domain.plugin import PostgreSQLPlugin

from .logger import logger
from .mappers.database import map_database
from .mappers.tables import map_tables
from .mappers.schemas import map_schema
from .mappers.tables import map_tables
from .repository import ConnectionParams, PostgreSQLRepository


Expand All @@ -29,29 +31,61 @@ def get_data_entity_list(self) -> DataEntityList:
with PostgreSQLRepository(
ConnectionParams.from_config(self.config), self.config.schemas_filter
) as repo:
table_entities: list[DataEntity] = []
schema_entities: list[DataEntity] = []

all_table_entities: dict[str, DataEntity] = {}

tables = repo.get_tables()
schemas = repo.get_schemas()

self.generator.set_oddrn_paths(**{"databases": self.config.database})

tables_by_schema = defaultdict(list)
for table in tables:
tables_by_schema[table.table_schema].append(table)

for schema in schemas:
tables_per_schema = tables_by_schema.get(schema.schema_name, [])
tables_per_schema: list[Table] = tables_by_schema.get(
schema.schema_name, []
)
table_entities_tmp = map_tables(self.generator, tables_per_schema)
schema_entities.append(
map_schema(self.generator, schema, table_entities_tmp)
map_schema(
self.generator, schema, list(table_entities_tmp.values())
)
)
table_entities.extend(table_entities_tmp)
all_table_entities |= table_entities_tmp

database_entity = map_database(
self.generator, self.config.database, schema_entities
)

create_lineage(tables, all_table_entities)

return DataEntityList(
data_source_oddrn=self.get_data_source_oddrn(),
items=[*table_entities, *schema_entities, database_entity],
items=[*all_table_entities.values(), *schema_entities, database_entity],
)


def create_lineage(tables: list[Table], data_entities: dict[str, DataEntity]) -> None:
views = [table for table in tables if table.table_type in ("v", "m")]

for view in views:
try:
depending_entity = data_entities.get(view.as_dependency.uid)

if depending_entity.data_transformer is None:
continue

for dependency in view.dependencies:
if dependency_entity := data_entities.get(dependency.uid):
if (
dependency_entity.oddrn
not in depending_entity.data_transformer.inputs
):
depending_entity.data_transformer.inputs.append(
dependency_entity.oddrn
)
except Exception as e:
logger.warning(f"Error creating lineage for {view.table_name} {e=}")
15 changes: 4 additions & 11 deletions odd_collector/adapters/postgresql/mappers/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ def map_table(generator: PostgresqlGenerator, table: Table):
def map_tables(
generator: PostgresqlGenerator,
tables: list[Table],
) -> list[DataEntity]:
data_entities: dict[str, tuple[Table, DataEntity]] = {}
) -> dict[str, DataEntity]:
data_entities: dict[str, DataEntity] = {}

for table in tables:
logger.debug(f"Map table {table.table_name} {table.table_type}")
Expand All @@ -49,13 +49,6 @@ def map_tables(
)
continue

data_entities[table.as_dependency.uid] = table, entity
data_entities[table.as_dependency.uid] = entity

for table, data_entity in data_entities.values():
for dependency in table.dependencies:
if dependency.uid in data_entities and data_entity.data_transformer:
data_entity.data_transformer.inputs.append(
data_entities[dependency.uid][1].oddrn
)

return [entity for _, entity in data_entities.values()]
return data_entities
2 changes: 1 addition & 1 deletion tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def find_by_type(
def find_by_name(data_entity_list: DataEntityList, name: str) -> DataEntity:
return next(
filter(
lambda data_entity: data_entity.name == name,
lambda data_entity: data_entity.name.lower() == name.lower(),
data_entity_list.items,
)
)
Loading
Loading