Skip to content

Commit

Permalink
Fixing the streaming join example (#54)
Browse files Browse the repository at this point in the history
* Fixing the streaming join example

* format

* add drop_columns

* update python internal package name

---------

Co-authored-by: Matt Green <[email protected]>
  • Loading branch information
ameyc and emgeee committed Nov 7, 2024
1 parent 245f9b2 commit ee5dbc1
Show file tree
Hide file tree
Showing 18 changed files with 59 additions and 40 deletions.
9 changes: 9 additions & 0 deletions crates/core/src/datastream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,15 @@ impl DataStream {
})
}

/// Returns a new `DataStream` with the named columns removed.
///
/// Consumes `self`; the underlying dataframe is cloned out of the `Arc`
/// so the projection can be applied, while the context and shutdown
/// channels are shared with the new stream.
pub fn drop_columns(self, columns: &[&str]) -> Result<Self> {
    // Clone the inner dataframe (cheap relative to the plan rewrite) and
    // delegate the actual column removal to DataFusion's drop_columns.
    let projected = self.df.as_ref().clone().drop_columns(columns)?;
    Ok(Self {
        df: Arc::new(projected),
        context: self.context.clone(),
        shutdown_tx: self.shutdown_tx.clone(),
        shutdown_rx: self.shutdown_rx.clone(),
    })
}

// Join two streams using the specified expression
pub fn join_on(
self,
Expand Down
41 changes: 26 additions & 15 deletions examples/examples/stream_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ async fn main() -> Result<()> {

let bootstrap_servers = String::from("localhost:9092");

let ctx = Context::new()?;
let ctx = Context::new()?
.with_slatedb_backend(String::from("/tmp/checkpoints/stream-join-checkpoint-1"))
.await;

let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone());

let source_topic_builder = topic_builder
Expand All @@ -29,7 +32,7 @@ async fn main() -> Result<()> {
.clone()
.with_topic(String::from("temperature"))
.build_reader(ConnectionOpts::from([
("auto.offset.reset".to_string(), "earliest".to_string()),
("auto.offset.reset".to_string(), "latest".to_string()),
("group.id".to_string(), "sample_pipeline".to_string()),
]))
.await?;
Expand All @@ -40,30 +43,38 @@ async fn main() -> Result<()> {
.clone()
.with_topic(String::from("humidity"))
.build_reader(ConnectionOpts::from([
("auto.offset.reset".to_string(), "earliest".to_string()),
("auto.offset.reset".to_string(), "latest".to_string()),
("group.id".to_string(), "sample_pipeline".to_string()),
]))
.await?,
)
.await?;
.await?
.with_column("humidity_sensor", col("sensor_name"))?
.drop_columns(&["sensor_name"])?
.window(
vec![col("humidity_sensor")],
vec![avg(col("reading")).alias("avg_humidity")],
Duration::from_millis(1_000),
None,
)?
.with_column("humidity_window_start_time", col("window_start_time"))?
.with_column("humidity_window_end_time", col("window_end_time"))?
.drop_columns(&["window_start_time", "window_end_time"])?;

let joined_ds = ctx
.from_topic(temperature_topic)
.await?
.window(
vec![col("sensor_name")],
vec![avg(col("reading")).alias("avg_temperature")],
Duration::from_millis(1_000),
None,
)?
.join(
humidity_ds,
JoinType::Inner,
&["sensor_name"],
&["sensor_name"],
None,
)?
.window(
vec![],
vec![
avg(col("temperature.reading")).alias("avg_temperature"),
avg(col("humidity.reading")).alias("avg_humidity"),
],
Duration::from_millis(1_000),
&["sensor_name", "window_start_time"],
&["humidity_sensor", "humidity_window_start_time"],
None,
)?;

Expand Down
2 changes: 1 addition & 1 deletion py-denormalized/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ feast = ["feast"]
[tool.maturin]
python-source = "python"
features = ["pyo3/extension-module"]
module-name = "denormalized._internal"
module-name = "denormalized._d_internal"

[tool.rye]
dev-dependencies = ["pip>=24.2", "ipython>=8.26.0", "pytest>=8.3.2"]
Expand Down
3 changes: 1 addition & 2 deletions py-denormalized/python/denormalized/context.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from denormalized._internal import PyContext
from denormalized._d_internal import PyContext
from .data_stream import DataStream

class Context:
Expand All @@ -20,4 +20,3 @@ def from_topic(self, topic: str, sample_json: str, bootstrap_servers: str) -> Da
ds = DataStream(py_ds)

return ds

2 changes: 1 addition & 1 deletion py-denormalized/python/denormalized/data_stream.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import pyarrow as pa
from denormalized._internal import PyDataStream
from denormalized._d_internal import PyDataStream
from denormalized.datafusion import Expr
from denormalized.utils import to_internal_expr, to_internal_exprs

Expand Down
2 changes: 1 addition & 1 deletion py-denormalized/python/denormalized/datafusion/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from .catalog import Catalog, Database, Table

# The following imports are okay to remain as opaque to the user.
from denormalized._internal import Config, LogicalPlan, ExecutionPlan, runtime
from denormalized._d_internal import Config, LogicalPlan, ExecutionPlan, runtime

from .record_batch import RecordBatchStream, RecordBatch

Expand Down
2 changes: 1 addition & 1 deletion py-denormalized/python/denormalized/datafusion/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from __future__ import annotations

import denormalized._internal as df_internal
import denormalized._d_internal as df_internal

from typing import TYPE_CHECKING

Expand Down
2 changes: 1 addition & 1 deletion py-denormalized/python/denormalized/datafusion/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
"""Common data types used throughout the DataFusion project."""

from denormalized._internal import common as common_internal
from denormalized._d_internal import common as common_internal
from enum import Enum

# TODO these should all have proper wrapper classes
Expand Down
12 changes: 6 additions & 6 deletions py-denormalized/python/denormalized/datafusion/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@

from __future__ import annotations

from denormalized._internal import SessionConfig as SessionConfigInternal
from denormalized._internal import RuntimeConfig as RuntimeConfigInternal
from denormalized._internal import SQLOptions as SQLOptionsInternal
from denormalized._internal import SessionContext as SessionContextInternal
from denormalized._internal import LogicalPlan, ExecutionPlan
from denormalized._d_internal import SessionConfig as SessionConfigInternal
from denormalized._d_internal import RuntimeConfig as RuntimeConfigInternal
from denormalized._d_internal import SQLOptions as SQLOptionsInternal
from denormalized._d_internal import SessionContext as SessionContextInternal
from denormalized._d_internal import LogicalPlan, ExecutionPlan

from denormalized._internal import AggregateUDF
from denormalized._d_internal import AggregateUDF
from denormalized.datafusion.catalog import Catalog, Table
from denormalized.datafusion.dataframe import DataFrame
from denormalized.datafusion.expr import Expr, SortExpr, sort_list_to_raw_sort_list
Expand Down
4 changes: 2 additions & 2 deletions py-denormalized/python/denormalized/datafusion/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
import pathlib
from typing import Callable

from denormalized._internal import DataFrame as DataFrameInternal
from denormalized._d_internal import DataFrame as DataFrameInternal
from denormalized.datafusion.expr import Expr, SortExpr, sort_or_default
from denormalized._internal import (
from denormalized._d_internal import (
LogicalPlan,
ExecutionPlan,
)
Expand Down
6 changes: 3 additions & 3 deletions py-denormalized/python/denormalized/datafusion/expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
from denormalized.datafusion.common import DataTypeMap, NullTreatment, RexType
from typing_extensions import deprecated

from denormalized._internal import LogicalPlan
from denormalized._internal import expr as expr_internal
from denormalized._internal import functions as functions_internal
from denormalized._d_internal import LogicalPlan
from denormalized._d_internal import expr as expr_internal
from denormalized._d_internal import functions as functions_internal

# The following are imported from the internal representation. We may choose to
# give these all proper wrappers, or to simply leave as is. These were added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from __future__ import annotations

from denormalized._internal import functions as f
from denormalized._d_internal import functions as f
from denormalized.datafusion.expr import (
CaseBuilder,
Expr,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
"""Object store functionality."""

from denormalized._internal import object_store
from denormalized._d_internal import object_store

AmazonS3 = object_store.AmazonS3
GoogleCloud = object_store.GoogleCloud
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

if TYPE_CHECKING:
import pyarrow
import denormalized._internal as df_internal
import denormalized._d_internal as df_internal
import typing_extensions


Expand Down
2 changes: 1 addition & 1 deletion py-denormalized/python/denormalized/datafusion/udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from __future__ import annotations

import denormalized._internal as df_internal
import denormalized._d_internal as df_internal
from datafusion.expr import Expr
from typing import Callable, TYPE_CHECKING, TypeVar
from abc import ABCMeta, abstractmethod
Expand Down
2 changes: 1 addition & 1 deletion py-denormalized/python/denormalized/feast_data_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Any, TypeVar, Union, cast, get_type_hints

import pyarrow as pa
from denormalized._internal import PyDataStream
from denormalized._d_internal import PyDataStream
from denormalized.datafusion import Expr
from feast import FeatureStore, Field
from feast.data_source import PushMode
Expand Down
2 changes: 1 addition & 1 deletion py-denormalized/python/denormalized/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from denormalized._internal import expr as internal_exprs
from denormalized._d_internal import expr as internal_exprs
from denormalized.datafusion import Expr


Expand Down
2 changes: 1 addition & 1 deletion py-denormalized/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub mod utils;
pub(crate) struct TokioRuntime(tokio::runtime::Runtime);

/// A Python module implemented in Rust.
#[pymodule(name="_internal")]
#[pymodule(name = "_d_internal")]
fn _py_denormalized_internal(py: Python, m: Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<datastream::PyDataStream>()?;
m.add_class::<context::PyContext>()?;
Expand Down

0 comments on commit ee5dbc1

Please sign in to comment.