sync: Tier selector (#293)
* sync: Tier selector

* Docs and mock client changes
aditya-nambiar authored Nov 9, 2023
1 parent 6032c95 commit 2a0eddd
Showing 23 changed files with 768 additions and 157 deletions.
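In short: sources, pipelines, and extractors can now carry a `tier` tag, and `client.sync(..., tier=...)` registers only the variants that match. A minimal sketch of the pattern, pieced together from the diffs below (the source names and owner email here are hypothetical, and the imports follow the ones these docs already use):

```python
from datetime import datetime

from fennel.datasets import dataset, field
from fennel.lib.metadata import meta
from fennel.sources import source, Postgres, Webhook
from fennel.test_lib import MockClient

postgres = Postgres.get(name="my_rdbms")  # hypothetical source name
webhook = Webhook(name="fennel_webhook")


@meta(owner="[email protected]")  # hypothetical owner
@source(postgres.table("user", cursor="updated_at"), every="1m", tier="prod")
@source(webhook.endpoint("User"), tier="dev")
@dataset
class User:
    uid: int = field(key=True)
    signup_time: datetime


client = MockClient()
# Only the "dev"-tier source (the webhook) is registered; the postgres
# source is skipped. Untiered entities appear to be included for any tier.
client.sync(datasets=[User], tier="dev")
```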
15 changes: 10 additions & 5 deletions docs/examples/examples/ecommerce.py
@@ -30,7 +30,13 @@
 
 
 # docsnip datasets
-@source(postgres.table("orders", cursor="timestamp"), every="1m", lateness="1d")
+@source(
+    postgres.table("orders", cursor="timestamp"),
+    every="1m",
+    lateness="1d",
+    tier="prod",
+)
+@source(Webhook(name="fennel_webhook").endpoint("Order"), tier="dev")
 @meta(owner="[email protected]")
 @dataset
 class Order:
@@ -89,15 +95,14 @@ def myextractor(cls, ts: pd.Series, uids: pd.Series, sellers: pd.Series):
 # We can write a unit test to verify that the feature is working as expected
 # docsnip test
 
-fake_webhook = Webhook(name="fennel_webhook")
-
 
 class TestUserLivestreamFeatures(unittest.TestCase):
     @mock
     def test_feature(self, client):
-        fake_Order = Order.with_source(fake_webhook.endpoint("Order"))
         client.sync(
-            datasets=[fake_Order, UserSellerOrders], featuresets=[UserSeller]
+            datasets=[Order, UserSellerOrders],
+            featuresets=[UserSeller],
+            tier="dev",
         )
         columns = ["uid", "product_id", "seller_id", "timestamp"]
         now = datetime.utcnow()
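With this change the `with_source` workaround disappears: the webhook source is part of `Order` itself and is selected by syncing with `tier="dev"`. A hedged sketch of how the rest of the test presumably feeds it, using the mock client's `log` call that appears elsewhere in the Fennel docs (the row values are made up):

```python
from datetime import datetime

import pandas as pd

now = datetime.utcnow()
df = pd.DataFrame(
    [[1, 101, 7, now]],  # made-up uid, product_id, seller_id
    columns=["uid", "product_id", "seller_id", "timestamp"],
)
# With tier="dev", `Order` is backed by the webhook endpoint, so logging
# to that endpoint populates the dataset in the mock client.
client.log("fennel_webhook", "Order", df)
```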
61 changes: 34 additions & 27 deletions docs/examples/featuresets/overview.py
@@ -9,7 +9,7 @@
 from fennel.lib.metadata import meta
 from fennel.lib.schema import inputs, outputs
 from fennel.sources import source, Webhook
-from fennel.test_lib import mock
+from fennel.test_lib import mock, InternalTestClient
 
 webhook = Webhook(name="fennel_webhook")
 
@@ -70,34 +70,41 @@ def e2(cls, ts: pd.Series, durations: pd.Series) -> pd.Series:
 # /docsnip
 
 
-def test_multiple_extractors_of_same_feature():
-    with pytest.raises(Exception):
-        # docsnip featureset_extractors_of_same_feature
-        @featureset
-        class Movies:
-            duration: int = feature(id=1)
-            over_2hrs: bool = feature(id=2)
-            # invalid: both e1 & e2 output `over_3hrs`
-            over_3hrs: bool = feature(id=3)
-
-            @extractor
-            @inputs(duration)
-            @outputs(over_2hrs, over_3hrs)
-            def e1(cls, ts: pd.Series, durations: pd.Series) -> pd.DataFrame:
-                two_hrs = durations > 2 * 3600
-                three_hrs = durations > 3 * 3600
-                return pd.DataFrame(
-                    {"over_2hrs": two_hrs, "over_3hrs": three_hrs}
-                )
-
-            @extractor
-            @inputs(duration)
-            @outputs(over_3hrs)
-            def e2(cls, ts: pd.Series, durations: pd.Series) -> pd.Series:
-                return pd.Series(name="over_3hrs", data=durations > 3 * 3600)
+@mock
+def test_multiple_extractors_of_same_feature(client):
+    # docsnip featureset_extractors_of_same_feature
+    @meta(owner="[email protected]")
+    @featureset
+    class Movies:
+        duration: int = feature(id=1)
+        over_2hrs: bool = feature(id=2)
+        # invalid: both e1 & e2 output `over_3hrs`
+        over_3hrs: bool = feature(id=3)
+
+        @extractor(tier=["default"])
+        @inputs(duration)
+        @outputs(over_2hrs, over_3hrs)
+        def e1(cls, ts: pd.Series, durations: pd.Series) -> pd.DataFrame:
+            two_hrs = durations > 2 * 3600
+            three_hrs = durations > 3 * 3600
+            return pd.DataFrame({"over_2hrs": two_hrs, "over_3hrs": three_hrs})
+
+        @extractor(tier=["non-default"])
+        @inputs(duration)
+        @outputs(over_3hrs)
+        def e2(cls, ts: pd.Series, durations: pd.Series) -> pd.Series:
+            return pd.Series(name="over_3hrs", data=durations > 3 * 3600)
 
-        # /docsnip
 
+    # /docsnip
+    view = InternalTestClient()
+    view.add(Movies)
+    with pytest.raises(Exception) as e:
+        view._get_sync_request_proto()
+    assert (
+        str(e.value)
+        == "Feature `over_3hrs` is extracted by multiple extractors including `e2`."
+    )
 
 
 # docsnip remote_feature_as_input
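The reworked test doubles as documentation for the selector semantics: `e1` and `e2` both output `over_3hrs`, and because `_get_sync_request_proto()` is called without a tier, both extractors are selected and the clash raises. Presumably, selecting a single tier resolves the ambiguity; a sketch, assuming `InternalTestClient` accepts the tier argument the same way `Client._get_sync_request_proto` does later in this commit:

```python
view = InternalTestClient()
view.add(Movies)
# Only e1 (tier=["default"]) matches, so `over_3hrs` has a single
# extractor and proto generation should succeed.
view._get_sync_request_proto("default")
```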
17 changes: 11 additions & 6 deletions docs/examples/getting-started/quickstart.py
@@ -23,14 +23,20 @@
 postgres = Postgres.get(name="my_rdbms")
 warehouse = Snowflake.get(name="my_warehouse")
 kafka = Kafka.get(name="my_kafka")
+webhook = Webhook(name="fennel_webhook")
 
 
 # /docsnip
 
 
 # docsnip datasets
 @dataset
-@source(postgres.table("product_info", cursor="last_modified"), every="1m")
+@source(
+    postgres.table("product_info", cursor="last_modified"),
+    every="1m",
+    tier="prod",
+)
+@source(webhook.endpoint("Product"), tier="dev")
 @meta(owner="[email protected]", tags=["PII"])
 class Product:
     product_id: int = field(key=True)
@@ -51,7 +57,8 @@ def get_expectations(cls):
 
 # ingesting realtime data from Kafka works exactly the same way
 @meta(owner="[email protected]")
-@source(kafka.topic("orders"), lateness="1h")
+@source(kafka.topic("orders"), lateness="1h", tier="prod")
+@source(webhook.endpoint("Order"), tier="dev")
 @dataset
 class Order:
     uid: int
@@ -122,15 +129,13 @@ def myextractor(cls, ts: pd.Series, uids: pd.Series, sellers: pd.Series):
 # docsnip sync
 from fennel.test_lib import MockClient
 
-webhook = Webhook(name="fennel_webhook")
 
 # client = Client('<FENNEL SERVER URL>') # uncomment this line to use a real Fennel server
 client = MockClient()  # comment this line to use a real Fennel server
-fake_Product = Product.with_source(webhook.endpoint("Product"))
-fake_Order = Order.with_source(webhook.endpoint("Order"))
 client.sync(
-    datasets=[fake_Order, fake_Product, UserSellerOrders],
+    datasets=[Order, Product, UserSellerOrders],
     featuresets=[UserSellerFeatures],
+    tier="dev",
 )
 
 now = datetime.utcnow()
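Worth noting: `UserSellerOrders` and `UserSellerFeatures` carry no tier yet sync fine under `tier="dev"`, so untiered entities appear to be included regardless of the selected tier; only the tagged sources are filtered. Flipping one argument should then target production (a hedged sketch; the real-server `Client` constructor is taken from the commented line above, and its import path is assumed):

```python
from fennel.client import Client  # assumed import path for the real client

client = Client("<FENNEL SERVER URL>")
client.sync(
    datasets=[Order, Product, UserSellerOrders],  # untiered entities still sync
    featuresets=[UserSellerFeatures],
    tier="prod",  # selects the postgres/kafka sources instead of the webhooks
)
```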
17 changes: 10 additions & 7 deletions docs/examples/overview/concepts.py
@@ -26,11 +26,15 @@ class UserDataset:
 
 postgres = Postgres.get(name="postgres")
 kafka = Kafka.get(name="kafka")
+webhook = Webhook(name="fennel_webhook")
 
 
 # docsnip external_data_sources
 @meta(owner="[email protected]")
-@source(postgres.table("user", cursor="update_timestamp"), every="1m")
+@source(
+    postgres.table("user", cursor="update_timestamp"), every="1m", tier="prod"
+)
+@source(webhook.endpoint("User"), tier="dev")
 @dataset
 class User:
     uid: int = field(key=True)
@@ -40,7 +44,8 @@
 
 
 @meta(owner="[email protected]")
-@source(kafka.topic("transactions"))
+@source(kafka.topic("transactions"), tier="prod")
+@source(webhook.endpoint("Transaction"), tier="dev")
 @dataset
 class Transaction:
     uid: int
@@ -118,15 +123,13 @@ def get_country(cls, ts: pd.Series, uids: pd.Series):
 
 # /docsnip
 
-webhook = Webhook(name="fennel_webhook")
-
 
 # Tests to ensure that there are no run time errors in the snippets
 @mock
 def test_overview(client):
-    fake_User = User.with_source(webhook.endpoint("User"))
-    fake_Transaction = Transaction.with_source(webhook.endpoint("Transaction"))
-    client.sync(datasets=[fake_User, fake_Transaction, UserTransactionsAbroad])
+    client.sync(
+        datasets=[User, Transaction, UserTransactionsAbroad], tier="dev"
+    )
     now = datetime.now()
     dob = now - timedelta(days=365 * 30)
    data = [
3 changes: 3 additions & 0 deletions fennel/CHANGELOG.md
@@ -1,5 +1,8 @@
 # Changelog
 
+## [0.18.11] - 2023-11-08
+- Add support for tier selectors.
+
 ## [0.18.10] - 2023-10-30
 - Add support for `since` in S3 source.
 
9 changes: 6 additions & 3 deletions fennel/client/client.py
@@ -60,6 +60,7 @@ def sync(
         self,
         datasets: Optional[List[Dataset]] = None,
         featuresets: Optional[List[Featureset]] = None,
+        tier: Optional[str] = None,
     ):
         """
         Sync the client with the server. This will register any datasets or
@@ -92,7 +93,7 @@ def sync(
                     f" of type `{type(featureset)}` instead."
                 )
             self.add(featureset)
-        sync_request = self._get_sync_request_proto()
+        sync_request = self._get_sync_request_proto(tier)
         response = self._post_bytes(
             "{}/sync".format(V1_API),
             sync_request.SerializeToString(),
@@ -634,8 +635,10 @@ def _get_session():
         )
         return http
 
-    def _get_sync_request_proto(self):
-        return to_sync_request_proto(self.to_register_objects)
+    def _get_sync_request_proto(self, tier: Optional[str] = None):
+        if tier is not None and not isinstance(tier, str):
+            raise ValueError(f"Expected tier to be a string, got {tier}")
+        return to_sync_request_proto(self.to_register_objects, tier)
 
     def _get(self, path: str):
         headers = None
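The tier is threaded from `sync` down to proto generation and type-checked on the way. A quick sketch of the failure mode, assuming the mock client shares this helper (as the `InternalTestClient` usage earlier suggests):

```python
from fennel.test_lib import MockClient

client = MockClient()
try:
    client._get_sync_request_proto(tier=123)  # type: ignore[arg-type]
except ValueError as e:
    print(e)  # -> Expected tier to be a string, got 123
```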
20 changes: 12 additions & 8 deletions fennel/client_tests/test_movie_tickets.py
@@ -50,7 +50,7 @@ class ActorStats:
     revenue: int
     at: datetime
 
-    @pipeline(version=1)
+    @pipeline(version=1, tier="prod")
     @inputs(MovieInfo, TicketSale)
     def pipeline_join(cls, info: Dataset, sale: Dataset):
         uniq = sale.groupby("ticket_id").first()
@@ -73,7 +73,7 @@ def pipeline_join(cls, info: Dataset, sale: Dataset):
             ]
         )
 
-    @pipeline(version=2, active=True)
+    @pipeline(version=2, active=True, tier="prod")
     @inputs(MovieInfo, TicketSale)
     def pipeline_join_v2(cls, info: Dataset, sale: Dataset):
         def foo(df):
@@ -112,25 +112,29 @@ class RequestFeatures:
 class ActorFeatures:
     revenue: int = feature(id=1)
 
-    @extractor(depends_on=[ActorStats])
+    @extractor(depends_on=[ActorStats], tier="prod")
     @inputs(RequestFeatures.name)
     @outputs(revenue)
     def extract_revenue(cls, ts: pd.Series, name: pd.Series):
-        import sys
-
-        print(name, file=sys.stderr)
-        print("##", name.name, file=sys.stderr)
         df, _ = ActorStats.lookup(ts, name=name)  # type: ignore
         df = df.fillna(0)
         return df["revenue"]
 
+    @extractor(depends_on=[ActorStats], tier="staging")
+    @inputs(RequestFeatures.name)
+    @outputs(revenue)
+    def extract_revenue2(cls, ts: pd.Series, name: pd.Series):
+        df, _ = ActorStats.lookup(ts, name=name)  # type: ignore
+        df = df.fillna(0)
+        return df["revenue"] * 2
+
 
 class TestMovieTicketSale(unittest.TestCase):
     @mock
     def test_movie_ticket_sale(self, client):
         datasets = [MovieInfo, TicketSale, ActorStats]  # type: ignore
         featuresets = [ActorFeatures, RequestFeatures]
-        client.sync(datasets=datasets, featuresets=featuresets)  # type: ignore
+        client.sync(datasets=datasets, featuresets=featuresets, tier="prod")  # type: ignore
         client.sleep()
         data = [
             [
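Syncing with `tier="prod"` registers both pipelines (both tagged "prod", with v2 active) and picks `extract_revenue`, while `extract_revenue2` ("staging") is dropped. A hedged sketch of the flip side; note the pipelines above are tagged "prod" only, so a real staging sync would also need a staging or untiered pipeline for `ActorStats`:

```python
class TestMovieTicketSaleStaging(unittest.TestCase):
    @mock
    def test_staging_tier(self, client):
        # Hypothetical variant of the test above: under "staging" the other
        # extractor is selected, so ActorFeatures.revenue would come from
        # extract_revenue2 (i.e. doubled), once a staging pipeline exists.
        datasets = [MovieInfo, TicketSale, ActorStats]
        featuresets = [ActorFeatures, RequestFeatures]
        client.sync(datasets=datasets, featuresets=featuresets, tier="staging")
```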
(Diff truncated: the remaining 16 changed files are not shown.)
