Skip to content

Commit

Permalink
use a new FeastDataStream class for feast functionality (#49)
Browse files Browse the repository at this point in the history
* use a new FeastDataStream class for feast functionality

* add pyi file for feast_data_stream
  • Loading branch information
emgeee authored Oct 24, 2024
1 parent cbbdcee commit 3aa6e00
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 58 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion py-denormalized/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "denormalized-python"
version = "0.0.5"
version = "0.0.7"
edition = "2021"
homepage = "https://github.com/probably-nothing-labs/denormalized.git"
repository = "https://github.com/probably-nothing-labs/denormalized.git"
Expand Down
9 changes: 8 additions & 1 deletion py-denormalized/python/denormalized/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
from .context import Context
from .datastream import DataStream
from .data_stream import DataStream

# Names that are always exported by the package.
__all__ = [
    "Context",
    "DataStream",
]

# FeastDataStream is exported only when its module imports cleanly. That module
# imports `feast`, so an ImportError here (e.g. feast not installed) simply
# leaves the name out of the package namespace instead of failing the import.
try:
    from .feast_data_stream import FeastDataStream

    __all__.append("FeastDataStream")
except ImportError:
    pass
2 changes: 1 addition & 1 deletion py-denormalized/python/denormalized/context.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from denormalized._internal import PyContext
from .datastream import DataStream
from .data_stream import DataStream

class Context:
"""Context."""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,9 @@
import pyarrow as pa
from denormalized._internal import PyDataStream
from denormalized.datafusion import Expr
from denormalized.feature_flags import USE_FEAST, feast_feature
from denormalized.utils import to_internal_expr, to_internal_exprs

if USE_FEAST:
from feast import Field, FeatureStore
from feast.type_map import pa_to_feast_value_type
from feast.types import from_value_type
from feast.data_source import PushMode

from typing import TYPE_CHECKING, Callable

if TYPE_CHECKING:
from feast.value_type import ValueType

from typing import Callable

class DataStream:
"""Represents a stream of data that can be manipulated using various operations."""
Expand Down Expand Up @@ -192,28 +181,3 @@ def sink_kafka(self, bootstrap_servers: str, topic: str) -> None:
def sink(self, func: Callable[[pa.RecordBatch], None]) -> None:
"""Sink the DataStream to a Python function."""
self.ds.sink_python(func)

@feast_feature
def get_feast_schema(self) -> list["Field"]:
return [
Field(
name=s.name, dtype=from_value_type(pa_to_feast_value_type(str(s.type)))
)
for s in self.schema()
]

@feast_feature
def write_feast_feature(self, feature_store: "FeatureStore", source_name: str) -> None:
def _sink_to_feast(rb: pa.RecordBatch):
df = rb.to_pandas()

# This is required for feast to write ingestion
df["created"] = df["window_start_time"]

try:
feature_store.push(source_name, df, to=PushMode.ONLINE)
except Exception as e:
print(e)

self.ds.sink_python(_sink_to_feast)

89 changes: 89 additions & 0 deletions py-denormalized/python/denormalized/feast_data_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import inspect
from typing import Any, TypeVar, Union, cast, get_type_hints

import pyarrow as pa
from denormalized._internal import PyDataStream
from denormalized.datafusion import Expr
from feast import FeatureStore, Field
from feast.data_source import PushMode
from feast.type_map import pa_to_feast_value_type
from feast.types import from_value_type

from .data_stream import DataStream

T = TypeVar("T")


class FeastDataStreamMeta(type):
    """Metaclass that re-types DataStream's chaining methods for subclasses.

    Every plain function on ``DataStream`` whose resolved return annotation is
    ``DataStream`` gets a wrapper on the class being created, unless the class
    body defines that name itself. The wrapper delegates to the next
    implementation in the MRO and re-wraps the result in the caller's own
    class, so chained calls keep returning the subclass.
    """

    def __new__(cls, name: str, bases: tuple, attrs: dict) -> Any:
        """Create the class and attach the re-wrapping methods.

        Args:
            name: Name of the class being created.
            bases: Base classes of the class being created.
            attrs: Namespace produced by the class body.

        Returns:
            The newly created class.
        """
        # Get all methods from DataStream that return DataStream
        datastream_methods = inspect.getmembers(
            DataStream,
            predicate=lambda x: (
                inspect.isfunction(x) and get_type_hints(x).get("return") == DataStream
            ),
        )

        # Build the class first so each wrapper can anchor super() on it.
        new_cls = super().__new__(cls, name, bases, attrs)

        def create_wrapper(method_name: str, original: Any) -> Any:
            def wrapper(self, *args, **kwargs):
                # super() must be anchored on the class that owns this wrapper.
                # Anchoring on self.__class__ makes a subclass instance resolve
                # back to this same wrapper and recurse without bound.
                result = getattr(super(new_cls, self), method_name)(*args, **kwargs)
                return self.__class__(result)

            # Copy original method's signature but change return type
            hints = get_type_hints(original)
            hints["return"] = (
                "FeastDataStream"  # Use string to handle forward reference
            )
            wrapper.__annotations__ = hints
            # Keep the wrapped method identifiable in help() and tracebacks.
            wrapper.__name__ = method_name
            wrapper.__qualname__ = f"{name}.{method_name}"
            wrapper.__doc__ = original.__doc__
            return wrapper

        # For each method that returns DataStream, attach a wrapper that
        # returns the subclass instead.
        for method_name, method in datastream_methods:
            if method_name not in attrs:  # Only wrap if not already defined
                setattr(new_cls, method_name, create_wrapper(method_name, method))

        return new_cls


class FeastDataStream(DataStream, metaclass=FeastDataStreamMeta):
    """DataStream variant that adds Feast schema export and feature pushing."""

    def __init__(self, stream: Union[PyDataStream, DataStream]) -> None:
        """Wrap an existing stream.

        Args:
            stream: Either a PyDataStream, used as-is, or a DataStream, whose
                underlying ``ds`` handle is reused.
        """
        inner = stream.ds if isinstance(stream, DataStream) else stream
        super().__init__(inner)

    def get_feast_schema(self) -> list[Field]:
        """Return one Feast ``Field`` per entry of this stream's schema."""
        fields = []
        for column in self.schema():
            column_name = column.name
            # The Arrow type is passed to Feast's type mapper as a string.
            value_type = pa_to_feast_value_type(str(column.type))
            fields.append(Field(name=column_name, dtype=from_value_type(value_type)))
        return fields

    def write_feast_feature(
        self, feature_store: FeatureStore, source_name: str
    ) -> None:
        """Register a sink that pushes every record batch to Feast.

        Args:
            feature_store: Store whose ``push`` method receives each batch.
            source_name: Name passed to ``push`` as the push source.
        """

        def push_batch(batch: pa.RecordBatch):
            frame = batch.to_pandas()

            # Best effort: a failed push is printed and not re-raised.
            try:
                feature_store.push(source_name, frame, to=PushMode.ONLINE)
            except Exception as err:
                print(err)

        self.ds.sink_python(push_batch)
42 changes: 42 additions & 0 deletions py-denormalized/python/denormalized/feast_data_stream.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# feast_data_stream.pyi
from typing import List, Optional, Type, Union

from denormalized._internal import PyDataStream
from denormalized.datafusion import Expr
from feast import FeatureStore, Field

from .data_stream import DataStream

# Metaclass applied to FeastDataStream; the stub declares only the signature
# of __new__, typed as producing the FeastDataStream class.
class FeastDataStreamMeta(type):
    def __new__(cls, name: str, bases: tuple, attrs: dict) -> Type[FeastDataStream]: ...

# Static typing view of FeastDataStream. The chaining methods are redeclared
# here with a FeastDataStream return type so type checkers see the subclass
# on chained calls.
class FeastDataStream(DataStream, metaclass=FeastDataStreamMeta):
    # Accepts either a raw PyDataStream or an existing DataStream.
    def __init__(self, stream: Union[PyDataStream, DataStream]) -> None: ...

    # --- Chaining methods, declared as returning FeastDataStream ---
    def select(self, expr_list: List[Expr]) -> FeastDataStream: ...
    def filter(self, predicate: Expr) -> FeastDataStream: ...
    def with_column(self, name: str, predicate: Expr) -> FeastDataStream: ...
    def join_on(
        self, right: DataStream, join_type: str, on_exprs: List[Expr]
    ) -> FeastDataStream: ...
    def join(
        self,
        right: DataStream,
        join_type: str,
        left_cols: List[str],
        right_cols: List[str],
        filter: Optional[Expr] = None,
    ) -> FeastDataStream: ...
    def window(
        self,
        group_exprs: List[Expr],
        aggr_exprs: List[Expr],
        window_length_millis: int,
        slide_millis: Optional[int] = None,
    ) -> FeastDataStream: ...
    def print_schema(self) -> FeastDataStream: ...
    def print_plan(self) -> FeastDataStream: ...
    def print_physical_plan(self) -> FeastDataStream: ...

    # --- Feast-specific methods ---
    def get_feast_schema(self) -> List[Field]: ...
    def write_feast_feature(
        self, feature_store: FeatureStore, source_name: str
    ) -> None: ...
17 changes: 0 additions & 17 deletions py-denormalized/python/denormalized/feature_flags.py

This file was deleted.

0 comments on commit 3aa6e00

Please sign in to comment.