diff --git a/platform_monitoring/kube_client.py b/platform_monitoring/kube_client.py index 7826efb9..eb0631ff 100644 --- a/platform_monitoring/kube_client.py +++ b/platform_monitoring/kube_client.py @@ -588,7 +588,10 @@ async def get_pod_container_gpu_stats( if e.status == 401 and not doing_retry: await self._recreate_http_client() return await self.get_pod_container_gpu_stats( - node_name, pod_name, container_name, doing_retry=True, + node_name, + pod_name, + container_name, + doing_retry=True, ) except aiohttp.ClientError as e: logger.exception(e) @@ -678,9 +681,7 @@ def _assert_resource_kind( elif kind != expected_kind: raise ValueError(f"unknown kind: {kind}") - def _raise_for_status( - self, payload: dict[str, Any], job_id: Optional[str] - ) -> None: + def _raise_for_status(self, payload: dict[str, Any], job_id: Optional[str]) -> None: if payload["code"] == 400: if "ContainerCreating" in payload["message"]: raise JobNotFoundException(f"job '{job_id}' was not created yet") diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 259d7efa..b1660715 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -246,9 +246,7 @@ def _f(**kwargs: Any) -> Config: platform_api=platform_api_config, platform_config=platform_config, s3=s3_config, - logs=LogsConfig( - storage_type=LogsStorageType.S3, cleanup_interval_sec=0.5 - ), + logs=LogsConfig(storage_type=LogsStorageType.S3, cleanup_interval_sec=0.5), kube=kube_config, registry=registry_config, container_runtime=container_runtime_config, diff --git a/tests/integration/conftest_config.py b/tests/integration/conftest_config.py index 14e3e195..c4bf1376 100644 --- a/tests/integration/conftest_config.py +++ b/tests/integration/conftest_config.py @@ -52,11 +52,12 @@ async def _wait_for_platform_api_config( ) -> None: while True: import warnings + with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=DeprecationWarning) try: async with create_platform_api_client( - platform_api_config.url, platform_api_config.token + platform_api_config.url, platform_api_config.token ): return except Exception: diff --git a/tests/integration/conftest_kube.py b/tests/integration/conftest_kube.py index eebeb33c..203d3f01 100644 --- a/tests/integration/conftest_kube.py +++ b/tests/integration/conftest_kube.py @@ -20,7 +20,6 @@ class MyKubeClient(KubeClient): - # TODO (A Yushkovskiy, 30-May-2019) delete pods automatically async def create_pod(self, job_pod_descriptor: dict[str, Any]) -> str: diff --git a/tests/integration/test_api.py b/tests/integration/test_api.py index 4bb93592..952cb3ee 100644 --- a/tests/integration/test_api.py +++ b/tests/integration/test_api.py @@ -1037,7 +1037,8 @@ async def test_attach_forbidden( async with client.ws_connect(url): pass except WSServerHandshakeError as e: - assert e.headers and e.headers.get("X-Error") + assert e.headers + assert e.headers.get("X-Error") assert e.message == "Invalid response status" assert e.status == HTTPUnauthorized.status_code @@ -1401,7 +1402,6 @@ async def test_kill( jobs_client: JobsClient, infinite_job: str, ) -> None: - headers = jobs_client.headers url = monitoring_api.generate_kill_url(infinite_job) diff --git a/tests/integration/test_jobs_service.py b/tests/integration/test_jobs_service.py index b9b60345..26a6a18a 100644 --- a/tests/integration/test_jobs_service.py +++ b/tests/integration/test_jobs_service.py @@ -317,7 +317,8 @@ async def test_save_commit_fails_with_exception( async def test_get_available_jobs_count(self, jobs_service: JobsService) -> None: result = await jobs_service.get_available_jobs_counts() - assert result and "cpu-small" in result + assert result + assert "cpu-small" in result async def test_mark_logs_dropped( self, diff --git a/tests/integration/test_kube.py b/tests/integration/test_kube.py index ff20b810..a850f6d2 100644 --- a/tests/integration/test_kube.py +++ b/tests/integration/test_kube.py @@ -4,11 +4,11 @@ import logging import re import uuid -from pathlib import Path from collections.abc import AsyncIterator, Callable, Coroutine from contextlib import AbstractAsyncContextManager from datetime import datetime, timedelta, timezone -from typing import Any, Union +from pathlib import Path +from typing import Any from unittest import mock from uuid import uuid4 @@ -23,8 +23,8 @@ from platform_monitoring.kube_client import ( JobNotFoundException, KubeClient, - KubeClientException, KubeClientAuthType, + KubeClientException, PodContainerStats, PodPhase, ) @@ -72,15 +72,16 @@ async def _stats_summary(request: web.Request) -> web.Response: async def _gpu_metrics(request: web.Request) -> web.Response: return web.Response(content_type="text/plain") - def _unauthorized_gpu_metrics( - ) -> Callable[[web.Request], Coroutine[Any, Any, web.Response]]: - + def _unauthorized_gpu_metrics() -> ( + Callable[[web.Request], Coroutine[Any, Any, web.Response]] + ): async def _inner(request: web.Request) -> web.Response: auth_header = request.headers.get("Authorization", "") if auth_header.split(" ")[1] == "authorized": return web.Response(content_type="text/plain") else: return web.Response(status=401) + return _inner def _create_app() -> web.Application: @@ -534,7 +535,7 @@ async def test_get_pods( [ ["authorized", True], ["badtoken", False], - ] + ], ) async def test_get_pod_container_gpu_stats_handles_unauthorized( self, @@ -1144,7 +1145,7 @@ async def stop_func() -> bool: def run_log_reader( name: str, delay: float = 0, timeout_s: float = 60.0 ) -> None: - async def coro() -> Union[bytes, Exception]: + async def coro() -> bytes | Exception: await asyncio.sleep(delay) try: async with timeout(timeout_s): @@ -1226,7 +1227,7 @@ async def stop_func() -> bool: def run_log_reader( name: str, delay: float = 0, timeout_s: float = 60.0 ) -> None: - async def coro() -> Union[bytes, Exception]: + async def coro() -> bytes | Exception: await asyncio.sleep(delay) try: async with timeout(timeout_s): @@ -1313,7 +1314,7 @@ async def stop_func() -> bool: def run_log_reader( name: str, delay: float = 0, timeout_s: float = 60.0 ) -> None: - async def coro() -> Union[bytes, Exception]: + async def coro() -> bytes | Exception: await asyncio.sleep(delay) try: async with timeout(timeout_s):