Skip to content

Commit

Permalink
Add support for now in expressions.
Browse files Browse the repository at this point in the history
  • Loading branch information
nonibansal committed Sep 30, 2024
1 parent e7d9900 commit ffdf10e
Show file tree
Hide file tree
Showing 17 changed files with 498 additions and 178 deletions.
1 change: 1 addition & 0 deletions docs/api.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ sidebar:
- "api-reference/expressions/fillnull"
- "api-reference/expressions/lit"
- "api-reference/expressions/not"
- "api-reference/expressions/now"
- "api-reference/expressions/typeof"
- "api-reference/expressions/when"

Expand Down
25 changes: 25 additions & 0 deletions docs/examples/api-reference/expressions/basic.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from datetime import datetime

import pytest
from typing import Optional
import pandas as pd
Expand Down Expand Up @@ -156,3 +158,26 @@ def test_lit():
df = pd.DataFrame({"x": pd.Series([1, 2, None], dtype=pd.Int64Dtype())})
assert expr.eval(df, schema={"x": Optional[int]}).tolist() == [2, 3, pd.NA]
# /docsnip


def test_now():
# docsnip expr_now
from fennel.expr import now, col

# docsnip-highlight next-line
expr = now().dt.since(col("birthdate"), "year")

assert (
expr.typeof(schema={"birthdate": Optional[datetime]}) == Optional[int]
)

# can be evaluated with a dataframe
df = pd.DataFrame(
{"birthdate": [datetime(1997, 12, 24), datetime(2001, 1, 21), None]}
)
assert expr.eval(df, schema={"birthdate": Optional[datetime]}).tolist() == [
26,
23,
pd.NA,
]
# /docsnip
18 changes: 18 additions & 0 deletions docs/pages/api-reference/expressions/now.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
title: Now
order: 0
status: published
---
### Now

Function to get current timestamp, similar to what `datetime.now` does in Python.

<pre snippet="api-reference/expressions/basic#expr_now"
message="Using now to get age of a person" status="success">
</pre>

#### Returns
<Expandable type="Any">
Returns an expression object denoting a reference to the column. The type of
the resulting expression is datetime.
</Expandable>
3 changes: 3 additions & 0 deletions fennel/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## [1.5.33] - 2024-09-30
- Add support for now in expressions.

## [1.5.32] - 2024-09-27
- Add dropnull to FirstK

Expand Down
149 changes: 149 additions & 0 deletions fennel/client_tests/test_expr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
from datetime import datetime, timezone, timedelta
from typing import Optional

import pandas as pd
import pytest

from fennel._vendor import requests
from fennel.connectors import source, Webhook
from fennel.datasets import dataset, field
from fennel.expr import col
from fennel.featuresets import featureset, feature as F
from fennel.testing import mock

webhook = Webhook(name="fennel_webhook")
__owner__ = "[email protected]"


@source(webhook.endpoint("UserInfoDataset"), disorder="14d", cdc="upsert")
@dataset(index=True)
class UserInfoDataset:
user_id: int = field(key=True)
name: str
birthdate: datetime
country: str
ts: datetime = field(timestamp=True)


@pytest.mark.integration
@mock
def test_now(client):
from fennel.expr import now

@featureset
class UserInfoFeatures:
user_id: int
name: Optional[str] = F(UserInfoDataset.name)
birthdate: Optional[datetime] = F(UserInfoDataset.birthdate)
age: Optional[int] = F(now().dt.since(col("birthdate"), unit="year"))
country: Optional[str] = F(UserInfoDataset.country)

# Sync the dataset
response = client.commit(
message="msg",
datasets=[UserInfoDataset],
featuresets=[UserInfoFeatures],
)
assert response.status_code == requests.codes.OK, response.json()

now = datetime.now(timezone.utc)
now_1y = now - timedelta(days=365)
df = pd.DataFrame(
{
"user_id": [1, 2, 3, 4, 5],
"name": ["Ross", "Monica", "Chandler", "Joey", "Rachel"],
"birthdate": [
datetime(1970, 1, 1, tzinfo=timezone.utc),
datetime(1980, 3, 12, tzinfo=timezone.utc),
datetime(1990, 5, 15, tzinfo=timezone.utc),
datetime(1997, 12, 24, tzinfo=timezone.utc),
datetime(2001, 1, 21, tzinfo=timezone.utc),
],
"country": ["India", "USA", "Africa", "UK", "Chile"],
"ts": [now_1y, now_1y, now_1y, now_1y, now_1y],
}
)
response = client.log("fennel_webhook", "UserInfoDataset", df)
assert response.status_code == requests.codes.OK, response.json()

client.sleep()

# Querying UserInfoFeatures
df = client.query(
inputs=[UserInfoFeatures.user_id],
outputs=[
UserInfoFeatures.name,
UserInfoFeatures.age,
UserInfoFeatures.country,
],
input_dataframe=pd.DataFrame(
{"UserInfoFeatures.user_id": [1, 2, 3, 4, 5, 6]}
),
)
assert df.shape == (6, 3)
assert df["UserInfoFeatures.name"].tolist() == [
"Ross",
"Monica",
"Chandler",
"Joey",
"Rachel",
pd.NA,
]
assert df["UserInfoFeatures.age"].tolist() == [54, 44, 34, 26, 23, pd.NA]
assert df["UserInfoFeatures.country"].tolist() == [
"India",
"USA",
"Africa",
"UK",
"Chile",
pd.NA,
]

if not client.is_integration_client():
df = client.query_offline(
inputs=[UserInfoFeatures.user_id],
outputs=[
UserInfoFeatures.name,
UserInfoFeatures.age,
UserInfoFeatures.country,
],
input_dataframe=pd.DataFrame(
{
"UserInfoFeatures.user_id": [1, 2, 3, 4, 5, 6],
"timestamp": [
now_1y,
now_1y,
now_1y,
now_1y,
now_1y,
now_1y,
],
}
),
timestamp_column="timestamp",
)
assert df.shape == (6, 4)
assert df["UserInfoFeatures.name"].tolist() == [
"Ross",
"Monica",
"Chandler",
"Joey",
"Rachel",
pd.NA,
]
assert df["UserInfoFeatures.age"].tolist() == [
53,
43,
33,
25,
22,
pd.NA,
]
assert df["UserInfoFeatures.country"].tolist() == [
"India",
"USA",
"Africa",
"UK",
"Chile",
pd.NA,
]
2 changes: 1 addition & 1 deletion fennel/datasets/test_invalid_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ def transform(cls, rating: Dataset):

assert (
str(e2.value)
== """invalid assign - '[Pipeline:transform]->assign node' error in expression for column `movie_suffixed`: Failed to compile expression: invalid expression: both sides of '+' must be numeric types but found String & String, left: col("movie"), right: lit(String("_suffix"))"""
== """invalid assign - '[Pipeline:transform]->assign node' error in expression for column `movie_suffixed`: failed to compile expression: invalid expression: both sides of '+' must be numeric types but found String & String, left: col("movie"), right: lit(String("_suffix"))"""
)


Expand Down
1 change: 1 addition & 0 deletions fennel/expr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
var,
datetime,
from_epoch,
now,
Expr,
InvalidExprException,
)
31 changes: 30 additions & 1 deletion fennel/expr/expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import Any, Callable, Dict, Type, Optional
import pandas as pd

from fennel._vendor.pydantic import BaseModel # type: ignore
from fennel.dtypes.dtypes import FENNEL_STRUCT, FENNEL_STRUCT_SRC_CODE
from fennel.internal_lib.schema.schema import (
convert_dtype_to_arrow_type_with_nullable,
Expand All @@ -17,6 +18,7 @@
from fennel_data_lib import assign, type_of, matches

import fennel.gen.schema_pb2 as schema_proto
import fennel.gen.expr_pb2 as expr_proto
from fennel.internal_lib.schema import (
get_datatype,
cast_col_to_arrow_dtype,
Expand All @@ -27,6 +29,10 @@
)


class EvalContext(BaseModel):
now_col_name: Optional[str] = None


class InvalidExprException(Exception):
pass

Expand Down Expand Up @@ -360,6 +366,7 @@ def eval(
schema: Dict,
output_dtype: Optional[Type] = None,
parse=True,
context: Optional[EvalContext] = None,
) -> pd.Series:
from fennel.expr.serializer import ExprSerializer

Expand Down Expand Up @@ -410,8 +417,18 @@ def pa_to_pd(pa_data, ret_type, parse=True):
ret_type = output_dtype

serialized_ret_type = get_datatype(ret_type).SerializeToString()
if context is None:
serialized_context = expr_proto.EvalContext().SerializeToString()
else:
serialized_context = expr_proto.EvalContext(
**context.dict()
).SerializeToString()
arrow_col = assign(
proto_bytes, df_pa, proto_schema, serialized_ret_type
proto_bytes,
df_pa,
proto_schema,
serialized_ret_type,
serialized_context,
)
return pa_to_pd(arrow_col, ret_type, parse)

Expand Down Expand Up @@ -1089,6 +1106,14 @@ def __str__(self) -> str:
return f"fillnull({self.expr}, {self.fill})"


class Now(Expr):
def __init__(self):
super(Now, self).__init__()

def __str__(self) -> str:
return "now()"


class MakeStruct(Expr):
def __init__(self, fields: Dict[str, Expr], type: Type):
self.fields = fields
Expand Down Expand Up @@ -1183,3 +1208,7 @@ def datetime(
),
DateTimeNoop(),
)


def now() -> Now:
return Now()
5 changes: 5 additions & 0 deletions fennel/expr/serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,11 @@ def visitDateTimeLiteral(self, obj):
expr.datetime_literal.CopyFrom(datetime_literal)
return expr

def visitNow(self, obj):
expr = proto.Expr()
expr.now.CopyFrom(proto.Now())
return expr


def val_as_json(val: Any) -> str:
if isinstance(val, str):
Expand Down
37 changes: 34 additions & 3 deletions fennel/expr/test_expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@
import random
import pandas as pd
from dataclasses import dataclass, fields
from datetime import datetime
from datetime import datetime, timezone
from typing import Any, Dict, Optional, List
from fennel.datasets import dataset

from fennel.dtypes.dtypes import struct
from fennel.expr import col, when, lit
from fennel.expr.expr import TimeUnit, from_epoch, make_struct
from fennel.expr import col, when, lit, now
from fennel.expr.expr import (
TimeUnit,
from_epoch,
make_struct,
)
from fennel.expr.visitor import ExprPrinter, FetchReferences
from fennel.expr.serializer import ExprSerializer
from google.protobuf.json_format import ParseDict # type: ignore
Expand Down Expand Up @@ -1345,3 +1349,30 @@ def test_isnull():

for case in cases:
check_test_case(case)


def test_now():
cases = [
ExprTestCase(
expr=now().dt.since(col("birthdate"), unit="day"),
df=pd.DataFrame(
{
"birthdate": [
datetime.now(timezone.utc),
datetime.now(timezone.utc),
None,
datetime.now(timezone.utc),
],
}
),
schema={"birthdate": Optional[datetime]},
display='SINCE(NOW(), col("birthdate"), unit=TimeUnit.DAY)',
refs={"birthdate"},
eval_result=[0, 0, pd.NA, 0],
expected_dtype=Optional[int],
proto_json=None,
),
]

for case in cases:
check_test_case(case)
Loading

0 comments on commit ffdf10e

Please sign in to comment.