diff --git a/py-denormalized/python/denormalized/datastream.py b/py-denormalized/python/denormalized/datastream.py index ed3a785..0539a22 100644 --- a/py-denormalized/python/denormalized/datastream.py +++ b/py-denormalized/python/denormalized/datastream.py @@ -5,15 +5,17 @@ from denormalized.utils import to_internal_expr, to_internal_exprs if USE_FEAST: - from feast import Field + from feast import Field, FeatureStore from feast.type_map import pa_to_feast_value_type from feast.types import from_value_type + from feast.data_source import PushMode from typing import TYPE_CHECKING, Callable if TYPE_CHECKING: from feast.value_type import ValueType + class DataStream: """Represents a stream of data that can be manipulated using various operations.""" @@ -192,7 +194,26 @@ def sink(self, func: Callable[[pa.RecordBatch], None]) -> None: self.ds.sink_python(func) @feast_feature - def get_feast_schema(self) -> list['Field']: + def get_feast_schema(self) -> list["Field"]: return [ - Field(name=s.name, dtype=from_value_type(pa_to_feast_value_type(str(s.type)))) for s in self.schema() + Field( + name=s.name, dtype=from_value_type(pa_to_feast_value_type(str(s.type))) + ) + for s in self.schema() ] + + @feast_feature + def write_feast_feature(self, feature_store: "FeatureStore", source_name: str) -> None: + def _sink_to_feast(rb: pa.RecordBatch): + df = rb.to_pandas() + + # This is required for feast to write ingestion + df["created"] = df["window_start_time"] + + try: + feature_store.push(source_name, df, to=PushMode.ONLINE) + except Exception as e: + print(e) + + self.ds.sink_python(_sink_to_feast) +