Skip to content

Commit

Permalink
remove internal schema column (#66)
Browse files Browse the repository at this point in the history
* remove internal schema column

* strip out internal field at rust schema() level

* remove unused import

* refactor internal column name to constant
  • Loading branch information
emgeee authored Nov 27, 2024
1 parent 6023d6d commit 7abb9f8
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 75 deletions.
4 changes: 4 additions & 0 deletions crates/common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
pub mod error;

pub use error::*;

/// Name of the hidden metadata column that streaming sources append to each
/// record batch (added in `KafkaTopicBuilder` as a nullable struct field).
/// User-facing schemas strip this column out — see `DataStream::schema()`.
pub const INTERNAL_METADATA_COLUMN: &str = "_streaming_internal_metadata";

// use denormalized::common::INTERNAL_METADATA_COLUMN;
3 changes: 2 additions & 1 deletion crates/core/src/datasource/kafka/kafka_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use apache_avro::Schema as AvroSchema;
use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit};

use datafusion::logical_expr::SortExpr;
use denormalized_common::INTERNAL_METADATA_COLUMN;

use crate::formats::decoders::avro::AvroDecoder;
use crate::formats::decoders::json::JsonDecoder;
Expand Down Expand Up @@ -203,7 +204,7 @@ impl KafkaTopicBuilder {
fields.insert(
fields.len(),
Arc::new(Field::new(
String::from("_streaming_internal_metadata"),
String::from(INTERNAL_METADATA_COLUMN),
DataType::Struct(Fields::from(struct_fields)),
true,
)),
Expand Down
14 changes: 12 additions & 2 deletions crates/core/src/datastream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::state_backend::slatedb::get_global_slatedb;
use denormalized_orchestrator::orchestrator::Orchestrator;

use denormalized_common::error::Result;
use denormalized_common::INTERNAL_METADATA_COLUMN;

/// The primary interface for building a streaming job
///
Expand Down Expand Up @@ -196,8 +197,17 @@ impl DataStream {
}

/// Return the schema of DataFrame that backs the DataStream
pub fn schema(&self) -> &DFSchema {
self.df.schema()
pub fn schema(&self) -> DFSchema {
    let schema = self.df.schema().clone();

    // Strip the internal metadata column from the user-facing schema.
    // Filter *before* cloning so fields that are dropped are never cloned.
    let qualified_fields = schema
        .iter()
        .filter(|(_qualifier, field)| field.name() != INTERNAL_METADATA_COLUMN)
        .map(|(qualifier, field)| (qualifier.cloned(), field.clone()))
        .collect::<Vec<_>>();

    // Removing a field from an already-valid DFSchema cannot introduce
    // duplicate names, so construction is infallible here.
    DFSchema::new_with_metadata(qualified_fields, schema.metadata().clone())
        .expect("schema without internal metadata column must remain valid")
}

/// Prints the schema of the underlying dataframe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use datafusion::{
},
};

use denormalized_common::INTERNAL_METADATA_COLUMN;
use denormalized_orchestrator::{
channel_manager::take_receiver, orchestrator::OrchestrationMessage,
};
Expand Down Expand Up @@ -323,35 +324,33 @@ impl GroupedWindowAggStream {

#[inline]
fn poll_next_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
let result: std::prelude::v1::Result<RecordBatch, DataFusionError> = match self
.input
.poll_next_unpin(cx)
{
Poll::Ready(rdy) => match rdy {
Some(Ok(batch)) => {
if batch.num_rows() > 0 {
let watermark: RecordBatchWatermark =
RecordBatchWatermark::try_from(&batch, "_streaming_internal_metadata")?;
let ranges = get_windows_for_watermark(&watermark, self.window_type);
let _ = self.ensure_window_frames_for_ranges(&ranges);
for range in ranges {
let frame = self.window_frames.get_mut(&range.0).unwrap();
let _ = frame.push(&batch);
let result: std::prelude::v1::Result<RecordBatch, DataFusionError> =
match self.input.poll_next_unpin(cx) {
Poll::Ready(rdy) => match rdy {
Some(Ok(batch)) => {
if batch.num_rows() > 0 {
let watermark: RecordBatchWatermark =
RecordBatchWatermark::try_from(&batch, INTERNAL_METADATA_COLUMN)?;
let ranges = get_windows_for_watermark(&watermark, self.window_type);
let _ = self.ensure_window_frames_for_ranges(&ranges);
for range in ranges {
let frame = self.window_frames.get_mut(&range.0).unwrap();
let _ = frame.push(&batch);
}
self.process_watermark(watermark);

self.trigger_windows()
} else {
Ok(RecordBatch::new_empty(self.output_schema_with_window()))
}
self.process_watermark(watermark);

self.trigger_windows()
} else {
Ok(RecordBatch::new_empty(self.output_schema_with_window()))
}
Some(Err(e)) => Err(e),
None => Ok(RecordBatch::new_empty(self.output_schema_with_window())),
},
Poll::Pending => {
return Poll::Pending;
}
Some(Err(e)) => Err(e),
None => Ok(RecordBatch::new_empty(self.output_schema_with_window())),
},
Poll::Pending => {
return Poll::Pending;
}
};
};

let mut checkpoint_batch = false;

Expand Down Expand Up @@ -547,9 +546,7 @@ impl GroupedAggWindowFrame {
}

pub fn push(&mut self, batch: &RecordBatch) -> Result<(), DataFusionError> {
let metadata = batch
.column_by_name("_streaming_internal_metadata")
.unwrap();
let metadata = batch.column_by_name(INTERNAL_METADATA_COLUMN).unwrap();
let metadata_struct = metadata.as_any().downcast_ref::<StructArray>().unwrap();

let ts_column = metadata_struct
Expand Down Expand Up @@ -582,7 +579,7 @@ impl GroupedAggWindowFrame {
.as_millis() as i64;

let metadata = filtered_batch
.column_by_name("_streaming_internal_metadata")
.column_by_name(INTERNAL_METADATA_COLUMN)
.unwrap();
let metadata_struct = metadata.as_any().downcast_ref::<StructArray>().unwrap();

Expand Down
72 changes: 34 additions & 38 deletions crates/core/src/physical_plan/continuous/streaming_window.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
use arrow::{
compute::{concat_batches, filter_record_batch, max},
datatypes::TimestampMillisecondType,
};
use arrow_array::{Array, PrimitiveArray, RecordBatch, StructArray, TimestampMillisecondArray};
use arrow_ord::cmp;
use arrow_schema::{Field, Schema, SchemaRef};
use std::{
any::Any,
borrow::Cow,
Expand All @@ -8,14 +15,6 @@ use std::{
time::{Duration, SystemTime, UNIX_EPOCH},
};

use arrow::{
compute::{concat_batches, filter_record_batch, max},
datatypes::TimestampMillisecondType,
};
use arrow_array::{Array, PrimitiveArray, RecordBatch, StructArray, TimestampMillisecondArray};
use arrow_ord::cmp;
use arrow_schema::{Field, Schema, SchemaRef};

use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::aggregate::AggregateFunctionExpr;
use datafusion::physical_expr::{
Expand All @@ -38,6 +37,7 @@ use datafusion::{
common::{internal_err, stats::Precision, DataFusionError, Statistics},
physical_plan::Distribution,
};
use denormalized_common::INTERNAL_METADATA_COLUMN;
use denormalized_orchestrator::{
channel_manager::{create_channel, get_sender},
orchestrator::OrchestrationMessage,
Expand Down Expand Up @@ -116,9 +116,7 @@ impl PartialWindowAggFrame {
}

pub fn push(&mut self, batch: &RecordBatch) -> Result<(), DataFusionError> {
let metadata = batch
.column_by_name("_streaming_internal_metadata")
.unwrap();
let metadata = batch.column_by_name(INTERNAL_METADATA_COLUMN).unwrap();
let metadata_struct = metadata.as_any().downcast_ref::<StructArray>().unwrap();

let ts_column = metadata_struct
Expand Down Expand Up @@ -151,7 +149,7 @@ impl PartialWindowAggFrame {
.as_millis() as i64;

let metadata = filtered_batch
.column_by_name("_streaming_internal_metadata")
.column_by_name(INTERNAL_METADATA_COLUMN)
.unwrap();
let metadata_struct = metadata.as_any().downcast_ref::<StructArray>().unwrap();

Expand Down Expand Up @@ -778,35 +776,33 @@ impl WindowAggStream {

#[inline]
fn poll_next_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
let result: std::prelude::v1::Result<RecordBatch, DataFusionError> = match self
.input
.poll_next_unpin(cx)
{
Poll::Ready(rdy) => match rdy {
Some(Ok(batch)) => {
if batch.num_rows() > 0 {
let watermark: RecordBatchWatermark =
RecordBatchWatermark::try_from(&batch, "_streaming_internal_metadata")?;
let ranges = get_windows_for_watermark(&watermark, self.window_type);
let _ = self.ensure_window_frames_for_ranges(&ranges);
for range in ranges {
let frame = self.window_frames.get_mut(&range.0).unwrap();
let _ = frame.push(&batch);
}
self.process_watermark(watermark);
let result: std::prelude::v1::Result<RecordBatch, DataFusionError> =
match self.input.poll_next_unpin(cx) {
Poll::Ready(rdy) => match rdy {
Some(Ok(batch)) => {
if batch.num_rows() > 0 {
let watermark: RecordBatchWatermark =
RecordBatchWatermark::try_from(&batch, INTERNAL_METADATA_COLUMN)?;
let ranges = get_windows_for_watermark(&watermark, self.window_type);
let _ = self.ensure_window_frames_for_ranges(&ranges);
for range in ranges {
let frame = self.window_frames.get_mut(&range.0).unwrap();
let _ = frame.push(&batch);
}
self.process_watermark(watermark);

self.trigger_windows()
} else {
Ok(RecordBatch::new_empty(self.output_schema_with_window()))
self.trigger_windows()
} else {
Ok(RecordBatch::new_empty(self.output_schema_with_window()))
}
}
Some(Err(e)) => Err(e),
None => Ok(RecordBatch::new_empty(self.output_schema_with_window())),
},
Poll::Pending => {
return Poll::Pending;
}
Some(Err(e)) => Err(e),
None => Ok(RecordBatch::new_empty(self.output_schema_with_window())),
},
Poll::Pending => {
return Poll::Pending;
}
};
};
Poll::Ready(Some(result))
}
}
Expand Down
12 changes: 8 additions & 4 deletions py-denormalized/src/datastream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ use futures::StreamExt;
use std::sync::Arc;
use std::time::Duration;

use tokio::task::JoinHandle;

use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::arrow::pyarrow::ToPyArrow;
use datafusion::common::JoinType;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion_python::expr::{join::PyJoinType, PyExpr};
use tokio::task::JoinHandle;

use denormalized::common::INTERNAL_METADATA_COLUMN;
use denormalized::datastream::DataStream;

use crate::errors::{py_denormalized_err, DenormalizedError, Result};
Expand Down Expand Up @@ -242,8 +242,12 @@ impl PyDataStream {
match message.transpose() {
Ok(Some(batch)) => {
Python::with_gil(|py| -> PyResult<()> {
let batch = batch.clone().to_pyarrow(py)?;
func.call1(py, (batch,))?;
let mut batch = batch.clone();
if let Ok(col_idx) = batch.schema_ref().index_of(INTERNAL_METADATA_COLUMN) {
batch.remove_column(col_idx);
}

func.call1(py, (batch.to_pyarrow(py)?,))?;
Ok(())
})?;
},
Expand Down

0 comments on commit 7abb9f8

Please sign in to comment.