Skip to content

Commit

Permalink
modify cache to support parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
Change72 committed Jun 21, 2024
1 parent 6efcfd3 commit dc95c5c
Showing 1 changed file with 65 additions and 35 deletions.
100 changes: 65 additions & 35 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,22 +348,42 @@ void BP5Reader::PerformRemoteGets()
// TP startGenerate = NOW();
auto GetRequests = m_BP5Deserializer->PendingGetRequests;
std::vector<Remote::GetHandle> handles;

#ifdef ADIOS2_HAVE_KVCACHE // open kv cache connection
struct RequestInfo
{
size_t ReqSeq;
DataType varType;
size_t ReqCount;
std::string CacheKey;
size_t TypeSize;
Dims Count;
Dims Start;
void *Data;
};
std::vector<RequestInfo> getRequestsInfo;

if (getenv("useKVCache"))
{
m_KVCacheCommon.openConnection();

std::vector<RequestInfo> getRequestsInfo;
}
#endif

int req_seq = -1;
for (auto &Req : GetRequests)
{
req_seq++;
#ifdef ADIOS2_HAVE_KVCACHE // get data from cache
const DataType varType = m_IO.InquireVariableType(Req.VarName);
QueryBox targetBox(Req.Start, Req.Count);
size_t numOfElements = targetBox.size();
std::string keyPrefix = m_KVCacheCommon.keyPrefix(Req.VarName, Req.RelStep, Req.BlockID);
std::string targetKey = m_KVCacheCommon.keyComposition(keyPrefix, Req.Start, Req.Count);
if (getenv("useKVCache"))
{
const DataType varType = m_IO.InquireVariableType(Req.VarName);
QueryBox targetBox(Req.Start, Req.Count);
size_t numOfElements = targetBox.size();
std::string keyPrefix = m_KVCacheCommon.keyPrefix(Req.VarName, Req.RelStep, Req.BlockID);
std::string targetKey = m_KVCacheCommon.keyComposition(keyPrefix, Req.Start, Req.Count);

// Exact Match: check if targetKey exists
if (m_KVCacheCommon.exists(targetKey))
{
Expand All @@ -372,12 +392,12 @@ void BP5Reader::PerformRemoteGets()
{ \
std::vector<T> reqData; \
reqData.resize(numOfElements); \
m_KVCacheCommon.get(targetKey, reqData); \
m_KVCacheCommon.get(targetKey, reqData); \
std::memcpy(Req.Data, reqData.data(), numOfElements * sizeof(T)); \
}
ADIOS2_FOREACH_PRIMITIVE_STDTYPE_1ARG(declare_type_get)
#undef declare_type_get
continue;

} else {
int max_depth = 999;
std::set<std::string> samePrefixKeys;
Expand Down Expand Up @@ -432,59 +452,69 @@ void BP5Reader::PerformRemoteGets()
if (varType == helper::GetDataType<T>()) \
{ \
const int typeSize = sizeof(T); \
std::vector<T> reqData; \
reqData.resize(numOfElements); \
for (auto &box : regularBoxes){ \
std::string boxKey = m_KVCacheCommon.keyComposition(keyPrefix, box.start, box.count); \
std::vector<T> srcData; \
srcData.resize(box.size()); \
m_Remote.Get(Req.VarName, Req.RelStep, Req.BlockID, box.count, box.start, srcData.data()); \
helper::NdCopy(reinterpret_cast<char*>(srcData.data()), helper::CoreDims(box.start), box.count, true, false, reinterpret_cast<char*>(reqData.data()), Req.Start, Req.Count, true, false, typeSize); \
m_KVCacheCommon.set(boxKey, srcData); \
RequestInfo ReqInfo; \
ReqInfo.ReqSeq = req_seq; \
ReqInfo.varType = varType; \
ReqInfo.ReqCount = box.size(); \
ReqInfo.CacheKey = m_KVCacheCommon.keyComposition(keyPrefix, box.start, box.count); \
ReqInfo.TypeSize = typeSize; \
ReqInfo.Count = box.count; \
ReqInfo.Start = box.start; \
ReqInfo.Data = malloc(box.size() * sizeof(T)); \
auto handle = m_Remote->Get(Req.VarName, Req.RelStep, Req.BlockID, box.count, box.start, ReqInfo.Data); \
handles.push_back(handle); \
getRequestsInfo.push_back(ReqInfo); \
} \
for (int i = 0; i < cachedBoxes.size(); i++){ \
std::string boxKey = cachedKeys[i]; \
QueryBox box(boxKey); \
std::vector<T> srcData; \
srcData.resize(box.size()); \
m_KVCacheCommon.get(boxKey, srcData); \
helper::NdCopy(reinterpret_cast<char*>(srcData.data()), helper::CoreDims(cachedBoxes[i].start), cachedBoxes[i].count, true, false, reinterpret_cast<char*>(reqData.data()), Req.Start, Req.Count, true, false, typeSize); \
helper::NdCopy(reinterpret_cast<char*>(srcData.data()), helper::CoreDims(cachedBoxes[i].start), cachedBoxes[i].count, true, false, reinterpret_cast<char*>(Req.Data), Req.Start, Req.Count, true, false, typeSize); \
} \
std::memcpy(Req.Data, reqData.data(), numOfElements * sizeof(T)); \
}
ADIOS2_FOREACH_PRIMITIVE_STDTYPE_1ARG(declare_type_full_contain)
#undef declare_type_full_contain

continue;
}

continue;
}
#endif
auto handle =
m_Remote->Get(Req.VarName, Req.RelStep, Req.BlockID, Req.Count, Req.Start, Req.Data);

auto handle = m_Remote->Get(Req.VarName, Req.RelStep, Req.BlockID, Req.Count, Req.Start, Req.Data);
handles.push_back(handle);
}

std::size_t handle_seq = -1;
for (auto &handle : handles)
{
handle_seq++;
m_Remote->WaitForGet(handle);

#ifdef ADIOS2_HAVE_KVCACHE // set data to cache
#ifdef ADIOS2_HAVE_KVCACHE // close cache connection
if (getenv("useKVCache"))
{

#define declare_type_set(T) \
if (varType == helper::GetDataType<T>()) \
{ \
std::vector<T> reqData; \
reqData.resize(numOfElements); \
std::memcpy(reqData.data(), Req.Data, numOfElements * sizeof(T)); \
m_KVCacheCommon.set(targetKey, reqData); \
}
ADIOS2_FOREACH_PRIMITIVE_STDTYPE_1ARG(declare_type_set)
#undef declare_type_set
auto &ReqInfo = getRequestsInfo[handle_seq];
auto &Req = GetRequests[ReqInfo.ReqSeq];
helper::NdCopy(reinterpret_cast<char*>(ReqInfo.Data), helper::CoreDims(ReqInfo.Start), ReqInfo.Count, true, false, reinterpret_cast<char*>(Req.Data), Req.Start, Req.Count, true, false, ReqInfo.TypeSize);

#define declare_type_set(T) \
if (ReqInfo.varType == helper::GetDataType<T>()) \
{ \
std::vector<T> reqData; \
reqData.resize(ReqInfo.ReqCount); \
std::memcpy(reqData.data(), Req.Data, ReqInfo.ReqCount * sizeof(T)); \
m_KVCacheCommon.set(ReqInfo.CacheKey, reqData); \
}
ADIOS2_FOREACH_PRIMITIVE_STDTYPE_1ARG(declare_type_set)
#undef declare_type_set

free(ReqInfo.Data);
}
#endif

}

#ifdef ADIOS2_HAVE_KVCACHE // close cache connection
if (getenv("useKVCache"))
{
Expand Down

0 comments on commit dc95c5c

Please sign in to comment.