Skip to content

Commit

Permalink
copy-free iterator for fastq buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
jeff-k committed Dec 28, 2024
1 parent 005f458 commit 84fc18b
Showing 1 changed file with 30 additions and 22 deletions.
52 changes: 30 additions & 22 deletions src/fastq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ fn build_record<'a, S: TryFrom<&'a [u8]>>(
lines: &[&'a [u8]; 4],
) -> Result<Record<'a, S>, io::Error> {
// test for valid header start

if lines[0][0] != b'@' {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
Expand Down Expand Up @@ -45,31 +46,35 @@ fn build_record<'a, S: TryFrom<&'a [u8]>>(
})
}

pub struct FastqReader<'a, R: BufRead + Unpin, S: for<'b> TryFrom<&'b [u8]> = &'a [u8]> {
/*
pub struct FastqReader<R: BufRead + Unpin, S: for<'b> TryFrom<&'b [u8]> = Vec<u8>> {
reader: Pin<Box<R>>,
buffer: Vec<u8>,
_p: PhantomData<&'a ()>,
// _p: PhantomData<&'a ()>,
_s: PhantomData<S>,
}
*/

pub struct Fastq<'a, S: TryFrom<&'a [u8]> = &'a [u8]> {
pub struct Fastq<'a, S: TryFrom<&'a [u8]> = Vec<u8>> {
buffer: &'a [u8],
pos: usize,
_s: PhantomData<S>,
}

impl<R: BufRead + Unpin, S: for<'a> TryFrom<&'a [u8]>> FastqReader<'_, R, S> {
/*
impl<R: BufRead + Unpin, S: for<'b> TryFrom<&'b [u8]>> FastqReader<R, S> {
pub fn new(reader: R) -> Self {
FastqReader {
reader: Box::pin(reader),
buffer: Vec::<u8>::with_capacity(1024),
_p: PhantomData,
// _p: PhantomData,
_s: PhantomData,
}
}
}
*/

impl<'src, S: for<'a> TryFrom<&'a [u8]>> Fastq<'src, S> {
impl<'src, S: TryFrom<&'src [u8]>> Fastq<'src, S> {
pub fn new(buf: &'src [u8]) -> Self {
Fastq {
buffer: buf,
Expand All @@ -79,8 +84,9 @@ impl<'src, S: for<'a> TryFrom<&'a [u8]>> Fastq<'src, S> {
}
}

impl<'a, R: BufRead + Unpin, S: for<'b> TryFrom<&'b [u8]>> FastqReader<'a, R, S> {
fn parse(&mut self) -> Option<Result<Record<'a, S>, std::io::Error>> {
/*
impl<R: BufRead + Unpin, S: for<'b> TryFrom<&'b [u8]>> FastqReader<R, S> {
fn parse<'buf>(&'buf mut self) -> Option<Result<Record<'buf, S>, std::io::Error>> {
self.buffer.clear();
// total bytes read
Expand Down Expand Up @@ -126,6 +132,7 @@ impl<'a, R: BufRead + Unpin, S: for<'b> TryFrom<&'b [u8]>> FastqReader<'a, R, S>
Some(build_record(&lines))
}
}
*/

impl<'src, S: TryFrom<&'src [u8]>> Fastq<'src, S> {
fn parse(&mut self) -> Option<Result<Record<'src, S>, std::io::Error>> {
Expand All @@ -138,7 +145,7 @@ impl<'src, S: TryFrom<&'src [u8]>> Fastq<'src, S> {
for crs_i in &mut lines {
if let Some(n) = self.buffer[self.pos..].iter().position(|&b| b == b'\n') {
*crs_i = &self.buffer[self.pos..self.pos + n];
self.pos += n;
self.pos += n + 1;
} else {
// truncated records
return Some(Err(io::Error::new(
Expand All @@ -152,15 +159,16 @@ impl<'src, S: TryFrom<&'src [u8]>> Fastq<'src, S> {
}
}

impl<'a, R: BufRead + Unpin, S: for<'b> TryFrom<&'b [u8]>> Iterator for FastqReader<'a, R, S> {
/*
impl<'a, R: BufRead + Unpin, S: for<'b> TryFrom<&'b [u8]>> Iterator for &'a mut FastqReader<R, S> {
type Item = Result<Record<'a, S>, std::io::Error>;
fn next(&mut self) -> Option<Self::Item> {
self.parse()
}
}
impl<'a, R: BufRead + Unpin, S: for<'b> TryFrom<&'b [u8]>> AsyncIterator for FastqReader<'a, R, S> {
impl<'a, R: BufRead + Unpin, S: for<'b> TryFrom<&'b [u8]>> AsyncIterator for FastqReader<R, S> {
type Item = Result<Record<'a, S>, std::io::Error>;
fn poll_next(
Expand All @@ -172,6 +180,7 @@ impl<'a, R: BufRead + Unpin, S: for<'b> TryFrom<&'b [u8]>> AsyncIterator for Fas
Poll::Ready(record)
}
}
*/

impl<'a, S: TryFrom<&'a [u8]>> Iterator for Fastq<'a, S> {
type Item = Result<Record<'a, S>, std::io::Error>;
Expand All @@ -181,7 +190,7 @@ impl<'a, S: TryFrom<&'a [u8]>> Iterator for Fastq<'a, S> {
}
}

impl<'a, S: TryFrom<&'a [u8]>> AsyncIterator for Fastq<'a, S> {
impl<'a, S: for<'b> TryFrom<&'b [u8]>> AsyncIterator for Fastq<'a, S> {
type Item = Result<Record<'a, S>, std::io::Error>;

fn poll_next(
Expand All @@ -198,28 +207,27 @@ impl<'a, S: TryFrom<&'a [u8]>> AsyncIterator for Fastq<'a, S> {
mod tests {
use super::*;
use futures::task::noop_waker;
use futures::Stream;
// use futures::Stream;
use std::io::Cursor;
use std::iter::Iterator;
use std::task::{Context, Poll};

#[test]
fn test_fastq_iterator() {
let data = b"@SEQ_ID_1
const FQ1: &'static [u8] = b"@SEQ_ID_1
ACTCGATCGCGACGAA
+
AFFFFFFFFFFFFEBA
@SEQ_ID_2
CATCGACTACGGCG
+
GGGGGGGGGGGGGG\n";
let reader = Cursor::new(data as &[u8]);
//let mut fastq: FastqReader<Cursor<&[u8]>> = FastqReader::new(reader);

let mut fastq = FastqReader::new(reader);
#[test]
fn test_fastq_iterator() {
// let reader = Cursor::new(FQ1);

let mut fastq: Fastq<&[u8]> = Fastq::new(FQ1);

let record1: Record<Vec<u8>> = fastq.next().unwrap().unwrap();
assert_eq!(record1.raw_fields, b"SEQ_ID_1".to_vec());
let record1: Record = fastq.next().unwrap().unwrap();
assert_eq!(record1.raw_fields, b"SEQ_ID_1");
assert_eq!(record1.raw_seq, b"ACTCGATCGCGACGAA");
assert_eq!(record1.raw_quality.unwrap(), b"AFFFFFFFFFFFFEBA");
/*
Expand Down

0 comments on commit 84fc18b

Please sign in to comment.