Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Relationships ingestion for Snowflake adapter #45

Merged
129 changes: 75 additions & 54 deletions odd-collector/odd_collector/adapters/snowflake/adapter.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,66 @@
from typing import List, Optional, Tuple, Type
from functools import cached_property

from odd_collector.domain.plugin import SnowflakePlugin
from odd_collector_sdk.domain.adapter import BaseAdapter
from odd_collector_sdk.errors import MappingDataError
from odd_models.models import DataEntity, DataEntityList
from oddrn_generator import Generator, SnowflakeGenerator

from .client import SnowflakeClient, SnowflakeClientBase
from .domain import Pipe, Table, View
from .map import map_database, map_pipe, map_schemas, map_table, map_view
from .client import SnowflakeClient
from .domain import ForeignKeyConstraint, Pipe, RawPipe, RawStage, Table, View, UniqueConstraint
from .mappers import map_database, map_pipe, map_schemas, map_table, map_view
from .mappers.relationships import DataEntityRelationshipsMapper


class Adapter(BaseAdapter):
config: SnowflakePlugin
generator: SnowflakeGenerator

def __init__(
self,
config: SnowflakePlugin,
client: Optional[Type[SnowflakeClientBase]] = SnowflakeClient,
):
def __init__(self, config: SnowflakePlugin) -> None:
self._database_name = config.database.upper()
self._client = client(config)

super().__init__(config)
self._metadata = self._get_database_metadata()

def create_generator(self) -> Generator:
return SnowflakeGenerator(
host_settings=self.config.host,
databases=self._database_name,
)

def get_data_entity_list(self) -> DataEntityList:
raw_pipes = self._client.get_raw_pipes()
raw_stages = self._client.get_raw_stages()
pipes: List[Pipe] = []
for raw_pipe in raw_pipes:
def _get_database_metadata(self) -> dict[str, list]:
with SnowflakeClient(self.config) as client:
return {
"tables": client.get_tables(),
"raw_pipes": client.get_raw_pipes(),
"raw_stages": client.get_raw_stages(),
"fk_constraints": client.get_fk_constraints(),
"unique_constraints": client.get_unique_constraints(),
}

@cached_property
def _tables(self) -> list[Table]:
return self._metadata["tables"]

@cached_property
def _raw_pipes(self) -> list[RawPipe]:
return self._metadata["raw_pipes"]

@cached_property
def _raw_stages(self) -> list[RawStage]:
return self._metadata["raw_stages"]

@cached_property
def _fk_constraints(self) -> list[ForeignKeyConstraint]:
return self._metadata["fk_constraints"]

@cached_property
def _unique_constraints(self) -> list[UniqueConstraint]:
return self._metadata["unique_constraints"]

@cached_property
def _pipe_entities(self) -> list[DataEntity]:
pipes: list[Pipe] = []
for raw_pipe in self._raw_pipes:
pipes.extend(
Pipe(
name=raw_pipe.pipe_name,
Expand All @@ -44,53 +69,49 @@ def get_data_entity_list(self) -> DataEntityList:
stage_type=raw_stage.stage_type,
downstream=raw_pipe.downstream,
)
for raw_stage in raw_stages
for raw_stage in self._raw_stages
if raw_pipe.stage_full_name == raw_stage.stage_full_name
)
pipes_entities = [map_pipe(pipe, self.generator) for pipe in pipes]
tables = self._client.get_tables()

tables_with_data_entities: List[
Tuple[Table, DataEntity]
] = self._get_tables_entities(tables)

try:
tables_entities = [
table_with_entity[1] for table_with_entity in tables_with_data_entities
]
schemas_entities = self._get_schemas_entities(tables_with_data_entities)
database_entity: DataEntity = self._get_database_entity(schemas_entities)

all_entities = [
*tables_entities,
*schemas_entities,
database_entity,
*pipes_entities,
]

return DataEntityList(
data_source_oddrn=self.get_data_source_oddrn(), items=all_entities
)
except Exception as e:
raise MappingDataError("Error during mapping") from e
return [map_pipe(pipe, self.generator) for pipe in pipes]

def _get_tables_entities(
self, tables: List[Table]
) -> List[Tuple[Table, DataEntity]]:
@cached_property
def _table_entities(self) -> list[tuple[Table, DataEntity]]:
result = []

for table in tables:
for table in self._tables:
if isinstance(table, View):
result.append((table, map_view(table, self.generator)))
else:
result.append((table, map_table(table, self.generator)))

return result

def _get_schemas_entities(
self, tables_with_entities: List[Tuple[Table, DataEntity]]
) -> List[DataEntity]:
return map_schemas(tables_with_entities, self.generator)
@cached_property
def _relationship_entities(self) -> list[DataEntity]:
return DataEntityRelationshipsMapper(
oddrn_generator=self.generator,
unique_constraints=self._unique_constraints,
table_entities_pair=self._table_entities,
).map(self._fk_constraints)

@cached_property
def _schema_entities(self) -> list[DataEntity]:
return map_schemas(self._table_entities, self.generator)

@cached_property
def _database_entity(self) -> DataEntity:
return map_database(self._database_name, self._schema_entities, self.generator)

def _get_database_entity(self, schemas: List[DataEntity]) -> DataEntity:
return map_database(self._database_name, schemas, self.generator)
def get_data_entity_list(self) -> DataEntityList:
try:
return DataEntityList(
data_source_oddrn=self.get_data_source_oddrn(),
items=[
*[te[1] for te in self._table_entities],
*self._schema_entities,
self._database_entity,
*self._pipe_entities,
*self._relationship_entities,
],
)
except Exception as e:
raise MappingDataError("Error during mapping") from e
Loading
Loading