From a391a845311ac22f20ba7fc472a5899a33874d24 Mon Sep 17 00:00:00 2001 From: Aditya Nambiar Date: Sun, 12 Nov 2023 12:28:36 -0800 Subject: [PATCH] auto extractor: Bug fixes for code gen and default values --- .../test_complex_autogen_extractor.py | 293 ++++++++++++++++++ fennel/featuresets/featureset.py | 41 ++- fennel/featuresets/test_featureset.py | 9 +- .../test_invalid_derived_extractors.py | 3 +- fennel/lib/schema/schema.py | 3 + fennel/lib/to_proto/source_code.py | 48 ++- fennel/lib/to_proto/to_proto.py | 25 +- fennel/test_lib/mock_client.py | 45 ++- pyproject.toml | 2 +- 9 files changed, 427 insertions(+), 42 deletions(-) create mode 100644 fennel/client_tests/test_complex_autogen_extractor.py diff --git a/fennel/client_tests/test_complex_autogen_extractor.py b/fennel/client_tests/test_complex_autogen_extractor.py new file mode 100644 index 000000000..13e5857f0 --- /dev/null +++ b/fennel/client_tests/test_complex_autogen_extractor.py @@ -0,0 +1,293 @@ +import sys + +from datetime import datetime, timedelta +from typing import Optional + +import pandas as pd +import pytest + +import fennel + +from fennel import meta, Count, Window, featureset, feature, extractor +from fennel.client import Client +from fennel.lib.schema import inputs, oneof, outputs +from fennel.lib.aggregate import Max, Min +from fennel.sources import Webhook, S3, MySQL +from fennel.datasets import dataset, field, pipeline, Dataset +from fennel.sources import source +from fennel.test_lib import MockClient, mock + +webhook = Webhook(name="fennel_webhook") + +__owner__ = "uber-data@eng.com" + + +@dataset +@source(webhook.endpoint("RiderDataset"), tier="local") +class RiderDataset: + rider_id: int = field(key=True) + created: datetime = field(timestamp=True) + birthdate: datetime + + +@dataset +@source(webhook.endpoint("RiderCreditScoreDataset"), tier="local") +class RiderCreditScoreDataset: + rider_id: int = field(key=True) + created: datetime + score: float + + +@dataset +@source(webhook.endpoint("CountryLicenseDataset"), tier="local") +@meta(owner="data@eng.com") +class CountryLicenseDataset: + rider_id: int = field(key=True) + created: datetime + country_code: str + + +@dataset +@source(webhook.endpoint("ReservationsDataset"), tier="local") +class ReservationsDataset: + rider_id: int + vehicle_id: int + is_completed_trip: int + created: datetime + + +@dataset +@source(webhook.endpoint("NumCompletedTripsDataset"), tier="local") +class NumCompletedTripsDataset: + rider_id: int = field(key=True) + count_num_completed_trips: int + created: datetime + + @pipeline() + @inputs(ReservationsDataset) + def my_pipeline(cls, reservations: Dataset): + completed = reservations.filter(lambda df: df["is_completed_trip"] == 1) + return completed.groupby("rider_id").aggregate( + Count( + of="vehicle_id", + unique=True, + approx=True, + window=Window("forever"), + into_field="count_num_completed_trips", + ), + ) + + +@featureset +class RequestFeatures0: + ts: datetime = feature(id=1) + rider_id: int = feature(id=2) + + +@featureset +class RequestFeatures1: + ts: datetime = feature(id=1) + id1: int = feature(id=2).extract(feature=RequestFeatures0.rider_id) # type: ignore + + +@featureset +class RequestFeatures2: + ts: datetime = feature(id=1) + id2: int = feature(id=2).extract(feature=RequestFeatures1.id1) # type: ignore + const: int = feature(id=3) + num_trips: int = feature(id=4).extract( # type: ignore + field=NumCompletedTripsDataset.count_num_completed_trips, + provider=RequestFeatures0, + default=0, + ) + + @extractor() + @inputs(id2) + @outputs(const) + def extract_const(cls, ts: pd.Series, id2: pd.Series) -> pd.DataFrame: + return pd.DataFrame({"const": [1] * len(ts)}) + + +@featureset +class RequestFeatures3: + ts: datetime = feature(id=1) + rider_id: int = feature(id=2).extract(feature=RequestFeatures2.id2) # type: ignore + vehicle_id: int = feature(id=3) + reservation_id: Optional[int] = feature(id=4) + + +@featureset +class RiderFeatures: + id: int = feature(id=1).extract(feature=RequestFeatures2.id2) # type: ignore + created: datetime = feature(id=2).extract( # type: ignore + field=RiderDataset.created, + provider=RequestFeatures3, + default=datetime(2000, 1, 1, 0, 0, 0), + ) + birthdate: datetime = feature(id=3).extract( # type: ignore + field=RiderDataset.birthdate, + provider=RequestFeatures3, + default=datetime(2000, 1, 1, 0, 0, 0), + ) + age_years: int = feature(id=4) + ais_score: float = feature(id=5).extract( # type: ignore + field=RiderCreditScoreDataset.score, + provider=RequestFeatures3, + default=-1.0, + ) + dl_state: str = feature(id=6).extract( # type: ignore + field=CountryLicenseDataset.country_code, + provider=RequestFeatures3, + default="Unknown", + ) + is_us_dl: bool = feature(id=7) + num_past_completed_trips: int = feature(id=8).extract( # type: ignore + field=NumCompletedTripsDataset.count_num_completed_trips, + provider=RequestFeatures3, + default=0, + ) + + @extractor + @inputs(dl_state) + @outputs(is_us_dl) + def extract_is_us_dl( + cls, ts: pd.Series, dl_state: pd.Series + ) -> pd.DataFrame: + is_us_dl = dl_state == "US" + return pd.DataFrame({"is_us_dl": is_us_dl}) + + @extractor + @inputs(birthdate) + @outputs(age_years) + def extract_age_years( + cls, ts: pd.Series, birthdate: pd.Series + ) -> pd.DataFrame: + age_years = (datetime.now() - birthdate).dt.total_seconds() / ( + 60 * 60 * 24 * 365 + ) + age_years = age_years.astype(int) + return pd.DataFrame({"age_years": age_years}) + + +@mock +def test_complex_auto_gen_extractors(client): + with pytest.raises(ValueError) as e: + _ = client.sync( + datasets=[ + RiderDataset, + RiderCreditScoreDataset, + CountryLicenseDataset, + ReservationsDataset, + NumCompletedTripsDataset, + ], + featuresets=[ + RiderFeatures, + RequestFeatures1, + RequestFeatures2, + RequestFeatures3, + ], + ) + error_msg1 = "Featureset `RequestFeatures0` is required by `RequestFeatures1` but is not present in the sync call. Please ensure that all featuresets are present in the sync call." + error_msg2 = error_msg1.replace("RequestFeatures1", "RequestFeatures2") + assert str(e.value) == error_msg1 or str(e.value) == error_msg2 + + with pytest.raises(ValueError) as e: + _ = client.sync( + datasets=[ + RiderDataset, + RiderCreditScoreDataset, + CountryLicenseDataset, + ReservationsDataset, + NumCompletedTripsDataset, + ], + featuresets=[ + RiderFeatures, + RequestFeatures0, + RequestFeatures1, + RequestFeatures3, + ], + ) + error_msg1 = "Featureset `RequestFeatures2` is required by `RiderFeatures` but is not present in the sync call. Please ensure that all featuresets are present in the sync call." + error_msg2 = error_msg1.replace("RiderFeatures", "RequestFeatures3") + assert str(e.value) == error_msg1 or str(e.value) == error_msg2 + + with pytest.raises(ValueError) as e: + _ = client.sync( + datasets=[ + RiderDataset, + RiderCreditScoreDataset, + CountryLicenseDataset, + ReservationsDataset, + ], + featuresets=[ + RiderFeatures, + RequestFeatures0, + RequestFeatures1, + RequestFeatures2, + RequestFeatures3, + ], + ) + assert ( + str(e.value) + == "Dataset NumCompletedTripsDataset not found in sync call" + ) + + resp = client.sync( + datasets=[ + RiderDataset, + RiderCreditScoreDataset, + CountryLicenseDataset, + ReservationsDataset, + NumCompletedTripsDataset, + ], + featuresets=[ + RiderFeatures, + RequestFeatures0, + RequestFeatures1, + RequestFeatures2, + RequestFeatures3, + ], + ) + + assert resp.status_code == 200 + + rider_df = pd.DataFrame( + { + "rider_id": [1], + "created": [datetime.now()], + "birthdate": [datetime.now() - timedelta(days=365 * 30)], + "country_code": ["US"], + } + ) + + log_response = client.log( + webhook="fennel_webhook", endpoint="RiderDataset", df=rider_df + ) + assert log_response.status_code == 200 + + reservation_df = pd.DataFrame( + { + "rider_id": [1], + "vehicle_id": [1], + "is_completed_trip": [1], + "created": [datetime.now()], + } + ) + log_response = client.log( + webhook="fennel_webhook", + endpoint="ReservationsDataset", + df=reservation_df, + ) + assert log_response.status_code == 200 + + extracted_df = client.extract_features( + input_feature_list=[RequestFeatures0.rider_id], + output_feature_list=[RiderFeatures], + input_dataframe=pd.DataFrame({"RequestFeatures0.rider_id": [1]}), + ) + assert extracted_df.shape[0] == 1 + assert ( + extracted_df["RiderFeatures.created"].iloc[0] + == rider_df["created"].iloc[0] + ) + assert extracted_df["RiderFeatures.dl_state"].iloc[0] == "Unknown" diff --git a/fennel/featuresets/featureset.py b/fennel/featuresets/featureset.py index 5c5f152a1..ee10f0f68 100644 --- a/fennel/featuresets/featureset.py +++ b/fennel/featuresets/featureset.py @@ -322,6 +322,7 @@ def fqn(self) -> str: def extract( self, + *, field: Field = None, provider: Featureset = None, default=None, @@ -484,7 +485,7 @@ def __init__( setattr(self, OWNER, owner) propogate_fennel_attributes(featureset_cls, self) - def get_dataset_dependencies(self): + def get_dataset_dependencies(self) -> List[Dataset]: """ This function gets the list of datasets the Featureset depends upon. This dependency is introduced by features that directly lookup a dataset @@ -509,6 +510,24 @@ def get_dataset_dependencies(self): return depended_datasets + def get_featureset_dependencies(self) -> List[str]: + """ + This function gets the list of featuresets the Featureset depends upon. + This dependency is introduced by features that directly lookup a featureset + via the FS-FS route, while specifying a provider. + """ + depended_featuresets = set() + for f in self._features: + if f.extractor is None: + continue + if f.extractor.extractor_type == ExtractorType.ALIAS: + # Alias extractors have exactly one input feature + depended_featuresets.add(f.extractor.inputs[0].featureset_name) + elif f.extractor.extractor_type == ExtractorType.LOOKUP: + for inp_feature in f.extractor.inputs: + depended_featuresets.add(inp_feature.featureset_name) + return list(depended_featuresets) + # ------------------- Private Methods ---------------------------------- def _add_feature_names_as_attributes(self): @@ -527,7 +546,7 @@ def _get_extractors(self) -> List[Extractor]: if extractor.extractor_type == ExtractorType.LOOKUP and ( extractor.inputs is None or len(extractor.inputs) == 0 ): - feature.extractor.set_inputs_from_featureset(self) + feature.extractor.set_inputs_from_featureset(self, feature) extractors.append(extractor) # user defined extractors @@ -678,7 +697,9 @@ def get_included_modules(self) -> List[Callable]: return getattr(self.func, FENNEL_INCLUDED_MOD) return [] - def set_inputs_from_featureset(self, featureset: Featureset): + def set_inputs_from_featureset( + self, featureset: Featureset, feature: Feature + ): if self.inputs and len(self.inputs) > 0: return if self.extractor_type != ExtractorType.LOOKUP: @@ -696,22 +717,22 @@ def set_inputs_from_featureset(self, featureset: Featureset): ds = field.dataset if not ds: raise ValueError( - f"Dataset {field.dataset_name} not found for field {field}" + f"Dataset `{field.dataset_name}` not found for field `{field}`" ) self.depends_on = [ds] for k in ds.dsschema().keys: - feature = featureset.feature(k) - if not feature: + f = featureset.feature(k) + if not f: raise ValueError( - f"Dataset key {k} not found in provider {featureset._name} for extractor {self.name}" + f"Key field `{k}` for dataset `{ds._name}` not found in provider `{featureset._name}` for feature: `{feature.name}` auto generated extractor" ) - self.inputs.append(feature) + self.inputs.append(f) class DatasetLookupInfo: field: Field - default: Any + default: Optional[Any] = None - def __init__(self, field: Field, default_val: Any): + def __init__(self, field: Field, default_val: Optional[Any] = None): self.field = field self.default = default_val diff --git a/fennel/featuresets/test_featureset.py b/fennel/featuresets/test_featureset.py index b2ed14c89..edd3a3baf 100644 --- a/fennel/featuresets/test_featureset.py +++ b/fennel/featuresets/test_featureset.py @@ -390,6 +390,7 @@ def get_user_income(cls, ts: pd.Series, user_id: pd.Series): pass view = InternalTestClient() + view.add(Request) view.add(UserInfoDataset) view.add(UserInfo) view.add(User) @@ -401,17 +402,17 @@ def get_user_income(cls, ts: pd.Series, user_id: pd.Series): ) sync_request = view._get_sync_request_proto("prod") - assert len(sync_request.feature_sets) == 2 + assert len(sync_request.feature_sets) == 3 assert len(sync_request.extractors) == 2 - assert len(sync_request.features) == 7 + assert len(sync_request.features) == 8 extractor_req = sync_request.extractors[1] assert extractor_req.name == "get_user_info2" sync_request = view._get_sync_request_proto("dev") - assert len(sync_request.feature_sets) == 2 + assert len(sync_request.feature_sets) == 3 assert len(sync_request.extractors) == 2 - assert len(sync_request.features) == 7 + assert len(sync_request.features) == 8 extractor_req = sync_request.extractors[0] assert extractor_req.name == "_fennel_alias_user_id" diff --git a/fennel/featuresets/test_invalid_derived_extractors.py b/fennel/featuresets/test_invalid_derived_extractors.py index 139d4c908..59064df0b 100644 --- a/fennel/featuresets/test_invalid_derived_extractors.py +++ b/fennel/featuresets/test_invalid_derived_extractors.py @@ -91,6 +91,7 @@ def get_age(cls, ts: pd.Series, user_id: pd.Series): return df.fillna(0) view = InternalTestClient() + view.add(User) view.add(UserInfo3) view._get_sync_request_proto() assert ( @@ -142,7 +143,7 @@ class UserInfo6: assert ( str(e.value) - == "Dataset key user_id not found in provider UserInfo6 for extractor _fennel_lookup_age" + == "Key field `user_id` for dataset `UserInfoDataset` not found in provider `UserInfo6` for feature: `age` auto generated extractor" ) # missing dataset key in provider diff --git a/fennel/lib/schema/schema.py b/fennel/lib/schema/schema.py index 0d5c55cbc..730f0cb61 100644 --- a/fennel/lib/schema/schema.py +++ b/fennel/lib/schema/schema.py @@ -503,6 +503,9 @@ def _validate_field_in_df( ): name = field.name dtype = field.dtype + if df.shape[0] == 0: + return + if name not in df.columns: raise ValueError( f"Field `{name}` not found in dataframe during checking schema for " diff --git a/fennel/lib/to_proto/source_code.py b/fennel/lib/to_proto/source_code.py index f14d3bbf0..9828e4d0b 100644 --- a/fennel/lib/to_proto/source_code.py +++ b/fennel/lib/to_proto/source_code.py @@ -5,7 +5,7 @@ import sys from textwrap import dedent, indent -from typing import Callable +from typing import Callable, Dict import fennel.gen.pycode_pb2 as pycode_proto from fennel.datasets import Dataset @@ -26,7 +26,17 @@ def _remove_empty_lines(source_code: str) -> str: return source_code -def get_featureset_core_code(featureset: Featureset) -> str: +def get_featureset_core_code( + featureset: Featureset, +) -> str: + """ + Now that featuresets can hold features, which can be auto generated, we need to ensure + that all the code required for the features and their extractors is present in the + generated code. + + :param featureset: + :return: + """ # Keep all lines till class definition source_code = fennel_get_source(featureset.__fennel_original_cls__) for extractor in featureset.extractors: @@ -40,8 +50,36 @@ def get_featureset_core_code(featureset: Featureset) -> str: # If python version 3.8 or below add @feature decorator if sys.version_info < (3, 9): source_code = f"@featureset\n{dedent(source_code)}" - z = _remove_empty_lines(source_code) - return z + return _remove_empty_lines(source_code) + + +def get_featureset_gen_code( + featureset: Featureset, fs_obj_map: Dict[str, Featureset] +) -> str: + """ + Now that featuresets can hold features, which can be auto generated, we need to ensure + that all the code required for the features and their extractors is present in the + generated code. + + :param featureset: + :return: + """ + gen_code = "" + for ds in featureset.get_dataset_dependencies(): + gen_code = get_dataset_core_code(ds) + "\n" + gen_code + "\n" + + for fs in featureset.get_featureset_dependencies(): + if fs not in fs_obj_map: + raise ValueError( + f"Featureset `{fs}` is required by `{featureset._name}` but is not present in the sync call. Please " + f"ensure that all featuresets are present in the sync call." + ) + if fs == featureset._name: + continue + fs_code = get_featureset_gen_code(fs_obj_map[fs], fs_obj_map) + gen_code = dedent(fs_code) + "\n" + gen_code + "\n" + source_code = gen_code + "\n" + get_featureset_core_code(featureset) + return _remove_empty_lines(source_code) def remove_source_decorator(text): @@ -69,7 +107,7 @@ def get_dataset_core_code(dataset: Dataset) -> str: # Add any struct definitions if hasattr(dataset, FENNEL_STRUCT_SRC_CODE): source_code = ( - getattr(dataset, FENNEL_STRUCT_SRC_CODE) + "\n\n" + source_code + getattr(dataset, FENNEL_STRUCT_SRC_CODE) + "\n" + source_code ) return _remove_empty_lines(source_code) diff --git a/fennel/lib/to_proto/to_proto.py b/fennel/lib/to_proto/to_proto.py index 86f58cdb6..dab2cd7aa 100644 --- a/fennel/lib/to_proto/to_proto.py +++ b/fennel/lib/to_proto/to_proto.py @@ -38,6 +38,7 @@ get_featureset_core_code, get_dataset_core_code, get_all_imports, + get_featureset_gen_code, ) from fennel.lib.to_proto.source_code import to_includes_proto from fennel.utils import fennel_get_source @@ -479,10 +480,18 @@ def _check_owner_exists(obj): def _to_field_lookup_proto( info: Extractor.DatasetLookupInfo, ) -> fs_proto.FieldLookupInfo: + if info.default is None: + return fs_proto.FieldLookupInfo( + field=_field_to_proto(info.field), default_value=json.dumps(None) + ) + if getattr(info.default.__class__, FENNEL_STRUCT, False): default_val = json.dumps(info.default.as_json()) else: - default_val = json.dumps(info.default) + try: + default_val = json.dumps(info.default) + except TypeError: + default_val = json.dumps(str(info.default)) return fs_proto.FieldLookupInfo( field=_field_to_proto(info.field), @@ -1197,14 +1206,8 @@ def to_extractor_pycode( gen_code = "\n" + dedent(dep.generated_code) + "\n" + gen_code dependencies.append(dep) - datasets_added = set() # Extractor code construction for dataset in extractor.get_dataset_dependencies(): - datasets_added.add(dataset) - for dataset in fs_obj_map[extractor.featureset].get_dataset_dependencies(): - datasets_added.add(dataset) - - for dataset in datasets_added: gen_code += get_dataset_core_code(dataset) input_fs_added = set() @@ -1227,13 +1230,15 @@ def to_extractor_pycode( ) gen_code = ( gen_code - + get_featureset_core_code(fs_obj_map[input.featureset_name]) - + "\n" + + get_featureset_gen_code( + fs_obj_map[input.featureset_name], fs_obj_map + ) + + "\n\n" ) extractor_src_code = dedent(inspect.getsource(extractor.func)) indented_code = indent(extractor_src_code, " " * 4) - featureset_core_code = get_featureset_core_code(featureset) + featureset_core_code = get_featureset_gen_code(featureset, fs_obj_map) gen_code = gen_code + "\n" + featureset_core_code + "\n" + indented_code ref_includes = {featureset._name: pycode_proto.RefType.Featureset} datasets = extractor.get_dataset_dependencies() diff --git a/fennel/test_lib/mock_client.py b/fennel/test_lib/mock_client.py index 6bde64c52..632071b21 100644 --- a/fennel/test_lib/mock_client.py +++ b/fennel/test_lib/mock_client.py @@ -9,6 +9,7 @@ from dataclasses import dataclass from datetime import datetime from functools import partial +import logging import numpy as np import pandas as pd @@ -52,6 +53,8 @@ FENNEL_ORDER = "__fennel_order__" FENNEL_TIMESTAMP = "__fennel_timestamp__" +logger = logging.getLogger(__name__) + class FakeResponse(Response): def __init__(self, status_code: int, content: str): @@ -107,6 +110,9 @@ def dataset_lookup_impl( f"but {len(keys.columns)} key fields were provided." ) if cls_name not in data and cls_name not in aggregated_datasets: + logger.warning( + f"Not data found for Dataset `{cls_name}` during lookup, returning an empty dataframe" + ) # Create a dataframe with all nulls val_cols = datasets[cls_name].fields if len(fields) > 0: @@ -747,7 +753,7 @@ def _run_extractors( extractor.inputs[0].fqn() ] intermediate_data[feature_name].name = feature_name - self._check_exceptions( + self._check_schema_exceptions( intermediate_data[feature_name], dsschema, extractor.name ) continue @@ -756,7 +762,7 @@ def _run_extractors( output = self._compute_lookup_extractor( extractor, timestamps.copy(), intermediate_data ) - self._check_exceptions(output, dsschema, extractor.name) + self._check_schema_exceptions(output, dsschema, extractor.name) continue allowed_datasets = [ @@ -793,7 +799,7 @@ def _run_extractors( f"Extractor `{extractor.name}` returned " f"invalid type `{type(output)}`, expected a pandas series or dataframe" ) - self._check_exceptions(output, dsschema, extractor.name) + self._check_schema_exceptions(output, dsschema, extractor.name) if isinstance(output, pd.Series): if output.name in intermediate_data: continue @@ -847,9 +853,11 @@ def _run_extractors( ) return output_df - def _check_exceptions( + def _check_schema_exceptions( self, output, dsschema: DSSchema, extractor_name: str ): + if output is None or output.shape[0] == 0: + return output_df = pd.DataFrame(output) output_df.reset_index(inplace=True) exceptions = data_schema_check(dsschema, output_df, extractor_name) @@ -864,7 +872,7 @@ def _compute_lookup_extractor( extractor: Extractor, timestamps: pd.Series, intermediate_data: Dict[str, pd.Series], - ): + ) -> pd.Series: if len(extractor.output_features) != 1: raise ValueError( f"Lookup extractor {extractor.name} must have exactly one output feature, found {len(extractor.output_features)}" @@ -900,15 +908,30 @@ def _compute_lookup_extractor( f"Field for lookup extractor {extractor.name} must have a named field" ) results = results[extractor.derived_extractor_info.field.name] - if results.dtype != object: - results = results.fillna(extractor.derived_extractor_info.default) + if extractor.derived_extractor_info.default is not None: + if results.dtype != object: + results = results.fillna( + extractor.derived_extractor_info.default + ) + else: + # fillna doesn't work for list type or dict type :cols + for row in results.loc[results.isnull()].index: + results[row] = extractor.derived_extractor_info.default + results = cast_col_to_dtype( + results, + get_datatype(extractor.derived_extractor_info.field.dtype), + ) else: - # fillna doesn't work for list type or ditc type :cols - for row in results.loc[results.isnull()].index: - results[row] = extractor.derived_extractor_info.default + results = cast_col_to_dtype( + results, + get_datatype( + Optional[extractor.derived_extractor_info.field.dtype] + ), + ) + results.replace({np.nan: None}, inplace=True) + results.name = extractor.fqn_output_features()[0] intermediate_data[extractor.fqn_output_features()[0]] = results - fennel.datasets.datasets.dataset_lookup = partial( dataset_lookup_impl, self.data, diff --git a/pyproject.toml b/pyproject.toml index ba1acda4c..fc0dff72c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "fennel-ai" -version = "0.18.15" +version = "0.18.16" description = "The modern realtime feature engineering platform" authors = ["Fennel AI "] packages = [{ include = "fennel" }]