From dc95c5c9fe0955a9ed49b1efaed11c8a0bfae4e1 Mon Sep 17 00:00:00 2001 From: Chang Guo Date: Fri, 21 Jun 2024 16:40:13 -0400 Subject: [PATCH] modify cache to support parallel --- source/adios2/engine/bp5/BP5Reader.cpp | 100 ++++++++++++++++--------- 1 file changed, 65 insertions(+), 35 deletions(-) diff --git a/source/adios2/engine/bp5/BP5Reader.cpp b/source/adios2/engine/bp5/BP5Reader.cpp index edb0f2e38..b87000887 100644 --- a/source/adios2/engine/bp5/BP5Reader.cpp +++ b/source/adios2/engine/bp5/BP5Reader.cpp @@ -348,22 +348,42 @@ void BP5Reader::PerformRemoteGets() // TP startGenerate = NOW(); auto GetRequests = m_BP5Deserializer->PendingGetRequests; std::vector handles; + #ifdef ADIOS2_HAVE_KVCACHE // open kv cache connection + struct RequestInfo + { + size_t ReqSeq; + DataType varType; + size_t ReqCount; + std::string CacheKey; + size_t TypeSize; + Dims Count; + Dims Start; + void *Data; + }; + std::vector getRequestsInfo; + if (getenv("useKVCache")) { m_KVCacheCommon.openConnection(); + + std::vector getRequestsInfo; } #endif + + int req_seq = -1; for (auto &Req : GetRequests) { + req_seq++; #ifdef ADIOS2_HAVE_KVCACHE // get data from cache - const DataType varType = m_IO.InquireVariableType(Req.VarName); - QueryBox targetBox(Req.Start, Req.Count); - size_t numOfElements = targetBox.size(); - std::string keyPrefix = m_KVCacheCommon.keyPrefix(Req.VarName, Req.RelStep, Req.BlockID); - std::string targetKey = m_KVCacheCommon.keyComposition(keyPrefix, Req.Start, Req.Count); if (getenv("useKVCache")) { + const DataType varType = m_IO.InquireVariableType(Req.VarName); + QueryBox targetBox(Req.Start, Req.Count); + size_t numOfElements = targetBox.size(); + std::string keyPrefix = m_KVCacheCommon.keyPrefix(Req.VarName, Req.RelStep, Req.BlockID); + std::string targetKey = m_KVCacheCommon.keyComposition(keyPrefix, Req.Start, Req.Count); + // Exact Match: check if targetKey exists if (m_KVCacheCommon.exists(targetKey)) { @@ -372,12 +392,12 @@ void BP5Reader::PerformRemoteGets() { \ std::vector reqData; \ reqData.resize(numOfElements); \ - m_KVCacheCommon.get(targetKey, reqData); \ + m_KVCacheCommon.get(targetKey, reqData); \ std::memcpy(Req.Data, reqData.data(), numOfElements * sizeof(T)); \ } ADIOS2_FOREACH_PRIMITIVE_STDTYPE_1ARG(declare_type_get) #undef declare_type_get - continue; + } else { int max_depth = 999; std::set samePrefixKeys; @@ -432,15 +452,19 @@ void BP5Reader::PerformRemoteGets() if (varType == helper::GetDataType()) \ { \ const int typeSize = sizeof(T); \ - std::vector reqData; \ - reqData.resize(numOfElements); \ for (auto &box : regularBoxes){ \ - std::string boxKey = m_KVCacheCommon.keyComposition(keyPrefix, box.start, box.count); \ - std::vector srcData; \ - srcData.resize(box.size()); \ - m_Remote.Get(Req.VarName, Req.RelStep, Req.BlockID, box.count, box.start, srcData.data()); \ - helper::NdCopy(reinterpret_cast(srcData.data()), helper::CoreDims(box.start), box.count, true, false, reinterpret_cast(reqData.data()), Req.Start, Req.Count, true, false, typeSize); \ - m_KVCacheCommon.set(boxKey, srcData); \ + RequestInfo ReqInfo; \ + ReqInfo.ReqSeq = req_seq; \ + ReqInfo.varType = varType; \ + ReqInfo.ReqCount = box.size(); \ + ReqInfo.CacheKey = m_KVCacheCommon.keyComposition(keyPrefix, box.start, box.count); \ + ReqInfo.TypeSize = typeSize; \ + ReqInfo.Count = box.count; \ + ReqInfo.Start = box.start; \ + ReqInfo.Data = malloc(box.size() * sizeof(T)); \ + auto handle = m_Remote->Get(Req.VarName, Req.RelStep, Req.BlockID, box.count, box.start, ReqInfo.Data); \ + handles.push_back(handle); \ + getRequestsInfo.push_back(ReqInfo); \ } \ for (int i = 0; i < cachedBoxes.size(); i++){ \ std::string boxKey = cachedKeys[i]; \ @@ -448,43 +472,49 @@ void BP5Reader::PerformRemoteGets() std::vector srcData; \ srcData.resize(box.size()); \ m_KVCacheCommon.get(boxKey, srcData); \ - helper::NdCopy(reinterpret_cast(srcData.data()), helper::CoreDims(cachedBoxes[i].start), cachedBoxes[i].count, true, false, reinterpret_cast(reqData.data()), Req.Start, Req.Count, true, false, typeSize); \ + helper::NdCopy(reinterpret_cast(srcData.data()), helper::CoreDims(cachedBoxes[i].start), cachedBoxes[i].count, true, false, reinterpret_cast(Req.Data), Req.Start, Req.Count, true, false, typeSize); \ } \ - std::memcpy(Req.Data, reqData.data(), numOfElements * sizeof(T)); \ } ADIOS2_FOREACH_PRIMITIVE_STDTYPE_1ARG(declare_type_full_contain) #undef declare_type_full_contain - - continue; } + + continue; } #endif - auto handle = - m_Remote->Get(Req.VarName, Req.RelStep, Req.BlockID, Req.Count, Req.Start, Req.Data); + + auto handle = m_Remote->Get(Req.VarName, Req.RelStep, Req.BlockID, Req.Count, Req.Start, Req.Data); handles.push_back(handle); } + + std::size_t handle_seq = -1; for (auto &handle : handles) { + handle_seq++; m_Remote->WaitForGet(handle); - - #ifdef ADIOS2_HAVE_KVCACHE // set data to cache + #ifdef ADIOS2_HAVE_KVCACHE // close cache connection if (getenv("useKVCache")) { - -#define declare_type_set(T) \ - if (varType == helper::GetDataType()) \ - { \ - std::vector reqData; \ - reqData.resize(numOfElements); \ - std::memcpy(reqData.data(), Req.Data, numOfElements * sizeof(T)); \ - m_KVCacheCommon.set(targetKey, reqData); \ - } - ADIOS2_FOREACH_PRIMITIVE_STDTYPE_1ARG(declare_type_set) -#undef declare_type_set + auto &ReqInfo = getRequestsInfo[handle_seq]; + auto &Req = GetRequests[ReqInfo.ReqSeq]; + helper::NdCopy(reinterpret_cast(ReqInfo.Data), helper::CoreDims(ReqInfo.Start), ReqInfo.Count, true, false, reinterpret_cast(Req.Data), Req.Start, Req.Count, true, false, ReqInfo.TypeSize); + + #define declare_type_set(T) \ + if (ReqInfo.varType == helper::GetDataType()) \ + { \ + std::vector reqData; \ + reqData.resize(ReqInfo.ReqCount); \ + std::memcpy(reqData.data(), Req.Data, ReqInfo.ReqCount * sizeof(T)); \ + m_KVCacheCommon.set(ReqInfo.CacheKey, reqData); \ + } + ADIOS2_FOREACH_PRIMITIVE_STDTYPE_1ARG(declare_type_set) + #undef declare_type_set + + free(ReqInfo.Data); } #endif - } + #ifdef ADIOS2_HAVE_KVCACHE // close cache connection if (getenv("useKVCache")) {