Fix Snowflake adapter warnings + add more debug logs #107

Merged · 8 commits · Sep 9, 2024
Changes from all commits
7 changes: 5 additions & 2 deletions .github/workflows/run-tests.yaml
@@ -35,7 +35,10 @@ jobs:
unixodbc-dev openssl libsasl2-dev
poetry install

- # for now, only tests for PostgreSQL adapter are being invoked, others need to be checked and updated for future use
+ # for now, only tests for PostgreSQL and Snowflake adapters are being invoked,
+ # others need to be checked and updated for future use
- name: Run tests
working-directory: odd-collector
- run: poetry run pytest tests/integration/test_postgres.py -v
+ run: |
+   poetry run pytest tests/integration/test_postgres.py -v
+   poetry run pytest tests/adapters/snowflake -v
2 changes: 1 addition & 1 deletion odd-collector/odd_collector/__version__.py
@@ -1 +1 @@
VERSION = "0.1.64"
VERSION = "0.1.65"
42 changes: 23 additions & 19 deletions odd-collector/odd_collector/adapters/snowflake/adapter.py
@@ -70,14 +70,14 @@ def _fk_constraints(self) -> list[ForeignKeyConstraint]:
def _unique_constraints(self) -> list[UniqueConstraint]:
return self._get_metadata()["unique_constraints"]

- @property
- def _pipe_entities(self) -> list[tuple[Pipe, DataEntity]]:
+ @ttl_cache(ttl=CACHE_TTL)
+ def get_pipe_entities(self) -> list[tuple[Pipe, DataEntity]]:
pipes: list[Pipe] = []
for raw_pipe in self._raw_pipes:
pipes.extend(
Pipe(
catalog=raw_pipe.pipe_catalog,
- schema=raw_pipe.pipe_schema,
+ schema_name=raw_pipe.pipe_schema,
name=raw_pipe.pipe_name,
definition=raw_pipe.definition,
stage_url=raw_stage.stage_url,
@@ -89,8 +89,8 @@ def _pipe_entities(self) -> list[tuple[Pipe, DataEntity]]:
)
return [(pipe, map_pipe(pipe, self.generator)) for pipe in pipes]

- @property
- def _table_entities(self) -> list[tuple[Table, DataEntity]]:
+ @ttl_cache(ttl=CACHE_TTL)
+ def get_table_entities(self) -> list[tuple[Table, DataEntity]]:
result = []

for table in self._tables:
@@ -100,32 +100,36 @@ def _table_entities(self) -> list[tuple[Table, DataEntity]]:
result.append((table, map_table(table, self.generator)))
return result

- @property
- def _relationship_entities(self) -> list[DataEntity]:
+ @ttl_cache(ttl=CACHE_TTL)
+ def get_relationship_entities(self) -> list[DataEntity]:
return DataEntityRelationshipsMapper(
oddrn_generator=self.generator,
unique_constraints=self._unique_constraints,
- table_entities_pair=self._table_entities,
+ table_entities_pair=self.get_table_entities(),
).map(self._fk_constraints)

- @property
- def _schema_entities(self) -> list[DataEntity]:
- return map_schemas(self._table_entities, self._pipe_entities, self.generator)
+ @ttl_cache(ttl=CACHE_TTL)
+ def get_schema_entities(self) -> list[DataEntity]:
+ return map_schemas(
+ self.get_table_entities(), self.get_pipe_entities(), self.generator
+ )

- @property
- def _database_entity(self) -> DataEntity:
- return map_database(self._database_name, self._schema_entities, self.generator)
+ @ttl_cache(ttl=CACHE_TTL)
+ def get_database_entity(self) -> DataEntity:
+ return map_database(
+ self._database_name, self.get_schema_entities(), self.generator
+ )

def get_data_entity_list(self) -> DataEntityList:
try:
return DataEntityList(
data_source_oddrn=self.get_data_source_oddrn(),
items=[
- *[te[1] for te in self._table_entities],
- *self._schema_entities,
- self._database_entity,
- *[pe[1] for pe in self._pipe_entities],
- *self._relationship_entities,
+ *[te[1] for te in self.get_table_entities()],
+ *self.get_schema_entities(),
+ self.get_database_entity(),
+ *[pe[1] for pe in self.get_pipe_entities()],
+ *self.get_relationship_entities(),
],
)
except Exception as e:
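The adapter change above replaces uncached @property attributes with explicitly called get_* methods wrapped in @ttl_cache(ttl=CACHE_TTL), so repeated lookups during a single collection run reuse one cached result. A minimal sketch of that pattern, assuming ttl_cache is cachetools.func.ttl_cache or an equivalent decorator (the import and the CACHE_TTL constant are outside the shown hunks):

    from cachetools.func import ttl_cache  # assumed source of ttl_cache

    CACHE_TTL = 60  # hypothetical value, in seconds

    class Adapter:
        def _expensive_query(self) -> list[str]:
            # stands in for the Snowflake metadata queries behind get_table_entities() etc.
            print("querying Snowflake metadata...")
            return ["table_a", "table_b"]

        @ttl_cache(ttl=CACHE_TTL)
        def get_table_entities(self) -> list[str]:
            # a plain method rather than a property, so the caching decorator wraps
            # an ordinary callable and callers invoke it explicitly
            return self._expensive_query()

    adapter = Adapter()
    adapter.get_table_entities()  # runs the query
    adapter.get_table_entities()  # served from the cache until CACHE_TTL expires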
(next file; path not shown)
@@ -77,7 +77,7 @@ def stage_full_name(self) -> str:

class Pipe(Connectable):
catalog: str
- schema: str
+ schema_name: str
name: str
definition: str
stage_url: Optional[str] = None
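The schema field on Pipe becomes schema_name, and adapter.py plus the pipe and schema mappers are updated to match. Assuming Connectable is a Pydantic model (its definition is not part of this diff), a field literally named schema collides with BaseModel's built-in schema member, a common source of the warnings this PR's title refers to. A simplified stand-in showing the renamed field:

    from typing import Optional

    from pydantic import BaseModel

    class Pipe(BaseModel):  # stand-in for Pipe(Connectable)
        catalog: str
        schema_name: str  # renamed from `schema`, which clashes with BaseModel.schema
        name: str
        definition: str
        stage_url: Optional[str] = None

    pipe = Pipe(catalog="DB", schema_name="PUBLIC", name="MY_PIPE", definition="CREATE PIPE ...")
    print(f"{pipe.schema_name}.{pipe.name}")  # the full_pipe_name format used by map_pipe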
(next file; path not shown)
@@ -2,6 +2,7 @@
from typing import Dict, List

from odd_collector.adapters.snowflake.domain import Column
+ from odd_collector.adapters.snowflake.logger import logger
from odd_models.models import DataSetField, DataSetFieldType, Type
from oddrn_generator import SnowflakeGenerator

@@ -52,6 +53,10 @@ def map_columns(
result: List[DataSetField] = []

for column in columns:
+ logger.debug(
+ f"Mapping column from table {column.table_schema}.{column.table_name}: {column.column_name}"
+ )

column_oddrn_key = f"{parent_path.value}_columns"
generator_params = {column_oddrn_key: column.column_name}
generator.set_oddrn_paths(**generator_params)
(next file; path not shown)
@@ -2,6 +2,7 @@
from typing import List

from funcy import lpluck_attr
+ from odd_collector.adapters.snowflake.logger import logger
from odd_models.models import DataEntity, DataEntityGroup, DataEntityType
from oddrn_generator import SnowflakeGenerator

@@ -11,6 +12,8 @@ def map_database(
schemas_entities: List[DataEntity],
generator: SnowflakeGenerator,
) -> DataEntity:
logger.debug(f"Mapping database: {database_name}")

generator = deepcopy(generator)
oddrn = generator.get_oddrn_by_path("databases", database_name)
return DataEntity(
(next file; path not shown)
@@ -1,22 +1,23 @@
- import logging
from abc import abstractmethod
from copy import deepcopy
from typing import List

import sqlparse
from odd_collector.adapters.snowflake.domain import Pipe
+ from odd_collector.adapters.snowflake.logger import logger
from odd_models.models import DataEntity, DataEntityType, DataTransformer
from oddrn_generator import SnowflakeGenerator
from oddrn_generator.generators import S3Generator

from .view import _map_connection

- logger = logging.getLogger("Snowpipe")


def map_pipe(pipe: Pipe, generator: SnowflakeGenerator) -> DataEntity:
+ full_pipe_name = f"{pipe.schema_name}.{pipe.name}"
+ logger.debug(f"Mapping pipe: {full_pipe_name}")

generator = deepcopy(generator)
- generator.set_oddrn_paths(schemas=pipe.schema, pipes=pipe.name)
+ generator.set_oddrn_paths(schemas=pipe.schema_name, pipes=pipe.name)

return DataEntity(
oddrn=generator.get_oddrn_by_path("pipes"),
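Besides adopting schema_name, this mapper drops its module-level logging.getLogger("Snowpipe") in favour of the adapter-wide logger that the other mappers now import as well. That module is not shown in the PR; a minimal sketch of the shared-logger pattern the imports imply:

    # odd_collector/adapters/snowflake/logger.py (sketch; the real module may differ)
    import logging

    logger = logging.getLogger("SnowflakeAdapter")  # hypothetical logger name

    # In a mapper module:
    # from odd_collector.adapters.snowflake.logger import logger
    # logger.debug(f"Mapping pipe: {full_pipe_name}")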
(next file; path not shown)
@@ -3,6 +3,7 @@
Table,
UniqueConstraint,
)
+ from odd_collector.adapters.snowflake.logger import logger
from odd_collector.adapters.snowflake.mappers.relationships.relationship_mapper import (
RelationshipMapper,
)
@@ -35,6 +36,11 @@ def _map_data_entity_relationship(
referenced_schema_name = fk_cons.referenced_schema_name
referenced_table_name = fk_cons.referenced_table_name

+ logger.debug(
+ f"Mapping relationship referencing to the table {referenced_schema_name}.{referenced_table_name}: "
+ f"{fk_cons.constraint_name}"
+ )

self.oddrn_generator.set_oddrn_paths(
schemas=schema_name,
tables=table_name,
(next file; path not shown)
@@ -1,6 +1,7 @@
from collections import defaultdict
from copy import deepcopy

+ from odd_collector.adapters.snowflake.logger import logger
from odd_models.models import DataEntity, DataEntityGroup, DataEntityType
from oddrn_generator import SnowflakeGenerator

@@ -20,11 +21,13 @@ def map_schemas(
grouped[table.table_catalog][table.table_schema].add(entity.oddrn)

for pipe, entity in pipe_entities:
- grouped[pipe.catalog][pipe.schema].add(entity.oddrn)
+ grouped[pipe.catalog][pipe.schema_name].add(entity.oddrn)

entities = []
for catalog, schemas in grouped.items():
for schema, oddrns in schemas.items():
logger.debug(f"Mapping schema: {schema}")

generator.set_oddrn_paths(databases=catalog, schemas=schema)
oddrn = generator.get_oddrn_by_path("schemas")
entity = DataEntity(
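map_schemas builds a catalog -> schema -> oddrn grouping before emitting one schema DataEntity per group, and now logs each schema as it is mapped. The initialisation of grouped sits outside the shown hunk, so the nested defaultdict(set) shape below is an assumption, and the oddrns are made up:

    from collections import defaultdict

    # assumed shape: catalog -> schema -> set of entity oddrns
    grouped: dict[str, dict[str, set[str]]] = defaultdict(lambda: defaultdict(set))

    rows = [  # hypothetical (catalog, schema, oddrn) triples
        ("ANALYTICS", "PUBLIC", "oddrn-of-ORDERS-table"),
        ("ANALYTICS", "PUBLIC", "oddrn-of-CUSTOMERS-table"),
        ("ANALYTICS", "STAGING", "oddrn-of-LOAD_ORDERS-pipe"),
    ]
    for catalog, schema, oddrn in rows:
        grouped[catalog][schema].add(oddrn)

    for catalog, schemas in grouped.items():
        for schema, oddrns in schemas.items():
            print(f"Mapping schema: {schema} ({len(oddrns)} entities)")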
(next file; path not shown)
@@ -1,6 +1,7 @@
from copy import deepcopy

from odd_collector.adapters.snowflake.domain import Table
+ from odd_collector.adapters.snowflake.logger import logger
from odd_models.models import DataEntity, DataEntityType, DataSet
from oddrn_generator import SnowflakeGenerator

@@ -11,6 +12,9 @@


def map_table(table: Table, generator: SnowflakeGenerator) -> DataEntity:
+ full_table_name = f"{table.table_schema}.{table.table_name}"
+ logger.debug(f"Mapping table: {full_table_name}")

generator = deepcopy(generator)
generator.set_oddrn_paths(schemas=table.table_schema, tables=table.table_name)

(next file; path not shown)
@@ -15,14 +15,19 @@


def map_view(view: View, generator: SnowflakeGenerator) -> DataEntity:
+ full_view_name = f"{view.table_schema}.{view.table_name}"
+ logger.debug(f"Mapping view: {full_view_name}")

generator = deepcopy(generator)
generator.set_oddrn_paths(schemas=view.table_schema, views=view.table_name)

sql = None
try:
sql = sqlparse.format(view.view_definition)
except Exception:
logger.warning(f"Couldn't parse view definition {view.view_definition}")
logger.warning(
f"Couldn't parse {full_view_name} view definition: {view.view_definition}"
)

return DataEntity(
oddrn=generator.get_oddrn_by_path("views"),
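map_view still pretty-prints the raw view definition with sqlparse, and the warning logged on failure now includes the qualified view name. A standalone sketch of that formatting step with sample SQL of my own; the adapter calls sqlparse.format with default options, while reindent and keyword_case below are optional extras:

    import sqlparse

    raw_definition = "create view my_view as select id, name from my_schema.my_table"
    full_view_name = "MY_SCHEMA.MY_VIEW"  # hypothetical, mirrors f"{table_schema}.{table_name}"

    sql = None
    try:
        sql = sqlparse.format(raw_definition, reindent=True, keyword_case="upper")
    except Exception:
        print(f"Couldn't parse {full_view_name} view definition: {raw_definition}")

    print(sql)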