Skip to content

Commit

Permalink
dedup: fix bug with mock impl of session based dedup
Browse files Browse the repository at this point in the history
  • Loading branch information
nikhilgarg28 committed Nov 9, 2024
1 parent 2beb5ba commit 5933996
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 6 deletions.
4 changes: 3 additions & 1 deletion fennel/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Changelog
## [1.5.51] - 2024-11-09
- Fix bug in mock client for session based dedup operator

## [1.5.49] - 2024-11-09
## [1.5.50] - 2024-11-09
- Add json_extract and split string expressions

## [1.5.48] - 2024-11-06
Expand Down
86 changes: 86 additions & 0 deletions fennel/client_tests/test_dedup_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import time
from datetime import datetime, timedelta, timezone
import pandas as pd

from fennel.datasets import (
dataset,
field,
pipeline,
Sum,
Count,
)
from fennel.dtypes import Continuous, Session
from fennel.lib import inputs
from fennel.testing import mock, log

__owner__ = "[email protected]"


@mock
def test_dedup_session(client):
@dataset
class DatasetA:
a: int
user_id: str
time: datetime

@dataset(index=True)
class AggDataset:
user_id: str = field(key=True)
count_a: int
sum_a: int
time: datetime

@pipeline
@inputs(DatasetA)
def pipeline(cls, data_a):
data_b = data_a.dedup(by=["user_id", "a"], window=Session(gap="1s"))
return data_b.groupby("user_id").aggregate(
count_a=Count(window=Continuous("forever")),
sum_a=Sum(of="a", window=Continuous("forever")),
)

client.commit(
message="initial commit",
datasets=[DatasetA, AggDataset],
)

day = timedelta(days=1)
min = timedelta(minutes=1)
now = datetime.now(timezone.utc)

time.sleep(2)
now2 = datetime.now(timezone.utc)

columns = ["a", "user_id", "time"]
data = [
[1, "u1", now],
[1, "u1", now],
[1, "u2", now],
]
df = pd.DataFrame(data, columns=columns)
log(DatasetA, df)

data = [
[1, "u1", now2 + day],
[2, "u1", now2 + day],
[0, "u2", now2 + day],
[5, "u1", now2 + 2 * day + min],
]
df = pd.DataFrame(data, columns=columns)
log(DatasetA, df)

data, found = client.lookup(
"AggDataset",
timestamps=pd.Series(
[now + min, now2 + day + min, now2 + 2 * day + 2 * min]
),
keys=pd.DataFrame({"user_id": ["u1", "u1", "u1"]}),
)
assert data.shape == (3, 4)
assert data.iloc[0]["count_a"] == 1
assert data.iloc[0]["sum_a"] == 1
assert data.iloc[1]["count_a"] == 3
assert data.iloc[1]["sum_a"] == 4
assert data.iloc[2]["count_a"] == 4
assert data.iloc[2]["sum_a"] == 9
16 changes: 12 additions & 4 deletions fennel/testing/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -755,19 +755,27 @@ def visitDedup(self, obj):
)
df = df.drop(columns=["__@@__tumbling_id", "__@@__index"])
elif isinstance(obj.window, Session):
df["__@@__index"] = df.index
df["__@@__index"] = range(df.shape[0])
# Using a stable sort algorithm to preserve the order of rows that have the same timestamp within the same session
df = df.sort_values(input_ret.timestamp_field, kind="mergesort")
df["__@@__session_id"] = generate_session_id(
df[input_ret.timestamp_field], obj.window
)
index_to_session_id = {}
for _, group in df.groupby(obj.by):
session_ids = generate_session_id(
group[input_ret.timestamp_field], obj.window
)
for i, session_id in enumerate(session_ids):
index_to_session_id[group.iloc[i]["__@@__index"]] = (
session_id
)
df["__@@__session_id"] = df["__@@__index"].map(index_to_session_id)
df = (
df.groupby(obj.by + ["__@@__session_id"])
.last()
.reset_index()
.sort_values("__@@__index")
)
df = df.drop(columns=["__@@__session_id", "__@@__index"])
df = df.reset_index(drop=True)
else:
raise Exception(f"Unsupported window type {type(obj.window)}")

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "fennel-ai"
version = "1.5.50"
version = "1.5.51"
description = "The modern realtime feature engineering platform"
authors = ["Fennel AI <[email protected]>"]
packages = [{ include = "fennel" }]
Expand Down

0 comments on commit 5933996

Please sign in to comment.