diff --git a/docs/examples/api-reference/expressions/basic.py b/docs/examples/api-reference/expressions/basic.py index 3a27c2e8..d6c37185 100644 --- a/docs/examples/api-reference/expressions/basic.py +++ b/docs/examples/api-reference/expressions/basic.py @@ -176,7 +176,7 @@ def test_now(): {"birthdate": [datetime(1997, 12, 24), datetime(2001, 1, 21), None]} ) assert expr.eval(df, schema={"birthdate": Optional[datetime]}).tolist() == [ - 26, + 27, 23, pd.NA, ] diff --git a/fennel/CHANGELOG.md b/fennel/CHANGELOG.md index 66cb5034..0a631721 100644 --- a/fennel/CHANGELOG.md +++ b/fennel/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## [1.5.63] - 2024-12-25 +- Support list lookup based autogenerated extractors. + ## [1.5.62] - 2024-12-16 - Fix casting for empty dataframes. diff --git a/fennel/client_tests/test_complex_autogen_extractor.py b/fennel/client_tests/test_complex_autogen_extractor.py index b869b238..36f63599 100644 --- a/fennel/client_tests/test_complex_autogen_extractor.py +++ b/fennel/client_tests/test_complex_autogen_extractor.py @@ -346,7 +346,10 @@ def test_complex_auto_gen_extractors(client): assert extracted_df["RiderFeatures.dl_state"].to_list() == ["US", "Unknown"] assert extracted_df["RiderFeatures.is_us_dl"].to_list() == [True, False] - age_years = datetime.now(timezone.utc).year - 2000 + age_years = ( + datetime.now() - datetime(2000, 1, 1, 0, 0, 0) + ).total_seconds() / (60 * 60 * 24 * 365) + age_years = int(age_years + 0.001) assert extracted_df["RiderFeatures.age_years"].to_list() == [30, age_years] assert extracted_df["RiderFeatures.dl_state_population"].to_list() == [ 328200000, diff --git a/fennel/client_tests/test_expr.py b/fennel/client_tests/test_expr.py index fd01f545..32f84f03 100644 --- a/fennel/client_tests/test_expr.py +++ b/fennel/client_tests/test_expr.py @@ -89,7 +89,7 @@ class UserInfoFeatures: "Rachel", pd.NA, ] - assert df["UserInfoFeatures.age"].tolist() == [54, 44, 34, 26, 23, pd.NA] + assert df["UserInfoFeatures.age"].tolist() == [55, 44, 34, 27, 23, pd.NA] assert df["UserInfoFeatures.country"].tolist() == [ "India", "USA", @@ -132,10 +132,10 @@ class UserInfoFeatures: pd.NA, ] assert df["UserInfoFeatures.age"].tolist() == [ - 53, + 54, 43, 33, - 25, + 26, 22, pd.NA, ] diff --git a/fennel/client_tests/test_featureset.py b/fennel/client_tests/test_featureset.py index 8d52dffd..1e043629 100644 --- a/fennel/client_tests/test_featureset.py +++ b/fennel/client_tests/test_featureset.py @@ -37,6 +37,7 @@ class UserInfoDataset: user_id: int = field(key=True) name: str age: Optional[int] + hobbies: List[Optional[str]] timestamp: datetime = field(timestamp=True) country: str @@ -206,10 +207,10 @@ def test_simple_extractor(self, client): ) now = datetime.now(timezone.utc) data = [ - [18232, "John", 32, "USA", now], - [18234, "Monica", 24, "Chile", now], + [18232, "John", 32, "USA", now, ["Reading", "Writing"]], + [18234, "Monica", 24, "Chile", now, ["Cooking", "Music"]], ] - columns = ["user_id", "name", "age", "country", "timestamp"] + columns = ["user_id", "name", "age", "country", "timestamp", "hobbies"] input_df = pd.DataFrame(data, columns=columns) response = client.log("fennel_webhook", "UserInfoDataset", input_df) assert response.status_code == requests.codes.OK, response.json() @@ -254,10 +255,10 @@ def test_e2e_query(self, client): ) now = datetime.now(timezone.utc) data = [ - [18232, "John", 32, "USA", now], - [18234, "Monica", 24, "Chile", now], + [18232, "John", 32, "USA", now, ["Reading", "Writing"]], + [18234, "Monica", 24, "Chile", now, ["Cooking", 
"Music"]], ] - columns = ["user_id", "name", "age", "country", "timestamp"] + columns = ["user_id", "name", "age", "country", "timestamp", "hobbies"] input_df = pd.DataFrame(data, columns=columns) response = client.log("fennel_webhook", "UserInfoDataset", input_df) assert response.status_code == requests.codes.OK, response.json() @@ -393,11 +394,19 @@ def test_derived_extractor(self, client): ) now = datetime.now(timezone.utc) data = [ - [18232, "John", 32, "USA", now], - [18234, "Monica", 24, "Chile", now], + [18232, "John", 32, "USA", now, ["Reading", "Writing"]], + [18234, "Monica", 24, "Chile", now, ["Cooking", "Music"]], ] df = pd.DataFrame( - data, columns=["user_id", "name", "age", "country", "timestamp"] + data, + columns=[ + "user_id", + "name", + "age", + "country", + "timestamp", + "hobbies", + ], ) response = client.log("fennel_webhook", "UserInfoDataset", df) assert response.status_code == requests.codes.OK, response.json() @@ -485,10 +494,10 @@ def test_dag_resolution2(self, client): ) now = datetime.now(timezone.utc) data = [ - [18232, "John", 32, "USA", now], - [18234, "Monica", 24, "Chile", now], + [18232, "John", 32, "USA", now, ["Reading", "Writing"]], + [18234, "Monica", 24, "Chile", now, ["Cooking", "Music"]], ] - columns = ["user_id", "name", "age", "country", "timestamp"] + columns = ["user_id", "name", "age", "country", "timestamp", "hobbies"] df = pd.DataFrame(data, columns=columns) response = client.log("fennel_webhook", "UserInfoDataset", df) assert response.status_code == requests.codes.OK, response.json() @@ -586,10 +595,10 @@ def test_dag_resolution_complex(self, client): client.sleep() now = datetime.now(timezone.utc) data = [ - [18232, "John", 32, "USA", now], - [18234, "Monica", 24, "Chile", now], + [18232, "John", 32, "USA", now, ["Reading", "Writing"]], + [18234, "Monica", 24, "Chile", now, ["Cooking", "Music"]], ] - columns = ["user_id", "name", "age", "country", "timestamp"] + columns = ["user_id", "name", "age", "country", "timestamp", "hobbies"] df = pd.DataFrame(data, columns=columns) response = client.log("fennel_webhook", "UserInfoDataset", df) assert response.status_code == requests.codes.OK, response.json() @@ -1068,11 +1077,11 @@ def test_chained_lookups(client): now = datetime.now(timezone.utc) data = [ - [18232, "John", 32, "USA", now], - [18234, "Monica", 24, "Chile", now], - [18235, "Rahul", 28, "India", now], + [18232, "John", 32, "USA", now, ["Reading", "Writing"]], + [18234, "Monica", 24, "Chile", now, ["Cooking", "Music"]], + [18235, "Rahul", 28, "India", now, ["Reading", "Football"]], ] - columns = ["user_id", "name", "age", "country", "timestamp"] + columns = ["user_id", "name", "age", "country", "timestamp", "hobbies"] input_df = pd.DataFrame(data, columns=columns) client.log("fennel_webhook", "UserInfoDataset", input_df) @@ -1263,3 +1272,226 @@ def time_feature_extractor(cls, ts: pd.Series) -> pd.DataFrame: featuresets=[QueryTimeFeatures], message="first_commit", ) + + +@pytest.mark.integration +@mock +def test_looks_like_list_lookup(client): + @featureset + class UserInfoListLookup: + user_id: int + hobbies: List[Optional[str]] = F( + UserInfoDataset.hobbies, default=["Unknown"] + ) + + client.commit( + datasets=[UserInfoDataset], + featuresets=[UserInfoListLookup], + message="Initial commit", + ) + + now = datetime.now(timezone.utc) + data = [ + [18232, "John", 32, "USA", now, ["Reading", "Writing"]], + [18234, "Monica", 24, "Chile", now, ["Reading", "Writing"]], + [18235, "Rahul", 28, "India", now, ["Reading", "Writing"]], + ] + 
columns = ["user_id", "name", "age", "country", "timestamp", "hobbies"] + input_df = pd.DataFrame(data, columns=columns) + client.log("fennel_webhook", "UserInfoDataset", input_df) + + client.sleep() + + feature_df = client.query( + outputs=[UserInfoListLookup], + inputs=[UserInfoListLookup.user_id], + input_dataframe=pd.DataFrame( + {"UserInfoListLookup.user_id": [18234, 18235]} + ), + ) + + assert feature_df.shape == (2, 2) + assert feature_df["UserInfoListLookup.hobbies"].tolist() == [ + ["Reading", "Writing"], + ["Reading", "Writing"], + ] + + +@pytest.mark.integration +@mock +def test_list_lookup(client): + @featureset + class UserInfoListLookup: + user_id: List[int] + country: List[Optional[str]] = F(UserInfoDataset.country) + name: List[str] = F(UserInfoDataset.name, default="Unknown") + + client.commit( + datasets=[UserInfoDataset], + featuresets=[UserInfoListLookup], + message="Initial commit", + ) + + now = datetime.now(timezone.utc) + data = [ + [18232, "John", 32, "USA", now, ["Reading", "Writing"]], + [18234, "Monica", 24, "Chile", now, ["Reading", "Writing"]], + [18235, "Rahul", 28, "India", now, ["Reading", "Writing"]], + ] + columns = ["user_id", "name", "age", "country", "timestamp", "hobbies"] + input_df = pd.DataFrame(data, columns=columns) + client.log("fennel_webhook", "UserInfoDataset", input_df) + + client.sleep() + + feature_df = client.query( + outputs=[UserInfoListLookup], + inputs=[UserInfoListLookup.user_id], + input_dataframe=pd.DataFrame( + { + "UserInfoListLookup.user_id": [ + [18232, 18234, 18235, 123], + [18232, 18234], + [18234, 18235, 1], + ] + } + ), + ) + + assert feature_df.shape == (3, 3) + assert feature_df["UserInfoListLookup.country"].tolist() == [ + ["USA", "Chile", "India", pd.NA], + ["USA", "Chile"], + ["Chile", "India", pd.NA], + ] + assert feature_df["UserInfoListLookup.name"].tolist() == [ + ["John", "Monica", "Rahul", "Unknown"], + ["John", "Monica"], + ["Monica", "Rahul", "Unknown"], + ] + + +@pytest.mark.integration +@mock +def test_chainedlist_lookup_with_default(client): + """_summary_ + In this test, we look up a list of categories from a user id, and then look up a list of values from the categories. 
+ """ + + @source(webhook.endpoint("HobbyDataset"), disorder="14d", cdc="upsert") + @dataset(index=True) + class HobbyDataset: + hobby: str = field(key=True) + category: str + ts: datetime + + @featureset + class UserHobbies: + user_id: int + hobby: List[Optional[str]] = F(UserInfoDataset.hobbies, default=[]) + categories: List[Optional[str]] = F( + HobbyDataset.category, + ) + + client.commit( + datasets=[HobbyDataset, UserInfoDataset], + featuresets=[UserHobbies], + message="Initial commit", + ) + + now = datetime.now(timezone.utc) + data = [ + [18232, "John", 32, "USA", now, ["Reading", "Writing"]], + [18234, "Monica", 24, "Chile", now, ["Cooking", "Music"]], + [18235, "Rahul", 28, "India", now, ["Reading", "Football"]], + ] + columns = ["user_id", "name", "age", "country", "timestamp", "hobbies"] + input_df = pd.DataFrame(data, columns=columns) + client.log("fennel_webhook", "UserInfoDataset", input_df) + + client.sleep() + + data = [ + ["Reading", "Productivity", now], + ["Football", "Sports", now], + ["Cooking", "Food", now], + ] + columns = ["hobby", "category", "ts"] + input_df = pd.DataFrame(data, columns=columns) + client.log("fennel_webhook", "HobbyDataset", input_df) + + client.sleep() + + feature_df = client.query( + outputs=[UserHobbies], + inputs=[UserHobbies.user_id], + input_dataframe=pd.DataFrame( + { + "UserHobbies.user_id": [18232, 18234, 18235], + } + ), + ) + + assert feature_df.shape == (3, 3) + assert feature_df["UserHobbies.hobby"].tolist() == [ + ["Reading", "Writing"], + ["Cooking", "Music"], + ["Reading", "Football"], + ] + assert feature_df["UserHobbies.categories"].tolist() == [ + ["Productivity", pd.NA], + ["Food", pd.NA], + ["Productivity", "Sports"], + ] + + +@pytest.mark.integration +@mock +def test_multiple_keyed_list_lookup(client): + @source(webhook.endpoint("KeyedDataset"), disorder="14d", cdc="upsert") + @dataset(index=True) + class KeyedDataset: + key1: int = field(key=True) + key2: int = field(key=True) + value: str + ts: datetime = field(timestamp=True) + + @featureset + class KeyedFeatureset: + key1: List[int] + key2: List[int] + value: List[Optional[str]] = F(KeyedDataset.value) + + client.commit( + datasets=[KeyedDataset], featuresets=[KeyedFeatureset], message="msg" + ) + + # Log data + now = datetime.now(timezone.utc) + data = [ + [1, 2, "a", now], + [3, 4, "b", now], + [5, 6, "c", now], + [7, 8, "d", now], + ] + columns = ["key1", "key2", "value", "ts"] + input_df = pd.DataFrame(data, columns=columns) + client.log("fennel_webhook", "KeyedDataset", input_df) + + client.sleep() + + feature_df = client.query( + inputs=[KeyedFeatureset.key1, KeyedFeatureset.key2], + outputs=[KeyedFeatureset.value], + input_dataframe=pd.DataFrame( + { + "KeyedFeatureset.key1": [[1, 2, 3], [7, 5, 6]], + "KeyedFeatureset.key2": [[2, 8, 4], [8, 11, 12]], + } + ), + ) + assert feature_df.shape == (2, 1) + assert feature_df["KeyedFeatureset.value"].tolist() == [ + ["a", pd.NA, "b"], + ["d", pd.NA, pd.NA], + ] diff --git a/fennel/featuresets/featureset.py b/fennel/featuresets/featureset.py index 7a5742d5..e2ce94a7 100644 --- a/fennel/featuresets/featureset.py +++ b/fennel/featuresets/featureset.py @@ -13,6 +13,7 @@ Optional, List, Union, + get_origin, ) import pandas as pd @@ -697,6 +698,14 @@ def _get_generated_extractors( feature.ref_env, ) extractor.set_inputs_from_featureset(self, feature) + # If feature is a list, then we need to use the list lookup extractor + if get_origin(feature.dtype) == list: + # Check that reference feature is also a list + inputs = 
extractor.inputs + # Check that all inputs are of type list + if all(get_origin(i.dtype) == list for i in inputs): + extractor.extractor_type = ExtractorType.LIST_LOOKUP + extractor.featureset = self._name extractor.outputs = [feature] feature_meta = get_meta(feature) @@ -803,6 +812,7 @@ def _validate(self): # Check that the types match field = extractor.derived_extractor_info.field default = extractor.derived_extractor_info.default + if default is not None: if field.is_optional(): field_dtype = fennel_get_optional_inner(field.dtype) @@ -839,6 +849,45 @@ def _validate(self): raise ValueError( f"Default value `{default}` for feature `{feature.fqn()}` has incorrect default value: {e}" ) + if extractor.extractor_type == ExtractorType.LIST_LOOKUP: + feature = extractor.outputs[0] + # Check that the types match + field = extractor.derived_extractor_info.field + default = extractor.derived_extractor_info.default + if default is not None: + if field.is_optional(): + field_dtype = fennel_get_optional_inner(field.dtype) + else: + field_dtype = field.dtype + if feature.dtype != List[field_dtype]: + raise TypeError( + f"Feature `{feature.fqn()}` has type `{feature.dtype}` " + f"but expected type `List[{field_dtype}]`." + ) + else: + if field.is_optional(): + # Keep the Optional since no default is defined + expected_field_type = field.dtype + else: + # Add Optional since no default is defined + expected_field_type = Optional[field.dtype] + if feature.dtype != List[expected_field_type]: + raise TypeError( + f"Feature `{feature.fqn()}` has type `{feature.dtype}` " + f"but expected type `List[{expected_field_type}]`." + ) + + # Check that the default value has the right type + if default is not None: + try: + validate_val_with_dtype( + field.dtype, + default, + ) + except ValueError as e: + raise ValueError( + f"Default value `{default}` for feature `{feature.fqn()}` has incorrect default value: {e}" + ) # Check that all features have unique names.
feature_name_set = set() @@ -969,7 +1018,10 @@ def set_inputs_from_featureset( ): if self.inputs and len(self.inputs) > 0: return - if self.extractor_type != ExtractorType.LOOKUP: + if not ( + self.extractor_type == ExtractorType.LOOKUP + or self.extractor_type == ExtractorType.LIST_LOOKUP + ): return if not self.derived_extractor_info or not hasattr( self.derived_extractor_info, "field" diff --git a/fennel/featuresets/test_invalid_featureset.py b/fennel/featuresets/test_invalid_featureset.py index a0bf9163..1da5c29c 100644 --- a/fennel/featuresets/test_invalid_featureset.py +++ b/fennel/featuresets/test_invalid_featureset.py @@ -29,6 +29,7 @@ class UserInfoDataset: dob: str age: int ids: List[int] + hobbies: List[Optional[str]] account_creation_date: datetime country: Optional[str] timestamp: datetime = field(timestamp=True) @@ -456,3 +457,42 @@ class UserInfo6: str(e.value) == "extractor for 'age_squared' refers to feature col('UserInfoDataset.age') not present in 'UserInfo6'; 'col' can only reference features from the same featureset" ) + + +@mock +def test_invalid_list_lookup(client): + @source(webhook.endpoint("HobbyDataset"), disorder="14d", cdc="upsert") + @dataset(index=True) + class HobbyDataset: + hobby: str = field(key=True) + category: str + ts: datetime + + with pytest.raises(TypeError) as e: + + @featureset + class UserHobbies: + user_id: int + hobby: List[Optional[str]] = F(UserInfoDataset.hobbies, default=[]) + # This is incorrect as the type is List[str] not List[Optional[str]] + categories: List[Optional[str]] = F( + HobbyDataset.category, default="Unk" + ) + + assert ( + str(e.value) + == "Feature `UserHobbies.categories` has type `typing.List[typing.Optional[str]]` but expected type `List[]`." + ) + + with pytest.raises(TypeError) as e: + + @featureset + class UserHobbies2: + hobby: str + # This is incorrect as hobby should be List[str] + categories: List[str] = F(HobbyDataset.category, default="Unk") + + assert ( + str(e.value) + == "Feature `UserHobbies2.categories` has type `typing.List[str]` but expected type ``." 
+ ) diff --git a/fennel/gen/featureset_pb2.py b/fennel/gen/featureset_pb2.py index 6f4bdd9d..77bbe20c 100644 --- a/fennel/gen/featureset_pb2.py +++ b/fennel/gen/featureset_pb2.py @@ -18,7 +18,7 @@ import fennel.gen.expression_pb2 as expression__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10\x66\x65\x61tureset.proto\x12\x17\x66\x65nnel.proto.featureset\x1a\x0emetadata.proto\x1a\x0cschema.proto\x1a\x0cpycode.proto\x1a\x10\x65xpression.proto\"\x8c\x01\n\x0e\x43oreFeatureset\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x08metadata\x18\x02 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata\x12+\n\x06pycode\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\x12\x0c\n\x04tags\x18\x04 \x03(\t\"\xa0\x01\n\x07\x46\x65\x61ture\x12\x0c\n\x04name\x18\x01 \x01(\t\x12,\n\x05\x64type\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.schema.DataType\x12\x31\n\x08metadata\x18\x03 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata\x12\x18\n\x10\x66\x65\x61ture_set_name\x18\x04 \x01(\t\x12\x0c\n\x04tags\x18\x05 \x03(\t\"S\n\x0f\x46ieldLookupInfo\x12)\n\x05\x66ield\x18\x01 \x01(\x0b\x32\x1a.fennel.proto.schema.Field\x12\x15\n\rdefault_value\x18\x03 \x01(\t\"\xcd\x03\n\tExtractor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tasets\x18\x02 \x03(\t\x12.\n\x06inputs\x18\x03 \x03(\x0b\x32\x1e.fennel.proto.featureset.Input\x12\x10\n\x08\x66\x65\x61tures\x18\x04 \x03(\t\x12\x31\n\x08metadata\x18\x05 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata\x12\x0f\n\x07version\x18\x06 \x01(\x05\x12+\n\x06pycode\x18\x07 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\x12\x18\n\x10\x66\x65\x61ture_set_name\x18\x08 \x01(\t\x12>\n\x0e\x65xtractor_type\x18\t \x01(\x0e\x32&.fennel.proto.featureset.ExtractorType\x12>\n\nfield_info\x18\n \x01(\x0b\x32(.fennel.proto.featureset.FieldLookupInfoH\x00\x12+\n\x04\x65xpr\x18\x0c \x01(\x0b\x32\x1d.fennel.proto.expression.Expr\x12\x0c\n\x04tags\x18\x0b \x03(\tB\x18\n\x16\x64\x65rived_extractor_info\"\xa1\x01\n\x05Input\x12\x37\n\x07\x66\x65\x61ture\x18\x01 \x01(\x0b\x32&.fennel.proto.featureset.Input.Feature\x12,\n\x05\x64type\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.schema.DataType\x1a\x31\n\x07\x46\x65\x61ture\x12\x18\n\x10\x66\x65\x61ture_set_name\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\"\xad\x01\n\x05Model\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x30\n\x06inputs\x18\x02 \x03(\x0b\x32 .fennel.proto.featureset.Feature\x12\x31\n\x07outputs\x18\x03 \x03(\x0b\x32 .fennel.proto.featureset.Feature\x12\x31\n\x08metadata\x18\x04 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata*=\n\rExtractorType\x12\x0b\n\x07PY_FUNC\x10\x00\x12\n\n\x06LOOKUP\x10\x01\x12\t\n\x05\x41LIAS\x10\x02\x12\x08\n\x04\x45XPR\x10\x03\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10\x66\x65\x61tureset.proto\x12\x17\x66\x65nnel.proto.featureset\x1a\x0emetadata.proto\x1a\x0cschema.proto\x1a\x0cpycode.proto\x1a\x10\x65xpression.proto\"\x8c\x01\n\x0e\x43oreFeatureset\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x08metadata\x18\x02 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata\x12+\n\x06pycode\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\x12\x0c\n\x04tags\x18\x04 \x03(\t\"\xa0\x01\n\x07\x46\x65\x61ture\x12\x0c\n\x04name\x18\x01 \x01(\t\x12,\n\x05\x64type\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.schema.DataType\x12\x31\n\x08metadata\x18\x03 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata\x12\x18\n\x10\x66\x65\x61ture_set_name\x18\x04 \x01(\t\x12\x0c\n\x04tags\x18\x05 \x03(\t\"S\n\x0f\x46ieldLookupInfo\x12)\n\x05\x66ield\x18\x01 
\x01(\x0b\x32\x1a.fennel.proto.schema.Field\x12\x15\n\rdefault_value\x18\x03 \x01(\t\"\xcd\x03\n\tExtractor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tasets\x18\x02 \x03(\t\x12.\n\x06inputs\x18\x03 \x03(\x0b\x32\x1e.fennel.proto.featureset.Input\x12\x10\n\x08\x66\x65\x61tures\x18\x04 \x03(\t\x12\x31\n\x08metadata\x18\x05 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata\x12\x0f\n\x07version\x18\x06 \x01(\x05\x12+\n\x06pycode\x18\x07 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\x12\x18\n\x10\x66\x65\x61ture_set_name\x18\x08 \x01(\t\x12>\n\x0e\x65xtractor_type\x18\t \x01(\x0e\x32&.fennel.proto.featureset.ExtractorType\x12>\n\nfield_info\x18\n \x01(\x0b\x32(.fennel.proto.featureset.FieldLookupInfoH\x00\x12+\n\x04\x65xpr\x18\x0c \x01(\x0b\x32\x1d.fennel.proto.expression.Expr\x12\x0c\n\x04tags\x18\x0b \x03(\tB\x18\n\x16\x64\x65rived_extractor_info\"\xa1\x01\n\x05Input\x12\x37\n\x07\x66\x65\x61ture\x18\x01 \x01(\x0b\x32&.fennel.proto.featureset.Input.Feature\x12,\n\x05\x64type\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.schema.DataType\x1a\x31\n\x07\x46\x65\x61ture\x12\x18\n\x10\x66\x65\x61ture_set_name\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\"\xad\x01\n\x05Model\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x30\n\x06inputs\x18\x02 \x03(\x0b\x32 .fennel.proto.featureset.Feature\x12\x31\n\x07outputs\x18\x03 \x03(\x0b\x32 .fennel.proto.featureset.Feature\x12\x31\n\x08metadata\x18\x04 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata*N\n\rExtractorType\x12\x0b\n\x07PY_FUNC\x10\x00\x12\n\n\x06LOOKUP\x10\x01\x12\t\n\x05\x41LIAS\x10\x02\x12\x08\n\x04\x45XPR\x10\x03\x12\x0f\n\x0bLIST_LOOKUP\x10\x04\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -26,7 +26,7 @@ if not _descriptor._USE_C_DESCRIPTORS: DESCRIPTOR._loaded_options = None _globals['_EXTRACTORTYPE']._serialized_start=1302 - _globals['_EXTRACTORTYPE']._serialized_end=1363 + _globals['_EXTRACTORTYPE']._serialized_end=1380 _globals['_COREFEATURESET']._serialized_start=108 _globals['_COREFEATURESET']._serialized_end=248 _globals['_FEATURE']._serialized_start=251 diff --git a/fennel/gen/featureset_pb2.pyi b/fennel/gen/featureset_pb2.pyi index dcf36b39..68e82f9a 100644 --- a/fennel/gen/featureset_pb2.pyi +++ b/fennel/gen/featureset_pb2.pyi @@ -33,6 +33,7 @@ class _ExtractorTypeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._ LOOKUP: _ExtractorType.ValueType # 1 ALIAS: _ExtractorType.ValueType # 2 EXPR: _ExtractorType.ValueType # 3 + LIST_LOOKUP: _ExtractorType.ValueType # 4 class ExtractorType(_ExtractorType, metaclass=_ExtractorTypeEnumTypeWrapper): ... 
@@ -41,6 +42,7 @@ PY_FUNC: ExtractorType.ValueType # 0 LOOKUP: ExtractorType.ValueType # 1 ALIAS: ExtractorType.ValueType # 2 EXPR: ExtractorType.ValueType # 3 +LIST_LOOKUP: ExtractorType.ValueType # 4 global___ExtractorType = ExtractorType @typing_extensions.final @@ -158,7 +160,7 @@ class Extractor(google.protobuf.message.Message): @property def field_info(self) -> global___FieldLookupInfo: """pycode excluded from the oneof for better bwd compatibility in Rust - required iff extractor_type == LOOKUP + required iff extractor_type == LOOKUP or LIST_LOOKUP """ @property def expr(self) -> expression_pb2.Expr: diff --git a/fennel/internal_lib/to_proto/to_proto.py b/fennel/internal_lib/to_proto/to_proto.py index c9b68e6c..d1b34539 100644 --- a/fennel/internal_lib/to_proto/to_proto.py +++ b/fennel/internal_lib/to_proto/to_proto.py @@ -723,7 +723,10 @@ def _extractor_to_proto( ) extractor_field_info = None - if extractor.extractor_type == ExtractorType.LOOKUP: + if ( + extractor.extractor_type == ExtractorType.LOOKUP + or extractor.extractor_type == ExtractorType.LIST_LOOKUP + ): if not extractor.derived_extractor_info: raise TypeError( f"Lookup extractor `{extractor.name}` must have DatasetLookupInfo" diff --git a/fennel/testing/mock_client.py b/fennel/testing/mock_client.py index 3940bf23..e7bc94dd 100644 --- a/fennel/testing/mock_client.py +++ b/fennel/testing/mock_client.py @@ -522,9 +522,15 @@ def _transform_input_dataframe_from_inputs( if isinstance(feature, str): continue col_type = get_datatype(feature.dtype) # type: ignore - input_dataframe[input_col] = cast_col_to_arrow_dtype( - input_dataframe[input_col], col_type - ) + try: + input_dataframe[input_col] = cast_col_to_arrow_dtype( + input_dataframe[input_col], col_type + ) + except Exception as e: + raise Exception( + f"Error casting input dataframe column `{input_col}` for feature `{feature.fqn_}`, " + f"dtype: `{feature.dtype}`: {e}" + ) return input_dataframe def _get_secret(self, secret_name: str) -> Optional[str]: diff --git a/fennel/testing/query_engine.py b/fennel/testing/query_engine.py index 2cf6ce74..43dba78b 100644 --- a/fennel/testing/query_engine.py +++ b/fennel/testing/query_engine.py @@ -171,6 +171,17 @@ def run_extractors( self._check_schema_exceptions(output, dsschema, extractor.name) continue + if extractor.extractor_type == ProtoExtractorType.LIST_LOOKUP: + output = self._compute_list_lookup_extractor( + data_engine, + extractor, + timestamps.copy(), + intermediate_data, + use_as_of, + ) + self._check_schema_exceptions(output, dsschema, extractor.name) + continue + if extractor.extractor_type == ProtoExtractorType.EXPR: output = self._compute_expr_extractor( extractor, timestamps.copy(), intermediate_data @@ -386,6 +397,114 @@ def _compute_expr_extractor( intermediate_data[extractor.fqn_output_features()[0]] = res return res + def _get_default_value(self, extractor: Extractor): + default_value = extractor.derived_extractor_info.default # type: ignore + proto_dtype = get_datatype(extractor.derived_extractor_info.field.dtype) # type: ignore + # Custom operations on default value for datetime and decimal type + if proto_dtype.HasField("decimal_type"): + if pd.notna(default_value) and not isinstance( + default_value, Decimal + ): + default_value = Decimal( + "%0.{}f".format(proto_dtype.decimal_type.scale) + % float(default_value) # type: ignore + ) + if isinstance(default_value, datetime): + default_value = parse_datetime(default_value) + return default_value + + def _compute_list_lookup_extractor( + self, + data_engine: 
DataEngine, + extractor: Extractor, + timestamps: pd.Series, + intermediate_data: Dict[str, pd.Series], + use_as_of: bool, + ) -> pd.Series: + if len(extractor.outputs) != 1: + raise ValueError( + f"Lookup extractor {extractor.name} must have exactly one output feature, found {len(extractor.outputs)}" + ) + if len(extractor.depends_on) != 1: + raise ValueError( + f"Lookup extractor {extractor.name} must have exactly one dependent dataset, found {len(extractor.depends_on)}" + ) + + input_features = { + k.name: intermediate_data[k.fqn()] for k in extractor.inputs # type: ignore + } + allowed_datasets = self._get_allowed_datasets(extractor) + arg_list_lengths = [] + # Check that all input features are of type list + for idx, input_feature in enumerate(extractor.inputs): + for row_idx, row in enumerate(input_features[input_feature.name]): + if not isinstance(row, list): + raise ValueError( + f"Input feature `{input_feature.name}` is not of type list, " + f"found `{row}` of type `{type(row)}`" + ) + if idx == 0: + arg_list_lengths.append(len(row)) + else: + if len(row) != arg_list_lengths[row_idx]: + raise ValueError( + f"All input features must have the same length, found {len(row)} for feature {input_feature.name} and {arg_list_lengths[row_idx]} for feature {extractor.inputs[0].name}" # type: ignore + ) + + repeated_timestamps = [] + # Repeat each timestamp for the number of times in arg_list_lengths + for idx, timestamp in enumerate(timestamps): + repeated_timestamps.extend([timestamp] * arg_list_lengths[idx]) + + repeated_timestamps = pd.Series(repeated_timestamps) + # Flatten the input features + flattened_input_features = {} + for input_feature in extractor.inputs: + flattened_input_features[input_feature.name] = input_features[ + input_feature.name + ].explode() + + fennel.datasets.datasets.dataset_lookup = ( + data_engine.get_dataset_lookup_impl( + extractor.name, allowed_datasets, use_as_of + ) + ) + results, _ = extractor.depends_on[0].lookup( + repeated_timestamps, **flattened_input_features + ) + if ( + not extractor.derived_extractor_info + or not extractor.derived_extractor_info.field + or not extractor.derived_extractor_info.field.name + ): + raise TypeError( + f"Field for lookup extractor {extractor.name} must have a named field" + ) + results = results[extractor.derived_extractor_info.field.name] + default_value = self._get_default_value(extractor) + if default_value is not None: + if results.dtype != object: + results = results.fillna(default_value) + else: + # fillna doesn't work for list- or dict-typed columns + for row in results.loc[results.isnull()].index: + results[row] = default_value + + fennel.datasets.datasets.dataset_lookup = ( + data_engine.get_dataset_lookup_impl(None, None, False) + ) + # Pack the results into a list of lists, using the arg_list_lengths + packed_results = [] + start_idx = 0 + results_list = results.tolist() + for length in arg_list_lengths: + packed_results.append(results_list[start_idx : start_idx + length]) + start_idx += length + packed_results: pd.Series = pd.Series(packed_results) # type: ignore + packed_results.name = extractor.fqn_output_features()[0] # type: ignore + intermediate_data[extractor.fqn_output_features()[0]] = packed_results + return packed_results + def _compute_lookup_extractor( self, data_engine: DataEngine, @@ -424,19 +543,7 @@ def _compute_lookup_extractor( f"Field for lookup extractor {extractor.name} must have a named field" ) results = results[extractor.derived_extractor_info.field.name] - default_value = 
extractor.derived_extractor_info.default - proto_dtype = get_datatype(extractor.derived_extractor_info.field.dtype) - # Custom operations on default value for datetime and decimal type - if proto_dtype.HasField("decimal_type"): - if pd.notna(default_value) and not isinstance( - default_value, Decimal - ): - default_value = Decimal( - "%0.{}f".format(proto_dtype.decimal_type.scale) - % float(default_value) # type: ignore - ) - if isinstance(default_value, datetime): - default_value = parse_datetime(default_value) + default_value = self._get_default_value(extractor) if default_value is not None: if results.dtype != object: results = results.fillna(default_value) diff --git a/poetry.lock b/poetry.lock index ed535678..d961693d 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.5 and should not be changed by hand. [[package]] name = "anyio" @@ -143,19 +143,19 @@ typing-extensions = {version = ">=4.0.0", markers = "python_version < \"3.11\""} [[package]] name = "attrs" -version = "24.2.0" +version = "24.3.0" description = "Classes Without Boilerplate" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "attrs-24.2.0-py3-none-any.whl", hash = "sha256:81921eb96de3191c8258c199618104dd27ac608d9366f5e35d011eae1867ede2"}, - {file = "attrs-24.2.0.tar.gz", hash = "sha256:5cfb1b9148b5b086569baec03f20d7b6bf3bcacc9a42bebf87ffaaca362f6346"}, + {file = "attrs-24.3.0-py3-none-any.whl", hash = "sha256:ac96cd038792094f438ad1f6ff80837353805ac950cd2aa0e0625ef19850c308"}, + {file = "attrs-24.3.0.tar.gz", hash = "sha256:8f5c07333d543103541ba7be0e2ce16eeee8130cb0b3f9238ab904ce1e85baff"}, ] [package.extras] benchmark = ["cloudpickle", "hypothesis", "mypy (>=1.11.1)", "pympler", "pytest (>=4.3.0)", "pytest-codspeed", "pytest-mypy-plugins", "pytest-xdist[psutil]"] cov = ["cloudpickle", "coverage[toml] (>=5.3)", "hypothesis", "mypy (>=1.11.1)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"] -dev = ["cloudpickle", "hypothesis", "mypy (>=1.11.1)", "pre-commit", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"] +dev = ["cloudpickle", "hypothesis", "mypy (>=1.11.1)", "pre-commit-uv", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"] docs = ["cogapp", "furo", "myst-parser", "sphinx", "sphinx-notfound-page", "sphinxcontrib-towncrier", "towncrier (<24.7)"] tests = ["cloudpickle", "hypothesis", "mypy (>=1.11.1)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"] tests-mypy = ["mypy (>=1.11.1)", "pytest-mypy-plugins"] @@ -3771,4 +3771,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "8b8f85fde3aaeb25f9dc5cda7264e90e32645fd7221f770bb90a9c8e1bbb6768" +content-hash = "631099edfc20fe90e77c817031172acd8c41fa96683c276490047365662d66f4" diff --git a/pyproject.toml b/pyproject.toml index ad0511f3..3508427e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "fennel-ai" -version = "1.5.62" +version = "1.5.63" description = "The modern realtime feature engineering platform" authors = ["Fennel AI "] packages = [{ include = "fennel" }] @@ -20,8 +20,9 @@ pytest = "7.1.3" pytest-rerunfailures = "^13.0" sortedcontainers = "^2.4.0" typing-extensions = "^4.12.0" -fennel-data-lib = "0.1.24" +fennel-data-lib = "^0.1.24" pyarrow = "^14.0.2" +attrs = "^24.3.0" 
[tool.poetry.dev-dependencies] flake8 = "^4.0.1"
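
Usage sketch: the new LIST_LOOKUP extractor type lets an autogenerated extractor fan a keyed-dataset lookup out over list-valued key features. A minimal example, adapted from test_list_lookup above; UserInfoDataset and the @featureset/F helpers come from this diff's test fixtures, and the import paths are an assumption based on this repo's layout:

    from typing import List, Optional

    # Assumed import paths, mirroring the fennel repo layout in this diff.
    from fennel.featuresets import featureset, feature as F

    @featureset
    class UserInfoListLookup:
        # List of keys to look up in the keyed UserInfoDataset.
        user_id: List[int]
        # One looked-up value per key; misses surface as pd.NA, hence Optional.
        country: List[Optional[str]] = F(UserInfoDataset.country)
        # With a default, misses are filled, so the inner Optional is dropped.
        name: List[str] = F(UserInfoDataset.name, default="Unknown")

Per the assertions in test_list_lookup, querying with user_id [18232, 18234, 18235, 123] yields name ["John", "Monica", "Rahul", "Unknown"] and country ["USA", "Chile", "India", pd.NA].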