Skip to content

Commit

Permalink
Multi-threading added
Browse files Browse the repository at this point in the history
  • Loading branch information
cjain7 committed Jul 25, 2018
1 parent 94a0d7a commit 37ba072
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 81 deletions.
2 changes: 1 addition & 1 deletion Makefile.in
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CXXFLAGS += -O3 -DNDEBUG -std=c++11 -Isrc -I @mathinc@
CXXFLAGS += -O3 -DNDEBUG -std=c++11 -fopenmp -Isrc -I @mathinc@
CPPFLAGS += @amcppflags@

UNAME_S=$(shell uname -s)
Expand Down
89 changes: 54 additions & 35 deletions src/cgi/core_genome_identity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <ctime>
#include <chrono>
#include <functional>
#include <omp.h>

//Own includes
#include "map/include/map_parameters.hpp"
Expand All @@ -23,6 +24,12 @@

int main(int argc, char** argv)
{
/*
* Make sure env variable MALLOC_ARENA_MAX is unset
* for efficient multi-threaded execution
*/
unsetenv((char *)"MALLOC_ARENA_MAX");

CommandLineProcessing::ArgvParser cmd;
using namespace std::placeholders; // for _1, _2, _3...

Expand All @@ -34,60 +41,72 @@ int main(int argc, char** argv)

skch::parseandSave(argc, argv, cmd, parameters);

//Redirect Mashmap's mapping output to null fs, using file name for CGI output
std::string fileName = parameters.outFileName;

#ifdef DEBUG
parameters.outFileName = parameters.outFileName + ".map";
#else
//To redirect Mashmap's mapping output to null fs, using file name for CGI output
parameters.outFileName = "/dev/null";
#endif

auto t0 = skch::Time::now();

//Build the sketch for reference
skch::Sketch referSketch(parameters);

std::chrono::duration<double> timeRefSketch = skch::Time::now() - t0;
std::cerr << "INFO, skch::main, Time spent sketching the reference : " << timeRefSketch.count() << " sec" << std::endl;

//Initialize the files to delete the existing content
{
#ifdef DEBUG
std::ofstream outstrm2(fileName + ".map.1way");
std::ofstream outstrm3(fileName + ".map.2way");
#endif
if(parameters.visualize)
std::ofstream outstrm4(fileName + ".visual");
}
//Set up for parallel execution
omp_set_num_threads( parameters.threads );
std::vector <skch::Parameters> parameters_split (parameters.threads);
cgi::splitReferenceGenomes (parameters, parameters_split);

//Final output vector of ANI computation
std::vector<cgi::CGI_Results> finalResults;

//Loop over query genomes
for(uint64_t queryno = 0; queryno < parameters.querySequences.size(); queryno++)
#pragma omp parallel for
for (uint64_t i = 0; i < parameters.threads; i++)
{
t0 = skch::Time::now();
//start timer
auto t0 = skch::Time::now();

//Build the sketch for reference
skch::Sketch referSketch(parameters_split[i]);

std::chrono::duration<double> timeRefSketch = skch::Time::now() - t0;

if ( omp_get_thread_num() == 0)
std::cerr << "INFO [thread 0], skch::main, Time spent sketching the reference : " << timeRefSketch.count() << " sec" << std::endl;

//Final output vector of ANI computation
std::vector<cgi::CGI_Results> finalResults_local;

//Loop over query genomes
for(uint64_t queryno = 0; queryno < parameters_split[i].querySequences.size(); queryno++)
{
t0 = skch::Time::now();

skch::MappingResultsVector_t mapResults;
uint64_t totalQueryFragments = 0;

auto fn = std::bind(skch::Map::insertL2ResultsToVec, std::ref(mapResults), _1);
skch::Map mapper = skch::Map(parameters_split[i], referSketch, totalQueryFragments, queryno, fn);

std::chrono::duration<double> timeMapQuery = skch::Time::now() - t0;

if ( omp_get_thread_num() == 0)
std::cerr << "INFO [thread 0], skch::main, Time spent mapping fragments in query #" << queryno + 1 << " : " << timeMapQuery.count() << " sec" << std::endl;

skch::MappingResultsVector_t mapResults;
uint64_t totalQueryFragments = 0;
t0 = skch::Time::now();

auto fn = std::bind(skch::Map::insertL2ResultsToVec, std::ref(mapResults), _1);
skch::Map mapper = skch::Map(parameters, referSketch, totalQueryFragments, queryno, fn);
cgi::computeCGI(parameters_split[i], mapResults, mapper, referSketch, totalQueryFragments, queryno, fileName, finalResults_local);

std::chrono::duration<double> timeMapQuery = skch::Time::now() - t0;
std::cerr << "INFO, skch::main, Time spent mapping fragments in query #" << queryno + 1 << " : " << timeMapQuery.count() << " sec" << std::endl;
std::chrono::duration<double> timeCGI = skch::Time::now() - t0;

t0 = skch::Time::now();
if ( omp_get_thread_num() == 0)
std::cerr << "INFO [thread 0], skch::main, Time spent post mapping : " << timeCGI.count() << " sec" << std::endl;
}

cgi::computeCGI(parameters, mapResults, mapper, referSketch, totalQueryFragments, queryno, fileName, finalResults);
cgi::correctRefGenomeIds (finalResults_local);

std::chrono::duration<double> timeCGI = skch::Time::now() - t0;
std::cerr << "INFO, skch::main, Time spent post mapping : " << timeCGI.count() << " sec" << std::endl;
#pragma omp critical
finalResults.insert (finalResults.end(), finalResults_local.begin(), finalResults_local.end());
}

//report output in file
cgi::outputCGI (parameters, finalResults, fileName);

//report output as matrix
if (parameters.matrixOutput)
cgi::outputPhylip (parameters, finalResults, fileName);
}
78 changes: 40 additions & 38 deletions src/cgi/include/computeCoreIdentity.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <algorithm>
#include <unordered_map>
#include <fstream>
#include <omp.h>

//Own includes
#include "map/include/base_types.hpp"
Expand Down Expand Up @@ -172,25 +173,6 @@ namespace cgi
}
}

#ifdef DEBUG
{
std::ofstream outstrm(fileName + ".map.1way", std::ios::app);

//Report all mappings that contribute to core-genome identity estimate
for(auto &e : mappings_1way)
{
if(e.nucIdentity != 0)
outstrm << parameters.querySequences[queryFileNo]
<< " " << parameters.refSequences[e.genomeId]
<< " " << e.querySeqId
<< " " << e.refSequenceId
<< " " << e.mapRefPosBin
<< " " << e.nucIdentity
<< "\n";
}
}
#endif

///2. Now, we compute 2-way ANI
//For each mapped region, and within a reference bin bucket, single best query mapping is preserved
{
Expand All @@ -214,32 +196,13 @@ namespace cgi
}
}

#ifdef DEBUG
{
std::ofstream outstrm(fileName + ".map.2way", std::ios::app);

//Report all mappings that contribute to core-genome identity estimate
for(auto &e : mappings_2way)
{
outstrm << parameters.querySequences[queryFileNo]
<< " " << parameters.refSequences[e.genomeId]
<< " " << e.querySeqId
<< " " << e.refSequenceId
<< " " << e.mapRefPosBin
<< " " << e.nucIdentity
<< "\n";
}
}
#endif

{
if(parameters.visualize)
{
outputVisualizationFile(parameters, mappings_2way, mapper, refSketch, queryFileNo, fileName);
}
}


//Do average for ANI/AAI computation
//mappings_2way should be sorted by genomeId

Expand Down Expand Up @@ -397,6 +360,45 @@ namespace cgi

outstrm.close();
}

/**
* @brief generate multiple parameter objects from one
* @details purpose it to divide the list of reference genomes
* into as many buckets as there are threads
* @param[in] parameters
* @param[out] parameters_split
*/
void splitReferenceGenomes(skch::Parameters &parameters,
std::vector <skch::Parameters> &parameters_split)
{
for (int i = 0; i < parameters.threads; i++)
{
parameters_split[i] = parameters;

//update the reference genomes list
parameters_split[i].refSequences.clear();

//assign ref. genome to threads in round-robin fashion
for (int j = 0; j < parameters.refSequences.size(); j++)
{
if (j % parameters.threads == i)
parameters_split[i].refSequences.push_back (parameters.refSequences[j]);
}
}
}

/**
* @brief update thread local reference genome ids to global ids
* @param[in/out] CGI_ResultsVector
*/
void correctRefGenomeIds (std::vector<cgi::CGI_Results> &CGI_ResultsVector)
{
int tid = omp_get_thread_num();
int thread_count = omp_get_num_threads();

for (auto &e : CGI_ResultsVector)
e.refGenomeId = e.refGenomeId * thread_count + tid;
}
}

#endif
1 change: 1 addition & 0 deletions src/map/include/map_parameters.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace skch
int windowSize; //window size used for sketching
int minReadLength; //minimum read length which code maps
int minFragments; //minimum fragment mappings for trusting ANI value
int threads; //thread count
int alphabetSize; //alphabet size
uint64_t referenceSize; //Approximate reference size
float percentageIdentity; //user defined threshold for good similarity
Expand Down
20 changes: 18 additions & 2 deletions src/map/include/parseCmdArgs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,12 @@ Example usage: \n\
cmd.defineOption("kmer", "kmer size <= 16 [default : 16]", ArgvParser::OptionRequiresValue);
cmd.defineOptionAlternative("kmer","k");

cmd.defineOption("threads", "thread count for parallel execution [default : 1]", ArgvParser::OptionRequiresValue);
cmd.defineOptionAlternative("threads","t");

cmd.defineOption("fragLen", "fragment length [default : 3,000]", ArgvParser::OptionRequiresValue);

cmd.defineOption("minFrag", "minimum fragments for trusting ANI [default : 50]", ArgvParser::OptionRequiresValue);
cmd.defineOption("minFrag", "minimum matched fragments for trusting ANI [default : 50]", ArgvParser::OptionRequiresValue);

cmd.defineOption("visualize", "output mappings for visualization, can be enabled for single genome to single genome comparison only [disabled by default]");

Expand Down Expand Up @@ -141,6 +144,7 @@ Example usage: \n\
std::cerr << "Query = " << parameters.querySequences << std::endl;
std::cerr << "Kmer size = " << parameters.kmerSize << std::endl;
std::cerr << "Fragment length = " << parameters.minReadLength << std::endl;
std::cerr << "Threads = " << parameters.threads << std::endl;
std::cerr << "ANI output file = " << parameters.outFileName << std::endl;
std::cerr << ">>>>>>>>>>>>>>>>>>" << std::endl;
}
Expand Down Expand Up @@ -196,7 +200,10 @@ Example usage: \n\
}

//Size of reference
parameters.referenceSize = skch::CommonFunc::getReferenceSize(parameters.refSequences);
//parameters.referenceSize = skch::CommonFunc::getReferenceSize(parameters.refSequences);

//fix reference length to a typical bacterial genome length
parameters.referenceSize = 5000000;
str.clear();

//Parse query files
Expand Down Expand Up @@ -261,6 +268,15 @@ Example usage: \n\
parameters.kmerSize = 5;
}

if(cmd.foundOption("threads"))
{
str << cmd.optionValue("threads");
str >> parameters.threads;
str.clear();
}
else
parameters.threads = 1;

parameters.p_value = 1e-03;

if(cmd.foundOption("fragLen"))
Expand Down
20 changes: 15 additions & 5 deletions src/map/include/winSketch.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <map>
#include <cassert>
#include <zlib.h>
#include <omp.h>

//Own includes
#include "map/include/commonFunc.hpp"
Expand Down Expand Up @@ -165,7 +166,8 @@ namespace skch
fclose(file);
}

std::cerr << "INFO, skch::Sketch::build, minimizers picked from reference = " << minimizerIndex.size() << std::endl;
if ( omp_get_thread_num() == 0)
std::cerr << "INFO [thread 0], skch::Sketch::build, minimizers picked from reference = " << minimizerIndex.size() << std::endl;

}

Expand All @@ -182,7 +184,8 @@ namespace skch
MinimizerMetaData{e.seqId, e.wpos});
}

std::cerr << "INFO, skch::Sketch::index, unique minimizers = " << minimizerPosLookupIndex.size() << std::endl;
if ( omp_get_thread_num() == 0)
std::cerr << "INFO [thread 0], skch::Sketch::index, unique minimizers = " << minimizerPosLookupIndex.size() << std::endl;
}

/**
Expand All @@ -197,7 +200,8 @@ namespace skch
for(auto &e : this->minimizerPosLookupIndex)
this->minimizerFreqHistogram[e.second.size()] += 1;

std::cerr << "INFO, skch::Sketch::computeFreqHist, Frequency histogram of minimizers = " << *this->minimizerFreqHistogram.begin() << " ... " << *this->minimizerFreqHistogram.rbegin() << std::endl;
if ( omp_get_thread_num() == 0)
std::cerr << "INFO [thread 0], skch::Sketch::computeFreqHist, Frequency histogram of minimizers = " << *this->minimizerFreqHistogram.begin() << " ... " << *this->minimizerFreqHistogram.rbegin() << std::endl;

//2. Compute frequency threshold to ignore most frequent minimizers

Expand Down Expand Up @@ -227,9 +231,15 @@ namespace skch
}

if(this->freqThreshold != std::numeric_limits<int>::max())
std::cerr << "INFO, skch::Sketch::computeFreqHist, With threshold " << this->percentageThreshold << "\%, ignore minimizers occurring >= " << this->freqThreshold << " times during lookup." << std::endl;
{
if ( omp_get_thread_num() == 0)
std::cerr << "INFO [thread 0], skch::Sketch::computeFreqHist, With threshold " << this->percentageThreshold << "\%, ignore minimizers occurring >= " << this->freqThreshold << " times during lookup." << std::endl;
}
else
std::cerr << "INFO, skch::Sketch::computeFreqHist, With threshold " << this->percentageThreshold << "\%, consider all minimizers during lookup." << std::endl;
{
if ( omp_get_thread_num() == 0)
std::cerr << "INFO [thread 0], skch::Sketch::computeFreqHist, With threshold " << this->percentageThreshold << "\%, consider all minimizers during lookup." << std::endl;
}

}

Expand Down

0 comments on commit 37ba072

Please sign in to comment.