Adding geoip stats telemetry #1842

Merged
merged 4 commits on Apr 25, 2024
13 changes: 13 additions & 0 deletions docs/telemetry.rst
@@ -37,6 +37,7 @@ If you invoke ``esrally list telemetry``, it will show which telemetry devices a
data-stream-stats Data Stream Stats Regularly samples data stream stats
ingest-pipeline-stats Ingest Pipeline Stats Reports Ingest Pipeline stats at the end of the benchmark.
disk-usage-stats Disk usage of each field Runs the indices disk usage API after benchmarking
geoip-stats GeoIp Stats Writes geo ip stats to the metrics store at the end of the benchmark.

Keep in mind that each telemetry device may incur a runtime overhead which can skew results.

@@ -414,3 +415,15 @@ Example of recorded documents given two nodes in the target cluster::
.. note::

This telemetry device is only applicable to `Elastic Serverless <https://docs.elastic.co/serverless>`_ and requires elevated privileges only available to Elastic developers.

geoip-stats
----------------

The geoip-stats telemetry device fetches data from the `GeoIP Stats API <https://www.elastic.co/guide/en/elasticsearch/reference/current/geoip-stats-api.html>`_ at the end of the run, and stores geoip cache stats as metrics for the run. This is available only in Elasticsearch 8.14.0 and higher. Stored metrics include:

* ``geoip_cache_count``: The number of items in the cache.
* ``geoip_cache_hits``: The number of times an IP address was found in the cache.
* ``geoip_cache_misses``: The number of times an IP address was not found in the cache.
* ``geoip_cache_evictions``: The number of times an entry was evicted from the cache because the max cache size had been reached.
* ``geoip_cache_hits_time_in_millis``: The total amount of time spent fetching data from the cache, for cache hits only.
* ``geoip_cache_misses_time_in_millis``: The total amount of time spent fetching data from the cache, for cache misses only. This includes time spent fetching data from the geoip database.
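
The mapping from the API response to these metric names can be sketched as follows. The sample response shape and the helper function below are illustrative assumptions based on the field names listed above, not part of Rally; consult the GeoIP Stats API documentation for the authoritative format::

```python
# Sketch: flatten each node's cache_stats from a GeoIP Stats API response
# into the metric names stored by the telemetry device. The sample response
# shape is an assumption derived from the metric names above.

SAMPLE_RESPONSE = {
    "nodes": {
        "node-id-1": {
            "cache_stats": {
                "count": 3,
                "hits": 120,
                "misses": 20,
                "evictions": 0,
                "hits_time_in_millis": 15,
                "misses_time_in_millis": 180,
            }
        }
    }
}


def extract_geoip_metrics(response):
    """Return {node_id: {metric_name: value}} with the geoip_cache_ prefix applied."""
    metrics = {}
    for node_id, node in response["nodes"].items():
        cache_stats = node["cache_stats"]
        metrics[node_id] = {f"geoip_cache_{key}": value for key, value in cache_stats.items()}
    return metrics


print(extract_geoip_metrics(SAMPLE_RESPONSE)["node-id-1"]["geoip_cache_hits"])  # 120
```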
1 change: 1 addition & 0 deletions esrally/driver/driver.py
@@ -650,6 +650,7 @@ def prepare_telemetry(self, es, enable, index_names, data_stream_names, build_ha
telemetry.IngestPipelineStats(es, self.metrics_store),
telemetry.DiskUsageStats(telemetry_params, es_default, self.metrics_store, index_names, data_stream_names),
telemetry.BlobStoreStats(telemetry_params, es, self.metrics_store),
telemetry.GeoIpStats(es_default, self.metrics_store),
]
else:
devices = []
38 changes: 38 additions & 0 deletions esrally/telemetry.py
@@ -48,6 +48,7 @@ def list_telemetry():
DataStreamStats,
IngestPipelineStats,
DiskUsageStats,
GeoIpStats,
]
]
console.println(tabulate.tabulate(devices, ["Command", "Name", "Description"]))
@@ -2551,3 +2552,40 @@ def object_store_stats(self, stats):

def operational_backup_service_stats(self, stats):
return flatten_stats_fields(prefix="operational_backup", stats=stats.get("operational_backup_service_stats", {}))


class GeoIpStats(TelemetryDevice):
internal = False
serverless_status = serverless.Status.Internal
command = "geoip-stats"
human_name = "GeoIp Stats"
help = "Writes geo ip stats to the metrics store at the end of the benchmark."

def __init__(self, client, metrics_store):
super().__init__()
self.client = client
self.metrics_store = metrics_store

def on_benchmark_stop(self):
self.logger.info("Gathering GeoIp stats at benchmark end")
# First, build a map of node id to node name, because the geoip stats API doesn't return node name:
try:
nodes_info = self.client.nodes.info(node_id="_all")["nodes"].items()
except BaseException:
self.logger.exception("Could not retrieve nodes info")
nodes_info = {}
node_id_to_name_dict = {}
for node_id, node in nodes_info:
node_id_to_name_dict[node_id] = node["name"]
geoip_stats = self.client.ingest.geo_ip_stats()
stats_dict = geoip_stats.body
nodes_dict = stats_dict["nodes"]
for node_id, node in nodes_dict.items():
            # Fall back to the node id if the nodes info call failed and the name is unknown
            node_name = node_id_to_name_dict.get(node_id, node_id)
cache_stats = node["cache_stats"]
self.metrics_store.put_value_node_level(node_name, "geoip_cache_count", cache_stats["count"])
self.metrics_store.put_value_node_level(node_name, "geoip_cache_hits", cache_stats["hits"])
self.metrics_store.put_value_node_level(node_name, "geoip_cache_misses", cache_stats["misses"])
self.metrics_store.put_value_node_level(node_name, "geoip_cache_evictions", cache_stats["evictions"])
self.metrics_store.put_value_node_level(node_name, "geoip_cache_hits_time_in_millis", cache_stats["hits_time_in_millis"])
self.metrics_store.put_value_node_level(node_name, "geoip_cache_misses_time_in_millis", cache_stats["misses_time_in_millis"])
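

The logic above can be exercised in isolation with hand-rolled stubs in place of the Elasticsearch client and metrics store. The stub names and response shapes below are illustrative assumptions mirroring the fields the device reads; this is a sketch, not Rally's actual test harness:

```python
# Smoke-test sketch for the GeoIpStats logic, using stub objects. All Stub*
# names are hypothetical; the response shapes mirror what the device reads.

class StubNodes:
    def info(self, node_id):
        return {"nodes": {"abc123": {"name": "rally-node-0"}}}

class StubGeoIpResponse:
    body = {
        "nodes": {
            "abc123": {
                "cache_stats": {
                    "count": 1,
                    "hits": 10,
                    "misses": 2,
                    "evictions": 0,
                    "hits_time_in_millis": 3,
                    "misses_time_in_millis": 40,
                }
            }
        }
    }

class StubIngest:
    def geo_ip_stats(self):
        return StubGeoIpResponse()

class StubClient:
    nodes = StubNodes()
    ingest = StubIngest()

class StubMetricsStore:
    def __init__(self):
        self.values = []

    def put_value_node_level(self, node_name, name, value):
        self.values.append((node_name, name, value))

# Drive the same steps as GeoIpStats.on_benchmark_stop: map node ids to
# names, then store each cache stat under its geoip_cache_ metric name.
client, store = StubClient(), StubMetricsStore()
id_to_name = {nid: n["name"] for nid, n in client.nodes.info(node_id="_all")["nodes"].items()}
for node_id, node in client.ingest.geo_ip_stats().body["nodes"].items():
    for key, value in node["cache_stats"].items():
        store.put_value_node_level(id_to_name.get(node_id, node_id), f"geoip_cache_{key}", value)

print(store.values[1])  # ('rally-node-0', 'geoip_cache_hits', 10)
```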