Skip to content

Commit

Permalink
introduce Reader trait for sync/async iterators of Records
Browse files Browse the repository at this point in the history
  • Loading branch information
jeff-k committed Sep 21, 2024
1 parent fde3cdc commit 4d6070c
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 69 deletions.
4 changes: 1 addition & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "bio-streams"
version = "0.4.0"
version = "0.5.0"
edition = "2021"
authors = ["jeff-k <[email protected]>"]
description = "Streaming bioinformatics data types"
Expand All @@ -13,13 +13,11 @@ readme = "README.md"
futures = "0.3"
futures-test = "0.3"
bio-seq = "0.13"
#bio-seq = { path = "../bio-seq/bio-seq" }

[dev-dependencies]
flate2 = "1"
clap = { version="4", features=["derive"] }
bio-seq = "0.13"
#bio-seq = { path = "../bio-seq/bio-seq" }

[[example]]
name = "fqcheck"
Expand Down
26 changes: 10 additions & 16 deletions src/fasta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,7 @@ use futures::stream::Stream;

use bio_seq::prelude::*;

use crate::Record;

/// Errors reported while parsing FASTA input.
#[derive(Debug, PartialEq)]
pub enum FastaError {
/// A line read where a record id was expected was not accepted; carries that
/// line, lossily decoded as UTF-8.
InvalidId(String),
/// The input ended where more of a record was expected.
TruncatedRecord,
/// The collected sequence bytes could not be converted into the target
/// sequence type via `TryFrom<&[u8]>`.
InvalidSequence,
/// NOTE(review): presumably an I/O failure of the underlying reader; no FASTA
/// call site producing it is visible here — confirm.
FileError,
}
use crate::{FastxError, Reader, Record};

pub struct Fasta<R: BufRead, T = Seq<Dna>>
where
Expand Down Expand Up @@ -50,7 +42,7 @@ impl<R: BufRead + Into<Box<R>> + Unpin, T: for<'a> TryFrom<&'a [u8]>> Fasta<R, T
}
}

fn parse_record(&mut self) -> Option<Result<Record<T>, FastaError>> {
fn parse_record(&mut self) -> Option<Result<Record<T>, FastxError>> {
let reader = Pin::get_mut(self.reader.as_mut());

let mut seq_buf: Vec<u8> = Vec::new();
Expand All @@ -67,13 +59,13 @@ impl<R: BufRead + Into<Box<R>> + Unpin, T: for<'a> TryFrom<&'a [u8]>> Fasta<R, T
// let end = end_pos(&self.line_buf);
Vec::from(&self.line_buf[1..end_pos(&self.line_buf)])
} else {
return Some(Err(FastaError::InvalidId(
return Some(Err(FastxError::InvalidId(
String::from_utf8_lossy(&self.line_buf).into_owned(),
)));
}
} else {
// premature end of fasta?
return Some(Err(FastaError::TruncatedRecord));
return Some(Err(FastxError::TruncatedRecord));
}
} else {
self.field_buf.take().unwrap()
Expand All @@ -100,7 +92,7 @@ impl<R: BufRead + Into<Box<R>> + Unpin, T: for<'a> TryFrom<&'a [u8]>> Fasta<R, T
let seq = match T::try_from(&seq_buf) {
Ok(s) => s,
Err(_) => {
return Some(Err(FastaError::InvalidSequence));
return Some(Err(FastxError::InvalidSequence("TODO".to_string())));
}
};
Some(Ok(Record {
Expand All @@ -111,16 +103,16 @@ impl<R: BufRead + Into<Box<R>> + Unpin, T: for<'a> TryFrom<&'a [u8]>> Fasta<R, T
}
}

impl<R: BufRead + Unpin, A: Codec> Iterator for Fasta<R, Seq<A>> {
type Item = Result<Record<Seq<A>>, FastaError>;
impl<R: BufRead + Unpin, T: Unpin + for<'a> TryFrom<&'a [u8]>> Iterator for Fasta<R, T> {
type Item = Result<Record<T>, FastxError>;

fn next(&mut self) -> Option<Self::Item> {
self.parse_record()
}
}

impl<R: BufRead + Unpin, T: Unpin + for<'a> TryFrom<&'a [u8]>> Stream for Fasta<R, T> {
type Item = Result<Record<T>, FastaError>;
type Item = Result<Record<T>, FastxError>;

fn poll_next(
self: Pin<&mut Self>,
Expand All @@ -132,6 +124,8 @@ impl<R: BufRead + Unpin, T: Unpin + for<'a> TryFrom<&'a [u8]>> Stream for Fasta<
}
}

// Empty on purpose: `Reader` declares no methods of its own, so the `Iterator`
// and `Stream` impls above (same bounds) already satisfy its supertraits.
impl<R: BufRead + Unpin, T: Unpin + for<'a> TryFrom<&'a [u8]>> Reader<T> for Fasta<R, T> {}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
68 changes: 18 additions & 50 deletions src/fastq.rs
Original file line number Diff line number Diff line change
@@ -1,49 +1,15 @@
use core::marker::{PhantomData, Unpin};

use std::fmt;
use std::io::BufRead;
use std::pin::Pin;
use std::task::{Context, Poll};
//use std::error;

use futures::stream::Stream;

use bio_seq::prelude::*;

use crate::record::Phred;
use crate::Record;

/// Errors reported while parsing FASTQ input.
///
/// Implements [`fmt::Display`] for user-facing messages and
/// [`std::error::Error`] so it can be boxed as `dyn Error` or propagated with `?`.
#[derive(Debug, PartialEq)]
pub enum FastqError {
    /// The separator line is not a `+` followed by exactly one more byte.
    InvalidSeparationLine,
    /// The id line does not begin with `@`; carries the offending line,
    /// lossily decoded as UTF-8.
    InvalidId(String),
    /// The input ended before the sequence, separator, or quality line.
    TruncatedRecord,
    /// The sequence line could not be converted into the target sequence type;
    /// carries the offending line, lossily decoded as UTF-8.
    InvalidSequence(String),
    /// The quality line length does not match the sequence line length.
    InvalidQuality,
    /// Reading from the underlying reader failed.
    FileError,
}

impl fmt::Display for FastqError {
    /// Writes a short human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            FastqError::InvalidSeparationLine => write!(f, "Invalid separation character"),
            FastqError::InvalidId(id) => write!(f, "Invalid id: {id}"),
            FastqError::TruncatedRecord => write!(f, "Truncated record"),
            FastqError::InvalidSequence(seq) => write!(f, "Invalid sequence: {seq}"),
            // Fixed typo: the message previously read "quailty".
            FastqError::InvalidQuality => write!(f, "Invalid quality string"),
            FastqError::FileError => write!(f, "File error"),
        }
    }
}

// No variant wraps an underlying error, so the default `source()` (None) is correct.
impl std::error::Error for FastqError {}

/*
impl error::Error for FastqError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
}
}
}
*/
use crate::{FastxError, Reader, Record};

pub struct Fastq<R: BufRead, T = Seq<Dna>>
where
Expand Down Expand Up @@ -71,7 +37,7 @@ impl<R: BufRead + Into<Box<R>> + Unpin, T: for<'a> TryFrom<&'a [u8]>> Fastq<R, T
}
}

fn parse_record(&mut self) -> Option<Result<Record<T>, FastqError>> {
fn parse_record(&mut self) -> Option<Result<Record<T>, FastxError>> {
let mut quality = Vec::<Phred>::new();
let reader = Pin::get_mut(self.reader.as_mut());

Expand All @@ -81,7 +47,7 @@ impl<R: BufRead + Into<Box<R>> + Unpin, T: for<'a> TryFrom<&'a [u8]>> Fastq<R, T
self.qual_buf.clear();

if reader.read_until(b'\n', &mut self.id_buf).is_err() {
return Some(Err(FastqError::FileError));
return Some(Err(FastxError::FileError));
}
if self.id_buf.is_empty() {
// This is the only condition where an empty reader means
Expand All @@ -90,45 +56,45 @@ impl<R: BufRead + Into<Box<R>> + Unpin, T: for<'a> TryFrom<&'a [u8]>> Fastq<R, T
}
// The id line must begin with '@'
if self.id_buf[0] != b'@' {
return Some(Err(FastqError::InvalidId(
return Some(Err(FastxError::InvalidId(
String::from_utf8_lossy(&self.id_buf).into_owned(),
)));
}

if reader.read_until(b'\n', &mut self.seq_buf).is_err() {
return Some(Err(FastqError::FileError));
return Some(Err(FastxError::FileError));
}
if self.seq_buf.is_empty() {
return Some(Err(FastqError::TruncatedRecord));
return Some(Err(FastxError::TruncatedRecord));
}

if reader.read_until(b'\n', &mut self.sep_buf).is_err() {
return Some(Err(FastqError::FileError));
return Some(Err(FastxError::FileError));
}
if self.sep_buf.is_empty() {
return Some(Err(FastqError::TruncatedRecord));
return Some(Err(FastxError::TruncatedRecord));
}

// Detect whether the '+' separation line is valid
if self.sep_buf.len() != 2 || self.sep_buf[0] != b'+' {
return Some(Err(FastqError::InvalidSeparationLine));
return Some(Err(FastxError::InvalidSeparationLine));
}
if reader.read_until(b'\n', &mut self.qual_buf).is_err() {
return Some(Err(FastqError::FileError));
return Some(Err(FastxError::FileError));
}
if self.qual_buf.is_empty() {
return Some(Err(FastqError::TruncatedRecord));
return Some(Err(FastxError::TruncatedRecord));
}

// Parse the contents of the sequence and quality lines
if self.qual_buf.len() != self.seq_buf.len() {
return Some(Err(FastqError::InvalidQuality));
return Some(Err(FastxError::InvalidQuality));
}

let seq = match T::try_from(&self.seq_buf[..self.seq_buf.len() - 1]) {
Ok(parsed_seq) => parsed_seq,
Err(_) => {
return Some(Err(FastqError::InvalidSequence(
return Some(Err(FastxError::InvalidSequence(
String::from_utf8_lossy(&self.seq_buf).into_owned(),
)))
}
Expand All @@ -148,16 +114,16 @@ impl<R: BufRead + Into<Box<R>> + Unpin, T: for<'a> TryFrom<&'a [u8]>> Fastq<R, T
}
}

impl<R: BufRead + Unpin, A: Codec> Iterator for Fastq<R, Seq<A>> {
type Item = Result<Record<Seq<A>>, FastqError>;
impl<R: BufRead + Unpin, T: Unpin + for<'a> TryFrom<&'a [u8]>> Iterator for Fastq<R, T> {
type Item = Result<Record<T>, FastxError>;

fn next(&mut self) -> Option<Self::Item> {
self.parse_record()
}
}

impl<R: BufRead + Unpin, T: Unpin + for<'a> TryFrom<&'a [u8]>> Stream for Fastq<R, T> {
type Item = Result<Record<T>, FastqError>;
type Item = Result<Record<T>, FastxError>;

fn poll_next(
self: Pin<&mut Self>,
Expand All @@ -169,6 +135,8 @@ impl<R: BufRead + Unpin, T: Unpin + for<'a> TryFrom<&'a [u8]>> Stream for Fastq<
}
}

// Empty on purpose: `Reader` declares no methods of its own, so the `Iterator`
// and `Stream` impls above (same bounds) already satisfy its supertraits.
impl<R: BufRead + Unpin, T: Unpin + for<'a> TryFrom<&'a [u8]>> Reader<T> for Fastq<R, T> {}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
8 changes: 8 additions & 0 deletions src/record.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use core::fmt;
use core::str;
use futures::Stream;

use crate::FastxError;

/// Newtype wrapping a single `u8`.
///
/// NOTE(review): presumably one Phred quality score per base (the FASTQ parser
/// collects these into a `Vec<Phred>`); confirm the encoding/offset against the
/// conversion impls, which are not visible here.
#[derive(Debug, PartialEq, Copy, Clone)]
pub struct Phred(u8);
Expand Down Expand Up @@ -51,3 +54,8 @@ impl<'a, T: for<'b> TryFrom<&'b [u8]>> Record<T> {
unimplemented!()
}
}

/// Common interface for record readers that can be consumed either
/// synchronously or asynchronously.
///
/// An implementor must be both an [`Iterator`] and a [`Stream`], each yielding
/// `Result<Record<T>, FastxError>`. The trait declares no methods of its own;
/// it only names that combination of supertraits.
///
/// `T` is the sequence type stored in each [`Record`]; it must be constructible
/// from a byte slice via `TryFrom<&[u8]>`.
pub trait Reader<T: for<'a> TryFrom<&'a [u8]>>:
Iterator<Item = Result<Record<T>, FastxError>> + Stream<Item = Result<Record<T>, FastxError>>
{
}

0 comments on commit 4d6070c

Please sign in to comment.