Skip to content
This repository has been archived by the owner on Nov 6, 2023. It is now read-only.

Commit

Permalink
feat(ckan): Integration with CKAN (#214)
Browse files Browse the repository at this point in the history

---------

Co-authored-by: Mateusz Kulas <[email protected]>
  • Loading branch information
m-qlas and Mateusz Kulas authored Sep 5, 2023
1 parent 423b557 commit 4aeb172
Show file tree
Hide file tree
Showing 15 changed files with 445 additions and 0 deletions.
9 changes: 9 additions & 0 deletions config_examples/ckan.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
default_pulling_interval: 60 # in minutes. If not specified, collector will pull data once.
platform_host_url: http://localhost:8080
token: ""
plugins:
- type: ckan
name: ckan_adapter
host: localhost
port: 8443
token: token
Empty file.
80 changes: 80 additions & 0 deletions odd_collector/adapters/ckan/adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from odd_collector_sdk.domain.adapter import AsyncAbstractAdapter
from odd_collector_sdk.errors import MappingDataError, DataSourceError
from odd_models.models import DataEntity, DataEntityList
from oddrn_generator import CKANGenerator

from odd_collector.domain.plugin import CKANPlugin

from .client import CKANRestClient
from .mappers.group import map_group
from .mappers.organization import map_organization
from .mappers.dataset import map_dataset
from .mappers.resource import map_resource


class Adapter(AsyncAbstractAdapter):
def __init__(self, config: CKANPlugin) -> None:
self.oddrn_generator = CKANGenerator(host_settings=f"{config.host}")
self.client = CKANRestClient(config)

def get_data_source_oddrn(self) -> str:
return self.oddrn_generator.get_data_source_oddrn()

async def get_data_entity_list(self) -> DataEntityList:
organizations = await self.client.get_organizations()
groups = await self.client.get_groups()
organization_entities: list[DataEntity] = []
datasets_entities: list[DataEntity] = []
resources_entities: list[DataEntity] = []
groups_entities: list[DataEntity] = []

try:
for organization in organizations:
datasets_entities_tmp: list[DataEntity] = []
datasets = await self.client.get_datasets(organization.id)
for dataset in datasets:
resources_entities_tmp = []
self.oddrn_generator.set_oddrn_paths(
organizations=organization.name,
datasets=dataset.name,
)

for resource in dataset.resources:
fields = await self.client.get_resource_fields(resource.id)
resources_entities_tmp.append(
map_resource(self.oddrn_generator, resource, fields)
)

datasets_entities_tmp.append(
map_dataset(
self.oddrn_generator, dataset, resources_entities_tmp
)
)
resources_entities.extend(resources_entities_tmp)

organization_entities.append(
map_organization(
self.oddrn_generator, organization, datasets_entities_tmp
)
)
datasets_entities.extend(datasets_entities_tmp)

for group_name in groups:
group = await self.client.get_group_details(group_name)
groups_entities.append(map_group(self.oddrn_generator, group))

except DataSourceError:
raise

except Exception as e:
raise MappingDataError(f"Error during mapping: {e}") from e

return DataEntityList(
data_source_oddrn=self.get_data_source_oddrn(),
items=[
*resources_entities,
*datasets_entities,
*organization_entities,
*groups_entities,
],
)
98 changes: 98 additions & 0 deletions odd_collector/adapters/ckan/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import aiohttp
from odd_collector_sdk.errors import DataSourceError
from odd_collector.domain.plugin import CKANPlugin
from .logger import logger
from asyncio import gather

from .mappers.models import Organization, Dataset, Group, ResourceField


class CKANRestClient:
def __init__(self, config: CKANPlugin):
self.__host = f"https://{config.host}:{config.port}"
self.__headers = (
{"Authorization": config.token.get_secret_value()} if config.token else None
)

@staticmethod
def is_response_successful(resp: dict):
if resp and resp.get("success"):
return True
return False

async def _get_request(self, url: str, params: dict = None) -> dict:
async with aiohttp.ClientSession(
self.__host,
headers=self.__headers,
) as session:
try:
async with session.get(url, params=params) as resp:
result = await resp.json()
logger.debug(f"Result of request {url} is {result}")
if not self.is_response_successful(result):
raise DataSourceError(
f"Request: {url}, Error: {result['error']}"
)
return result
except Exception as e:
raise DataSourceError(
f"Error during getting data from host {self.__host}: {e}"
) from e

async def _post_request(self, url: str, payload: dict = None) -> dict:
async with aiohttp.ClientSession(
self.__host,
headers=self.__headers,
) as session:
try:
async with session.post(url, json=payload) as resp:
result = await resp.json()
logger.debug(f"Result of request {url} is {result}")
return result
except Exception as e:
raise DataSourceError(
f"Error during getting data from host {self.__host}: {e}"
) from e

async def get_organizations(self) -> list[Organization]:
url = "/api/action/organization_list"
resp = await self._get_request(url)
org_names = resp["result"]
response = await gather(
*[
self.get_organization_details(organization_name)
for organization_name in org_names
]
)
return response

async def get_organization_details(self, organization_name: str) -> Organization:
url = "/api/action/organization_show"
params = {"id": organization_name}
resp = await self._get_request(url, params)
return Organization(resp["result"])

async def get_groups(self) -> list[str]:
url = "/api/action/group_list"
resp = await self._get_request(url)
return resp["result"]

async def get_group_details(self, group_name: str) -> Group:
url = "/api/action/group_show"
params = {"id": group_name, "include_datasets": "True"}
resp = await self._get_request(url, params)
return Group(resp["result"])

async def get_datasets(self, organization_id: str) -> list[Dataset]:
url = "/api/action/package_search"
params = {"q": f"owner_org:{organization_id}", "include_private": "True"}
resp = await self._get_request(url, params)
return [Dataset(dataset) for dataset in resp["result"]["results"]]

async def get_resource_fields(self, resource_id: str) -> list[ResourceField]:
url = "/api/action/datastore_info"
payload = {"id": resource_id}
resp = await self._post_request(url, payload)
if self.is_response_successful(resp):
return [ResourceField(field) for field in resp["result"]["fields"]]
return []
1 change: 1 addition & 0 deletions odd_collector/adapters/ckan/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from odd_collector_sdk.logger import logger
Empty file.
25 changes: 25 additions & 0 deletions odd_collector/adapters/ckan/mappers/dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from funcy import lpluck_attr
from odd_collector_sdk.utils.metadata import extract_metadata, DefinitionType
from odd_models import DataEntityGroup
from odd_models.models import DataEntity, DataEntityType
from oddrn_generator import CKANGenerator

from odd_collector.adapters.ckan.mappers.models import Dataset


def map_dataset(
oddrn_generator: CKANGenerator,
dataset: Dataset,
resources_entities: list[DataEntity],
) -> DataEntity:
return DataEntity(
oddrn=oddrn_generator.get_oddrn_by_path("datasets", dataset.name),
name=dataset.name,
type=DataEntityType.DAG,
metadata=[
extract_metadata("ckan", dataset, DefinitionType.DATASET, flatten=True)
],
data_entity_group=DataEntityGroup(
entities_list=lpluck_attr("oddrn", resources_entities)
),
)
23 changes: 23 additions & 0 deletions odd_collector/adapters/ckan/mappers/field.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from odd_collector_sdk.utils.metadata import DefinitionType, extract_metadata
from odd_models.models import DataSetField, DataSetFieldType, Type
from oddrn_generator import CKANGenerator

from .models import ResourceField
from .types import TYPES_SQL_TO_ODD


def map_field(
oddrn_generator: CKANGenerator,
field: ResourceField,
) -> DataSetField:
return DataSetField(
oddrn=oddrn_generator.get_oddrn_by_path("fields", field.name),
name=field.name,
owner=None,
metadata=[extract_metadata("ckan", field, DefinitionType.DATASET_FIELD)],
type=DataSetFieldType(
type=TYPES_SQL_TO_ODD.get(field.type, Type.TYPE_UNKNOWN),
logical_type=field.type,
is_nullable=field.is_nullable,
),
)
26 changes: 26 additions & 0 deletions odd_collector/adapters/ckan/mappers/group.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from odd_collector_sdk.utils.metadata import extract_metadata, DefinitionType
from odd_models.models import DataEntity, DataEntityGroup, DataEntityType
from oddrn_generator import CKANGenerator
from odd_collector.adapters.ckan.mappers.models import Group


def map_group(
oddrn_generator: CKANGenerator,
group: Group,
) -> DataEntity:
datasets_oddrns: list[str] = []
for dataset in group.datasets:
oddrn_generator.set_oddrn_paths(organizations=dataset["organization"]["name"])
datasets_oddrns.append(
oddrn_generator.get_oddrn_by_path("datasets", dataset["name"])
)
return DataEntity(
oddrn=oddrn_generator.get_oddrn_by_path("groups", group.name),
name=group.name,
type=DataEntityType.DAG,
metadata=[
extract_metadata("ckan", group, DefinitionType.DATASET, flatten=True)
],
data_entity_group=DataEntityGroup(entities_list=datasets_oddrns),
owner=None,
)
92 changes: 92 additions & 0 deletions odd_collector/adapters/ckan/mappers/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
from dataclasses import dataclass
from datetime import datetime
from functools import cached_property
from typing import Any

from odd_collector_sdk.utils.metadata import HasMetadata
from odd_collector.adapters.ckan.utils import get_metadata, get_groups


@dataclass
class CKANObject(HasMetadata):
data: dict
excluded_keys = ["name", "description", "tags", "created"]

@property
def name(self) -> str:
return self.data["name"]

@property
def description(self) -> str:
return self.data["description"]

@property
def created_at(self) -> datetime:
return datetime.strptime(self.data["created"], "%Y-%m-%dT%H:%M:%S.%f")

@property
def odd_metadata(self) -> dict:
return get_metadata(self.data, self.excluded_keys)


@dataclass
class Organization(CKANObject):
@property
def id(self) -> str:
return self.data["id"]

@property
def tags(self) -> list[str]:
return self.data["tags"]


@dataclass
class Group(CKANObject):
@property
def datasets(self) -> list[dict]:
return [dataset for dataset in self.data["packages"]]


@dataclass
class Resource(CKANObject):
@property
def id(self) -> str:
return self.data["id"]


@dataclass
class ResourceField(HasMetadata):
data: dict

@property
def name(self) -> str:
return self.data["id"]

@property
def type(self) -> str:
return self.data["type"]

@property
def odd_metadata(self) -> dict[str, Any]:
return self.data["schema"]

@property
def is_nullable(self) -> bool:
return False if self.odd_metadata["notnull"] else True


@dataclass
class Dataset(CKANObject):
@property
def tags(self) -> list[str]:
return self.data["tags"]

@cached_property
def resources(self) -> list[Resource]:
return [Resource(resource) for resource in self.data["resources"]]

@property
def odd_metadata(self) -> dict[str, Any]:
metadata = get_metadata(self.data, self.excluded_keys)
transformed = get_groups(metadata)
return transformed
24 changes: 24 additions & 0 deletions odd_collector/adapters/ckan/mappers/organization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from funcy import lpluck_attr
from odd_collector_sdk.utils.metadata import extract_metadata, DefinitionType
from odd_models.models import DataEntity, DataEntityGroup, DataEntityType
from oddrn_generator import CKANGenerator
from odd_collector.adapters.ckan.mappers.models import Organization


def map_organization(
oddrn_generator: CKANGenerator,
organization: Organization,
datasets_entities: list[DataEntity],
) -> DataEntity:
return DataEntity(
oddrn=oddrn_generator.get_oddrn_by_path("organizations", organization.name),
name=organization.name,
type=DataEntityType.DAG,
metadata=[
extract_metadata("ckan", organization, DefinitionType.DATASET, flatten=True)
],
data_entity_group=DataEntityGroup(
entities_list=lpluck_attr("oddrn", datasets_entities)
),
owner=None,
)
Loading

0 comments on commit 4aeb172

Please sign in to comment.