Skip to content

Commit

Permalink
Add FirstK aggregation (#553)
Browse files Browse the repository at this point in the history
Add FirstK aggregation in codebase along the lines of LastK aggregation.
  • Loading branch information
satrana42 authored Sep 21, 2024
1 parent 3864bc8 commit c7a227b
Show file tree
Hide file tree
Showing 36 changed files with 585 additions and 31 deletions.
1 change: 1 addition & 0 deletions .wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ Eval
eval
endswith
FennelDataAccessRole
FirstK
Flink
Flink's
GCP
Expand Down
3 changes: 2 additions & 1 deletion docs/api.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ sidebar:
- "api-reference/aggregations/count"
- "api-reference/aggregations/distinct"
- "api-reference/aggregations/lastk"
- "api-reference/aggregations/firstk"
- "api-reference/aggregations/max"
- "api-reference/aggregations/min"
- "api-reference/aggregations/stddev"
Expand Down Expand Up @@ -171,4 +172,4 @@ sidebar:
- slug: "api-reference/expectations"
title: "Expectations"
pages:
- "api-reference/expectations"
- "api-reference/expectations"
150 changes: 150 additions & 0 deletions docs/examples/api-reference/aggregations/firstk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
from datetime import datetime
from typing import List

import pandas as pd
import pytest

from fennel.testing import mock

__owner__ = "[email protected]"


@mock
def test_basic(client):
# docsnip basic
from fennel.datasets import dataset, field, pipeline, Dataset, FirstK
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
uid: int
amount: int
timestamp: datetime

@dataset(index=True)
class Aggregated:
uid: int = field(key=True)
amounts: List[int]
timestamp: datetime

@pipeline
@inputs(Transaction)
def firstk_pipeline(cls, ds: Dataset):
return ds.groupby("uid").aggregate(
# docsnip-highlight start
amounts=FirstK(
of="amount",
limit=10,
dedup=False,
window=Continuous("1d"),
),
# docsnip-highlight end
)

# /docsnip
client.commit(message="msg", datasets=[Transaction, Aggregated])
# log some rows to the transaction dataset
client.log(
"webhook",
"Transaction",
pd.DataFrame(
[
{
"uid": 1,
"vendor": "A",
"amount": 10,
"timestamp": "2021-01-01T00:00:00",
},
{
"uid": 1,
"vendor": "B",
"amount": 20,
"timestamp": "2021-01-02T00:00:00",
},
{
"uid": 2,
"vendor": "A",
"amount": 30,
"timestamp": "2021-01-03T00:00:00",
},
{
"uid": 2,
"vendor": "B",
"amount": 40,
"timestamp": "2021-01-04T00:00:00",
},
{
"uid": 3,
"vendor": "A",
"amount": 50,
"timestamp": "2021-01-05T00:00:00",
},
{
"uid": 3,
"vendor": "B",
"amount": 60,
"timestamp": "2021-01-06T00:00:00",
},
]
),
)

# do lookup on the Aggregated dataset
ts = pd.Series(
[
datetime(2021, 1, 6, 0, 0, 0),
datetime(2021, 1, 6, 0, 0, 0),
datetime(2021, 1, 6, 0, 0, 0),
]
)
df, found = Aggregated.lookup(ts, uid=pd.Series([1, 2, 3]))
assert found.tolist() == [True, True, True]
assert df["uid"].tolist() == [1, 2, 3]
assert df["amounts"].tolist() == [[], [], [50, 60]]


@mock
def test_invalid_type(client):
with pytest.raises(Exception):
# docsnip incorrect_type
from fennel.datasets import dataset, field, pipeline, Dataset, FirstK
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
uid: int
amount: int
timestamp: datetime

@dataset
class Aggregated:
uid: int = field(key=True)
# docsnip-highlight next-line
amounts: int # should be List[int]
timestamp: datetime

@pipeline
@inputs(Transaction)
def bad_pipeline(cls, ds: Dataset):
return ds.groupby("uid").aggregate(
# docsnip-highlight start
amounts=FirstK(
of="amount",
limit=10,
dedup=False,
window=Continuous("1d"),
),
# docsnip-highlight end
)

# /docsnip
63 changes: 63 additions & 0 deletions docs/pages/api-reference/aggregations/firstk.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
---
title: FirstK
order: 0
status: published
---
### FirstK
Aggregation to computes a rolling list of the earliest values for each group
within a window.

#### Parameters
<Expandable title="of" type="str">
Name of the field in the input dataset over which the aggregation should be
computed.
</Expandable>

<Expandable title="window" type="Window">
The continuous window within which aggregation needs to be computed. Possible
values are `"forever"` or any [time duration](/api-reference/data-types/duration).
</Expandable>

<Expandable title="into_field" type="str">
The name of the field in the output dataset that should store the result of this
aggregation. This field is expected to be of type `List[T]` where `T` is the type
of the field denoted by `of`.
</Expandable>

<Expandable title="limit" type="int">
Since storing all the values for a group can get costly, FirstK expects a
`limit` to be specified which denotes the maximum size of the list that should
be maintained at any point.
</Expandable>

<Expandable title="dedup" type="bool">
If set to True, only distinct values are stored else values stored in the first
can have duplicates too.
</Expandable>

<pre snippet="api-reference/aggregations/firstk#basic" status="success"
message="FirstK in window of 1 day">
</pre>

#### Returns
<Expandable type="List[T]">
Stores the result of the aggregation in the appropriate field of the output
dataset.
</Expandable>


#### Errors
<Expandable title="Incorrect output type">
The column denoted by `into_field` in the output dataset must be of type `List[T]`
where T is the type of the column denoted by `of` in the input dataset. Commit error
is raised if this is not the case.
</Expandable>

:::warning
Storing the full set of values and maintaining order between them can get costly,
so use this aggregation only when needed.
:::

<pre snippet="api-reference/aggregations/firstk#incorrect_type" status="error"
message="amounts should be of type List[int], not int">
</pre>
3 changes: 3 additions & 0 deletions fennel/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## [1.5.28] - 2024-09-21
- Add FirstK aggregation

## [1.5.27] - 2024-09-20
- Fix schema validation for int64 columns during mock client. Earlier, float64
was considered valid for optional[int64] fields to account for typing limitation
Expand Down
1 change: 1 addition & 0 deletions fennel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
Distinct,
Sum,
LastK,
FirstK,
Min,
Max,
Average,
Expand Down
38 changes: 37 additions & 1 deletion fennel/client_tests/test_social_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pytest

import fennel._vendor.requests as requests
from fennel import LastK
from fennel import LastK, FirstK
from fennel.connectors import source, Webhook
from fennel.datasets import dataset, field, Dataset, pipeline, Count
from fennel.dtypes import regex, oneof, Continuous
Expand Down Expand Up @@ -166,6 +166,27 @@ def last_viewed_post(cls, view_data: Dataset):
)


@meta(owner="[email protected]")
@dataset(index=True)
class FirstViewedPostByAgg:
user_id: str = field(key=True)
post_id: List[int]
time_stamp: datetime

@pipeline
@inputs(ViewData)
def first_viewed_post(cls, view_data: Dataset):
return view_data.groupby("user_id").aggregate(
FirstK(
into_field="post_id",
of="post_id",
window=Continuous("forever"),
limit=1,
dedup=False,
)
)


# --- Featuresets ---


Expand All @@ -188,6 +209,9 @@ class UserFeatures:
last_viewed_post2: List[int] = F(
LastViewedPostByAgg.post_id, default=[-1] # type: ignore
)
first_viewed_post: List[int] = F(
FirstViewedPostByAgg.post_id, default=[-1] # type: ignore
)

@extractor(deps=[UserViewsDataset]) # type: ignore
@inputs(Request.user_id)
Expand Down Expand Up @@ -234,6 +258,7 @@ def test_social_network(client):
UserCategoryDataset,
LastViewedPost,
LastViewedPostByAgg,
FirstViewedPostByAgg,
],
featuresets=[Request, UserFeatures],
)
Expand Down Expand Up @@ -307,6 +332,11 @@ def test_social_network(client):
assert last_post_viewed == [936609766, 735291550]
assert last_post_viewed2 == last_post_viewed

first_post_viewed = [
x[0] for x in feature_df["UserFeatures.first_viewed_post"].to_list()
]
assert first_post_viewed == [508698801, 43094523]

if client.is_integration_client():
return
df = client.get_dataset_df("UserCategoryDataset")
Expand Down Expand Up @@ -427,6 +457,7 @@ def test_social_network_with_mock_log(client):
UserCategoryDataset,
LastViewedPost,
LastViewedPostByAgg,
FirstViewedPostByAgg,
],
featuresets=[Request, UserFeatures],
)
Expand Down Expand Up @@ -494,5 +525,10 @@ def test_social_network_with_mock_log(client):
assert last_post_viewed == [936609766, 735291550]
assert last_post_viewed2 == last_post_viewed

first_post_viewed = [
x[0] for x in feature_df["UserFeatures.first_viewed_post"].to_list()
]
assert first_post_viewed == [508698801, 43094523]

df = client.get_dataset_df("UserCategoryDataset")
assert df.shape == (1998, 4)
Loading

0 comments on commit c7a227b

Please sign in to comment.