Skip to content

Commit

Permalink
refactor common record construction
Browse files Browse the repository at this point in the history
  • Loading branch information
jeff-k committed Dec 25, 2024
1 parent 254bfbf commit 7e8071e
Showing 1 changed file with 102 additions and 47 deletions.
149 changes: 102 additions & 47 deletions src/fastq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,42 @@ use std::task::Poll;
pub use crate::error::ParseError;
pub use crate::record::{Phred, Record};

/// Validates the four raw lines of one FASTQ record and wraps them in a [`Record`].
///
/// `lines` holds, in order: the header line, the sequence line, the separator
/// line and the quality line. The slices are stored in the returned record
/// as-is (the header keeps its leading `@`); nothing is copied.
///
/// # Errors
///
/// Returns an [`io::Error`] of kind [`io::ErrorKind::InvalidData`] when
/// - the header line does not start with `@` (this includes an empty header),
/// - the separator line does not start with `+` (this includes an empty separator),
/// - the sequence and quality lines have different lengths.
fn build_record<'a, S: TryFrom<&'a [u8]>>(
    lines: &[&'a [u8]; 4],
) -> Result<Record<'a, S>, io::Error> {
    let [header, seq, separator, quality] = *lines;

    // `first()` instead of `[0]`: an empty line is reported as invalid data
    // rather than panicking on an out-of-bounds index.
    if header.first() != Some(&b'@') {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "Invalid FASTQ header",
        ));
    }

    // Plain inequality on the first byte. Writing this as `!byte == b'+'`
    // would apply bitwise NOT to the byte before comparing, which lets almost
    // every malformed separator through.
    if separator.first() != Some(&b'+') {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "Invalid FASTQ separator",
        ));
    }

    // every base needs exactly one quality score
    if seq.len() != quality.len() {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "FASTQ quality and sequence lines differ",
        ));
    }

    Ok(Record {
        raw_fields: header,
        raw_seq: seq,
        raw_quality: Some(quality),
        _p: PhantomData,
    })
}

pub struct FastqReader<'a, R: BufRead, S: TryFrom<&'a [u8]> = &'a [u8]> {
reader: Pin<Box<R>>,
buffer: Vec<u8>,
Expand Down Expand Up @@ -50,73 +86,68 @@ impl<'a, R: BufRead + Into<Box<R>> + Unpin, S: TryFrom<&'a [u8]>> FastqReader<'a
let mut t_bs = 0;

// cumulative byte offsets marking the end of each of the four lines read
let mut crs = [0; 4];
let mut crs: [usize; 4] = [0; 4];

let reader = self.reader.as_mut().get_mut();

for (i, crs_i) in crs.iter_mut().enumerate() {
for i in 0..4 {
match reader.read_until(b'\n', &mut self.buffer) {
// end of file
Ok(0) => {
if i == 0 {
return if i == 0 {
// proper file end
return None;
}
// truncated records
return Some(Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"Truncated FASTQ record",
)));
None
} else {
// truncated records
return Some(Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"Truncated FASTQ record",
)));
};
}
Ok(n_bs) => {
*crs_i = n_bs + t_bs;
t_bs += n_bs;
Ok(n) => {
crs[i] = n + t_bs;
t_bs += n;
}
Err(e) => return Some(Err(e)),
}
}

// test for valid header start
if !self.buffer[0] == b'@' {
return Some(Err(io::Error::new(
io::ErrorKind::InvalidData,
"Invalid FASTQ header",
)));
}

// test for valid separator line
let sep_pos = crs[2] + 1;
if !self.buffer[sep_pos] == b'+' {
return Some(Err(io::Error::new(
io::ErrorKind::InvalidData,
"Invalid FASTQ separator",
)));
}

// test that quality and sequence strings are equal length
if (crs[1] - crs[0]) != (crs[3] - crs[2]) {
return Some(Err(io::Error::new(
io::ErrorKind::InvalidData,
"FASTQ quality and sequence lines differ",
)));
}
let buf = unsafe { std::slice::from_raw_parts(self.buffer.as_ptr(), self.buffer.len()) };

unsafe {
let buf = std::slice::from_raw_parts(self.buffer.as_ptr(), self.buffer.len());
let lines: [&[u8]; 4] = [
&buf[0..crs[0]],
&buf[crs[0] + 1..crs[1]],
&buf[crs[1] + 1..crs[2]],
&buf[crs[2] + 1..crs[3]],
];

Some(Ok(Record {
raw_fields: &buf[1..crs[0] - 1],
raw_seq: &buf[crs[0]..crs[1] - 1],
raw_quality: Some(&buf[crs[2]..crs[3] - 1]),
_p: PhantomData,
}))
}
Some(build_record(&lines))
}
}

impl<'src, S: TryFrom<&'src [u8]>> Fastq<'src, S> {
    /// Reads the next four-line record from the in-memory buffer.
    ///
    /// Starting at `self.pos`, four `\n`-terminated lines are sliced out of
    /// `self.buffer` (without their terminators) and handed to `build_record`
    /// for validation. `self.pos` is left just past the last consumed newline.
    ///
    /// Returns `None` once `self.pos` has reached the end of the buffer.
    ///
    /// # Errors
    ///
    /// Yields `Some(Err(_))` with kind [`io::ErrorKind::UnexpectedEof`] when the
    /// buffer runs out before four newline-terminated lines were found, and
    /// forwards whatever error `build_record` reports for a malformed record.
    fn parse(&mut self) -> Option<Result<Record<'src, S>, std::io::Error>> {
        if self.pos >= self.buffer.len() {
            return None;
        }

        let mut lines: [&[u8]; 4] = [&[]; 4];

        for line in &mut lines {
            match self.buffer[self.pos..].iter().position(|&b| b == b'\n') {
                Some(n) => {
                    *line = &self.buffer[self.pos..self.pos + n];
                    // Step over the newline as well; otherwise the next search
                    // starts on that same `\n` and every following line is empty.
                    self.pos += n + 1;
                }
                None => {
                    // Nothing usable is left: mark the buffer as exhausted so the
                    // next call returns `None` instead of repeating this error forever.
                    self.pos = self.buffer.len();
                    return Some(Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "Truncated FASTQ record",
                    )));
                }
            }
        }

        Some(build_record(&lines))
    }
}

Expand All @@ -140,6 +171,29 @@ impl<'a, R: BufRead + Unpin, S: TryFrom<&'a [u8]>> AsyncIterator for FastqReader
Poll::Ready(record)
}
}

impl<'a, S: TryFrom<&'a [u8]>> Iterator for Fastq<'a, S> {
type Item = Result<Record<'a, S>, std::io::Error>;

fn next(&mut self) -> Option<Self::Item> {
self.parse()
}
}

impl<'a, S: TryFrom<&'a [u8]>> AsyncIterator for Fastq<'a, S> {
type Item = Result<Record<'a, S>, std::io::Error>;

fn poll_next(
self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let record = unsafe { self.get_unchecked_mut().parse() };

Poll::Ready(record)
}
}

/*
#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -213,3 +267,4 @@ GGGGGGGGGGGGGG\n";
//assert_eq!(fastq.as_mut().poll_next(&mut cx), Poll::Ready(None));
}
}
*/

0 comments on commit 7e8071e

Please sign in to comment.