From 90a8c283ac776b4ca63ca03a36bbc61ec1fb30cb Mon Sep 17 00:00:00 2001 From: Pavel Makarichev Date: Mon, 14 Aug 2023 01:04:14 -0700 Subject: [PATCH] fix(elasticsearch): handle http_auth config params (#211) --- config_examples/elasticsearch.yaml | 11 ++++----- .../adapters/elasticsearch/adapter.py | 23 +++++++++++-------- odd_collector/domain/plugin.py | 4 ++-- 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/config_examples/elasticsearch.yaml b/config_examples/elasticsearch.yaml index 579eab6b..91225462 100644 --- a/config_examples/elasticsearch.yaml +++ b/config_examples/elasticsearch.yaml @@ -5,9 +5,8 @@ plugins: - type: elasticsearch name: elasticsearch_adapter description: "" # Optional string - host: host - port: 0000 - http_auth: "" # Optional string, default None - use_ssl: "" # Optional boolean, default None - verify_certs: "" # Optional boolean, default None - ca_certs: "" # Optional string, default None \ No newline at end of file + host: host # Optional string, i.e https://localhost:9200 + http_auth: # Optional string, i.e elastic:my_strong_password, default None + use_ssl: # Optional boolean, default None + verify_certs: # Optional boolean, default None + ca_certs: # Optional string, default None \ No newline at end of file diff --git a/odd_collector/adapters/elasticsearch/adapter.py b/odd_collector/adapters/elasticsearch/adapter.py index 2d52b8e7..9ea552cb 100644 --- a/odd_collector/adapters/elasticsearch/adapter.py +++ b/odd_collector/adapters/elasticsearch/adapter.py @@ -7,17 +7,18 @@ from odd_models.models import DataEntity, DataEntityList from oddrn_generator import ElasticSearchGenerator +from odd_collector.domain.plugin import ElasticsearchPlugin + from .logger import logger from .mappers.indexes import map_index from .mappers.stream import map_data_stream, map_data_stream_template class Adapter(AbstractAdapter): - def __init__(self, config) -> None: + def __init__(self, config: ElasticsearchPlugin) -> None: self.__es_client = Elasticsearch( config.host, - port=config.port, - http_auth=config.http_auth, + http_auth=config.http_auth.split(":") if config.http_auth else None, use_ssl=config.use_ssl, verify_certs=config.verify_certs, ca_certs=config.ca_certs, @@ -34,13 +35,15 @@ def get_data_source_oddrn(self) -> str: return self.__oddrn_generator.get_data_source_oddrn() def get_datasets(self) -> Iterable[DataEntity]: - logger.debug("Collect dataset") + logger.info( + "Start collecting datasets from Elasticsearch at {self.config.host}" + ) result = [] + logger.info("Get indices") indices = self.__get_indices() + logger.info(f"Got {indices=}") - logger.debug(f"Indeces are {indices}") - - logger.debug("Process indeces") + logger.debug("Process indices") for index in indices: mapping = self.__get_mapping(index["index"])[index["index"]] logger.debug(f"Mapping for index {index['index']} is {mapping}") @@ -51,10 +54,10 @@ def get_datasets(self) -> Iterable[DataEntity]: f"Elasticsearch adapter failed to process index {index}: KeyError {e}" ) - logger.debug("Process data streams and their templates") + logger.info("Process data streams and their templates") all_data_streams = self.__get_data_streams() - logger.debug("Build template to data stream mapping") + logger.info("Build template to data stream mapping") templates_info = self.get_templates_from_data_streams(all_data_streams) for template, data_streams in templates_info.items(): @@ -165,7 +168,7 @@ def __get_mapping(self, index_name: str): def __get_indices(self): # System indices startswith `.` character - logger.debug("Get system indeces start with .") + logger.debug("Get system indices start with .") return [ _ for _ in self.__es_client.cat.indices(format="json") diff --git a/odd_collector/domain/plugin.py b/odd_collector/domain/plugin.py index 41f4b3a6..634ff95d 100644 --- a/odd_collector/domain/plugin.py +++ b/odd_collector/domain/plugin.py @@ -119,7 +119,7 @@ class HivePlugin(BasePlugin): connection_params: HiveConnectionParams -class ElasticsearchPlugin(WithHost, WithPort): +class ElasticsearchPlugin(WithHost): type: Literal["elasticsearch"] http_auth: Optional[str] = None use_ssl: Optional[bool] = None @@ -206,7 +206,7 @@ class CubeJSPlugin(BasePlugin): @validator("token") def validate_token(cls, value: Optional[SecretStr], values): - if values.get("dev_mode") == False and value is None: + if values.get("dev_mode") is False and value is None: raise ValueError("Token must be set in production mode") return value