From 394b588ee4b9717ce4d02caa471629e2f0c34120 Mon Sep 17 00:00:00 2001 From: Nitin Bansal Date: Fri, 22 Dec 2023 12:16:06 +0530 Subject: [PATCH] [DEV-2320] added capability to specify window in str for aggregate (#325) * added capability to specify window in str for aggregate * ran black * lint check * lint check * removed Window function from documentation --------- Co-authored-by: Nitin Bansal --- docs/examples/api-reference/operators_ref.py | 8 +-- docs/examples/datasets/operators.py | 4 +- docs/examples/datasets/pipelines.py | 27 ++++----- docs/examples/examples/ecommerce.py | 7 +-- docs/examples/getting-started/quickstart.py | 5 +- docs/examples/overview/concepts.py | 6 +- docs/examples/testing-and-ci-cd/unit_tests.py | 7 +-- fennel/client_tests/test_dataset.py | 8 +-- fennel/datasets/test_dataset.py | 55 +++++++++++++++++++ fennel/lib/aggregate/aggregate.py | 8 +++ 10 files changed, 92 insertions(+), 43 deletions(-) diff --git a/docs/examples/api-reference/operators_ref.py b/docs/examples/api-reference/operators_ref.py index d5e2b4ab3..0122a51c9 100644 --- a/docs/examples/api-reference/operators_ref.py +++ b/docs/examples/api-reference/operators_ref.py @@ -7,10 +7,8 @@ from fennel.datasets import dataset, field from fennel.datasets import pipeline, Dataset from fennel.lib.aggregate import Sum, Count -from fennel.lib.includes import includes from fennel.lib.metadata import meta from fennel.lib.schema import inputs -from fennel.lib.window import Window from fennel.sources import source, Webhook webhook = Webhook(name="fennel_webhook") @@ -164,10 +162,8 @@ def create_fraud_dataset( # docsnip aggregate aggregated_ds = joined_ds.groupby("merchant_category").aggregate( - [ - Sum(of="txn_amount", window=Window("1h"), into_field="txn_sum"), - Count(window=Window("1h"), into_field="txn_count"), - ] + Sum(of="txn_amount", window="1h", into_field="txn_sum"), + Count(window="1h", into_field="txn_count"), ) # /docsnip return aggregated_ds diff --git a/docs/examples/datasets/operators.py b/docs/examples/datasets/operators.py index e4d70fe71..bd3fd572c 100644 --- a/docs/examples/datasets/operators.py +++ b/docs/examples/datasets/operators.py @@ -190,8 +190,8 @@ class UserAdStats: def aggregate_ad_clicks(cls, ad_clicks: Dataset): return ad_clicks.groupby("uid").aggregate( [ - Count(window=Window("forever"), into_field="num_clicks"), - Count(window=Window("1w"), into_field="num_clicks_1w"), + Count(window="forever", into_field="num_clicks"), + Count(window="1w", into_field="num_clicks_1w"), ] ) diff --git a/docs/examples/datasets/pipelines.py b/docs/examples/datasets/pipelines.py index 0304de869..049a90c3f 100644 --- a/docs/examples/datasets/pipelines.py +++ b/docs/examples/datasets/pipelines.py @@ -11,7 +11,6 @@ from fennel.lib.includes import includes from fennel.lib.metadata import meta from fennel.lib.schema import inputs -from fennel.lib.window import Window from fennel.sources import source, Webhook from fennel.test_lib import mock @@ -63,18 +62,16 @@ def first_pipeline(cls, user: Dataset, transaction: Dataset): lambda df: df["country"] != df["payment_country"] ) return abroad.groupby("uid").aggregate( - [ - Count(window=Window("forever"), into_field="count"), - Sum(of="amount", window=Window("1d"), into_field="amount_1d"), - Sum(of="amount", window=Window("1w"), into_field="amount_1w"), - LastK( - of="merchant_id", - window=Window("1d"), - into_field="recent_merchant_ids", - limit=5, - dedup=True, - ), - ] + Count(window="forever", into_field="count"), + Sum(of="amount", window="1d", into_field="amount_1d"), + Sum(of="amount", window="1w", into_field="amount_1w"), + LastK( + of="merchant_id", + window="1d", + into_field="recent_merchant_ids", + limit=5, + dedup=True, + ), ) @@ -310,9 +307,7 @@ def android_logins(cls, android_logins: Dataset, ios_logins: Dataset): ) union = with_ios_platform + with_android_platform return union.groupby(["uid", "platform"]).aggregate( - [ - Count(window=Window("1d"), into_field="num_logins_1d"), - ] + Count(window="1d", into_field="num_logins_1d"), ) diff --git a/docs/examples/examples/ecommerce.py b/docs/examples/examples/ecommerce.py index 3f4170fcd..8aa63898b 100644 --- a/docs/examples/examples/ecommerce.py +++ b/docs/examples/examples/ecommerce.py @@ -10,7 +10,6 @@ from fennel.lib.aggregate import Count from fennel.lib.metadata import meta from fennel.lib.schema import inputs, outputs -from fennel.lib.window import Window from fennel.sources import Postgres, source, Webhook from fennel.test_lib import mock @@ -59,10 +58,8 @@ class UserSellerOrders: @inputs(Order) def my_pipeline(cls, orders: Dataset): return orders.groupby("uid", "seller_id").aggregate( - [ - Count(window=Window("1d"), into_field="num_orders_1d"), - Count(window=Window("1w"), into_field="num_orders_1w"), - ] + Count(window="1d", into_field="num_orders_1d"), + Count(window="1w", into_field="num_orders_1w"), ) diff --git a/docs/examples/getting-started/quickstart.py b/docs/examples/getting-started/quickstart.py index 89dc198d5..345aa1b9d 100644 --- a/docs/examples/getting-started/quickstart.py +++ b/docs/examples/getting-started/quickstart.py @@ -14,7 +14,6 @@ ) from fennel.lib.metadata import meta from fennel.lib.schema import inputs, outputs -from fennel.lib.window import Window from fennel.sources import source, Postgres, Snowflake, Kafka, Webhook # /docsnip @@ -83,8 +82,8 @@ def my_pipeline(cls, orders: Dataset, products: Dataset): orders = orders.drop("product_id", "desc", "price") orders = orders.dropnull() return orders.groupby("uid", "seller_id").aggregate( - Count(window=Window("1d"), into_field="num_orders_1d"), - Count(window=Window("1w"), into_field="num_orders_1w"), + Count(window="1d", into_field="num_orders_1d"), + Count(window="1w", into_field="num_orders_1w"), ) diff --git a/docs/examples/overview/concepts.py b/docs/examples/overview/concepts.py index 651e0b69e..079abbee6 100644 --- a/docs/examples/overview/concepts.py +++ b/docs/examples/overview/concepts.py @@ -79,9 +79,9 @@ def first_pipeline(cls, user: Dataset, transaction: Dataset): lambda df: df["country"] != df["payment_country"] ) return abroad.groupby("uid").aggregate( - Count(window=Window("forever"), into_field="count"), - Sum(of="amount", window=Window("1d"), into_field="amount_1d"), - Sum(of="amount", window=Window("1d"), into_field="amount_1w"), + Count(window="forever", into_field="count"), + Sum(of="amount", window="1d", into_field="amount_1d"), + Sum(of="amount", window="1d", into_field="amount_1w"), ) diff --git a/docs/examples/testing-and-ci-cd/unit_tests.py b/docs/examples/testing-and-ci-cd/unit_tests.py index 333a92f03..57e1119df 100644 --- a/docs/examples/testing-and-ci-cd/unit_tests.py +++ b/docs/examples/testing-and-ci-cd/unit_tests.py @@ -10,7 +10,6 @@ from fennel.lib.includes import includes from fennel.lib.metadata import meta from fennel.lib.schema import inputs, outputs -from fennel.lib.window import Window from fennel.sources import source, Webhook webhook = Webhook(name="fennel_webhook") @@ -39,9 +38,9 @@ class MovieRating: @inputs(RatingActivity) def pipeline_aggregate(cls, activity: Dataset): return activity.groupby("movie").aggregate( - Count(window=Window("7d"), into_field="num_ratings"), - Sum(window=Window("28d"), of="rating", into_field="sum_ratings"), - Average(window=Window("12h"), of="rating", into_field="rating"), + Count(window="7d", into_field="num_ratings"), + Sum(window="28d", of="rating", into_field="sum_ratings"), + Average(window="12h", of="rating", into_field="rating"), ) diff --git a/fennel/client_tests/test_dataset.py b/fennel/client_tests/test_dataset.py index 48120498d..d157b6470 100644 --- a/fennel/client_tests/test_dataset.py +++ b/fennel/client_tests/test_dataset.py @@ -1114,14 +1114,14 @@ class MovieRatingWindowed: @inputs(RatingActivity) def pipeline_aggregate(cls, activity: Dataset): return activity.groupby("movie").aggregate( - Count(window=Window("3d"), into_field=str(cls.num_ratings_3d)), + Count(window="3d", into_field=str(cls.num_ratings_3d)), Sum( - window=Window("7d"), + window="7d", of="rating", into_field=str(cls.sum_ratings_7d), ), Average( - window=Window("6h"), + window="6h", of="rating", into_field=str(cls.avg_rating_6h), ), @@ -1132,7 +1132,7 @@ def pipeline_aggregate(cls, activity: Dataset): into_field=str(cls.std_rating_3d), ), Stddev( - window=Window("7d"), + window="7d", of="rating", into_field=str(cls.std_rating_7d), ), diff --git a/fennel/datasets/test_dataset.py b/fennel/datasets/test_dataset.py index 5f837cf9f..cb9071f04 100644 --- a/fennel/datasets/test_dataset.py +++ b/fennel/datasets/test_dataset.py @@ -2580,3 +2580,58 @@ def pipeline2(cls, a: Dataset, b: Dataset): sync_request = view._get_sync_request_proto(tier="prod") pipelines = sync_request.pipelines assert len(pipelines) == 1 + + +def test_dataset_with_str_window_aggregate(): + @meta(owner="test@test.com") + @dataset + class UserAggregatesDataset: + gender: str = field(key=True) + timestamp: datetime = field(timestamp=True) + count: int + avg_age: float + + @pipeline(version=1) + @inputs(UserInfoDataset) + def create_aggregated_dataset(cls, user_info: Dataset): + return user_info.groupby("gender").aggregate( + Count(window="forever", into_field="count"), + Average(of="age", window="forever", into_field="avg_age"), + ) + + view = InternalTestClient() + view.add(UserAggregatesDataset) + sync_request = view._get_sync_request_proto() + assert len(sync_request.datasets) == 1 + d = { + "name": "UserAggregatesDataset", + "metadata": {"owner": "test@test.com"}, + "dsschema": { + "keys": { + "fields": [{"name": "gender", "dtype": {"stringType": {}}}] + }, + "values": { + "fields": [ + {"name": "count", "dtype": {"intType": {}}}, + {"name": "avg_age", "dtype": {"doubleType": {}}}, + ] + }, + "timestamp": "timestamp", + }, + "history": "63072000s", + "retention": "63072000s", + "fieldMetadata": { + "avg_age": {}, + "count": {}, + "gender": {}, + "timestamp": {}, + }, + "pycode": {}, + } + # Ignoring schema validation since they are bytes and not human readable + dataset_req = sync_request.datasets[0] + dataset_req.pycode.Clear() + expected_dataset_request = ParseDict(d, ds_proto.CoreDataset()) + assert dataset_req == expected_dataset_request, error_message( + dataset_req, expected_dataset_request + ) diff --git a/fennel/lib/aggregate/aggregate.py b/fennel/lib/aggregate/aggregate.py index d459db979..0c405e4f6 100644 --- a/fennel/lib/aggregate/aggregate.py +++ b/fennel/lib/aggregate/aggregate.py @@ -12,6 +12,14 @@ class AggregateType(BaseModel): # Name of the field the aggregate will be assigned to into_field: str + @validator("window", pre=True) + # Converting the window into Window object if str is passed + def validate_window(cls, value): + if isinstance(value, str): + return Window(value) + else: + return value + def to_proto(self) -> spec_proto.PreSpec: raise NotImplementedError