Skip to content

Commit

Permalink
aggregation: add warnings for aggregations on top of keyed datasets
Browse files Browse the repository at this point in the history
  • Loading branch information
nonibansal committed Dec 12, 2024
1 parent 130d240 commit e00ca45
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 11 deletions.
3 changes: 3 additions & 0 deletions fennel/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## [1.5.60] - 2024-12-12
- Add deprecation warnings for aggregations on top of keyed datasets.

## [1.5.59] - 2024-12-10
- Allow None as default value for min/max/avg/stddev aggregations.

Expand Down
38 changes: 28 additions & 10 deletions fennel/datasets/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -2672,8 +2672,9 @@ def visitAggregate(self, obj) -> DSSchema:
)

values: Dict[str, Type] = {}
found_discrete = False
found_non_discrete = False
all_discrete = False
all_continuous = False
all_discrete_forever = True
lookback = None
for agg in obj.aggregates:
# If default window is present in groupby then each aggregate spec cannot have window different from
Expand All @@ -2687,21 +2688,27 @@ def visitAggregate(self, obj) -> DSSchema:
if not agg.window:
agg.window = obj.window_field

# Check if all specs are either discrete or non-discrete
# Check if all specs are either discrete or continuous
if isinstance(agg.window, (Hopping, Tumbling, Session)):
if found_non_discrete:
if all_continuous:
raise ValueError(
f"Windows in all specs have to be either discrete (Hopping/Tumbling/Session) or"
f" non-discrete (Continuous/Forever) not both in pipeline `{self.pipeline_name}`."
)
found_discrete = True
if (
isinstance(agg.window, Session)
or agg.window.duration != "forever"
):
all_discrete_forever = False
all_discrete = True
else:
if found_discrete:
if all_discrete:
raise ValueError(
f"Windows in all specs have to be either discrete (Hopping/Tumbling/Session) or"
f" non-discrete (Continuous/Forever) not both in pipeline `{self.pipeline_name}`."
)
found_non_discrete = True
all_discrete_forever = False
all_continuous = True

# Check lookback in all windows are same
if isinstance(agg.window, (Hopping, Tumbling)):
Expand Down Expand Up @@ -2961,9 +2968,20 @@ def visitAggregate(self, obj) -> DSSchema:
"'along' param can not be used with emit=\"final\" strategy"
)

is_terminal = (
found_non_discrete and obj.emit_strategy == EmitStrategy.Eager
)
is_terminal = all_continuous and obj.emit_strategy == EmitStrategy.Eager

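# If input_schema is keyed, warn that aggregating on it will be deprecated, unless `along` is set to a column other
# than the input timestamp and every window is discrete forever.
# NOTE(review): the warning text says "either ... or", but the condition below requires both - confirm intent.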
if len(input_schema.keys) > 0:
if (
obj.along is None
or not all_discrete_forever
or obj.along == input_schema.timestamp
):
warnings.warn(
"aggregation on keyed dataset will be deprecated in coming release. It will be allowed if either "
"all the windows are discrete forever or along is set."
)

return DSSchema(
keys=keys,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "fennel-ai"
version = "1.5.59"
version = "1.5.60"
description = "The modern realtime feature engineering platform"
authors = ["Fennel AI <[email protected]>"]
packages = [{ include = "fennel" }]
Expand Down

0 comments on commit e00ca45

Please sign in to comment.