diff --git a/crates/polars-io/src/csv/read/parser.rs b/crates/polars-io/src/csv/read/parser.rs index 61cc1c0c1b00..24291dac846c 100644 --- a/crates/polars-io/src/csv/read/parser.rs +++ b/crates/polars-io/src/csv/read/parser.rs @@ -12,6 +12,7 @@ use super::buffer::Buffer; use super::options::{CommentPrefix, NullValuesCompiled}; use super::splitfields::SplitFields; use super::utils::get_file_chunks; +use super::CsvParseOptions; use crate::path_utils::is_cloud_url; use crate::utils::compression::maybe_decompress_bytes; @@ -740,14 +741,9 @@ pub(super) fn skip_this_line(bytes: &[u8], quote: Option, eol_char: u8) -> & #[allow(clippy::too_many_arguments)] pub(super) fn parse_lines( mut bytes: &[u8], + parse_options: &CsvParseOptions, offset: usize, - separator: u8, - comment_prefix: Option<&CommentPrefix>, - quote_char: Option, - eol_char: u8, - missing_is_null: bool, ignore_errors: bool, - mut truncate_ragged_lines: bool, null_values: Option<&NullValuesCompiled>, projection: &[usize], buffers: &mut [Buffer], @@ -760,6 +756,7 @@ pub(super) fn parse_lines( !projection.is_empty(), "at least one column should be projected" ); + let mut truncate_ragged_lines = parse_options.truncate_ragged_lines; // During projection pushdown we are not checking other csv fields. // This would be very expensive and we don't care as we only want // the projected columns. @@ -781,9 +778,9 @@ pub(super) fn parse_lines( if bytes.is_empty() { return Ok(original_bytes_len); - } else if is_comment_line(bytes, comment_prefix) { + } else if is_comment_line(bytes, parse_options.comment_prefix.as_ref()) { // deal with comments - let bytes_rem = skip_this_line(bytes, quote_char, eol_char); + let bytes_rem = skip_this_line(bytes, parse_options.quote_char, parse_options.eol_char); bytes = bytes_rem; continue; } @@ -795,7 +792,12 @@ pub(super) fn parse_lines( let mut next_projected = unsafe { projection_iter.next().unwrap_unchecked() }; let mut processed_fields = 0; - let mut iter = SplitFields::new(bytes, separator, quote_char, eol_char); + let mut iter = SplitFields::new( + bytes, + parse_options.separator, + parse_options.quote_char, + parse_options.eol_char, + ); let mut idx = 0u32; let mut read_sol = 0; loop { @@ -840,9 +842,9 @@ pub(super) fn parse_lines( add_null = unsafe { null_values.is_null(field, idx as usize) } } if add_null { - buf.add_null(!missing_is_null && field.is_empty()) + buf.add_null(!parse_options.missing_is_null && field.is_empty()) } else { - buf.add(field, ignore_errors, needs_escaping, missing_is_null) + buf.add(field, ignore_errors, needs_escaping, parse_options.missing_is_null) .map_err(|e| { let bytes_offset = offset + field.as_ptr() as usize - start; let unparsable = String::from_utf8_lossy(field); @@ -874,7 +876,7 @@ pub(super) fn parse_lines( match projection_iter.next() { Some(p) => next_projected = p, None => { - if bytes.get(read_sol - 1) == Some(&eol_char) { + if bytes.get(read_sol - 1) == Some(&parse_options.eol_char) { bytes = &bytes[read_sol..]; } else { if !truncate_ragged_lines && read_sol < bytes.len() { @@ -884,8 +886,8 @@ Consider setting 'truncate_ragged_lines={}'."#, polars_error::constants::TRUE) } let bytes_rem = skip_this_line( unsafe { bytes.get_unchecked(read_sol - 1..) }, - quote_char, - eol_char, + parse_options.quote_char, + parse_options.eol_char, ); bytes = bytes_rem; } @@ -907,7 +909,7 @@ Consider setting 'truncate_ragged_lines={}'."#, polars_error::constants::TRUE) // SAFETY: processed fields index can never exceed the projection indices. buffers.get_unchecked_mut(processed_fields) }; - buf.add_null(!missing_is_null); + buf.add_null(!parse_options.missing_is_null); processed_fields += 1; } line_count += 1; diff --git a/crates/polars-io/src/csv/read/read_impl.rs b/crates/polars-io/src/csv/read/read_impl.rs index 4b73063fbb5f..d1dc175ea861 100644 --- a/crates/polars-io/src/csv/read/read_impl.rs +++ b/crates/polars-io/src/csv/read/read_impl.rs @@ -12,7 +12,7 @@ use polars_time::prelude::*; use rayon::prelude::*; use super::buffer::init_buffers; -use super::options::{CommentPrefix, CsvEncoding, NullValues, NullValuesCompiled}; +use super::options::{CommentPrefix, CsvEncoding, NullValuesCompiled}; use super::parser::{ is_comment_line, parse_lines, skip_bom, skip_line_ending, skip_lines_naive, skip_this_line, CountLines, SplitLines, @@ -21,6 +21,7 @@ use super::reader::prepare_csv_schema; use super::schema_inference::{check_decimal_comma, infer_file_schema}; #[cfg(any(feature = "decompress", feature = "decompress-fast"))] use super::utils::decompress; +use super::CsvParseOptions; use crate::mmap::ReaderBytes; use crate::predicates::PhysicalIoExpr; #[cfg(not(any(feature = "decompress", feature = "decompress-fast")))] @@ -100,6 +101,7 @@ pub(crate) struct CoreReader<'a> { reader_bytes: Option>, /// Explicit schema for the CSV file schema: SchemaRef, + parse_options: CsvParseOptions, /// Optional projection for which columns to load (zero-based column indices) projection: Option>, /// Current line number, used in error reporting @@ -110,21 +112,13 @@ pub(crate) struct CoreReader<'a> { // after the header, we need to take embedded lines into account skip_rows_after_header: usize, n_rows: Option, - encoding: CsvEncoding, n_threads: Option, has_header: bool, - separator: u8, chunk_size: usize, - decimal_comma: bool, - comment_prefix: Option, - quote_char: Option, - eol_char: u8, null_values: Option, - missing_is_null: bool, predicate: Option>, to_cast: Vec, row_index: Option, - truncate_ragged_lines: bool, #[cfg_attr(not(feature = "dtype-categorical"), allow(unused))] has_categorical: bool, } @@ -143,38 +137,29 @@ impl<'a> CoreReader<'a> { #[allow(clippy::too_many_arguments)] pub(crate) fn new( reader_bytes: ReaderBytes<'a>, + parse_options: Arc, n_rows: Option, skip_rows: usize, skip_lines: usize, mut projection: Option>, max_records: Option, - separator: Option, has_header: bool, ignore_errors: bool, schema: Option, columns: Option>, - encoding: CsvEncoding, mut n_threads: Option, schema_overwrite: Option, dtype_overwrite: Option>>, chunk_size: usize, - comment_prefix: Option, - quote_char: Option, - eol_char: u8, - null_values: Option, - missing_is_null: bool, predicate: Option>, mut to_cast: Vec, skip_rows_after_header: usize, row_index: Option, - try_parse_dates: bool, raise_if_empty: bool, - truncate_ragged_lines: bool, - decimal_comma: bool, ) -> PolarsResult> { - let separator = separator.unwrap_or(b','); + let separator = parse_options.separator; - check_decimal_comma(decimal_comma, separator)?; + check_decimal_comma(parse_options.decimal_comma, separator)?; #[cfg(any(feature = "decompress", feature = "decompress-fast"))] let mut reader_bytes = reader_bytes; @@ -192,9 +177,13 @@ impl<'a> CoreReader<'a> { { let total_n_rows = n_rows.map(|n| skip_rows + (has_header as usize) + skip_rows_after_header + n); - if let Some(b) = - decompress(&reader_bytes, total_n_rows, separator, quote_char, eol_char) - { + if let Some(b) = decompress( + &reader_bytes, + total_n_rows, + separator, + parse_options.quote_char, + parse_options.eol_char, + ) { reader_bytes = ReaderBytes::Owned(b.into()); } } @@ -204,21 +193,15 @@ impl<'a> CoreReader<'a> { None => { let (inferred_schema, _, _) = infer_file_schema( &reader_bytes, - separator, + &parse_options, max_records, has_header, schema_overwrite.as_deref(), skip_rows, skip_lines, skip_rows_after_header, - comment_prefix.as_ref(), - quote_char, - eol_char, - null_values.as_ref(), - try_parse_dates, raise_if_empty, &mut n_threads, - decimal_comma, )?; Arc::new(inferred_schema) }, @@ -233,7 +216,11 @@ impl<'a> CoreReader<'a> { let has_categorical = prepare_csv_schema(&mut schema, &mut to_cast)?; // Create a null value for every column - let null_values = null_values.map(|nv| nv.compile(&schema)).transpose()?; + let null_values = parse_options + .null_values + .as_ref() + .map(|nv| nv.clone().compile(&schema)) + .transpose()?; if let Some(cols) = columns { let mut prj = Vec::with_capacity(cols.len()); @@ -246,6 +233,7 @@ impl<'a> CoreReader<'a> { Ok(CoreReader { reader_bytes: Some(reader_bytes), + parse_options: (*parse_options).clone(), schema, projection, current_line: usize::from(has_header), @@ -254,21 +242,13 @@ impl<'a> CoreReader<'a> { skip_rows_before_header: skip_rows, skip_rows_after_header, n_rows, - encoding, n_threads, has_header, - separator, chunk_size, - comment_prefix, - quote_char, - eol_char, null_values, - missing_is_null, predicate, to_cast, row_index, - truncate_ragged_lines, - decimal_comma, has_categorical, }) } @@ -287,7 +267,7 @@ impl<'a> CoreReader<'a> { self.skip_lines, self.skip_rows_before_header, self.skip_rows_after_header, - self.comment_prefix.as_ref(), + self.parse_options.comment_prefix.as_ref(), self.has_header, )?; @@ -320,23 +300,16 @@ impl<'a> CoreReader<'a> { ) -> PolarsResult { let mut df = read_chunk( bytes, - self.separator, + &self.parse_options, self.schema.as_ref(), self.ignore_errors, projection, bytes_offset, - self.quote_char, - self.eol_char, - self.comment_prefix.as_ref(), capacity, - self.encoding, self.null_values.as_ref(), - self.missing_is_null, - self.truncate_ragged_lines, usize::MAX, stop_at_nbytes, starting_point_offset, - self.decimal_comma, )?; cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?; @@ -344,7 +317,11 @@ impl<'a> CoreReader<'a> { } fn parse_csv(&mut self, bytes: &[u8]) -> PolarsResult { - let (bytes, _) = self.find_starting_point(bytes, self.quote_char, self.eol_char)?; + let (bytes, _) = self.find_starting_point( + bytes, + self.parse_options.quote_char, + self.parse_options.eol_char, + )?; let projection = self.get_projection()?; @@ -407,9 +384,9 @@ impl<'a> CoreReader<'a> { #[cfg(target_family = "wasm")] let pool = &POOL; - let counter = CountLines::new(self.quote_char, self.eol_char); + let counter = CountLines::new(self.parse_options.quote_char, self.parse_options.eol_char); let mut total_offset = 0; - let check_utf8 = matches!(self.encoding, CsvEncoding::Utf8) + let check_utf8 = matches!(self.parse_options.encoding, CsvEncoding::Utf8) && self.schema.iter_fields().any(|f| f.dtype().is_string()); pool.scope(|s| { @@ -418,9 +395,11 @@ impl<'a> CoreReader<'a> { if b.is_empty() { break; } - debug_assert!(total_offset == 0 || bytes[total_offset - 1] == self.eol_char); + debug_assert!( + total_offset == 0 || bytes[total_offset - 1] == self.parse_options.eol_char + ); let (count, position) = counter.find_next(b, &mut chunk_size); - debug_assert!(count == 0 || b[position] == self.eol_char); + debug_assert!(count == 0 || b[position] == self.parse_options.eol_char); let (b, count) = if count == 0 && unsafe { b.as_ptr().add(b.len()) == bytes.as_ptr().add(bytes.len()) } @@ -537,23 +516,16 @@ impl<'a> CoreReader<'a> { #[allow(clippy::too_many_arguments)] pub fn read_chunk( bytes: &[u8], - separator: u8, + parse_options: &CsvParseOptions, schema: &Schema, ignore_errors: bool, projection: &[usize], bytes_offset_thread: usize, - quote_char: Option, - eol_char: u8, - comment_prefix: Option<&CommentPrefix>, capacity: usize, - encoding: CsvEncoding, null_values: Option<&NullValuesCompiled>, - missing_is_null: bool, - truncate_ragged_lines: bool, chunk_size: usize, stop_at_nbytes: usize, starting_point_offset: Option, - decimal_comma: bool, ) -> PolarsResult { let mut read = bytes_offset_thread; // There's an off-by-one error somewhere in the reading code, where it reads @@ -565,9 +537,9 @@ pub fn read_chunk( projection, capacity + 1, schema, - quote_char, - encoding, - decimal_comma, + parse_options.quote_char, + parse_options.encoding, + parse_options.decimal_comma, )?; debug_assert!(projection.is_sorted()); @@ -583,14 +555,9 @@ pub fn read_chunk( let offset = read + starting_point_offset.unwrap(); read += parse_lines( local_bytes, + parse_options, offset, - separator, - comment_prefix, - quote_char, - eol_char, - missing_is_null, ignore_errors, - truncate_ragged_lines, null_values, projection, &mut buffers, diff --git a/crates/polars-io/src/csv/read/read_impl/batched.rs b/crates/polars-io/src/csv/read/read_impl/batched.rs index 0a7e490dff99..80f1e0ed278b 100644 --- a/crates/polars-io/src/csv/read/read_impl/batched.rs +++ b/crates/polars-io/src/csv/read/read_impl/batched.rs @@ -10,10 +10,10 @@ use polars_utils::IdxSize; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use super::{cast_columns, read_chunk, CoreReader, CountLines}; -use crate::csv::read::options::{CommentPrefix, CsvEncoding, NullValuesCompiled}; +use crate::csv::read::options::NullValuesCompiled; use crate::csv::read::CsvReader; use crate::mmap::{MmapBytesReader, ReaderBytes}; -use crate::prelude::update_row_counts2; +use crate::prelude::{update_row_counts2, CsvParseOptions}; use crate::RowIndex; #[allow(clippy::too_many_arguments)] @@ -120,8 +120,11 @@ impl<'a> CoreReader<'a> { pub fn batched(mut self) -> PolarsResult> { let reader_bytes = self.reader_bytes.take().unwrap(); let bytes = reader_bytes.as_ref(); - let (bytes, starting_point_offset) = - self.find_starting_point(bytes, self.quote_char, self.eol_char)?; + let (bytes, starting_point_offset) = self.find_starting_point( + bytes, + self.parse_options.quote_char, + self.parse_options.eol_char, + )?; let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads()); @@ -151,8 +154,8 @@ impl<'a> CoreReader<'a> { n_chunks: offset_batch_size, chunk_size, rows_per_batch: self.chunk_size, - quote_char: self.quote_char, - eol_char: self.eol_char, + quote_char: self.parse_options.quote_char, + eol_char: self.parse_options.eol_char, }; let projection = self.get_projection()?; @@ -170,57 +173,43 @@ impl<'a> CoreReader<'a> { Ok(BatchedCsvReader { reader_bytes, + parse_options: self.parse_options, chunk_size: self.chunk_size, file_chunks_iter: file_chunks, file_chunks: vec![], projection, starting_point_offset, row_index: self.row_index, - comment_prefix: self.comment_prefix, - quote_char: self.quote_char, - eol_char: self.eol_char, null_values: self.null_values, - missing_is_null: self.missing_is_null, to_cast: self.to_cast, ignore_errors: self.ignore_errors, - truncate_ragged_lines: self.truncate_ragged_lines, remaining: self.n_rows.unwrap_or(usize::MAX), - encoding: self.encoding, - separator: self.separator, schema: self.schema, rows_read: 0, _cat_lock, - decimal_comma: self.decimal_comma, }) } } pub struct BatchedCsvReader<'a> { reader_bytes: ReaderBytes<'a>, + parse_options: CsvParseOptions, chunk_size: usize, file_chunks_iter: ChunkOffsetIter<'a>, file_chunks: Vec<(usize, usize)>, projection: Vec, starting_point_offset: Option, row_index: Option, - comment_prefix: Option, - quote_char: Option, - eol_char: u8, null_values: Option, - missing_is_null: bool, - truncate_ragged_lines: bool, to_cast: Vec, ignore_errors: bool, remaining: usize, - encoding: CsvEncoding, - separator: u8, schema: SchemaRef, rows_read: IdxSize, #[cfg(feature = "dtype-categorical")] _cat_lock: Option, #[cfg(not(feature = "dtype-categorical"))] _cat_lock: Option, - decimal_comma: bool, } impl BatchedCsvReader<'_> { @@ -250,23 +239,16 @@ impl BatchedCsvReader<'_> { .map(|(bytes_offset_thread, stop_at_nbytes)| { let mut df = read_chunk( bytes, - self.separator, + &self.parse_options, self.schema.as_ref(), self.ignore_errors, &self.projection, bytes_offset_thread, - self.quote_char, - self.eol_char, - self.comment_prefix.as_ref(), self.chunk_size, - self.encoding, self.null_values.as_ref(), - self.missing_is_null, - self.truncate_ragged_lines, usize::MAX, stop_at_nbytes, self.starting_point_offset, - self.decimal_comma, )?; cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?; diff --git a/crates/polars-io/src/csv/read/reader.rs b/crates/polars-io/src/csv/read/reader.rs index fee35e21c306..c21c3fde1b4c 100644 --- a/crates/polars-io/src/csv/read/reader.rs +++ b/crates/polars-io/src/csv/read/reader.rs @@ -112,34 +112,25 @@ impl CsvReader { CoreReader::new( reader_bytes, + parse_options, self.options.n_rows, self.options.skip_rows, self.options.skip_lines, self.options.projection.clone().map(|x| x.as_ref().clone()), self.options.infer_schema_length, - Some(parse_options.separator), self.options.has_header, self.options.ignore_errors, self.options.schema.clone(), self.options.columns.clone(), - parse_options.encoding, self.options.n_threads, self.options.schema_overwrite.clone(), self.options.dtype_overwrite.clone(), self.options.chunk_size, - parse_options.comment_prefix.clone(), - parse_options.quote_char, - parse_options.eol_char, - parse_options.null_values.clone(), - parse_options.missing_is_null, self.predicate.clone(), self.options.fields_to_cast.clone(), self.options.skip_rows_after_header, self.options.row_index.clone(), - parse_options.try_parse_dates, self.options.raise_if_empty, - parse_options.truncate_ragged_lines, - parse_options.decimal_comma, ) } diff --git a/crates/polars-io/src/csv/read/schema_inference.rs b/crates/polars-io/src/csv/read/schema_inference.rs index 74d230d547ba..b6d7da8559a0 100644 --- a/crates/polars-io/src/csv/read/schema_inference.rs +++ b/crates/polars-io/src/csv/read/schema_inference.rs @@ -8,10 +8,9 @@ use polars_time::chunkedarray::string::infer as date_infer; use polars_time::prelude::string::Pattern; use polars_utils::format_pl_smallstr; -use super::options::{CommentPrefix, CsvEncoding, NullValues}; use super::parser::{is_comment_line, skip_bom, skip_line_ending, SplitLines}; use super::splitfields::SplitFields; -use super::CsvReadOptions; +use super::{CsvEncoding, CsvParseOptions, CsvReadOptions, NullValues}; use crate::csv::read::parser::skip_lines_naive; use crate::mmap::ReaderBytes; use crate::utils::{BOOLEAN_RE, FLOAT_RE, FLOAT_RE_DECIMAL, INTEGER_RE}; @@ -32,7 +31,6 @@ impl SchemaInferenceResult { ) -> PolarsResult { let parse_options = options.get_parse_options(); - let separator = parse_options.separator; let infer_schema_length = options.infer_schema_length; let has_header = options.has_header; let schema_overwrite_arc = options.schema_overwrite.clone(); @@ -40,34 +38,22 @@ impl SchemaInferenceResult { let skip_rows = options.skip_rows; let skip_lines = options.skip_lines; let skip_rows_after_header = options.skip_rows_after_header; - let comment_prefix = parse_options.comment_prefix.as_ref(); - let quote_char = parse_options.quote_char; - let eol_char = parse_options.eol_char; - let null_values = parse_options.null_values.clone(); - let try_parse_dates = parse_options.try_parse_dates; let raise_if_empty = options.raise_if_empty; let mut n_threads = options.n_threads; - let decimal_comma = parse_options.decimal_comma; let bytes_total = reader_bytes.len(); let (inferred_schema, rows_read, bytes_read) = infer_file_schema( reader_bytes, - separator, + &parse_options, infer_schema_length, has_header, schema_overwrite, skip_rows, skip_lines, skip_rows_after_header, - comment_prefix, - quote_char, - eol_char, - null_values.as_ref(), - try_parse_dates, raise_if_empty, &mut n_threads, - decimal_comma, )?; let this = Self { @@ -198,7 +184,7 @@ fn parse_bytes_with_encoding(bytes: &[u8], encoding: CsvEncoding) -> PolarsResul #[allow(clippy::too_many_arguments)] fn infer_file_schema_inner( reader_bytes: &ReaderBytes, - separator: u8, + parse_options: &CsvParseOptions, max_read_rows: Option, has_header: bool, schema_overwrite: Option<&Schema>, @@ -206,15 +192,9 @@ fn infer_file_schema_inner( // on the schema inference mut skip_rows: usize, skip_rows_after_header: usize, - comment_prefix: Option<&CommentPrefix>, - quote_char: Option, - eol_char: u8, - null_values: Option<&NullValues>, - try_parse_dates: bool, recursion_count: u8, raise_if_empty: bool, n_threads: &mut Option, - decimal_comma: bool, ) -> PolarsResult<(Schema, usize, usize)> { // keep track so that we can determine the amount of bytes read let start_ptr = reader_bytes.as_ptr() as usize; @@ -223,11 +203,12 @@ fn infer_file_schema_inner( // It may later. let encoding = CsvEncoding::LossyUtf8; - let bytes = skip_line_ending(skip_bom(reader_bytes), eol_char); + let bytes = skip_line_ending(skip_bom(reader_bytes), parse_options.eol_char); if raise_if_empty { polars_ensure!(!bytes.is_empty(), NoData: "empty CSV"); }; - let mut lines = SplitLines::new(bytes, quote_char, eol_char).skip(skip_rows); + let mut lines = + SplitLines::new(bytes, parse_options.quote_char, parse_options.eol_char).skip(skip_rows); // get or create header names // when has_header is false, creates default column names with column_ prefix @@ -236,7 +217,7 @@ fn infer_file_schema_inner( let mut first_line = None; for (i, line) in (&mut lines).enumerate() { - if !is_comment_line(line, comment_prefix) { + if !is_comment_line(line, parse_options.comment_prefix.as_ref()) { first_line = Some(line); skip_rows += i; break; @@ -258,7 +239,12 @@ fn infer_file_schema_inner( } } - let byterecord = SplitFields::new(header_line, separator, quote_char, eol_char); + let byterecord = SplitFields::new( + header_line, + parse_options.separator, + parse_options.quote_char, + parse_options.eol_char, + ); if has_header { let headers = byterecord .map(|(slice, needs_escaping)| { @@ -297,25 +283,19 @@ fn infer_file_schema_inner( // this is likely to be cheap as there are no rows. let mut buf = Vec::with_capacity(bytes.len() + 2); buf.extend_from_slice(bytes); - buf.push(eol_char); + buf.push(parse_options.eol_char); return infer_file_schema_inner( &ReaderBytes::Owned(buf.into()), - separator, + parse_options, max_read_rows, has_header, schema_overwrite, skip_rows, skip_rows_after_header, - comment_prefix, - quote_char, - eol_char, - null_values, - try_parse_dates, recursion_count + 1, raise_if_empty, n_threads, - decimal_comma, ); } else if !raise_if_empty { return Ok((Schema::default(), 0, 0)); @@ -324,7 +304,8 @@ fn infer_file_schema_inner( }; if !has_header { // re-init lines so that the header is included in type inference. - lines = SplitLines::new(bytes, quote_char, eol_char).skip(skip_rows); + lines = SplitLines::new(bytes, parse_options.quote_char, parse_options.eol_char) + .skip(skip_rows); } let header_length = headers.len(); @@ -366,7 +347,7 @@ fn infer_file_schema_inner( } // line is a comment -> skip - if is_comment_line(line, comment_prefix) { + if is_comment_line(line, parse_options.comment_prefix.as_ref()) { continue; } @@ -379,7 +360,12 @@ fn infer_file_schema_inner( } } - let mut record = SplitFields::new(line, separator, quote_char, eol_char); + let mut record = SplitFields::new( + line, + parse_options.separator, + parse_options.quote_char, + parse_options.eol_char, + ); for i in 0..header_length { if let Some((slice, needs_escaping)) = record.next() { @@ -392,18 +378,30 @@ fn infer_file_schema_inner( slice }; let s = parse_bytes_with_encoding(slice_escaped, encoding)?; - let dtype = match &null_values { - None => Some(infer_field_schema(&s, try_parse_dates, decimal_comma)), + let dtype = match &parse_options.null_values { + None => Some(infer_field_schema( + &s, + parse_options.try_parse_dates, + parse_options.decimal_comma, + )), Some(NullValues::AllColumns(names)) => { if !names.iter().any(|nv| nv == s.as_ref()) { - Some(infer_field_schema(&s, try_parse_dates, decimal_comma)) + Some(infer_field_schema( + &s, + parse_options.try_parse_dates, + parse_options.decimal_comma, + )) } else { None } }, Some(NullValues::AllColumnsSingle(name)) => { if s.as_ref() != name.as_str() { - Some(infer_field_schema(&s, try_parse_dates, decimal_comma)) + Some(infer_field_schema( + &s, + parse_options.try_parse_dates, + parse_options.decimal_comma, + )) } else { None } @@ -416,12 +414,20 @@ fn infer_file_schema_inner( if let Some(null_name) = null_name { if null_name.1.as_str() != s.as_ref() { - Some(infer_field_schema(&s, try_parse_dates, decimal_comma)) + Some(infer_field_schema( + &s, + parse_options.try_parse_dates, + parse_options.decimal_comma, + )) } else { None } } else { - Some(infer_field_schema(&s, try_parse_dates, decimal_comma)) + Some(infer_field_schema( + &s, + parse_options.try_parse_dates, + parse_options.decimal_comma, + )) } }, }; @@ -435,7 +441,12 @@ fn infer_file_schema_inner( // new line characters in an escaped field. So we set a (somewhat arbitrary) // upper bound to the number of escaped lines we accept. // On the chunking side we also have logic to make this more robust. - if slice.iter().filter(|b| **b == eol_char).count() > 8 { + if slice + .iter() + .filter(|b| **b == parse_options.eol_char) + .count() + > 8 + { if verbose() { eprintln!("falling back to single core reading because of many escaped new line chars.") } @@ -478,29 +489,23 @@ fn infer_file_schema_inner( // so that the inference is consistent with and without eol char if rows_count == 0 && !reader_bytes.is_empty() - && reader_bytes[reader_bytes.len() - 1] != eol_char + && reader_bytes[reader_bytes.len() - 1] != parse_options.eol_char && recursion_count == 0 { let mut rb = Vec::with_capacity(reader_bytes.len() + 1); rb.extend_from_slice(reader_bytes); - rb.push(eol_char); + rb.push(parse_options.eol_char); return infer_file_schema_inner( &ReaderBytes::Owned(rb.into()), - separator, + parse_options, max_read_rows, has_header, schema_overwrite, skip_rows, skip_rows_after_header, - comment_prefix, - quote_char, - eol_char, - null_values, - try_parse_dates, recursion_count + 1, raise_if_empty, n_threads, - decimal_comma, ); } @@ -526,64 +531,46 @@ pub(super) fn check_decimal_comma(decimal_comma: bool, separator: u8) -> PolarsR #[allow(clippy::too_many_arguments)] pub fn infer_file_schema( reader_bytes: &ReaderBytes, - separator: u8, + parse_options: &CsvParseOptions, max_read_rows: Option, has_header: bool, schema_overwrite: Option<&Schema>, skip_rows: usize, skip_lines: usize, skip_rows_after_header: usize, - comment_prefix: Option<&CommentPrefix>, - quote_char: Option, - eol_char: u8, - null_values: Option<&NullValues>, - try_parse_dates: bool, raise_if_empty: bool, n_threads: &mut Option, - decimal_comma: bool, ) -> PolarsResult<(Schema, usize, usize)> { - check_decimal_comma(decimal_comma, separator)?; + check_decimal_comma(parse_options.decimal_comma, parse_options.separator)?; if skip_lines > 0 { polars_ensure!(skip_rows == 0, InvalidOperation: "only one of 'skip_rows'/'skip_lines' may be set"); - let bytes = skip_lines_naive(reader_bytes, eol_char, skip_lines); + let bytes = skip_lines_naive(reader_bytes, parse_options.eol_char, skip_lines); let reader_bytes = ReaderBytes::Borrowed(bytes); infer_file_schema_inner( &reader_bytes, - separator, + parse_options, max_read_rows, has_header, schema_overwrite, skip_rows, skip_rows_after_header, - comment_prefix, - quote_char, - eol_char, - null_values, - try_parse_dates, 0, raise_if_empty, n_threads, - decimal_comma, ) } else { infer_file_schema_inner( reader_bytes, - separator, + parse_options, max_read_rows, has_header, schema_overwrite, skip_rows, skip_rows_after_header, - comment_prefix, - quote_char, - eol_char, - null_values, - try_parse_dates, 0, raise_if_empty, n_threads, - decimal_comma, ) } } diff --git a/crates/polars-lazy/src/scan/csv.rs b/crates/polars-lazy/src/scan/csv.rs index 039408a349aa..c284232f93ef 100644 --- a/crates/polars-lazy/src/scan/csv.rs +++ b/crates/polars-lazy/src/scan/csv.rs @@ -250,7 +250,7 @@ impl LazyCsvReader { PolarsResult::Ok( infer_file_schema( &reader_bytes, - parse_options.separator, + &parse_options, self.read_options.infer_schema_length, self.read_options.has_header, // we set it to None and modify them after the schema is updated @@ -258,14 +258,8 @@ impl LazyCsvReader { skip_rows, skip_lines, self.read_options.skip_rows_after_header, - parse_options.comment_prefix.as_ref(), - parse_options.quote_char, - parse_options.eol_char, - None, - parse_options.try_parse_dates, self.read_options.raise_if_empty, &mut n_threads, - parse_options.decimal_comma, )? .0, ) diff --git a/crates/polars-stream/src/nodes/io_sources/csv.rs b/crates/polars-stream/src/nodes/io_sources/csv.rs index e2574d1d5830..ef99443b2a58 100644 --- a/crates/polars-stream/src/nodes/io_sources/csv.rs +++ b/crates/polars-stream/src/nodes/io_sources/csv.rs @@ -16,7 +16,7 @@ use polars_io::prelude::_csv_read_internal::{ NullValuesCompiled, }; use polars_io::prelude::buffer::validate_utf8; -use polars_io::prelude::{CommentPrefix, CsvEncoding, CsvReadOptions}; +use polars_io::prelude::{CsvEncoding, CsvParseOptions, CsvReadOptions}; use polars_io::utils::compression::maybe_decompress_bytes; use polars_io::utils::slice::SplitSlicePosition; use polars_io::RowIndex; @@ -532,20 +532,13 @@ impl CsvSourceNode { #[derive(Default)] struct ChunkReader { reader_schema: SchemaRef, + parse_options: Arc, fields_to_cast: Vec, #[cfg(feature = "dtype-categorical")] _cat_lock: Option, - separator: u8, ignore_errors: bool, projection: Vec, - quote_char: Option, - eol_char: u8, - comment_prefix: Option, - encoding: CsvEncoding, null_values: Option, - missing_is_null: bool, - truncate_ragged_lines: bool, - decimal_comma: bool, validate_utf8: bool, row_index: Option, include_file_paths: Option, @@ -576,10 +569,9 @@ impl ChunkReader { #[cfg(feature = "dtype-categorical")] let _cat_lock = has_categorical.then(polars_core::StringCacheHolder::hold); - let parse_options = &*options.parse_options; + let parse_options = options.parse_options.clone(); // Logic from `CoreReader::new()` - let separator = parse_options.separator; let null_values = parse_options .null_values @@ -607,20 +599,13 @@ impl ChunkReader { Ok(Self { reader_schema, + parse_options, fields_to_cast, #[cfg(feature = "dtype-categorical")] _cat_lock, - separator, ignore_errors: options.ignore_errors, projection, - quote_char: parse_options.quote_char, - eol_char: parse_options.eol_char, - comment_prefix: parse_options.comment_prefix.clone(), - encoding: parse_options.encoding, null_values, - missing_is_null: parse_options.missing_is_null, - truncate_ragged_lines: parse_options.truncate_ragged_lines, - decimal_comma: parse_options.decimal_comma, validate_utf8, row_index, include_file_paths, @@ -640,23 +625,16 @@ impl ChunkReader { read_chunk( chunk, - self.separator, + &self.parse_options, &self.reader_schema, self.ignore_errors, &self.projection, - 0, // bytes_offset_thread - self.quote_char, - self.eol_char, - self.comment_prefix.as_ref(), + 0, // bytes_offset_thread n_lines, // capacity - self.encoding, self.null_values.as_ref(), - self.missing_is_null, - self.truncate_ragged_lines, usize::MAX, // chunk_size chunk.len(), // stop_at_nbytes Some(0), // starting_point_offset - self.decimal_comma, ) .and_then(|mut df| { let n_lines_is_correct = df.height() == n_lines;