Skip to content

Commit

Permalink
KVCache for remote v1
Browse files Browse the repository at this point in the history
  • Loading branch information
Chang Guo committed Jan 10, 2024
1 parent aa99e9e commit 95ec36d
Show file tree
Hide file tree
Showing 7 changed files with 380 additions and 125 deletions.
17 changes: 15 additions & 2 deletions source/adios2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,12 @@ add_library(adios2_core
toolkit/aggregator/mpi/MPIAggregator.cpp
toolkit/aggregator/mpi/MPIChain.cpp
toolkit/aggregator/mpi/MPIShmChain.cpp
toolkit/cache/KVCacheCommon.cpp toolkit/cache/KVCacheCommon.h)

toolkit/cache/KVCacheCommon.h
toolkit/cache/KVCacheCommon.inl
# toolkit/cache/KVCacheBase64.h
# toolkit/cache/KVCacheBase64.cpp
)
set_property(TARGET adios2_core PROPERTY EXPORT_NAME core)
set_property(TARGET adios2_core PROPERTY OUTPUT_NAME adios2${ADIOS2_LIBRARY_SUFFIX}_core)

Expand Down Expand Up @@ -207,10 +212,18 @@ endif()
set(ADIOS2_USE_Cache True)
if (ADIOS2_USE_Cache)
# add_subdirectory(toolkit/cache)
# Note: need to export LD_LIBRARY_PATH=/data/gc/ornl/hiredis-1.2.0/build:/data/gc/ornl/openssl:$LD_LIBRARY_PATH
set(HIREDIS_DIR "/data/gc/ornl/hiredis-1.2.0/")
include_directories(${HIREDIS_DIR})
set(HIREDIS_LIBRARY ${HIREDIS_DIR}/build/libhiredis.so)
target_link_libraries(adios2_core PRIVATE hiredis)

set(OPENSSL_DIR "/data/gc/ornl/openssl/")
set(OPENSSL_INCLUDE_DIR ${OPENSSL_DIR}/include)
set(OPENSSL_LIBRARY_SSL ${OPENSSL_DIR}/libssl.so)
set(OPENSSL_LIBRARY_CRYPTO ${OPENSSL_DIR}/libcrypto.so)
target_include_directories(adios2_core PRIVATE ${OPENSSL_INCLUDE_DIR})
target_link_libraries(adios2_core PRIVATE ${OPENSSL_LIBRARY_SSL} ${OPENSSL_LIBRARY_CRYPTO})
endif ()

if(ADIOS2_HAVE_Campaign)
Expand Down Expand Up @@ -436,7 +449,7 @@ else()
endif()

# The main "adios2" target provides all interfaces.
add_library(adios2 INTERFACE)
add_library(adios2 INTERFACE toolkit/cache/KVCacheBase64.h)
target_link_libraries(adios2 INTERFACE
${maybe_adios2_c_mpi} adios2_c
${maybe_adios2_cxx11_mpi} adios2_cxx11
Expand Down
42 changes: 39 additions & 3 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,24 +292,60 @@ void BP5Reader::PerformRemoteGets()
}
for (auto &Req : GetRequests)
{
const DataType varType = m_IO.InquireVariableType(Req.VarName);
size_t numOfElements = m_KVCacheCommon.size(Req.Count);

if (getenv("useKVCache"))
{
std::string cacheKey;
m_KVCacheCommon.keyComposition(Req.VarName, Req.RelStep, Req.BlockID, Req.Start,
Req.Count, cacheKey);
if (m_KVCacheCommon.exists(cacheKey))
{
m_KVCacheCommon.get(cacheKey, Req.Data);
continue;

#define declare_type_get(T) \
if (varType == helper::GetDataType<T>()) \
{ \
std::vector<T> reqData; \
reqData.resize(numOfElements); \
m_KVCacheCommon.get(cacheKey, reqData); \
std::memcpy(Req.Data, reqData.data(), numOfElements * sizeof(T)); \
}
ADIOS2_FOREACH_PRIMITIVE_STDTYPE_1ARG(declare_type_get)
#undef declare_type_get
continue;
}
}
m_Remote.Get(Req.VarName, Req.RelStep, Req.BlockID, Req.Count, Req.Start, Req.Data);
// std::vector<int8_t> temp;

// temp.resize(size);
// std::memcpy(temp.data(), Req.Data, size * sizeof(int8_t));
// temp get data from Req.Data
// std::string cacheKey;
// m_KVCacheCommon.keyComposition(Req.VarName, Req.RelStep, Req.BlockID, Req.Start,
// Req.Count, cacheKey);
// std::vector<int8_t> reqData;
// reqData.resize(numOfElements);
// std::memcpy(reqData.data(), Req.Data, numOfElements * sizeof(int8_t));
// m_KVCacheCommon.set(cacheKey, reqData);

if (getenv("useKVCache"))
{
std::string cacheKey;
m_KVCacheCommon.keyComposition(Req.VarName, Req.RelStep, Req.BlockID, Req.Start,
Req.Count, cacheKey);
m_KVCacheCommon.set(cacheKey, Req.Data);

#define declare_type_set(T) \
if (varType == helper::GetDataType<T>()) \
{ \
std::vector<T> reqData; \
reqData.resize(numOfElements); \
std::memcpy(reqData.data(), Req.Data, numOfElements * sizeof(T)); \
m_KVCacheCommon.set(cacheKey, reqData); \
}
ADIOS2_FOREACH_PRIMITIVE_STDTYPE_1ARG(declare_type_set)
#undef declare_type_set
}
}
if (getenv("useKVCache"))
Expand Down
66 changes: 66 additions & 0 deletions source/adios2/toolkit/cache/KVCacheBase64.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
//
// Created by cguo51 on 1/8/24.
//
#include "adios2/toolkit/cache/KVCacheBase64.h"
//namespace adios2
//{
std::string base64Encode(const std::vector<char> &data)
{
BIO *bio, *b64;
BUF_MEM *bptr;

b64 = BIO_new(BIO_f_base64());
BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL);
bio = BIO_new(BIO_s_mem());
bio = BIO_push(b64, bio);

BIO_write(bio, data.data(), static_cast<int>(data.size()));
BIO_flush(bio);
BIO_get_mem_ptr(bio, &bptr);

std::string result(bptr->data, bptr->length);

BIO_free_all(bio);

return result;
}

std::vector<char> base64Decode(const std::string &encoded)
{
BIO *bio, *b64;
std::vector<char> result(encoded.size());

b64 = BIO_new(BIO_f_base64());
BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL);
bio = BIO_new_mem_buf(encoded.c_str(), static_cast<int>(encoded.size()));
bio = BIO_push(b64, bio);

int decodedLength = BIO_read(bio, result.data(), static_cast<int>(result.size()));
result.resize(decodedLength);

BIO_free_all(bio);

return result;
}

template <typename T>
void encodeVector(const std::vector<T> &vec, std::string &encodedString)
{
std::vector<char> rawData(reinterpret_cast<const char *>(vec.data()),
reinterpret_cast<const char *>(vec.data() + vec.size()));
encodedString = base64Encode(rawData);
}

template <typename T>
void decodeVector(const std::string &str, std::vector<T> &vec)
{
std::vector<char> rawData = base64Decode(str);

// Calculate the number of elements based on the total size of rawData
size_t numElements = rawData.size() / sizeof(T);

// Construct the result vector using the correct size
vec(reinterpret_cast<const T *>(rawData.data()),
reinterpret_cast<const T *>(rawData.data() + numElements * sizeof(T)));
}
//}; // adios2
25 changes: 25 additions & 0 deletions source/adios2/toolkit/cache/KVCacheBase64.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
//
// Created by cguo51 on 1/8/24.
//
#ifndef ADIOS2_KVCACHEBASE64_H
#define ADIOS2_KVCACHEBASE64_H
#include <iostream>
#include <vector>
#include <string>
#include <openssl/bio.h>
#include <openssl/evp.h>
#include <openssl/buffer.h>
//namespace adios2
//{

std::string base64Encode(const std::vector<char>& data);

std::vector<char> base64Decode(const std::string& encoded);

template <typename T>
void encodeVector(const std::vector<T>& vec, std::string& encodedString);

template <typename T>
void decodeVector(const std::string& str, std::vector<T>& vec);
//}; // adios2
#endif // ADIOS2_KVCACHEBASE64_H
113 changes: 0 additions & 113 deletions source/adios2/toolkit/cache/KVCacheCommon.cpp

This file was deleted.

50 changes: 43 additions & 7 deletions source/adios2/toolkit/cache/KVCacheCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,58 @@ class KVCacheCommon

KVCacheCommon(std::string host="localhost", int port=6379): m_host(host), m_port(port){};

void openConnection();
inline void openConnection();

void closeConnection();
inline void closeConnection();

void set(std::string key, void* Data);
template <typename T>
void set(std::string key, const std::vector<T>& vec);

void get(std::string key, void* Data);
template <typename T>
void get(std::string key, std::vector<T>& vec);

void del(std::string key);
inline void del(std::string key);

bool exists(std::string key);
inline bool exists(std::string key);

void keyComposition(char *VarName, size_t AbsStep, size_t BlockID, Dims Start, Dims Count, std::string &cacheKey);
inline void keyComposition(char *VarName, size_t AbsStep, size_t BlockID, Dims Start, Dims Count, std::string &cacheKey);

// template <typename T>
// void serializeVector(const std::vector<T>& vec, std::string& serializedString) {
// nlohmann::json j = vec;
// serializedString = j.dump();
// }
//
// template <typename T>
// void deserializeVector(const std::string& str, std::vector<T>& vec) {
// nlohmann::json j = nlohmann::json::parse(str);
// vec = j.get<std::vector<T>>();
// }

size_t size(Dims Count) const
{
size_t size = 1;
for(auto i: Count)
{
size *= i;
}
return size;
}

inline std::string base64Encode(const std::vector<char>& data);

inline std::vector<char> base64Decode(const std::string& encoded);

template <typename T>
void encodeVector(const std::vector<T>& vec, std::string& encodedString);

template <typename T>
void decodeVector(const std::string& str, std::vector<T>& vec);
};


}; // adios2

#include "KVCacheCommon.inl"

#endif // ADIOS2_KVCACHECOMMON_H
Loading

0 comments on commit 95ec36d

Please sign in to comment.