Skip to content
This repository has been archived by the owner on Nov 6, 2023. It is now read-only.

Commit

Permalink
[PostgreSQL] Create DEG for schemas (#225)
Browse files Browse the repository at this point in the history
* [PostgreSQL] Create DEG for schemas

* Add schemas metadata

* Use defaultdict

---------

Co-authored-by: Mateusz Kulas <[email protected]>
  • Loading branch information
m-qlas and Mateusz Kulas authored Sep 26, 2023
1 parent 2aa8a9c commit cb1c228
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 16 deletions.
27 changes: 21 additions & 6 deletions odd_collector/adapters/postgresql/adapter.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
from funcy import lpluck_attr
from collections import defaultdict

from odd_collector_sdk.domain.adapter import BaseAdapter
from odd_models import DataEntity
from odd_models.models import DataEntityList
from oddrn_generator import PostgresqlGenerator

from odd_collector.domain.plugin import PostgreSQLPlugin

from .mappers.database import map_database
from .mappers.tables import map_tables
from .mappers.schemas import map_schema
from .repository import ConnectionParams, PostgreSQLRepository


Expand All @@ -26,17 +29,29 @@ def get_data_entity_list(self) -> DataEntityList:
with PostgreSQLRepository(
ConnectionParams.from_config(self.config), self.config.schemas_filter
) as repo:
table_entities: list[DataEntity] = []
schema_entities: list[DataEntity] = []
tables = repo.get_tables()
schemas = repo.get_schemas()
self.generator.set_oddrn_paths(**{"databases": self.config.database})

tables_by_schema = defaultdict(list)
for table in tables:
tables_by_schema[table.table_schema].append(table)

table_entities = map_tables(generator=self.generator, tables=tables)
for schema in schemas:
tables_per_schema = tables_by_schema.get(schema.schema_name, [])
table_entities_tmp = map_tables(self.generator, tables_per_schema)
schema_entities.append(
map_schema(self.generator, schema, table_entities_tmp)
)
table_entities.extend(table_entities_tmp)

database_entity = map_database(
self.generator,
self.config.database,
lpluck_attr("oddrn", table_entities),
self.generator, self.config.database, schema_entities
)

return DataEntityList(
data_source_oddrn=self.get_data_source_oddrn(),
items=[*table_entities, database_entity],
items=[*table_entities, *schema_entities, database_entity],
)
7 changes: 5 additions & 2 deletions odd_collector/adapters/postgresql/mappers/database.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
from funcy import lpluck_attr
from odd_models import DataEntity, DataEntityGroup, DataEntityType
from oddrn_generator import PostgresqlGenerator


def map_database(
generator: PostgresqlGenerator, database: str, entities: list[str]
generator: PostgresqlGenerator, database: str, schema_entities: list[DataEntity]
) -> DataEntity:
return DataEntity(
oddrn=generator.get_oddrn_by_path("databases"),
name=database,
type=DataEntityType.DATABASE_SERVICE,
metadata=[],
data_entity_group=DataEntityGroup(entities_list=entities),
data_entity_group=DataEntityGroup(
entities_list=lpluck_attr("oddrn", schema_entities)
),
)
20 changes: 20 additions & 0 deletions odd_collector/adapters/postgresql/mappers/schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from funcy import lpluck_attr
from odd_collector_sdk.utils.metadata import extract_metadata, DefinitionType
from odd_models import DataEntity, DataEntityGroup, DataEntityType
from oddrn_generator import PostgresqlGenerator

from odd_collector.adapters.postgresql.models import Schema


def map_schema(
    generator: PostgresqlGenerator, schema: Schema, table_entities: list[DataEntity]
) -> DataEntity:
    """Build the data entity that represents a single PostgreSQL schema.

    Args:
        generator: ODDRN generator; its "schemas" path is set to this schema
            before the schema oddrn is produced.
        schema: Schema record to describe; also the source of the metadata.
        table_entities: Entities to group under the schema; their oddrns
            become the members of the resulting entity group.

    Returns:
        A DataEntity named after the schema whose group lists the oddrns of
        ``table_entities``.
    """
    # Point the generator at this schema explicitly instead of relying on
    # whatever "schemas" path an earlier call left behind; otherwise a schema
    # without tables would be emitted with a stale (or unset) oddrn.
    generator.set_oddrn_paths(schemas=schema.schema_name)

    return DataEntity(
        oddrn=generator.get_oddrn_by_path("schemas"),
        name=schema.schema_name,
        type=DataEntityType.DATABASE_SERVICE,
        metadata=[extract_metadata("postgres", schema, DefinitionType.DATASET)],
        data_entity_group=DataEntityGroup(
            entities_list=lpluck_attr("oddrn", table_entities)
        ),
    )
18 changes: 17 additions & 1 deletion odd_collector/adapters/postgresql/models.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from dataclasses import dataclass, field
from typing import Any

from funcy import omit
from sql_metadata import Parser

from .logger import logger
from odd_collector_sdk.utils.metadata import HasMetadata


@dataclass
Expand Down Expand Up @@ -178,3 +178,19 @@ def dependencies(self) -> list[Dependency]:
f"Couldn't parse dependencies from {self.view_definition}. {e}"
)
return []


@dataclass(frozen=True)
class Schema(HasMetadata):
    """Immutable record describing one PostgreSQL schema (namespace)."""

    # NOTE: field order matters - the repository builds instances positionally
    # from a query row (``Schema(*raw)``), so it must match the column order.
    schema_name: str
    schema_owner: str
    oid: int
    # NOTE(review): presumably None when the schema has no comment - confirm.
    description: str
    # NOTE(review): filled from pg_total_relation_size(n.oid) on a namespace
    # oid; confirm this yields a meaningful (non-NULL) value.
    total_size_bytes: int

    @property
    def odd_metadata(self):
        """Return every field except ``schema_name`` as a metadata mapping."""
        return {
            key: value
            for key, value in vars(self).items()
            if key != "schema_name"
        }
20 changes: 13 additions & 7 deletions odd_collector/adapters/postgresql/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
EnumTypeLabel,
PrimaryKey,
Table,
Schema,
)
from odd_collector.domain.plugin import PostgreSQLPlugin

Expand Down Expand Up @@ -47,10 +48,10 @@ def __enter__(self):
def __exit__(self, exc_type, exc_val, exc_tb):
self.conn.close()

def get_schemas(self) -> list[str]:
def get_schemas(self) -> list[Schema]:
with self.conn.cursor() as cur:
schemas = [
raw[0]
Schema(*raw)
for raw in self.execute(self.schemas_query, cur)
if self.schemas_filter.is_allowed(raw[0])
and raw[0]
Expand All @@ -65,7 +66,7 @@ def get_schemas(self) -> list[str]:
return schemas

def get_tables(self) -> list[Table]:
schemas = self.get_schemas()
schemas = [schema.schema_name for schema in self.get_schemas()]
schemas_str = ", ".join([f"'{schema}'" for schema in schemas])
query = self.tables_query(schemas_str)

Expand Down Expand Up @@ -123,10 +124,15 @@ def pks_query(self):
@property
def schemas_query(self):
return """
SELECT nspname AS schema_name
FROM pg_namespace
ORDER BY nspname;
select
n.nspname as schema_name,
pg_catalog.pg_get_userbyid(n.nspowner) as schema_owner,
n.oid as oid,
pg_catalog.obj_description(n.oid, 'pg_namespace') as description,
pg_total_relation_size(n.oid) as total_size_bytes
from pg_catalog.pg_namespace n
where n.nspname not like 'pg_temp_%'
and n.nspname not in ('pg_toast', 'pg_internal', 'catalog_history', 'pg_catalog', 'information_schema');
"""

def tables_query(self, schemas: str):
Expand Down

0 comments on commit cb1c228

Please sign in to comment.