Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Duration/Memory Convenience Click Types #448

Merged
merged 7 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 211 additions & 0 deletions flepimop/gempyor_pkg/src/gempyor/_click.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
__all__ = []


from datetime import timedelta
from math import ceil
import re
from typing import Any, Literal

import click


class DurationParamType(click.ParamType):
"""
A custom Click parameter type for parsing duration strings into `timedelta` objects.

Attributes:
name: The name of the parameter type.

Examples:
>>> from gempyor._click import DurationParamType
>>> DurationParamType(False, "minutes").convert("23min", None, None)
datetime.timedelta(seconds=1380)
>>> DurationParamType(False, None).convert("2.5hr", None, None)
datetime.timedelta(seconds=9000)
>>> DurationParamType(False, "minutes").convert("-2", None, None)
datetime.timedelta(days=-1, seconds=86280)
"""

name = "duration"
_abbreviations = {
"s": "seconds",
"sec": "seconds",
"secs": "seconds",
"second": "seconds",
"seconds": "seconds",
"m": "minutes",
"min": "minutes",
"mins": "minutes",
"minute": "minutes",
"minutes": "minutes",
"h": "hours",
"hr": "hours",
"hrs": "hours",
"hour": "hours",
"hours": "hours",
"d": "days",
"day": "days",
"days": "days",
"w": "weeks",
"week": "weeks",
"weeks": "weeks",
}

def __init__(
self,
nonnegative: bool,
default_unit: Literal["seconds", "minutes", "hours", "days", "weeks"] | None,
) -> None:
"""
Initialize the instance based on parameter settings.

Args:
nonnegative: If `True` negative durations are not allowed.
default_unit: The default unit to use if no unit is specified in the input
string. If `None` a unitless duration is not allowed.

Notes:
It's on the user of this param type to document in their CLI help text what
the default unit is if they set it to a non-`None` value.
"""
super().__init__()
self._nonnegative = nonnegative
self._duration_regex = re.compile(
rf"^((-)?([0-9]+)?(\.[0-9]+)?)({'|'.join(self._abbreviations.keys())})?$",
flags=re.IGNORECASE,
)
self._default_unit = default_unit

def convert(
self, value: Any, param: click.Parameter | None, ctx: click.Context | None
) -> timedelta:
"""
Converts a string representation of a duration into a `timedelta` object.

Args:
value: The value to convert, expected to be a string like representation of
a duration. Allowed durations are limited to seconds, minutes, hours,
days, and weeks.
param: The Click parameter object for context in errors.
ctx: The Click context object for context in errors.

Returns:
The converted duration as a `timedelta` object.

Raises:
click.BadParameter: If the value is not a valid duration based on the
format.
click.BadParameter: If the duration is negative and the class was
initialized with `nonnegative` set to `True`.
click.BadParameter: If the duration is unitless and the class was
initialized with `default_unit` set to `None`.
"""
value = str(value).strip()
if (m := self._duration_regex.match(value)) is None:
self.fail(f"{value!r} is not a valid duration", param, ctx)
number, posneg, _, _, unit = m.groups()
if self._nonnegative and posneg == "-":
self.fail(f"{value!r} is a negative duration", param, ctx)
if unit is None:
if self._default_unit is None:
self.fail(f"{value!r} is a unitless duration", param, ctx)
unit = self._default_unit
kwargs = {}
kwargs[self._abbreviations.get(unit.lower())] = float(number)
return timedelta(**kwargs)


class MemoryParamType(click.ParamType):
"""
A custom Click parameter type for parsing memory strings.

Attributes:
name: The name of the parameter type.

Examples:
>>> from gempyor._click import MemoryParamType
>>> MemoryParamType(False, "mb", False).convert("12.34MB", None, None)
12.34
>>> MemoryParamType(True, "mb", True).convert("78.9", None, None)
79
>>> MemoryParamType(False, "gb", False).convert("123kb", None, None)
0.00011730194091796875
"""

name = "memory"
_units = {
"kb": 1024.0**1.0,
"k": 1024.0**1.0,
"mb": 1024.0**2.0,
"m": 1024.0**2.0,
"gb": 1024.0**3.0,
"g": 1024.0**3.0,
"t": 1024.0**4.0,
"tb": 1024.0**4.0,
}

def __init__(self, as_int: bool, unit: str, allow_unitless: bool) -> None:
"""
Initialize the instance based on parameter settings.

Args:
as_int: if `True` the `convert` method returns an integer instead of a
float.
unit: The output unit to use in the `convert` method.

Raises:
ValueError: If `unit` is not a valid memory unit size.
"""
super().__init__()
if (unit := unit.lower()) not in self._units.keys():
raise ValueError(
f"The `unit` given is not valid, given '{unit}' and "
f"must be one of: {', '.join(self._units.keys())}."
)
self._unit = unit
self._regex = re.compile(
rf"^(([0-9]+)?(\.[0-9]+)?)({'|'.join(self._units.keys())})?$",
flags=re.IGNORECASE,
)
self._as_int = as_int
self._allow_unitless = allow_unitless

def convert(
self, value: Any, param: click.Parameter | None, ctx: click.Context | None
) -> float | int:
"""
Converts a string representation of a memory size into a numeric.

Args:
value: The value to convert, expected to be a string like representation of
memory size.
param: The Click parameter object for context in errors.
ctx: The Click context object for context in errors.

Returns:
The converted memory size as a numeric. Specifically an integer if the
`as_int` attribute is `True` and float otherwise.

Raises:
click.BadParameter: If the value is not a valid memory size based on the
format.
click.BadParameter: If the memory size is unitless and the class was
initialized with `allow_unitless` set to `False`.
"""
value = str(value).strip()
if (m := self._regex.match(value)) is None:
self.fail(f"{value!r} is not a valid memory size.", param, ctx)
number, _, _, unit = m.groups()
if unit is None:
if not self._allow_unitless:
self.fail(f"{value!r} is a unitless memory size.", param, ctx)
unit = self._unit
else:
unit = unit.lower()
if unit == self._unit:
result = float(number)
else:
result = (self._units.get(unit, self._unit) * float(number)) / (
self._units.get(self._unit)
)
return ceil(result) if self._as_int else result
11 changes: 6 additions & 5 deletions flepimop/gempyor_pkg/src/gempyor/shared_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@
supported options for config file overrides, and custom click decorators.
"""

__all__ = []


import multiprocessing
import pathlib

from typing import Any, Callable, Literal
import warnings
from typing import Callable, Any
import re

import click
import confuse

from .utils import config, as_list

__all__ = []
from .utils import as_list, config


@click.group()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from datetime import timedelta
from typing import Literal

from click.exceptions import BadParameter
import pytest

from gempyor._click import DurationParamType


@pytest.mark.parametrize("nonnegative", (True, False))
@pytest.mark.parametrize("value", ("abc", "$12.34", "12..3", "12years", "12.a2"))
def test_invalid_duration_bad_parameter(nonnegative: bool, value: str) -> None:
duration = DurationParamType(nonnegative=nonnegative, default_unit="seconds")
with pytest.raises(BadParameter, match="^'.*' is not a valid duration$"):
duration.convert(value, None, None)


@pytest.mark.parametrize("value", ("-1", "-123", "-99.45", "-.9"))
def test_negative_duration_bad_parameter(value: str) -> None:
duration = DurationParamType(nonnegative=True, default_unit="seconds")
with pytest.raises(BadParameter, match="^'.*' is a negative duration$"):
duration.convert(value, None, None)


@pytest.mark.parametrize("value", ("1", "-123", "99.45", "-.9"))
def test_unitless_duration_bad_paramter(value: str) -> None:
duration = DurationParamType(nonnegative=False, default_unit=None)
with pytest.raises(BadParameter, match="^'.*' is a unitless duration$"):
duration.convert(value, None, None)


@pytest.mark.parametrize(
("value", "default_unit", "expected"),
(
("1", "minutes", timedelta(minutes=1)),
("1", "days", timedelta(days=1)),
("2s", None, timedelta(seconds=2)),
("3hrs", None, timedelta(hours=3)),
("-4min", None, timedelta(minutes=-4)),
("-5d", None, timedelta(days=-5)),
("12.3", "seconds", timedelta(seconds=12.3)),
("12.3", "hours", timedelta(hours=12.3)),
("12.3", "weeks", timedelta(weeks=12.3)),
("-45.6h", None, timedelta(hours=-45.6)),
("-.1w", None, timedelta(weeks=-0.1)),
("0.0Weeks", "days", timedelta(weeks=0)),
),
)
def test_exact_results_for_select_inputs(
value: str,
default_unit: Literal["seconds", "minutes", "hours", "days", "weeks"] | None,
expected: timedelta,
) -> None:
duration = DurationParamType(nonnegative=False, default_unit=default_unit)
assert duration.convert(value, None, None) == expected
assert duration.convert(value.upper(), None, None) == expected
assert duration.convert(value.lower(), None, None) == expected
76 changes: 76 additions & 0 deletions flepimop/gempyor_pkg/tests/_click/test_memory_param_type_class.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import random

from click.exceptions import BadParameter
import pytest

from gempyor._click import MemoryParamType


@pytest.mark.parametrize("unit", ("Nope", "NO CHANCE", "wrong", "bb"))
def test_invalid_unit_value_error(unit: str) -> None:
with pytest.raises(
ValueError,
match=(
"^The `unit` given is not valid, given "
f"'{unit.lower()}' and must be one of:.*.$"
),
):
MemoryParamType(False, unit, True)


@pytest.mark.parametrize("value", ("1..2MB", "3.4cb", "56.abc", "-1GB"))
def test_invalid_value_bad_parameter(value: str) -> None:
memory = MemoryParamType(False, "mb", True)
with pytest.raises(BadParameter, match="^.* is not a valid memory size.$"):
memory.convert(value, None, None)


@pytest.mark.parametrize("value", ("1", "123", "99.45", ".9"))
def test_unitless_value_bad_parameter(value: str) -> None:
memory = MemoryParamType(False, "mb", False)
with pytest.raises(BadParameter, match="^'.*' is a unitless memory size.$"):
memory.convert(value, None, None)


@pytest.mark.parametrize("unit", MemoryParamType._units.keys())
@pytest.mark.parametrize("as_int", (True, False))
@pytest.mark.parametrize(
"number",
[random.randint(1, 1000) for _ in range(3)] # int
+ [random.random() for _ in range(3)] # float without numbers left of decimal
+ [
random.randint(1, 25) + random.random() for _ in range(3)
], # float with numbers left of the decimal
)
def test_convert_acts_as_identity(unit: str, as_int: bool, number: int | float) -> None:
memory = MemoryParamType(as_int, unit, True)
for u in (unit, unit.upper()):
result = memory.convert(f"{number}{u}".lstrip("0"), None, None)
assert isinstance(result, int if as_int else float)
assert abs(result - number) <= 1 if as_int else result == number


@pytest.mark.parametrize(
("as_int", "unit", "allow_unitless", "value", "expected"),
(
(False, "gb", False, "1.2gb", 1.2),
(True, "gb", False, "1.2gb", 2),
(False, "kb", False, "1mb", 1024.0),
(True, "kb", False, "1mb", 1024),
(False, "gb", False, "30mb", 30.0 / 1024.0),
(True, "gb", False, "30mb", 1),
(False, "kb", False, "2tb", 2.0 * (1024.0**3.0)),
(True, "kb", False, "2tb", 2147483648),
(False, "mb", False, "0.1gb", 0.1 * 1024.0),
(True, "mb", False, "0.1gb", 103),
(False, "gb", True, "4", 4.0),
(True, "gb", True, "4", 4),
(False, "mb", True, "1234.56", 1234.56),
(True, "mb", True, "1234.56", 1235),
),
)
def test_exact_results_for_select_inputs(
unit: str, as_int: bool, allow_unitless: bool, value: str, expected: float | int
) -> None:
memory = MemoryParamType(as_int, unit, allow_unitless)
assert memory.convert(value, None, None) == expected
Loading