Skip to content

Commit

Permalink
c
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion committed Dec 16, 2024
1 parent 91ad299 commit f629cf1
Show file tree
Hide file tree
Showing 26 changed files with 480 additions and 384 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ atoi_simd = "0.16"
atomic-waker = "1"
avro-schema = { version = "0.3" }
base64 = "0.22.0"
bincode = "1.3.3"
bitflags = "2"
bytemuck = { version = "1.11", features = ["derive", "extern_crate_alloc"] }
bytes = { version = "1.7" }
Expand Down
9 changes: 8 additions & 1 deletion crates/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,14 @@ dtype-struct = []
bigidx = ["arrow/bigidx", "polars-utils/bigidx"]
python = []

serde = ["dep:serde", "bitflags/serde", "polars-schema/serde", "polars-utils/serde"]
serde = [
"dep:serde",
"bitflags/serde",
"polars-schema/serde",
"polars-utils/serde",
"arrow/io_ipc",
"serde_json",
]
serde-lazy = ["serde", "arrow/serde", "indexmap/serde", "chrono/serde"]

docs-selection = [
Expand Down
2 changes: 0 additions & 2 deletions crates/polars-core/src/frame/column/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ mod series;
/// 2. A [`ScalarColumn`] that repeats a single [`Scalar`]
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(feature = "serde", serde(from = "Series"))]
#[cfg_attr(feature = "serde", serde(into = "_SerdeSeries"))]
pub enum Column {
Series(SeriesColumn),
Partitioned(PartitionedColumn),
Expand Down
2 changes: 2 additions & 0 deletions crates/polars-core/src/frame/column/partitioned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ use crate::frame::Scalar;
use crate::series::IsSorted;

/// A column stored as a `values` series together with a list of partition `ends`.
///
/// With the `serde` feature enabled the struct derives `Serialize`/`Deserialize`;
/// only `name`, `values` and `ends` are part of the serialized form.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct PartitionedColumn {
/// Name of the column.
name: PlSmallStr,

/// Per-partition values.
// NOTE(review): presumably one value per partition, with `ends[i]` marking where
// partition `i` stops in the materialized column — confirm against the constructors.
values: Series,
/// Partition end offsets, shared behind an `Arc`.
ends: Arc<[IdxSize]>,

/// Lazily initialized cache holding a `Series`. It is skipped by serde, so it is
/// never written out and starts from its `Default` (empty) state after deserialization.
#[cfg_attr(feature = "serde", serde(skip))]
materialized: OnceLock<Series>,
}

Expand Down
68 changes: 68 additions & 0 deletions crates/polars-core/src/frame/column/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,3 +307,71 @@ impl From<ScalarColumn> for Column {
Self::Scalar(value)
}
}

#[cfg(feature = "serde")]
mod serde_impl {
    use std::sync::OnceLock;

    use polars_error::PolarsError;
    use polars_utils::pl_str::PlSmallStr;

    use super::ScalarColumn;
    use crate::frame::{Scalar, Series};

    /// On-the-wire representation of a [`ScalarColumn`].
    ///
    /// The scalar value is carried as a one-element [`Series`] so that its
    /// (de)serialization is delegated to the `Series` implementation.
    #[derive(serde::Serialize, serde::Deserialize)]
    struct SerializeWrap {
        name: PlSmallStr,
        /// Unit-length series for dispatching to IPC serialize
        unit_series: Series,
        length: usize,
    }

    impl From<&ScalarColumn> for SerializeWrap {
        /// Builds the wire form: the scalar becomes a unit series with an empty name.
        fn from(value: &ScalarColumn) -> Self {
            let unit_series = value.scalar.clone().into_series(PlSmallStr::EMPTY);

            Self {
                name: value.name.clone(),
                unit_series,
                length: value.length,
            }
        }
    }

    impl TryFrom<SerializeWrap> for ScalarColumn {
        type Error = PolarsError;

        /// Rebuilds the column from its wire form.
        ///
        /// # Errors
        ///
        /// Propagates the error from `unit_series.get(0)` when the first element
        /// of the carried series cannot be read.
        fn try_from(value: SerializeWrap) -> Result<Self, Self::Error> {
            let SerializeWrap {
                name,
                unit_series,
                length,
            } = value;

            let dtype = unit_series.dtype().clone();
            // Detach the value from `unit_series` so it can outlive the series.
            let first = unit_series.get(0)?.into_static();

            Ok(Self {
                name,
                scalar: Scalar::new(dtype, first),
                length,
                // The materialization cache is never serialized; start empty.
                materialized: OnceLock::new(),
            })
        }
    }

    impl serde::ser::Serialize for ScalarColumn {
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where
            S: serde::Serializer,
        {
            let wrap = SerializeWrap::from(self);
            serde::Serialize::serialize(&wrap, serializer)
        }
    }

    impl<'de> serde::de::Deserialize<'de> for ScalarColumn {
        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
        where
            D: serde::Deserializer<'de>,
        {
            use serde::de::Error;

            let wrap = <SerializeWrap as serde::Deserialize>::deserialize(deserializer)?;
            // Surface a `PolarsError` from the conversion as a deserializer error.
            ScalarColumn::try_from(wrap).map_err(D::Error::custom)
        }
    }
}
2 changes: 2 additions & 0 deletions crates/polars-core/src/frame/column/series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ use super::Series;
/// At the moment this just conditionally tracks where it was created so that materialization
/// problems can be tracked down.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct SeriesColumn {
/// The wrapped series; the only field that takes part in (de)serialization.
inner: Series,

/// Optional backtrace, shared behind an `Arc`. The field only exists in builds
/// with `debug_assertions` and is skipped by serde, so it is never written out
/// and takes its `Default` (`None`) on deserialization.
#[cfg(debug_assertions)]
#[cfg_attr(feature = "serde", serde(skip))]
materialized_at: Option<std::sync::Arc<std::backtrace::Backtrace>>,
}

Expand Down
37 changes: 37 additions & 0 deletions crates/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3562,4 +3562,41 @@ mod test {
assert_eq!(df.get_column_names(), &["a", "b", "c"]);
Ok(())
}

/// Deserializing a `DataFrame` whose columns disagree on length must fail with a
/// "lengths don't match" error, both through `serde_json` and through `pl_serialize`.
#[cfg(feature = "serde")]
#[test]
fn test_deserialize_height_validation_8751() {
    use polars_utils::pl_serialize;

    // Both deserialization paths are expected to report this message.
    const EXPECTED_MSG: &str =
        "successful parse invalid data: lengths don't match: could not create a new DataFrame:";

    // Build an invalid DataFrame directly from its inner fields, because the
    // `new_unchecked_*` functions have debug assertions that would reject it.
    let mismatched = DataFrame {
        height: 2,
        columns: vec![
            Int64Chunked::full("a".into(), 1, 2).into_column(),
            Int64Chunked::full("b".into(), 1, 1).into_column(),
        ],
        cached_schema: OnceLock::new(),
    };

    // Serialization is relied upon NOT to check the column heights, so the
    // invalid frame can be written out and the check exercised on the read side.
    let json = serde_json::to_string(&mismatched).unwrap();
    let json_err = serde_json::from_str::<DataFrame>(&json).unwrap_err();
    assert!(json_err.to_string().contains(EXPECTED_MSG));

    // Same round trip through the `pl_serialize` byte format.
    let bytes = pl_serialize::SerializeOptions::new(true)
        .serialize_to_bytes(&mismatched)
        .unwrap();
    let bytes_err = pl_serialize::SerializeOptions::new(true)
        .deserialize_from_reader::<DataFrame, _>(bytes.as_slice())
        .unwrap_err();
    assert!(bytes_err.to_string().contains(EXPECTED_MSG));
}
}
8 changes: 4 additions & 4 deletions crates/polars-core/src/serde/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ mod test {
fn test_serde() -> PolarsResult<()> {
let ca = UInt32Chunked::new("foo".into(), &[Some(1), None, Some(2)]);

let json = serde_json::to_string(&ca).unwrap();
let json = serde_json::to_string(&ca.clone().into_series()).unwrap();

let out = serde_json::from_str::<Series>(&json).unwrap();
assert!(ca.into_series().equals_missing(&out));

let ca = StringChunked::new("foo".into(), &[Some("foo"), None, Some("bar")]);

let json = serde_json::to_string(&ca).unwrap();
let json = serde_json::to_string(&ca.clone().into_series()).unwrap();

let out = serde_json::from_str::<Series>(&json).unwrap(); // uses `Deserialize<'de>`
assert!(ca.into_series().equals_missing(&out));
Expand All @@ -32,7 +32,7 @@ mod test {
fn test_serde_owned() {
let ca = UInt32Chunked::new("foo".into(), &[Some(1), None, Some(2)]);

let json = serde_json::to_string(&ca).unwrap();
let json = serde_json::to_string(&ca.clone().into_series()).unwrap();

let out = serde_json::from_reader::<_, Series>(json.as_bytes()).unwrap(); // uses `DeserializeOwned`
assert!(ca.into_series().equals_missing(&out));
Expand All @@ -54,7 +54,7 @@ mod test {
for mut column in df.columns {
column.set_sorted_flag(IsSorted::Descending);
let json = serde_json::to_string(&column).unwrap();
let out = serde_json::from_reader::<_, Series>(json.as_bytes()).unwrap();
let out = serde_json::from_reader::<_, Column>(json.as_bytes()).unwrap();
let f = out.get_flags();
assert_ne!(f, MetadataFlags::empty());
assert_eq!(column.get_flags(), out.get_flags());
Expand Down
Loading

0 comments on commit f629cf1

Please sign in to comment.