Skip to content

Commit

Permalink
use list projects admin api endpoint for usage collection (#772)
Browse files Browse the repository at this point in the history
  • Loading branch information
zubenkoivan authored Aug 8, 2024
1 parent 07d6404 commit eaf8550
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 58 deletions.
3 changes: 1 addition & 2 deletions charts/platform-storage/templates/cron-job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,4 @@ spec:
volumes:
{{- include "platformStorage.volumes" . | nindent 12 }}
{{- end }}
restartPolicy: OnFailure
backoffLimit: {{ .Values.storageUsageCollector.backoffLimit }}
restartPolicy: Never
1 change: 0 additions & 1 deletion charts/platform-storage/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ storageUsageCollector:
startingDeadlineSeconds: 300
successfulJobsHistoryLimit: 1
failedJobsHistoryLimit: 1
backoffLimit: 3

resources:
requests:
Expand Down
69 changes: 33 additions & 36 deletions platform_storage_api/storage_usage.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
from collections import defaultdict
from collections.abc import Iterable, Sequence
from dataclasses import dataclass
from datetime import timezone
Expand Down Expand Up @@ -36,7 +37,7 @@ class Project:


@dataclass(frozen=True)
class OrgProjectPath:
class ProjectPath:
project_name: str
path: PurePath
org_name: Optional[str] = None
Expand All @@ -58,8 +59,7 @@ def __init__(
self._path_resolver = path_resolver

async def get_storage_usage(self) -> StorageUsage:
org_project_paths = await self._get_org_project_paths()
LOGGER.debug("Collecting disk usage for: %s", org_project_paths)
org_project_paths = await self._get_project_paths()
file_usages = await self._fs.disk_usage_by_file(
*(p.path for p in org_project_paths)
)
Expand All @@ -75,53 +75,50 @@ async def get_storage_usage(self) -> StorageUsage:
],
)

async def _get_org_project_paths(self) -> list[OrgProjectPath]:
org_clusters = await self._admin_client.list_org_clusters(
self._config.platform.cluster_name
)
org_names = {org_cluster.org_name for org_cluster in org_clusters}
result = await self._get_no_org_project_paths(org_names)
for org_cluster in org_clusters:
org_path = await self._path_resolver.resolve_path(
PurePath(f"/{org_cluster.org_name}")
)
async def _get_project_paths(self) -> list[ProjectPath]:
projects_by_org = await self._get_projects_by_org()
result = []
for org_name, project_names in projects_by_org.items():
org_path = await self._resolve_org_path(org_name)
try:
async with self._fs.iterstatus(org_path) as statuses:
async for status in statuses:
if status.type != FileStatusType.DIRECTORY:
continue
project_name = status.path.name
if project_name not in project_names:
continue
LOGGER.debug(
"Collecting storage usage for org %s, project %s",
org_name or "NO_ORG",
project_name,
)
result.append(
OrgProjectPath(
org_name=org_cluster.org_name,
project_name=status.path.name,
path=org_path / status.path.name,
ProjectPath(
org_name=org_name,
project_name=project_name,
path=org_path / project_name,
)
)
except FileNotFoundError:
continue
return result

async def _get_no_org_project_paths(
self, org_names: set[str]
) -> list[OrgProjectPath]:
result = []
no_org_path = await self._path_resolver.resolve_base_path(
async def _get_projects_by_org(self) -> dict[str | None, set[str]]:
    """Group the cluster's project names by their ``org_name``.

    Projects are fetched with the admin client's ``list_projects`` call for
    the configured cluster name.

    Returns:
        A mapping from ``org_name`` to the set of project names that carry
        that ``org_name``. Projects whose ``org_name`` is ``None`` are
        collected under the ``None`` key. The mapping is empty when the
        admin client returns no projects.
    """
    projects = await self._admin_client.list_projects(
        self._config.platform.cluster_name
    )
    projects_by_org: defaultdict[str | None, set[str]] = defaultdict(set)
    for project in projects:
        projects_by_org[project.org_name].add(project.name)
    # Return a plain dict so that a later lookup of an unknown org raises
    # KeyError instead of silently inserting an empty set into the result.
    return dict(projects_by_org)

async def _resolve_org_path(self, org_name: str | None) -> PurePath:
    """Resolve the storage path under which an org's projects are looked up.

    Args:
        org_name: Name of the organization, or a falsy value (``None`` or
            an empty string) for projects that have no organization.

    Returns:
        For a truthy ``org_name``, the result of the path resolver's
        ``resolve_path`` for ``/<org_name>``. Otherwise, the result of
        ``resolve_base_path`` for ``/<cluster_name>``, using the configured
        cluster name.
    """
    if org_name:
        return await self._path_resolver.resolve_path(PurePath(f"/{org_name}"))
    # No organization: fall back to the cluster-level base path.
    return await self._path_resolver.resolve_base_path(
        PurePath(f"/{self._config.platform.cluster_name}")
    )
async with self._fs.iterstatus(no_org_path) as statuses:
async for status in statuses:
if (
status.type != FileStatusType.DIRECTORY
or status.path.name in org_names
):
continue
result.append(
OrgProjectPath(
project_name=status.path.name,
path=no_org_path / status.path.name,
)
)
return result

async def upload_storage_usage(self) -> None:
storage_usage = await self.get_storage_usage()
Expand Down
12 changes: 10 additions & 2 deletions tests/integration/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,16 @@ async def test_metrics(
passthrough=["http://0.0.0.0", "http://127.0.0.1"]
) as aiohttp_mock:
aiohttp_mock.get(
f"http://platform-admin/apis/admin/v1/clusters/{cluster_name}/orgs",
payload=[],
f"http://platform-admin/apis/admin/v1/clusters/{cluster_name}/projects",
payload=[
{
"name": "test-project",
"org_name": None,
"cluster_name": cluster_name,
"default_role": "writer",
"is_default": False,
},
],
)

(local_tmp_dir_path / "test-project").mkdir()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def aiohttp_mock() -> Iterator[aioresponses]:
yield mocked


class TestMetrics:
class TestStorageUsageService:
async def test_upload_storage_usage(
self,
aiohttp_mock: aioresponses,
Expand All @@ -27,8 +27,16 @@ async def test_upload_storage_usage(
local_tmp_dir_path: Path,
) -> None:
aiohttp_mock.get(
f"http://platform-admin/apis/admin/v1/clusters/{cluster_name}/orgs",
payload=[],
f"http://platform-admin/apis/admin/v1/clusters/{cluster_name}/projects",
payload=[
{
"name": "test-project",
"org_name": None,
"cluster_name": cluster_name,
"default_role": "writer",
"is_default": False,
},
],
)

(local_tmp_dir_path / "test-project").mkdir()
Expand All @@ -52,8 +60,16 @@ async def test_upload_storage_usage__multiple_times(
local_tmp_dir_path: Path,
) -> None:
aiohttp_mock.get(
f"http://platform-admin/apis/admin/v1/clusters/{cluster_name}/orgs",
payload=[],
f"http://platform-admin/apis/admin/v1/clusters/{cluster_name}/projects",
payload=[
{
"name": "test-project",
"org_name": None,
"cluster_name": cluster_name,
"default_role": "writer",
"is_default": False,
},
],
)

(local_tmp_dir_path / "test-project").mkdir()
Expand Down
12 changes: 10 additions & 2 deletions tests/integration/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,16 @@ async def test_run(
) as aiohttp_mock:
aiohttp_mock.get(
"http://platform-admin/apis/admin/v1/clusters"
f"/{config.platform.cluster_name}/orgs",
payload=[],
f"/{config.platform.cluster_name}/projects",
payload=[
{
"name": "test-project",
"org_name": None,
"cluster_name": config.platform.cluster_name,
"default_role": "writer",
"is_default": False,
},
],
)

await run(config)
Expand Down
38 changes: 28 additions & 10 deletions tests/unit/test_storage_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,22 @@ async def test_disk_usage(
aiohttp_mock: aioresponses,
) -> None:
aiohttp_mock.get(
URL("http://platform-admin/apis/admin/v1/clusters/test-cluster/orgs"),
URL("http://platform-admin/apis/admin/v1/clusters/test-cluster/projects"),
payload=[
{
"name": "test-project-1",
"org_name": None,
"cluster_name": "test-cluster",
"default_role": "writer",
"is_default": False,
},
{
"name": "test-project-2",
"org_name": "test-org",
"default_role": "user",
"maintenance": False,
}
"cluster_name": "test-cluster",
"default_role": "writer",
"is_default": False,
},
],
)
(local_tmp_dir_path / "test-project-1").mkdir()
Expand All @@ -98,25 +107,34 @@ async def test_disk_usage__empty_storage(
self, storage_usage_service: StorageUsageService, aiohttp_mock: aioresponses
) -> None:
aiohttp_mock.get(
URL("http://platform-admin/apis/admin/v1/clusters/test-cluster/orgs"),
URL("http://platform-admin/apis/admin/v1/clusters/test-cluster/projects"),
payload=[
{
"name": "test-project-1",
"org_name": None,
"cluster_name": "test-cluster",
"default_role": "writer",
"is_default": False,
},
{
"name": "test-project-2",
"org_name": "test-org",
"default_role": "user",
"maintenance": False,
}
"cluster_name": "test-cluster",
"default_role": "writer",
"is_default": False,
},
],
)

storage_usage = await storage_usage_service.get_storage_usage()

assert storage_usage == StorageUsage(projects=[])

async def test_disk_usage__no_orgs(
async def test_disk_usage__no_projects(
self, storage_usage_service: StorageUsageService, aiohttp_mock: aioresponses
) -> None:
aiohttp_mock.get(
URL("http://platform-admin/apis/admin/v1/clusters/test-cluster/orgs"),
URL("http://platform-admin/apis/admin/v1/clusters/test-cluster/projects"),
payload=[],
)

Expand Down

0 comments on commit eaf8550

Please sign in to comment.