From 95ec36d565a892835ba75e07113f5228465a6256 Mon Sep 17 00:00:00 2001 From: Chang Guo Date: Tue, 9 Jan 2024 22:04:13 -0700 Subject: [PATCH] KVCache for remote v1 --- source/adios2/CMakeLists.txt | 17 +- source/adios2/engine/bp5/BP5Reader.cpp | 42 +++- source/adios2/toolkit/cache/KVCacheBase64.cpp | 66 ++++++ source/adios2/toolkit/cache/KVCacheBase64.h | 25 +++ source/adios2/toolkit/cache/KVCacheCommon.cpp | 113 ----------- source/adios2/toolkit/cache/KVCacheCommon.h | 50 ++++- source/adios2/toolkit/cache/KVCacheCommon.inl | 192 ++++++++++++++++++ 7 files changed, 380 insertions(+), 125 deletions(-) create mode 100644 source/adios2/toolkit/cache/KVCacheBase64.cpp create mode 100644 source/adios2/toolkit/cache/KVCacheBase64.h delete mode 100644 source/adios2/toolkit/cache/KVCacheCommon.cpp create mode 100644 source/adios2/toolkit/cache/KVCacheCommon.inl diff --git a/source/adios2/CMakeLists.txt b/source/adios2/CMakeLists.txt index 455bccdfc..6fdf9b05d 100644 --- a/source/adios2/CMakeLists.txt +++ b/source/adios2/CMakeLists.txt @@ -123,7 +123,12 @@ add_library(adios2_core toolkit/aggregator/mpi/MPIAggregator.cpp toolkit/aggregator/mpi/MPIChain.cpp toolkit/aggregator/mpi/MPIShmChain.cpp - toolkit/cache/KVCacheCommon.cpp toolkit/cache/KVCacheCommon.h) + + toolkit/cache/KVCacheCommon.h + toolkit/cache/KVCacheCommon.inl +# toolkit/cache/KVCacheBase64.h +# toolkit/cache/KVCacheBase64.cpp + ) set_property(TARGET adios2_core PROPERTY EXPORT_NAME core) set_property(TARGET adios2_core PROPERTY OUTPUT_NAME adios2${ADIOS2_LIBRARY_SUFFIX}_core) @@ -207,10 +212,18 @@ endif() set(ADIOS2_USE_Cache True) if (ADIOS2_USE_Cache) # add_subdirectory(toolkit/cache) +# Note: need to export LD_LIBRARY_PATH=/data/gc/ornl/hiredis-1.2.0/build:/data/gc/ornl/openssl:$LD_LIBRARY_PATH set(HIREDIS_DIR "/data/gc/ornl/hiredis-1.2.0/") include_directories(${HIREDIS_DIR}) set(HIREDIS_LIBRARY ${HIREDIS_DIR}/build/libhiredis.so) target_link_libraries(adios2_core PRIVATE hiredis) + + set(OPENSSL_DIR "/data/gc/ornl/openssl/") + set(OPENSSL_INCLUDE_DIR ${OPENSSL_DIR}/include) + set(OPENSSL_LIBRARY_SSL ${OPENSSL_DIR}/libssl.so) + set(OPENSSL_LIBRARY_CRYPTO ${OPENSSL_DIR}/libcrypto.so) + target_include_directories(adios2_core PRIVATE ${OPENSSL_INCLUDE_DIR}) + target_link_libraries(adios2_core PRIVATE ${OPENSSL_LIBRARY_SSL} ${OPENSSL_LIBRARY_CRYPTO}) endif () if(ADIOS2_HAVE_Campaign) @@ -436,7 +449,7 @@ else() endif() # The main "adios2" target provides all interfaces. -add_library(adios2 INTERFACE) +add_library(adios2 INTERFACE toolkit/cache/KVCacheBase64.h) target_link_libraries(adios2 INTERFACE ${maybe_adios2_c_mpi} adios2_c ${maybe_adios2_cxx11_mpi} adios2_cxx11 diff --git a/source/adios2/engine/bp5/BP5Reader.cpp b/source/adios2/engine/bp5/BP5Reader.cpp index 30b15ce88..5767d6073 100644 --- a/source/adios2/engine/bp5/BP5Reader.cpp +++ b/source/adios2/engine/bp5/BP5Reader.cpp @@ -292,6 +292,9 @@ void BP5Reader::PerformRemoteGets() } for (auto &Req : GetRequests) { + const DataType varType = m_IO.InquireVariableType(Req.VarName); + size_t numOfElements = m_KVCacheCommon.size(Req.Count); + if (getenv("useKVCache")) { std::string cacheKey; @@ -299,17 +302,50 @@ void BP5Reader::PerformRemoteGets() Req.Count, cacheKey); if (m_KVCacheCommon.exists(cacheKey)) { - m_KVCacheCommon.get(cacheKey, Req.Data); - continue; + +#define declare_type_get(T) \ + if (varType == helper::GetDataType()) \ + { \ + std::vector reqData; \ + reqData.resize(numOfElements); \ + m_KVCacheCommon.get(cacheKey, reqData); \ + std::memcpy(Req.Data, reqData.data(), numOfElements * sizeof(T)); \ + } + ADIOS2_FOREACH_PRIMITIVE_STDTYPE_1ARG(declare_type_get) +#undef declare_type_get + continue; } } m_Remote.Get(Req.VarName, Req.RelStep, Req.BlockID, Req.Count, Req.Start, Req.Data); + // std::vector temp; + + // temp.resize(size); + // std::memcpy(temp.data(), Req.Data, size * sizeof(int8_t)); + // temp get data from Req.Data +// std::string cacheKey; +// m_KVCacheCommon.keyComposition(Req.VarName, Req.RelStep, Req.BlockID, Req.Start, +// Req.Count, cacheKey); +// std::vector reqData; +// reqData.resize(numOfElements); +// std::memcpy(reqData.data(), Req.Data, numOfElements * sizeof(int8_t)); +// m_KVCacheCommon.set(cacheKey, reqData); + if (getenv("useKVCache")) { std::string cacheKey; m_KVCacheCommon.keyComposition(Req.VarName, Req.RelStep, Req.BlockID, Req.Start, Req.Count, cacheKey); - m_KVCacheCommon.set(cacheKey, Req.Data); + +#define declare_type_set(T) \ + if (varType == helper::GetDataType()) \ + { \ + std::vector reqData; \ + reqData.resize(numOfElements); \ + std::memcpy(reqData.data(), Req.Data, numOfElements * sizeof(T)); \ + m_KVCacheCommon.set(cacheKey, reqData); \ + } + ADIOS2_FOREACH_PRIMITIVE_STDTYPE_1ARG(declare_type_set) +#undef declare_type_set } } if (getenv("useKVCache")) diff --git a/source/adios2/toolkit/cache/KVCacheBase64.cpp b/source/adios2/toolkit/cache/KVCacheBase64.cpp new file mode 100644 index 000000000..5f3fdfee4 --- /dev/null +++ b/source/adios2/toolkit/cache/KVCacheBase64.cpp @@ -0,0 +1,66 @@ +// +// Created by cguo51 on 1/8/24. +// +#include "adios2/toolkit/cache/KVCacheBase64.h" +//namespace adios2 +//{ +std::string base64Encode(const std::vector &data) +{ + BIO *bio, *b64; + BUF_MEM *bptr; + + b64 = BIO_new(BIO_f_base64()); + BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL); + bio = BIO_new(BIO_s_mem()); + bio = BIO_push(b64, bio); + + BIO_write(bio, data.data(), static_cast(data.size())); + BIO_flush(bio); + BIO_get_mem_ptr(bio, &bptr); + + std::string result(bptr->data, bptr->length); + + BIO_free_all(bio); + + return result; +} + +std::vector base64Decode(const std::string &encoded) +{ + BIO *bio, *b64; + std::vector result(encoded.size()); + + b64 = BIO_new(BIO_f_base64()); + BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL); + bio = BIO_new_mem_buf(encoded.c_str(), static_cast(encoded.size())); + bio = BIO_push(b64, bio); + + int decodedLength = BIO_read(bio, result.data(), static_cast(result.size())); + result.resize(decodedLength); + + BIO_free_all(bio); + + return result; +} + +template +void encodeVector(const std::vector &vec, std::string &encodedString) +{ + std::vector rawData(reinterpret_cast(vec.data()), + reinterpret_cast(vec.data() + vec.size())); + encodedString = base64Encode(rawData); +} + +template +void decodeVector(const std::string &str, std::vector &vec) +{ + std::vector rawData = base64Decode(str); + + // Calculate the number of elements based on the total size of rawData + size_t numElements = rawData.size() / sizeof(T); + + // Construct the result vector using the correct size + vec(reinterpret_cast(rawData.data()), + reinterpret_cast(rawData.data() + numElements * sizeof(T))); +} +//}; // adios2 \ No newline at end of file diff --git a/source/adios2/toolkit/cache/KVCacheBase64.h b/source/adios2/toolkit/cache/KVCacheBase64.h new file mode 100644 index 000000000..3c772e05f --- /dev/null +++ b/source/adios2/toolkit/cache/KVCacheBase64.h @@ -0,0 +1,25 @@ +// +// Created by cguo51 on 1/8/24. +// +#ifndef ADIOS2_KVCACHEBASE64_H +#define ADIOS2_KVCACHEBASE64_H +#include +#include +#include +#include +#include +#include +//namespace adios2 +//{ + +std::string base64Encode(const std::vector& data); + +std::vector base64Decode(const std::string& encoded); + +template +void encodeVector(const std::vector& vec, std::string& encodedString); + +template +void decodeVector(const std::string& str, std::vector& vec); +//}; // adios2 +#endif // ADIOS2_KVCACHEBASE64_H diff --git a/source/adios2/toolkit/cache/KVCacheCommon.cpp b/source/adios2/toolkit/cache/KVCacheCommon.cpp deleted file mode 100644 index 86bd9c5a7..000000000 --- a/source/adios2/toolkit/cache/KVCacheCommon.cpp +++ /dev/null @@ -1,113 +0,0 @@ -// -// Created by cguo51 on 12/30/23. -// - -#include "KVCacheCommon.h" - -namespace adios2 -{ - -void KVCacheCommon::openConnection() -{ - m_redisContext = redisConnect(m_host.c_str(), m_port); - if (m_redisContext == NULL || m_redisContext->err) - { - if (m_redisContext) - { - std::cout << "Error: " << m_redisContext->errstr << std::endl; - redisFree(m_redisContext); - } - else - { - std::cout << "Can't allocate kvcache context" << std::endl; - } - } - else - { - std::cout << "Connected to kvcache server" << std::endl; - } -} - -void KVCacheCommon::closeConnection() -{ - redisFree(m_redisContext); -} - -void KVCacheCommon::set(std::string key, void* Data) -{ - m_key = key; - m_value = (char*)Data; - m_command = "SET " + m_key + " " + m_value; - m_redisReply = (redisReply *)redisCommand(m_redisContext, m_command.c_str()); - if (m_redisReply == NULL) - { - std::cout << "Error: " << m_redisContext->errstr << std::endl; - } - else - { - std::cout << "SET: " << m_redisReply->str << std::endl; - freeReplyObject(m_redisReply); - } -} - -void KVCacheCommon::get(std::string key, void* Data) -{ - m_key = key; - m_command = "GET " + m_key; - m_redisReply = (redisReply *)redisCommand(m_redisContext, m_command.c_str()); - if (m_redisReply == NULL) - { - std::cout << "Error: " << m_redisContext->errstr << std::endl; - } - else - { - std::cout << "GET: " << m_redisReply->str << std::endl; - Data = (void*)m_redisReply->str; - freeReplyObject(m_redisReply); - } -} - -void KVCacheCommon::del(std::string key) -{ - m_key = key; - m_command = "DEL " + m_key; - m_redisReply = (redisReply *)redisCommand(m_redisContext, m_command.c_str()); - if (m_redisReply == NULL) - { - std::cout << "Error: " << m_redisContext->errstr << std::endl; - } - else - { - std::cout << "DEL: " << m_redisReply->str << std::endl; - freeReplyObject(m_redisReply); - } -} - -bool KVCacheCommon::exists(std::string key) -{ - m_key = key; - m_command = "EXISTS " + m_key; - m_redisReply = (redisReply *)redisCommand(m_redisContext, m_command.c_str()); - if (m_redisReply == NULL) - { - std::cout << "Key does not exist" << std::endl; - return false; - } - else - { - std::cout << "EXISTS: " << m_redisReply->str << std::endl; - freeReplyObject(m_redisReply); - return true; - } -} - -void KVCacheCommon::keyComposition(char *VarName, size_t AbsStep, size_t BlockID, Dims Start, Dims Count, std::string &cacheKey) -{ - std::string key = VarName + std::to_string(AbsStep) + std::to_string(BlockID); - std::string box = QueryBox::serializeQueryBox(QueryBox{Start, Count}); - cacheKey = key + box; -} - - - -}; // namespace adios2 diff --git a/source/adios2/toolkit/cache/KVCacheCommon.h b/source/adios2/toolkit/cache/KVCacheCommon.h index c61c1cfa2..27cf8367e 100644 --- a/source/adios2/toolkit/cache/KVCacheCommon.h +++ b/source/adios2/toolkit/cache/KVCacheCommon.h @@ -27,22 +27,58 @@ class KVCacheCommon KVCacheCommon(std::string host="localhost", int port=6379): m_host(host), m_port(port){}; - void openConnection(); + inline void openConnection(); - void closeConnection(); + inline void closeConnection(); - void set(std::string key, void* Data); + template + void set(std::string key, const std::vector& vec); - void get(std::string key, void* Data); + template + void get(std::string key, std::vector& vec); - void del(std::string key); + inline void del(std::string key); - bool exists(std::string key); + inline bool exists(std::string key); - void keyComposition(char *VarName, size_t AbsStep, size_t BlockID, Dims Start, Dims Count, std::string &cacheKey); + inline void keyComposition(char *VarName, size_t AbsStep, size_t BlockID, Dims Start, Dims Count, std::string &cacheKey); + +// template +// void serializeVector(const std::vector& vec, std::string& serializedString) { +// nlohmann::json j = vec; +// serializedString = j.dump(); +// } +// +// template +// void deserializeVector(const std::string& str, std::vector& vec) { +// nlohmann::json j = nlohmann::json::parse(str); +// vec = j.get>(); +// } + + size_t size(Dims Count) const + { + size_t size = 1; + for(auto i: Count) + { + size *= i; + } + return size; + } + + inline std::string base64Encode(const std::vector& data); + + inline std::vector base64Decode(const std::string& encoded); + + template + void encodeVector(const std::vector& vec, std::string& encodedString); + + template + void decodeVector(const std::string& str, std::vector& vec); }; }; // adios2 +#include "KVCacheCommon.inl" + #endif // ADIOS2_KVCACHECOMMON_H diff --git a/source/adios2/toolkit/cache/KVCacheCommon.inl b/source/adios2/toolkit/cache/KVCacheCommon.inl new file mode 100644 index 000000000..f6be9e5c8 --- /dev/null +++ b/source/adios2/toolkit/cache/KVCacheCommon.inl @@ -0,0 +1,192 @@ +// +// Created by cguo51 on 12/30/23. +// +#ifndef KVCACHECOMMON_INL +#define KVCACHECOMMON_INL +#include "adios2/toolkit/cache/KVCacheBase64.h" + +namespace adios2 +{ + +void KVCacheCommon::openConnection() +{ + m_redisContext = redisConnect(m_host.c_str(), m_port); + if (m_redisContext == NULL || m_redisContext->err) + { + if (m_redisContext) + { + std::cout << "Error: " << m_redisContext->errstr << std::endl; + redisFree(m_redisContext); + } + else + { + std::cout << "Can't allocate kvcache context" << std::endl; + } + } + else + { + std::cout << "Connected to kvcache server" << std::endl; + } +} + +void KVCacheCommon::closeConnection() +{ + redisFree(m_redisContext); +} +template +void KVCacheCommon::set(std::string key, const std::vector& vec) +{ + m_key = key; + encodeVector(vec, m_value); +// m_key = "test"; +// m_value = "test"; + m_command = "SET " + m_key + " " + m_value; + m_redisReply = (redisReply *)redisCommand(m_redisContext, m_command.c_str()); + if (m_redisReply == NULL) + { + std::cout << "Error: " << m_redisContext->errstr << std::endl; + } + else + { + std::cout << "SET: " << m_redisReply->str << std::endl; + freeReplyObject(m_redisReply); + } +} +template +void KVCacheCommon::get(std::string key, std::vector& vec) +{ + m_key = key; + m_command = "GET " + m_key; + m_redisReply = (redisReply *)redisCommand(m_redisContext, m_command.c_str()); + if (m_redisReply == NULL) + { + std::cout << "Error: " << m_redisContext->errstr << std::endl; + } + else + { + std::cout << "GET: " << m_redisReply->str << std::endl; + decodeVector(m_redisReply->str, vec); + freeReplyObject(m_redisReply); + } +} + +void KVCacheCommon::del(std::string key) +{ + m_key = key; + m_command = "DEL " + m_key; + m_redisReply = (redisReply *)redisCommand(m_redisContext, m_command.c_str()); + if (m_redisReply == NULL) + { + std::cout << "Error: " << m_redisContext->errstr << std::endl; + } + else + { + std::cout << "DEL: " << m_redisReply->str << std::endl; + freeReplyObject(m_redisReply); + } +} + +bool KVCacheCommon::exists(std::string key) +{ + m_key = key; + m_command = "EXISTS " + m_key; + std::cout << "EXISTS: " << m_command.c_str() << std::endl; + // m_command = "EXISTS mytest"; + m_redisReply = (redisReply *)redisCommand(m_redisContext, m_command.c_str()); + if (m_redisReply == NULL) + { + std::cout << "Key does not exist" << std::endl; + return false; + } + else + { + if (!m_redisReply->integer) + { + std::cout << "Key does not exist" << std::endl; + return false; + } + std::cout << "EXISTS: " << m_redisReply->str << std::endl; + freeReplyObject(m_redisReply); + return true; + } +} + +void KVCacheCommon::keyComposition(char *VarName, size_t AbsStep, size_t BlockID, Dims Start, Dims Count, std::string &cacheKey) +{ + std::string key = VarName + std::to_string(AbsStep) + std::to_string(BlockID); + std::string box = QueryBox::serializeQueryBox(QueryBox{Start, Count}); + cacheKey = key + box; + // replace special characters + std::replace(cacheKey.begin(), cacheKey.end(), '"', '_'); + std::replace(cacheKey.begin(), cacheKey.end(), ',', '_'); + std::replace(cacheKey.begin(), cacheKey.end(), '(', '_'); + std::replace(cacheKey.begin(), cacheKey.end(), ')', '_'); + std::replace(cacheKey.begin(), cacheKey.end(), '[', '_'); + std::replace(cacheKey.begin(), cacheKey.end(), ']', '_'); + std::replace(cacheKey.begin(), cacheKey.end(), '{', '_'); + std::replace(cacheKey.begin(), cacheKey.end(), '}', '_'); +} + +std::string KVCacheCommon::base64Encode(const std::vector &data) +{ + BIO *bio, *b64; + BUF_MEM *bptr; + + b64 = BIO_new(BIO_f_base64()); + BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL); + bio = BIO_new(BIO_s_mem()); + bio = BIO_push(b64, bio); + + BIO_write(bio, data.data(), static_cast(data.size())); + BIO_flush(bio); + BIO_get_mem_ptr(bio, &bptr); + + std::string result(bptr->data, bptr->length); + + BIO_free_all(bio); + + return result; +} + +std::vector KVCacheCommon::base64Decode(const std::string &encoded) +{ + BIO *bio, *b64; + std::vector result(encoded.size()); + + b64 = BIO_new(BIO_f_base64()); + BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL); + bio = BIO_new_mem_buf(encoded.c_str(), static_cast(encoded.size())); + bio = BIO_push(b64, bio); + + int decodedLength = BIO_read(bio, result.data(), static_cast(result.size())); + result.resize(decodedLength); + + BIO_free_all(bio); + + return result; +} + +template +void KVCacheCommon::encodeVector(const std::vector &vec, std::string &encodedString) +{ + std::vector rawData(reinterpret_cast(vec.data()), + reinterpret_cast(vec.data() + vec.size())); + encodedString = base64Encode(rawData); +} + +template +void KVCacheCommon::decodeVector(const std::string &str, std::vector &vec) +{ + std::vector rawData = base64Decode(str); + + // Calculate the number of elements based on the total size of rawData + size_t numElements = rawData.size() / sizeof(T); + + // Construct the result vector using the correct size + std::vector result(reinterpret_cast(rawData.data()), + reinterpret_cast(rawData.data() + numElements * sizeof(T))); + vec = result; +} + +}; // namespace adios2 +#endif // KVCACHECOMMON_INL \ No newline at end of file