diff --git a/eventum_plugins/output/opensearch.py b/eventum_plugins/output/opensearch.py index 604c976..a5650fe 100644 --- a/eventum_plugins/output/opensearch.py +++ b/eventum_plugins/output/opensearch.py @@ -116,6 +116,30 @@ async def _perform_bulk(self, host: str, bulk_data: str) -> None: f'HTTP {response.status} - {text}' ) + try: + result = json.loads(text) + except json.JSONDecodeError as e: + raise OutputPluginRuntimeError( + f'Failed to decode bulk response ({host}): {e}' + ) + + errors = [] + try: + if result['errors']: + for item in result['items']: + if 'index' in item and 'error' in item['index']: + errors.append(item['index']['error']) + except KeyError as e: + raise OutputPluginRuntimeError( + f'Failed to process bulk response ({host}): {e}' + ) + + if errors: + raise OutputPluginRuntimeError( + 'Failed to index some of the events:\n' + f'{json.dumps(errors, indent=2, ensure_ascii=False)}' + ) + async def _write_many(self, events: Iterable[str]) -> None: bulks_count = len(self._hosts) bulks = [""] * bulks_count @@ -143,7 +167,7 @@ async def _write_many(self, events: Iterable[str]) -> None: if successful_count < len(results): raise OutputPluginRuntimeError( - 'Bulk indexing did not complete with success: only' + 'Bulk indexing did not complete with success: only ' f'{successful_count}/{len(results)} nodes indexed events' ) diff --git a/pyproject.toml b/pyproject.toml index 4f0a181..2378ad7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "eventum-plugins" -version = "1.0.13" +version = "1.0.14" description = "Plugins for Eventum" license = "Apache-2.0" authors = ["Nikita Reznikov "]