Publish recall as a kpi metric #581
@@ -13,7 +13,7 @@
 # not use this file except in compliance with the License.
 # You may obtain a copy of the License at
 #
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
@@ -1754,24 +1754,44 @@ def __call__(self):
             op_type = task.operation.type
             error_rate = self.error_rate(t, op_type)
             duration = self.duration(t)

             if task.operation.include_in_results_publishing or error_rate > 0:
                 self.logger.debug("Gathering request metrics for [%s].", t)
                 result.add_op_metrics(
                     t,
                     task.operation.name,
-                    self.summary_stats("throughput", t, op_type, percentiles_list=self.throughput_percentiles),
+                    self.summary_stats(
+                        "throughput",
+                        t,
+                        op_type,
+                        percentiles_list=self.throughput_percentiles,
+                    ),
                     self.single_latency(t, op_type),
                     self.single_latency(t, op_type, metric_name="service_time"),
-                    self.single_latency(t, op_type, metric_name="client_processing_time"),
+                    self.single_latency(
+                        t, op_type, metric_name="client_processing_time"
+                    ),
                     self.single_latency(t, op_type, metric_name="processing_time"),
                     error_rate,
                     duration,
                     self.merge(
                         self.workload.meta_data,
                         self.test_procedure.meta_data,
                         task.operation.meta_data,
-                        task.meta_data)
+                        task.meta_data,
+                    ),
                 )

+                result.add_kpi_metrics(
+                    t,
+                    task.operation.name,
+                    self.single_latency(t, op_type, metric_name="recall@k"),
+                    self.single_latency(t, op_type, metric_name="recall@1"),
+                    self.single_latency(t, op_type, metric_name="recall_time_ms"),
+                    error_rate,
+                    duration,
+                )
+
         self.logger.debug("Gathering indexing metrics.")
         result.total_time = self.sum("indexing_total_time")
         result.total_time_per_shard = self.shard_stats("indexing_total_time")
@@ -1966,6 +1986,7 @@ def single_latency(self, task, operation_type, metric_name="latency"):
 class GlobalStats:
     def __init__(self, d=None):
         self.op_metrics = self.v(d, "op_metrics", default=[])
+        self.kpi_metrics = self.v(d, "kpi_metrics", default=[])

         self.total_time = self.v(d, "total_time")
         self.total_time_per_shard = self.v(d, "total_time_per_shard", default={})
         self.indexing_throttle_time = self.v(d, "indexing_throttle_time")

Comment (on the kpi_metrics line): What is the scope for kpi metrics? Aside from recall and maybe ndcg, what else would go in here?

Reply: I think any performance metric for k-NN is in scope for this. Right now it seems like recall is the major one, but if k-NN adds ndcg we could also track that in OSB. (The process for adding new metrics would be to calculate it and add it to the metadata in the search query runner, and then pull it from the metadata into this list.) I mainly wanted to explicitly distinguish between algorithm metrics like recall and performance metrics like latency.

Reply: Oh, I see. KPI, though, might include latency, right? It seems like a good category here would be
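To make the process described in this thread concrete, here is a small, runnable sketch. Everything in it is hypothetical shorthand, not OSB's actual runner API: a runner computes an algorithm metric, returns it in the per-request metadata, and a later stage pulls the known KPI keys back out of that metadata.

    # Toy sketch of the described flow (all names hypothetical, not OSB code).
    KPI_KEYS = ["recall@k", "recall@1", "ndcg@k"]  # "ndcg@k" shown as a possible future addition

    def toy_runner(neighbors_found, ground_truth):
        """Pretend search runner: computes recall@k and reports it as request metadata."""
        hits = len(set(neighbors_found) & set(ground_truth))
        recall_at_k = hits / len(ground_truth)
        return {"success": True, "recall@k": recall_at_k}

    def extract_kpis(request_meta_data):
        """Pull only the known KPI keys out of a sample's request metadata."""
        return {k: v for k, v in request_meta_data.items() if k in KPI_KEYS}

    meta = toy_runner(neighbors_found=[1, 2, 3, 7], ground_truth=[1, 2, 3, 4])
    print(extract_kpis(meta))  # {'recall@k': 0.75}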
@@ -2094,6 +2115,18 @@ def add_op_metrics(self, task, operation, throughput, latency, service_time, cli
         doc["meta"] = meta
         self.op_metrics.append(doc)

+    def add_kpi_metrics(self, task, operation, recall_at_k_stats, recall_at_1_stats, recall_time_ms_stats, error_rate, duration):
+        self.kpi_metrics.append({
+            "task": task,
+            "operation": operation,
+            "recall@k": recall_at_k_stats,
+            "recall@1": recall_at_1_stats,
+            "recall_time_ms": recall_time_ms_stats,
+            "error_rate": error_rate,
+            "duration": duration
+        })
+
     def tasks(self):
         # ensure we can read test_execution.json files before Benchmark 0.8.0
         return [v.get("task", v["operation"]) for v in self.op_metrics]

Comment (on the recall_time_ms line): I spoke with @VijayanB and

Reply: Can we remove this from this PR?

Reply: Sorry, not sure what "this" is referring to. Do you mean remove the

Reply: Sorry, I meant recall_time_ms - not sure if it is needed.
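For illustration, one kpi_metrics record would look roughly like this. All values are invented; the stats dicts come from single_latency and are assumed to carry a mean plus percentile keys in OSB's encoded-float form (the format read back by _publish_recall and _publish_percentiles below).

    # Hypothetical kpi_metrics entry (values invented; percentile key encoding assumed).
    kpi_record = {
        "task": "prod-knn-queries",
        "operation": "knn-search",
        "recall@k": {"mean": 0.93, "50_0": 0.94, "90_0": 0.88},
        "recall@1": {"mean": 0.88, "50_0": 0.90, "90_0": 0.81},
        "recall_time_ms": {"mean": 1.4, "50_0": 1.2, "90_0": 2.1},
        "error_rate": 0.0,
        "duration": 960,
    }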
@@ -156,6 +156,12 @@ def publish(self):
                 metrics_table.extend(self._publish_error_rate(record, task))
                 self.add_warnings(warnings, record, task)

+        for record in stats.kpi_metrics:
+            task = record["task"]
+            res = self._publish_recall(record, task)
+            if res:
+                metrics_table.extend(res)
+
         self.write_results(metrics_table)

         if warnings:
@@ -200,14 +206,27 @@ def _publish_service_time(self, values, task):
     def _publish_processing_time(self, values, task):
         return self._publish_percentiles("processing time", task, values["processing_time"])

-    def _publish_percentiles(self, name, task, value):
+    def _publish_recall(self, values, task):
+        recall_k = values["recall@k"]
+        recall_1 = values["recall@1"]
+
+        try:
+            return self._join(
+                self._line("Mean recall@k", task, recall_k["mean"], "", lambda v: "%.2f" % v),
+                self._line("Mean recall@1", task, recall_1["mean"], "", lambda v: "%.2f" % v),
+                *self._publish_percentiles("recall_k percentiles", task, recall_k, unit="")
+            )
+        except KeyError:
+            return None
+
+    def _publish_percentiles(self, name, task, value, unit="ms"):
         lines = []
         percentiles = self.display_percentiles.get(name, metrics.GlobalStatsCalculator.OTHER_PERCENTILES)

         if value:
             for percentile in metrics.percentiles_for_sample_size(sys.maxsize, percentiles_list=percentiles):
                 percentile_value = value.get(metrics.encode_float_key(percentile))
-                a_line = self._line("%sth percentile %s" % (percentile, name), task, percentile_value, "ms",
+                a_line = self._line("%sth percentile %s" % (percentile, name), task, percentile_value, unit,
                                     force=self.publish_all_percentile_values)
                 self._append_non_empty(lines, a_line)
         return lines

Comment (on the except KeyError clause): Why do we need to catch this? Can you return from line 214 if the keys don't exist?
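A guard-clause variant along the lines this comment suggests might look like the following. This is a sketch, not the PR's code; it assumes the same _join, _line, and _publish_percentiles helpers shown in the diff above.

    def _publish_recall(self, values, task):
        recall_k = values.get("recall@k")
        recall_1 = values.get("recall@1")
        # Bail out early instead of catching KeyError further down.
        if not recall_k or not recall_1 or "mean" not in recall_k or "mean" not in recall_1:
            return None
        return self._join(
            self._line("Mean recall@k", task, recall_k["mean"], "", lambda v: "%.2f" % v),
            self._line("Mean recall@1", task, recall_1["mean"], "", lambda v: "%.2f" % v),
            *self._publish_percentiles("recall_k percentiles", task, recall_k, unit="")
        )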
@@ -13,7 +13,7 @@
 # not use this file except in compliance with the License.
 # You may obtain a copy of the License at
 #
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
@@ -46,7 +46,6 @@
 from osbenchmark.workload import WorkloadProcessorRegistry, load_workload, load_workload_plugins
 from osbenchmark.utils import convert, console, net
 from osbenchmark.worker_coordinator.errors import parse_error
-
 ##################################
 #
 # Messages sent between worker_coordinators
@@ -847,6 +846,50 @@ def __call__(self, raw_samples):
         start = total_start
         final_sample_count = 0
         for idx, sample in enumerate(raw_samples):
+            self.logger.debug(
+                "All sample meta data: [%s],[%s],[%s],[%s],[%s]",
+                self.workload_meta_data,
+                self.test_procedure_meta_data,
+                sample.operation_meta_data,
+                sample.task.meta_data,
+                sample.request_meta_data,
+            )
+
+            # if request_meta_data exists then it will have {"success": true/false} as a parameter.
+            if sample.request_meta_data and len(sample.request_meta_data) > 1:
+                self.logger.debug("Found: %s", sample.request_meta_data)
+                recall_metric_names = ["recall@k", "recall@1"]
+
+                for recall_metric_name in recall_metric_names:
+                    if recall_metric_name in sample.request_meta_data:
+                        meta_data = self.merge(
+                            self.workload_meta_data,
+                            self.test_procedure_meta_data,
+                            sample.operation_meta_data,
+                            sample.task.meta_data,
+                            sample.request_meta_data,
+                        )
+
+                        self.logger.debug(
+                            "Here are the sample stats: Task: %s, operation: %s, operation_type; %s, sample_type: %s",
+                            sample.task.name,
+                            sample.operation_name,
+                            sample.operation_type,
+                            sample.sample_type,
+                        )
+                        self.metrics_store.put_value_cluster_level(
+                            name=recall_metric_name,
+                            value=sample.request_meta_data[recall_metric_name],
+                            unit="",
+                            task=sample.task.name,  # todo change unit to segment count unit...
+                            operation=sample.operation_name,
+                            operation_type=sample.operation_type,
+                            sample_type=sample.sample_type,
+                            absolute_time=sample.absolute_time,
+                            relative_time=sample.relative_time,
+                            meta_data=meta_data,
+                        )
+
             if idx % self.downsample_factor == 0:
                 final_sample_count += 1
                 meta_data = self.merge(

Comment (on the first logger.debug call): Do we need this to log?

Reply: Thanks for catching this -- it's debug level, so it shouldn't show up with normal logging settings. I was using it to figure out how samples worked, so this log has served its purpose, and I'll remove it now.

Author note (on the todo): Will remove the todo comment in the next revision.
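As a toy illustration of why the len(...) > 1 guard above works (hypothetical code, not OSB's metrics store): request metadata always carries "success", so any additional key signals that per-request metrics such as recall were attached.

    def has_extra_metrics(request_meta_data):
        # "success" is always present, so more than one key means extra metrics.
        return bool(request_meta_data) and len(request_meta_data) > 1

    print(has_extra_metrics({"success": True}))                   # False
    print(has_extra_metrics({"success": True, "recall@k": 0.9}))  # True
    print(has_extra_metrics(None))                                # False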
@@ -895,7 +938,7 @@ def __call__(self, raw_samples):
         end = time.perf_counter()
         self.logger.debug("Calculating throughput took [%f] seconds.", (end - start))
         start = end
-        for task, samples in aggregates.items():
+        for task, samples in aggregates.items():  # returns dict of task, and samples.
             meta_data = self.merge(
                 self.workload_meta_data,
                 self.test_procedure_meta_data,
@@ -1190,6 +1233,7 @@ def __init__(self, start_timestamp, buffer_size=16384):
     def add(self, task, client_id, sample_type, meta_data, absolute_time, request_start, latency, service_time,
             client_processing_time, processing_time, throughput, ops, ops_unit, time_period, percent_completed,
             dependent_timing=None):
+        self.logger.debug("Logging with metadata: [%s]", meta_data)
         try:
             self.q.put_nowait(
                 Sample(client_id, absolute_time, request_start, self.start_timestamp, task, sample_type, meta_data,

Comment (on the logger.debug line): Do we need this?
@@ -1369,7 +1413,7 @@ def calculate_task_throughput(self, task, current_samples, bucket_interval_secs)
             self.task_stats[task] = ThroughputCalculator.TaskStats(bucket_interval=bucket_interval_secs,
                                                                    sample_type=first_sample.sample_type,
                                                                    start_time=first_sample.absolute_time - first_sample.time_period)
-        current = self.task_stats[task]
+        current = self.task_stats[task]  # TaskStats object
         count = current.total_count
         last_sample = None
         for sample in current_samples:

Comment (on the added comment): Why do we need this comment here, and as part of this PR?
Reply: Not sure why my code formatter keeps changing code outside of the selected lines. I'll revert this and the other spacing changes to out-of-scope code in the next revision.

Reply: I would recommend reverting for this PR, just so it's clearer what functional changes are being made.