From 1d64ffaea0ef1b18c032f4ddabcdc540fd9fd6af Mon Sep 17 00:00:00 2001 From: Matt Green Date: Tue, 26 Nov 2024 15:55:56 -0800 Subject: [PATCH] refactor internal column name to constant --- crates/common/src/lib.rs | 4 ++ .../core/src/datasource/kafka/kafka_config.rs | 3 +- crates/core/src/datastream.rs | 3 +- .../continuous/grouped_window_agg_stream.rs | 9 ++- .../continuous/streaming_window.rs | 70 +++++++++---------- py-denormalized/src/datastream.rs | 3 +- 6 files changed, 48 insertions(+), 44 deletions(-) diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 5df1826..b17602c 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -1,3 +1,7 @@ pub mod error; pub use error::*; + +pub const INTERNAL_METADATA_COLUMN: &str = "_streaming_internal_metadata"; + +// use denormalized::common::INTERNAL_METADATA_COLUMN; diff --git a/crates/core/src/datasource/kafka/kafka_config.rs b/crates/core/src/datasource/kafka/kafka_config.rs index 84b8dbf..5d22ff3 100644 --- a/crates/core/src/datasource/kafka/kafka_config.rs +++ b/crates/core/src/datasource/kafka/kafka_config.rs @@ -6,6 +6,7 @@ use apache_avro::Schema as AvroSchema; use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit}; use datafusion::logical_expr::SortExpr; +use denormalized_common::INTERNAL_METADATA_COLUMN; use crate::formats::decoders::avro::AvroDecoder; use crate::formats::decoders::json::JsonDecoder; @@ -203,7 +204,7 @@ impl KafkaTopicBuilder { fields.insert( fields.len(), Arc::new(Field::new( - String::from("_streaming_internal_metadata"), + String::from(INTERNAL_METADATA_COLUMN), DataType::Struct(Fields::from(struct_fields)), true, )), diff --git a/crates/core/src/datastream.rs b/crates/core/src/datastream.rs index f251724..6b22f72 100644 --- a/crates/core/src/datastream.rs +++ b/crates/core/src/datastream.rs @@ -26,6 +26,7 @@ use crate::state_backend::slatedb::get_global_slatedb; use denormalized_orchestrator::orchestrator::Orchestrator; use denormalized_common::error::Result; +use denormalized_common::INTERNAL_METADATA_COLUMN; /// The primary interface for building a streaming job /// @@ -203,7 +204,7 @@ impl DataStream { let qualified_fields = schema .iter() .map(|(qualifier, field)| (qualifier.cloned(), field.clone())) - .filter(|(_qualifier, field)| *field.name() != "_streaming_internal_metadata") + .filter(|(_qualifier, field)| *field.name() != INTERNAL_METADATA_COLUMN) .collect::>(); DFSchema::new_with_metadata(qualified_fields, schema.metadata().clone()).unwrap() diff --git a/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs b/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs index 0388a23..46e6205 100644 --- a/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs +++ b/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs @@ -36,6 +36,7 @@ use datafusion::{ }, }; +use denormalized_common::INTERNAL_METADATA_COLUMN; use denormalized_orchestrator::{ channel_manager::take_receiver, orchestrator::OrchestrationMessage, }; @@ -331,7 +332,7 @@ impl GroupedWindowAggStream { Some(Ok(batch)) => { if batch.num_rows() > 0 { let watermark: RecordBatchWatermark = - RecordBatchWatermark::try_from(&batch, "_streaming_internal_metadata")?; + RecordBatchWatermark::try_from(&batch, INTERNAL_METADATA_COLUMN)?; let ranges = get_windows_for_watermark(&watermark, self.window_type); let _ = self.ensure_window_frames_for_ranges(&ranges); for range in ranges { @@ -547,9 +548,7 @@ impl GroupedAggWindowFrame { } pub fn push(&mut self, batch: &RecordBatch) -> Result<(), DataFusionError> { - let metadata = batch - .column_by_name("_streaming_internal_metadata") - .unwrap(); + let metadata = batch.column_by_name(INTERNAL_METADATA_COLUMN).unwrap(); let metadata_struct = metadata.as_any().downcast_ref::().unwrap(); let ts_column = metadata_struct @@ -582,7 +581,7 @@ impl GroupedAggWindowFrame { .as_millis() as i64; let metadata = filtered_batch - .column_by_name("_streaming_internal_metadata") + .column_by_name(INTERNAL_METADATA_COLUMN) .unwrap(); let metadata_struct = metadata.as_any().downcast_ref::().unwrap(); diff --git a/crates/core/src/physical_plan/continuous/streaming_window.rs b/crates/core/src/physical_plan/continuous/streaming_window.rs index 49d8e27..7d889f5 100644 --- a/crates/core/src/physical_plan/continuous/streaming_window.rs +++ b/crates/core/src/physical_plan/continuous/streaming_window.rs @@ -1,3 +1,10 @@ +use arrow::{ + compute::{concat_batches, filter_record_batch, max}, + datatypes::TimestampMillisecondType, +}; +use arrow_array::{Array, PrimitiveArray, RecordBatch, StructArray, TimestampMillisecondArray}; +use arrow_ord::cmp; +use arrow_schema::{Field, Schema, SchemaRef}; use std::{ any::Any, borrow::Cow, @@ -8,14 +15,6 @@ use std::{ time::{Duration, SystemTime, UNIX_EPOCH}, }; -use arrow::{ - compute::{concat_batches, filter_record_batch, max}, - datatypes::TimestampMillisecondType, -}; -use arrow_array::{Array, PrimitiveArray, RecordBatch, StructArray, TimestampMillisecondArray}; -use arrow_ord::cmp; -use arrow_schema::{Field, Schema, SchemaRef}; - use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; use datafusion::physical_expr::aggregate::AggregateFunctionExpr; use datafusion::physical_expr::{ @@ -42,6 +41,7 @@ use denormalized_orchestrator::{ channel_manager::{create_channel, get_sender}, orchestrator::OrchestrationMessage, }; +use denormalized_common::INTERNAL_METADATA_COLUMN; use futures::{Stream, StreamExt}; use tracing::debug; @@ -117,7 +117,7 @@ impl PartialWindowAggFrame { pub fn push(&mut self, batch: &RecordBatch) -> Result<(), DataFusionError> { let metadata = batch - .column_by_name("_streaming_internal_metadata") + .column_by_name(INTERNAL_METADATA_COLUMN) .unwrap(); let metadata_struct = metadata.as_any().downcast_ref::().unwrap(); @@ -151,7 +151,7 @@ impl PartialWindowAggFrame { .as_millis() as i64; let metadata = filtered_batch - .column_by_name("_streaming_internal_metadata") + .column_by_name(INTERNAL_METADATA_COLUMN) .unwrap(); let metadata_struct = metadata.as_any().downcast_ref::().unwrap(); @@ -778,35 +778,33 @@ impl WindowAggStream { #[inline] fn poll_next_inner(&mut self, cx: &mut Context<'_>) -> Poll>> { - let result: std::prelude::v1::Result = match self - .input - .poll_next_unpin(cx) - { - Poll::Ready(rdy) => match rdy { - Some(Ok(batch)) => { - if batch.num_rows() > 0 { - let watermark: RecordBatchWatermark = - RecordBatchWatermark::try_from(&batch, "_streaming_internal_metadata")?; - let ranges = get_windows_for_watermark(&watermark, self.window_type); - let _ = self.ensure_window_frames_for_ranges(&ranges); - for range in ranges { - let frame = self.window_frames.get_mut(&range.0).unwrap(); - let _ = frame.push(&batch); - } - self.process_watermark(watermark); + let result: std::prelude::v1::Result = + match self.input.poll_next_unpin(cx) { + Poll::Ready(rdy) => match rdy { + Some(Ok(batch)) => { + if batch.num_rows() > 0 { + let watermark: RecordBatchWatermark = + RecordBatchWatermark::try_from(&batch, INTERNAL_METADATA_COLUMN)?; + let ranges = get_windows_for_watermark(&watermark, self.window_type); + let _ = self.ensure_window_frames_for_ranges(&ranges); + for range in ranges { + let frame = self.window_frames.get_mut(&range.0).unwrap(); + let _ = frame.push(&batch); + } + self.process_watermark(watermark); - self.trigger_windows() - } else { - Ok(RecordBatch::new_empty(self.output_schema_with_window())) + self.trigger_windows() + } else { + Ok(RecordBatch::new_empty(self.output_schema_with_window())) + } } + Some(Err(e)) => Err(e), + None => Ok(RecordBatch::new_empty(self.output_schema_with_window())), + }, + Poll::Pending => { + return Poll::Pending; } - Some(Err(e)) => Err(e), - None => Ok(RecordBatch::new_empty(self.output_schema_with_window())), - }, - Poll::Pending => { - return Poll::Pending; - } - }; + }; Poll::Ready(Some(result)) } } diff --git a/py-denormalized/src/datastream.rs b/py-denormalized/src/datastream.rs index 5551529..4a452a6 100644 --- a/py-denormalized/src/datastream.rs +++ b/py-denormalized/src/datastream.rs @@ -14,6 +14,7 @@ use datafusion_python::expr::{join::PyJoinType, PyExpr}; use tokio::task::JoinHandle; use denormalized::datastream::DataStream; +use denormalized::common::INTERNAL_METADATA_COLUMN; use crate::errors::{py_denormalized_err, DenormalizedError, Result}; use crate::utils::{get_tokio_runtime, python_print, wait_for_future}; @@ -242,7 +243,7 @@ impl PyDataStream { Ok(Some(batch)) => { Python::with_gil(|py| -> PyResult<()> { let mut batch = batch.clone(); - if let Ok(col_idx) = batch.schema_ref().index_of("_streaming_internal_metadata") { + if let Ok(col_idx) = batch.schema_ref().index_of(INTERNAL_METADATA_COLUMN) { batch.remove_column(col_idx); }