Skip to content

Commit

Permalink
aggregation: add warnings for aggregations on top of keyed datasets
Browse files Browse the repository at this point in the history
  • Loading branch information
nonibansal committed Dec 11, 2024
1 parent af3c39e commit 7d5e761
Showing 1 changed file with 28 additions and 10 deletions.
38 changes: 28 additions & 10 deletions fennel/datasets/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -2671,8 +2671,9 @@ def visitAggregate(self, obj) -> DSSchema:
)

values: Dict[str, Type] = {}
found_discrete = False
found_non_discrete = False
all_discrete = False
all_continuous = False
all_discrete_forever = True
lookback = None
for agg in obj.aggregates:
# If default window is present in groupby then each aggregate spec cannot have window different from
Expand All @@ -2686,21 +2687,27 @@ def visitAggregate(self, obj) -> DSSchema:
if not agg.window:
agg.window = obj.window_field

# Check if all specs are either discrete or non-discrete
# Check if all specs are either discrete or continuous
if isinstance(agg.window, (Hopping, Tumbling, Session)):
if found_non_discrete:
if all_continuous:
raise ValueError(
f"Windows in all specs have to be either discrete (Hopping/Tumbling/Session) or"
f" non-discrete (Continuous/Forever) not both in pipeline `{self.pipeline_name}`."
)
found_discrete = True
if (
isinstance(agg.window, Session)
or agg.window.duration != "forever"
):
all_discrete_forever = False
all_discrete = True
else:
if found_discrete:
if all_discrete:
raise ValueError(
f"Windows in all specs have to be either discrete (Hopping/Tumbling/Session) or"
f" non-discrete (Continuous/Forever) not both in pipeline `{self.pipeline_name}`."
)
found_non_discrete = True
all_discrete_forever = False
all_continuous = True

# Check lookback in all windows are same
if isinstance(agg.window, (Hopping, Tumbling)):
Expand Down Expand Up @@ -2910,9 +2917,20 @@ def visitAggregate(self, obj) -> DSSchema:
"'along' param can not be used with emit=\"final\" strategy"
)

is_terminal = (
found_non_discrete and obj.emit_strategy == EmitStrategy.Eager
)
is_terminal = all_continuous and obj.emit_strategy == EmitStrategy.Eager

# If input_schema is keyed then we allow aggregation only if all the windows are discrete forever or along is
# set
if len(input_schema.keys) > 0:
if (
obj.along is None
or not all_discrete_forever
or obj.along != input_schema.timestamp
):
warnings.warn(
"aggregation on keyed dataset will be deprecated in coming release. It will be allowed if either "
"all the windows are discrete forever or along is set."
)

return DSSchema(
keys=keys,
Expand Down

0 comments on commit 7d5e761

Please sign in to comment.