Commit
Drop extra records when merging different TimeseriesData instances. (#46)

This PR handles merging timeseries records from different chunks where a later
chunk has extra records: instead of rejecting the merge because the record
counts differ, the extra records are now dropped.
cipherself authored Aug 6, 2024
1 parent e62466a commit d1d393c
Showing 3 changed files with 94 additions and 62 deletions.
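
As an illustration of the new behavior, here is a minimal sketch (not part of the change) that constructs TimeseriesData the same way the tests below do; column names and timestamps are made up:

from datetime import datetime, timedelta, timezone

from enlyze.api_clients.timeseries.models import TimeseriesData

NOW = datetime.now(tz=timezone.utc)
t0 = NOW.isoformat()
t1 = (NOW - timedelta(minutes=10)).isoformat()
t2 = (NOW - timedelta(minutes=20)).isoformat()

# Earlier chunk: two records for var1.
first = TimeseriesData(columns=["time", "var1"], records=[[t0, 1], [t1, 2]])

# Later chunk: three records for var2, i.e. one extra record at the end.
second = TimeseriesData(
    columns=["time", "var2"],
    records=[[t0, 10], [t1, 20], [t2, 30]],
)

# Previously this raised because the record counts differ; with this change
# the extra record in `second` is dropped and the remaining records merge.
merged = first.merge(second)
assert merged is first
assert merged.columns == ["time", "var1", "var2"]
assert len(merged.records) == 2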
23 changes: 11 additions & 12 deletions pyproject.toml
@@ -4,15 +4,12 @@ requires = [
"setuptools",
]

[tool.setuptools.dynamic]
version = {attr = "enlyze._version.VERSION"}

[project]
name = "enlyze"
description = "Python SDK for interacting with the ENLYZE platform https://www.enlyze.com"
readme = "README.rst"
license = {text = "MIT"}
authors = [{name = "ENLYZE GmbH", email = "[email protected]"},]
license = { text = "MIT" }
authors = [ { name = "ENLYZE GmbH", email = "[email protected]" } ]
requires-python = ">=3.10"
classifiers = [
"Programming Language :: Python :: 3 :: Only",
@@ -21,21 +18,20 @@ classifiers = [
"Programming Language :: Python :: 3.12",
]
dynamic = [
'version',
"version",
]
dependencies = [
"httpx",
"pandas>=2",
"pydantic>=2",
]
[project.optional-dependencies]
docs = [
optional-dependencies.docs = [
"sphinx",
"sphinx-rtd-theme",
"sphinx-tabs",
"sphinxcontrib-spelling",
]
lint = [
optional-dependencies.lint = [
"bandit",
"black",
"flake8",
@@ -45,7 +41,7 @@ lint = [
"safety",
"tox-ini-fmt",
]
test = [
optional-dependencies.test = [
"coverage",
"hypothesis",
"pandas-stubs",
@@ -57,8 +53,11 @@ test = [
"respx",
]

[tool.mypy]
exclude="tests"
[tool.setuptools.dynamic]
version = { attr = "enlyze._version.VERSION" }

[tool.isort]
profile = "black"

[tool.mypy]
exclude = "tests"
13 changes: 8 additions & 5 deletions src/enlyze/api_clients/timeseries/models.py
@@ -77,16 +77,19 @@ def extend(self, other: "TimeseriesData") -> None:

def merge(self, other: "TimeseriesData") -> "TimeseriesData":
"""Merge records from ``other`` into the existing records."""
if not (slen := len(self.records)) == (olen := len(other.records)):
slen, olen = len(self.records), len(other.records)
if olen < slen:
raise ValueError(
"Cannot merge. Number of records in both instances has to be the same,"
f" trying to merge an instance with {olen} records into an instance"
f" with {slen} records."
"Cannot merge. Attempted to merge"
f" an instance with {olen} records into an instance with {slen}"
" records. The instance to merge must have a number"
" of records greater than or equal to the number of records of"
" the instance you're trying to merge into."
)

self.columns.extend(other.columns[1:])

for s, o in zip(self.records, other.records):
for s, o in zip(self.records, other.records[:slen]):
if s[0] != o[0]:
raise ValueError(
"Cannot merge. Attempted to merge records "
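For completeness, a hedged sketch of the remaining error path in the change above: merging an instance that has fewer records than the target still raises. Constructor usage mirrors the tests; values are illustrative.

from datetime import datetime, timedelta, timezone

from enlyze.api_clients.timeseries.models import TimeseriesData

NOW = datetime.now(tz=timezone.utc)
t0, t1 = NOW.isoformat(), (NOW - timedelta(minutes=10)).isoformat()

longer = TimeseriesData(columns=["time", "var1"], records=[[t0, 1], [t1, 2]])
shorter = TimeseriesData(columns=["time", "var2"], records=[[t0, 10]])

try:
    longer.merge(shorter)  # other has fewer records than self
except ValueError as exc:
    print(exc)  # "Cannot merge. Attempted to merge an instance with 1 records ..."
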
120 changes: 75 additions & 45 deletions tests/enlyze/api_clients/timeseries/test_models.py
@@ -1,83 +1,113 @@
import itertools
import random
from datetime import datetime, timedelta, timezone

import pytest

from enlyze.api_clients.timeseries.models import TimeseriesData

# We use this to skip columns that contain the timestamp assuming
# it starts at the beginning of the sequence. We also use it
# when computing lengths to account for a timestamp column.
TIMESTAMP_OFFSET = 1
NOW = datetime.now(tz=timezone.utc)


@pytest.fixture
def timestamp():
return datetime.now(tz=timezone.utc)
def _generate_timeseries_data(*, columns, number_of_records):
timeseries_columns = ["time"]
timeseries_columns.extend(columns)

counter = itertools.count(start=10)

@pytest.fixture
def timeseries_data_1(timestamp):
return TimeseriesData(
columns=["time", "var1", "var2"],
columns=timeseries_columns,
records=[
[timestamp.isoformat(), 1, 2],
[(timestamp - timedelta(minutes=10)).isoformat(), 3, 4],
[
(NOW - timedelta(minutes=next(counter))).isoformat(),
*[random.randint(1, 100) for _ in range(len(columns))],
]
for _ in range(number_of_records)
],
)


@pytest.fixture
def timeseries_data_2(timestamp):
return TimeseriesData(
columns=["time", "var3"],
records=[
[timestamp.isoformat(), 5],
[(timestamp - timedelta(minutes=10)).isoformat(), 6],
class TestTimeseriesData:
@pytest.mark.parametrize(
"data_parameters,data_to_merge_parameters",
[
(
{"columns": ["var1", "var2"], "number_of_records": 1},
{"columns": ["var3"], "number_of_records": 1},
),
(
{"columns": ["var1", "var2"], "number_of_records": 1},
{"columns": ["var3"], "number_of_records": 3},
),
],
)


class TestTimeseriesData:
def test_merge(self, timeseries_data_1, timeseries_data_2):
timeseries_data_1_records_len = len(timeseries_data_1.records)
timeseries_data_1_columns_len = len(timeseries_data_1.columns)
timeseries_data_2_records_len = len(timeseries_data_2.records)
timeseries_data_2_columns_len = len(timeseries_data_2.columns)
expected_merged_record_len = len(timeseries_data_1.records[0]) + len(
timeseries_data_2.records[0][TIMESTAMP_OFFSET:]
def test_merge(self, data_parameters, data_to_merge_parameters):
data = _generate_timeseries_data(**data_parameters)
data_to_merge = _generate_timeseries_data(**data_to_merge_parameters)
data_records_len = len(data.records)
data_columns_len = len(data.columns)
data_to_merge_columns_len = len(data_to_merge.columns)
expected_merged_record_len = len(data.records[0]) + len(
data_to_merge.records[0][TIMESTAMP_OFFSET:]
)

merged = timeseries_data_1.merge(timeseries_data_2)
merged = data.merge(data_to_merge)

assert merged is timeseries_data_1
assert (
len(merged.records)
== timeseries_data_1_records_len
== timeseries_data_2_records_len
)
assert merged is data
assert len(merged.records) == data_records_len
assert (
len(merged.columns)
== timeseries_data_1_columns_len
+ timeseries_data_2_columns_len
- TIMESTAMP_OFFSET
== data_columns_len + data_to_merge_columns_len - TIMESTAMP_OFFSET
)

for r in merged.records:
assert len(r) == expected_merged_record_len == len(merged.columns)

def test_merge_raises_number_of_records_mismatch(
self, timeseries_data_1, timeseries_data_2
@pytest.mark.parametrize(
"data_parameters,data_to_merge_parameters",
[
(
{"columns": ["var1", "var2"], "number_of_records": 2},
{"columns": ["var3"], "number_of_records": 1},
),
],
)
def test_merge_raises_number_of_records_to_merge_less_than_existing(
self, data_parameters, data_to_merge_parameters
):
timeseries_data_2.records = timeseries_data_2.records[1:]
data = _generate_timeseries_data(**data_parameters)
data_to_merge = _generate_timeseries_data(**data_to_merge_parameters)

with pytest.raises(
ValueError, match="Number of records in both instances has to be the same"
ValueError,
match=(
"The instance to merge must have a number of"
" records greater than or equal to the number"
" of records of the instance you're trying to merge into."
),
):
timeseries_data_1.merge(timeseries_data_2)

data.merge(data_to_merge)

@pytest.mark.parametrize(
"data_parameters,data_to_merge_parameters",
[
(
{"columns": ["var1", "var2"], "number_of_records": 1},
{"columns": ["var3"], "number_of_records": 1},
),
],
)
def test_merge_raises_mismatched_timestamps(
self, timeseries_data_1, timeseries_data_2, timestamp
self, data_parameters, data_to_merge_parameters
):
timeseries_data_2.records[0][0] = (timestamp - timedelta(days=1)).isoformat()
data = _generate_timeseries_data(**data_parameters)
data_to_merge = _generate_timeseries_data(**data_to_merge_parameters)

data_to_merge.records[0][0] = (NOW - timedelta(days=1)).isoformat()

with pytest.raises(ValueError, match="mismatched timestamps"):
timeseries_data_1.merge(timeseries_data_2)
data.merge(data_to_merge)
