Skip to content

Commit

Permalink
Properly handle the case where we don't write any data to the tempfile
Browse files Browse the repository at this point in the history
  • Loading branch information
jackh726 committed Feb 4, 2024
1 parent 50618e4 commit 053185c
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 16 deletions.
23 changes: 16 additions & 7 deletions bigtools/src/utils/cli/bigwigtobedgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use ufmt::uwrite;
#[command(
name = "bigwigtobedgraph",
about = "Converts an input bigWig to a bedGraph.",
long_about = "Converts an input bigWig to a bedGraph. Can be multi-threaded for substantial speedups. Note for roughly each core, one temporary file will be opened."
long_about = "Converts an input bigWig to a bedGraph. Can be multi-threaded for substantial speedups."
)]
pub struct BigWigToBedGraphArgs {
/// the bigwig to get convert to bedgraph
Expand All @@ -42,10 +42,16 @@ pub struct BigWigToBedGraphArgs {
/// If set, restrict output to regions overlapping the bed file
pub overlap_bed: Option<String>,

/// Set the number of threads to use. This tool will nearly always benefit from more cores (<= # chroms). Note: for parts of the runtime, the actual usage may be nthreads+1
/// Set the number of threads to use. This tool will nearly always benefit from more cores (<= # chroms).
#[arg(short = 't', long)]
#[arg(default_value_t = 6)]
pub nthreads: usize,

/// Do not create temporary files for intermediate data. (Only applicable when using multiple threads.)
/// By default, approximately one temporary file will be opened for each core.
#[arg(long)]
#[arg(default_value_t = false)]
pub inmemory: bool,
}

pub fn bigwigtobedgraph(args: BigWigToBedGraphArgs) -> Result<(), Box<dyn Error>> {
Expand Down Expand Up @@ -92,6 +98,7 @@ pub fn bigwigtobedgraph(args: BigWigToBedGraphArgs) -> Result<(), Box<dyn Error>
args.start,
args.end,
runtime.handle(),
args.inmemory,
))?;
}
}
Expand Down Expand Up @@ -143,18 +150,19 @@ pub async fn write_bg<R: Reopen + SeekableRead + Send + 'static>(
start: Option<u32>,
end: Option<u32>,
runtime: &runtime::Handle,
inmemory: bool,
) -> Result<(), BBIReadError> {
let start = chrom.as_ref().and_then(|_| start);
let end = chrom.as_ref().and_then(|_| end);

let chroms: Vec<ChromInfo> = bigwig.chroms().to_vec();
let chrom_files: Vec<io::Result<(_, TempFileBuffer<File>)>> = chroms
let chrom_files: Vec<io::Result<(_, TempFileBuffer<File>, String)>> = chroms
.into_iter()
.filter(|c| chrom.as_ref().map_or(true, |chrom| &c.name == chrom))
.map(|chrom| {
let bigwig = bigwig.reopen()?;
let (buf, file): (TempFileBuffer<File>, TempFileBufferWriter<File>) =
TempFileBuffer::new(true);
TempFileBuffer::new(inmemory);
let writer = io::BufWriter::new(file);
async fn file_future<R: Reopen + SeekableRead + 'static>(
mut bigwig: BigWigRead<R>,
Expand Down Expand Up @@ -183,17 +191,18 @@ pub async fn write_bg<R: Reopen + SeekableRead + Send + 'static>(
}
let start = start.unwrap_or(0);
let end = end.unwrap_or(chrom.length);
let name = chrom.name.clone();
let handle = runtime
.spawn(file_future(bigwig, chrom, writer, start, end))
.map(|f| f.unwrap());
Ok((handle, buf))
Ok((handle, buf, name))
})
.collect::<Vec<_>>();

for res in chrom_files {
let (f, mut buf) = res.unwrap();
let (f, mut buf, name) = res?;
buf.switch(out_file);
f.await.unwrap();
f.await?;
while !buf.is_real_file_ready() {
tokio::task::yield_now().await;
}
Expand Down
14 changes: 5 additions & 9 deletions bigtools/src/utils/file/tempfilebuffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl<R: Write + Send + 'static> TempFileBuffer<R> {
BufferState::Real(_) => panic!("Should not have switched already."),
BufferState::InMemory(data) => Ok(data.len() as u64),
BufferState::Temp(ref mut t) => t.seek(io::SeekFrom::Current(0)),
BufferState::NotStarted => panic!("No data was written."),
BufferState::NotStarted => Ok(0),
}
}

Expand Down Expand Up @@ -113,11 +113,10 @@ impl<R: Write + Send + 'static> TempFileBuffer<R> {
// Writer was dropped with no tempfile being created (or written to)
real_file
}
(None, BufferState::InMemory(_) | BufferState::Temp(_)) => {
(None, BufferState::Real(real_file)) => real_file,
(None, BufferState::InMemory(_) | BufferState::Temp(_) | BufferState::NotStarted) => {
panic!("Should have switched already.")
}
(None, BufferState::Real(real_file)) => real_file,
(None, BufferState::NotStarted) => panic!("No data was written."),
}
}

Expand All @@ -144,8 +143,8 @@ impl<R: Write + Send + 'static> TempFileBuffer<R> {
BufferState::InMemory(data) => {
real.write_all(&data)?;
}
BufferState::NotStarted => {}
BufferState::Real(_) => panic!("Should only be writing to real file."),
BufferState::NotStarted => panic!("No data was written."),
}
Ok(())
}
Expand Down Expand Up @@ -197,10 +196,7 @@ impl<R> Drop for TempFileBufferWriter<R> {
let &(ref lock, ref cvar) = &*self.closed;
let mut closed = lock.lock().unwrap();
let buffer_state = std::mem::replace(&mut self.buffer_state, BufferState::NotStarted);
match buffer_state {
BufferState::NotStarted => {}
state => *closed = Some(state),
}
*closed = Some(buffer_state);
cvar.notify_one();
drop(closed);
}
Expand Down

0 comments on commit 053185c

Please sign in to comment.