Skip to content

Commit

Permalink
fix: Fix feature view retrieval and add timestamp field to schema (#94)
Browse files Browse the repository at this point in the history
* fix: Fix feature view retrieval and add timestamp field to schema

---------

Co-authored-by: Bhargav Dodla <[email protected]>
  • Loading branch information
EXPEbdodla and Bhargav Dodla authored Mar 13, 2024
1 parent 1763d8d commit be6b236
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 9 deletions.
20 changes: 12 additions & 8 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1498,12 +1498,14 @@ def write_to_online_store(
allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry.
"""
# TODO: restrict this to work with online StreamFeatureViews and validate the FeatureView type
# In EG, we use a feature view to support streaming, and the HTTP registry is not implemented for stream feature views
# (the HTTP Registry raises MethodNotImplementedError for get_stream_feature_view)
try:
feature_view = self.get_stream_feature_view(
feature_view = self.get_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)
except FeatureViewNotFoundException:
feature_view = self.get_feature_view(
feature_view = self.get_stream_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)
provider = self._get_provider()
Expand Down Expand Up @@ -1663,9 +1665,9 @@ def _get_online_features(

return self._go_server.get_online_features(
features_refs=features if isinstance(features, list) else [],
feature_service=features
if isinstance(features, FeatureService)
else None,
feature_service=(
features if isinstance(features, FeatureService) else None
),
entities=entity_native_values,
request_data={}, # TODO: add request data parameter to public API
full_feature_names=full_feature_names,
Expand Down Expand Up @@ -2093,9 +2095,11 @@ def _populate_response_from_feature_data(
"""
# Add the feature names to the response.
requested_feature_refs = [
f"{table.projection.name_to_use()}__{feature_name}"
if full_feature_names
else feature_name
(
f"{table.projection.name_to_use()}__{feature_name}"
if full_feature_names
else feature_name
)
for feature_name in requested_features
]
online_features_response.metadata.feature_names.val.extend(
Expand Down
5 changes: 5 additions & 0 deletions sdk/python/feast/infra/contrib/spark_kafka_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ def _construct_transformation_plan(self, df: StreamTable) -> StreamTable:
if isinstance(self.sfv, FeatureView):
drop_list: List[str] = []
fv_schema: Set[str] = set(map(lambda field: field.name, self.sfv.schema))
# Add the timestamp field to the schema so it is not dropped from the dataframe
if isinstance(self.sfv, StreamFeatureView):
fv_schema.add(self.sfv.timestamp_field)
else:
fv_schema.add(self.sfv.stream_source.timestamp_field)
for column in df.columns:
if column not in fv_schema:
drop_list.append(column)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, List, Literal, Optional, Sequence, Union, cast
Expand Down Expand Up @@ -32,6 +33,8 @@
_run_pyarrow_field_mapping,
)

logger = logging.getLogger(__name__)


class SparkMaterializationEngineConfig(FeastConfigBaseModel):
"""Batch Materialization Engine config for spark engine"""
Expand Down Expand Up @@ -235,7 +238,7 @@ def _process_by_partition(rows, spark_serialized_artifacts: _SparkSerializedArti

df = pd.DataFrame.from_records(dicts)
if df.shape[0] == 0:
print("Skipping")
logger.info("Dataframe has 0 records to process")
return

table = pyarrow.Table.from_pandas(df)
Expand Down

0 comments on commit be6b236

Please sign in to comment.