Skip to content

Commit

Permalink
feat: Serialize DataFrame/Series using IPC in serde (#20266)
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion authored Dec 17, 2024
1 parent 2f6c23b commit e03555c
Show file tree
Hide file tree
Showing 41 changed files with 618 additions and 503 deletions.
10 changes: 8 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ atoi_simd = "0.16"
atomic-waker = "1"
avro-schema = { version = "0.3" }
base64 = "0.22.0"
bincode = "1.3.3"
bitflags = "2"
bytemuck = { version = "1.11", features = ["derive", "extern_crate_alloc"] }
bytes = { version = "1.7" }
chrono = { version = "0.4.31", default-features = false, features = ["std"] }
chrono-tz = "0.10"
ciborium = "0.2"
compact_str = { version = "0.8.0", features = ["serde"] }
crossbeam-channel = "0.5.8"
crossbeam-deque = "0.8.5"
Expand Down
8 changes: 4 additions & 4 deletions crates/polars-arrow/src/datatypes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,6 @@ pub enum ArrowDataType {
LargeList(Box<Field>),
/// A nested [`ArrowDataType`] with a given number of [`Field`]s.
Struct(Vec<Field>),
/// A nested datatype that can represent slots of differing types.
/// Third argument represents mode
#[cfg_attr(feature = "serde", serde(skip))]
Union(Vec<Field>, Option<Vec<i32>>, UnionMode),
/// A nested type that is represented as
///
/// List<entries: Struct<key: K, value: V>>
Expand Down Expand Up @@ -176,6 +172,10 @@ pub enum ArrowDataType {
Utf8View,
/// A type unknown to Arrow.
Unknown,
/// A nested datatype that can represent slots of differing types.
/// Third argument represents mode
#[cfg_attr(feature = "serde", serde(skip))]
Union(Vec<Field>, Option<Vec<i32>>, UnionMode),
}

/// Mode of [`ArrowDataType::Union`]
Expand Down
13 changes: 11 additions & 2 deletions crates/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ thiserror = { workspace = true }
xxhash-rust = { workspace = true }

[dev-dependencies]
bincode = { version = "1" }
bincode = { workspace = true }
serde_json = { workspace = true }

[build-dependencies]
Expand Down Expand Up @@ -123,7 +123,15 @@ dtype-struct = []
bigidx = ["arrow/bigidx", "polars-utils/bigidx"]
python = []

serde = ["dep:serde", "bitflags/serde", "polars-schema/serde", "polars-utils/serde"]
serde = [
"dep:serde",
"bitflags/serde",
"polars-schema/serde",
"polars-utils/serde",
"arrow/io_ipc",
"arrow/io_ipc_compression",
"serde_json",
]
serde-lazy = ["serde", "arrow/serde", "indexmap/serde", "chrono/serde"]

docs-selection = [
Expand All @@ -143,6 +151,7 @@ docs-selection = [
"row_hash",
"rolling_window",
"rolling_window_by",
"serde",
"dtype-categorical",
"dtype-decimal",
"diagonal_concat",
Expand Down
45 changes: 37 additions & 8 deletions crates/polars-core/src/datatypes/_serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ enum SerializableDataType {
// some logical types we cannot know statically, e.g. Datetime
Unknown(UnknownKind),
#[cfg(feature = "dtype-categorical")]
Categorical(Option<Wrap<Utf8ViewArray>>, CategoricalOrdering),
Categorical(Option<Series>, CategoricalOrdering),
#[cfg(feature = "dtype-decimal")]
Decimal(Option<usize>, Option<usize>),
#[cfg(feature = "dtype-categorical")]
Enum(Option<Wrap<Utf8ViewArray>>, CategoricalOrdering),
Enum(Option<Series>, CategoricalOrdering),
#[cfg(feature = "object")]
Object(String),
}
Expand Down Expand Up @@ -146,11 +146,23 @@ impl From<&DataType> for SerializableDataType {
#[cfg(feature = "dtype-struct")]
Struct(flds) => Self::Struct(flds.clone()),
#[cfg(feature = "dtype-categorical")]
Categorical(_, ordering) => Self::Categorical(None, *ordering),
Categorical(Some(rev_map), ordering) => Self::Categorical(
Some(
StringChunked::with_chunk(PlSmallStr::EMPTY, rev_map.get_categories().clone())
.into_series(),
),
*ordering,
),
#[cfg(feature = "dtype-categorical")]
Enum(Some(rev_map), ordering) => {
Self::Enum(Some(Wrap(rev_map.get_categories().clone())), *ordering)
},
Categorical(None, ordering) => Self::Categorical(None, *ordering),
#[cfg(feature = "dtype-categorical")]
Enum(Some(rev_map), ordering) => Self::Enum(
Some(
StringChunked::with_chunk(PlSmallStr::EMPTY, rev_map.get_categories().clone())
.into_series(),
),
*ordering,
),
#[cfg(feature = "dtype-categorical")]
Enum(None, ordering) => Self::Enum(None, *ordering),
#[cfg(feature = "dtype-decimal")]
Expand Down Expand Up @@ -190,9 +202,26 @@ impl From<SerializableDataType> for DataType {
#[cfg(feature = "dtype-struct")]
Struct(flds) => Self::Struct(flds),
#[cfg(feature = "dtype-categorical")]
Categorical(_, ordering) => Self::Categorical(None, ordering),
Categorical(Some(categories), ordering) => Self::Categorical(
Some(Arc::new(RevMapping::build_local(
categories.0.rechunk().chunks()[0]
.as_any()
.downcast_ref::<Utf8ViewArray>()
.unwrap()
.clone(),
))),
ordering,
),
#[cfg(feature = "dtype-categorical")]
Categorical(None, ordering) => Self::Categorical(None, ordering),
#[cfg(feature = "dtype-categorical")]
Enum(Some(categories), _) => create_enum_dtype(categories.0),
Enum(Some(categories), _) => create_enum_dtype(
categories.rechunk().chunks()[0]
.as_any()
.downcast_ref::<Utf8ViewArray>()
.unwrap()
.clone(),
),
#[cfg(feature = "dtype-categorical")]
Enum(None, ordering) => Self::Enum(None, ordering),
#[cfg(feature = "dtype-decimal")]
Expand Down
2 changes: 0 additions & 2 deletions crates/polars-core/src/frame/column/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ mod series;
/// 2. A [`ScalarColumn`] that repeats a single [`Scalar`]
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(feature = "serde", serde(from = "Series"))]
#[cfg_attr(feature = "serde", serde(into = "_SerdeSeries"))]
pub enum Column {
Series(SeriesColumn),
Partitioned(PartitionedColumn),
Expand Down
2 changes: 2 additions & 0 deletions crates/polars-core/src/frame/column/partitioned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ use crate::frame::Scalar;
use crate::series::IsSorted;

#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct PartitionedColumn {
name: PlSmallStr,

values: Series,
ends: Arc<[IdxSize]>,

#[cfg_attr(feature = "serde", serde(skip))]
materialized: OnceLock<Series>,
}

Expand Down
68 changes: 68 additions & 0 deletions crates/polars-core/src/frame/column/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,3 +307,71 @@ impl From<ScalarColumn> for Column {
Self::Scalar(value)
}
}

#[cfg(feature = "serde")]
mod serde_impl {
use std::sync::OnceLock;

use polars_error::PolarsError;
use polars_utils::pl_str::PlSmallStr;

use super::ScalarColumn;
use crate::frame::{Scalar, Series};

#[derive(serde::Serialize, serde::Deserialize)]
struct SerializeWrap {
name: PlSmallStr,
/// Unit-length series for dispatching to IPC serialize
unit_series: Series,
length: usize,
}

impl From<&ScalarColumn> for SerializeWrap {
fn from(value: &ScalarColumn) -> Self {
Self {
name: value.name.clone(),
unit_series: value.scalar.clone().into_series(PlSmallStr::EMPTY),
length: value.length,
}
}
}

impl TryFrom<SerializeWrap> for ScalarColumn {
type Error = PolarsError;

fn try_from(value: SerializeWrap) -> Result<Self, Self::Error> {
let slf = Self {
name: value.name,
scalar: Scalar::new(
value.unit_series.dtype().clone(),
value.unit_series.get(0)?.into_static(),
),
length: value.length,
materialized: OnceLock::new(),
};

Ok(slf)
}
}

impl serde::ser::Serialize for ScalarColumn {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
SerializeWrap::from(self).serialize(serializer)
}
}

impl<'de> serde::de::Deserialize<'de> for ScalarColumn {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::Error;

SerializeWrap::deserialize(deserializer)
.and_then(|x| ScalarColumn::try_from(x).map_err(D::Error::custom))
}
}
}
2 changes: 2 additions & 0 deletions crates/polars-core/src/frame/column/series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ use super::Series;
/// At the moment this just conditionally tracks where it was created so that materialization
/// problems can be tracked down.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct SeriesColumn {
inner: Series,

#[cfg(debug_assertions)]
#[cfg_attr(feature = "serde", serde(skip))]
materialized_at: Option<std::sync::Arc<std::backtrace::Backtrace>>,
}

Expand Down
37 changes: 37 additions & 0 deletions crates/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3562,4 +3562,41 @@ mod test {
assert_eq!(df.get_column_names(), &["a", "b", "c"]);
Ok(())
}

#[cfg(feature = "serde")]
#[test]
fn test_deserialize_height_validation_8751() {
// Construct an invalid directly from the inner fields as the `new_unchecked_*` functions
// have debug assertions

use polars_utils::pl_serialize;

let df = DataFrame {
height: 2,
columns: vec![
Int64Chunked::full("a".into(), 1, 2).into_column(),
Int64Chunked::full("b".into(), 1, 1).into_column(),
],
cached_schema: OnceLock::new(),
};

// We rely on the fact that the serialization doesn't check the heights of all columns
let serialized = serde_json::to_string(&df).unwrap();
let err = serde_json::from_str::<DataFrame>(&serialized).unwrap_err();

assert!(err.to_string().contains(
"successful parse invalid data: lengths don't match: could not create a new DataFrame:",
));

let serialized = pl_serialize::SerializeOptions::default()
.serialize_to_bytes(&df)
.unwrap();
let err = pl_serialize::SerializeOptions::default()
.deserialize_from_reader::<DataFrame, _>(serialized.as_slice())
.unwrap_err();

assert!(err.to_string().contains(
"successful parse invalid data: lengths don't match: could not create a new DataFrame:",
));
}
}
8 changes: 4 additions & 4 deletions crates/polars-core/src/serde/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ mod test {
fn test_serde() -> PolarsResult<()> {
let ca = UInt32Chunked::new("foo".into(), &[Some(1), None, Some(2)]);

let json = serde_json::to_string(&ca).unwrap();
let json = serde_json::to_string(&ca.clone().into_series()).unwrap();

let out = serde_json::from_str::<Series>(&json).unwrap();
assert!(ca.into_series().equals_missing(&out));

let ca = StringChunked::new("foo".into(), &[Some("foo"), None, Some("bar")]);

let json = serde_json::to_string(&ca).unwrap();
let json = serde_json::to_string(&ca.clone().into_series()).unwrap();

let out = serde_json::from_str::<Series>(&json).unwrap(); // uses `Deserialize<'de>`
assert!(ca.into_series().equals_missing(&out));
Expand All @@ -32,7 +32,7 @@ mod test {
fn test_serde_owned() {
let ca = UInt32Chunked::new("foo".into(), &[Some(1), None, Some(2)]);

let json = serde_json::to_string(&ca).unwrap();
let json = serde_json::to_string(&ca.clone().into_series()).unwrap();

let out = serde_json::from_reader::<_, Series>(json.as_bytes()).unwrap(); // uses `DeserializeOwned`
assert!(ca.into_series().equals_missing(&out));
Expand All @@ -54,7 +54,7 @@ mod test {
for mut column in df.columns {
column.set_sorted_flag(IsSorted::Descending);
let json = serde_json::to_string(&column).unwrap();
let out = serde_json::from_reader::<_, Series>(json.as_bytes()).unwrap();
let out = serde_json::from_reader::<_, Column>(json.as_bytes()).unwrap();
let f = out.get_flags();
assert_ne!(f, MetadataFlags::empty());
assert_eq!(column.get_flags(), out.get_flags());
Expand Down
Loading

0 comments on commit e03555c

Please sign in to comment.