diff --git a/fennel/CHANGELOG.md b/fennel/CHANGELOG.md index 579d5db8..d0f39ef6 100644 --- a/fennel/CHANGELOG.md +++ b/fennel/CHANGELOG.md @@ -1,6 +1,8 @@ # Changelog +## [1.5.51] - 2024-11-09 +- Fix bug in mock client for session based dedup operator -## [1.5.49] - 2024-11-09 +## [1.5.50] - 2024-11-09 - Add json_extract and split string expressions ## [1.5.48] - 2024-11-06 diff --git a/fennel/client_tests/test_dedup_session.py b/fennel/client_tests/test_dedup_session.py new file mode 100644 index 00000000..7d9bb844 --- /dev/null +++ b/fennel/client_tests/test_dedup_session.py @@ -0,0 +1,86 @@ +import time +from datetime import datetime, timedelta, timezone +import pandas as pd + +from fennel.datasets import ( + dataset, + field, + pipeline, + Sum, + Count, +) +from fennel.dtypes import Continuous, Session +from fennel.lib import inputs +from fennel.testing import mock, log + +__owner__ = "eng@fennel.ai" + + +@mock +def test_dedup_session(client): + @dataset + class DatasetA: + a: int + user_id: str + time: datetime + + @dataset(index=True) + class AggDataset: + user_id: str = field(key=True) + count_a: int + sum_a: int + time: datetime + + @pipeline + @inputs(DatasetA) + def pipeline(cls, data_a): + data_b = data_a.dedup(by=["user_id", "a"], window=Session(gap="1s")) + return data_b.groupby("user_id").aggregate( + count_a=Count(window=Continuous("forever")), + sum_a=Sum(of="a", window=Continuous("forever")), + ) + + client.commit( + message="initial commit", + datasets=[DatasetA, AggDataset], + ) + + day = timedelta(days=1) + min = timedelta(minutes=1) + now = datetime.now(timezone.utc) + + time.sleep(2) + now2 = datetime.now(timezone.utc) + + columns = ["a", "user_id", "time"] + data = [ + [1, "u1", now], + [1, "u1", now], + [1, "u2", now], + ] + df = pd.DataFrame(data, columns=columns) + log(DatasetA, df) + + data = [ + [1, "u1", now2 + day], + [2, "u1", now2 + day], + [0, "u2", now2 + day], + [5, "u1", now2 + 2 * day + min], + ] + df = pd.DataFrame(data, columns=columns) + log(DatasetA, df) + + data, found = client.lookup( + "AggDataset", + timestamps=pd.Series( + [now + min, now2 + day + min, now2 + 2 * day + 2 * min] + ), + keys=pd.DataFrame({"user_id": ["u1", "u1", "u1"]}), + ) + assert data.shape == (3, 4) + assert data.iloc[0]["count_a"] == 1 + assert data.iloc[0]["sum_a"] == 1 + assert data.iloc[1]["count_a"] == 3 + assert data.iloc[1]["sum_a"] == 4 + assert data.iloc[2]["count_a"] == 4 + assert data.iloc[2]["sum_a"] == 9 diff --git a/fennel/testing/executor.py b/fennel/testing/executor.py index 6b7c1f73..bfd6e69d 100644 --- a/fennel/testing/executor.py +++ b/fennel/testing/executor.py @@ -755,12 +755,19 @@ def visitDedup(self, obj): ) df = df.drop(columns=["__@@__tumbling_id", "__@@__index"]) elif isinstance(obj.window, Session): - df["__@@__index"] = df.index + df["__@@__index"] = range(df.shape[0]) # Using a stable sort algorithm to preserve the order of rows that have the same timestamp within the same session df = df.sort_values(input_ret.timestamp_field, kind="mergesort") - df["__@@__session_id"] = generate_session_id( - df[input_ret.timestamp_field], obj.window - ) + index_to_session_id = {} + for _, group in df.groupby(obj.by): + session_ids = generate_session_id( + group[input_ret.timestamp_field], obj.window + ) + for i, session_id in enumerate(session_ids): + index_to_session_id[group.iloc[i]["__@@__index"]] = ( + session_id + ) + df["__@@__session_id"] = df["__@@__index"].map(index_to_session_id) df = ( df.groupby(obj.by + ["__@@__session_id"]) .last() @@ -768,6 +775,7 @@ def visitDedup(self, obj): .sort_values("__@@__index") ) df = df.drop(columns=["__@@__session_id", "__@@__index"]) + df = df.reset_index(drop=True) else: raise Exception(f"Unsupported window type {type(obj.window)}") diff --git a/pyproject.toml b/pyproject.toml index 67a1ebb6..43e56275 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "fennel-ai" -version = "1.5.50" +version = "1.5.51" description = "The modern realtime feature engineering platform" authors = ["Fennel AI "] packages = [{ include = "fennel" }]