diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py
index 0fa2ba855d..cd669f1a33 100644
--- a/sdk/python/feast/feature_store.py
+++ b/sdk/python/feast/feature_store.py
@@ -1498,12 +1498,14 @@ def write_to_online_store(
             allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry.
         """
         # TODO: restrict this to work with online StreamFeatureViews and validate the FeatureView type
+        # In EG we use feature views to support streaming, and the HTTP registry does not implement stream feature views:
+        # it raises MethodNotImplementedError for get_stream_feature_view, so try get_feature_view first.
         try:
-            feature_view = self.get_stream_feature_view(
+            feature_view = self.get_feature_view(
                 feature_view_name, allow_registry_cache=allow_registry_cache
             )
         except FeatureViewNotFoundException:
-            feature_view = self.get_feature_view(
+            feature_view = self.get_stream_feature_view(
                 feature_view_name, allow_registry_cache=allow_registry_cache
             )
         provider = self._get_provider()
@@ -1663,9 +1665,9 @@ def _get_online_features(
             return self._go_server.get_online_features(
                 features_refs=features if isinstance(features, list) else [],
-                feature_service=features
-                if isinstance(features, FeatureService)
-                else None,
+                feature_service=(
+                    features if isinstance(features, FeatureService) else None
+                ),
                 entities=entity_native_values,
                 request_data={},  # TODO: add request data parameter to public API
                 full_feature_names=full_feature_names,
@@ -2093,9 +2095,11 @@ def _populate_response_from_feature_data(
         """
         # Add the feature names to the response.
         requested_feature_refs = [
-            f"{table.projection.name_to_use()}__{feature_name}"
-            if full_feature_names
-            else feature_name
+            (
+                f"{table.projection.name_to_use()}__{feature_name}"
+                if full_feature_names
+                else feature_name
+            )
             for feature_name in requested_features
         ]
         online_features_response.metadata.feature_names.val.extend(
diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py
index f8295cdcf5..aea0434d59 100644
--- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py
+++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py
@@ -182,6 +182,11 @@ def _construct_transformation_plan(self, df: StreamTable) -> StreamTable:
         if isinstance(self.sfv, FeatureView):
             drop_list: List[str] = []
             fv_schema: Set[str] = set(map(lambda field: field.name, self.sfv.schema))
+            # Add the timestamp field to the schema so it is not dropped from the dataframe
+            if isinstance(self.sfv, StreamFeatureView):
+                fv_schema.add(self.sfv.timestamp_field)
+            else:
+                fv_schema.add(self.sfv.stream_source.timestamp_field)
             for column in df.columns:
                 if column not in fv_schema:
                     drop_list.append(column)
diff --git a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py
index ed4388aeb3..0f9a668b12 100644
--- a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py
+++ b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py
@@ -1,3 +1,4 @@
+import logging
 from dataclasses import dataclass
 from datetime import datetime
 from typing import Callable, List, Literal, Optional, Sequence, Union, cast
@@ -32,6 +33,8 @@
     _run_pyarrow_field_mapping,
 )
 
+logger = logging.getLogger(__name__)
+
 
 class SparkMaterializationEngineConfig(FeastConfigBaseModel):
     """Batch Materialization Engine config for spark engine"""
@@ -235,7 +238,7 @@ def _process_by_partition(rows, spark_serialized_artifacts: _SparkSerializedArtifacts):
 
     df = pd.DataFrame.from_records(dicts)
     if df.shape[0] == 0:
-        print("Skipping")
+        logger.info("Dataframe has 0 records to process")
        return
 
     table = pyarrow.Table.from_pandas(df)
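
Review notes with standalone sketches follow; every class and function in the sketches is an illustrative stub, not a Feast API.

The lookup-order swap in write_to_online_store matters because the HTTP registry signals the missing endpoint with MethodNotImplementedError rather than FeatureViewNotFoundException: if get_stream_feature_view were still tried first, the except clause would never catch that error and the fallback to get_feature_view would never run. A minimal sketch of the pattern, assuming hypothetical stub classes:

    # Stub exceptions and registry; names mirror the diff, but nothing here
    # is the real Feast implementation.
    class FeatureViewNotFoundException(Exception):
        pass

    class MethodNotImplementedError(Exception):
        pass

    class HttpRegistryStub:
        def get_feature_view(self, name):
            return f"FeatureView({name})"  # plain feature views are served

        def get_stream_feature_view(self, name):
            # No stream endpoint: raises an exception the old ordering
            # (stream view first) never caught, aborting the whole lookup.
            raise MethodNotImplementedError("get_stream_feature_view")

    def resolve_view(registry, name):
        # Plain feature view first; fall back to the stream feature view
        # only when the name is genuinely not a plain feature view.
        try:
            return registry.get_feature_view(name)
        except FeatureViewNotFoundException:
            return registry.get_stream_feature_view(name)

    print(resolve_view(HttpRegistryStub(), "driver_stats"))  # FeatureView(driver_stats)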
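The spark_kafka_processor change keeps the event-timestamp column off the drop list: the feature view schema covers only the declared fields, so without explicitly adding the timestamp field the pruning loop would discard the column the online write still needs. A self-contained sketch of that pruning logic, with plain strings standing in for self.sfv.schema and the source's timestamp_field:

    from typing import List, Set

    def columns_to_drop(df_columns: List[str], schema_fields: Set[str], timestamp_field: str) -> List[str]:
        # Protect the event-timestamp column before pruning anything else.
        keep = set(schema_fields)
        keep.add(timestamp_field)
        return [c for c in df_columns if c not in keep]

    print(columns_to_drop(
        ["driver_id", "conv_rate", "event_timestamp", "debug_col"],
        {"driver_id", "conv_rate"},
        "event_timestamp",
    ))  # ['debug_col']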
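Finally, the print-to-logger swap in the Spark materialization engine adopts the standard module-level logger idiom, so the empty-partition message becomes filterable by level and routable by handlers instead of landing unconditionally on executor stdout. A minimal sketch of that idiom (process_partition is a made-up stand-in for the real function):

    import logging

    logger = logging.getLogger(__name__)  # one logger per module, named after it

    def process_partition(row_count: int) -> None:
        if row_count == 0:
            # Unlike print(), this respects the logging configuration of the
            # application that embeds the engine.
            logger.info("Dataframe has 0 records to process")
            return

    if __name__ == "__main__":
        logging.basicConfig(level=logging.INFO)
        process_partition(0)  # INFO:__main__:Dataframe has 0 records to process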