From cbbdcee7e85dc7872cbb5054260711ca67bab2da Mon Sep 17 00:00:00 2001 From: Matt Green Date: Tue, 22 Oct 2024 15:02:22 -0700 Subject: [PATCH] Integrate feast functionality into python library (#47) * Integrate feast functionality into python library * add write_feast_feature method * configure slatedb backend in python bindings * format --- Cargo.lock | 2 +- crates/orchestrator/src/orchestrator.rs | 2 +- examples/examples/emit_measurements.rs | 12 +----- examples/examples/simple_aggregation.rs | 9 ++--- py-denormalized/Cargo.toml | 2 +- py-denormalized/pyproject.toml | 1 + .../python/denormalized/datastream.py | 38 ++++++++++++++++++- .../python/denormalized/feature_flags.py | 17 +++++++++ py-denormalized/src/context.rs | 22 ++++++----- 9 files changed, 77 insertions(+), 28 deletions(-) create mode 100644 py-denormalized/python/denormalized/feature_flags.py diff --git a/Cargo.lock b/Cargo.lock index 184f3b2..f5f0ef9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1664,7 +1664,7 @@ dependencies = [ [[package]] name = "denormalized-python" -version = "0.0.1" +version = "0.0.5" dependencies = [ "chrono", "datafusion", diff --git a/crates/orchestrator/src/orchestrator.rs b/crates/orchestrator/src/orchestrator.rs index 8529848..6acfeed 100644 --- a/crates/orchestrator/src/orchestrator.rs +++ b/crates/orchestrator/src/orchestrator.rs @@ -20,7 +20,7 @@ pub struct Orchestrator { senders: HashMap>, } -pub const SHOULD_CHECKPOINT: bool = true; // THIS WILL BE MOVED INTO CONFIG +pub const SHOULD_CHECKPOINT: bool = false; // THIS WILL BE MOVED INTO CONFIG /** * 1. Keep track of checkpoint per source. diff --git a/examples/examples/emit_measurements.rs b/examples/examples/emit_measurements.rs index af43e43..6285243 100644 --- a/examples/examples/emit_measurements.rs +++ b/examples/examples/emit_measurements.rs @@ -28,16 +28,8 @@ async fn main() -> Result<()> { tasks.spawn(async move { let sensors = [ - "sensor_0", - "sensor_1", - "sensor_2", - "sensor_3", - "sensor_4", - "sensor_10", - "sensor_11", - "sensor_12", - "sensor_13", - "sensor_14", + "sensor_0", "sensor_1", "sensor_2", "sensor_3", "sensor_4", "sensor_5", "sensor_6", + "sensor_7", "sensor_8", "sensor_9", ]; loop { diff --git a/examples/examples/simple_aggregation.rs b/examples/examples/simple_aggregation.rs index 60c3221..2fe1306 100644 --- a/examples/examples/simple_aggregation.rs +++ b/examples/examples/simple_aggregation.rs @@ -18,10 +18,6 @@ async fn main() -> Result<()> { .init(); let bootstrap_servers = String::from("localhost:9092"); - - let ctx = Context::new()? - .with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1")) - .await; let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers); // Connect to source topic @@ -36,7 +32,10 @@ async fn main() -> Result<()> { ])) .await?; - ctx.from_topic(source_topic) + let _ctx = Context::new()? + .with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1")) + .await + .from_topic(source_topic) .await? .window( vec![col("sensor_name")], diff --git a/py-denormalized/Cargo.toml b/py-denormalized/Cargo.toml index ac8ce00..b81f362 100644 --- a/py-denormalized/Cargo.toml +++ b/py-denormalized/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "denormalized-python" -version = "0.0.1" +version = "0.0.5" edition = "2021" homepage = "https://github.com/probably-nothing-labs/denormalized.git" repository = "https://github.com/probably-nothing-labs/denormalized.git" diff --git a/py-denormalized/pyproject.toml b/py-denormalized/pyproject.toml index bde07b2..171eae2 100644 --- a/py-denormalized/pyproject.toml +++ b/py-denormalized/pyproject.toml @@ -15,6 +15,7 @@ dependencies = [ [project.optional-dependencies] tests = ["pytest"] +feast = ["feast"] [tool.maturin] python-source = "python" diff --git a/py-denormalized/python/denormalized/datastream.py b/py-denormalized/python/denormalized/datastream.py index 418c520..0539a22 100644 --- a/py-denormalized/python/denormalized/datastream.py +++ b/py-denormalized/python/denormalized/datastream.py @@ -1,9 +1,20 @@ import pyarrow as pa from denormalized._internal import PyDataStream from denormalized.datafusion import Expr +from denormalized.feature_flags import USE_FEAST, feast_feature from denormalized.utils import to_internal_expr, to_internal_exprs -from typing import Callable +if USE_FEAST: + from feast import Field, FeatureStore + from feast.type_map import pa_to_feast_value_type + from feast.types import from_value_type + from feast.data_source import PushMode + +from typing import TYPE_CHECKING, Callable + +if TYPE_CHECKING: + from feast.value_type import ValueType + class DataStream: """Represents a stream of data that can be manipulated using various operations.""" @@ -181,3 +192,28 @@ def sink_kafka(self, bootstrap_servers: str, topic: str) -> None: def sink(self, func: Callable[[pa.RecordBatch], None]) -> None: """Sink the DataStream to a Python function.""" self.ds.sink_python(func) + + @feast_feature + def get_feast_schema(self) -> list["Field"]: + return [ + Field( + name=s.name, dtype=from_value_type(pa_to_feast_value_type(str(s.type))) + ) + for s in self.schema() + ] + + @feast_feature + def write_feast_feature(self, feature_store: "FeatureStore", source_name: str) -> None: + def _sink_to_feast(rb: pa.RecordBatch): + df = rb.to_pandas() + + # This is required for feast to write ingestion + df["created"] = df["window_start_time"] + + try: + feature_store.push(source_name, df, to=PushMode.ONLINE) + except Exception as e: + print(e) + + self.ds.sink_python(_sink_to_feast) + diff --git a/py-denormalized/python/denormalized/feature_flags.py b/py-denormalized/python/denormalized/feature_flags.py new file mode 100644 index 0000000..0ad07d3 --- /dev/null +++ b/py-denormalized/python/denormalized/feature_flags.py @@ -0,0 +1,17 @@ +import importlib +from functools import wraps + +def is_feast_available(): + return importlib.util.find_spec("feast") is not None + +USE_FEAST = is_feast_available() + +def feast_feature(func): + """Decorator to mark functions that require feast.""" + @wraps(func) + def wrapper(*args, **kwargs): + if USE_FEAST: + return func(*args, **kwargs) + else: + raise NotImplementedError("This feature requires feast to be installed.") + return wrapper diff --git a/py-denormalized/src/context.rs b/py-denormalized/src/context.rs index 6e1740f..7e2372f 100644 --- a/py-denormalized/src/context.rs +++ b/py-denormalized/src/context.rs @@ -51,23 +51,27 @@ impl From for Arc { impl PyContext { /// creates a new PyDataFrame #[new] - pub fn new() -> PyResult { + pub fn new(py: Python) -> PyResult { + let rt = &get_tokio_runtime(py).0; + let fut: JoinHandle> = rt.spawn(async move { + Ok(Context::new()? + .with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1")) + .await) + }); + + let context = wait_for_future(py, fut).map_err(py_denormalized_err)??; + Ok(Self { - context: Arc::new(Context::new()?), + context: Arc::new(context), }) } - fn foo(&self, _py: Python) -> PyResult { - println!("Fooooo"); - Ok("foo wtf".to_string()) - } - fn __repr__(&self, _py: Python) -> PyResult { - Ok("__repr__ PyContext".to_string()) + Ok("PyContext".to_string()) } fn __str__(&self, _py: Python) -> PyResult { - Ok("__str__ PyContext".to_string()) + Ok("PyContext".to_string()) } pub fn from_topic(