Fix an issue where the SDK fails with a TimeseriesServiceError if querying time series data for more than 100 variables (#40)

closes #7

This PR fixes an issue where the user was allowed to request timeseries
data for any number of variables, but the request would fail because the
`timeseries` API accepts a maximum of `100` variables per request. We now
split the requested variables into chunks of size `<=100`, issue a request
for each chunk, and merge the data returned across all of the responses,
as sketched below.
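In code, the fix boils down to the following sketch (condensed from `_get_timeseries` in the diff below; `fetch_chunk` is a hypothetical stand-in for the per-chunk paginated request):

```python
from functools import reduce

from enlyze.constants import MAXIMUM_NUMBER_OF_VARIABLES_PER_TIMESERIES_REQUEST
from enlyze.iterable_tools import chunk


def fetch_all(variables, fetch_chunk):
    # Split the requested variables into chunks of size <= 100 ...
    chunks = chunk(variables, MAXIMUM_NUMBER_OF_VARIABLES_PER_TIMESERIES_REQUEST)
    # ... issue one request per chunk ...
    results = [fetch_chunk(c) for c in chunks]
    # ... and merge the per-chunk responses column-wise into a single result.
    return reduce(lambda x, y: x.merge(y), results)
```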

This PR also refactors the logic for fetching timeseries data to reduce
repetition. Previously, there was a lot of overlap between the
`get_timeseries` and `get_timeseries_with_resampling` methods; both now
delegate to a single shared helper.
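After the refactor, both public methods are thin wrappers around the shared `_get_timeseries` helper (signatures condensed here; the full diff is below):

```python
def get_timeseries(self, start, end, variables):
    return self._get_timeseries(start, end, variables)


def get_timeseries_with_resampling(self, start, end, variables, resampling_interval):
    return self._get_timeseries(start, end, variables, resampling_interval)
```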
cipherself authored Mar 4, 2024
1 parent fcce1e7 commit dd84243
Showing 7 changed files with 425 additions and 72 deletions.
22 changes: 22 additions & 0 deletions src/enlyze/api_clients/timeseries/models.py
@@ -75,6 +75,28 @@ def extend(self, other: "TimeseriesData") -> None:
"""Add records from ``other`` after the existing records."""
self.records.extend(other.records)

def merge(self, other: "TimeseriesData") -> "TimeseriesData":
"""Merge records from ``other`` into the existing records."""
if not (slen := len(self.records)) == (olen := len(other.records)):
raise ValueError(
"Cannot merge. Number of records in both instances has to be the same,"
f" trying to merge an instance with {olen} records into an instance"
f" with {slen} records."
)

self.columns.extend(other.columns[1:])

for s, o in zip(self.records, other.records):
if s[0] != o[0]:
raise ValueError(
"Cannot merge. Attempted to merge records "
f"with mismatched timestamps {s[0]}, {o[0]}"
)

s.extend(o[1:])

return self

def to_user_model(
self,
start: datetime,
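For illustration, a minimal usage sketch of the new `merge` method (constructed like the test fixtures further down; `merge` requires row-for-row matching timestamps and mutates the left-hand instance):

```python
from enlyze.api_clients.timeseries.models import TimeseriesData

a = TimeseriesData(
    columns=["time", "var1"],
    records=[["2024-03-04T00:00:00+00:00", 1]],
)
b = TimeseriesData(
    columns=["time", "var2"],
    records=[["2024-03-04T00:00:00+00:00", 2]],
)

merged = a.merge(b)  # returns the mutated `a`
assert merged.columns == ["time", "var1", "var2"]
assert merged.records == [["2024-03-04T00:00:00+00:00", 1, 2]]
```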
198 changes: 131 additions & 67 deletions src/enlyze/client.py
@@ -1,6 +1,7 @@
from collections import abc
from datetime import datetime
from functools import cache
from typing import Iterator, Mapping, Optional, Sequence
from functools import cache, reduce
from typing import Any, Iterator, Mapping, Optional, Sequence, Tuple, Union
from uuid import UUID

import enlyze.api_clients.timeseries.models as timeseries_api_models
@@ -10,9 +11,11 @@
from enlyze.api_clients.timeseries.client import TimeseriesApiClient
from enlyze.constants import (
ENLYZE_BASE_URL,
MAXIMUM_NUMBER_OF_VARIABLES_PER_TIMESERIES_REQUEST,
VARIABLE_UUID_AND_RESAMPLING_METHOD_SEPARATOR,
)
from enlyze.errors import EnlyzeError
from enlyze.errors import EnlyzeError, ResamplingValidationError
from enlyze.iterable_tools import chunk
from enlyze.validators import (
validate_datetime,
validate_resampling_interval,
@@ -21,6 +24,8 @@
validate_timeseries_arguments,
)

FETCHING_TIMESERIES_DATA_ERROR_MSG = "Error occurred when fetching timeseries data."


def _get_timeseries_data_from_pages(
pages: Iterator[timeseries_api_models.TimeseriesData],
@@ -42,6 +47,38 @@ def _get_timeseries_data_from_pages(
return timeseries_data


def _get_variables_sequence_and_query_parameter_list(
variables: Union[
Sequence[user_models.Variable],
Mapping[user_models.Variable, user_models.ResamplingMethod],
],
resampling_interval: Optional[int],
) -> Tuple[Sequence[user_models.Variable], Sequence[str]]:
if isinstance(variables, abc.Sequence) and resampling_interval is not None:
raise ResamplingValidationError(
"`variables` must be a mapping {variable: ResamplingMethod}"
)

if resampling_interval:
validate_resampling_interval(resampling_interval)
variables_sequence = []
variables_query_parameter_list = []
for variable, resampling_method in variables.items(): # type: ignore
variables_sequence.append(variable)
variables_query_parameter_list.append(
f"{variable.uuid}"
f"{VARIABLE_UUID_AND_RESAMPLING_METHOD_SEPARATOR}"
f"{resampling_method.value}"
)

validate_resampling_method_for_data_type(
resampling_method, variable.data_type
)
return variables_sequence, variables_query_parameter_list

return variables, [str(v.uuid) for v in variables] # type: ignore


class EnlyzeClient:
"""Main entrypoint for interacting with the ENLYZE platform.
@@ -150,6 +187,95 @@ def get_variables(
for variable in self._get_variables(machine.uuid)
]

def _get_paginated_timeseries(
self,
*,
machine_uuid: str,
start: datetime,
end: datetime,
variables: Sequence[str],
resampling_interval: Optional[int],
) -> Iterator[timeseries_api_models.TimeseriesData]:
params: dict[str, Any] = {
"appliance": machine_uuid,
"start_datetime": start.isoformat(),
"end_datetime": end.isoformat(),
"variables": ",".join(variables),
}

if resampling_interval:
params["resampling_interval"] = resampling_interval

return self._timeseries_api_client.get_paginated(
"timeseries", timeseries_api_models.TimeseriesData, params=params
)

def _get_timeseries(
self,
start: datetime,
end: datetime,
variables: Union[
Sequence[user_models.Variable],
Mapping[user_models.Variable, user_models.ResamplingMethod],
],
resampling_interval: Optional[int] = None,
) -> Optional[user_models.TimeseriesData]:
variables_sequence, variables_query_parameter_list = (
_get_variables_sequence_and_query_parameter_list(
variables, resampling_interval
)
)

start, end, machine_uuid = validate_timeseries_arguments(
start, end, variables_sequence
)

try:
chunks = chunk(
variables_query_parameter_list,
MAXIMUM_NUMBER_OF_VARIABLES_PER_TIMESERIES_REQUEST,
)
except ValueError as e:
raise EnlyzeError(FETCHING_TIMESERIES_DATA_ERROR_MSG) from e

chunks_pages = (
self._get_paginated_timeseries(
machine_uuid=machine_uuid,
start=start,
end=end,
variables=chunk,
resampling_interval=resampling_interval,
)
for chunk in chunks
)

timeseries_data_chunked = [
_get_timeseries_data_from_pages(pages) for pages in chunks_pages
]

if not timeseries_data_chunked or all(
data is None for data in timeseries_data_chunked
):
return None

if any(data is None for data in timeseries_data_chunked) and any(
data is not None for data in timeseries_data_chunked
):
raise EnlyzeError(
"The timeseries API didn't return data for some of the variables."
)

try:
timeseries_data = reduce(lambda x, y: x.merge(y), timeseries_data_chunked) # type: ignore # noqa
except ValueError as e:
raise EnlyzeError(FETCHING_TIMESERIES_DATA_ERROR_MSG) from e

return timeseries_data.to_user_model( # type: ignore
start=start,
end=end,
variables=variables_sequence,
)

def get_timeseries(
self,
start: datetime,
@@ -180,28 +306,7 @@ def get_timeseries(
"""

start, end, machine_uuid = validate_timeseries_arguments(start, end, variables)

pages = self._timeseries_api_client.get_paginated(
"timeseries",
timeseries_api_models.TimeseriesData,
params={
"appliance": machine_uuid,
"start_datetime": start.isoformat(),
"end_datetime": end.isoformat(),
"variables": ",".join(str(v.uuid) for v in variables),
},
)

timeseries_data = _get_timeseries_data_from_pages(pages)
if timeseries_data is None:
return None

return timeseries_data.to_user_model(
start=start,
end=end,
variables=variables,
)
return self._get_timeseries(start, end, variables)

def get_timeseries_with_resampling(
self,
@@ -241,48 +346,7 @@ def get_timeseries_with_resampling(
request
""" # noqa: E501
variables_sequence = []
variables_query_parameter_list = []
for variable, resampling_method in variables.items():
variables_sequence.append(variable)
variables_query_parameter_list.append(
f"{variable.uuid}"
f"{VARIABLE_UUID_AND_RESAMPLING_METHOD_SEPARATOR}"
f"{resampling_method.value}"
)

validate_resampling_method_for_data_type(
resampling_method, variable.data_type
)

start, end, machine_uuid = validate_timeseries_arguments(
start,
end,
variables_sequence,
)
validate_resampling_interval(resampling_interval)

pages = self._timeseries_api_client.get_paginated(
"timeseries",
timeseries_api_models.TimeseriesData,
params={
"appliance": machine_uuid,
"start_datetime": start.isoformat(),
"end_datetime": end.isoformat(),
"variables": ",".join(variables_query_parameter_list),
"resampling_interval": resampling_interval,
},
)

timeseries_data = _get_timeseries_data_from_pages(pages)
if timeseries_data is None:
return None

return timeseries_data.to_user_model(
start=start,
end=end,
variables=variables_sequence,
)
return self._get_timeseries(start, end, variables, resampling_interval)

def _get_production_runs(
self,
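From the caller's perspective nothing changes except that large requests now succeed. A hypothetical usage sketch (the constructor arguments and the variable list are assumed, not taken from this diff):

```python
from datetime import datetime, timedelta, timezone

from enlyze.client import EnlyzeClient  # module path as in this diff

client = EnlyzeClient("api-token")  # constructor arguments assumed
variables = [...]  # imagine a sequence of more than 100 Variable user models

end = datetime.now(tz=timezone.utc)
start = end - timedelta(hours=1)

# Previously this failed once the request exceeded 100 variables; the SDK
# now chunks the request transparently and merges the responses.
data = client.get_timeseries(start, end, variables)
```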
6 changes: 5 additions & 1 deletion src/enlyze/constants.py
@@ -9,7 +9,7 @@

#: HTTP timeout for requests to the Timeseries API.
#:
#: Reference: https://www.python-httpx.org/advanced/#timeout-configuration
#: Reference: https://www.python-httpx.org/advanced/timeouts/
HTTPX_TIMEOUT = 30.0

#: The separator to use when to separate the variable UUID and the resampling method
@@ -18,3 +18,7 @@

#: The minimum allowed resampling interval when resampling timeseries data.
MINIMUM_RESAMPLING_INTERVAL = 10

#: The maximum number of variables that can be used in a single request when querying
#: timeseries data.
MAXIMUM_NUMBER_OF_VARIABLES_PER_TIMESERIES_REQUEST = 100
12 changes: 12 additions & 0 deletions src/enlyze/iterable_tools.py
@@ -0,0 +1,12 @@
from typing import Iterable, Sequence, TypeVar

MINIMUM_CHUNK_SIZE = 1

T = TypeVar("T")


def chunk(seq: Sequence[T], chunk_size: int) -> Iterable[Sequence[T]]:
if chunk_size < MINIMUM_CHUNK_SIZE:
raise ValueError(f"{chunk_size=} is less than {MINIMUM_CHUNK_SIZE=}")

return (seq[i : i + chunk_size] for i in range(0, len(seq), chunk_size))
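A quick usage sketch of the new helper:

```python
from enlyze.iterable_tools import chunk

assert [list(c) for c in chunk([1, 2, 3, 4, 5], 2)] == [[1, 2], [3, 4], [5]]

# The size check runs eagerly at call time, before any iteration:
try:
    chunk([1, 2, 3], 0)
except ValueError as e:
    print(e)  # chunk_size=0 is less than MINIMUM_CHUNK_SIZE=1
```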
83 changes: 83 additions & 0 deletions tests/enlyze/api_clients/timeseries/test_models.py
@@ -0,0 +1,83 @@
from datetime import datetime, timedelta, timezone

import pytest

from enlyze.api_clients.timeseries.models import TimeseriesData

# We use this to skip columns that contain the timestamp assuming
# it starts at the beginning of the sequence. We also use it
# when computing lengths to account for a timestamp column.
TIMESTAMP_OFFSET = 1


@pytest.fixture
def timestamp():
return datetime.now(tz=timezone.utc)


@pytest.fixture
def timeseries_data_1(timestamp):
return TimeseriesData(
columns=["time", "var1", "var2"],
records=[
[timestamp.isoformat(), 1, 2],
[(timestamp - timedelta(minutes=10)).isoformat(), 3, 4],
],
)


@pytest.fixture
def timeseries_data_2(timestamp):
return TimeseriesData(
columns=["time", "var3"],
records=[
[timestamp.isoformat(), 5],
[(timestamp - timedelta(minutes=10)).isoformat(), 6],
],
)


class TestTimeseriesData:
def test_merge(self, timeseries_data_1, timeseries_data_2):
timeseries_data_1_records_len = len(timeseries_data_1.records)
timeseries_data_1_columns_len = len(timeseries_data_1.columns)
timeseries_data_2_records_len = len(timeseries_data_2.records)
timeseries_data_2_columns_len = len(timeseries_data_2.columns)
expected_merged_record_len = len(timeseries_data_1.records[0]) + len(
timeseries_data_2.records[0][TIMESTAMP_OFFSET:]
)

merged = timeseries_data_1.merge(timeseries_data_2)

assert merged is timeseries_data_1
assert (
len(merged.records)
== timeseries_data_1_records_len
== timeseries_data_2_records_len
)
assert (
len(merged.columns)
== timeseries_data_1_columns_len
+ timeseries_data_2_columns_len
- TIMESTAMP_OFFSET
)

for r in merged.records:
assert len(r) == expected_merged_record_len == len(merged.columns)

def test_merge_raises_number_of_records_mismatch(
self, timeseries_data_1, timeseries_data_2
):
timeseries_data_2.records = timeseries_data_2.records[1:]
with pytest.raises(
ValueError, match="Number of records in both instances has to be the same"
):
timeseries_data_1.merge(timeseries_data_2)

def test_merge_raises_mismatched_timestamps(
self, timeseries_data_1, timeseries_data_2, timestamp
):
timeseries_data_2.records[0][0] = (timestamp - timedelta(days=1)).isoformat()

with pytest.raises(ValueError, match="mismatched timestamps"):
timeseries_data_1.merge(timeseries_data_2)
