This repository has been archived by the owner on Nov 6, 2023. It is now read-only.

Commit 4d5e064
fix(es): remove duplicates, change oddrn creation (#226)
* fix: remove duplicates

* chore: update es generator
Vixtir authored Sep 27, 2023
1 parent cb1c228 commit 4d5e064
Showing 15 changed files with 262 additions and 552 deletions.
195 changes: 79 additions & 116 deletions odd_collector/adapters/elasticsearch/adapter.py
@@ -1,109 +1,114 @@
-import logging
-from typing import Dict, Iterable, Optional
+from typing import Optional
 from urllib.parse import urlparse
 
-from elasticsearch import Elasticsearch
-from funcy import get_lax
-from odd_collector_sdk.domain.adapter import AbstractAdapter
+from funcy import get_in, get_lax
+from odd_collector_sdk.domain.adapter import BaseAdapter
 from odd_models.models import DataEntity, DataEntityList
-from oddrn_generator import ElasticSearchGenerator
+from oddrn_generator import ElasticSearchGenerator, Generator
 
 from odd_collector.domain.plugin import ElasticsearchPlugin
 
+from .client import Client
 from .logger import logger
-from .mappers.indexes import map_index
-from .mappers.stream import map_data_stream, map_data_stream_template
+from .mappers.indices import map_index
+from .mappers.stream import map_data_stream
+from .mappers.template import TemplateEntity, map_template


-class Adapter(AbstractAdapter):
+class Adapter(BaseAdapter):
+    config: ElasticsearchPlugin
+    generator: ElasticSearchGenerator
+
     def __init__(self, config: ElasticsearchPlugin) -> None:
-        self.__es_client = Elasticsearch(
-            hosts=[f"{config.host}:{config.port}"],
-            basic_auth=(config.username, config.password.get_secret_value()),
-            verify_certs=config.verify_certs,
-            ca_certs=config.ca_certs,
-        )
-        self.__oddrn_generator = ElasticSearchGenerator(
-            host_settings=urlparse(config.host).netloc
-        )
+        super().__init__(config)
+        self.client = Client(config)
+
+    def create_generator(self) -> Generator:
+        return ElasticSearchGenerator(host_settings=urlparse(self.config.host).netloc)

     def get_data_entity_list(self) -> DataEntityList:
         return DataEntityList(
             data_source_oddrn=self.get_data_source_oddrn(),
-            items=self.get_datasets(),
+            items=list(self.get_datasets()),
         )
 
-    def get_data_source_oddrn(self) -> str:
-        return self.__oddrn_generator.get_data_source_oddrn()

-    def get_datasets(self) -> Iterable[DataEntity]:
-        logger.debug(f"Start collecting datasets from Elasticsearch at {self.config}")
-        result = []
-        logger.info("Get indices")
-        indices = self.__get_indices()
-        logger.info(f"Got {indices=}")
-
-        logger.debug("Process indices")
-        for index in indices:
-            mapping = self.__get_mapping(index["index"])[index["index"]]
-            logger.debug(f"Mapping for index {index['index']} is {mapping}")
-            try:
-                result.append(self.__process_index_data(index, mapping))
-            except KeyError as e:
-                logging.warning(
-                    f"Elasticsearch adapter failed to process index {index}: KeyError {e}"
-                )
-
-        logger.info("Process data streams and their templates")
-        all_data_streams = self.__get_data_streams()
-
-        logger.info("Build template to data stream mapping")
-        templates_info = self.get_templates_from_data_streams(all_data_streams)
-
-        for template, data_streams in templates_info.items():
-            template_meta = self.__get_data_stream_templates_info(template).get(
-                "index_templates"
-            )
-
-            data_stream_entities = []
-            logger.debug(f"Template {template} has metadata {template_meta}")
-
-            for data_stream in data_streams:
-                logger.debug(
-                    f"Data stream {data_stream['name']} has template {template}"
-                )
-
-                lifecycle_policies = self.__get_rollover_policy(data_stream)
-                stream_data_entity = map_data_stream(
-                    data_stream,
-                    template_meta,
-                    lifecycle_policies,
-                    self.__oddrn_generator,
-                )
-
-                data_stream_entities.append(stream_data_entity)
-
-            result.extend(data_stream_entities)
-
-            data_streams_oddrn = [item.oddrn for item in data_stream_entities]
-            logger.debug(f"List of data streams oddrn {data_streams_oddrn}")
-
-            logger.debug(f"Create template data entity {template}")
-            template_entity = map_data_stream_template(
-                template_meta, data_streams_oddrn, self.__oddrn_generator
-            )
-            result.append(template_entity)
-
-        return result
+    def get_datasets(self) -> list[DataEntity]:
+        logger.debug(
+            f"Start collecting datasets from Elasticsearch at {self.config.host} with port {self.config.port}"
+        )
+
+        indices = self.client.get_indices("*")
+        templates = self.client.get_index_template("*")
+
+        mappings = self.client.get_mapping()
+        data_streams = self.client.get_data_streams()
+
+        indices = [
+            index for index in indices if not index["index"].startswith(".internal")
+        ]
+        logger.success(f"Got {len(indices)} indices")
+
+        index_by_names = {index["index"]: index for index in indices}
+        templates_by_names = {
+            tmpl["name"]: tmpl for tmpl in templates if not tmpl["name"].startswith(".")
+        }
+        streams_by_names = {stream["name"]: stream for stream in data_streams}
+        mappings_by_names = dict(mappings.items())
+
+        indices_entities: dict[str, DataEntity] = {}
+        for index_name, index in index_by_names.items():
+            indices_entities[index_name] = map_index(
+                index=index,
+                generator=self.generator,
+                properties=get_in(
+                    mappings_by_names,
+                    [index_name, "mappings", "properties"],
+                    default={},
+                ),
+            )
+
+        # map templates
+        template_entities: dict[str, TemplateEntity] = {}
+        for tmpl_name, tmpl in templates_by_names.items():
+            data_entity = map_template(tmpl, self.generator)
+            pattern = tmpl["index_template"]["index_patterns"]
+
+            # Here we are trying to get all indices that match the pattern
+            # to show that current template works with index
+            # But if we can't get them, we just skip
+            try:
+                for index_name in self.client.get_indices(index=pattern, h="index"):
+                    if index_entity := indices_entities.get(index_name["index"]):
+                        data_entity.add_output(index_entity)
+            except Exception as e:
+                logger.warning(e)
+                continue
+
+            template_entities[tmpl_name] = data_entity
+
+        # map data streams
+        stream_entities = {}
+        for stream_name, stream in streams_by_names.items():
+            stream_data_entity = map_data_stream(stream, self.generator)
+            stream_entities[stream_name] = stream_data_entity
+
+            if template_entity := template_entities.get(stream["template"]):
+                template_entity.add_input(stream_data_entity)
+
+        return [
+            *indices_entities.values(),
+            *stream_entities.values(),
+            *template_entities.values(),
+        ]

-    def __get_rollover_policy(self, stream_data: Dict) -> Optional[Dict]:
+    # TODO: implement mapping rollover policies
+    def _get_rollover_policy(self, stream_data: dict) -> Optional[dict]:
         try:
             backing_indices = [
                 index_info["index_name"] for index_info in stream_data["indices"]
             ]
             for index in backing_indices:
-                index_settings = self.__es_client.indices.get(index=index)
+                index_settings = self.client.get_indices(index)
                 lifecycle_policy = get_lax(
                     index_settings, [index, "settings", "index", "lifecycle"]
                 )
@@ -112,7 +117,7 @@ def __get_rollover_policy(self, stream_data: Dict) -> Optional[Dict]:
                     logger.debug(
                         f"Index {index} has Lifecycle Policy {lifecycle_policy['name']}"
                     )
-                    lifecycle_policy_data = self.__es_client.ilm.get_lifecycle(
+                    lifecycle_policy_data = self.client.ilm.get_lifecycle(
                         name=lifecycle_policy["name"]
                     )
@@ -139,51 +144,9 @@ def __get_rollover_policy(self, stream_data: Dict) -> Optional[Dict]:
 
                     lifecycle_metadata = {"max_age": max_age, "max_size": max_size}
                     return lifecycle_metadata
-
                 else:
                     logger.debug(f"No lifecycle policy exists for this index {index}.")
                     return None
         except KeyError:
             logger.debug(f"Incorrect fields. Got fields: {stream_data}")
             return None

-    def get_templates_from_data_streams(self, data_streams: Dict) -> Dict:
-        """
-        Expected result
-        {
-            "template": [data_stream, data_stream1],
-            "another_template": [data_stream2]
-        }
-        """
-        templates = {}
-        for data_stream in data_streams:
-            if data_stream["template"] not in templates:
-                templates[data_stream["template"]] = [data_stream]
-            else:
-                templates[data_stream["template"]].append(data_stream)
-        return templates
-
-    def __get_mapping(self, index_name: str):
-        return self.__es_client.indices.get_mapping(index=index_name)
-
-    def __get_indices(self):
-        # System indices startswith `.` character
-        logger.debug("Get system indices start with .")
-        return [
-            _
-            for _ in self.__es_client.cat.indices(format="json")
-            if not _["index"].startswith(".")
-        ]
-
-    def __get_data_streams(self) -> Dict:
-        response = self.__es_client.indices.get_data_stream(name="*")
-        return response["data_streams"]
-
-    def __get_data_stream_templates_info(self, template_name: str) -> Dict:
-        response = self.__es_client.indices.get_index_template(name=template_name)
-        return response
-
-    def __process_index_data(self, index_name: str, index_mapping: dict):
-        mapping = index_mapping["mappings"]["properties"]
-        logger.debug(f"Process mapping for index {index_name} with mapping {mapping}")
-        return map_index(index_name, mapping, self.__oddrn_generator)
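
For context, a minimal wiring sketch of the reworked adapter (not part of the commit; the ElasticsearchPlugin fields beyond host/port/username/password/verify_certs/ca_certs are assumptions):

# Hypothetical wiring sketch -- assumes a reachable cluster and that
# ElasticsearchPlugin is a pydantic model accepting these keyword fields.
from odd_collector.adapters.elasticsearch.adapter import Adapter
from odd_collector.domain.plugin import ElasticsearchPlugin

config = ElasticsearchPlugin(
    type="elasticsearch",  # assumed discriminator value
    name="local_es",       # assumed plugin name field
    host="http://localhost",
    port=9200,
    username="elastic",
    password="changeme",   # coerced to SecretStr by pydantic
    verify_certs=False,
    ca_certs=None,
)

adapter = Adapter(config)
# BaseAdapter drives create_generator(), so the data source oddrn host
# comes from urlparse(config.host).netloc.
entity_list = adapter.get_data_entity_list()
print(entity_list.data_source_oddrn)
print(len(entity_list.items))  # indices + data streams + templates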
33 changes: 33 additions & 0 deletions odd_collector/adapters/elasticsearch/client.py
@@ -0,0 +1,33 @@
+from typing import Optional
+
+from elasticsearch import Elasticsearch
+
+from odd_collector.domain.plugin import ElasticsearchPlugin
+
+
+class Client:
+    def __init__(self, config: ElasticsearchPlugin):
+        self._es = Elasticsearch(
+            hosts=[f"{config.host}:{config.port}"],
+            basic_auth=(config.username, config.password.get_secret_value()),
+            verify_certs=config.verify_certs,
+            ca_certs=config.ca_certs,
+        )
+
+    def get_indices(self, index: Optional[str] = None, h=None) -> list:
+        return self._es.cat.indices(format="json", index=index, h=h).body
+
+    def get_mapping(self, index_name: Optional[str] = None) -> dict:
+        return self._es.indices.get_mapping(index=index_name).body
+
+    def get_index_settings(self, index_name: str) -> dict:
+        return self._es.indices.get_settings(index=index_name).body
+
+    def get_data_streams(self, name: Optional[str] = None) -> dict:
+        response = self._es.indices.get_data_stream(name=name)
+        return response["data_streams"]
+
+    def get_index_template(self, template_name: str) -> list[dict]:
+        return self._es.indices.get_index_template(name=template_name).body.get(
+            "index_templates"
+        )
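
A short usage sketch for the new Client wrapper (hypothetical; reuses the config object from the adapter sketch above, and the return shapes follow the Elasticsearch _cat/indices and data-stream JSON APIs):

# Hypothetical usage sketch for the Client wrapper.
from odd_collector.adapters.elasticsearch.client import Client

client = Client(config)  # config: ElasticsearchPlugin, as sketched earlier

for row in client.get_indices("*"):  # _cat/indices rows as dicts
    print(row["index"])

mappings = client.get_mapping()  # {index_name: {"mappings": {"properties": ...}}}
streams = client.get_data_streams()  # the "data_streams" list
templates = client.get_index_template("*")  # the "index_templates" list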
2 changes: 2 additions & 0 deletions odd_collector/adapters/elasticsearch/logger.py
@@ -1 +1,3 @@
 from odd_collector_sdk.logger import logger
+
+logger = logger
3 changes: 0 additions & 3 deletions odd_collector/adapters/elasticsearch/mappers/__init__.py
@@ -1,3 +0,0 @@
-from .metadata import MetadataExtractor
-
-metadata_extractor = MetadataExtractor()
20 changes: 9 additions & 11 deletions odd_collector/adapters/elasticsearch/mappers/fields.py
@@ -1,8 +1,7 @@
-from typing import Any, Dict, Optional
+from typing import Any, Dict
 
 from odd_models.models import DataSetField, DataSetFieldType, Type
 
-from ..logger import logger
+from oddrn_generator import ElasticSearchGenerator
 
 # As of ElasticSearch 7.x supported fields are listed here
 # https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html#
@@ -57,15 +56,14 @@ def __get_field_type(props: Dict[str, Any]) -> str:
     return "unknown"
 
 
-def map_field(field_name, field_metadata: dict, oddrn_generator) -> DataSetField:
+def map_field(
+    field_name: str,
+    field_metadata: dict,
+    oddrn_generator: ElasticSearchGenerator,
+    path: str,
+) -> DataSetField:
     data_type: str = __get_field_type(field_metadata)
-    logger.debug(
-        f"Field {field_name} with metadata {field_metadata} has {data_type} type"
-    )
-
-    oddrn_path: str = oddrn_generator.get_oddrn_by_path("fields", field_name)
-    logger.debug(f"Field {field_name} has oddrn path {oddrn_path}")
-
+    oddrn_path: str = oddrn_generator.get_oddrn_by_path(path, field_name)
     field_type = TYPES_ELASTIC_TO_ODD.get(data_type, Type.TYPE_UNKNOWN)
 
     dsf: DataSetField = DataSetField(
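
To illustrate the new call shape, a minimal sketch mirroring how map_index (below) invokes map_field (host and index name are made up; the exact oddrn string depends on the updated ElasticSearchGenerator paths):

# Hypothetical example of the new map_field signature.
from oddrn_generator import ElasticSearchGenerator

from odd_collector.adapters.elasticsearch.mappers.fields import map_field

generator = ElasticSearchGenerator(host_settings="localhost:9200")
generator.set_oddrn_paths(indices="logs-demo")  # hypothetical index

# "text" resolves through TYPES_ELASTIC_TO_ODD; unmapped elastic types
# fall back to Type.TYPE_UNKNOWN.
field = map_field("message", {"type": "text"}, generator, "indices_fields")
print(field.oddrn)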
28 changes: 0 additions & 28 deletions odd_collector/adapters/elasticsearch/mappers/indexes.py

This file was deleted.

30 changes: 30 additions & 0 deletions odd_collector/adapters/elasticsearch/mappers/indices.py
@@ -0,0 +1,30 @@
+from odd_models.models import DataEntity, DataEntityType, DataSet
+from oddrn_generator import ElasticSearchGenerator
+
+from odd_collector.adapters.elasticsearch.mappers.fields import map_field
+from odd_collector.adapters.elasticsearch.mappers.metadata import extract_index_metadata
+
+
+def map_index(
+    index: dict,
+    properties: dict,
+    generator: ElasticSearchGenerator,
+) -> DataEntity:
+    generator.set_oddrn_paths(indices=index["index"])
+    index_oddrn = generator.get_oddrn_by_path("indices")
+
+    # field type with `@` prefix defines alias for another field in same index
+    field_list = [
+        map_field(name, value, generator, "indices_fields")
+        for name, value in properties.items()
+        if not name.startswith("@")
+    ]
+
+    return DataEntity(
+        oddrn=index_oddrn,
+        name=index["index"],
+        owner=None,
+        type=DataEntityType.TABLE,
+        metadata=[extract_index_metadata(index)],
+        dataset=DataSet(parent_oddrn=None, rows_number=0, field_list=field_list),
+    )
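
And a hedged end-to-end sketch of the index mapper (the _cat/indices row is trimmed to the one key map_index reads; a real row also carries health, status, docs.count and similar keys for extract_index_metadata):

# Hypothetical example of mapping one index to a DataEntity.
from oddrn_generator import ElasticSearchGenerator

from odd_collector.adapters.elasticsearch.mappers.indices import map_index

generator = ElasticSearchGenerator(host_settings="localhost:9200")

index_row = {"index": "logs-demo"}  # minimal _cat/indices row (assumed sufficient here)
properties = {
    "message": {"type": "text"},
    "@timestamp": {"type": "alias"},  # `@`-prefixed names are aliases and get skipped
}

entity = map_index(index=index_row, properties=properties, generator=generator)
print(entity.name)  # logs-demo
print(len(entity.dataset.field_list))  # 1 -- only "message" survives the filter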
