schema: Use pd types rather than python types (#298)
aditya-nambiar authored Nov 12, 2023
1 parent 7b5061e commit 10e5ea9
Showing 22 changed files with 716 additions and 193 deletions.
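In short, and as the tests below suggest: schemas and lookup keys are built with pandas extension dtypes (for example `Int64` for integer key columns) rather than native Python types, so missing lookup rows stay `pd.NA` instead of silently coercing an integer column to float or object. A minimal illustrative sketch of that distinction (plain pandas, not Fennel code):

import pandas as pd

# pandas nullable integer dtype: missing values become <NA>, dtype stays Int64
keyed_ids = pd.Series([18232, None, 18235], dtype="Int64")
print(keyed_ids.dtype)  # Int64

# native python ints: one missing value silently coerces the column to float64
plain_ids = pd.Series([18232, None, 18235])
print(plain_ids.dtype)  # float64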
3 changes: 2 additions & 1 deletion docs/examples/datasets/lookups.py
@@ -36,7 +36,8 @@ class UserFeature:
def func(cls, ts: pd.Series, uid: pd.Series):
df, _found = User.lookup(ts, uid=uid)
return pd.Series(
name="in_home_city", data=df["home_city"] == df["cur_city"]
name="in_home_city",
data=df["home_city"] == df["cur_city"],
)


3 changes: 3 additions & 0 deletions fennel/CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

## [0.18.14] - 2023-11-11
- Use pd types rather than python types

## [0.18.12] - 2023-11-08
- Add support for strings in extract_features and extract_historical_features

4 changes: 4 additions & 0 deletions fennel/client/client.py
@@ -492,6 +492,10 @@ def extract_historical_features(
raise Exception(
f"Timestamp column {timestamp_column} not found in input dataframe."
)
# Convert timestamp column to string to make it JSON serializable
input_dataframe[timestamp_column] = input_dataframe[
timestamp_column
].astype(str)
extract_historical_input["Pandas"] = input_dataframe.to_dict(
orient="list"
)
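For context on the conversion above: pd.Timestamp values are not JSON serializable, so the timestamp column is stringified before the dataframe becomes part of the request payload. A self-contained sketch of the same idea (the dataframe and column names here are illustrative, not the client's):

import json
import pandas as pd

df = pd.DataFrame(
    {
        "user_id": [1, 2],
        "timestamp": pd.to_datetime(["2023-11-11", "2023-11-12"]),
    }
)
# json.dumps(df.to_dict(orient="list")) would raise TypeError on pd.Timestamp values
df["timestamp"] = df["timestamp"].astype(str)
payload = json.dumps(df.to_dict(orient="list"))  # serializes cleanly now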
52 changes: 33 additions & 19 deletions fennel/client_tests/test_dataset.py
@@ -178,7 +178,9 @@ def test_simple_drop_null(self, client):
if client.is_integration_client():
client.sleep()
ts = pd.Series([now, now, now, now, now, now])
user_id_keys = pd.Series([18232, 18234, 18235, 18236, 18237, 18238])
user_id_keys = pd.Series(
[18232, 18234, 18235, 18236, 18237, 18238], dtype="Int64"
)

df, found = UserInfoDataset.lookup(ts, user_id=user_id_keys)
assert df.shape == (6, 5)
@@ -294,12 +296,11 @@ def test_log_to_dataset(self, client):
assert (
response.json()["error"]
== "Schema validation failed during data insertion to "
"`UserInfoDataset` [ValueError('Field `age` is of type int, but the column "
"in the dataframe is of type `object`. Error found during checking schema for `UserInfoDataset`.')]"
"""`UserInfoDataset`: Failed to cast data logged to column `age` of type `optional(int)`: Unable to parse string "32yrs" at position 0"""
)
client.sleep(10)
# Do some lookups
user_ids = pd.Series([18232, 18234, 1920])
user_ids = pd.Series([18232, 18234, 1920], dtype="Int64")
lookup_now = datetime.now() + pd.Timedelta(minutes=1)
ts = pd.Series([lookup_now, lookup_now, lookup_now])
df, found = UserInfoDataset.lookup(
@@ -323,7 +324,7 @@ def test_log_to_dataset(self, client):
pd.Timestamp(yday_rounded),
]
# Do some lookups with a timestamp
user_ids = pd.Series([18232, 18234])
user_ids = pd.Series([18232, 18234], dtype="Int64")
six_hours_ago = now - pd.Timedelta(hours=6)
ts = pd.Series([six_hours_ago, six_hours_ago])
df, found = UserInfoDataset.lookup(
@@ -2651,16 +2652,29 @@ def get_payload(cls, common_event: Dataset):
"latitude": float,
"longitude": float,
}
data = event.transform(
extract_payload,
schema={"json_payload": Dict[str, float], "timestamp": datetime},
).transform(
lambda x: extract_keys(
x,
json_col="json_payload",
keys=["latitude", "longitude"],
),
schema=new_schema,
data = (
event.transform(
extract_payload,
schema={
"json_payload": Dict[str, float],
"timestamp": datetime,
},
)
.assign(
"latitude",
float,
lambda df: df["json_payload"]
.apply(lambda x: x["latitude"])
.astype(float),
)
.assign(
"longitude",
float,
lambda df: df["json_payload"]
.apply(lambda x: x["longitude"])
.astype(float),
)
.drop("json_payload")
)

new_schema.update({"latlng2": str})
@@ -2697,7 +2711,7 @@ def test_complex_lambda(client):
view.add(LocationLatLong)
sync_request = view._get_sync_request_proto()
assert len(sync_request.datasets) == 2
assert len(sync_request.operators) == 6
assert len(sync_request.operators) == 8

expected_code = """
def extract_location_index(
@@ -2777,17 +2791,17 @@ def LocationLatLong_wrapper_adace968e2(*args, **kwargs):
{
"name": "LocationLatLong",
"timestamp": datetime.now(),
"payload": '{"user_id": "247", "latitude": 12.3, "longitude": 12.3, "token": "abc"}',
"payload": '{"user_id": 247, "latitude": 12.3, "longitude": 12.3}',
},
{
"name": "LocationLatLong",
"timestamp": datetime.now(),
"payload": '{"user_id": "248", "latitude": 12.3, "longitude": 12.4, "token": "abc"}',
"payload": '{"user_id": 248, "latitude": 12.3, "longitude": 12.4}',
},
{
"name": "LocationLatLong",
"timestamp": datetime.now(),
"payload": '{"user_id": "246", "latitude": 12.3, "longitude": 12.3, "token": "abc"}',
"payload": '{"user_id": 246, "latitude": 12.3, "longitude": 12.3}',
},
]
res = client.log("fennel_webhook", "CommonEvent", pd.DataFrame(data))
29 changes: 14 additions & 15 deletions fennel/client_tests/test_invalid.py
@@ -34,7 +34,7 @@ class MemberActivityDataset:
domain: str = field(key=True)
hasShortcut: bool
country: str
DOMAIN_USED_COUNT: int
domain_used_count: int


@meta(owner="[email protected]")
@@ -53,7 +53,7 @@ class MemberDataset:
@dataset
class MemberActivityDatasetCopy:
domain: str = field(key=True)
DOMAIN_USED_COUNT: int
domain_used_count: int
time: datetime = field(timestamp=True)
url: str
uid: str
@@ -71,11 +71,11 @@ def copy(cls, ds: Dataset):
@featureset
class DomainFeatures:
domain: str = feature(id=1)
DOMAIN_USED_COUNT: int = feature(id=2)
domain_used_count: int = feature(id=2)

@extractor(depends_on=[MemberActivityDatasetCopy])
@inputs(Query.domain)
@outputs(domain, DOMAIN_USED_COUNT)
@outputs(domain, domain_used_count)
def get_domain_feature(cls, ts: pd.Series, domain: pd.Series):
df, found = MemberActivityDatasetCopy.lookup( # type: ignore
ts, domain=domain
@@ -106,16 +106,16 @@ def test_invalid_sync(self, client):
@featureset
class DomainFeatures2:
domain: str = feature(id=1)
DOMAIN_USED_COUNT: int = feature(id=2)
domain_used_count: int = feature(id=2)

@extractor()
@inputs(Query.domain)
@outputs(domain, DOMAIN_USED_COUNT)
@outputs(domain, domain_used_count)
def get_domain_feature(cls, ts: pd.Series, domain: pd.Series):
df, found = MemberActivityDatasetCopy.lookup( # type: ignore
ts, domain=domain
)
return df[[str(cls.domain), str(cls.DOMAIN_USED_COUNT)]]
return df[[str(cls.domain), str(cls.domain_used_count)]]


class TestInvalidExtractorDependsOn(unittest.TestCase):
@@ -133,7 +133,7 @@ class MemberActivityDataset:
domain: str = field(key=True)
hasShortcut: bool
country: str
DOMAIN_USED_COUNT: int
domain_used_count: int

@meta(owner="[email protected]")
@source(webhook.endpoint("MemberDataset"))
@@ -150,7 +150,7 @@ class MemberDataset:
@dataset
class MemberActivityDatasetCopy:
domain: str = field(key=True)
DOMAIN_USED_COUNT: int
domain_used_count: int
time: datetime = field(timestamp=True)
url: str
uid: str
@@ -167,11 +167,11 @@ def copy(cls, ds: Dataset):
@featureset
class DomainFeatures:
domain: str = feature(id=1)
DOMAIN_USED_COUNT: int = feature(id=2)
domain_used_count: int = feature(id=2)

@extractor(depends_on=[MemberActivityDatasetCopy])
@inputs(Query.domain)
@outputs(domain, DOMAIN_USED_COUNT)
@outputs(domain, domain_used_count)
def get_domain_feature(cls, ts: pd.Series, domain: pd.Series):
df, found = MemberActivityDatasetCopy.lookup( # type: ignore
ts, domain=domain
@@ -188,7 +188,7 @@ def get_domain_feature(cls, ts: pd.Series, domain: pd.Series):
)
client.extract_features(
output_feature_list=[DomainFeatures2],
input_feature_list=[Query],
input_feature_list=[Query.member_id],
input_dataframe=pd.DataFrame(
{
"Query.domain": [
@@ -262,9 +262,8 @@ def test_no_access(self, client):
)
else:
assert (
"Extractor `get_domain_feature` in `DomainFeatures2` "
"failed to run with error: name "
"'MemberActivityDatasetCopy' is not defined. " == str(e.value)
str(e.value)
== """Dataset `MemberActivityDataset` is an input to the pipelines: `['copy']` but is not synced. Please add it to the sync call."""
)

@mock
63 changes: 51 additions & 12 deletions fennel/client_tests/test_movie_tickets.py
@@ -8,16 +8,15 @@
from fennel import featureset, extractor, feature
from fennel.datasets import dataset, field
from fennel.lib.metadata import meta
from fennel.lib.schema import inputs, outputs
from fennel.lib.schema import inputs, outputs, between
from fennel.sources import source
from fennel.datasets import pipeline, Dataset
from fennel.lib.aggregate import Sum
from fennel.lib.aggregate import Sum, LastK, Distinct
from fennel.lib.window import Window
from fennel.sources import Webhook
from fennel.test_lib import mock, MockClient

from typing import List

from typing import List, Optional

client = MockClient()

Expand All @@ -29,7 +28,7 @@
@dataset
class MovieInfo:
title: str = field(key=True)
actors: List[str] # can be an empty list
actors: List[Optional[str]] # can be an empty list
release: datetime


Expand All @@ -39,15 +38,15 @@ class MovieInfo:
class TicketSale:
ticket_id: str
title: str
price: int
price: int # type: ignore
at: datetime


@meta(owner="[email protected]")
@dataset
class ActorStats:
name: str = field(key=True)
revenue: int
revenue: int # type: ignore
at: datetime

@pipeline(version=1, tier="prod")
@@ -101,6 +100,46 @@ def foo(df):
)


@meta(owner="[email protected]")
@dataset
class ActorStatsList:
name: str = field(key=True)
revenue: List[int] # type: ignore
revenue_distinct: List[int] # type: ignore
at: datetime

@pipeline(version=1, tier="prod")
@inputs(MovieInfo, TicketSale)
def pipeline_join(cls, info: Dataset, sale: Dataset):
uniq = sale.groupby("ticket_id").first()
c = (
uniq.join(info, how="inner", on=["title"])
.explode(columns=["actors"])
.rename(columns={"actors": "name"})
)
# name -> Option[str]
schema = c.schema()
schema["name"] = str
c = c.transform(lambda x: x, schema)
return c.groupby("name").aggregate(
[
LastK(
window=Window("forever"),
of="price",
into_field="revenue",
limit=10,
dedup=False,
),
Distinct(
window=Window("forever"),
of="price",
into_field="revenue_distinct",
unordered=True,
),
]
)


@meta(owner="[email protected]")
@featureset
class RequestFeatures:
@@ -132,7 +171,7 @@ def extract_revenue2(cls, ts: pd.Series, name: pd.Series):
class TestMovieTicketSale(unittest.TestCase):
@mock
def test_movie_ticket_sale(self, client):
datasets = [MovieInfo, TicketSale, ActorStats] # type: ignore
datasets = [MovieInfo, TicketSale, ActorStats, ActorStatsList] # type: ignore
featuresets = [ActorFeatures, RequestFeatures]
client.sync(datasets=datasets, featuresets=featuresets, tier="prod") # type: ignore
client.sleep()
@@ -166,10 +205,10 @@ def test_movie_ticket_sale(self, client):
two_hours_ago = now - timedelta(hours=2)
columns = ["ticket_id", "title", "price", "at"]
data = [
["1", "Titanic", 50, one_hour_ago],
["2", "Titanic", 100, one_day_ago],
["3", "Jumanji", 25, one_hour_ago],
["4", "The Matrix", 50, two_hours_ago], # no match
["1", "Titanic", "50", one_hour_ago],
["2", "Titanic", "100", one_day_ago],
["3", "Jumanji", "25", one_hour_ago],
["4", "The Matrix", "50", two_hours_ago], # no match
["5", "Great Gatbsy", 49, one_hour_ago],
]
df = pd.DataFrame(data, columns=columns)
8 changes: 5 additions & 3 deletions fennel/client_tests/test_outbrain.py
@@ -26,7 +26,9 @@
@source(
s3.bucket("fennel-demo-data", prefix="outbrain/page_views_filter.csv"),
every="1d",
tier="prod",
)
@source(webhook.endpoint("PageViews"), tier="dev")
@meta(owner="[email protected]")
@dataset
class PageViews:
@@ -101,16 +103,16 @@ def extract(cls, ts: pd.Series, uuids: pd.Series):
@pytest.mark.integration
@mock
def test_outbrain(client):
fake_PageViews = PageViews.with_source(webhook.endpoint("PageViews"))
client.sync(
datasets=[
fake_PageViews,
PageViews,
PageViewsByUser,
],
featuresets=[
Request,
UserPageViewFeatures,
],
tier="dev",
)
df = pd.read_csv("fennel/client_tests/data/page_views_sample.csv")
# Current time in ms
@@ -143,7 +145,7 @@ def test_outbrain(client):
Request,
UserPageViewFeatures,
],
input_feature_list=[Request],
input_feature_list=[Request.uuid, Request.document_id],
input_dataframe=input_df,
)
assert feature_df.shape[0] == 347