Skip to content

Commit

Permalink
feat(ingest): track thread count in ingestion report (datahub-project…
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Jan 4, 2024
1 parent 20b7dd3 commit 2268c0c
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
21 changes: 20 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/run/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import platform
import shutil
import sys
import threading
import time
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Iterator, List, Optional, cast
Expand Down Expand Up @@ -129,9 +130,18 @@ class CliReport(Report):
py_version: str = sys.version
py_exec_path: str = sys.executable
os_details: str = platform.platform()

mem_info: Optional[str] = None
peak_memory_usage: Optional[str] = None
_peak_memory_usage: int = 0

disk_info: Optional[dict] = None
peak_disk_usage: Optional[str] = None
_peak_disk_usage: int = 0

thread_count: Optional[int] = None
peak_thread_count: Optional[int] = None

def compute_stats(self) -> None:
try:
mem_usage = psutil.Process(os.getpid()).memory_info().rss
Expand All @@ -141,7 +151,10 @@ def compute_stats(self) -> None:
self._peak_memory_usage
)
self.mem_info = humanfriendly.format_size(mem_usage)
except Exception as e:
logger.warning(f"Failed to compute memory usage: {e}")

try:
disk_usage = shutil.disk_usage("/")
if self._peak_disk_usage < disk_usage.used:
self._peak_disk_usage = disk_usage.used
Expand All @@ -152,7 +165,13 @@ def compute_stats(self) -> None:
"free": humanfriendly.format_size(disk_usage.free),
}
except Exception as e:
logger.warning(f"Failed to compute report memory usage: {e}")
logger.warning(f"Failed to compute disk usage: {e}")

try:
self.thread_count = threading.active_count()
self.peak_thread_count = max(self.peak_thread_count or 0, self.thread_count)
except Exception as e:
logger.warning(f"Failed to compute thread count: {e}")

return super().compute_stats()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# 3) Entity timeseries stat by user

import concurrent
import concurrent.futures
import dataclasses
import datetime
import logging
Expand Down

0 comments on commit 2268c0c

Please sign in to comment.