Skip to content

Commit

Permalink
Release v0.2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
dormant-user committed Dec 31, 2024
1 parent 854b282 commit 195e580
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 43 deletions.
7 changes: 3 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# PyUdisk (Linux)

PyUdisk is a wrapper around the `udisksctl` command-line tool to generate a comprehensive list of all block devices and drives on a Linux machines.
# PyUdisk

PyUdisk is a python module to generate a S.M.A.R.T metrics for all drives/partitions on macOS and Linux machines.

### Installation

Expand Down Expand Up @@ -46,7 +45,7 @@ pyudisk start
> _By default, `PyUdisk` will look for a `.env` file in the current working directory._
</details>
- **UDISK_LIB**: Path to the `udisksctl` command-line tool. Default: `/usr/bin/udisksctl`
- **SMART_LIB**: Path to the `udisksctl` command-line tool. Default: `/usr/bin/udisksctl`
- **METRICS**: List of metrics to monitor. Default: `[]`
- **GMAIL_USER**: Gmail username to authenticate SMTP library.
- **GMAIL_PASS**: Gmail password to authenticate SMTP library.
Expand Down
2 changes: 1 addition & 1 deletion pyudisk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from .main import EnvConfig, generate_report, monitor, smart_metrics # noqa: F401

version = "0.2.0a1"
version = "0.2.0"


@click.command()
Expand Down
76 changes: 38 additions & 38 deletions pyudisk/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
from .support import humanize_usage_metrics, load_dump, load_partitions
from .util import standard

# todo: Extend usage for Darwin OS to monitor


def get_disk(env: EnvConfig) -> Generator[sdiskpart]:
"""Gathers disk information using the 'psutil' library.
Expand Down Expand Up @@ -287,42 +285,6 @@ def smart_metrics(env: EnvConfig) -> Generator[linux.Disk | darwin.Disk]:
yield linux.Disk(id=drive, model=data.get("Info", {}).get("Model", ""), **data)


def monitor_disk(env: EnvConfig) -> Generator[linux.Disk]:
"""Monitors disk attributes based on the configuration.
Args:
env: Environment variables configuration.
Yields:
Disk:
Data structure parsed as a Disk object.
"""
message = ""
for disk in smart_metrics(env):
if disk.Attributes:
for metric in env.metrics:
attribute = disk.Attributes.model_dump().get(metric.attribute)
if metric.max_threshold and attribute >= metric.max_threshold:
msg = f"{metric.attribute!r} for {disk.id!r} is >= {metric.max_threshold} at {attribute}"
LOGGER.critical(msg)
message += msg + "\n"
if metric.min_threshold and attribute <= metric.min_threshold:
msg = f"{metric.attribute!r} for {disk.id!r} is <= {metric.min_threshold} at {attribute}"
LOGGER.critical(msg)
message += msg + "\n"
if metric.equal_match and attribute != metric.equal_match:
msg = f"{metric.attribute!r} for {disk.id!r} IS NOT {metric.equal_match} at {attribute}"
LOGGER.critical(msg)
message += msg + "\n"
else:
LOGGER.warning("No attributes were loaded for %s", disk.model)
yield disk
if message:
notification_service(
title="Disk Monitor Alert!!", message=message, env_config=env
)


def generate_html(
data: List[Dict[str, str | int | float | bool]], filepath: NewPath = None
) -> str | NoReturn:
Expand Down Expand Up @@ -388,12 +350,50 @@ def generate_report(**kwargs) -> str:
return report_file


def monitor_disk(env: EnvConfig) -> Generator[linux.Disk]:
"""Monitors disk attributes based on the configuration.
Args:
env: Environment variables configuration.
Yields:
Disk:
Data structure parsed as a Disk object.
"""
assert OPERATING_SYSTEM == OperationSystem.linux, "Monitoring feature is available only for Linux machines!!"
message = ""
for disk in smart_metrics(env):
if disk.Attributes:
for metric in env.metrics:
attribute = disk.Attributes.model_dump().get(metric.attribute)
if metric.max_threshold and attribute >= metric.max_threshold:
msg = f"{metric.attribute!r} for {disk.id!r} is >= {metric.max_threshold} at {attribute}"
LOGGER.critical(msg)
message += msg + "\n"
if metric.min_threshold and attribute <= metric.min_threshold:
msg = f"{metric.attribute!r} for {disk.id!r} is <= {metric.min_threshold} at {attribute}"
LOGGER.critical(msg)
message += msg + "\n"
if metric.equal_match and attribute != metric.equal_match:
msg = f"{metric.attribute!r} for {disk.id!r} IS NOT {metric.equal_match} at {attribute}"
LOGGER.critical(msg)
message += msg + "\n"
else:
LOGGER.warning("No attributes were loaded for %s", disk.model)
yield disk
if message:
notification_service(
title="Disk Monitor Alert!!", message=message, env_config=env
)


def monitor(**kwargs) -> None:
"""Entrypoint for the disk monitoring service.
Args:
**kwargs: Arbitrary keyword arguments.
"""
assert OPERATING_SYSTEM == OperationSystem.linux, "Monitoring feature is available only for Linux machines!!"
env = EnvConfig(**kwargs)
disk_report = [disk.model_dump() for disk in monitor_disk(env)]
if disk_report:
Expand Down

0 comments on commit 195e580

Please sign in to comment.