Skip to content

Commit

Permalink
Modify telemetry data export based on internal discussion
Browse files Browse the repository at this point in the history
  • Loading branch information
lkomali committed Aug 21, 2024
1 parent ee6c629 commit 38c2114
Show file tree
Hide file tree
Showing 9 changed files with 168 additions and 190 deletions.
74 changes: 40 additions & 34 deletions genai-perf/genai_perf/export_data/console_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from typing import Dict

from genai_perf.export_data.exporter_config import ExporterConfig
from genai_perf.metrics import TelemetryMetrics
from rich.console import Console
from rich.table import Table
from rich.text import Text

Check notice

Code scanning / CodeQL

Unused import Note

Import of 'Text' is not used.
Expand All @@ -40,32 +39,33 @@ class ConsoleExporter:
"""

STAT_COLUMN_KEYS = ["avg", "min", "max", "p99", "p90", "p75"]
CONSTANT_TELEMETRY_METRICS = {"gpu_power_limit", "total_gpu_memory"}
TELEMETRY_GROUPS = {
"Power": ["gpu_power_usage", "gpu_power_limit", "energy_consumption"],
"Memory": ["gpu_memory_used", "total_gpu_memory"],
"Utilization": ["gpu_utilization"],
}

def __init__(self, config: ExporterConfig):
self._stats = config.stats
self._telemetry_stats = config.telemetry_stats
self._metrics = config.metrics
self._args = config.args
self._is_telemetry_data = config.is_telemetry_data

def _get_title(self):
if self._args.endpoint_type == "embeddings":
return "Embeddings Metrics"
elif self._args.endpoint_type == "rankings":
return "Rankings Metrics"
elif isinstance(self._metrics, TelemetryMetrics):
return "Telemetry Metrics"
else:
return "LLM Metrics"

def export(self) -> None:
console = Console()
title = self._get_title()

if self._is_telemetry_data:
self._export_telemetry_metrics(console, title)
else:
self._export_llm_metrics(console, title)
if self._args.verbose:
self._export_telemetry_metrics(console)
self._export_llm_metrics(console, title)

def _export_llm_metrics(self, console: Console, title: str) -> None:
table = Table(title=title)
Expand Down Expand Up @@ -97,46 +97,52 @@ def _construct_llm_table(self, table: Table) -> None:

table.add_row(*row_values)

def _export_telemetry_metrics(self, console: Console, title: str) -> None:

# Iterate over all telemetry metrics and print them in separate tables
for metric_name, metric_data in self._stats.items():
unit = metric_data.get("unit", "N/A")
table_title = f"{metric_name.replace('_', ' ').title()} ({unit})"
table = Table(title=table_title)

if metric_name in self.CONSTANT_TELEMETRY_METRICS:
table.add_column("GPU Index", justify="left")
table.add_column("Value", justify="right", style="green")

for gpu_index in metric_data.keys():
if gpu_index != "unit":
value = metric_data.get(gpu_index, "N/A")
table.add_row(
gpu_index, f"{value:.2f}" if value != "N/A" else "N/A"
)
else:
table.add_column("GPU Index", justify="left")
def _export_telemetry_metrics(self, console: Console) -> None:
for group_name, metrics in self.TELEMETRY_GROUPS.items():
table = Table(title=f"{group_name} Metrics")

for metric_name in metrics:
metric_data = self._telemetry_stats.get(metric_name, {})

unit = metric_data.get("unit", "N/A")
metric_name_display = self._capitalize_abbreviation(
metric_name.replace("_", " ")
)
table_title = f"{metric_name_display}{f' ({unit})' if unit else ''}"
sub_table = Table(title=table_title)

sub_table.add_column("GPU Index", justify="left")
for stat in self.STAT_COLUMN_KEYS:
table.add_column(stat, justify="right", style="green")
sub_table.add_column(stat, justify="right", style="green")

self._construct_telemetry_table(table, metric_data)
self._construct_telemetry_table(sub_table, metric_data)
table.add_row(sub_table)

console.print(table)

def _construct_telemetry_table(
self, table: Table, metric_data: Dict[str, Dict[str, float]]
) -> None:
avg_metric = metric_data.get("avg", {})
gpu_indices = list(avg_metric.keys())
gpu_indices = [key for key in metric_data.keys() if key != "unit"]

for gpu_index in gpu_indices:
row = [f"{gpu_index}"]
for stat in self.STAT_COLUMN_KEYS:
value = metric_data.get(stat, {}).get(gpu_index, "N/A")
value = metric_data.get(gpu_index, {}).get(stat, "N/A")
row.append(f"{value:.2f}" if isinstance(value, (int, float)) else "N/A")
table.add_row(*row)

def _capitalize_abbreviation(self, text: str) -> str:
"""
Capitalizes abbreviations (e.g., GPU) while normalizing other text.
"""
words = text.split()
capitalized_words = [
word.upper() if word.lower() in ["gpu"] else word.capitalize()
for word in words
]
return " ".join(capitalized_words)

# (TMA-1976) Refactor this method, since the CSV exporter has an identical one.
def _should_skip(self, metric_name: str) -> bool:
if self._args.endpoint_type == "embeddings":
Expand Down
69 changes: 33 additions & 36 deletions genai-perf/genai_perf/export_data/csv_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,23 +76,18 @@ class CsvExporter:
"Value",
]

TELEMETRY_CONSTANT_METRICS = ["gpu_power_limit", "total_gpu_memory"]

def __init__(self, config: ExporterConfig):
self._stats = config.stats
self._telemetry_stats = config.telemetry_stats
self._metrics = config.metrics
self._output_dir = config.artifact_dir
self._args = config.args
self._is_telemetry_data = config.is_telemetry_data

def export(self) -> None:
if self._is_telemetry_data:
self._export_telemetry_metrics()
else:
self._export_llm_metrics()

def _export_llm_metrics(self) -> None:
filename = (
self._output_dir
/ f"{self._args.profile_export_file.stem}_llm_genai_perf.csv"
self._output_dir / f"{self._args.profile_export_file.stem}_genai_perf.csv"
)
logger.info(f"Generating {filename}")

Expand All @@ -101,18 +96,10 @@ def _export_llm_metrics(self) -> None:
self._write_request_metrics(writer)
writer.writerow([])
self._write_system_metrics(writer)

def _export_telemetry_metrics(self) -> None:
telemetry_filename = (
self._output_dir / f"{self._args.profile_export_file.stem}_telemetry_genai_perf.csv"
)
logger.info(f"Generating {telemetry_filename}")

with open(telemetry_filename, mode="w", newline="") as f:
writer = csv.writer(f)
self._write_telemetry_metrics(writer)
writer.writerow([])
self._write_constant_metrics(writer)
self._write_telemetry_aggregated_metrics(writer)
writer.writerow([])
self._write_telemetry_constant_metrics(writer)

def _write_request_metrics(self, csv_writer) -> None:
csv_writer.writerow(self.REQUEST_METRICS_HEADER)
Expand All @@ -137,43 +124,53 @@ def _write_system_metrics(self, csv_writer) -> None:
value = self._stats[metric.name]["avg"]
csv_writer.writerow([metric_str, f"{value:.2f}"])

def _write_telemetry_metrics(self, csv_writer) -> None:
def _write_telemetry_aggregated_metrics(self, csv_writer) -> None:
csv_writer.writerow(self.TELEMETRY_AGGREGATED_METRICS_HEADER)

for metric_name, metric_data in self._stats.items():
# Skip constant metrics
if metric_name in ["gpu_power_limit", "total_gpu_memory"]:
for metric_name, metric_data in self._telemetry_stats.items():
if metric_name in self.TELEMETRY_CONSTANT_METRICS:
continue

metric_str = metric_name.replace("_", " ").title()
metric_str = self._capitalize_abbreviation(metric_name.replace("_", " "))
metric_str += f" ({metric_data['unit']})"

# Iterate through each GPU
for gpu in metric_data["avg"].keys():
row_values = [metric_str, gpu]
for key, gpu_data in metric_data.items():
if key == "unit":
continue

row_values = [metric_str, key]

for stat in self.TELEMETRY_AGGREGATED_METRICS_HEADER[2:]:
value = metric_data[stat][gpu]
value = gpu_data.get(stat, 0.0)
row_values.append(f"{value:,.2f}")

csv_writer.writerow(row_values)

def _write_constant_metrics(self, csv_writer) -> None:
def _write_telemetry_constant_metrics(self, csv_writer) -> None:
csv_writer.writerow(self.TELEMETRY_CONSTANT_METRICS_HEADER)

for metric_name, metric_data in self._stats.items():
# Skip non-constant metrics
if metric_name not in ["gpu_power_limit", "total_gpu_memory"]:
for metric_name, metric_data in self._telemetry_stats.items():
if metric_name not in self.TELEMETRY_CONSTANT_METRICS:
continue

metric_str = metric_name.replace("_", " ").title()
metric_str = self._capitalize_abbreviation(metric_name.replace("_", " "))
metric_str += f" ({metric_data['unit']})"
for gpu in metric_data.keys():
if gpu == "unit":
continue
value = metric_data[gpu]
value = metric_data[gpu]["avg"]
csv_writer.writerow([metric_str, gpu, f"{value:.2f}"])

def _capitalize_abbreviation(self, text: str) -> str:
"""
Capitalizes abbreviations (e.g., GPU) while normalizing other text.
"""
words = text.split()
capitalized_words = [
word.upper() if word.lower() in ["gpu"] else word.capitalize()
for word in words
]
return " ".join(capitalized_words)

def _format_value(self, value) -> str:
if isinstance(value, (int, float)):
return f"{value:,.2f}" # Format with commas and two decimal places
Expand Down
18 changes: 9 additions & 9 deletions genai-perf/genai_perf/export_data/exporter_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@
class ExporterConfig:
def __init__(self):
self._stats = None
self._telemetry_stats = None
self._metrics = None
self._args = None
self._extra_inputs = None
self._artifact_dir = None
self._is_telemetry_data = False

@property
def stats(self):
Expand All @@ -45,6 +45,14 @@ def stats(self):
def stats(self, stats_value):
self._stats = stats_value

# Getter: exposes the telemetry statistics stored in ``_telemetry_stats``.
@property
def telemetry_stats(self):
return self._telemetry_stats

# Setter: replaces the stored telemetry statistics with ``telemetry_stats``.
# The value is stored as-is; no validation or copying happens here.
@telemetry_stats.setter
def telemetry_stats(self, telemetry_stats):
self._telemetry_stats = telemetry_stats

@property
def metrics(self):
return self._metrics
Expand Down Expand Up @@ -76,11 +84,3 @@ def artifact_dir(self):
@artifact_dir.setter
def artifact_dir(self, artifact_dir_value):
self._artifact_dir = artifact_dir_value

# Getter: exposes the flag stored in ``_is_telemetry_data``.
@property
def is_telemetry_data(self):
return self._is_telemetry_data

# Setter: stores ``is_telemetry_data`` as-is, with no validation or coercion.
@is_telemetry_data.setter
def is_telemetry_data(self, is_telemetry_data):
self._is_telemetry_data = is_telemetry_data
35 changes: 12 additions & 23 deletions genai-perf/genai_perf/export_data/json_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,27 +43,22 @@ class JsonExporter:

def __init__(self, config: ExporterConfig):
self._stats: Dict = config.stats
self._telemetry_stats = config.telemetry_stats
self._args = dict(vars(config.args))
self._output_dir = config.artifact_dir
self._is_telemetry_data = config.is_telemetry_data
if not self._is_telemetry_data:
self._extra_inputs = config.extra_inputs
self._stats_and_args: Dict = {}
self._prepare_args_for_export()
self._merge_stats_and_args()
self._extra_inputs = config.extra_inputs
self._stats_and_args: Dict = {}
self._prepare_args_for_export()
self._merge_stats_and_args()

def export(self) -> None:
prefix = os.path.splitext(os.path.basename(self._args["profile_export_file"]))[
0
]

if self._is_telemetry_data:
filename = self._output_dir / f"{prefix}_genai_perf_telemetry_data.json"
data_to_export = self._stats
else:
filename = self._output_dir / f"{prefix}_genai_perf.json"
data_to_export = self._stats_and_args
self._write_to_json(filename, data_to_export)
filename = self._output_dir / f"{prefix}_genai_perf.json"
logger.info(f"Generating {filename}")
with open(str(filename), "w") as f:
f.write(json.dumps(self._stats_and_args, indent=2))

def _prepare_args_for_export(self) -> None:
self._args.pop("func", None)
Expand All @@ -81,13 +76,7 @@ def _add_extra_inputs_to_args(self) -> None:
self._args.update({"extra_inputs": self._extra_inputs})

def _merge_stats_and_args(self) -> None:
self._stats_and_args = dict(self._stats)
self._stats_and_args = dict()
self._stats_and_args.update({"llm_metrics": self._stats})
self._stats_and_args.update({"telemetry_metrics": self._telemetry_stats})
self._stats_and_args.update({"input_config": self._args})

def _write_to_json(self, filename: str, data: Dict) -> None:
    """
    Serialize ``data`` as JSON (2-space indent) into the file at ``filename``.

    The destination is reported through ``logger`` before the file is
    opened; ``filename`` is converted with ``str()`` before being opened.
    """
    logger.info(f"Generating {filename}")
    target_path = str(filename)
    with open(target_path, "w") as json_file:
        json.dump(data, json_file, indent=2)
21 changes: 11 additions & 10 deletions genai-perf/genai_perf/export_data/output_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

from genai_perf.export_data.data_exporter_factory import DataExporterFactory
from genai_perf.export_data.exporter_config import ExporterConfig
from genai_perf.metrics import Statistics
from genai_perf.metrics import Statistics, TelemetryMetricsStatistics
from genai_perf.parser import get_extra_inputs_as_dict


Expand All @@ -38,12 +38,16 @@ class OutputReporter:
A class to orchestrate output generation.
"""

def __init__(self, stats, args: Namespace, is_telemetry_data: bool):
def __init__(
self,
stats: Statistics,
telemetry_stats: TelemetryMetricsStatistics,
args: Namespace,
):
self.args = args
self.stats = stats
self.is_telemetry_data = is_telemetry_data
if not self.is_telemetry_data:
self.stats.scale_data()
self.telemetry_stats = telemetry_stats
self.stats.scale_data()

def report_output(self) -> None:
factory = DataExporterFactory()
Expand All @@ -56,13 +60,10 @@ def report_output(self) -> None:
def _create_exporter_config(self) -> ExporterConfig:
config = ExporterConfig()
config.stats = self.stats.stats_dict
config.telemetry_stats = self.telemetry_stats.stats_dict
config.metrics = self.stats.metrics
config.args = self.args
config.artifact_dir = self.args.artifact_dir
config.is_telemetry_data = self.is_telemetry_data

# Only set extra_inputs if dealing with LLM metrics (Statistics)
if isinstance(self.stats, Statistics):
config.extra_inputs = get_extra_inputs_as_dict(self.args)
config.extra_inputs = get_extra_inputs_as_dict(self.args)

return config
Loading

0 comments on commit 38c2114

Please sign in to comment.