Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for source byte-range tracking for ByteRecord #286

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions src/byte_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ impl fmt::Debug for ByteRecord {
struct ByteRecordInner {
/// The position of this byte record.
pos: Option<Position>,
/// The source span represented by this byte record.
span: Option<Span>,
/// All fields in this record, stored contiguously.
fields: Vec<u8>,
/// The number of and location of each field in this record.
Expand Down Expand Up @@ -136,6 +138,7 @@ impl ByteRecord {
pub fn with_capacity(buffer: usize, fields: usize) -> ByteRecord {
ByteRecord(Box::new(ByteRecordInner {
pos: None,
span: None,
fields: vec![0; buffer],
bounds: Bounds::with_capacity(fields),
}))
Expand Down Expand Up @@ -370,6 +373,7 @@ impl ByteRecord {
let mut trimmed =
ByteRecord::with_capacity(self.as_slice().len(), self.len());
trimmed.set_position(self.position().cloned());
trimmed.set_span(self.span().cloned());
for field in &*self {
trimmed.push_field(field.trim());
}
Expand Down Expand Up @@ -460,6 +464,18 @@ impl ByteRecord {
self.0.pos = pos;
}

/// Return the source span of this record, if available.
#[inline]
pub fn span(&self) -> Option<&Span> {
self.0.span.as_ref()
}

/// Sets the source span of this record.
#[inline]
pub fn set_span(&mut self, span: Option<Span>) {
self.0.span = span;
}

/// Return the start and end position of a field in this record.
///
/// If no such field exists at the given index, then return `None`.
Expand Down Expand Up @@ -639,6 +655,40 @@ impl Position {
}
}

/// A span in a CSV source bytes
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Span {
start: u64,
end: u64,
}

impl Span {
/// Create a new span
#[inline]
pub fn new(start: u64, end: u64) -> Self {
assert!(end >= start);
Span { start, end }
}

/// Retrieve the start of this span
#[inline]
pub fn start(&self) -> u64 {
self.start
}

/// Retrieve the end of this span
#[inline]
pub fn end(&self) -> u64 {
self.end
}

/// Retrieve the length of this span
#[inline]
pub fn len(&self) -> usize {
(self.end - self.start) as usize
}
}

/// The bounds of fields in a single record.
#[derive(Clone, Debug, Eq, PartialEq)]
struct Bounds {
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ use std::result;

use serde::{Deserialize, Deserializer};

pub use crate::byte_record::{ByteRecord, ByteRecordIter, Position};
pub use crate::byte_record::{ByteRecord, ByteRecordIter, Position, Span};
pub use crate::deserializer::{DeserializeError, DeserializeErrorKind};
pub use crate::error::{
Error, ErrorKind, FromUtf8Error, IntoInnerError, Result, Utf8Error,
Expand Down
3 changes: 2 additions & 1 deletion src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::result;
use csv_core::{Reader as CoreReader, ReaderBuilder as CoreReaderBuilder};
use serde::de::DeserializeOwned;

use crate::byte_record::{ByteRecord, Position};
use crate::byte_record::{ByteRecord, Position, Span};
use crate::error::{Error, ErrorKind, Result, Utf8Error};
use crate::string_record::StringRecord;
use crate::{Terminator, Trim};
Expand Down Expand Up @@ -1667,6 +1667,7 @@ impl<R: io::Read> Reader<R> {
}
Record => {
record.set_len(endlen);
record.set_span(Some(Span::new(record.position().unwrap().byte(), self.state.cur_pos.byte())));
self.state.add_record(record)?;
return Ok(true);
}
Expand Down
10 changes: 9 additions & 1 deletion src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -954,7 +954,10 @@ impl<W: io::Write> Writer<W> {
// The maximum number of bytes for the terminator.
+ 2;
if self.buf.writable().len() < upper_bound {
return self.write_record(record);
// Flush before writing a record, so we only flush on whole records
self.flush_buf()?;
// Fail if we cannot free enough buffer space
assert!(self.buf.writable().len() >= upper_bound, "Not enough buffer space");
}
let mut first = true;
for field in record.iter() {
Expand Down Expand Up @@ -1082,6 +1085,11 @@ impl<W: io::Write> Writer<W> {
}
}

/// Return an immutable reference to the underlying writer.
pub fn inner(&self) -> &W {
self.wtr.as_ref().expect("Called inner() with a None self.wtr")
}

/// Write a CSV delimiter.
fn write_delimiter(&mut self) -> Result<()> {
loop {
Expand Down