From 37ba07210a984289b79befb6eed9f53432fa20ea Mon Sep 17 00:00:00 2001 From: Chirag Jain Date: Wed, 25 Jul 2018 12:39:52 -0400 Subject: [PATCH] Multi-threading added --- Makefile.in | 2 +- src/cgi/core_genome_identity.cpp | 89 +++++++++++++++---------- src/cgi/include/computeCoreIdentity.hpp | 78 +++++++++++----------- src/map/include/map_parameters.hpp | 1 + src/map/include/parseCmdArgs.hpp | 20 +++++- src/map/include/winSketch.hpp | 20 ++++-- 6 files changed, 129 insertions(+), 81 deletions(-) diff --git a/Makefile.in b/Makefile.in index 95f4ed0..6be7ee9 100644 --- a/Makefile.in +++ b/Makefile.in @@ -1,4 +1,4 @@ -CXXFLAGS += -O3 -DNDEBUG -std=c++11 -Isrc -I @mathinc@ +CXXFLAGS += -O3 -DNDEBUG -std=c++11 -fopenmp -Isrc -I @mathinc@ CPPFLAGS += @amcppflags@ UNAME_S=$(shell uname -s) diff --git a/src/cgi/core_genome_identity.cpp b/src/cgi/core_genome_identity.cpp index 1df8c1a..3ca5db1 100644 --- a/src/cgi/core_genome_identity.cpp +++ b/src/cgi/core_genome_identity.cpp @@ -8,6 +8,7 @@ #include #include #include +#include //Own includes #include "map/include/map_parameters.hpp" @@ -23,6 +24,12 @@ int main(int argc, char** argv) { + /* + * Make sure env variable MALLOC_ARENA_MAX is unset + * for efficient multi-threaded execution + */ + unsetenv((char *)"MALLOC_ARENA_MAX"); + CommandLineProcessing::ArgvParser cmd; using namespace std::placeholders; // for _1, _2, _3... @@ -34,60 +41,72 @@ int main(int argc, char** argv) skch::parseandSave(argc, argv, cmd, parameters); - //Redirect Mashmap's mapping output to null fs, using file name for CGI output std::string fileName = parameters.outFileName; -#ifdef DEBUG - parameters.outFileName = parameters.outFileName + ".map"; -#else + //To redirect Mashmap's mapping output to null fs, using file name for CGI output parameters.outFileName = "/dev/null"; -#endif - - auto t0 = skch::Time::now(); - - //Build the sketch for reference - skch::Sketch referSketch(parameters); - - std::chrono::duration timeRefSketch = skch::Time::now() - t0; - std::cerr << "INFO, skch::main, Time spent sketching the reference : " << timeRefSketch.count() << " sec" << std::endl; - //Initialize the files to delete the existing content - { -#ifdef DEBUG - std::ofstream outstrm2(fileName + ".map.1way"); - std::ofstream outstrm3(fileName + ".map.2way"); -#endif - if(parameters.visualize) - std::ofstream outstrm4(fileName + ".visual"); - } + //Set up for parallel execution + omp_set_num_threads( parameters.threads ); + std::vector parameters_split (parameters.threads); + cgi::splitReferenceGenomes (parameters, parameters_split); //Final output vector of ANI computation std::vector finalResults; - //Loop over query genomes - for(uint64_t queryno = 0; queryno < parameters.querySequences.size(); queryno++) +#pragma omp parallel for + for (uint64_t i = 0; i < parameters.threads; i++) { - t0 = skch::Time::now(); + //start timer + auto t0 = skch::Time::now(); + + //Build the sketch for reference + skch::Sketch referSketch(parameters_split[i]); + + std::chrono::duration timeRefSketch = skch::Time::now() - t0; + + if ( omp_get_thread_num() == 0) + std::cerr << "INFO [thread 0], skch::main, Time spent sketching the reference : " << timeRefSketch.count() << " sec" << std::endl; + + //Final output vector of ANI computation + std::vector finalResults_local; + + //Loop over query genomes + for(uint64_t queryno = 0; queryno < parameters_split[i].querySequences.size(); queryno++) + { + t0 = skch::Time::now(); + + skch::MappingResultsVector_t mapResults; + uint64_t totalQueryFragments = 0; + + auto fn = std::bind(skch::Map::insertL2ResultsToVec, std::ref(mapResults), _1); + skch::Map mapper = skch::Map(parameters_split[i], referSketch, totalQueryFragments, queryno, fn); + + std::chrono::duration timeMapQuery = skch::Time::now() - t0; + + if ( omp_get_thread_num() == 0) + std::cerr << "INFO [thread 0], skch::main, Time spent mapping fragments in query #" << queryno + 1 << " : " << timeMapQuery.count() << " sec" << std::endl; - skch::MappingResultsVector_t mapResults; - uint64_t totalQueryFragments = 0; + t0 = skch::Time::now(); - auto fn = std::bind(skch::Map::insertL2ResultsToVec, std::ref(mapResults), _1); - skch::Map mapper = skch::Map(parameters, referSketch, totalQueryFragments, queryno, fn); + cgi::computeCGI(parameters_split[i], mapResults, mapper, referSketch, totalQueryFragments, queryno, fileName, finalResults_local); - std::chrono::duration timeMapQuery = skch::Time::now() - t0; - std::cerr << "INFO, skch::main, Time spent mapping fragments in query #" << queryno + 1 << " : " << timeMapQuery.count() << " sec" << std::endl; + std::chrono::duration timeCGI = skch::Time::now() - t0; - t0 = skch::Time::now(); + if ( omp_get_thread_num() == 0) + std::cerr << "INFO [thread 0], skch::main, Time spent post mapping : " << timeCGI.count() << " sec" << std::endl; + } - cgi::computeCGI(parameters, mapResults, mapper, referSketch, totalQueryFragments, queryno, fileName, finalResults); + cgi::correctRefGenomeIds (finalResults_local); - std::chrono::duration timeCGI = skch::Time::now() - t0; - std::cerr << "INFO, skch::main, Time spent post mapping : " << timeCGI.count() << " sec" << std::endl; +#pragma omp critical + finalResults.insert (finalResults.end(), finalResults_local.begin(), finalResults_local.end()); } + //report output in file cgi::outputCGI (parameters, finalResults, fileName); + //report output as matrix if (parameters.matrixOutput) cgi::outputPhylip (parameters, finalResults, fileName); } diff --git a/src/cgi/include/computeCoreIdentity.hpp b/src/cgi/include/computeCoreIdentity.hpp index 87259e4..aa769c1 100644 --- a/src/cgi/include/computeCoreIdentity.hpp +++ b/src/cgi/include/computeCoreIdentity.hpp @@ -10,6 +10,7 @@ #include #include #include +#include //Own includes #include "map/include/base_types.hpp" @@ -172,25 +173,6 @@ namespace cgi } } -#ifdef DEBUG - { - std::ofstream outstrm(fileName + ".map.1way", std::ios::app); - - //Report all mappings that contribute to core-genome identity estimate - for(auto &e : mappings_1way) - { - if(e.nucIdentity != 0) - outstrm << parameters.querySequences[queryFileNo] - << " " << parameters.refSequences[e.genomeId] - << " " << e.querySeqId - << " " << e.refSequenceId - << " " << e.mapRefPosBin - << " " << e.nucIdentity - << "\n"; - } - } -#endif - ///2. Now, we compute 2-way ANI //For each mapped region, and within a reference bin bucket, single best query mapping is preserved { @@ -214,24 +196,6 @@ namespace cgi } } -#ifdef DEBUG - { - std::ofstream outstrm(fileName + ".map.2way", std::ios::app); - - //Report all mappings that contribute to core-genome identity estimate - for(auto &e : mappings_2way) - { - outstrm << parameters.querySequences[queryFileNo] - << " " << parameters.refSequences[e.genomeId] - << " " << e.querySeqId - << " " << e.refSequenceId - << " " << e.mapRefPosBin - << " " << e.nucIdentity - << "\n"; - } - } -#endif - { if(parameters.visualize) { @@ -239,7 +203,6 @@ namespace cgi } } - //Do average for ANI/AAI computation //mappings_2way should be sorted by genomeId @@ -397,6 +360,45 @@ namespace cgi outstrm.close(); } + + /** + * @brief generate multiple parameter objects from one + * @details purpose it to divide the list of reference genomes + * into as many buckets as there are threads + * @param[in] parameters + * @param[out] parameters_split + */ + void splitReferenceGenomes(skch::Parameters ¶meters, + std::vector ¶meters_split) + { + for (int i = 0; i < parameters.threads; i++) + { + parameters_split[i] = parameters; + + //update the reference genomes list + parameters_split[i].refSequences.clear(); + + //assign ref. genome to threads in round-robin fashion + for (int j = 0; j < parameters.refSequences.size(); j++) + { + if (j % parameters.threads == i) + parameters_split[i].refSequences.push_back (parameters.refSequences[j]); + } + } + } + + /** + * @brief update thread local reference genome ids to global ids + * @param[in/out] CGI_ResultsVector + */ + void correctRefGenomeIds (std::vector &CGI_ResultsVector) + { + int tid = omp_get_thread_num(); + int thread_count = omp_get_num_threads(); + + for (auto &e : CGI_ResultsVector) + e.refGenomeId = e.refGenomeId * thread_count + tid; + } } #endif diff --git a/src/map/include/map_parameters.hpp b/src/map/include/map_parameters.hpp index 41dba83..7ad70e8 100644 --- a/src/map/include/map_parameters.hpp +++ b/src/map/include/map_parameters.hpp @@ -24,6 +24,7 @@ namespace skch int windowSize; //window size used for sketching int minReadLength; //minimum read length which code maps int minFragments; //minimum fragment mappings for trusting ANI value + int threads; //thread count int alphabetSize; //alphabet size uint64_t referenceSize; //Approximate reference size float percentageIdentity; //user defined threshold for good similarity diff --git a/src/map/include/parseCmdArgs.hpp b/src/map/include/parseCmdArgs.hpp index 45792e0..baed839 100644 --- a/src/map/include/parseCmdArgs.hpp +++ b/src/map/include/parseCmdArgs.hpp @@ -52,9 +52,12 @@ Example usage: \n\ cmd.defineOption("kmer", "kmer size <= 16 [default : 16]", ArgvParser::OptionRequiresValue); cmd.defineOptionAlternative("kmer","k"); + cmd.defineOption("threads", "thread count for parallel execution [default : 1]", ArgvParser::OptionRequiresValue); + cmd.defineOptionAlternative("threads","t"); + cmd.defineOption("fragLen", "fragment length [default : 3,000]", ArgvParser::OptionRequiresValue); - cmd.defineOption("minFrag", "minimum fragments for trusting ANI [default : 50]", ArgvParser::OptionRequiresValue); + cmd.defineOption("minFrag", "minimum matched fragments for trusting ANI [default : 50]", ArgvParser::OptionRequiresValue); cmd.defineOption("visualize", "output mappings for visualization, can be enabled for single genome to single genome comparison only [disabled by default]"); @@ -141,6 +144,7 @@ Example usage: \n\ std::cerr << "Query = " << parameters.querySequences << std::endl; std::cerr << "Kmer size = " << parameters.kmerSize << std::endl; std::cerr << "Fragment length = " << parameters.minReadLength << std::endl; + std::cerr << "Threads = " << parameters.threads << std::endl; std::cerr << "ANI output file = " << parameters.outFileName << std::endl; std::cerr << ">>>>>>>>>>>>>>>>>>" << std::endl; } @@ -196,7 +200,10 @@ Example usage: \n\ } //Size of reference - parameters.referenceSize = skch::CommonFunc::getReferenceSize(parameters.refSequences); + //parameters.referenceSize = skch::CommonFunc::getReferenceSize(parameters.refSequences); + + //fix reference length to a typical bacterial genome length + parameters.referenceSize = 5000000; str.clear(); //Parse query files @@ -261,6 +268,15 @@ Example usage: \n\ parameters.kmerSize = 5; } + if(cmd.foundOption("threads")) + { + str << cmd.optionValue("threads"); + str >> parameters.threads; + str.clear(); + } + else + parameters.threads = 1; + parameters.p_value = 1e-03; if(cmd.foundOption("fragLen")) diff --git a/src/map/include/winSketch.hpp b/src/map/include/winSketch.hpp index 81f9167..2cdfcda 100644 --- a/src/map/include/winSketch.hpp +++ b/src/map/include/winSketch.hpp @@ -13,6 +13,7 @@ #include #include #include +#include //Own includes #include "map/include/commonFunc.hpp" @@ -165,7 +166,8 @@ namespace skch fclose(file); } - std::cerr << "INFO, skch::Sketch::build, minimizers picked from reference = " << minimizerIndex.size() << std::endl; + if ( omp_get_thread_num() == 0) + std::cerr << "INFO [thread 0], skch::Sketch::build, minimizers picked from reference = " << minimizerIndex.size() << std::endl; } @@ -182,7 +184,8 @@ namespace skch MinimizerMetaData{e.seqId, e.wpos}); } - std::cerr << "INFO, skch::Sketch::index, unique minimizers = " << minimizerPosLookupIndex.size() << std::endl; + if ( omp_get_thread_num() == 0) + std::cerr << "INFO [thread 0], skch::Sketch::index, unique minimizers = " << minimizerPosLookupIndex.size() << std::endl; } /** @@ -197,7 +200,8 @@ namespace skch for(auto &e : this->minimizerPosLookupIndex) this->minimizerFreqHistogram[e.second.size()] += 1; - std::cerr << "INFO, skch::Sketch::computeFreqHist, Frequency histogram of minimizers = " << *this->minimizerFreqHistogram.begin() << " ... " << *this->minimizerFreqHistogram.rbegin() << std::endl; + if ( omp_get_thread_num() == 0) + std::cerr << "INFO [thread 0], skch::Sketch::computeFreqHist, Frequency histogram of minimizers = " << *this->minimizerFreqHistogram.begin() << " ... " << *this->minimizerFreqHistogram.rbegin() << std::endl; //2. Compute frequency threshold to ignore most frequent minimizers @@ -227,9 +231,15 @@ namespace skch } if(this->freqThreshold != std::numeric_limits::max()) - std::cerr << "INFO, skch::Sketch::computeFreqHist, With threshold " << this->percentageThreshold << "\%, ignore minimizers occurring >= " << this->freqThreshold << " times during lookup." << std::endl; + { + if ( omp_get_thread_num() == 0) + std::cerr << "INFO [thread 0], skch::Sketch::computeFreqHist, With threshold " << this->percentageThreshold << "\%, ignore minimizers occurring >= " << this->freqThreshold << " times during lookup." << std::endl; + } else - std::cerr << "INFO, skch::Sketch::computeFreqHist, With threshold " << this->percentageThreshold << "\%, consider all minimizers during lookup." << std::endl; + { + if ( omp_get_thread_num() == 0) + std::cerr << "INFO [thread 0], skch::Sketch::computeFreqHist, With threshold " << this->percentageThreshold << "\%, consider all minimizers during lookup." << std::endl; + } }