Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Serialize DataFrame/Series using IPC in serde #20266

Merged
merged 11 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ atoi_simd = "0.16"
atomic-waker = "1"
avro-schema = { version = "0.3" }
base64 = "0.22.0"
bincode = "1.3.3"
bitflags = "2"
bytemuck = { version = "1.11", features = ["derive", "extern_crate_alloc"] }
bytes = { version = "1.7" }
chrono = { version = "0.4.31", default-features = false, features = ["std"] }
chrono-tz = "0.10"
ciborium = "0.2"
compact_str = { version = "0.8.0", features = ["serde"] }
crossbeam-channel = "0.5.8"
crossbeam-deque = "0.8.5"
Expand Down
8 changes: 4 additions & 4 deletions crates/polars-arrow/src/datatypes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,6 @@ pub enum ArrowDataType {
LargeList(Box<Field>),
/// A nested [`ArrowDataType`] with a given number of [`Field`]s.
Struct(Vec<Field>),
/// A nested datatype that can represent slots of differing types.
/// Third argument represents mode
#[cfg_attr(feature = "serde", serde(skip))]
Union(Vec<Field>, Option<Vec<i32>>, UnionMode),
/// A nested type that is represented as
///
/// List<entries: Struct<key: K, value: V>>
Expand Down Expand Up @@ -176,6 +172,10 @@ pub enum ArrowDataType {
Utf8View,
/// A type unknown to Arrow.
Unknown,
/// A nested datatype that can represent slots of differing types.
/// Third argument represents mode
#[cfg_attr(feature = "serde", serde(skip))]
Union(Vec<Field>, Option<Vec<i32>>, UnionMode),
}

/// Mode of [`ArrowDataType::Union`]
Expand Down
13 changes: 11 additions & 2 deletions crates/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ thiserror = { workspace = true }
xxhash-rust = { workspace = true }

[dev-dependencies]
bincode = { version = "1" }
bincode = { workspace = true }
serde_json = { workspace = true }

[build-dependencies]
Expand Down Expand Up @@ -123,7 +123,15 @@ dtype-struct = []
bigidx = ["arrow/bigidx", "polars-utils/bigidx"]
python = []

serde = ["dep:serde", "bitflags/serde", "polars-schema/serde", "polars-utils/serde"]
serde = [
"dep:serde",
"bitflags/serde",
"polars-schema/serde",
"polars-utils/serde",
"arrow/io_ipc",
"arrow/io_ipc_compression",
"serde_json",
]
serde-lazy = ["serde", "arrow/serde", "indexmap/serde", "chrono/serde"]

docs-selection = [
Expand All @@ -143,6 +151,7 @@ docs-selection = [
"row_hash",
"rolling_window",
"rolling_window_by",
"serde",
"dtype-categorical",
"dtype-decimal",
"diagonal_concat",
Expand Down
45 changes: 37 additions & 8 deletions crates/polars-core/src/datatypes/_serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ enum SerializableDataType {
// some logical types we cannot know statically, e.g. Datetime
Unknown(UnknownKind),
#[cfg(feature = "dtype-categorical")]
Categorical(Option<Wrap<Utf8ViewArray>>, CategoricalOrdering),
Categorical(Option<Series>, CategoricalOrdering),
#[cfg(feature = "dtype-decimal")]
Decimal(Option<usize>, Option<usize>),
#[cfg(feature = "dtype-categorical")]
Enum(Option<Wrap<Utf8ViewArray>>, CategoricalOrdering),
Enum(Option<Series>, CategoricalOrdering),
#[cfg(feature = "object")]
Object(String),
}
Expand Down Expand Up @@ -146,11 +146,23 @@ impl From<&DataType> for SerializableDataType {
#[cfg(feature = "dtype-struct")]
Struct(flds) => Self::Struct(flds.clone()),
#[cfg(feature = "dtype-categorical")]
Categorical(_, ordering) => Self::Categorical(None, *ordering),
Categorical(Some(rev_map), ordering) => Self::Categorical(
Some(
StringChunked::with_chunk(PlSmallStr::EMPTY, rev_map.get_categories().clone())
.into_series(),
),
*ordering,
),
#[cfg(feature = "dtype-categorical")]
Enum(Some(rev_map), ordering) => {
Self::Enum(Some(Wrap(rev_map.get_categories().clone())), *ordering)
},
Categorical(None, ordering) => Self::Categorical(None, *ordering),
#[cfg(feature = "dtype-categorical")]
Enum(Some(rev_map), ordering) => Self::Enum(
Some(
StringChunked::with_chunk(PlSmallStr::EMPTY, rev_map.get_categories().clone())
.into_series(),
),
*ordering,
),
#[cfg(feature = "dtype-categorical")]
Enum(None, ordering) => Self::Enum(None, *ordering),
#[cfg(feature = "dtype-decimal")]
Expand Down Expand Up @@ -190,9 +202,26 @@ impl From<SerializableDataType> for DataType {
#[cfg(feature = "dtype-struct")]
Struct(flds) => Self::Struct(flds),
#[cfg(feature = "dtype-categorical")]
Categorical(_, ordering) => Self::Categorical(None, ordering),
Categorical(Some(categories), ordering) => Self::Categorical(
Some(Arc::new(RevMapping::build_local(
categories.0.rechunk().chunks()[0]
.as_any()
.downcast_ref::<Utf8ViewArray>()
.unwrap()
.clone(),
))),
ordering,
),
#[cfg(feature = "dtype-categorical")]
Categorical(None, ordering) => Self::Categorical(None, ordering),
#[cfg(feature = "dtype-categorical")]
Enum(Some(categories), _) => create_enum_dtype(categories.0),
Enum(Some(categories), _) => create_enum_dtype(
categories.rechunk().chunks()[0]
.as_any()
.downcast_ref::<Utf8ViewArray>()
.unwrap()
.clone(),
),
#[cfg(feature = "dtype-categorical")]
Enum(None, ordering) => Self::Enum(None, ordering),
#[cfg(feature = "dtype-decimal")]
Expand Down
2 changes: 0 additions & 2 deletions crates/polars-core/src/frame/column/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ mod series;
/// 2. A [`ScalarColumn`] that repeats a single [`Scalar`]
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(feature = "serde", serde(from = "Series"))]
#[cfg_attr(feature = "serde", serde(into = "_SerdeSeries"))]
pub enum Column {
Series(SeriesColumn),
Partitioned(PartitionedColumn),
Expand Down
2 changes: 2 additions & 0 deletions crates/polars-core/src/frame/column/partitioned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ use crate::frame::Scalar;
use crate::series::IsSorted;

#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct PartitionedColumn {
name: PlSmallStr,

values: Series,
ends: Arc<[IdxSize]>,

#[cfg_attr(feature = "serde", serde(skip))]
materialized: OnceLock<Series>,
}

Expand Down
68 changes: 68 additions & 0 deletions crates/polars-core/src/frame/column/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,3 +307,71 @@ impl From<ScalarColumn> for Column {
Self::Scalar(value)
}
}

#[cfg(feature = "serde")]
mod serde_impl {
use std::sync::OnceLock;

use polars_error::PolarsError;
use polars_utils::pl_str::PlSmallStr;

use super::ScalarColumn;
use crate::frame::{Scalar, Series};

#[derive(serde::Serialize, serde::Deserialize)]
struct SerializeWrap {
name: PlSmallStr,
/// Unit-length series for dispatching to IPC serialize
unit_series: Series,
Copy link
Collaborator Author

@nameexhaustion nameexhaustion Dec 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For ScalarColumn I had the option of either using the serde::Serialize impl from AnyValue, or converting it to a unit-length Series and dispatch to IPC. I chose the IPC option as the AnyValue serde impl was missing quite a lot of dtypes, and using IPC would also give more assurance that the serialize behavior is the same with the SeriesColumn.

length: usize,
}

impl From<&ScalarColumn> for SerializeWrap {
fn from(value: &ScalarColumn) -> Self {
Self {
name: value.name.clone(),
unit_series: value.scalar.clone().into_series(PlSmallStr::EMPTY),
length: value.length,
}
}
}

impl TryFrom<SerializeWrap> for ScalarColumn {
type Error = PolarsError;

fn try_from(value: SerializeWrap) -> Result<Self, Self::Error> {
let slf = Self {
name: value.name,
scalar: Scalar::new(
value.unit_series.dtype().clone(),
value.unit_series.get(0)?.into_static(),
),
length: value.length,
materialized: OnceLock::new(),
};

Ok(slf)
}
}

impl serde::ser::Serialize for ScalarColumn {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
SerializeWrap::from(self).serialize(serializer)
}
}

impl<'de> serde::de::Deserialize<'de> for ScalarColumn {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::Error;

SerializeWrap::deserialize(deserializer)
.and_then(|x| ScalarColumn::try_from(x).map_err(D::Error::custom))
}
}
}
2 changes: 2 additions & 0 deletions crates/polars-core/src/frame/column/series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ use super::Series;
/// At the moment this just conditionally tracks where it was created so that materialization
/// problems can be tracked down.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct SeriesColumn {
inner: Series,

#[cfg(debug_assertions)]
#[cfg_attr(feature = "serde", serde(skip))]
materialized_at: Option<std::sync::Arc<std::backtrace::Backtrace>>,
}

Expand Down
37 changes: 37 additions & 0 deletions crates/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3562,4 +3562,41 @@ mod test {
assert_eq!(df.get_column_names(), &["a", "b", "c"]);
Ok(())
}

#[cfg(feature = "serde")]
#[test]
fn test_deserialize_height_validation_8751() {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test was moved from Python - the existing test used an exact string representation of the previous JSON format, but that is changed after this PR.

// Construct an invalid directly from the inner fields as the `new_unchecked_*` functions
// have debug assertions

use polars_utils::pl_serialize;

let df = DataFrame {
height: 2,
columns: vec![
Int64Chunked::full("a".into(), 1, 2).into_column(),
Int64Chunked::full("b".into(), 1, 1).into_column(),
],
cached_schema: OnceLock::new(),
};

// We rely on the fact that the serialization doesn't check the heights of all columns
let serialized = serde_json::to_string(&df).unwrap();
let err = serde_json::from_str::<DataFrame>(&serialized).unwrap_err();

assert!(err.to_string().contains(
"successful parse invalid data: lengths don't match: could not create a new DataFrame:",
));

let serialized = pl_serialize::SerializeOptions::default()
.serialize_to_bytes(&df)
.unwrap();
let err = pl_serialize::SerializeOptions::default()
.deserialize_from_reader::<DataFrame, _>(serialized.as_slice())
.unwrap_err();

assert!(err.to_string().contains(
"successful parse invalid data: lengths don't match: could not create a new DataFrame:",
));
}
}
8 changes: 4 additions & 4 deletions crates/polars-core/src/serde/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ mod test {
fn test_serde() -> PolarsResult<()> {
let ca = UInt32Chunked::new("foo".into(), &[Some(1), None, Some(2)]);

let json = serde_json::to_string(&ca).unwrap();
let json = serde_json::to_string(&ca.clone().into_series()).unwrap();

let out = serde_json::from_str::<Series>(&json).unwrap();
assert!(ca.into_series().equals_missing(&out));

let ca = StringChunked::new("foo".into(), &[Some("foo"), None, Some("bar")]);

let json = serde_json::to_string(&ca).unwrap();
let json = serde_json::to_string(&ca.clone().into_series()).unwrap();

let out = serde_json::from_str::<Series>(&json).unwrap(); // uses `Deserialize<'de>`
assert!(ca.into_series().equals_missing(&out));
Expand All @@ -32,7 +32,7 @@ mod test {
fn test_serde_owned() {
let ca = UInt32Chunked::new("foo".into(), &[Some(1), None, Some(2)]);

let json = serde_json::to_string(&ca).unwrap();
let json = serde_json::to_string(&ca.clone().into_series()).unwrap();

let out = serde_json::from_reader::<_, Series>(json.as_bytes()).unwrap(); // uses `DeserializeOwned`
assert!(ca.into_series().equals_missing(&out));
Expand All @@ -54,7 +54,7 @@ mod test {
for mut column in df.columns {
column.set_sorted_flag(IsSorted::Descending);
let json = serde_json::to_string(&column).unwrap();
let out = serde_json::from_reader::<_, Series>(json.as_bytes()).unwrap();
let out = serde_json::from_reader::<_, Column>(json.as_bytes()).unwrap();
let f = out.get_flags();
assert_ne!(f, MetadataFlags::empty());
assert_eq!(column.get_flags(), out.get_flags());
Expand Down
Loading
Loading