Skip to content

Commit

Permalink
Add handling bulk response in opensearch output plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
rnv812 committed Jul 5, 2024
1 parent efbb521 commit 0d72863
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 2 deletions.
26 changes: 25 additions & 1 deletion eventum_plugins/output/opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,30 @@ async def _perform_bulk(self, host: str, bulk_data: str) -> None:
f'HTTP {response.status} - {text}'
)

try:
result = json.loads(text)
except json.JSONDecodeError as e:
raise OutputPluginRuntimeError(
f'Failed to decode bulk response ({host}): {e}'
)

errors = []
try:
if result['errors']:
for item in result['items']:
if 'index' in item and 'error' in item['index']:
errors.append(item['index']['error'])
except KeyError as e:
raise OutputPluginRuntimeError(
f'Failed to process bulk response ({host}): {e}'
)

if errors:
raise OutputPluginRuntimeError(
'Failed to index some of the events:\n'
f'{json.dumps(errors, indent=2, ensure_ascii=False)}'
)

async def _write_many(self, events: Iterable[str]) -> None:
bulks_count = len(self._hosts)
bulks = [""] * bulks_count
Expand Down Expand Up @@ -143,7 +167,7 @@ async def _write_many(self, events: Iterable[str]) -> None:

if successful_count < len(results):
raise OutputPluginRuntimeError(
'Bulk indexing did not complete with success: only'
'Bulk indexing did not complete with success: only '
f'{successful_count}/{len(results)} nodes indexed events'
)

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "eventum-plugins"
version = "1.0.13"
version = "1.0.14"
description = "Plugins for Eventum"
license = "Apache-2.0"
authors = ["Nikita Reznikov <[email protected]>"]
Expand Down

0 comments on commit 0d72863

Please sign in to comment.