From 6fefc91bac726b2c4ffadde0bb53d42b1dff752b Mon Sep 17 00:00:00 2001 From: Matt Green Date: Tue, 26 Nov 2024 13:42:29 -0800 Subject: [PATCH] strip out internal field at rust schema() level --- crates/core/src/datastream.rs | 13 +++++++++++-- py-denormalized/python/denormalized/data_stream.py | 3 +-- py-denormalized/src/datastream.rs | 12 ++++++++---- 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/crates/core/src/datastream.rs b/crates/core/src/datastream.rs index ea86b72..f251724 100644 --- a/crates/core/src/datastream.rs +++ b/crates/core/src/datastream.rs @@ -196,8 +196,17 @@ impl DataStream { } /// Return the schema of DataFrame that backs the DataStream - pub fn schema(&self) -> &DFSchema { - self.df.schema() + pub fn schema(&self) -> DFSchema { + let schema = self.df.schema().clone(); + + // Strip out internal metadata fields from the schema + let qualified_fields = schema + .iter() + .map(|(qualifier, field)| (qualifier.cloned(), field.clone())) + .filter(|(_qualifier, field)| *field.name() != "_streaming_internal_metadata") + .collect::>(); + + DFSchema::new_with_metadata(qualified_fields, schema.metadata().clone()).unwrap() } /// Prints the schema of the underlying dataframe diff --git a/py-denormalized/python/denormalized/data_stream.py b/py-denormalized/python/denormalized/data_stream.py index c7329eb..f2c2d1c 100644 --- a/py-denormalized/python/denormalized/data_stream.py +++ b/py-denormalized/python/denormalized/data_stream.py @@ -47,8 +47,7 @@ def schema(self) -> pa.Schema: Returns: pa.Schema: The PyArrow schema describing the structure of the data """ - schema = self.ds.schema() - return schema.remove(schema.get_field_index("_streaming_internal_metadata")) + return self.ds.schema() def select(self, expr_list: list[Expr]) -> "DataStream": """Select specific columns or expressions from the DataStream. diff --git a/py-denormalized/src/datastream.rs b/py-denormalized/src/datastream.rs index c3b712f..dce9188 100644 --- a/py-denormalized/src/datastream.rs +++ b/py-denormalized/src/datastream.rs @@ -4,8 +4,7 @@ use futures::StreamExt; use std::sync::Arc; use std::time::Duration; -use tokio::task::JoinHandle; - +use core::ops::Index; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::pyarrow::PyArrowType; use datafusion::arrow::pyarrow::ToPyArrow; @@ -13,6 +12,7 @@ use datafusion::common::JoinType; use datafusion::execution::SendableRecordBatchStream; use datafusion::physical_plan::display::DisplayableExecutionPlan; use datafusion_python::expr::{join::PyJoinType, PyExpr}; +use tokio::task::JoinHandle; use denormalized::datastream::DataStream; @@ -242,8 +242,12 @@ impl PyDataStream { match message.transpose() { Ok(Some(batch)) => { Python::with_gil(|py| -> PyResult<()> { - let batch = batch.clone().to_pyarrow(py)?; - func.call1(py, (batch,))?; + let mut batch = batch.clone(); + if let Ok(col_idx) = batch.schema_ref().index_of("_streaming_internal_metadata") { + batch.remove_column(col_idx); + } + + func.call1(py, (batch.to_pyarrow(py)?,))?; Ok(()) })?; },