Skip to content
This repository has been archived by the owner on Nov 6, 2023. It is now read-only.

Commit

Permalink
fix(elasticsearch): handle http_auth config params (#211)
Browse files Browse the repository at this point in the history
  • Loading branch information
Vixtir authored Aug 14, 2023
1 parent 3ab978b commit 90a8c28
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 18 deletions.
11 changes: 5 additions & 6 deletions config_examples/elasticsearch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ plugins:
- type: elasticsearch
name: elasticsearch_adapter
description: "" # Optional string
host: host
port: 0000
http_auth: "" # Optional string, default None
use_ssl: "" # Optional boolean, default None
verify_certs: "" # Optional boolean, default None
ca_certs: "" # Optional string, default None
host: host # Optional string, i.e https://localhost:9200
http_auth: # Optional string, i.e elastic:my_strong_password, default None
use_ssl: # Optional boolean, default None
verify_certs: # Optional boolean, default None
ca_certs: # Optional string, default None
23 changes: 13 additions & 10 deletions odd_collector/adapters/elasticsearch/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@
from odd_models.models import DataEntity, DataEntityList
from oddrn_generator import ElasticSearchGenerator

from odd_collector.domain.plugin import ElasticsearchPlugin

from .logger import logger
from .mappers.indexes import map_index
from .mappers.stream import map_data_stream, map_data_stream_template


class Adapter(AbstractAdapter):
def __init__(self, config) -> None:
def __init__(self, config: ElasticsearchPlugin) -> None:
self.__es_client = Elasticsearch(
config.host,
port=config.port,
http_auth=config.http_auth,
http_auth=config.http_auth.split(":") if config.http_auth else None,
use_ssl=config.use_ssl,
verify_certs=config.verify_certs,
ca_certs=config.ca_certs,
Expand All @@ -34,13 +35,15 @@ def get_data_source_oddrn(self) -> str:
return self.__oddrn_generator.get_data_source_oddrn()

def get_datasets(self) -> Iterable[DataEntity]:
logger.debug("Collect dataset")
logger.info(
"Start collecting datasets from Elasticsearch at {self.config.host}"
)
result = []
logger.info("Get indices")
indices = self.__get_indices()
logger.info(f"Got {indices=}")

logger.debug(f"Indeces are {indices}")

logger.debug("Process indeces")
logger.debug("Process indices")
for index in indices:
mapping = self.__get_mapping(index["index"])[index["index"]]
logger.debug(f"Mapping for index {index['index']} is {mapping}")
Expand All @@ -51,10 +54,10 @@ def get_datasets(self) -> Iterable[DataEntity]:
f"Elasticsearch adapter failed to process index {index}: KeyError {e}"
)

logger.debug("Process data streams and their templates")
logger.info("Process data streams and their templates")
all_data_streams = self.__get_data_streams()

logger.debug("Build template to data stream mapping")
logger.info("Build template to data stream mapping")
templates_info = self.get_templates_from_data_streams(all_data_streams)

for template, data_streams in templates_info.items():
Expand Down Expand Up @@ -165,7 +168,7 @@ def __get_mapping(self, index_name: str):

def __get_indices(self):
# System indices startswith `.` character
logger.debug("Get system indeces start with .")
logger.debug("Get system indices start with .")
return [
_
for _ in self.__es_client.cat.indices(format="json")
Expand Down
4 changes: 2 additions & 2 deletions odd_collector/domain/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class HivePlugin(BasePlugin):
connection_params: HiveConnectionParams


class ElasticsearchPlugin(WithHost, WithPort):
class ElasticsearchPlugin(WithHost):
type: Literal["elasticsearch"]
http_auth: Optional[str] = None
use_ssl: Optional[bool] = None
Expand Down Expand Up @@ -206,7 +206,7 @@ class CubeJSPlugin(BasePlugin):

@validator("token")
def validate_token(cls, value: Optional[SecretStr], values):
if values.get("dev_mode") == False and value is None:
if values.get("dev_mode") is False and value is None:
raise ValueError("Token must be set in production mode")

return value
Expand Down

0 comments on commit 90a8c28

Please sign in to comment.