Avoid immense term error in metrics metadata fields (#1835)
To avoid the immense term error in metrics metadata fields:

* the meta.error-description field mapping changes from keyword to wildcard,
  which fits this type of content better while still keeping the field
  searchable,
* the remaining dynamically mapped string fields get the "ignore_above": 8191
  parameter, which prevents the immense term error even in the worst case of
  an error message composed entirely of 4-byte UTF-8 characters (see the
  sketch after this list),
* the meta.error-description field generated by the BulkIndex runner is
  limited to 5 unique errors for better log readability - until now, in the
  worst case, the field was a concatenation of the errors from all documents
  in the bulk request.
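A rough sanity check of the 8191 limit, assuming Lucene's 32766-byte cap on a single indexed term and UTF-8's worst case of 4 bytes per character (neither constant is spelled out in the commit message itself):

# Worst case: every character needs 4 bytes in UTF-8, so 8191 characters
# still fit under Lucene's per-term byte limit while 8192 would not.
LUCENE_MAX_TERM_BYTES = 32766
MAX_UTF8_BYTES_PER_CHAR = 4

assert 8191 * MAX_UTF8_BYTES_PER_CHAR <= LUCENE_MAX_TERM_BYTES  # 32764 bytes, fits
assert 8192 * MAX_UTF8_BYTES_PER_CHAR > LUCENE_MAX_TERM_BYTES   # 32768 bytes, too long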
gbanasiak authored Mar 13, 2024
1 parent a2c09a7 commit e4d9b3c
Showing 3 changed files with 214 additions and 38 deletions.
esrally/driver/runner.py: 40 changes (35 additions, 5 deletions)
@@ -658,14 +658,44 @@ def extract_error_details(self, error_details, data):
         else:
             error_details.add((data["status"], None))
 
+    def _error_status_summary(self, error_details):
+        """
+        Generates error status code summary.
+
+        :param error_details: accumulated error details
+        :return: error status summary
+        """
+        status_counts = {}
+        for status, _ in error_details:
+            status_counts[status] = status_counts.get(status, 0) + 1
+        status_summaries = []
+        for status in sorted(status_counts.keys()):
+            status_summaries.append(f"{status_counts[status]}x{status}")
+        return ", ".join(status_summaries)
+
     def error_description(self, error_details):
+        """
+        Generates error description with an arbitrary limit of 5 errors.
+
+        :param error_details: accumulated error details
+        :return: error description
+        """
         error_descriptions = []
-        for status, reason in error_details:
-            if reason:
-                error_descriptions.append(f"HTTP status: {status}, message: {reason}")
-            else:
-                error_descriptions.append(f"HTTP status: {status}")
-        return " | ".join(sorted(error_descriptions))
+        is_truncated = False
+        for count, error_detail in enumerate(sorted(error_details)):
+            status, reason = error_detail
+            if count < 5:
+                if reason:
+                    error_descriptions.append(f"HTTP status: {status}, message: {reason}")
+                else:
+                    error_descriptions.append(f"HTTP status: {status}")
+            else:
+                is_truncated = True
+                break
+        description = " | ".join(error_descriptions)
+        if is_truncated:
+            description = description + " | TRUNCATED " + self._error_status_summary(error_details)
+        return description
 
     def __repr__(self, *args, **kwargs):
         return "bulk-index"
esrally/resources/metrics-template.json: 74 changes (41 additions, 33 deletions)
@@ -16,7 +16,8 @@
           "match": "*",
           "match_mapping_type": "string",
           "mapping": {
-            "type": "keyword"
+            "type": "keyword",
+            "ignore_above": 8191
           }
         }
       }
@@ -29,42 +30,18 @@
         "type": "date",
         "format": "epoch_millis"
       },
-      "relative-time": {
-        "type": "float"
-      },
-      "race-id": {
-        "type": "keyword"
-      },
-      "race-timestamp": {
-        "type": "date",
-        "format": "basic_date_time_no_millis",
-        "fields": {
-          "raw": {
-            "type": "keyword"
-          }
-        }
-      },
-      "environment": {
-        "type": "keyword"
-      },
-      "track": {
+      "car": {
         "type": "keyword"
       },
       "challenge": {
         "type": "keyword"
       },
-      "car": {
+      "environment": {
         "type": "keyword"
       },
-      "name": {
+      "job": {
         "type": "keyword"
       },
-      "value": {
-        "type": "float"
-      },
-      "min": {
-        "type": "float"
-      },
       "max": {
         "type": "float"
       },
@@ -74,23 +51,54 @@
       "median": {
         "type": "float"
       },
-      "unit": {
+      "meta": {
+        "properties": {
+          "error-description": {
+            "type": "wildcard"
+          }
+        }
+      },
+      "min": {
+        "type": "float"
+      },
+      "name": {
         "type": "keyword"
       },
+      "operation": {
+        "type": "keyword"
+      },
+      "operation-type": {
+        "type": "keyword"
+      },
+      "race-id": {
+        "type": "keyword"
+      },
+      "race-timestamp": {
+        "type": "date",
+        "format": "basic_date_time_no_millis",
+        "fields": {
+          "raw": {
+            "type": "keyword"
+          }
+        }
+      },
+      "relative-time": {
+        "type": "float"
+      },
       "sample-type": {
         "type": "keyword"
       },
       "task": {
         "type": "keyword"
       },
-      "operation": {
+      "track": {
         "type": "keyword"
       },
-      "operation-type": {
+      "unit": {
         "type": "keyword"
       },
-      "job": {
-        "type": "keyword"
+      "value": {
+        "type": "float"
       }
     }
   }
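As an aside on the wildcard choice: unlike a keyword field capped by ignore_above, a wildcard field keeps arbitrarily long values searchable with pattern queries. A hypothetical elasticsearch-py sketch (8.x-style API) against Rally's metrics indices; the endpoint, client setup, and the rally-metrics-* index pattern are assumptions for illustration:

from elasticsearch import Elasticsearch

# Placeholder client setup; adjust the endpoint and authentication as needed.
es = Elasticsearch("http://localhost:9200")

# Substring-style match against potentially very long error descriptions.
resp = es.search(
    index="rally-metrics-*",
    query={"wildcard": {"meta.error-description": {"value": "*version conflict*"}}},
    size=10,
)
for hit in resp["hits"]["hits"]:
    print(hit["_source"].get("meta", {}).get("error-description"))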
tests/driver/runner_test.py: 138 changes (138 additions, 0 deletions)
@@ -1243,6 +1243,144 @@ async def test_bulk_index_error_logs_warning_with_detailed_stats_body(self, es):
 
         es.bulk.assert_awaited_with(body=bulk_params["body"], params={})
 
+    @mock.patch("elasticsearch.Elasticsearch")
+    @pytest.mark.asyncio
+    async def test_bulk_index_error_produces_detailed_stats_body_with_limit(self, es):
+        es.bulk = mock.AsyncMock(
+            return_value={
+                "took": 5,
+                "errors": True,
+                "items": [
+                    {
+                        "create": {
+                            "_index": "test",
+                            "status": 409,
+                            "error": {
+                                "type": "version_conflict_engine_exception",
+                                "reason": "[1]: version conflict, document already exists (current version [1])",
+                            },
+                        }
+                    },
+                    {
+                        "create": {
+                            "_index": "test",
+                            "status": 409,
+                            "error": {
+                                "type": "version_conflict_engine_exception",
+                                "reason": "[2]: version conflict, document already exists (current version [1])",
+                            },
+                        }
+                    },
+                    {
+                        "create": {
+                            "_index": "test",
+                            "status": 409,
+                            "error": {
+                                "type": "version_conflict_engine_exception",
+                                "reason": "[3]: version conflict, document already exists (current version [1])",
+                            },
+                        }
+                    },
+                    {
+                        "create": {
+                            "_index": "test",
+                            "status": 409,
+                            "error": {
+                                "type": "version_conflict_engine_exception",
+                                "reason": "[4]: version conflict, document already exists (current version [1])",
+                            },
+                        }
+                    },
+                    {
+                        "create": {
+                            "_index": "test",
+                            "status": 409,
+                            "error": {
+                                "type": "version_conflict_engine_exception",
+                                "reason": "[5]: version conflict, document already exists (current version [1])",
+                            },
+                        }
+                    },
+                    {
+                        "create": {
+                            "_index": "test",
+                            "status": 409,
+                            "error": {
+                                "type": "version_conflict_engine_exception",
+                                "reason": "[6]: version conflict, document already exists (current version [1])",
+                            },
+                        }
+                    },
+                    {
+                        "create": {
+                            "_index": "test",
+                            "status": 429,
+                            "error": {
+                                "type": "cluster_block_exception",
+                                "reason": "index [test] blocked by: [TOO_MANY_REQUESTS/12/disk usage exceeded "
+                                "flood-stage watermark, index has read-only-allow-delete block];",
+                            },
+                        }
+                    },
+                ],
+            }
+        )
+
+        bulk = runner.BulkIndex()
+
+        bulk_params = {
+            "body": _build_bulk_body(
+                '{ "index" : { "_index" : "test" } }',
+                '{"message" : "in a bottle #1"}',
+                '{ "index" : { "_index" : "test" } }',
+                '{"message" : "in a bottle #2"}',
+                '{ "index" : { "_index" : "test" } }',
+                '{"message" : "in a bottle #3"}',
+                '{ "index" : { "_index" : "test" } }',
+                '{"message" : "in a bottle #4"}',
+                '{ "index" : { "_index" : "test" } }',
+                '{"message" : "in a bottle #5"}',
+                '{ "index" : { "_index" : "test" } }',
+                '{"message" : "in a bottle #6"}',
+                '{ "index" : { "_index" : "test" } }',
+                '{"message" : "in a bottle #7"}',
+            ),
+            "action-metadata-present": True,
+            "bulk-size": 7,
+            "unit": "docs",
+            "detailed-results": True,
+            "index": "test",
+        }
+
+        with mock.patch.object(bulk.logger, "warning") as mocked_warning_logger:
+            result = await bulk(es, bulk_params)
+            mocked_warning_logger.assert_has_calls([mock.call("Bulk request failed: [%s]", result["error-description"])])
+
+        assert result == {
+            "took": 5,
+            "index": "test",
+            "weight": 7,
+            "unit": "docs",
+            "success": False,
+            "success-count": 0,
+            "error-count": 7,
+            "error-type": "bulk",
+            "error-description": (
+                "HTTP status: 409, message: [1]: version conflict, document already exists (current version [1]) | "
+                "HTTP status: 409, message: [2]: version conflict, document already exists (current version [1]) | "
+                "HTTP status: 409, message: [3]: version conflict, document already exists (current version [1]) | "
+                "HTTP status: 409, message: [4]: version conflict, document already exists (current version [1]) | "
+                "HTTP status: 409, message: [5]: version conflict, document already exists (current version [1]) | "
+                "TRUNCATED 6x409, 1x429"
+            ),
+            "ops": {"create": collections.Counter({"item-count": 7})},
+            "shards_histogram": [],
+            "total-document-size-bytes": 210,
+            "bulk-request-size-bytes": 455,
+        }
+
+        es.bulk.assert_awaited_with(body=bulk_params["body"], params={})
+
     @mock.patch("elasticsearch.Elasticsearch")
     @pytest.mark.asyncio
     async def test_bulk_index_success_with_refresh_default(self, es):
