From 2d65cc987ce8aee8680fb4b81162eb8d4875ffa9 Mon Sep 17 00:00:00 2001
From: Ritchie Vink
Date: Sun, 15 Dec 2024 11:22:00 +0100
Subject: [PATCH] feat: Add 'skip_lines' for CSV (#20301)

---
 crates/polars-io/src/csv/read/options.rs      | 16 ++++-
 crates/polars-io/src/csv/read/parser.rs       | 11 +++
 crates/polars-io/src/csv/read/read_impl.rs    | 30 ++++++---
 crates/polars-io/src/csv/read/reader.rs       |  1 +
 .../src/csv/read/schema_inference.rs          | 67 +++++++++++++------
 crates/polars-lazy/src/scan/csv.rs            | 11 +++
 crates/polars-python/src/batched_csv.rs       |  4 +-
 crates/polars-python/src/dataframe/io.rs      |  4 +-
 crates/polars-python/src/lazyframe/general.rs |  4 +-
 .../polars-stream/src/nodes/io_sources/csv.rs |  2 +
 py-polars/polars/io/csv/batched_reader.py     |  2 +
 py-polars/polars/io/csv/functions.py          | 27 +++++++-
 py-polars/tests/unit/io/test_csv.py           | 21 ++++++
 13 files changed, 164 insertions(+), 36 deletions(-)

diff --git a/crates/polars-io/src/csv/read/options.rs b/crates/polars-io/src/csv/read/options.rs
index cbbf796d45d7..c6d009b9fc97 100644
--- a/crates/polars-io/src/csv/read/options.rs
+++ b/crates/polars-io/src/csv/read/options.rs
@@ -31,7 +31,10 @@ pub struct CsvReadOptions {
     pub parse_options: Arc<CsvParseOptions>,
     pub has_header: bool,
     pub chunk_size: usize,
+    /// Skip rows according to the CSV spec (escaping and comments are respected).
     pub skip_rows: usize,
+    /// Skip lines according to the newline char (i.e. escaping will be ignored).
+    pub skip_lines: usize,
     pub skip_rows_after_header: usize,
     pub infer_schema_length: Option<usize>,
     pub raise_if_empty: bool,
@@ -76,6 +79,7 @@ impl Default for CsvReadOptions {
             has_header: true,
             chunk_size: 1 << 18,
             skip_rows: 0,
+            skip_lines: 0,
             skip_rows_after_header: 0,
             infer_schema_length: Some(100),
             raise_if_empty: true,
@@ -197,12 +201,22 @@ impl CsvReadOptions {
         self
     }
 
-    /// Number of rows to skip before the header row.
+    /// Start reading after `skip_rows` rows. The header will be parsed at this
+    /// offset. Note that we respect CSV escaping/comments when skipping rows.
+    /// If you want to skip by newline char only, use `skip_lines`.
    pub fn with_skip_rows(mut self, skip_rows: usize) -> Self {
         self.skip_rows = skip_rows;
         self
     }
 
+    /// Start reading after `skip_lines` lines. The header will be parsed at this
+    /// offset. Note that CSV escaping will not be respected when skipping lines.
+    /// If you want to skip valid CSV rows, use `skip_rows`.
+    pub fn with_skip_lines(mut self, skip_lines: usize) -> Self {
+        self.skip_lines = skip_lines;
+        self
+    }
+
     /// Number of rows to skip after the header row.
     pub fn with_skip_rows_after_header(mut self, skip_rows_after_header: usize) -> Self {
         self.skip_rows_after_header = skip_rows_after_header;
diff --git a/crates/polars-io/src/csv/read/parser.rs b/crates/polars-io/src/csv/read/parser.rs
index decec9c2d40f..61cc1c0c1b00 100644
--- a/crates/polars-io/src/csv/read/parser.rs
+++ b/crates/polars-io/src/csv/read/parser.rs
@@ -141,6 +141,17 @@ pub(super) fn next_line_position_naive(input: &[u8], eol_char: u8) -> Option<usize>
+pub(super) fn skip_lines_naive(mut input: &[u8], eol_char: u8, skip: usize) -> &[u8] {
+    for _ in 0..skip {
+        if let Some(pos) = next_line_position_naive(input, eol_char) {
+            input = &input[pos..];
+        } else {
+            return input;
+        }
+    }
+    input
+}
+
 /// Find the nearest next line position that is not embedded in a String field.
pub(super) fn next_line_position( mut input: &[u8], diff --git a/crates/polars-io/src/csv/read/read_impl.rs b/crates/polars-io/src/csv/read/read_impl.rs index 808f57c0f5cf..4b73063fbb5f 100644 --- a/crates/polars-io/src/csv/read/read_impl.rs +++ b/crates/polars-io/src/csv/read/read_impl.rs @@ -14,8 +14,8 @@ use rayon::prelude::*; use super::buffer::init_buffers; use super::options::{CommentPrefix, CsvEncoding, NullValues, NullValuesCompiled}; use super::parser::{ - is_comment_line, parse_lines, skip_bom, skip_line_ending, skip_this_line, CountLines, - SplitLines, + is_comment_line, parse_lines, skip_bom, skip_line_ending, skip_lines_naive, skip_this_line, + CountLines, SplitLines, }; use super::reader::prepare_csv_schema; use super::schema_inference::{check_decimal_comma, infer_file_schema}; @@ -105,6 +105,7 @@ pub(crate) struct CoreReader<'a> { /// Current line number, used in error reporting current_line: usize, ignore_errors: bool, + skip_lines: usize, skip_rows_before_header: usize, // after the header, we need to take embedded lines into account skip_rows_after_header: usize, @@ -144,6 +145,7 @@ impl<'a> CoreReader<'a> { reader_bytes: ReaderBytes<'a>, n_rows: Option, skip_rows: usize, + skip_lines: usize, mut projection: Option>, max_records: Option, separator: Option, @@ -207,6 +209,7 @@ impl<'a> CoreReader<'a> { has_header, schema_overwrite.as_deref(), skip_rows, + skip_lines, skip_rows_after_header, comment_prefix.as_ref(), quote_char, @@ -247,6 +250,7 @@ impl<'a> CoreReader<'a> { projection, current_line: usize::from(has_header), ignore_errors, + skip_lines, skip_rows_before_header: skip_rows, skip_rows_after_header, n_rows, @@ -280,6 +284,7 @@ impl<'a> CoreReader<'a> { quote_char, eol_char, self.schema.len(), + self.skip_lines, self.skip_rows_before_header, self.skip_rows_after_header, self.comment_prefix.as_ref(), @@ -608,6 +613,7 @@ pub fn find_starting_point( quote_char: Option, eol_char: u8, schema_len: usize, + skip_lines: usize, skip_rows_before_header: usize, skip_rows_after_header: usize, comment_prefix: Option<&CommentPrefix>, @@ -616,14 +622,20 @@ pub fn find_starting_point( let full_len = bytes.len(); let starting_point_offset = bytes.as_ptr() as usize; - // Skip utf8 byte-order-mark (BOM) - bytes = skip_bom(bytes); + bytes = if skip_lines > 0 { + polars_ensure!(skip_rows_before_header == 0, InvalidOperation: "only one of 'skip_rows'/'skip_lines' may be set"); + skip_lines_naive(bytes, eol_char, skip_lines) + } else { + // Skip utf8 byte-order-mark (BOM) + bytes = skip_bom(bytes); - // \n\n can be a empty string row of a single column - // in other cases we skip it. - if schema_len > 1 { - bytes = skip_line_ending(bytes, eol_char) - } + // \n\n can be a empty string row of a single column + // in other cases we skip it. 
+ if schema_len > 1 { + bytes = skip_line_ending(bytes, eol_char) + } + bytes + }; // skip 'n' leading rows if skip_rows_before_header > 0 { diff --git a/crates/polars-io/src/csv/read/reader.rs b/crates/polars-io/src/csv/read/reader.rs index cb93fd117046..fee35e21c306 100644 --- a/crates/polars-io/src/csv/read/reader.rs +++ b/crates/polars-io/src/csv/read/reader.rs @@ -114,6 +114,7 @@ impl CsvReader { reader_bytes, self.options.n_rows, self.options.skip_rows, + self.options.skip_lines, self.options.projection.clone().map(|x| x.as_ref().clone()), self.options.infer_schema_length, Some(parse_options.separator), diff --git a/crates/polars-io/src/csv/read/schema_inference.rs b/crates/polars-io/src/csv/read/schema_inference.rs index 6b5f3c3e46aa..74d230d547ba 100644 --- a/crates/polars-io/src/csv/read/schema_inference.rs +++ b/crates/polars-io/src/csv/read/schema_inference.rs @@ -12,6 +12,7 @@ use super::options::{CommentPrefix, CsvEncoding, NullValues}; use super::parser::{is_comment_line, skip_bom, skip_line_ending, SplitLines}; use super::splitfields::SplitFields; use super::CsvReadOptions; +use crate::csv::read::parser::skip_lines_naive; use crate::mmap::ReaderBytes; use crate::utils::{BOOLEAN_RE, FLOAT_RE, FLOAT_RE_DECIMAL, INTEGER_RE}; @@ -37,6 +38,7 @@ impl SchemaInferenceResult { let schema_overwrite_arc = options.schema_overwrite.clone(); let schema_overwrite = schema_overwrite_arc.as_ref().map(|x| x.as_ref()); let skip_rows = options.skip_rows; + let skip_lines = options.skip_lines; let skip_rows_after_header = options.skip_rows_after_header; let comment_prefix = parse_options.comment_prefix.as_ref(); let quote_char = parse_options.quote_char; @@ -56,6 +58,7 @@ impl SchemaInferenceResult { has_header, schema_overwrite, skip_rows, + skip_lines, skip_rows_after_header, comment_prefix, quote_char, @@ -527,9 +530,8 @@ pub fn infer_file_schema( max_read_rows: Option, has_header: bool, schema_overwrite: Option<&Schema>, - // we take &mut because we maybe need to skip more rows dependent - // on the schema inference skip_rows: usize, + skip_lines: usize, skip_rows_after_header: usize, comment_prefix: Option<&CommentPrefix>, quote_char: Option, @@ -541,22 +543,47 @@ pub fn infer_file_schema( decimal_comma: bool, ) -> PolarsResult<(Schema, usize, usize)> { check_decimal_comma(decimal_comma, separator)?; - infer_file_schema_inner( - reader_bytes, - separator, - max_read_rows, - has_header, - schema_overwrite, - skip_rows, - skip_rows_after_header, - comment_prefix, - quote_char, - eol_char, - null_values, - try_parse_dates, - 0, - raise_if_empty, - n_threads, - decimal_comma, - ) + + if skip_lines > 0 { + polars_ensure!(skip_rows == 0, InvalidOperation: "only one of 'skip_rows'/'skip_lines' may be set"); + let bytes = skip_lines_naive(reader_bytes, eol_char, skip_lines); + let reader_bytes = ReaderBytes::Borrowed(bytes); + infer_file_schema_inner( + &reader_bytes, + separator, + max_read_rows, + has_header, + schema_overwrite, + skip_rows, + skip_rows_after_header, + comment_prefix, + quote_char, + eol_char, + null_values, + try_parse_dates, + 0, + raise_if_empty, + n_threads, + decimal_comma, + ) + } else { + infer_file_schema_inner( + reader_bytes, + separator, + max_read_rows, + has_header, + schema_overwrite, + skip_rows, + skip_rows_after_header, + comment_prefix, + quote_char, + eol_char, + null_values, + try_parse_dates, + 0, + raise_if_empty, + n_threads, + decimal_comma, + ) + } } diff --git a/crates/polars-lazy/src/scan/csv.rs b/crates/polars-lazy/src/scan/csv.rs index 
423a9c326406..039408a349aa 100644
--- a/crates/polars-lazy/src/scan/csv.rs
+++ b/crates/polars-lazy/src/scan/csv.rs
@@ -96,12 +96,21 @@ impl LazyCsvReader {
     }
 
     /// Skip the first `n` rows during parsing. The header will be parsed at row `n`.
+    /// Note that by row we mean a valid CSV row: escaping and comments are respected.
     #[must_use]
     pub fn with_skip_rows(mut self, skip_rows: usize) -> Self {
         self.read_options.skip_rows = skip_rows;
         self
     }
 
+    /// Skip the first `n` lines during parsing. The header will be parsed at line `n`.
+    /// We don't respect CSV escaping when skipping lines.
+    #[must_use]
+    pub fn with_skip_lines(mut self, skip_lines: usize) -> Self {
+        self.read_options.skip_lines = skip_lines;
+        self
+    }
+
     /// Overwrite the schema with the dtypes in this given Schema. The given schema may be a subset
     /// of the total schema.
     #[must_use]
@@ -235,6 +244,7 @@ impl LazyCsvReader {
 
         let mut infer_schema = |reader_bytes: ReaderBytes| {
             let skip_rows = self.read_options.skip_rows;
+            let skip_lines = self.read_options.skip_lines;
             let parse_options = self.read_options.get_parse_options();
 
             PolarsResult::Ok(
@@ -246,6 +256,7 @@ impl LazyCsvReader {
                     // we set it to None and modify them after the schema is updated
                     None,
                     skip_rows,
+                    skip_lines,
                     self.read_options.skip_rows_after_header,
                     parse_options.comment_prefix.as_ref(),
                     parse_options.quote_char,
diff --git a/crates/polars-python/src/batched_csv.rs b/crates/polars-python/src/batched_csv.rs
index a406d7b6ddf3..06bd35ccaa4d 100644
--- a/crates/polars-python/src/batched_csv.rs
+++ b/crates/polars-python/src/batched_csv.rs
@@ -22,7 +22,7 @@ pub struct PyBatchedCsv {
 impl PyBatchedCsv {
     #[staticmethod]
     #[pyo3(signature = (
-        infer_schema_length, chunk_size, has_header, ignore_errors, n_rows, skip_rows,
+        infer_schema_length, chunk_size, has_header, ignore_errors, n_rows, skip_rows, skip_lines,
        projection, separator, rechunk, columns, encoding, n_threads, path,
        schema_overrides, overwrite_dtype_slice, low_memory, comment_prefix, quote_char,
        null_values, missing_utf8_is_empty_string, try_parse_dates, skip_rows_after_header, row_index,
@@ -35,6 +35,7 @@ impl PyBatchedCsv {
         ignore_errors: bool,
         n_rows: Option<usize>,
         skip_rows: usize,
+        skip_lines: usize,
         projection: Option<Vec<usize>>,
         separator: &str,
         rechunk: bool,
@@ -97,6 +98,7 @@ impl PyBatchedCsv {
             .with_has_header(has_header)
             .with_n_rows(n_rows)
             .with_skip_rows(skip_rows)
+            .with_skip_lines(skip_lines)
             .with_ignore_errors(ignore_errors)
             .with_projection(projection.map(Arc::new))
             .with_rechunk(rechunk)
diff --git a/crates/polars-python/src/dataframe/io.rs b/crates/polars-python/src/dataframe/io.rs
index 960816231cd3..a4fbd3d7a340 100644
--- a/crates/polars-python/src/dataframe/io.rs
+++ b/crates/polars-python/src/dataframe/io.rs
@@ -30,7 +30,7 @@ impl PyDataFrame {
     #[cfg(feature = "csv")]
     #[pyo3(signature = (
         py_f, infer_schema_length, chunk_size, has_header, ignore_errors, n_rows,
-        skip_rows, projection, separator, rechunk, columns, encoding, n_threads, path,
+        skip_rows, skip_lines, projection, separator, rechunk, columns, encoding, n_threads, path,
        overwrite_dtype, overwrite_dtype_slice, low_memory, comment_prefix, quote_char,
        null_values, missing_utf8_is_empty_string, try_parse_dates, skip_rows_after_header, row_index,
        eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, schema)
@@ -44,6 +44,7 @@
         ignore_errors: bool,
         n_rows: Option<usize>,
         skip_rows: usize,
+        skip_lines: usize,
         projection: Option<Vec<usize>>,
         separator: &str,
         rechunk: bool,
@@ -100,6 +101,7 @@ impl PyDataFrame {
.with_has_header(has_header) .with_n_rows(n_rows) .with_skip_rows(skip_rows) + .with_skip_lines(skip_lines) .with_ignore_errors(ignore_errors) .with_projection(projection.map(Arc::new)) .with_rechunk(rechunk) diff --git a/crates/polars-python/src/lazyframe/general.rs b/crates/polars-python/src/lazyframe/general.rs index 9b1ab31c74a1..6081a18f4a68 100644 --- a/crates/polars-python/src/lazyframe/general.rs +++ b/crates/polars-python/src/lazyframe/general.rs @@ -113,7 +113,7 @@ impl PyLazyFrame { #[staticmethod] #[cfg(feature = "csv")] - #[pyo3(signature = (source, sources, separator, has_header, ignore_errors, skip_rows, n_rows, cache, overwrite_dtype, + #[pyo3(signature = (source, sources, separator, has_header, ignore_errors, skip_rows, skip_lines, n_rows, cache, overwrite_dtype, low_memory, comment_prefix, quote_char, null_values, missing_utf8_is_empty_string, infer_schema_length, with_schema_modify, rechunk, skip_rows_after_header, encoding, row_index, try_parse_dates, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, glob, schema, @@ -127,6 +127,7 @@ impl PyLazyFrame { has_header: bool, ignore_errors: bool, skip_rows: usize, + skip_lines: usize, n_rows: Option, cache: bool, overwrite_dtype: Option)>>, @@ -213,6 +214,7 @@ impl PyLazyFrame { .with_has_header(has_header) .with_ignore_errors(ignore_errors) .with_skip_rows(skip_rows) + .with_skip_lines(skip_lines) .with_n_rows(n_rows) .with_cache(cache) .with_dtype_overwrite(overwrite_dtype.map(Arc::new)) diff --git a/crates/polars-stream/src/nodes/io_sources/csv.rs b/crates/polars-stream/src/nodes/io_sources/csv.rs index 1c8b66ab2353..e2574d1d5830 100644 --- a/crates/polars-stream/src/nodes/io_sources/csv.rs +++ b/crates/polars-stream/src/nodes/io_sources/csv.rs @@ -259,6 +259,7 @@ impl CsvSourceNode { let quote_char = parse_options.quote_char; let eol_char = parse_options.eol_char; + let skip_lines = options.skip_lines; let skip_rows_before_header = options.skip_rows; let skip_rows_after_header = options.skip_rows_after_header; let comment_prefix = parse_options.comment_prefix.clone(); @@ -353,6 +354,7 @@ impl CsvSourceNode { quote_char, eol_char, schema_len, + skip_lines, skip_rows_before_header, skip_rows_after_header, comment_prefix, diff --git a/py-polars/polars/io/csv/batched_reader.py b/py-polars/polars/io/csv/batched_reader.py index 4207f476ee5c..cc82e59fe0e7 100644 --- a/py-polars/polars/io/csv/batched_reader.py +++ b/py-polars/polars/io/csv/batched_reader.py @@ -36,6 +36,7 @@ def __init__( comment_prefix: str | None = None, quote_char: str | None = '"', skip_rows: int = 0, + skip_lines: int = 0, schema_overrides: SchemaDict | Sequence[PolarsDataType] | None = None, null_values: str | Sequence[str] | dict[str, str] | None = None, missing_utf8_is_empty_string: bool = False, @@ -82,6 +83,7 @@ def __init__( ignore_errors=ignore_errors, n_rows=n_rows, skip_rows=skip_rows, + skip_lines=skip_lines, projection=projection, separator=separator, rechunk=rechunk, diff --git a/py-polars/polars/io/csv/functions.py b/py-polars/polars/io/csv/functions.py index 0896b103d996..cd543fc659cb 100644 --- a/py-polars/polars/io/csv/functions.py +++ b/py-polars/polars/io/csv/functions.py @@ -52,6 +52,7 @@ def read_csv( comment_prefix: str | None = None, quote_char: str | None = '"', skip_rows: int = 0, + skip_lines: int = 0, schema: SchemaDict | None = None, schema_overrides: ( Mapping[str, PolarsDataType] | Sequence[PolarsDataType] | None @@ -112,7 +113,13 @@ def read_csv( Single byte character used for csv quoting, default = `"`. 
Set to None to turn off special handling and escaping of quotes. skip_rows - Start reading after `skip_rows` lines. + Start reading after ``skip_rows`` rows. The header will be parsed at this + offset. Note that we respect CSV escaping/comments when skipping rows. + If you want to skip by newline char only, use `skip_lines`. + skip_lines + Start reading after `skip_lines` lines. The header will be parsed at this + offset. Note that CSV escaping will not be respected when skipping lines. + If you want to skip valid CSV rows, use ``skip_rows``. schema Provide the schema. This means that polars doesn't do schema inference. This argument expects the complete schema, whereas `schema_overrides` can be @@ -488,6 +495,7 @@ def read_csv( comment_prefix=comment_prefix, quote_char=quote_char, skip_rows=skip_rows, + skip_lines=skip_lines, schema_overrides=schema_overrides, # type: ignore[arg-type] schema=schema, null_values=null_values, @@ -532,6 +540,7 @@ def read_csv( comment_prefix=comment_prefix, quote_char=quote_char, skip_rows=skip_rows, + skip_lines=skip_lines, schema_overrides=schema_overrides, schema=schema, null_values=null_values, @@ -569,6 +578,7 @@ def _read_csv_impl( comment_prefix: str | None = None, quote_char: str | None = '"', skip_rows: int = 0, + skip_lines: int = 0, schema: None | SchemaDict = None, schema_overrides: None | (SchemaDict | Sequence[PolarsDataType]) = None, null_values: str | Sequence[str] | dict[str, str] | None = None, @@ -638,6 +648,7 @@ def _read_csv_impl( comment_prefix=comment_prefix, quote_char=quote_char, skip_rows=skip_rows, + skip_lines=skip_lines, schema=schema, schema_overrides=dtypes_dict, null_values=null_values, @@ -677,6 +688,7 @@ def _read_csv_impl( ignore_errors, n_rows, skip_rows, + skip_lines, projection, separator, rechunk, @@ -1031,6 +1043,7 @@ def scan_csv( comment_prefix: str | None = None, quote_char: str | None = '"', skip_rows: int = 0, + skip_lines: int = 0, schema: SchemaDict | None = None, schema_overrides: SchemaDict | Sequence[PolarsDataType] | None = None, null_values: str | Sequence[str] | dict[str, str] | None = None, @@ -1086,8 +1099,13 @@ def scan_csv( Single byte character used for csv quoting, default = `"`. Set to None to turn off special handling and escaping of quotes. skip_rows - Start reading after `skip_rows` lines. The header will be parsed at this - offset. + Start reading after ``skip_rows`` rows. The header will be parsed at this + offset. Note that we respect CSV escaping/comments when skipping rows. + If you want to skip by newline char only, use `skip_lines`. + skip_lines + Start reading after `skip_lines` lines. The header will be parsed at this + offset. Note that CSV escaping will not be respected when skipping lines. + If you want to skip valid CSV rows, use ``skip_rows``. schema Provide the schema. This means that polars doesn't do schema inference. 
This argument expects the complete schema, whereas `schema_overrides` can be @@ -1300,6 +1318,7 @@ def with_column_names(cols: list[str]) -> list[str]: comment_prefix=comment_prefix, quote_char=quote_char, skip_rows=skip_rows, + skip_lines=skip_lines, schema_overrides=schema_overrides, # type: ignore[arg-type] schema=schema, null_values=null_values, @@ -1345,6 +1364,7 @@ def _scan_csv_impl( comment_prefix: str | None = None, quote_char: str | None = '"', skip_rows: int = 0, + skip_lines: int = 0, schema: SchemaDict | None = None, schema_overrides: SchemaDict | None = None, null_values: str | Sequence[str] | dict[str, str] | None = None, @@ -1398,6 +1418,7 @@ def _scan_csv_impl( has_header=has_header, ignore_errors=ignore_errors, skip_rows=skip_rows, + skip_lines=skip_lines, n_rows=n_rows, cache=cache, overwrite_dtype=dtype_list, diff --git a/py-polars/tests/unit/io/test_csv.py b/py-polars/tests/unit/io/test_csv.py index 73387f27eb10..772ecaa42a03 100644 --- a/py-polars/tests/unit/io/test_csv.py +++ b/py-polars/tests/unit/io/test_csv.py @@ -2400,3 +2400,24 @@ def test_csv_ragged_lines_20062() -> None: "U": [None, 1.0], "V": [None, 2.0], } + + +def test_csv_skip_lines() -> None: + fh = io.BytesIO() + fh.write(b'Header line "1" -> quote count 2\n') + fh.write(b'Header line "2"" -> quote count 3\n') + fh.write(b'Header line "3" -> quote count 2 => Total 7 quotes ERROR\n') + fh.write(b"column_01, column_02, column_03\n") + fh.write(b"123.12, 21, 99.9\n") + fh.write(b"65.84, 75, 64.7\n") + fh.seek(0) + + df = pl.read_csv(fh, has_header=True, skip_lines=3) + assert df.to_dict(as_series=False) == { + "column_01": [123.12, 65.84], + " column_02": [" 21", " 75"], + " column_03": [" 99.9", " 64.7"], + } + + fh.seek(0) + assert_frame_equal(pl.scan_csv(fh, has_header=True, skip_lines=3).collect(), df)
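The difference between the two options, concretely: skip_rows counts valid CSV rows (a newline inside a quoted field does not end a row), while skip_lines only counts newline characters. The following usage sketch is illustrative and not part of the patch; the sample data and column names are invented, and the final equality is what the docstrings above imply rather than something taken from the test suite.

import io

import polars as pl
from polars.testing import assert_frame_equal

# Preamble is 2 CSV rows but 3 physical lines, because the first row
# contains a quoted field with an embedded newline.
data = (
    b'source,"spans\ntwo lines"\n'
    b"generated,preamble\n"
    b"a,b\n"
    b"1,2\n"
    b"3,4\n"
)

# skip_lines counts raw newlines and ignores quoting: skip 3 lines.
df_lines = pl.read_csv(io.BytesIO(data), skip_lines=3)

# skip_rows counts valid CSV rows, so the quoted newline does not
# terminate the first preamble row: skip 2 rows.
df_rows = pl.read_csv(io.BytesIO(data), skip_rows=2)

# Both reads should start at the header line "a,b".
assert_frame_equal(df_lines, df_rows)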