Skip to content

Commit

Permalink
add write_feast_feature method
Browse files Browse the repository at this point in the history
  • Loading branch information
emgeee committed Oct 17, 2024
1 parent a98d034 commit 6a1b45d
Showing 1 changed file with 24 additions and 3 deletions.
27 changes: 24 additions & 3 deletions py-denormalized/python/denormalized/datastream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@
from denormalized.utils import to_internal_expr, to_internal_exprs

if USE_FEAST:
from feast import Field
from feast import Field, FeatureStore
from feast.type_map import pa_to_feast_value_type
from feast.types import from_value_type
from feast.data_source import PushMode

from typing import TYPE_CHECKING, Callable

if TYPE_CHECKING:
from feast.value_type import ValueType


class DataStream:
"""Represents a stream of data that can be manipulated using various operations."""

Expand Down Expand Up @@ -192,7 +194,26 @@ def sink(self, func: Callable[[pa.RecordBatch], None]) -> None:
self.ds.sink_python(func)

@feast_feature
def get_feast_schema(self) -> list['Field']:
def get_feast_schema(self) -> list["Field"]:
return [
Field(name=s.name, dtype=from_value_type(pa_to_feast_value_type(str(s.type)))) for s in self.schema()
Field(
name=s.name, dtype=from_value_type(pa_to_feast_value_type(str(s.type)))
)
for s in self.schema()
]

@feast_feature
def write_feast_feature(self, feature_store: "FeatureStore", source_name: str) -> None:
def _sink_to_feast(rb: pa.RecordBatch):
df = rb.to_pandas()

# This is required for feast to write ingestion
df["created"] = df["window_start_time"]

try:
feature_store.push(source_name, df, to=PushMode.ONLINE)
except Exception as e:
print(e)

self.ds.sink_python(_sink_to_feast)

0 comments on commit 6a1b45d

Please sign in to comment.