Skip to content

Commit

Permalink
fixed integration of int32 and subgraph
Browse files Browse the repository at this point in the history
  • Loading branch information
gf777 committed May 31, 2024
1 parent 45558cf commit bfa6b94
Show file tree
Hide file tree
Showing 8 changed files with 263 additions and 35 deletions.
30 changes: 16 additions & 14 deletions include/kreeq.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ using parallelMap = phmap::parallel_flat_hash_map<uint64_t, DBGkmer,
8,
phmap::NullMutex>;

using parallelMap32 = phmap::parallel_flat_hash_map<uint64_t, DBGkmer32,
std::hash<uint64_t>,
std::equal_to<uint64_t>,
std::allocator<std::pair<const uint64_t, DBGkmer32>>,
8,
phmap::NullMutex>;

class DBG : public Kmap<UserInputKreeq, DBGkmer, uint8_t> {

std::atomic<uint64_t> totMissingKmers{0}, totKcount{0}, totEdgeMissingKmers{0}, buffers{0};
Expand All @@ -55,28 +62,21 @@ class DBG : public Kmap<UserInputKreeq, DBGkmer, uint8_t> {
InSequencesDBG *genome;

// subgraph objects
parallelMap *DBGsubgraph = new parallelMap;
std::vector<parallelMap*> DBGTmpSubgraphs;
parallelMap32 *DBGsubgraph = new parallelMap32;
std::vector<parallelMap32*> DBGTmpSubgraphs;
InSequences GFAsubgraph;


std::queue<std::string*> readBatches;

uint64_t totEdgeCount = 0;

using parallelMap32 = phmap::parallel_flat_hash_map<uint64_t, DBGkmer32,
std::hash<uint64_t>,
std::equal_to<uint64_t>,
std::allocator<std::pair<const uint64_t, DBGkmer32>>,
8,
phmap::NullMutex>;

std::vector<parallelMap32*> maps32;

public:

DBG(UserInputKreeq& userInput) : Kmap{userInput.kmerLen} , userInput(userInput) {

for(uint16_t m = 0; m<mapCount; ++m)
maps32.push_back(new parallelMap32);

if (userInput.inDBG.size() == 0) { // if we are not reading an existing db
lg.verbose("Deleting any tmp file");
for(uint16_t m = 0; m<mapCount; ++m) {// remove tmp buffers and maps if any
Expand All @@ -88,8 +88,6 @@ class DBG : public Kmap<UserInputKreeq, DBGkmer, uint8_t> {
remove((userInput.prefix + "/.index").c_str());
}
jobWait(threadPool);
for(uint16_t m = 0; m<mapCount; ++m)
maps32.push_back(new parallelMap32);
initHashing(); // start parallel hashing
}
};
Expand Down Expand Up @@ -170,6 +168,10 @@ class DBG : public Kmap<UserInputKreeq, DBGkmer, uint8_t> {

bool unionSum(parallelMap* map1, parallelMap* map2, uint16_t m);

bool mergeSubMaps(parallelMap32* map1, parallelMap32* map2, uint8_t subMapIndex);

bool unionSum(parallelMap32* map1, parallelMap32* map2);

void report();

void printTable(std::string ext);
Expand Down
79 changes: 65 additions & 14 deletions src/graph-builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,11 +295,9 @@ bool DBG::buffersToMaps() {
jobs.push_back([this, i] { return processBuffers(i); });

threadPool.queueJobs(jobs);

jobWait(threadPool);

consolidateTmpMaps();

dumpHighCopyKmers();

return true;
Expand Down Expand Up @@ -534,17 +532,10 @@ void DBG::finalize() {
if (userInput.inDBG.size() == 0) {

readingDone = true;

joinThreads();

lg.verbose("Converting buffers to maps");

buffersToMaps();

loadHighCopyKmers(); // reload high copy kmers for computation steps

}

}

void DBG::stats() {
Expand Down Expand Up @@ -640,9 +631,7 @@ std::array<uint16_t, 2> DBG::computeMapRange(std::array<uint16_t, 2> mapRange) {
mapRange[1] = m + 1;

}

return mapRange;

}

void DBG::loadMapRange(std::array<uint16_t, 2> mapRange) {
Expand All @@ -654,14 +643,12 @@ void DBG::loadMapRange(std::array<uint16_t, 2> mapRange) {

threadPool.queueJobs(jobs);
jobWait(threadPool);

}

void DBG::deleteMapRange(std::array<uint16_t, 2> mapRange) {

for(uint16_t m = mapRange[0]; m<mapRange[1]; ++m)
deleteMap(m);

}

void DBG::cleanup() {
Expand Down Expand Up @@ -796,7 +783,6 @@ void DBG::kunion(){ // concurrent merging of the maps that store the same hashes
}

std::vector<std::function<bool()>> jobs;

std::vector<uint64_t> fileSizes;

for (uint16_t m = 0; m<mapCount; ++m) // compute size of map files
Expand Down Expand Up @@ -942,3 +928,68 @@ bool DBG::unionSum(parallelMap* map1, parallelMap* map2, uint16_t m) {
return true;

}

// subgraph functions

bool DBG::mergeSubMaps(parallelMap32* map1, parallelMap32* map2, uint8_t subMapIndex) {

auto& inner = map1->get_inner(subMapIndex); // to retrieve the submap at given index
auto& submap1 = inner.set_; // can be a set or a map, depending on the type of map1
auto& inner2 = map2->get_inner(subMapIndex);
auto& submap2 = inner2.set_;

for (auto pair : submap1) { // for each element in map1, find it in map2 and increase its value

auto got = submap2.find(pair.first); // insert or find this kmer in the hash table
if (got == submap2.end()){
submap2.insert(pair);
}else{

DBGkmer32& dbgkmerMap = got->second;

for (uint64_t w = 0; w<4; ++w) { // update weights

if (LARGEST - dbgkmerMap.fw[w] >= pair.second.fw[w])
dbgkmerMap.fw[w] += pair.second.fw[w];
else
dbgkmerMap.fw[w] = LARGEST;
if (LARGEST - dbgkmerMap.bw[w] >= pair.second.bw[w])
dbgkmerMap.bw[w] += pair.second.bw[w];
else
dbgkmerMap.bw[w] = LARGEST;

}

if (LARGEST - dbgkmerMap.cov >= pair.second.cov)
dbgkmerMap.cov += pair.second.cov; // increase kmer coverage
else
dbgkmerMap.cov = LARGEST;

};

}

return true;

}


bool DBG::unionSum(parallelMap32* map1, parallelMap32* map2) {

std::vector<std::function<bool()>> jobs;

if (map1->subcnt() != map2->subcnt()) {
fprintf(stderr, "Maps don't have the same numbers of submaps (%zu != %zu). Terminating.\n", map1->subcnt(), map2->subcnt());
exit(EXIT_FAILURE);
}

for(std::size_t subMapIndex = 0; subMapIndex < map1->subcnt(); ++subMapIndex)
jobs.push_back([this, map1, map2, subMapIndex] { return mergeSubMaps(map1, map2, subMapIndex); });

threadPool.queueJobs(jobs);

jobWait(threadPool);

return true;

}
8 changes: 5 additions & 3 deletions src/input.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,11 @@ void Input::read() {

lg.verbose("Reads loaded.");
knav.finalize();
}else{
loadGraph();
}

if (userInput.inDBG.size() > 0)
userInput.prefix = userInput.inDBG[0];
knav.loadHighCopyKmers(); // reload high copy kmers for computation steps

InSequencesDBG genome; // initialize sequence collection object
if (!userInput.inSequence.empty()) {

Expand Down Expand Up @@ -152,6 +153,7 @@ void Input::read() {

loadGraph();
DBG knav(userInput); // navigational kmerdb
knav.loadHighCopyKmers(); // reload high copy kmers for computation steps

InSequencesDBG genome; // initialize sequence collection object
if (!userInput.inSequence.empty()) {
Expand Down
15 changes: 11 additions & 4 deletions src/kreeq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,8 @@ bool DBG::DBGsubgraphFromSegment(InSegment *inSegment, std::array<uint16_t, 2> m

std::string sHeader = inSegment->getSeqHeader();
parallelMap *map;
parallelMap *segmentSubmap = new parallelMap;
parallelMap32 *map32;
parallelMap32 *segmentSubmap = new parallelMap32;
uint64_t key, i;
bool isFw = false;
std::vector<uint64_t> segmentCoordinates;
Expand Down Expand Up @@ -960,7 +961,13 @@ bool DBG::DBGsubgraphFromSegment(InSegment *inSegment, std::array<uint16_t, 2> m
auto got = map->find(key);

if (got != map->end()) {
segmentSubmap->insert(*got);
if (got->second.cov != 255) {
segmentSubmap->insert(*got);
}else{
map32 = maps32[i];
auto got = map32->find(key);
segmentSubmap->insert(*got);
}
}
}
}
Expand All @@ -976,7 +983,7 @@ bool DBG::DBGsubgraphFromSegment(InSegment *inSegment, std::array<uint16_t, 2> m

void DBG::mergeSubgraphs() {

for (parallelMap *map1 : DBGTmpSubgraphs) {
for (parallelMap32 *map1 : DBGTmpSubgraphs) {
unionSum(map1, DBGsubgraph);
delete map1;
}
Expand All @@ -993,7 +1000,7 @@ void DBG::DBGgraphToGFA() {
std::string* inSequence = new std::string(reverseHash(pair.first));
Sequence* sequence = new Sequence {std::to_string(idCounter++), "", inSequence};
sequence->seqPos = seqPos; // remember the order
std::vector<Tag> inTags = {Tag{'i',"KC",std::to_string(pair.second.cov)}};
std::vector<Tag> inTags = {Tag{'i',"RC",std::to_string(pair.second.cov)}};
GFAsubgraph.appendSegment(sequence, inTags);
seqPos++;
}
Expand Down
Binary file added testFiles/test1.kreeq/.map.hc.bin
Binary file not shown.
Binary file added testFiles/test2.kreeq/.map.hc.bin
Binary file not shown.
112 changes: 112 additions & 0 deletions validateFiles/test.37.tst
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
kreeq-decompressor lookup -i testFiles/decompressor1.bkwig -c testFiles/decompressor1.bed
embedded
21
sequence1:20-30
3,2,2
2,2,2
2,2,2
2,2,2
2,2,2
2,2,2
2,2,2
2,2,2
2,2,2
4,1,2

sequence1:40-43
2,2,1
2,2,2
2,2,2

sequence2:15-70
0,0,0
0,0,0
0,0,0
0,0,0
0,0,0
0,0,0
0,0,0
0,0,0
0,0,0
0,0,0
0,0,0
2,2,0
2,2,2
2,2,2
4,1,2
2,2,1
2,2,2
2,2,2
2,2,2
2,2,2
2,2,2
2,2,2
2,2,2
2,2,2
2,1,2
2,2,1
2,2,2
2,2,2
2,2,2
2,0,2
0,0,0
0,0,0
0,0,0
0,0,0
0,0,0
0,0,0
0,0,0
0,0,0
0,0,0
0,0,0
0,0,0
0,0,0
0,0,0
0,0,0
0,0,0
0,0,0
0,0,0
0,0,0
0,0,0
0,0,0
0,0,0
2,2,0
2,2,2
2,2,2
2,1,2

sequence2:30-39
2,2,1
2,2,2
2,2,2
2,2,2
2,2,2
2,2,2
2,2,2
2,2,2
2,2,2

sequence3:20-30
3,2,2
2,2,2
2,2,2
2,2,2
2,2,2
2,2,2
2,0,2
0,0,0
0,0,0
0,0,0

sequence3:60-70
2,2,1
2,2,2
2,2,2
2,2,2
2,2,2
2,2,2
2,2,2
2,2,2
2,2,2
2,1,2

Loading

0 comments on commit bfa6b94

Please sign in to comment.