Skip to content

Commit

Permalink
Integrate feast functionality into python library (#47)
Browse files Browse the repository at this point in the history
* Integrate feast functionality into python library

* add write_feast_feature method

* configure slatedb backend in python bindings

* format
  • Loading branch information
emgeee authored Oct 22, 2024
1 parent 687771f commit cbbdcee
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 28 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/orchestrator/src/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct Orchestrator {
senders: HashMap<String, channel::Sender<OrchestrationMessage>>,
}

pub const SHOULD_CHECKPOINT: bool = true; // THIS WILL BE MOVED INTO CONFIG
pub const SHOULD_CHECKPOINT: bool = false; // THIS WILL BE MOVED INTO CONFIG

/**
* 1. Keep track of checkpoint per source.
Expand Down
12 changes: 2 additions & 10 deletions examples/examples/emit_measurements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,8 @@ async fn main() -> Result<()> {

tasks.spawn(async move {
let sensors = [
"sensor_0",
"sensor_1",
"sensor_2",
"sensor_3",
"sensor_4",
"sensor_10",
"sensor_11",
"sensor_12",
"sensor_13",
"sensor_14",
"sensor_0", "sensor_1", "sensor_2", "sensor_3", "sensor_4", "sensor_5", "sensor_6",
"sensor_7", "sensor_8", "sensor_9",
];

loop {
Expand Down
9 changes: 4 additions & 5 deletions examples/examples/simple_aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ async fn main() -> Result<()> {
.init();

let bootstrap_servers = String::from("localhost:9092");

let ctx = Context::new()?
.with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1"))
.await;
let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers);

// Connect to source topic
Expand All @@ -36,7 +32,10 @@ async fn main() -> Result<()> {
]))
.await?;

ctx.from_topic(source_topic)
let _ctx = Context::new()?
.with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1"))
.await
.from_topic(source_topic)
.await?
.window(
vec![col("sensor_name")],
Expand Down
2 changes: 1 addition & 1 deletion py-denormalized/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "denormalized-python"
version = "0.0.1"
version = "0.0.5"
edition = "2021"
homepage = "https://github.com/probably-nothing-labs/denormalized.git"
repository = "https://github.com/probably-nothing-labs/denormalized.git"
Expand Down
1 change: 1 addition & 0 deletions py-denormalized/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ dependencies = [

[project.optional-dependencies]
tests = ["pytest"]
feast = ["feast"]

[tool.maturin]
python-source = "python"
Expand Down
38 changes: 37 additions & 1 deletion py-denormalized/python/denormalized/datastream.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
import pyarrow as pa
from denormalized._internal import PyDataStream
from denormalized.datafusion import Expr
from denormalized.feature_flags import USE_FEAST, feast_feature
from denormalized.utils import to_internal_expr, to_internal_exprs

from typing import Callable
if USE_FEAST:
from feast import Field, FeatureStore
from feast.type_map import pa_to_feast_value_type
from feast.types import from_value_type
from feast.data_source import PushMode

from typing import TYPE_CHECKING, Callable

if TYPE_CHECKING:
from feast.value_type import ValueType


class DataStream:
"""Represents a stream of data that can be manipulated using various operations."""
Expand Down Expand Up @@ -181,3 +192,28 @@ def sink_kafka(self, bootstrap_servers: str, topic: str) -> None:
def sink(self, func: Callable[[pa.RecordBatch], None]) -> None:
"""Sink the DataStream to a Python function."""
self.ds.sink_python(func)

@feast_feature
def get_feast_schema(self) -> list["Field"]:
return [
Field(
name=s.name, dtype=from_value_type(pa_to_feast_value_type(str(s.type)))
)
for s in self.schema()
]

@feast_feature
def write_feast_feature(self, feature_store: "FeatureStore", source_name: str) -> None:
def _sink_to_feast(rb: pa.RecordBatch):
df = rb.to_pandas()

# This is required for feast to write ingestion
df["created"] = df["window_start_time"]

try:
feature_store.push(source_name, df, to=PushMode.ONLINE)
except Exception as e:
print(e)

self.ds.sink_python(_sink_to_feast)

17 changes: 17 additions & 0 deletions py-denormalized/python/denormalized/feature_flags.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import importlib
from functools import wraps

def is_feast_available():
return importlib.util.find_spec("feast") is not None

USE_FEAST = is_feast_available()

def feast_feature(func):
"""Decorator to mark functions that require feast."""
@wraps(func)
def wrapper(*args, **kwargs):
if USE_FEAST:
return func(*args, **kwargs)
else:
raise NotImplementedError("This feature requires feast to be installed.")
return wrapper
22 changes: 13 additions & 9 deletions py-denormalized/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,23 +51,27 @@ impl From<PyContext> for Arc<Context> {
impl PyContext {
/// creates a new PyDataFrame
#[new]
pub fn new() -> PyResult<Self> {
pub fn new(py: Python) -> PyResult<Self> {
let rt = &get_tokio_runtime(py).0;
let fut: JoinHandle<denormalized::common::error::Result<Context>> = rt.spawn(async move {
Ok(Context::new()?
.with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1"))
.await)
});

let context = wait_for_future(py, fut).map_err(py_denormalized_err)??;

Ok(Self {
context: Arc::new(Context::new()?),
context: Arc::new(context),
})
}

fn foo(&self, _py: Python) -> PyResult<String> {
println!("Fooooo");
Ok("foo wtf".to_string())
}

fn __repr__(&self, _py: Python) -> PyResult<String> {
Ok("__repr__ PyContext".to_string())
Ok("PyContext".to_string())
}

fn __str__(&self, _py: Python) -> PyResult<String> {
Ok("__str__ PyContext".to_string())
Ok("PyContext".to_string())
}

pub fn from_topic(
Expand Down

0 comments on commit cbbdcee

Please sign in to comment.