[DEV-2320] added capability to specify window in str for aggregate (#325)

* added capability to specify window in str for aggregate

* ran black

* lint check

* lint check

* removed Window function from documentation

---------

Co-authored-by: Nitin Bansal <[email protected]>
nonibansal authored Dec 22, 2023
1 parent 6c9633e commit 394b588
Showing 10 changed files with 92 additions and 43 deletions.
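
This commit lets aggregate specs take their window as a plain duration string (e.g. "1h", "1d", "1w", "forever") instead of requiring an explicit Window object; a pre-validator on the aggregate base class coerces the string into a Window. A minimal before/after sketch, assuming a fennel build that includes this commit and that aggregate specs can be constructed standalone (they are pydantic models, per the aggregate.py change below):

from fennel.lib.aggregate import Count, Sum
from fennel.lib.window import Window

# Before: the window had to be wrapped in a Window object.
before = [
    Sum(of="txn_amount", window=Window("1h"), into_field="txn_sum"),
    Count(window=Window("1h"), into_field="txn_count"),
]

# After: a plain duration string is accepted and coerced to a Window
# by the new validator on AggregateType, so both forms are equivalent.
after = [
    Sum(of="txn_amount", window="1h", into_field="txn_sum"),
    Count(window="1h", into_field="txn_count"),
]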
8 changes: 2 additions & 6 deletions docs/examples/api-reference/operators_ref.py
@@ -7,10 +7,8 @@
from fennel.datasets import dataset, field
from fennel.datasets import pipeline, Dataset
from fennel.lib.aggregate import Sum, Count
from fennel.lib.includes import includes
from fennel.lib.metadata import meta
from fennel.lib.schema import inputs
- from fennel.lib.window import Window
from fennel.sources import source, Webhook

webhook = Webhook(name="fennel_webhook")
@@ -164,10 +162,8 @@ def create_fraud_dataset(

# docsnip aggregate
aggregated_ds = joined_ds.groupby("merchant_category").aggregate(
- [
-     Sum(of="txn_amount", window=Window("1h"), into_field="txn_sum"),
-     Count(window=Window("1h"), into_field="txn_count"),
- ]
+ Sum(of="txn_amount", window="1h", into_field="txn_sum"),
+ Count(window="1h", into_field="txn_count"),
)
# /docsnip
return aggregated_ds
4 changes: 2 additions & 2 deletions docs/examples/datasets/operators.py
@@ -190,8 +190,8 @@ class UserAdStats:
def aggregate_ad_clicks(cls, ad_clicks: Dataset):
return ad_clicks.groupby("uid").aggregate(
[
Count(window=Window("forever"), into_field="num_clicks"),
Count(window=Window("1w"), into_field="num_clicks_1w"),
Count(window="forever", into_field="num_clicks"),
Count(window="1w", into_field="num_clicks_1w"),
]
)

27 changes: 11 additions & 16 deletions docs/examples/datasets/pipelines.py
@@ -11,7 +11,6 @@
from fennel.lib.includes import includes
from fennel.lib.metadata import meta
from fennel.lib.schema import inputs
- from fennel.lib.window import Window
from fennel.sources import source, Webhook
from fennel.test_lib import mock

@@ -63,18 +62,16 @@ def first_pipeline(cls, user: Dataset, transaction: Dataset):
lambda df: df["country"] != df["payment_country"]
)
return abroad.groupby("uid").aggregate(
- [
-     Count(window=Window("forever"), into_field="count"),
-     Sum(of="amount", window=Window("1d"), into_field="amount_1d"),
-     Sum(of="amount", window=Window("1w"), into_field="amount_1w"),
-     LastK(
-         of="merchant_id",
-         window=Window("1d"),
-         into_field="recent_merchant_ids",
-         limit=5,
-         dedup=True,
-     ),
- ]
+ Count(window="forever", into_field="count"),
+ Sum(of="amount", window="1d", into_field="amount_1d"),
+ Sum(of="amount", window="1w", into_field="amount_1w"),
+ LastK(
+     of="merchant_id",
+     window="1d",
+     into_field="recent_merchant_ids",
+     limit=5,
+     dedup=True,
+ ),
)


@@ -310,9 +307,7 @@ def android_logins(cls, android_logins: Dataset, ios_logins: Dataset):
)
union = with_ios_platform + with_android_platform
return union.groupby(["uid", "platform"]).aggregate(
- [
-     Count(window=Window("1d"), into_field="num_logins_1d"),
- ]
+ Count(window="1d", into_field="num_logins_1d"),
)


7 changes: 2 additions & 5 deletions docs/examples/examples/ecommerce.py
@@ -10,7 +10,6 @@
from fennel.lib.aggregate import Count
from fennel.lib.metadata import meta
from fennel.lib.schema import inputs, outputs
- from fennel.lib.window import Window
from fennel.sources import Postgres, source, Webhook
from fennel.test_lib import mock

@@ -59,10 +58,8 @@ class UserSellerOrders:
@inputs(Order)
def my_pipeline(cls, orders: Dataset):
return orders.groupby("uid", "seller_id").aggregate(
- [
-     Count(window=Window("1d"), into_field="num_orders_1d"),
-     Count(window=Window("1w"), into_field="num_orders_1w"),
- ]
+ Count(window="1d", into_field="num_orders_1d"),
+ Count(window="1w", into_field="num_orders_1w"),
)


5 changes: 2 additions & 3 deletions docs/examples/getting-started/quickstart.py
@@ -14,7 +14,6 @@
)
from fennel.lib.metadata import meta
from fennel.lib.schema import inputs, outputs
- from fennel.lib.window import Window
from fennel.sources import source, Postgres, Snowflake, Kafka, Webhook

# /docsnip
@@ -83,8 +82,8 @@ def my_pipeline(cls, orders: Dataset, products: Dataset):
orders = orders.drop("product_id", "desc", "price")
orders = orders.dropnull()
return orders.groupby("uid", "seller_id").aggregate(
Count(window=Window("1d"), into_field="num_orders_1d"),
Count(window=Window("1w"), into_field="num_orders_1w"),
Count(window="1d", into_field="num_orders_1d"),
Count(window="1w", into_field="num_orders_1w"),
)


6 changes: 3 additions & 3 deletions docs/examples/overview/concepts.py
@@ -79,9 +79,9 @@ def first_pipeline(cls, user: Dataset, transaction: Dataset):
lambda df: df["country"] != df["payment_country"]
)
return abroad.groupby("uid").aggregate(
Count(window=Window("forever"), into_field="count"),
Sum(of="amount", window=Window("1d"), into_field="amount_1d"),
Sum(of="amount", window=Window("1d"), into_field="amount_1w"),
Count(window="forever", into_field="count"),
Sum(of="amount", window="1d", into_field="amount_1d"),
Sum(of="amount", window="1d", into_field="amount_1w"),
)


7 changes: 3 additions & 4 deletions docs/examples/testing-and-ci-cd/unit_tests.py
@@ -10,7 +10,6 @@
from fennel.lib.includes import includes
from fennel.lib.metadata import meta
from fennel.lib.schema import inputs, outputs
- from fennel.lib.window import Window
from fennel.sources import source, Webhook

webhook = Webhook(name="fennel_webhook")
@@ -39,9 +38,9 @@ class MovieRating:
@inputs(RatingActivity)
def pipeline_aggregate(cls, activity: Dataset):
return activity.groupby("movie").aggregate(
Count(window=Window("7d"), into_field="num_ratings"),
Sum(window=Window("28d"), of="rating", into_field="sum_ratings"),
Average(window=Window("12h"), of="rating", into_field="rating"),
Count(window="7d", into_field="num_ratings"),
Sum(window="28d", of="rating", into_field="sum_ratings"),
Average(window="12h", of="rating", into_field="rating"),
)


8 changes: 4 additions & 4 deletions fennel/client_tests/test_dataset.py
@@ -1114,14 +1114,14 @@ class MovieRatingWindowed:
@inputs(RatingActivity)
def pipeline_aggregate(cls, activity: Dataset):
return activity.groupby("movie").aggregate(
Count(window=Window("3d"), into_field=str(cls.num_ratings_3d)),
Count(window="3d", into_field=str(cls.num_ratings_3d)),
Sum(
window=Window("7d"),
window="7d",
of="rating",
into_field=str(cls.sum_ratings_7d),
),
Average(
window=Window("6h"),
window="6h",
of="rating",
into_field=str(cls.avg_rating_6h),
),
@@ -1132,7 +1132,7 @@ def pipeline_aggregate(cls, activity: Dataset):
into_field=str(cls.std_rating_3d),
),
Stddev(
window=Window("7d"),
window="7d",
of="rating",
into_field=str(cls.std_rating_7d),
),
55 changes: 55 additions & 0 deletions fennel/datasets/test_dataset.py
@@ -2580,3 +2580,58 @@ def pipeline2(cls, a: Dataset, b: Dataset):
sync_request = view._get_sync_request_proto(tier="prod")
pipelines = sync_request.pipelines
assert len(pipelines) == 1


def test_dataset_with_str_window_aggregate():
@meta(owner="[email protected]")
@dataset
class UserAggregatesDataset:
gender: str = field(key=True)
timestamp: datetime = field(timestamp=True)
count: int
avg_age: float

@pipeline(version=1)
@inputs(UserInfoDataset)
def create_aggregated_dataset(cls, user_info: Dataset):
return user_info.groupby("gender").aggregate(
Count(window="forever", into_field="count"),
Average(of="age", window="forever", into_field="avg_age"),
)

view = InternalTestClient()
view.add(UserAggregatesDataset)
sync_request = view._get_sync_request_proto()
assert len(sync_request.datasets) == 1
d = {
"name": "UserAggregatesDataset",
"metadata": {"owner": "[email protected]"},
"dsschema": {
"keys": {
"fields": [{"name": "gender", "dtype": {"stringType": {}}}]
},
"values": {
"fields": [
{"name": "count", "dtype": {"intType": {}}},
{"name": "avg_age", "dtype": {"doubleType": {}}},
]
},
"timestamp": "timestamp",
},
"history": "63072000s",
"retention": "63072000s",
"fieldMetadata": {
"avg_age": {},
"count": {},
"gender": {},
"timestamp": {},
},
"pycode": {},
}
# Ignoring schema validation since they are bytes and not human readable
dataset_req = sync_request.datasets[0]
dataset_req.pycode.Clear()
expected_dataset_request = ParseDict(d, ds_proto.CoreDataset())
assert dataset_req == expected_dataset_request, error_message(
dataset_req, expected_dataset_request
)
8 changes: 8 additions & 0 deletions fennel/lib/aggregate/aggregate.py
@@ -12,6 +12,14 @@ class AggregateType(BaseModel):
# Name of the field the aggregate will be assigned to
into_field: str

@validator("window", pre=True)
# Converting the window into Window object if str is passed
def validate_window(cls, value):
if isinstance(value, str):
return Window(value)
else:
return value

def to_proto(self) -> spec_proto.PreSpec:
raise NotImplementedError
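
For context, the coercion above is a pydantic pre-validator: with pre=True the validator runs before the field's own type validation, so a raw string can be replaced with a proper Window object instead of being rejected. Below is a minimal standalone sketch of the same pattern in pydantic v1 style, using hypothetical Duration and Spec classes as stand-ins for fennel's Window and AggregateType:

from pydantic import BaseModel, validator


class Duration(BaseModel):
    value: str


class Spec(BaseModel):
    window: Duration
    into_field: str

    @validator("window", pre=True)
    def _coerce_window(cls, v):
        # pre=True runs before pydantic checks the declared field type, so a
        # raw string can be converted into a Duration instead of failing.
        if isinstance(v, str):
            return Duration(value=v)
        return v


assert Spec(window="1h", into_field="x").window.value == "1h"
assert Spec(window=Duration(value="1d"), into_field="y").window.value == "1d"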

