Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graceful handling of exception thrown by genai-perf if metrics url is unreachable #47

Merged
merged 5 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions genai-perf/genai_perf/telemetry_data/telemetry_data_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,30 @@ def __init__(
self._stop_event = Event()
self._thread: Optional[Thread] = None

def is_url_reachable(self) -> bool:
    """Check whether the configured metrics URL answers successfully.

    Returns:
        True when no metrics URL is configured, or when a GET request to
        the URL completes with an OK status code. False when the response
        carries any other status code or the request raises
        ``requests.RequestException``.
    """
    metrics_url = self._server_metrics_url
    if not metrics_url:
        # Nothing to probe when no URL is configured.
        return True

    try:
        # Pass a 5-second timeout so the probe cannot block indefinitely.
        probe = requests.get(metrics_url, timeout=5)
    except requests.RequestException:
        return False
    else:
        return probe.status_code == requests.codes.ok

def start(self) -> None:
    """Start the telemetry data collection thread.

    Does nothing when a collection thread already exists and is alive.
    """
    worker = self._thread
    if worker is not None and worker.is_alive():
        return

    # Clear the stop flag before launching so the new thread's loop runs.
    self._stop_event.clear()
    worker = Thread(target=self._collect_metrics)
    self._thread = worker
    worker.start()

def stop(self) -> None:
    """Stop the telemetry data collection thread.

    Does nothing when no thread was started or the thread is no longer
    alive. Otherwise sets the stop event and waits for the thread to exit.
    """
    worker = self._thread
    if worker is None or not worker.is_alive():
        return

    self._stop_event.set()
    worker.join()

def _fetch_metrics(self) -> str:
    """Fetch the raw metrics text from the metrics endpoint.

    Returns:
        The body text of the response to a GET on the server metrics URL.

    Raises:
        requests.HTTPError: Raised by ``raise_for_status()`` when the
            response carries an error status code.
    """
    reply = requests.get(self._server_metrics_url)
    # Turn an error status into an exception instead of returning its body.
    reply.raise_for_status()
    body = reply.text
    return body
Expand All @@ -71,13 +80,16 @@ def _process_and_update_metrics(self, metrics_data: str) -> None:
pass

def _collect_metrics(self) -> None:
    """Continuously collect telemetry metrics until the stop event is set.

    Each iteration fetches the metrics text via ``_fetch_metrics``, hands
    it to ``_process_and_update_metrics``, and then sleeps for
    ``self._collection_interval`` seconds before checking the stop event
    again.
    """
    while not self._stop_event.is_set():
        metrics_data = self._fetch_metrics()
        self._process_and_update_metrics(metrics_data)
        # Pause between polls; the stop event is only re-checked afterwards.
        time.sleep(self._collection_interval)

@property
def metrics(self) -> TelemetryMetrics:
    """Expose the metrics object held by this collector."""
    collected = self._metrics
    return collected

@property
def metrics_url(self) -> str:
    """Return the server metrics URL this collector holds."""
    url = self._server_metrics_url
    return url
8 changes: 7 additions & 1 deletion genai-perf/genai_perf/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,13 @@ def run(
) -> None:
try:
if telemetry_data_collector is not None:
telemetry_data_collector.start()
if telemetry_data_collector.is_url_reachable():
telemetry_data_collector.start()
else:
logger.warning(
f"The metrics URL ({telemetry_data_collector.metrics_url}) is unreachable. "
"GenAI-Perf cannot collect telemetry data."
)
cmd = Profiler.build_cmd(args, extra_args)
logger.info(f"Running Perf Analyzer : '{' '.join(cmd)}'")
if args and args.verbose:
Expand Down
50 changes: 41 additions & 9 deletions genai-perf/tests/test_telemetry_data_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class TestTelemetryDataCollector:

TEST_SERVER_URL = "http://testserver:8080/metrics"

triton_metrics_response = """\
TRITON_METRICS_RESPONSE = """\
nv_gpu_power_usage{gpu="0",uuid="GPU-1234"} 123.45
nv_gpu_power_usage{gpu="1",uuid="GPU-5678"} 234.56
nv_gpu_utilization{gpu="0",uuid="GPU-1234"} 76.3
Expand Down Expand Up @@ -98,14 +98,14 @@ def test_fetch_metrics_success(

mock_response = MagicMock()
mock_response.status_code = 200
mock_response.text = self.triton_metrics_response
mock_response.text = self.TRITON_METRICS_RESPONSE
mock_requests_get.return_value = mock_response

result = collector._fetch_metrics()

mock_requests_get.assert_called_once_with(self.TEST_SERVER_URL)

assert result == self.triton_metrics_response
assert result == self.TRITON_METRICS_RESPONSE

@patch("requests.get")
def test_fetch_metrics_failure(
Expand All @@ -125,21 +125,53 @@ def test_collect_metrics(
collector: MockTelemetryDataCollector,
) -> None:

mock_fetch_metrics.return_value = self.triton_metrics_response
mock_fetch_metrics.return_value = self.TRITON_METRICS_RESPONSE

with patch.object(
collector, "_process_and_update_metrics", new_callable=MagicMock
) as mock_process_and_update_metrics:
# Mock _stop_event.is_set

collector._stop_event = MagicMock()
collector._stop_event.is_set = MagicMock(
side_effect=[False, True]
) # Ensure loop exits immediately
collector._stop_event.is_set = MagicMock(side_effect=[False, True])

collector._collect_metrics()

mock_fetch_metrics.assert_called_once()
mock_process_and_update_metrics.assert_called_once_with(
self.triton_metrics_response
self.TRITON_METRICS_RESPONSE
)
mock_sleep.assert_called_once()

@patch("requests.get")
def test_url_reachability_check_success(
    self,
    mock_get: MagicMock,
    collector: MockTelemetryDataCollector,
) -> None:
    """An OK status from the metrics endpoint counts as reachable."""
    ok_response = mock_get.return_value
    ok_response.status_code = requests.codes.ok

    reachable = collector.is_url_reachable()

    assert reachable is True

@patch("requests.get")
def test_url_reachability_check_failure(
    self, mock_get: MagicMock, collector: MockTelemetryDataCollector
) -> None:
    """Non-OK statuses and request exceptions all count as unreachable."""
    # Responses that arrive but carry a non-OK status code.
    failing_statuses = (
        requests.codes.not_found,
        requests.codes.server_error,
        requests.codes.forbidden,
    )
    for status in failing_statuses:
        mock_get.return_value.status_code = status
        assert collector.is_url_reachable() is False

    # Requests that never produce a response because an exception is raised.
    request_errors = (
        requests.exceptions.Timeout,
        requests.exceptions.ConnectionError,
        requests.exceptions.TooManyRedirects,
        requests.exceptions.RequestException,
    )
    for error in request_errors:
        mock_get.side_effect = error
        assert collector.is_url_reachable() is False
Loading