From 16e8cb68a7ea8851067361d267b0be6cf2187bf3 Mon Sep 17 00:00:00 2001 From: Nitin Bansal Date: Thu, 12 Dec 2024 00:04:16 +0530 Subject: [PATCH] aggregation: add warnings for aggregations on top of keyed datasets --- fennel/CHANGELOG.md | 3 +++ fennel/datasets/datasets.py | 38 +++++++++++++++++++++++++++---------- pyproject.toml | 2 +- 3 files changed, 32 insertions(+), 11 deletions(-) diff --git a/fennel/CHANGELOG.md b/fennel/CHANGELOG.md index 632ea51c..99911785 100644 --- a/fennel/CHANGELOG.md +++ b/fennel/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## [1.5.61] - 2024-12-12 +- Add appropriate warnings for aggregations on top of keyed datasets. + ## [1.5.60] - 2024-12-10 - Add support for indirections in preproc ref type for Avro format diff --git a/fennel/datasets/datasets.py b/fennel/datasets/datasets.py index 0a40b3f6..9b74f478 100644 --- a/fennel/datasets/datasets.py +++ b/fennel/datasets/datasets.py @@ -2672,8 +2672,9 @@ def visitAggregate(self, obj) -> DSSchema: ) values: Dict[str, Type] = {} - found_discrete = False - found_non_discrete = False + all_discrete = False + all_continuous = False + all_discrete_forever = True lookback = None for agg in obj.aggregates: # If default window is present in groupby then each aggregate spec cannot have window different from @@ -2687,21 +2688,27 @@ def visitAggregate(self, obj) -> DSSchema: if not agg.window: agg.window = obj.window_field - # Check if all specs are either discrete or non-discrete + # Check if all specs are either discrete or continuous if isinstance(agg.window, (Hopping, Tumbling, Session)): - if found_non_discrete: + if all_continuous: raise ValueError( f"Windows in all specs have to be either discrete (Hopping/Tumbling/Session) or" f" non-discrete (Continuous/Forever) not both in pipeline `{self.pipeline_name}`." ) - found_discrete = True + if ( + isinstance(agg.window, Session) + or agg.window.duration != "forever" + ): + all_discrete_forever = False + all_discrete = True else: - if found_discrete: + if all_discrete: raise ValueError( f"Windows in all specs have to be either discrete (Hopping/Tumbling/Session) or" f" non-discrete (Continuous/Forever) not both in pipeline `{self.pipeline_name}`." ) - found_non_discrete = True + all_discrete_forever = False + all_continuous = True # Check lookback in all windows are same if isinstance(agg.window, (Hopping, Tumbling)): @@ -2961,9 +2968,20 @@ def visitAggregate(self, obj) -> DSSchema: "'along' param can not be used with emit=\"final\" strategy" ) - is_terminal = ( - found_non_discrete and obj.emit_strategy == EmitStrategy.Eager - ) + is_terminal = all_continuous and obj.emit_strategy == EmitStrategy.Eager + + # If input_schema is keyed then we allow aggregation only if all the windows are discrete forever or along is + # set + if len(input_schema.keys) > 0: + if ( + obj.along is None + or not all_discrete_forever + or obj.along == input_schema.timestamp + ): + warnings.warn( + "aggregation on keyed dataset will be deprecated in coming release. It will be allowed if either " + "all the windows are discrete forever or along is set." + ) return DSSchema( keys=keys, diff --git a/pyproject.toml b/pyproject.toml index fc910e0c..2efe765f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "fennel-ai" -version = "1.5.60" +version = "1.5.61" description = "The modern realtime feature engineering platform" authors = ["Fennel AI "] packages = [{ include = "fennel" }]