Skip to content

Commit

Permalink
extractor: Add list based lookups
Browse files Browse the repository at this point in the history
  • Loading branch information
aditya-nambiar committed Dec 25, 2024
1 parent cf00519 commit 9f87486
Show file tree
Hide file tree
Showing 14 changed files with 503 additions and 54 deletions.
2 changes: 1 addition & 1 deletion docs/examples/api-reference/expressions/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def test_now():
{"birthdate": [datetime(1997, 12, 24), datetime(2001, 1, 21), None]}
)
assert expr.eval(df, schema={"birthdate": Optional[datetime]}).tolist() == [
26,
27,
23,
pd.NA,
]
Expand Down
3 changes: 3 additions & 0 deletions fennel/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## [1.5.63] - 2024-12-25
- Support list lookup based autogenerated extractors.

## [1.5.62] - 2024-12-16
- Fix casting for empty dataframes.

Expand Down
5 changes: 4 additions & 1 deletion fennel/client_tests/test_complex_autogen_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,10 @@ def test_complex_auto_gen_extractors(client):
assert extracted_df["RiderFeatures.dl_state"].to_list() == ["US", "Unknown"]
assert extracted_df["RiderFeatures.is_us_dl"].to_list() == [True, False]

age_years = datetime.now(timezone.utc).year - 2000
age_years = (
datetime.now() - datetime(2000, 1, 1, 0, 0, 0)
).total_seconds() / (60 * 60 * 24 * 365)
age_years = int(age_years + 0.001)
assert extracted_df["RiderFeatures.age_years"].to_list() == [30, age_years]
assert extracted_df["RiderFeatures.dl_state_population"].to_list() == [
328200000,
Expand Down
6 changes: 3 additions & 3 deletions fennel/client_tests/test_expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class UserInfoFeatures:
"Rachel",
pd.NA,
]
assert df["UserInfoFeatures.age"].tolist() == [54, 44, 34, 26, 23, pd.NA]
assert df["UserInfoFeatures.age"].tolist() == [55, 44, 34, 27, 23, pd.NA]
assert df["UserInfoFeatures.country"].tolist() == [
"India",
"USA",
Expand Down Expand Up @@ -132,10 +132,10 @@ class UserInfoFeatures:
pd.NA,
]
assert df["UserInfoFeatures.age"].tolist() == [
53,
54,
43,
33,
25,
26,
22,
pd.NA,
]
Expand Down
270 changes: 251 additions & 19 deletions fennel/client_tests/test_featureset.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class UserInfoDataset:
user_id: int = field(key=True)
name: str
age: Optional[int]
hobbies: List[Optional[str]]
timestamp: datetime = field(timestamp=True)
country: str

Expand Down Expand Up @@ -206,10 +207,10 @@ def test_simple_extractor(self, client):
)
now = datetime.now(timezone.utc)
data = [
[18232, "John", 32, "USA", now],
[18234, "Monica", 24, "Chile", now],
[18232, "John", 32, "USA", now, ["Reading", "Writing"]],
[18234, "Monica", 24, "Chile", now, ["Cooking", "Music"]],
]
columns = ["user_id", "name", "age", "country", "timestamp"]
columns = ["user_id", "name", "age", "country", "timestamp", "hobbies"]
input_df = pd.DataFrame(data, columns=columns)
response = client.log("fennel_webhook", "UserInfoDataset", input_df)
assert response.status_code == requests.codes.OK, response.json()
Expand Down Expand Up @@ -254,10 +255,10 @@ def test_e2e_query(self, client):
)
now = datetime.now(timezone.utc)
data = [
[18232, "John", 32, "USA", now],
[18234, "Monica", 24, "Chile", now],
[18232, "John", 32, "USA", now, ["Reading", "Writing"]],
[18234, "Monica", 24, "Chile", now, ["Cooking", "Music"]],
]
columns = ["user_id", "name", "age", "country", "timestamp"]
columns = ["user_id", "name", "age", "country", "timestamp", "hobbies"]
input_df = pd.DataFrame(data, columns=columns)
response = client.log("fennel_webhook", "UserInfoDataset", input_df)
assert response.status_code == requests.codes.OK, response.json()
Expand Down Expand Up @@ -393,11 +394,19 @@ def test_derived_extractor(self, client):
)
now = datetime.now(timezone.utc)
data = [
[18232, "John", 32, "USA", now],
[18234, "Monica", 24, "Chile", now],
[18232, "John", 32, "USA", now, ["Reading", "Writing"]],
[18234, "Monica", 24, "Chile", now, ["Cooking", "Music"]],
]
df = pd.DataFrame(
data, columns=["user_id", "name", "age", "country", "timestamp"]
data,
columns=[
"user_id",
"name",
"age",
"country",
"timestamp",
"hobbies",
],
)
response = client.log("fennel_webhook", "UserInfoDataset", df)
assert response.status_code == requests.codes.OK, response.json()
Expand Down Expand Up @@ -485,10 +494,10 @@ def test_dag_resolution2(self, client):
)
now = datetime.now(timezone.utc)
data = [
[18232, "John", 32, "USA", now],
[18234, "Monica", 24, "Chile", now],
[18232, "John", 32, "USA", now, ["Reading", "Writing"]],
[18234, "Monica", 24, "Chile", now, ["Cooking", "Music"]],
]
columns = ["user_id", "name", "age", "country", "timestamp"]
columns = ["user_id", "name", "age", "country", "timestamp", "hobbies"]
df = pd.DataFrame(data, columns=columns)
response = client.log("fennel_webhook", "UserInfoDataset", df)
assert response.status_code == requests.codes.OK, response.json()
Expand Down Expand Up @@ -586,10 +595,10 @@ def test_dag_resolution_complex(self, client):
client.sleep()
now = datetime.now(timezone.utc)
data = [
[18232, "John", 32, "USA", now],
[18234, "Monica", 24, "Chile", now],
[18232, "John", 32, "USA", now, ["Reading", "Writing"]],
[18234, "Monica", 24, "Chile", now, ["Cooking", "Music"]],
]
columns = ["user_id", "name", "age", "country", "timestamp"]
columns = ["user_id", "name", "age", "country", "timestamp", "hobbies"]
df = pd.DataFrame(data, columns=columns)
response = client.log("fennel_webhook", "UserInfoDataset", df)
assert response.status_code == requests.codes.OK, response.json()
Expand Down Expand Up @@ -1068,11 +1077,11 @@ def test_chained_lookups(client):

now = datetime.now(timezone.utc)
data = [
[18232, "John", 32, "USA", now],
[18234, "Monica", 24, "Chile", now],
[18235, "Rahul", 28, "India", now],
[18232, "John", 32, "USA", now, ["Reading", "Writing"]],
[18234, "Monica", 24, "Chile", now, ["Cooking", "Music"]],
[18235, "Rahul", 28, "India", now, ["Reading", "Football"]],
]
columns = ["user_id", "name", "age", "country", "timestamp"]
columns = ["user_id", "name", "age", "country", "timestamp", "hobbies"]
input_df = pd.DataFrame(data, columns=columns)
client.log("fennel_webhook", "UserInfoDataset", input_df)

Expand Down Expand Up @@ -1263,3 +1272,226 @@ def time_feature_extractor(cls, ts: pd.Series) -> pd.DataFrame:
featuresets=[QueryTimeFeatures],
message="first_commit",
)


@pytest.mark.integration
@mock
def test_looks_like_list_lookup(client):
    """Scalar key resolving to a list-valued field: the lookup must return
    the stored list as-is for each row (not fan out over its elements)."""

    @featureset
    class UserInfoListLookup:
        user_id: int
        hobbies: List[Optional[str]] = F(
            UserInfoDataset.hobbies, default=["Unknown"]
        )

    client.commit(
        datasets=[UserInfoDataset],
        featuresets=[UserInfoListLookup],
        message="Initial commit",
    )

    # Log three users who all share an identical hobbies list.
    ts = datetime.now(timezone.utc)
    rows = [
        [18232, "John", 32, "USA", ts, ["Reading", "Writing"]],
        [18234, "Monica", 24, "Chile", ts, ["Reading", "Writing"]],
        [18235, "Rahul", 28, "India", ts, ["Reading", "Writing"]],
    ]
    col_names = ["user_id", "name", "age", "country", "timestamp", "hobbies"]
    client.log(
        "fennel_webhook",
        "UserInfoDataset",
        pd.DataFrame(rows, columns=col_names),
    )

    client.sleep()

    result = client.query(
        outputs=[UserInfoListLookup],
        inputs=[UserInfoListLookup.user_id],
        input_dataframe=pd.DataFrame(
            {"UserInfoListLookup.user_id": [18234, 18235]}
        ),
    )

    # Two requested users, two features (user_id + hobbies).
    assert result.shape == (2, 2)
    expected_hobbies = ["Reading", "Writing"]
    assert result["UserInfoListLookup.hobbies"].tolist() == [
        expected_hobbies,
        expected_hobbies,
    ]


@pytest.mark.integration
@mock
def test_list_lookup(client):
    """List-valued keys: each input row carries a list of user ids and every
    looked-up feature comes back as a parallel list. Missing keys yield NA
    for Optional features and the declared default otherwise."""

    @featureset
    class UserInfoListLookup:
        user_id: List[int]
        country: List[Optional[str]] = F(UserInfoDataset.country)
        name: List[str] = F(UserInfoDataset.name, default="Unknown")

    client.commit(
        datasets=[UserInfoDataset],
        featuresets=[UserInfoListLookup],
        message="Initial commit",
    )

    ts = datetime.now(timezone.utc)
    rows = [
        [18232, "John", 32, "USA", ts, ["Reading", "Writing"]],
        [18234, "Monica", 24, "Chile", ts, ["Reading", "Writing"]],
        [18235, "Rahul", 28, "India", ts, ["Reading", "Writing"]],
    ]
    col_names = ["user_id", "name", "age", "country", "timestamp", "hobbies"]
    client.log(
        "fennel_webhook",
        "UserInfoDataset",
        pd.DataFrame(rows, columns=col_names),
    )

    client.sleep()

    # Ids 123 and 1 were never logged — they exercise the missing-key paths.
    result = client.query(
        outputs=[UserInfoListLookup],
        inputs=[UserInfoListLookup.user_id],
        input_dataframe=pd.DataFrame(
            {
                "UserInfoListLookup.user_id": [
                    [18232, 18234, 18235, 123],
                    [18232, 18234],
                    [18234, 18235, 1],
                ]
            }
        ),
    )

    assert result.shape == (3, 3)
    # Optional feature: unknown ids surface as NA.
    assert result["UserInfoListLookup.country"].tolist() == [
        ["USA", "Chile", "India", pd.NA],
        ["USA", "Chile"],
        ["Chile", "India", pd.NA],
    ]
    # Defaulted feature: unknown ids take the default "Unknown".
    assert result["UserInfoListLookup.name"].tolist() == [
        ["John", "Monica", "Rahul", "Unknown"],
        ["John", "Monica"],
        ["Monica", "Rahul", "Unknown"],
    ]


@pytest.mark.integration
@mock
def test_chainedlist_lookup_with_default(client):
    """Chained list lookups: resolve a user id to a list of hobbies
    (UserInfoDataset), then look each hobby up in HobbyDataset to get its
    category. Hobbies with no category row resolve to NA.
    """

    @source(webhook.endpoint("HobbyDataset"), disorder="14d", cdc="upsert")
    @dataset(index=True)
    class HobbyDataset:
        hobby: str = field(key=True)
        category: str
        ts: datetime

    @featureset
    class UserHobbies:
        user_id: int
        hobby: List[Optional[str]] = F(UserInfoDataset.hobbies, default=[])
        categories: List[Optional[str]] = F(
            HobbyDataset.category,
        )

    client.commit(
        datasets=[HobbyDataset, UserInfoDataset],
        featuresets=[UserHobbies],
        message="Initial commit",
    )

    now = datetime.now(timezone.utc)
    data = [
        [18232, "John", 32, "USA", now, ["Reading", "Writing"]],
        [18234, "Monica", 24, "Chile", now, ["Cooking", "Music"]],
        [18235, "Rahul", 28, "India", now, ["Reading", "Football"]],
    ]
    columns = ["user_id", "name", "age", "country", "timestamp", "hobbies"]
    input_df = pd.DataFrame(data, columns=columns)
    client.log("fennel_webhook", "UserInfoDataset", input_df)

    client.sleep()

    # "Writing" and "Music" are deliberately absent here, so their category
    # lookups in the second hop must come back as NA.
    data = [
        ["Reading", "Productivity", now],
        ["Football", "Sports", now],
        ["Cooking", "Food", now],
    ]
    columns = ["hobby", "category", "ts"]
    input_df = pd.DataFrame(data, columns=columns)
    client.log("fennel_webhook", "HobbyDataset", input_df)

    client.sleep()

    feature_df = client.query(
        outputs=[UserHobbies],
        inputs=[UserHobbies.user_id],
        input_dataframe=pd.DataFrame(
            {
                "UserHobbies.user_id": [18232, 18234, 18235],
            }
        ),
    )

    assert feature_df.shape == (3, 3)
    assert feature_df["UserHobbies.hobby"].tolist() == [
        ["Reading", "Writing"],
        ["Cooking", "Music"],
        ["Reading", "Football"],
    ]
    assert feature_df["UserHobbies.categories"].tolist() == [
        ["Productivity", pd.NA],
        ["Food", pd.NA],
        ["Productivity", "Sports"],
    ]


@pytest.mark.integration
@mock
def test_multiple_keyed_list_lookup(client):
    """Composite-key list lookup: key1 and key2 lists are zipped element-wise
    per row; key pairs absent from KeyedDataset resolve to NA."""

    @source(webhook.endpoint("KeyedDataset"), disorder="14d", cdc="upsert")
    @dataset(index=True)
    class KeyedDataset:
        key1: int = field(key=True)
        key2: int = field(key=True)
        value: str
        ts: datetime = field(timestamp=True)

    @featureset
    class KeyedFeatureset:
        key1: List[int]
        key2: List[int]
        value: List[Optional[str]] = F(KeyedDataset.value)

    client.commit(
        datasets=[KeyedDataset], featuresets=[KeyedFeatureset], message="msg"
    )

    # Log data
    ts_now = datetime.now(timezone.utc)
    rows = [
        [1, 2, "a", ts_now],
        [3, 4, "b", ts_now],
        [5, 6, "c", ts_now],
        [7, 8, "d", ts_now],
    ]
    client.log(
        "fennel_webhook",
        "KeyedDataset",
        pd.DataFrame(rows, columns=["key1", "key2", "value", "ts"]),
    )

    client.sleep()

    # Row 1 pairs: (1,2)->"a", (2,8)->missing, (3,4)->"b".
    # Row 2 pairs: (7,8)->"d", (5,11)->missing, (6,12)->missing.
    result = client.query(
        inputs=[KeyedFeatureset.key1, KeyedFeatureset.key2],
        outputs=[KeyedFeatureset.value],
        input_dataframe=pd.DataFrame(
            {
                "KeyedFeatureset.key1": [[1, 2, 3], [7, 5, 6]],
                "KeyedFeatureset.key2": [[2, 8, 4], [8, 11, 12]],
            }
        ),
    )
    assert result.shape == (2, 1)
    assert result["KeyedFeatureset.value"].tolist() == [
        ["a", pd.NA, "b"],
        ["d", pd.NA, pd.NA],
    ]
Loading

0 comments on commit 9f87486

Please sign in to comment.