Skip to content

Commit

Permalink
extractors: Add dtype validation for autogenerated extractors (#310)
Browse files Browse the repository at this point in the history
* extractors: Add dtype validation for autogenerated extractors

* Handle optional fields in ds-fs lookups
  • Loading branch information
aditya-nambiar authored Nov 26, 2023
1 parent e076671 commit 7822015
Show file tree
Hide file tree
Showing 13 changed files with 222 additions and 18 deletions.
8 changes: 4 additions & 4 deletions fennel/client_tests/test_complex_autogen_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,8 @@ def test_complex_auto_gen_extractors(client):
rider_df = pd.DataFrame(
{
"rider_id": [1],
"created": [datetime.now()],
"birthdate": [datetime.now() - timedelta(days=365 * 30)],
"created": [datetime.utcnow()],
"birthdate": [datetime.utcnow() - timedelta(days=365 * 30)],
"country_code": ["US"],
}
)
Expand All @@ -298,7 +298,7 @@ def test_complex_auto_gen_extractors(client):
"rider_id": [1],
"vehicle_id": [1],
"is_completed_trip": [1],
"created": [datetime.now()],
"created": [datetime.utcnow()],
}
)
log_response = client.log(
Expand All @@ -311,7 +311,7 @@ def test_complex_auto_gen_extractors(client):
country_license_df = pd.DataFrame(
{
"rider_id": [1],
"created": [datetime.now()],
"created": [datetime.utcnow()],
"country_code": ["US"],
}
)
Expand Down
66 changes: 63 additions & 3 deletions fennel/featuresets/featureset.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,14 @@
set_meta_attr,
)
from fennel.lib.includes import TierSelector
from fennel.lib.schema import FENNEL_INPUTS, FENNEL_OUTPUTS
from fennel.lib.schema import (
FENNEL_INPUTS,
FENNEL_OUTPUTS,
validate_value_matches_type,
fennel_get_optional_inner,
)

from fennel.lib import FENNEL_GEN_CODE_MARKER
from fennel.utils import (
parse_annotation_comments,
propogate_fennel_attributes,
Expand Down Expand Up @@ -372,7 +379,6 @@ def extract(
tier=tier,
)
return self

provider_features = []
# If provider is none, then the provider is this featureset. The input features
# are captured once this featureset is initialized
Expand Down Expand Up @@ -480,8 +486,8 @@ def __init__(
self._features = features
self._feature_map = {feature.name: feature for feature in features}
self._id_to_feature = {feature.id: feature for feature in features}
self._extractors = self._get_extractors()
self._validate()
self._extractors = self._get_extractors()
self._add_feature_names_as_attributes()
self._set_extractors_as_attributes()
self._expectation = self._get_expectations()
Expand Down Expand Up @@ -576,6 +582,60 @@ def _get_extractors(self) -> List[Extractor]:
return extractors

def _validate(self):
cls_module = inspect.getmodule(self.__fennel_original_cls__)
if cls_module is not None and hasattr(
cls_module, FENNEL_GEN_CODE_MARKER
):
if getattr(cls_module, FENNEL_GEN_CODE_MARKER):
return

# Validate all auto generated extractors.
for feature in self._features:
if feature.extractor:
extractor = feature.extractor
if extractor.extractor_type == ExtractorType.ALIAS:
# Check that the types match
if feature.dtype != extractor.inputs[0].dtype:
raise TypeError(
f"Feature `{feature.fqn()}` has type `{feature.dtype}` "
f"but the extractor aliasing `{extractor.inputs[0].fqn()}` has input type "
f"`{extractor.inputs[0].dtype}`."
)
if extractor.extractor_type == ExtractorType.LOOKUP:
# Check that the types match
field_dtype = extractor.derived_extractor_info.field.dtype
if extractor.derived_extractor_info.default is not None:
if extractor.derived_extractor_info.field.is_optional():
field_dtype = fennel_get_optional_inner(field_dtype)

if feature.dtype != field_dtype:
raise TypeError(
f"Feature `{feature.fqn()}` has type `{feature.dtype}` "
f"but expected type `{extractor.derived_extractor_info.field.dtype}`."
)
else:
if extractor.derived_extractor_info.field.is_optional():
expected_field_type = field_dtype
else:
expected_field_type = Optional[field_dtype]

if feature.dtype != expected_field_type:
raise TypeError(
f"Feature `{feature.fqn()}` has type `{feature.dtype}` "
f"but expectected type `{expected_field_type}`"
)
# Check that the default value has the right type
if extractor.derived_extractor_info.default is not None:
try:
validate_value_matches_type(
extractor.derived_extractor_info.default,
extractor.derived_extractor_info.field.dtype,
)
except ValueError as e:
raise ValueError(
f"Default value `{extractor.derived_extractor_info.default}` for feature `{feature.fqn()}` has incorrect default value: {e}"
)

# Check that all features have unique ids.
feature_id_set = set()
for feature in self._features:
Expand Down
8 changes: 4 additions & 4 deletions fennel/featuresets/test_derived_extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class UserInfo:
default="unspecified",
)
# lookup with meta
age_years: float = (
age_years: int = (
feature(id=3)
.extract(
field=UserInfoDataset.age,
Expand Down Expand Up @@ -101,7 +101,7 @@ class AgeInfo:
# alias a feature that has an explicit extractor
age_group: AgeGroup = feature(id=1).extract(feature=UserInfo.age_group)
# alias a feature that has a derived extractor
age: float = feature(id=2).extract(feature=UserInfo.age_years)
age: int = feature(id=2).extract(feature=UserInfo.age_years)

view = InternalTestClient()
view.add(UserInfoDataset)
Expand Down Expand Up @@ -168,7 +168,7 @@ def test_feature(actual_feature, expected_dict):
{
"id": 3,
"name": "age_years",
"dtype": {"double_type": {}},
"dtype": {"int_type": {}},
"metadata": {
"owner": "[email protected]",
"description": "lookup with meta",
Expand Down Expand Up @@ -211,7 +211,7 @@ def test_feature(actual_feature, expected_dict):
{
"id": 2,
"name": "age",
"dtype": {"double_type": {}},
"dtype": {"int_type": {}},
"metadata": {
"description": "alias a feature that has a derived extractor"
},
Expand Down
2 changes: 1 addition & 1 deletion fennel/featuresets/test_featureset.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ class UserInfo:
income: int = feature(id=5).extract( # type: ignore
field=UserInfoDataset.avg_income,
provider=Request,
default="pluto",
default=1,
tier=["~prod"],
)

Expand Down
118 changes: 118 additions & 0 deletions fennel/featuresets/test_invalid_featureset.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datetime import datetime
from typing import Optional, List
import sys

import pandas as pd
import pytest
Expand All @@ -8,11 +9,16 @@
from fennel.datasets import dataset, field
from fennel.featuresets import featureset, extractor, feature
from fennel.lib.schema import inputs, outputs
from fennel.sources import source, Webhook

# noinspection PyUnresolvedReferences
from fennel.test_lib import *

__owner__ = "[email protected]"
webhook = Webhook(name="fennel_webhook")


@source(webhook.endpoint("UserInfoDataset"))
@dataset
class UserInfoDataset:
user_id: int = field(key=True)
Expand All @@ -21,6 +27,7 @@ class UserInfoDataset:
# Users date of birth
dob: str
age: int
ids: List[int]
account_creation_date: datetime
country: Optional[str]
timestamp: datetime = field(timestamp=True)
Expand Down Expand Up @@ -238,3 +245,114 @@ class UserInfo:
str(e.value)
== "Feature `extractors` in `UserInfo` has a reserved name `extractors`."
)


@featureset
class UserInfo2:
user_id: int = feature(id=1)
home_geoid: int = feature(id=2)
age: int = feature(id=3).extract(field=UserInfoDataset.age, default=0) # type: ignore
credit_score: int = feature(id=4)


@mock
def test_tp(client):
client.sync(datasets=[UserInfoDataset], featuresets=[UserInfo2])


@mock
def test_invalid_autogenerated_extractors(client):
with pytest.raises(TypeError) as e:

@featureset
class UserInfo:
user_id: int = feature(id=1)
home_geoid: int = feature(id=2)
age: int = feature(id=3).extract(field=UserInfoDataset.age)
credit_score: int = feature(id=4)

# age should be optional[int]
if sys.version_info < (3, 9):
assert (
str(e.value)
== "Feature `UserInfo.age` has type `<class 'int'>` but expectected type `typing.Union[int, NoneType]`"
)
else:
assert (
str(e.value)
== "Feature `UserInfo.age` has type `<class 'int'>` but expectected type `typing.Optional[int]`"
)

with pytest.raises(TypeError) as e:

@featureset
class UserInfo1:
user_id: int = feature(id=1)
home_geoid: int = feature(id=2)
age: float = feature(id=3).extract(
field=UserInfoDataset.age, default=0
)
credit_score: int = feature(id=4)

# age should be int
assert (
str(e.value)
== "Feature `UserInfo1.age` has type `<class 'float'>` but expected type `<class 'int'>`."
)

with pytest.raises(TypeError) as e:

@featureset
class UserInfo2:
home_geoid: int = feature(id=2)
age: int = feature(id=3)
credit_score: int = feature(id=4)

@featureset
class UserInfo2:
user_id: int = feature(id=1)
home_geoid: float = feature(id=2).extract(
feature=UserInfo2.home_geoid
)
age: int = feature(id=3)
credit_score: int = feature(id=4)

# home_geoid should be int
assert (
str(e.value)
== "Feature `UserInfo2.home_geoid` has type `<class 'float'>` but the extractor aliasing `UserInfo2.home_geoid` has input type `<class 'int'>`."
)

with pytest.raises(ValueError) as e:

@featureset
class UserInfo3:
user_id: int = feature(id=1)
home_geoid: int = feature(id=2)
age: int = feature(id=4).extract(
field=UserInfoDataset.age, default=0.0
)
credit_score: int = feature(id=5)

# default value for age should be 0
assert (
str(e.value)
== "Default value `0.0` for feature `UserInfo3.age` has incorrect default value: Value `0.0` does not match type `int`"
)

with pytest.raises(ValueError) as e:

@featureset
class UserInfo4:
user_id: int = feature(id=1)
home_geoid: int = feature(id=2)
country: str = feature(id=4).extract(
field=UserInfoDataset.country, default=0.0
)
credit_score: int = feature(id=5)

# default value for age should be 0
assert (
str(e.value)
== "Default value `0.0` for feature `UserInfo4.country` has incorrect default value: Value `0.0` does not match type `Optional[str]`"
)
1 change: 1 addition & 0 deletions fennel/lib/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
FENNEL_GEN_CODE_MARKER = "__fennel_gen_code__"
1 change: 1 addition & 0 deletions fennel/lib/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@
FENNEL_STRUCT,
FENNEL_STRUCT_DEPENDENCIES_SRC_CODE,
FENNEL_STRUCT_SRC_CODE,
validate_value_matches_type,
)
13 changes: 13 additions & 0 deletions fennel/lib/schema/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,19 @@ def is_hashable(dtype: Any) -> bool:
return False


def validate_value_matches_type(value: Any, dtype: Any) -> None:
# Create a dataframe with a single row and check the schema
# If the schema check fails, then the value does not match the type
df = pd.DataFrame({"0": [value]})
field = schema_proto.Field(name="0", dtype=get_datatype(dtype))
try:
_validate_field_in_df(field, df, "value")
except ValueError:
raise ValueError(
f"Value `{value}` does not match type `{dtype_to_string(dtype)}`"
)


def data_schema_check(
schema: schema_proto.DSSchema, df: pd.DataFrame, dataset_name=""
) -> List[ValueError]:
Expand Down
1 change: 1 addition & 0 deletions fennel/lib/to_proto/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
to_duration_proto,
to_sync_request_proto,
)
from fennel.lib.to_proto.source_code import FENNEL_GEN_CODE_MARKER
4 changes: 3 additions & 1 deletion fennel/lib/to_proto/source_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
FENNEL_STRUCT_SRC_CODE,
)
from fennel.utils import fennel_get_source
from fennel.lib import FENNEL_GEN_CODE_MARKER


def _remove_empty_lines(source_code: str) -> str:
Expand Down Expand Up @@ -214,7 +215,8 @@ def get_all_imports() -> str:
"from fennel.datasets.datasets import dataset_lookup",
]

return "\n".join(imports + fennel_imports) + "\n"
gen_code_marker = f"{FENNEL_GEN_CODE_MARKER}=True\n"
return gen_code_marker + "\n".join(imports + fennel_imports) + "\n"


def to_includes_proto(func: Callable) -> pycode_proto.PyCode:
Expand Down
12 changes: 9 additions & 3 deletions fennel/test_lib/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,9 +421,15 @@ def visitAssign(self, obj):
return None
df = input_ret.df
df[obj.column] = obj.func(df)
df[obj.column] = cast_col_to_dtype(
df[obj.column], get_datatype(obj.output_type)
)
try:
df[obj.column] = cast_col_to_dtype(
df[obj.column], get_datatype(obj.output_type)
)
except Exception as e:
raise Exception(
f"Error in assign node for column `{obj.column}` for pipeline "
f"`{self.cur_pipeline_name}`, {e}"
)
return NodeRet(df, input_ret.timestamp_field, input_ret.key_fields)

def visitDedup(self, obj):
Expand Down
4 changes: 3 additions & 1 deletion fennel/test_lib/mock_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,9 @@ def get_extractor_func(extractor_proto: ProtoExtractor) -> Callable:
sys.modules[fqn] = mod
exec(code, mod.__dict__)
except Exception as e:
raise Exception(f"Error while executing code: {code} : {str(e)}")
raise Exception(
f"Error while executing code for {fqn}:\n {code} \n: {str(e)}"
)
return mod.__dict__[extractor_proto.pycode.entry_point]


Expand Down
Loading

0 comments on commit 7822015

Please sign in to comment.