diff --git a/fennel/datasets/datasets.py b/fennel/datasets/datasets.py index b9455168..b3b0f47a 100644 --- a/fennel/datasets/datasets.py +++ b/fennel/datasets/datasets.py @@ -2671,8 +2671,9 @@ def visitAggregate(self, obj) -> DSSchema: ) values: Dict[str, Type] = {} - found_discrete = False - found_non_discrete = False + all_discrete = False + all_continuous = False + all_discrete_forever = True lookback = None for agg in obj.aggregates: # If default window is present in groupby then each aggregate spec cannot have window different from @@ -2686,21 +2687,27 @@ def visitAggregate(self, obj) -> DSSchema: if not agg.window: agg.window = obj.window_field - # Check if all specs are either discrete or non-discrete + # Check if all specs are either discrete or continuous if isinstance(agg.window, (Hopping, Tumbling, Session)): - if found_non_discrete: + if all_continuous: raise ValueError( f"Windows in all specs have to be either discrete (Hopping/Tumbling/Session) or" f" non-discrete (Continuous/Forever) not both in pipeline `{self.pipeline_name}`." ) - found_discrete = True + if ( + isinstance(agg.window, Session) + or agg.window.duration != "forever" + ): + all_discrete_forever = False + all_discrete = True else: - if found_discrete: + if all_discrete: raise ValueError( f"Windows in all specs have to be either discrete (Hopping/Tumbling/Session) or" f" non-discrete (Continuous/Forever) not both in pipeline `{self.pipeline_name}`." ) - found_non_discrete = True + all_discrete_forever = False + all_continuous = True # Check lookback in all windows are same if isinstance(agg.window, (Hopping, Tumbling)): @@ -2910,9 +2917,20 @@ def visitAggregate(self, obj) -> DSSchema: "'along' param can not be used with emit=\"final\" strategy" ) - is_terminal = ( - found_non_discrete and obj.emit_strategy == EmitStrategy.Eager - ) + is_terminal = all_continuous and obj.emit_strategy == EmitStrategy.Eager + + # If input_schema is keyed then we allow aggregation only if all the windows are discrete forever or along is + # set + if len(input_schema.keys) > 0: + if ( + obj.along is None + or not all_discrete_forever + or obj.along != input_schema.timestamp + ): + warnings.warn( + "aggregation on keyed dataset will be deprecated in coming release. It will be allowed if either " + "all the windows are discrete forever or along is set." + ) return DSSchema( keys=keys,