strip out internal field at rust schema() level
emgeee committed Nov 26, 2024
1 parent fee8499 commit 6fefc91
Showing 3 changed files with 20 additions and 8 deletions.
crates/core/src/datastream.rs: 13 changes (11 additions & 2 deletions)
@@ -196,8 +196,17 @@ impl DataStream {
     }
 
     /// Return the schema of DataFrame that backs the DataStream
-    pub fn schema(&self) -> &DFSchema {
-        self.df.schema()
+    pub fn schema(&self) -> DFSchema {
+        let schema = self.df.schema().clone();
+
+        // Strip out internal metadata fields from the schema
+        let qualified_fields = schema
+            .iter()
+            .map(|(qualifier, field)| (qualifier.cloned(), field.clone()))
+            .filter(|(_qualifier, field)| *field.name() != "_streaming_internal_metadata")
+            .collect::<Vec<_>>();
+
+        DFSchema::new_with_metadata(qualified_fields, schema.metadata().clone()).unwrap()
     }
 
     /// Prints the schema of the underlying dataframe
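
As a side note (not part of the commit), here is a minimal sketch of how downstream Rust code could assert the new behavior. The helper name is made up, and `stream` is assumed to be an already constructed DataStream:

use datafusion::common::DFSchema;

/// Hypothetical helper: does a schema still expose the internal streaming column?
fn has_internal_metadata(schema: &DFSchema) -> bool {
    schema
        .fields()
        .iter()
        .any(|field| field.name() == "_streaming_internal_metadata")
}

// Expected to hold with the new schema() above, assuming `stream: DataStream`:
// assert!(!has_internal_metadata(&stream.schema()));

Because schema() now builds and returns an owned DFSchema rather than borrowing the underlying DataFrame's schema, callers that previously held the returned &DFSchema reference will now bind an owned value instead.
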
py-denormalized/python/denormalized/data_stream.py: 3 changes (1 addition & 2 deletions)
@@ -47,8 +47,7 @@ def schema(self) -> pa.Schema:
         Returns:
             pa.Schema: The PyArrow schema describing the structure of the data
         """
-        schema = self.ds.schema()
-        return schema.remove(schema.get_field_index("_streaming_internal_metadata"))
+        return self.ds.schema()
 
     def select(self, expr_list: list[Expr]) -> "DataStream":
         """Select specific columns or expressions from the DataStream.
py-denormalized/src/datastream.rs: 12 changes (8 additions & 4 deletions)
@@ -4,15 +4,15 @@ use futures::StreamExt;
 use std::sync::Arc;
 use std::time::Duration;
 
-use tokio::task::JoinHandle;
-
 use core::ops::Index;
 use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::pyarrow::PyArrowType;
 use datafusion::arrow::pyarrow::ToPyArrow;
 use datafusion::common::JoinType;
 use datafusion::execution::SendableRecordBatchStream;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion_python::expr::{join::PyJoinType, PyExpr};
+use tokio::task::JoinHandle;
+
 use denormalized::datastream::DataStream;
 
@@ -242,8 +242,12 @@ impl PyDataStream {
             match message.transpose() {
                 Ok(Some(batch)) => {
                     Python::with_gil(|py| -> PyResult<()> {
-                        let batch = batch.clone().to_pyarrow(py)?;
-                        func.call1(py, (batch,))?;
+                        let mut batch = batch.clone();
+                        if let Ok(col_idx) = batch.schema_ref().index_of("_streaming_internal_metadata") {
+                            batch.remove_column(col_idx);
+                        }
+
+                        func.call1(py, (batch.to_pyarrow(py)?,))?;
                         Ok(())
                     })?;
                 },
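
For readers less familiar with the Arrow calls used above, here is a standalone sketch of the same strip-by-name pattern applied to a plain RecordBatch. It is not taken from this repository; the column names and values are invented for illustration:

use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int64Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    // Example data (invented): one user-facing column plus the internal column.
    let schema = Arc::new(Schema::new(vec![
        Field::new("value", DataType::Int64, false),
        Field::new("_streaming_internal_metadata", DataType::Utf8, true),
    ]));
    let columns: Vec<ArrayRef> = vec![
        Arc::new(Int64Array::from(vec![1, 2, 3])),
        Arc::new(StringArray::from(vec![Some("m"), None, Some("m")])),
    ];
    let mut batch = RecordBatch::try_new(schema, columns)?;

    // Same pattern as the change above: look the column up by name, drop it if present.
    if let Ok(col_idx) = batch.schema_ref().index_of("_streaming_internal_metadata") {
        batch.remove_column(col_idx);
    }

    // Only the user-facing column remains.
    assert_eq!(batch.num_columns(), 1);
    assert!(batch.schema_ref().index_of("_streaming_internal_metadata").is_err());
    Ok(())
}

Doing the removal on the Rust side means the internal column never crosses the PyArrow conversion boundary, so the Python callback only ever sees user-facing columns.
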
