Skip to content

Commit

Permalink
Merge pull request #266 from waveygang/boundary-fx
Browse files Browse the repository at this point in the history
axis-weighted chaining
  • Loading branch information
ekg authored Aug 26, 2024
2 parents c95b6a3 + d2069e7 commit 064fafa
Show file tree
Hide file tree
Showing 7 changed files with 262 additions and 455 deletions.
3 changes: 0 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ wfmash
src/common/wflign/build
build

# include directory
include

#Others
*.cache
*~
Expand Down
1 change: 1 addition & 0 deletions src/align/include/align_parameters.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ struct Parameters {
bool emit_md_tag; //Output the MD tag
bool sam_format; //Emit the output in SAM format (PAF default)
bool no_seq_in_sam; //Do not fill the SEQ field in SAM format
bool multithread_fasta_input; //Multithreaded fasta input

#ifdef WFA_PNG_TSV_TIMING
// plotting
Expand Down
18 changes: 13 additions & 5 deletions src/align/include/computeAlignments.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ void processor_manager(seq_atomic_queue_t& seq_queue,
std::vector<std::atomic<bool>> thread_should_exit(max_processors);

const size_t queue_capacity = seq_queue.capacity();
const size_t low_threshold = queue_capacity * 0.2;
const size_t low_threshold = 1;
const size_t high_threshold = queue_capacity * 0.8;

auto spawn_processor = [&](size_t id) {
Expand All @@ -383,14 +383,22 @@ void processor_manager(seq_atomic_queue_t& seq_queue,
// Start with one processor
spawn_processor(0);
size_t current_processors = 1;
uint64_t exhausted = 0;

while (!reader_done.load() || !line_queue.was_empty() || !seq_queue.was_empty()) {
size_t queue_size = seq_queue.was_size();

if (queue_size < low_threshold && current_processors < max_processors) {
spawn_processor(current_processors++);
} else if (queue_size > high_threshold && current_processors > 1) {
thread_should_exit[--current_processors].store(true);
if (param.multithread_fasta_input) {
if (queue_size < low_threshold && current_processors < max_processors) {
++exhausted;
} else if (queue_size > high_threshold && current_processors > 1) {
thread_should_exit[--current_processors].store(true);
}

if (exhausted > 20 && queue_size < low_threshold) {
spawn_processor(current_processors++);
exhausted = 0;
}
}

std::this_thread::sleep_for(std::chrono::milliseconds(100));
Expand Down
Loading

0 comments on commit 064fafa

Please sign in to comment.