Skip to content

Commit

Permalink
schema: Use pd types rather than python types
Browse files Browse the repository at this point in the history
  • Loading branch information
aditya-nambiar committed Nov 12, 2023
1 parent 7b5061e commit ea15438
Show file tree
Hide file tree
Showing 22 changed files with 689 additions and 179 deletions.
3 changes: 2 additions & 1 deletion docs/examples/datasets/lookups.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ class UserFeature:
def func(cls, ts: pd.Series, uid: pd.Series):
df, _found = User.lookup(ts, uid=uid)
return pd.Series(
name="in_home_city", data=df["home_city"] == df["cur_city"]
name="in_home_city",
data=df["home_city"] == df["cur_city"],
)


Expand Down
3 changes: 3 additions & 0 deletions fennel/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## [0.18.14] - 2023-11-11
- Use pandas types rather than Python types

## [0.18.12] - 2023-11-08
- Add support for strings in extract_features and extract_historical_features

Expand Down
4 changes: 4 additions & 0 deletions fennel/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,10 @@ def extract_historical_features(
raise Exception(
f"Timestamp column {timestamp_column} not found in input dataframe."
)
# Convert timestamp column to string to make it JSON serializable
input_dataframe[timestamp_column] = input_dataframe[
timestamp_column
].astype(str)
extract_historical_input["Pandas"] = input_dataframe.to_dict(
orient="list"
)
Expand Down
11 changes: 6 additions & 5 deletions fennel/client_tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,9 @@ def test_simple_drop_null(self, client):
if client.is_integration_client():
client.sleep()
ts = pd.Series([now, now, now, now, now, now])
user_id_keys = pd.Series([18232, 18234, 18235, 18236, 18237, 18238])
user_id_keys = pd.Series(
[18232, 18234, 18235, 18236, 18237, 18238], dtype="Int64"
)

df, found = UserInfoDataset.lookup(ts, user_id=user_id_keys)
assert df.shape == (6, 5)
Expand Down Expand Up @@ -294,12 +296,11 @@ def test_log_to_dataset(self, client):
assert (
response.json()["error"]
== "Schema validation failed during data insertion to "
"`UserInfoDataset` [ValueError('Field `age` is of type int, but the column "
"in the dataframe is of type `object`. Error found during checking schema for `UserInfoDataset`.')]"
"""`UserInfoDataset`: Failed to cast data logged to column `age` of type `optional(int)`: Unable to parse string "32yrs" at position 0"""
)
client.sleep(10)
# Do some lookups
user_ids = pd.Series([18232, 18234, 1920])
user_ids = pd.Series([18232, 18234, 1920], dtype="Int64")
lookup_now = datetime.now() + pd.Timedelta(minutes=1)
ts = pd.Series([lookup_now, lookup_now, lookup_now])
df, found = UserInfoDataset.lookup(
Expand All @@ -323,7 +324,7 @@ def test_log_to_dataset(self, client):
pd.Timestamp(yday_rounded),
]
# Do some lookups with a timestamp
user_ids = pd.Series([18232, 18234])
user_ids = pd.Series([18232, 18234], dtype="Int64")
six_hours_ago = now - pd.Timedelta(hours=6)
ts = pd.Series([six_hours_ago, six_hours_ago])
df, found = UserInfoDataset.lookup(
Expand Down
29 changes: 14 additions & 15 deletions fennel/client_tests/test_invalid.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class MemberActivityDataset:
domain: str = field(key=True)
hasShortcut: bool
country: str
DOMAIN_USED_COUNT: int
domain_used_count: int


@meta(owner="[email protected]")
Expand All @@ -53,7 +53,7 @@ class MemberDataset:
@dataset
class MemberActivityDatasetCopy:
domain: str = field(key=True)
DOMAIN_USED_COUNT: int
domain_used_count: int
time: datetime = field(timestamp=True)
url: str
uid: str
Expand All @@ -71,11 +71,11 @@ def copy(cls, ds: Dataset):
@featureset
class DomainFeatures:
domain: str = feature(id=1)
DOMAIN_USED_COUNT: int = feature(id=2)
domain_used_count: int = feature(id=2)

@extractor(depends_on=[MemberActivityDatasetCopy])
@inputs(Query.domain)
@outputs(domain, DOMAIN_USED_COUNT)
@outputs(domain, domain_used_count)
def get_domain_feature(cls, ts: pd.Series, domain: pd.Series):
df, found = MemberActivityDatasetCopy.lookup( # type: ignore
ts, domain=domain
Expand Down Expand Up @@ -106,16 +106,16 @@ def test_invalid_sync(self, client):
@featureset
class DomainFeatures2:
domain: str = feature(id=1)
DOMAIN_USED_COUNT: int = feature(id=2)
domain_used_count: int = feature(id=2)

@extractor()
@inputs(Query.domain)
@outputs(domain, DOMAIN_USED_COUNT)
@outputs(domain, domain_used_count)
def get_domain_feature(cls, ts: pd.Series, domain: pd.Series):
df, found = MemberActivityDatasetCopy.lookup( # type: ignore
ts, domain=domain
)
return df[[str(cls.domain), str(cls.DOMAIN_USED_COUNT)]]
return df[[str(cls.domain), str(cls.domain_used_count)]]


class TestInvalidExtractorDependsOn(unittest.TestCase):
Expand All @@ -133,7 +133,7 @@ class MemberActivityDataset:
domain: str = field(key=True)
hasShortcut: bool
country: str
DOMAIN_USED_COUNT: int
domain_used_count: int

@meta(owner="[email protected]")
@source(webhook.endpoint("MemberDataset"))
Expand All @@ -150,7 +150,7 @@ class MemberDataset:
@dataset
class MemberActivityDatasetCopy:
domain: str = field(key=True)
DOMAIN_USED_COUNT: int
domain_used_count: int
time: datetime = field(timestamp=True)
url: str
uid: str
Expand All @@ -167,11 +167,11 @@ def copy(cls, ds: Dataset):
@featureset
class DomainFeatures:
domain: str = feature(id=1)
DOMAIN_USED_COUNT: int = feature(id=2)
domain_used_count: int = feature(id=2)

@extractor(depends_on=[MemberActivityDatasetCopy])
@inputs(Query.domain)
@outputs(domain, DOMAIN_USED_COUNT)
@outputs(domain, domain_used_count)
def get_domain_feature(cls, ts: pd.Series, domain: pd.Series):
df, found = MemberActivityDatasetCopy.lookup( # type: ignore
ts, domain=domain
Expand All @@ -188,7 +188,7 @@ def get_domain_feature(cls, ts: pd.Series, domain: pd.Series):
)
client.extract_features(
output_feature_list=[DomainFeatures2],
input_feature_list=[Query],
input_feature_list=[Query.member_id],
input_dataframe=pd.DataFrame(
{
"Query.domain": [
Expand Down Expand Up @@ -262,9 +262,8 @@ def test_no_access(self, client):
)
else:
assert (
"Extractor `get_domain_feature` in `DomainFeatures2` "
"failed to run with error: name "
"'MemberActivityDatasetCopy' is not defined. " == str(e.value)
str(e.value)
== """Dataset `MemberActivityDataset` is an input to the pipelines: `['copy']` but is not synced. Please add it to the sync call."""
)

@mock
Expand Down
63 changes: 51 additions & 12 deletions fennel/client_tests/test_movie_tickets.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,15 @@
from fennel import featureset, extractor, feature
from fennel.datasets import dataset, field
from fennel.lib.metadata import meta
from fennel.lib.schema import inputs, outputs
from fennel.lib.schema import inputs, outputs, between
from fennel.sources import source
from fennel.datasets import pipeline, Dataset
from fennel.lib.aggregate import Sum
from fennel.lib.aggregate import Sum, LastK, Distinct
from fennel.lib.window import Window
from fennel.sources import Webhook
from fennel.test_lib import mock, MockClient

from typing import List

from typing import List, Optional

client = MockClient()

Expand All @@ -29,7 +28,7 @@
@dataset
class MovieInfo:
title: str = field(key=True)
actors: List[str] # can be an empty list
actors: List[Optional[str]] # can be an empty list
release: datetime


Expand All @@ -39,15 +38,15 @@ class MovieInfo:
class TicketSale:
ticket_id: str
title: str
price: int
price: between(int, 0, 1000) # type: ignore
at: datetime


@meta(owner="[email protected]")
@dataset
class ActorStats:
name: str = field(key=True)
revenue: int
revenue: between(int, 0, 1000) # type: ignore
at: datetime

@pipeline(version=1, tier="prod")
Expand Down Expand Up @@ -101,6 +100,46 @@ def foo(df):
)


# Dataset keyed by actor `name` that stores two list-valued columns, both
# aggregated from the ticket `price` column: `revenue` (via LastK) and
# `revenue_distinct` (via Distinct). Each list element is declared as
# between(int, 0, 1000).
@meta(owner="[email protected]")
@dataset
class ActorStatsList:
name: str = field(key=True)
revenue: List[between(int, 0, 1000)] # type: ignore
revenue_distinct: List[between(int, 0, 1000)] # type: ignore
at: datetime

# Pipeline that derives this dataset from the MovieInfo and TicketSale inputs.
@pipeline(version=1, tier="prod")
@inputs(MovieInfo, TicketSale)
def pipeline_join(cls, info: Dataset, sale: Dataset):
# Take the first record per ticket_id.
# NOTE(review): presumably this de-duplicates repeated sale events - confirm.
uniq = sale.groupby("ticket_id").first()
# Inner-join sales with movie info on `title`, explode the `actors` list
# column, then rename the exploded column to `name`.
c = (
uniq.join(info, how="inner", on=["title"])
.explode(columns=["actors"])
.rename(columns={"actors": "name"})
)
# The original note here says `name` is Option[str] at this point. The
# identity transform below leaves the data unchanged and only passes a
# schema in which `name` is declared as plain `str`.
# NOTE(review): presumably needed because the key field `name` above is
# declared as `str` - confirm.
schema = c.schema()
schema["name"] = str
c = c.transform(lambda x: x, schema)
# Group by actor name and compute both aggregates over a "forever" window.
return c.groupby("name").aggregate(
[
# LastK of `price` into `revenue`, with limit=10 and dedup disabled.
LastK(
window=Window("forever"),
of="price",
into_field="revenue",
limit=10,
dedup=False,
),
# Distinct of `price` into `revenue_distinct`, with unordered=True.
Distinct(
window=Window("forever"),
of="price",
into_field="revenue_distinct",
unordered=True,
),
]
)


@meta(owner="[email protected]")
@featureset
class RequestFeatures:
Expand Down Expand Up @@ -132,7 +171,7 @@ def extract_revenue2(cls, ts: pd.Series, name: pd.Series):
class TestMovieTicketSale(unittest.TestCase):
@mock
def test_movie_ticket_sale(self, client):
datasets = [MovieInfo, TicketSale, ActorStats] # type: ignore
datasets = [MovieInfo, TicketSale, ActorStats, ActorStatsList] # type: ignore
featuresets = [ActorFeatures, RequestFeatures]
client.sync(datasets=datasets, featuresets=featuresets, tier="prod") # type: ignore
client.sleep()
Expand Down Expand Up @@ -166,10 +205,10 @@ def test_movie_ticket_sale(self, client):
two_hours_ago = now - timedelta(hours=2)
columns = ["ticket_id", "title", "price", "at"]
data = [
["1", "Titanic", 50, one_hour_ago],
["2", "Titanic", 100, one_day_ago],
["3", "Jumanji", 25, one_hour_ago],
["4", "The Matrix", 50, two_hours_ago], # no match
["1", "Titanic", "50", one_hour_ago],
["2", "Titanic", "100", one_day_ago],
["3", "Jumanji", "25", one_hour_ago],
["4", "The Matrix", "50", two_hours_ago], # no match
["5", "Great Gatbsy", 49, one_hour_ago],
]
df = pd.DataFrame(data, columns=columns)
Expand Down
8 changes: 5 additions & 3 deletions fennel/client_tests/test_outbrain.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
@source(
s3.bucket("fennel-demo-data", prefix="outbrain/page_views_filter.csv"),
every="1d",
tier="prod",
)
@source(webhook.endpoint("PageViews"), tier="dev")
@meta(owner="[email protected]")
@dataset
class PageViews:
Expand Down Expand Up @@ -101,16 +103,16 @@ def extract(cls, ts: pd.Series, uuids: pd.Series):
@pytest.mark.integration
@mock
def test_outbrain(client):
fake_PageViews = PageViews.with_source(webhook.endpoint("PageViews"))
client.sync(
datasets=[
fake_PageViews,
PageViews,
PageViewsByUser,
],
featuresets=[
Request,
UserPageViewFeatures,
],
tier="dev",
)
df = pd.read_csv("fennel/client_tests/data/page_views_sample.csv")
# Current time in ms
Expand Down Expand Up @@ -143,7 +145,7 @@ def test_outbrain(client):
Request,
UserPageViewFeatures,
],
input_feature_list=[Request],
input_feature_list=[Request.uuid, Request.document_id],
input_dataframe=input_df,
)
assert feature_df.shape[0] == 347
Expand Down
8 changes: 4 additions & 4 deletions fennel/client_tests/test_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import numpy as np
import pandas as pd
import pytest
from typing import Dict, List
from typing import Dict, List, Optional

import fennel._vendor.requests as requests
from fennel import sources
Expand All @@ -14,7 +14,7 @@
from fennel.lib.aggregate import Count, Sum
from fennel.lib.includes import includes
from fennel.lib.metadata import meta
from fennel.lib.schema import Embedding
from fennel.lib.schema import Embedding, oneof
from fennel.lib.schema import inputs, outputs
from fennel.lib.window import Window
from fennel.sources import source
Expand Down Expand Up @@ -217,7 +217,7 @@ def top_words_count(cls, ds: Dataset):
class UserActivity:
user_id: int
doc_id: int
action_type: str
action_type: oneof(str, ["view", "edit"]) # type: ignore
view_time: float
timestamp: datetime

Expand Down Expand Up @@ -597,7 +597,7 @@ def test_search_e2e(self, client):
DocumentFeatures,
DocumentContentFeatures,
],
input_feature_list=[Query],
input_feature_list=[Query.doc_id, Query.user_id],
input_dataframe=input_df,
)
assert df.shape == (2, 15)
Expand Down
6 changes: 3 additions & 3 deletions fennel/client_tests/test_social_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class UserInfo:
@meta(owner="[email protected]")
class PostInfo:
title: str
category: str
category: str # type: ignore
post_id: int = field(key=True)
timestamp: datetime

Expand Down Expand Up @@ -168,7 +168,7 @@ def test_social_network(client):
user_data_df = pd.read_csv("fennel/client_tests/data/user_data.csv")
post_data_df = pd.read_csv("fennel/client_tests/data/post_data.csv")
view_data_df = pd.read_csv("fennel/client_tests/data/view_data_sampled.csv")
ts = datetime(2018, 1, 1, 0, 0, 0)
ts = "2018-01-01 00:00:00"
user_data_df["timestamp"] = ts
post_data_df["timestamp"] = ts
view_data_df["time_stamp"] = view_data_df["time_stamp"].apply(
Expand All @@ -194,7 +194,7 @@ def test_social_network(client):

feature_df = client.extract_features(
output_feature_list=[UserFeatures],
input_feature_list=[Request],
input_feature_list=[Request.user_id, Request.category],
input_dataframe=pd.DataFrame(
{
"Request.user_id": [
Expand Down
Loading

0 comments on commit ea15438

Please sign in to comment.