Skip to content

Commit

Permalink
fix: Reverted not null PR specific changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Bhargav Dodla committed Mar 13, 2024
1 parent f0a21d9 commit ca32d6e
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 21 deletions.
20 changes: 8 additions & 12 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1498,14 +1498,12 @@ def write_to_online_store(
allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry.
"""
# TODO: restrict this to work with online StreamFeatureViews and validate the FeatureView type
# In EG, we use feature view to support streaming and http registry is not implemented for stream feature views
# HTTP Registry raises MethodNotImplementedError for get_stream_feature_view
try:
feature_view = self.get_feature_view(
feature_view = self.get_stream_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)
except FeatureViewNotFoundException:
feature_view = self.get_stream_feature_view(
feature_view = self.get_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)
provider = self._get_provider()
Expand Down Expand Up @@ -1665,9 +1663,9 @@ def _get_online_features(

return self._go_server.get_online_features(
features_refs=features if isinstance(features, list) else [],
feature_service=(
features if isinstance(features, FeatureService) else None
),
feature_service=features
if isinstance(features, FeatureService)
else None,
entities=entity_native_values,
request_data={}, # TODO: add request data parameter to public API
full_feature_names=full_feature_names,
Expand Down Expand Up @@ -2095,11 +2093,9 @@ def _populate_response_from_feature_data(
"""
# Add the feature names to the response.
requested_feature_refs = [
(
f"{table.projection.name_to_use()}__{feature_name}"
if full_feature_names
else feature_name
)
f"{table.projection.name_to_use()}__{feature_name}"
if full_feature_names
else feature_name
for feature_name in requested_features
]
online_features_response.metadata.feature_names.val.extend(
Expand Down
5 changes: 0 additions & 5 deletions sdk/python/feast/infra/contrib/spark_kafka_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,6 @@ def _construct_transformation_plan(self, df: StreamTable) -> StreamTable:
if isinstance(self.sfv, FeatureView):
drop_list: List[str] = []
fv_schema: Set[str] = set(map(lambda field: field.name, self.sfv.schema))
# Add timestamp field to the schema
if isinstance(self.sfv, StreamFeatureView):
fv_schema.add(self.sfv.timestamp_field)
else:
fv_schema.add(self.sfv.stream_source.timestamp_field)
for column in df.columns:
if column not in fv_schema:
drop_list.append(column)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, List, Literal, Optional, Sequence, Union, cast
Expand Down Expand Up @@ -33,8 +32,6 @@
_run_pyarrow_field_mapping,
)

logger = logging.getLogger(__name__)


class SparkMaterializationEngineConfig(FeastConfigBaseModel):
"""Batch Materialization Engine config for spark engine"""
Expand Down Expand Up @@ -238,7 +235,7 @@ def _process_by_partition(rows, spark_serialized_artifacts: _SparkSerializedArti

df = pd.DataFrame.from_records(dicts)
if df.shape[0] == 0:
logger.info("Dataframe has 0 records to process")
print("Skipping")
return

table = pyarrow.Table.from_pandas(df)
Expand Down

0 comments on commit ca32d6e

Please sign in to comment.