diff --git a/crates/core/src/datasource/kafka/kafka_stream_read.rs b/crates/core/src/datasource/kafka/kafka_stream_read.rs
index 26dc488..9d484ee 100644
--- a/crates/core/src/datasource/kafka/kafka_stream_read.rs
+++ b/crates/core/src/datasource/kafka/kafka_stream_read.rs
@@ -253,7 +253,7 @@ impl PartitionStream for KafkaStreamRead {
                         max_timestamp,
                         offsets_read,
                     };
-                    let _ = state_backend
+                    state_backend
                         .as_ref()
                         .put(channel_tag.as_bytes().to_vec(), off.to_bytes().unwrap());
                     debug!("checkpointed offsets {:?}", off);
diff --git a/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs b/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs
index 4114584..da43430 100644
--- a/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs
+++ b/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs
@@ -168,7 +168,7 @@ impl GroupedWindowAggStream {
             context,
             epoch: 0,
             partition,
-            channel_tag: channel_tag,
+            channel_tag,
             receiver,
             state_backend,
         };
@@ -184,7 +184,7 @@ impl GroupedWindowAggStream {
             .collect();
         let _ = stream.ensure_window_frames_for_ranges(&ranges);
         state.frames.iter().for_each(|f| {
-            let _ = stream.update_accumulators_for_frame(f.window_start_time, &f);
+            let _ = stream.update_accumulators_for_frame(f.window_start_time, f);
         });
         let state_watermark = state.watermark.unwrap();
         stream.process_watermark(RecordBatchWatermark {
@@ -387,7 +387,7 @@ impl GroupedWindowAggStream {
 
         let watermark = {
             let watermark_lock = self.latest_watermark.lock().unwrap();
-            watermark_lock.clone()
+            *watermark_lock
         };
 
         let checkpointed_state = CheckpointedGroupedWindowAggStream {
diff --git a/examples/examples/simple_aggregation.rs b/examples/examples/simple_aggregation.rs
index 2fe1306..d785d65 100644
--- a/examples/examples/simple_aggregation.rs
+++ b/examples/examples/simple_aggregation.rs
@@ -32,7 +32,7 @@ async fn main() -> Result<()> {
         ]))
         .await?;
 
-    let _ctx = Context::new()?
+    Context::new()?
         .with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1"))
        .await
        .from_topic(source_topic)
diff --git a/py-denormalized/.gitignore b/py-denormalized/.gitignore
index c8f0442..9d2a03d 100644
--- a/py-denormalized/.gitignore
+++ b/py-denormalized/.gitignore
@@ -70,3 +70,6 @@ docs/_build/
 
 # Pyenv
 .python-version
+
+.ipynb_checkpoints/
+Untitled.ipynb
diff --git a/py-denormalized/pyproject.toml b/py-denormalized/pyproject.toml
index c6c5baf..88735ca 100644
--- a/py-denormalized/pyproject.toml
+++ b/py-denormalized/pyproject.toml
@@ -16,6 +16,7 @@ dependencies = [
 [project.optional-dependencies]
 tests = ["pytest"]
 feast = ["feast"]
+dev = []
 
 [tool.maturin]
 python-source = "python"
diff --git a/py-denormalized/python/denormalized/data_stream.py b/py-denormalized/python/denormalized/data_stream.py
index a2a5805..ca28dfb 100644
--- a/py-denormalized/python/denormalized/data_stream.py
+++ b/py-denormalized/python/denormalized/data_stream.py
@@ -73,6 +73,11 @@ def with_column(self, name: str, predicate: Expr) -> "DataStream":
             DataStream: A new DataStream with the additional column.
         """
         return DataStream(self.ds.with_column(name, to_internal_expr(predicate)))
+
+    def drop_columns(self, columns: list[str]) -> "DataStream":
+        """Drops columns from the DataStream.
+ """ + return DataStream(self.ds.drop_columns(columns)) def join_on( self, right: "DataStream", join_type: str, on_exprs: list[Expr] diff --git a/py-denormalized/python/examples/stream_join.py b/py-denormalized/python/examples/stream_join.py new file mode 100644 index 0000000..1155a62 --- /dev/null +++ b/py-denormalized/python/examples/stream_join.py @@ -0,0 +1,72 @@ +"""stream_aggregate example.""" + +import json +import signal +import sys +import pprint as pp + +from denormalized import Context +from denormalized.datafusion import col, expr +from denormalized.datafusion import functions as f +from denormalized.datafusion import lit + + +def signal_handler(sig, frame): + print("You pressed Ctrl+C!") + sys.exit(0) + + +signal.signal(signal.SIGINT, signal_handler) + +bootstrap_server = "localhost:9092" + +sample_event = { + "occurred_at_ms": 100, + "sensor_name": "foo", + "reading": 0.0, +} + + +def print_batch(rb): + pp.pprint(rb.to_pydict()) + + +ctx = Context() +temperature_ds = ctx.from_topic( + "temperature", json.dumps(sample_event), bootstrap_server +) + +humidity_ds = ( + ctx.from_topic("humidity", json.dumps(sample_event), bootstrap_server) + .with_column("humidity_sensor", col("sensor_name")) + .drop_columns(["sensor_name"]) + .window( + [col("humidity_sensor")], + [ + f.count(col("reading")).alias("avg_humidity"), + ], + 4000, + None, + ) + .with_column("humidity_window_start_time", col("window_start_time")) + .with_column("humidity_window_end_time", col("window_end_time")) + .drop_columns(["window_start_time", "window_end_time"]) +) + +joined_ds = ( + temperature_ds.window( + [col("sensor_name")], + [ + f.avg(col("reading")).alias("avg_temperature"), + ], + 4000, + None, + ) + .join( + humidity_ds, + "inner", + ["sensor_name", "window_start_time"], + ["humidity_sensor", "humidity_window_start_time"], + ) + .sink(print_batch) +) diff --git a/py-denormalized/src/datastream.rs b/py-denormalized/src/datastream.rs index da9de9d..c3b712f 100644 --- a/py-denormalized/src/datastream.rs +++ b/py-denormalized/src/datastream.rs @@ -9,13 +9,14 @@ use tokio::task::JoinHandle; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::pyarrow::PyArrowType; use datafusion::arrow::pyarrow::ToPyArrow; +use datafusion::common::JoinType; use datafusion::execution::SendableRecordBatchStream; use datafusion::physical_plan::display::DisplayableExecutionPlan; use datafusion_python::expr::{join::PyJoinType, PyExpr}; use denormalized::datastream::DataStream; -use crate::errors::{py_denormalized_err, Result}; +use crate::errors::{py_denormalized_err, DenormalizedError, Result}; use crate::utils::{get_tokio_runtime, python_print, wait_for_future}; #[pyclass(name = "PyDataStream", module = "denormalized", subclass)] @@ -86,6 +87,13 @@ impl PyDataStream { Ok(Self::new(ds)) } + pub fn drop_columns(&self, columns: Vec) -> Result { + let columns_ref: Vec<&str> = columns.iter().map(|s| s.as_str()).collect(); + + let ds = self.ds.as_ref().clone().drop_columns(&columns_ref)?; + Ok(Self::new(ds)) + } + pub fn join_on( &self, _right: PyDataStream, @@ -95,29 +103,42 @@ impl PyDataStream { todo!() } - #[pyo3(signature = (right, join_type, left_cols, right_cols, filter=None))] + #[pyo3(signature = (right, how, left_cols, right_cols, filter=None))] pub fn join( &self, right: PyDataStream, - join_type: PyJoinType, + how: &str, left_cols: Vec, right_cols: Vec, filter: Option, ) -> PyResult { let right_ds = right.ds.as_ref().clone(); + let join_type = match how { + "inner" => JoinType::Inner, + "left" => 
JoinType::Left, + "right" => JoinType::Right, + "full" => JoinType::Full, + "semi" => JoinType::LeftSemi, + "anti" => JoinType::LeftAnti, + how => { + return Err(DenormalizedError::Common(format!( + "The join type {how} does not exist or is not implemented" + )) + .into()); + } + }; + let filter = filter.map(|f| f.into()); let left_cols = left_cols.iter().map(|s| s.as_ref()).collect::>(); let right_cols = right_cols.iter().map(|s| s.as_ref()).collect::>(); - let ds = self.ds.as_ref().clone().join( - right_ds, - join_type.into(), - &left_cols, - &right_cols, - filter, - )?; + let ds = + self.ds + .as_ref() + .clone() + .join(right_ds, join_type, &left_cols, &right_cols, filter)?; Ok(Self::new(ds)) }
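
For context, a minimal sketch of how the Python surface changed above is meant to be used, condensed from the stream_join.py example in this patch. It assumes a Kafka broker on localhost:9092 and topics whose messages match sample_event; stream_join.py additionally windows both streams before joining, which is elided here for brevity:

    import json
    from denormalized import Context
    from denormalized.datafusion import col

    sample_event = {"occurred_at_ms": 100, "sensor_name": "foo", "reading": 0.0}
    ctx = Context()

    humidity_ds = (
        ctx.from_topic("humidity", json.dumps(sample_event), "localhost:9092")
        .with_column("humidity_sensor", col("sensor_name"))
        .drop_columns(["sensor_name"])  # drop_columns API added in this patch
    )
    temperature_ds = ctx.from_topic(
        "temperature", json.dumps(sample_event), "localhost:9092"
    )

    # join's `how` argument is now a plain string mapped to a DataFusion JoinType
    # on the Rust side: "inner", "left", "right", "full", "semi", or "anti";
    # any other value is rejected with a DenormalizedError.
    temperature_ds.join(
        humidity_ds, "inner", ["sensor_name"], ["humidity_sensor"]
    ).sink(lambda rb: print(rb.to_pydict()))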