Skip to content
This repository has been archived by the owner on Nov 6, 2023. It is now read-only.

Commit

Permalink
feat: change logic mapping external databases
Browse files Browse the repository at this point in the history
  • Loading branch information
Vixtir committed Sep 29, 2023
1 parent bd47e8d commit 9e2ec94
Show file tree
Hide file tree
Showing 17 changed files with 312 additions and 784 deletions.
63 changes: 30 additions & 33 deletions odd_collector/adapters/tableau/adapter.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
from typing import Dict, List, Type
from typing import Type
from urllib.parse import urlparse

from odd_collector_sdk.domain.adapter import BaseAdapter
from odd_models.models import DataEntity, DataEntityList
from odd_models.models import DataEntityList
from oddrn_generator import Generator, TableauGenerator

from odd_collector.adapters.tableau.domain.table import EmbeddedTable
from odd_collector.domain.plugin import TableauPlugin

from .client import TableauBaseClient, TableauClient
from .domain.table import EmbeddedTable, Table
from .client import TableauClient
from .logger import logger
from .mappers.sheets import map_sheet
from .mappers.tables import map_table

Expand All @@ -18,7 +19,7 @@ class Adapter(BaseAdapter):
generator: TableauGenerator

def __init__(
self, config: TableauPlugin, client: Type[TableauBaseClient] = TableauClient
self, config: TableauPlugin, client: Type[TableauClient] = TableauClient
) -> None:
super().__init__(config)
self.client = client(config)
Expand All @@ -29,42 +30,38 @@ def create_generator(self) -> Generator:
return TableauGenerator(host_settings=host, sites=site)

def get_data_entity_list(self) -> DataEntityList:
sheets, tables = self._get_sheets(), self._get_tables()
sheets = self.client.get_sheets()
tables = self.client.get_tables()

tables_data_entities_by_id: Dict[str, DataEntity] = {
table_id: map_table(self.generator, table)
for table_id, table in tables.items()
embedded_tables: list[EmbeddedTable] = [
t for t in tables.values() if isinstance(t, EmbeddedTable)
]

tbl_entities = {
table.id: map_table(self.generator, table) for table in embedded_tables
}
tables_data_entities = tables_data_entities_by_id.values()

sheets_data_entities = []
for sheet in sheets:
sheet_tables = [
tables_data_entities_by_id[table_id] for table_id in sheet.tables_id
]
data_entity = map_sheet(self.generator, sheet, sheet_tables)
sheets_data_entities.append(data_entity)

return DataEntityList(
data_source_oddrn=self.get_data_source_oddrn(),
items=[*tables_data_entities, *sheets_data_entities],
)
sheet_entity = map_sheet(self.generator, sheet)

def _get_tables(self) -> Dict[str, Table]:
tables: List[Table] = self.client.get_tables()
tables_by_id: Dict[str, Table] = {table.id: table for table in tables}
for table_id in sheet.tables_id:
table = tables.get(table_id)

ids = tables_ids_to_load(tables)
tables_columns = self.client.get_tables_columns(ids)
if not table:
logger.warning(f"Table {table_id} not found in tables, skipping it")
continue

for table_id, columns in tables_columns.items():
tables_by_id[table_id].columns = columns
if table.is_embedded:
oddrn = tbl_entities[table_id].oddrn
else:
oddrn = tables.get(table_id).get_oddrn()

return tables_by_id
sheet_entity.data_consumer.inputs.append(oddrn)

def _get_sheets(self):
return self.client.get_sheets()
sheets_data_entities.append(sheet_entity)


def tables_ids_to_load(tables: list[Table]):
return [table.id for table in tables if isinstance(table, EmbeddedTable)]
return DataEntityList(
data_source_oddrn=self.get_data_source_oddrn(),
items=[*tbl_entities.values(), *sheets_data_entities],
)
113 changes: 59 additions & 54 deletions odd_collector/adapters/tableau/client.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, Union
from urllib.parse import urlparse
from typing import Any, Union

import tableauserverclient as TSC
from funcy import lmap
from odd_collector_sdk.errors import DataSourceAuthorizationError, DataSourceError
from tableauserverclient import PersonalAccessTokenAuth, TableauAuth

from odd_collector.adapters.tableau.domain.table import Table
from odd_collector.domain.plugin import TableauPlugin

from .domain.column import Column
from .domain.database import ConnectionParams, EmbeddedDatabase, ExternalDatabase
from .domain.sheet import Sheet
from .domain.table import Table, databases_to_tables
from .logger import logger

sheets_query = """
query GetSheets($count: Int, $after: String) {
Expand Down Expand Up @@ -49,6 +47,7 @@
nodes {
id
name
isEmbedded
connectionType
downstreamOwners {
name
Expand All @@ -58,6 +57,13 @@
schema
name
description
columns {
id
name
remoteType
isNullable
description
}
}
}
pageInfo {
Expand All @@ -67,18 +73,18 @@
}
}
"""
tables_columns_query = """
query GetTablesColumns($ids: [ID], $count: Int, $after: String){
tablesConnection(filter: {idWithin: $ids}, first: $count, after: $after, orderBy: {field: NAME, direction: ASC}) {

database_servers_query = """
query DatabaseServersConnection($count: Int, $after: String) {
databaseServersConnection(first: $count, after: $after, orderBy: {field: NAME, direction: ASC}) {
nodes {
id
columns {
id
name
remoteType
isNullable
description
}
name
isEmbedded
connectionType
hostName
port
service
}
pageInfo {
hasNextPage
Expand All @@ -89,57 +95,56 @@
"""


class TableauBaseClient(ABC):
class TableauClient:
def __init__(self, config: TableauPlugin) -> None:
self.config = config

@abstractmethod
def get_server_host(self):
raise NotImplementedError

@abstractmethod
def get_sheets(self) -> list[Sheet]:
raise NotImplementedError

@abstractmethod
def get_tables(self) -> list[Table]:
raise NotImplementedError

@abstractmethod
def get_tables_columns(self, tables_ids: list[str]) -> dict[str, list[Column]]:
raise NotImplementedError


class TableauClient(TableauBaseClient):
def __init__(self, config: TableauPlugin) -> None:
super().__init__(config)
self.__auth = self._get_auth(config)
self.server = TSC.Server(config.server, use_server_version=True)

def get_server_host(self):
return urlparse(self.config.server).netloc

def get_sheets(self) -> list[Sheet]:
    """Query the ``sheetsConnection`` root and wrap each node in a ``Sheet``."""
    sheets: list[Sheet] = []
    for node in self._query(query=sheets_query, root_key="sheetsConnection"):
        sheets.append(Sheet.from_response(node))
    return sheets

def get_tables(self) -> list[Table]:
databases_response = self._query(
query=databases_query, root_key="databasesConnection"
)
return databases_to_tables(databases_response)
def get_databases(self) -> dict[str, Union[EmbeddedDatabase, ExternalDatabase]]:
    """Fetch the databases from the ``databasesConnection`` root, keyed by id.

    Embedded databases are built directly from the response node. External
    databases are additionally paired with the connection parameters that
    ``get_servers`` returned under the same id. An external database that
    cannot be built (for example, no matching connection parameters) is
    logged as a warning and left out of the result.
    """
    nodes = self._query(query=databases_query, root_key="databasesConnection")
    servers = self.get_servers()

    databases_by_id = {}
    for node in nodes:
        node_id = node.get("id")

        if node.get("isEmbedded"):
            databases_by_id[node_id] = EmbeddedDatabase.from_dict(**node)
            continue

        # Best effort: one broken external database must not abort the run.
        try:
            external = ExternalDatabase(
                id=node_id,
                name=node.get("name"),
                connection_type=node.get("connectionType"),
                connection_params=servers[node_id],
                tables=node.get("tables"),
            )
            databases_by_id[external.id] = external
        except Exception as e:
            logger.warning(f"Couldn't get database: {node.get('name')} {e}")

    return databases_by_id

def get_tables(self) -> dict[str, Table]:
    """Collect the tables of every database from ``get_databases``, keyed by table id.

    If two databases expose a table with the same id, the one visited last wins.
    """
    tables_by_id: dict[str, Table] = {}
    for database in self.get_databases().values():
        for table in database.tables:
            tables_by_id[table.id] = table
    return tables_by_id

def get_servers(self) -> dict[str, ConnectionParams]:
servers = self._query(
query=database_servers_query, root_key="databaseServersConnection"
)
return {
table.get("id"): lmap(Column.from_response, table.get("columns"))
for table in response
server.get("id"): ConnectionParams.from_dict(**server) for server in servers
}

def _query(
Expand Down
39 changes: 18 additions & 21 deletions odd_collector/adapters/tableau/domain/column.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
from dataclasses import dataclass
from typing import Optional


@dataclass
class Column:
def __init__(
self,
id: str,
name: str,
is_nullable: bool,
remote_type: str = None,
description: str = None,
):
self.id = id
self.name = name
self.remote_type = remote_type
self.is_nullable = is_nullable
self.description = description or None
id: str
name: str
is_nullable: bool
remote_type: Optional[str] = None
description: Optional[str] = None

@staticmethod
def from_response(response):
return Column(
response.get("id"),
response.get("name"),
response.get("isNullable"),
response.get("remoteType"),
response.get("description"),
@classmethod
def from_dict(cls, **data) -> "Column":
return cls(
data["id"],
data["name"],
data["isNullable"],
data.get("remoteType"),
data.get("description"),
)
23 changes: 23 additions & 0 deletions odd_collector/adapters/tableau/domain/connection_params.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from dataclasses import dataclass
from typing import Optional


@dataclass
class ConnectionParams:
    """Connection details of a database server.

    Field names are snake_case; ``from_dict`` maps them from the camelCase
    keys ``connectionType`` and ``hostName`` used in the response nodes.
    """

    id: str
    name: str
    connection_type: str
    host: str
    port: int
    # Declared optional, so it must also be optional to supply.
    service: Optional[str] = None

    @classmethod
    def from_dict(cls, **kwargs) -> "ConnectionParams":
        """Build an instance from a response node passed as keyword arguments.

        Unknown keys are ignored. ``service`` may be absent and then defaults
        to ``None``.

        Raises:
            KeyError: if ``id``, ``name``, ``connectionType``, ``hostName`` or
                ``port`` is missing.
        """
        return cls(
            id=kwargs["id"],
            name=kwargs["name"],
            connection_type=kwargs["connectionType"],
            host=kwargs["hostName"],
            port=kwargs["port"],
            # .get keeps a node without "service" from raising KeyError.
            service=kwargs.get("service"),
        )
Loading

0 comments on commit 9e2ec94

Please sign in to comment.