refactor: Don't deconstruct CsvParseOptions (#20302)
ritchie46 authored Dec 15, 2024
1 parent 2d65cc9 commit 91ad299
Showing 7 changed files with 139 additions and 238 deletions.
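This commit collapses the long per-field parameter lists (`separator`, `quote_char`, `eol_char`, `missing_is_null`, …) into a single borrowed `CsvParseOptions` struct threaded through the CSV reading path. Below is a minimal sketch of the pattern, not the actual polars code; `ParseOptions` and both functions are simplified stand-ins.

```rust
// Minimal sketch of the refactor pattern applied in this commit; `ParseOptions`
// is a simplified stand-in for polars' `CsvParseOptions`, not the real struct.
#[derive(Clone)]
struct ParseOptions {
    separator: u8,
    quote_char: Option<u8>,
    eol_char: u8,
}

// Before: every option threaded through as its own argument, which trips
// clippy::too_many_arguments and forces signature churn for every new option.
fn count_lines_before(bytes: &[u8], _separator: u8, _quote_char: Option<u8>, eol_char: u8) -> usize {
    bytes.iter().filter(|&&b| b == eol_char).count()
}

// After: one borrowed options struct; adding a field to `ParseOptions`
// no longer touches any signature on the call path.
fn count_lines_after(bytes: &[u8], opts: &ParseOptions) -> usize {
    bytes.iter().filter(|&&b| b == opts.eol_char).count()
}

fn main() {
    let opts = ParseOptions {
        separator: b',',
        quote_char: Some(b'"'),
        eol_char: b'\n',
    };
    let data = b"a,b\n1,2\n";
    assert_eq!(
        count_lines_before(data, opts.separator, opts.quote_char, opts.eol_char),
        count_lines_after(data, &opts),
    );
}
```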
32 changes: 17 additions & 15 deletions crates/polars-io/src/csv/read/parser.rs
@@ -12,6 +12,7 @@ use super::buffer::Buffer;
use super::options::{CommentPrefix, NullValuesCompiled};
use super::splitfields::SplitFields;
use super::utils::get_file_chunks;
use super::CsvParseOptions;
use crate::path_utils::is_cloud_url;
use crate::utils::compression::maybe_decompress_bytes;

@@ -740,14 +741,9 @@ pub(super) fn skip_this_line(bytes: &[u8], quote: Option<u8>, eol_char: u8) -> &
#[allow(clippy::too_many_arguments)]
pub(super) fn parse_lines(
mut bytes: &[u8],
parse_options: &CsvParseOptions,
offset: usize,
separator: u8,
comment_prefix: Option<&CommentPrefix>,
quote_char: Option<u8>,
eol_char: u8,
missing_is_null: bool,
ignore_errors: bool,
mut truncate_ragged_lines: bool,
null_values: Option<&NullValuesCompiled>,
projection: &[usize],
buffers: &mut [Buffer],
@@ -760,6 +756,7 @@ pub(super) fn parse_lines(
!projection.is_empty(),
"at least one column should be projected"
);
let mut truncate_ragged_lines = parse_options.truncate_ragged_lines;
// During projection pushdown we are not checking other csv fields.
// This would be very expensive and we don't care as we only want
// the projected columns.
@@ -781,9 +778,9 @@

if bytes.is_empty() {
return Ok(original_bytes_len);
} else if is_comment_line(bytes, comment_prefix) {
} else if is_comment_line(bytes, parse_options.comment_prefix.as_ref()) {
// deal with comments
let bytes_rem = skip_this_line(bytes, quote_char, eol_char);
let bytes_rem = skip_this_line(bytes, parse_options.quote_char, parse_options.eol_char);
bytes = bytes_rem;
continue;
}
@@ -795,7 +792,12 @@
let mut next_projected = unsafe { projection_iter.next().unwrap_unchecked() };
let mut processed_fields = 0;

let mut iter = SplitFields::new(bytes, separator, quote_char, eol_char);
let mut iter = SplitFields::new(
bytes,
parse_options.separator,
parse_options.quote_char,
parse_options.eol_char,
);
let mut idx = 0u32;
let mut read_sol = 0;
loop {
@@ -840,9 +842,9 @@
add_null = unsafe { null_values.is_null(field, idx as usize) }
}
if add_null {
buf.add_null(!missing_is_null && field.is_empty())
buf.add_null(!parse_options.missing_is_null && field.is_empty())
} else {
buf.add(field, ignore_errors, needs_escaping, missing_is_null)
buf.add(field, ignore_errors, needs_escaping, parse_options.missing_is_null)
.map_err(|e| {
let bytes_offset = offset + field.as_ptr() as usize - start;
let unparsable = String::from_utf8_lossy(field);
@@ -874,7 +876,7 @@
match projection_iter.next() {
Some(p) => next_projected = p,
None => {
if bytes.get(read_sol - 1) == Some(&eol_char) {
if bytes.get(read_sol - 1) == Some(&parse_options.eol_char) {
bytes = &bytes[read_sol..];
} else {
if !truncate_ragged_lines && read_sol < bytes.len() {
@@ -884,8 +886,8 @@ Consider setting 'truncate_ragged_lines={}'."#, polars_error::constants::TRUE)
}
let bytes_rem = skip_this_line(
unsafe { bytes.get_unchecked(read_sol - 1..) },
quote_char,
eol_char,
parse_options.quote_char,
parse_options.eol_char,
);
bytes = bytes_rem;
}
@@ -907,7 +909,7 @@ Consider setting 'truncate_ragged_lines={}'."#, polars_error::constants::TRUE)
// SAFETY: processed fields index can never exceed the projection indices.
buffers.get_unchecked_mut(processed_fields)
};
buf.add_null(!missing_is_null);
buf.add_null(!parse_options.missing_is_null);
processed_fields += 1;
}
line_count += 1;
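One wrinkle in `parse_lines` above: `truncate_ragged_lines` is the single option the function may mutate (it is forced on during projection pushdown, per the comment in the hunk), so instead of consuming the whole struct it copies that one field out of the borrowed `&CsvParseOptions` into a local `mut`. A minimal sketch of that idiom, with `Opts` and `parse` as illustrative stand-ins:

```rust
// Sketch of the local-copy idiom: borrow the options immutably, but lift the
// one flag the function needs to flip into a local variable. `Opts` and
// `parse` are stand-ins, not the polars API.
struct Opts {
    truncate_ragged_lines: bool,
}

fn parse(opts: &Opts, projection_is_partial: bool) -> bool {
    // `bool` is `Copy`, so this leaves the shared struct untouched.
    let mut truncate_ragged_lines = opts.truncate_ragged_lines;
    if projection_is_partial {
        // Mirrors parse_lines: with a partial projection, ragged line tails
        // are skipped rather than validated, so the flag is forced on.
        truncate_ragged_lines = true;
    }
    truncate_ragged_lines
}

fn main() {
    let opts = Opts { truncate_ragged_lines: false };
    assert!(parse(&opts, true));
    assert!(!opts.truncate_ragged_lines); // original value unchanged
}
```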
109 changes: 38 additions & 71 deletions crates/polars-io/src/csv/read/read_impl.rs
@@ -12,7 +12,7 @@ use polars_time::prelude::*;
use rayon::prelude::*;

use super::buffer::init_buffers;
use super::options::{CommentPrefix, CsvEncoding, NullValues, NullValuesCompiled};
use super::options::{CommentPrefix, CsvEncoding, NullValuesCompiled};
use super::parser::{
is_comment_line, parse_lines, skip_bom, skip_line_ending, skip_lines_naive, skip_this_line,
CountLines, SplitLines,
@@ -21,6 +21,7 @@ use super::reader::prepare_csv_schema;
use super::schema_inference::{check_decimal_comma, infer_file_schema};
#[cfg(any(feature = "decompress", feature = "decompress-fast"))]
use super::utils::decompress;
use super::CsvParseOptions;
use crate::mmap::ReaderBytes;
use crate::predicates::PhysicalIoExpr;
#[cfg(not(any(feature = "decompress", feature = "decompress-fast")))]
@@ -100,6 +101,7 @@ pub(crate) struct CoreReader<'a> {
reader_bytes: Option<ReaderBytes<'a>>,
/// Explicit schema for the CSV file
schema: SchemaRef,
parse_options: CsvParseOptions,
/// Optional projection for which columns to load (zero-based column indices)
projection: Option<Vec<usize>>,
/// Current line number, used in error reporting
@@ -110,21 +112,13 @@
// after the header, we need to take embedded lines into account
skip_rows_after_header: usize,
n_rows: Option<usize>,
encoding: CsvEncoding,
n_threads: Option<usize>,
has_header: bool,
separator: u8,
chunk_size: usize,
decimal_comma: bool,
comment_prefix: Option<CommentPrefix>,
quote_char: Option<u8>,
eol_char: u8,
null_values: Option<NullValuesCompiled>,
missing_is_null: bool,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
to_cast: Vec<Field>,
row_index: Option<RowIndex>,
truncate_ragged_lines: bool,
#[cfg_attr(not(feature = "dtype-categorical"), allow(unused))]
has_categorical: bool,
}
@@ -143,38 +137,29 @@ impl<'a> CoreReader<'a> {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
reader_bytes: ReaderBytes<'a>,
parse_options: Arc<CsvParseOptions>,
n_rows: Option<usize>,
skip_rows: usize,
skip_lines: usize,
mut projection: Option<Vec<usize>>,
max_records: Option<usize>,
separator: Option<u8>,
has_header: bool,
ignore_errors: bool,
schema: Option<SchemaRef>,
columns: Option<Arc<[PlSmallStr]>>,
encoding: CsvEncoding,
mut n_threads: Option<usize>,
schema_overwrite: Option<SchemaRef>,
dtype_overwrite: Option<Arc<Vec<DataType>>>,
chunk_size: usize,
comment_prefix: Option<CommentPrefix>,
quote_char: Option<u8>,
eol_char: u8,
null_values: Option<NullValues>,
missing_is_null: bool,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
mut to_cast: Vec<Field>,
skip_rows_after_header: usize,
row_index: Option<RowIndex>,
try_parse_dates: bool,
raise_if_empty: bool,
truncate_ragged_lines: bool,
decimal_comma: bool,
) -> PolarsResult<CoreReader<'a>> {
let separator = separator.unwrap_or(b',');
let separator = parse_options.separator;

check_decimal_comma(decimal_comma, separator)?;
check_decimal_comma(parse_options.decimal_comma, separator)?;
#[cfg(any(feature = "decompress", feature = "decompress-fast"))]
let mut reader_bytes = reader_bytes;

@@ -192,9 +177,13 @@
{
let total_n_rows =
n_rows.map(|n| skip_rows + (has_header as usize) + skip_rows_after_header + n);
if let Some(b) =
decompress(&reader_bytes, total_n_rows, separator, quote_char, eol_char)
{
if let Some(b) = decompress(
&reader_bytes,
total_n_rows,
separator,
parse_options.quote_char,
parse_options.eol_char,
) {
reader_bytes = ReaderBytes::Owned(b.into());
}
}
@@ -204,21 +193,15 @@
None => {
let (inferred_schema, _, _) = infer_file_schema(
&reader_bytes,
separator,
&parse_options,
max_records,
has_header,
schema_overwrite.as_deref(),
skip_rows,
skip_lines,
skip_rows_after_header,
comment_prefix.as_ref(),
quote_char,
eol_char,
null_values.as_ref(),
try_parse_dates,
raise_if_empty,
&mut n_threads,
decimal_comma,
)?;
Arc::new(inferred_schema)
},
@@ -233,7 +216,11 @@
let has_categorical = prepare_csv_schema(&mut schema, &mut to_cast)?;

// Create a null value for every column
let null_values = null_values.map(|nv| nv.compile(&schema)).transpose()?;
let null_values = parse_options
.null_values
.as_ref()
.map(|nv| nv.clone().compile(&schema))
.transpose()?;

if let Some(cols) = columns {
let mut prj = Vec::with_capacity(cols.len());
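Since `null_values` now lives inside the shared options rather than arriving as an owned parameter, the code above clones it out before compiling it against the schema; `.transpose()` turns the resulting `Option<Result<_>>` into a `Result<Option<_>>` so `?` can propagate errors. A minimal sketch of the idiom, with stand-in types:

```rust
// Sketch of the `map` + `transpose` idiom used above: clone the optional null
// values out of the borrowed options, compile, and flip the nesting so `?`
// propagates the error while `None` passes through. All types are stand-ins.
#[derive(Clone)]
struct NullValues(Vec<String>);

struct CompiledNullValues(Vec<String>);

impl NullValues {
    fn compile(self, n_columns: usize) -> Result<CompiledNullValues, String> {
        if self.0.len() > n_columns {
            return Err("more null values than columns".to_string());
        }
        Ok(CompiledNullValues(self.0))
    }
}

fn build(
    null_values: Option<&NullValues>,
    n_columns: usize,
) -> Result<Option<CompiledNullValues>, String> {
    // Option<Result<T, E>> -> Result<Option<T>, E>
    null_values.map(|nv| nv.clone().compile(n_columns)).transpose()
}

fn main() {
    assert!(build(None, 3).unwrap().is_none());
    let nv = NullValues(vec!["NA".to_string()]);
    let compiled = build(Some(&nv), 3).unwrap().unwrap();
    assert_eq!(compiled.0.len(), 1);
}
```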
@@ -246,6 +233,7 @@

Ok(CoreReader {
reader_bytes: Some(reader_bytes),
parse_options: (*parse_options).clone(),
schema,
projection,
current_line: usize::from(has_header),
@@ -254,21 +242,13 @@
skip_rows_before_header: skip_rows,
skip_rows_after_header,
n_rows,
encoding,
n_threads,
has_header,
separator,
chunk_size,
comment_prefix,
quote_char,
eol_char,
null_values,
missing_is_null,
predicate,
to_cast,
row_index,
truncate_ragged_lines,
decimal_comma,
has_categorical,
})
}
@@ -287,7 +267,7 @@
self.skip_lines,
self.skip_rows_before_header,
self.skip_rows_after_header,
self.comment_prefix.as_ref(),
self.parse_options.comment_prefix.as_ref(),
self.has_header,
)?;

@@ -320,31 +300,28 @@
) -> PolarsResult<DataFrame> {
let mut df = read_chunk(
bytes,
self.separator,
&self.parse_options,
self.schema.as_ref(),
self.ignore_errors,
projection,
bytes_offset,
self.quote_char,
self.eol_char,
self.comment_prefix.as_ref(),
capacity,
self.encoding,
self.null_values.as_ref(),
self.missing_is_null,
self.truncate_ragged_lines,
usize::MAX,
stop_at_nbytes,
starting_point_offset,
self.decimal_comma,
)?;

cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;
Ok(df)
}

fn parse_csv(&mut self, bytes: &[u8]) -> PolarsResult<DataFrame> {
let (bytes, _) = self.find_starting_point(bytes, self.quote_char, self.eol_char)?;
let (bytes, _) = self.find_starting_point(
bytes,
self.parse_options.quote_char,
self.parse_options.eol_char,
)?;

let projection = self.get_projection()?;

@@ -407,9 +384,9 @@ impl<'a> CoreReader<'a> {
#[cfg(target_family = "wasm")]
let pool = &POOL;

let counter = CountLines::new(self.quote_char, self.eol_char);
let counter = CountLines::new(self.parse_options.quote_char, self.parse_options.eol_char);
let mut total_offset = 0;
let check_utf8 = matches!(self.encoding, CsvEncoding::Utf8)
let check_utf8 = matches!(self.parse_options.encoding, CsvEncoding::Utf8)
&& self.schema.iter_fields().any(|f| f.dtype().is_string());

pool.scope(|s| {
@@ -418,9 +395,11 @@
if b.is_empty() {
break;
}
debug_assert!(total_offset == 0 || bytes[total_offset - 1] == self.eol_char);
debug_assert!(
total_offset == 0 || bytes[total_offset - 1] == self.parse_options.eol_char
);
let (count, position) = counter.find_next(b, &mut chunk_size);
debug_assert!(count == 0 || b[position] == self.eol_char);
debug_assert!(count == 0 || b[position] == self.parse_options.eol_char);

let (b, count) = if count == 0
&& unsafe { b.as_ptr().add(b.len()) == bytes.as_ptr().add(bytes.len()) }
@@ -537,23 +516,16 @@ impl<'a> CoreReader<'a> {
#[allow(clippy::too_many_arguments)]
pub fn read_chunk(
bytes: &[u8],
separator: u8,
parse_options: &CsvParseOptions,
schema: &Schema,
ignore_errors: bool,
projection: &[usize],
bytes_offset_thread: usize,
quote_char: Option<u8>,
eol_char: u8,
comment_prefix: Option<&CommentPrefix>,
capacity: usize,
encoding: CsvEncoding,
null_values: Option<&NullValuesCompiled>,
missing_is_null: bool,
truncate_ragged_lines: bool,
chunk_size: usize,
stop_at_nbytes: usize,
starting_point_offset: Option<usize>,
decimal_comma: bool,
) -> PolarsResult<DataFrame> {
let mut read = bytes_offset_thread;
// There's an off-by-one error somewhere in the reading code, where it reads
@@ -565,9 +537,9 @@ pub fn read_chunk(
projection,
capacity + 1,
schema,
quote_char,
encoding,
decimal_comma,
parse_options.quote_char,
parse_options.encoding,
parse_options.decimal_comma,
)?;

debug_assert!(projection.is_sorted());
@@ -583,14 +555,9 @@ pub fn read_chunk(
let offset = read + starting_point_offset.unwrap();
read += parse_lines(
local_bytes,
parse_options,
offset,
separator,
comment_prefix,
quote_char,
eol_char,
missing_is_null,
ignore_errors,
truncate_ragged_lines,
null_values,
projection,
&mut buffers,
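At the construction end, `CoreReader::new` now takes `Arc<CsvParseOptions>` so callers can share one options value cheaply across readers and threads, while the reader keeps its own owned copy via `(*parse_options).clone()`. A minimal sketch of that ownership pattern (stand-in types, not the polars API):

```rust
use std::sync::Arc;

// Sketch of the ownership pattern in `CoreReader::new` above: callers share
// one `Arc` of the options, and the reader clones the inner value so it owns
// its copy for the lifetime of the read. `ParseOptions` is a stand-in type.
#[derive(Clone, Debug, PartialEq)]
struct ParseOptions {
    separator: u8,
    eol_char: u8,
}

struct Reader {
    parse_options: ParseOptions, // owned clone, as in `(*parse_options).clone()`
}

impl Reader {
    fn new(parse_options: Arc<ParseOptions>) -> Self {
        Reader {
            parse_options: (*parse_options).clone(),
        }
    }
}

fn main() {
    let shared = Arc::new(ParseOptions { separator: b',', eol_char: b'\n' });
    let reader = Reader::new(Arc::clone(&shared));
    assert_eq!(reader.parse_options, *shared);
}
```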
[Diffs for the remaining 5 changed files not loaded in this view]
