From a4b34088a6b7fb4e8e30bc475477c2ce0ce968f5 Mon Sep 17 00:00:00 2001
From: Amey Chaugule
Date: Sun, 24 Nov 2024 23:43:10 -0800
Subject: [PATCH 1/5] Adding Kafka Timestamp support when no ts column is provided

---
 .../core/src/datasource/kafka/kafka_config.rs | 16 ++---
 .../src/datasource/kafka/kafka_stream_read.rs | 41 +++++++++---
 .../continuous/streaming_window.rs            | 62 ++++++++++---------
 3 files changed, 70 insertions(+), 49 deletions(-)

diff --git a/crates/core/src/datasource/kafka/kafka_config.rs b/crates/core/src/datasource/kafka/kafka_config.rs
index 6559cb9..84b8dbf 100644
--- a/crates/core/src/datasource/kafka/kafka_config.rs
+++ b/crates/core/src/datasource/kafka/kafka_config.rs
@@ -37,8 +37,8 @@ pub struct KafkaReadConfig {
     pub encoding: StreamEncoding,
     pub order: Vec>,
     pub partition_count: i32,
-    pub timestamp_column: String,
-    pub timestamp_unit: TimestampUnit,
+    pub timestamp_column: Option<String>,
+    pub timestamp_unit: Option<TimestampUnit>,
     pub kafka_connection_opts: ConnectionOpts,
 }
 
@@ -232,17 +232,9 @@ impl KafkaTopicBuilder {
             .as_ref()
             .ok_or_else(|| DenormalizedError::KafkaConfig("encoding required".to_string()))?;
 
-        let timestamp_column = self
-            .timestamp_column
-            .as_ref()
-            .ok_or_else(|| DenormalizedError::KafkaConfig("timestamp_column required".to_string()))?
-            .clone();
+        let timestamp_column = self.timestamp_column.clone();
 
-        let timestamp_unit = self
-            .timestamp_unit
-            .as_ref()
-            .ok_or_else(|| DenormalizedError::KafkaConfig("timestamp_unit required".to_string()))?
-            .clone();
+        let timestamp_unit = self.timestamp_unit.clone();
 
         let mut kafka_connection_opts = ConnectionOpts::new();
         for (key, value) in opts.into_iter() {
diff --git a/crates/core/src/datasource/kafka/kafka_stream_read.rs b/crates/core/src/datasource/kafka/kafka_stream_read.rs
index caed655..7dcdf94 100644
--- a/crates/core/src/datasource/kafka/kafka_stream_read.rs
+++ b/crates/core/src/datasource/kafka/kafka_stream_read.rs
@@ -3,7 +3,9 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use arrow::datatypes::TimestampMillisecondType;
-use arrow_array::{Array, ArrayRef, PrimitiveArray, RecordBatch, StringArray, StructArray};
+use arrow_array::{
+    Array, ArrayRef, Int64Array, PrimitiveArray, RecordBatch, StringArray, StructArray,
+};
 use arrow_schema::{DataType, Field, SchemaRef, TimeUnit};
 use crossbeam::channel;
 use denormalized_orchestrator::channel_manager::{create_channel, get_sender, take_receiver};
@@ -146,8 +148,17 @@ impl PartitionStream for KafkaStreamRead {
         let mut builder = RecordBatchReceiverStreamBuilder::new(self.config.schema.clone(), 1);
         let tx = builder.tx();
         let canonical_schema = self.config.schema.clone();
-        let timestamp_column: String = self.config.timestamp_column.clone();
-        let timestamp_unit = self.config.timestamp_unit.clone();
+        let use_kafka_timestamps = self.config.timestamp_column.is_none();
+        let timestamp_column = self
+            .config
+            .timestamp_column
+            .clone()
+            .unwrap_or(String::from(""));
+        let timestamp_unit = self
+            .config
+            .timestamp_unit
+            .clone()
+            .unwrap_or(crate::prelude::TimestampUnit::Int64Millis);
         let batch_timeout: Duration = Duration::from_millis(100);
         let mut decoder = self.config.build_decoder();
 
@@ -180,10 +191,13 @@ impl PartitionStream for KafkaStreamRead {
 
             let mut offsets_read: HashMap<i32, i64> = HashMap::new();
 
+            let mut timestamps: Vec<i64> = Vec::new();
             loop {
                 match tokio::time::timeout(batch_timeout, consumer.recv()).await {
                     Ok(Ok(m)) => {
                         let payload = m.payload().expect("Message payload is empty");
+                        let ts = m.timestamp().to_millis().unwrap_or(-1);
+                        timestamps.push(ts);
                         decoder.push_to_buffer(payload.to_owned());
                         offsets_read
                             .entry(m.partition())
@@ -207,12 +221,21 @@ impl PartitionStream for KafkaStreamRead {
 
             if !offsets_read.is_empty() {
                 let record_batch = decoder.to_record_batch().unwrap();
-                let ts_column = record_batch
-                    .column_by_name(timestamp_column.as_str())
-                    .map(|ts_col| {
-                        Arc::new(array_to_timestamp_array(ts_col, timestamp_unit.clone()))
-                    })
-                    .unwrap();
+
+                let kafka_ts_array = Int64Array::from_iter_values(timestamps);
+                let ts_column_array: Arc<dyn Array> = Arc::new(kafka_ts_array);
+                let mut ts_column = Arc::new(array_to_timestamp_array(
+                    ts_column_array.as_ref(),
+                    timestamp_unit.clone(),
+                ));
+
+                // Timestamp column was provided. TODO: this code is a hack for now; we need a little cleanup here.
+                if !use_kafka_timestamps {
+                    let arr = record_batch
+                        .column_by_name(timestamp_column.as_str())
+                        .unwrap();
+                    ts_column = Arc::new(array_to_timestamp_array(arr, timestamp_unit.clone()));
+                };
 
                 let binary_vec = Vec::from_iter(
                     std::iter::repeat(String::from("no_barrier")).take(ts_column.len()),
diff --git a/crates/core/src/physical_plan/continuous/streaming_window.rs b/crates/core/src/physical_plan/continuous/streaming_window.rs
index c6ea378..49d8e27 100644
--- a/crates/core/src/physical_plan/continuous/streaming_window.rs
+++ b/crates/core/src/physical_plan/continuous/streaming_window.rs
@@ -736,7 +736,6 @@ impl WindowAggStream {
         let mut watermark_lock: std::sync::MutexGuard> =
             self.latest_watermark.lock().unwrap();
-        debug!("latest watermark currently is {:?}", *watermark_lock);
         if let Some(current_watermark) = *watermark_lock {
             if current_watermark <= watermark.min_timestamp {
                 *watermark_lock = Some(watermark.min_timestamp)
             }
@@ -987,34 +986,41 @@ impl FullWindowAggStream {
             if self.seen_windows.contains(&start_time)
                 && !self.cached_frames.contains_key(&start_time)
             {
-                panic!("we are reopening a window already seen.")
+                debug!(
+                    "received late data for window with start time {:?}. dropping it.",
+                    start_time
+                );
+                Ok(RecordBatch::new_empty(Arc::new(
+                    add_window_columns_to_schema(self.schema.clone()),
+                )))
+            } else {
+                let frame = self.cached_frames.entry(start_time).or_insert({
+                    FullWindowAggFrame::new(
+                        start_time,
+                        batch_end_time.unwrap(),
+                        &self.exec_aggregate_expressions,
+                        self.aggregate_expressions.clone(),
+                        self.filter_expressions.clone(),
+                        self.schema.clone(),
+                        self.baseline_metrics.clone(),
+                    )
+                });
+
+                self.seen_windows.insert(start_time);
+
+                //last two columns are timestamp columns, so remove them before pushing them onto a frame.
+                let col_size = rb.num_columns();
+                rb.remove_column(col_size - 1);
+                rb.remove_column(col_size - 2);
+
+                frame.aggregate_batch(rb);
+
+                self.watermark = self
+                    .watermark
+                    .map_or(Some(start_time), |w| Some(w.max(start_time)));
+
+                self.finalize_windows()
             }
-            let frame = self.cached_frames.entry(start_time).or_insert({
-                FullWindowAggFrame::new(
-                    start_time,
-                    batch_end_time.unwrap(),
-                    &self.exec_aggregate_expressions,
-                    self.aggregate_expressions.clone(),
-                    self.filter_expressions.clone(),
-                    self.schema.clone(),
-                    self.baseline_metrics.clone(),
-                )
-            });
-
-            self.seen_windows.insert(start_time);
-
-            //last two columns are timestamp columns, so remove them before pushing them onto a frame.
-            let col_size = rb.num_columns();
-            rb.remove_column(col_size - 1);
-            rb.remove_column(col_size - 2);
-
-            frame.aggregate_batch(rb);
-
-            self.watermark = self
-                .watermark
-                .map_or(Some(start_time), |w| Some(w.max(start_time)));
-
-            self.finalize_windows()
         } else {
             Ok(RecordBatch::new_empty(Arc::new(
                 add_window_columns_to_schema(self.schema.clone()),

From f1eebdf4ba40841e3485b0e67b000f7b510ee987 Mon Sep 17 00:00:00 2001
From: Matt Green
Date: Mon, 25 Nov 2024 11:24:03 -0800
Subject: [PATCH 2/5] feat: add default kafka timestamp behavior to python API

---
 py-denormalized/pyproject.toml          |  6 +++-
 .../python/denormalized/context.py      | 36 +++++++++++++++----
 py-denormalized/src/context.rs          | 14 ++++++--
 3 files changed, 47 insertions(+), 9 deletions(-)

diff --git a/py-denormalized/pyproject.toml b/py-denormalized/pyproject.toml
index c7adff2..ac3bc8f 100644
--- a/py-denormalized/pyproject.toml
+++ b/py-denormalized/pyproject.toml
@@ -8,7 +8,11 @@ requires-python = ">=3.12"
 classifiers = []
 dynamic = ["version"] # Version specified in py-denormalized/Cargo.toml
 description = "Embeddable stream processing engine"
-dependencies = ["pyarrow>=17.0.0", "datafusion>=40.1.0"]
+dependencies = [
+    "pyarrow>=17.0.0",
+    "datafusion>=40.1.0",
+    "pip>=24.3.1",
+]
 
 [project.optional-dependencies]
 tests = ["pytest"]
diff --git a/py-denormalized/python/denormalized/context.py b/py-denormalized/python/denormalized/context.py
index e3f7c84..29f243e 100644
--- a/py-denormalized/python/denormalized/context.py
+++ b/py-denormalized/python/denormalized/context.py
@@ -4,16 +4,30 @@
 class Context:
-    """Context."""
+    """A context manager for handling data stream operations.
+
+    This class provides functionality to create and manage data streams
+    from various sources like Kafka topics.
+    """
 
     def __init__(self) -> None:
-        """__init__."""
+        """Initializes a new Context instance with PyContext."""
         self.ctx = PyContext()
 
     def __repr__(self):
+        """Returns the string representation of the PyContext object.
+
+        Returns:
+            str: String representation of the underlying PyContext.
+        """
         return self.ctx.__repr__()
 
     def __str__(self):
+        """Returns the string representation of the PyContext object.
+
+        Returns:
+            str: String representation of the underlying PyContext.
+        """
         return self.ctx.__str__()
 
     def from_topic(
@@ -21,17 +35,27 @@ def from_topic(
         self,
         topic: str,
         sample_json: str,
         bootstrap_servers: str,
-        timestamp_column: str,
+        timestamp_column: str | None = None,
         group_id: str = "default_group",
     ) -> DataStream:
-        """Create a new context from a topic."""
+        """Creates a new DataStream from a Kafka topic.
+
+        Args:
+            topic: The name of the Kafka topic to consume from.
+            sample_json: A sample JSON string representing the expected message format.
+            bootstrap_servers: Comma-separated list of Kafka broker addresses.
+            timestamp_column: Optional column name containing message timestamps. If not specified, the Kafka timestamp attached to each message is used.
+            group_id: Kafka consumer group ID, defaults to "default_group".
+
+        Returns:
+            DataStream: A new DataStream instance connected to the specified topic.
+        """
         py_ds = self.ctx.from_topic(
             topic,
             sample_json,
             bootstrap_servers,
-            timestamp_column,
             group_id,
+            timestamp_column,
         )
         ds = DataStream(py_ds)
-
         return ds
diff --git a/py-denormalized/src/context.rs b/py-denormalized/src/context.rs
index cee7a88..78eccee 100644
--- a/py-denormalized/src/context.rs
+++ b/py-denormalized/src/context.rs
@@ -71,14 +71,21 @@ impl PyContext {
         Ok("PyContext".to_string())
     }
 
+    #[pyo3(signature = (
+        topic,
+        sample_json,
+        bootstrap_servers,
+        group_id,
+        timestamp_column = None
+    ))]
     pub fn from_topic(
         &self,
         py: Python,
         topic: String,
         sample_json: String,
         bootstrap_servers: String,
-        timestamp_column: String,
         group_id: String,
+        timestamp_column: Option<String>,
     ) -> PyResult {
         let context = self.context.clone();
         let rt = &get_tokio_runtime(py).0;
@@ -86,8 +93,11 @@ impl PyContext {
         rt.spawn(async move {
             let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone());
 
+            if let Some(ts_col) = timestamp_column {
+                topic_builder.with_timestamp(ts_col, TimestampUnit::Int64Millis);
+            }
+
             let source_topic = topic_builder
-                .with_timestamp(timestamp_column, TimestampUnit::Int64Millis)
                 .with_encoding("json")?
                 .with_topic(topic)
                 .infer_schema_from_json(sample_json.as_str())?

From 5c8f5396d631866b4105bd79ad10135ccbffa49b Mon Sep 17 00:00:00 2001
From: Matt Green
Date: Mon, 25 Nov 2024 11:28:19 -0800
Subject: [PATCH 3/5] update lock file

---
 py-denormalized/uv.lock | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/py-denormalized/uv.lock b/py-denormalized/uv.lock
index 7503e9d..1291b8e 100644
--- a/py-denormalized/uv.lock
+++ b/py-denormalized/uv.lock
@@ -374,6 +374,7 @@ version = "0.0.9"
 source = { editable = "." }
 dependencies = [
     { name = "datafusion" },
+    { name = "pip" },
     { name = "pyarrow" },
 ]
 
@@ -401,6 +402,7 @@ docs = [
 requires-dist = [
     { name = "datafusion", specifier = ">=40.1.0" },
     { name = "feast", marker = "extra == 'feast'" },
+    { name = "pip", specifier = ">=24.3.1" },
     { name = "pyarrow", specifier = ">=17.0.0" },
 ]

From 55c78979c8d5675919a347f7749c517c39340481 Mon Sep 17 00:00:00 2001
From: Matt Green
Date: Mon, 25 Nov 2024 11:36:44 -0800
Subject: [PATCH 4/5] format python

---
 py-denormalized/python/denormalized/context.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/py-denormalized/python/denormalized/context.py b/py-denormalized/python/denormalized/context.py
index 6c6ca4e..66ec3d1 100644
--- a/py-denormalized/python/denormalized/context.py
+++ b/py-denormalized/python/denormalized/context.py
@@ -1,9 +1,10 @@
 from denormalized._d_internal import PyContext
 from .data_stream import DataStream
 
+
 class Context:
     """A context manager for handling data stream operations.
-
+
     This class provides functionality to create and manage data streams
     from various sources like Kafka topics.
     """
@@ -14,7 +15,7 @@ def __init__(self) -> None:
 
     def __repr__(self):
         """Returns the string representation of the PyContext object.
-
+
         Returns:
             str: String representation of the underlying PyContext.
         """
@@ -22,7 +23,7 @@ def __repr__(self):
 
     def __str__(self):
         """Returns the string representation of the PyContext object.
-
+
         Returns:
             str: String representation of the underlying PyContext.
""" From 3f0567a67b5933302aa6f9016baa1e8d89f6aabc Mon Sep 17 00:00:00 2001 From: Matt Green Date: Mon, 25 Nov 2024 11:41:31 -0800 Subject: [PATCH 5/5] update macosx runner --- .github/workflows/python.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index f59a614..3023d6b 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -119,7 +119,7 @@ jobs: strategy: matrix: platform: - - runner: macos-12 + - runner: macos-14 target: x86_64 - runner: macos-14 target: aarch64