Skip to content

Commit

Permalink
update generated sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Nov 18, 2024
1 parent fbdba98 commit 2a3824c
Show file tree
Hide file tree
Showing 35 changed files with 1,288 additions and 1,025 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @file ServiceType.h
* @file Constant.h
* @author: yujiechen
* @date 2024-11-07
*/
Expand All @@ -26,4 +26,5 @@ namespace ppc::protocol
{
const static std::string PSI_SERVICE_TYPE = "PSI";
const static std::string MPC_SERVICE_TYPE = "MPC";
const static size_t LARGE_MSG_THRESHOLD = 30 * 1024 * 1024;
} // namespace ppc::protocol
42 changes: 25 additions & 17 deletions cpp/ppc-framework/protocol/GrpcConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,29 +89,45 @@ class GrpcConfig
m_compressAlgorithm = compressAlgorithm;
}

uint64_t maxMsgSize() const { return m_maxMsgSize; }
void setMaxMsgSize(uint64_t maxMsgSize)
{
if (maxMsgSize > c_maxMsgSize)
{
BOOST_THROW_EXCEPTION(WeDPRException() << bcos::errinfo_comment(
"The maxMsgSize limit is " + std::to_string(c_maxMsgSize)));
}
m_maxMsgSize = maxMsgSize;
}

protected:
bool m_enableHealthCheck = true;
std::string m_loadBalancePolicy = "round_robin";
bool m_enableDnslookup = false;

// Note: grpc use int to set the maxMsgSize
uint64_t const c_maxMsgSize = INT_MAX;

// the max send message size in bytes
uint64_t m_maxSendMessageSize = c_maxMsgSize;
// the max received message size in bytes
uint64_t m_maxReceivedMessageSize = c_maxMsgSize;
// the max msg size
uint64_t m_maxMsgSize = c_maxMsgSize;
int m_compressAlgorithm = 0;
};

class GrpcServerConfig : public GrpcConfig
class GrpcServerConfig
{
public:
using Ptr = std::shared_ptr<GrpcServerConfig>;
GrpcServerConfig() = default;
GrpcServerConfig(EndPoint endPoint, bool enableHealthCheck)
: m_endPoint(std::move(endPoint)), m_enableHealthCheck(enableHealthCheck)
{}
~GrpcServerConfig() override = default;
GrpcServerConfig() { m_grpcConfig = std::make_shared<GrpcConfig>(); }
GrpcServerConfig(EndPoint endPoint, bool enableHealthCheck) : GrpcServerConfig()
{
m_endPoint = std::move(endPoint);
m_enableHealthCheck = enableHealthCheck;
}
virtual ~GrpcServerConfig() = default;

std::string listenEndPoint() const { return m_endPoint.listenEndPoint(); }

Expand All @@ -122,21 +138,13 @@ class GrpcServerConfig : public GrpcConfig
EndPoint& mutableEndPoint() { return m_endPoint; }
bool enableHealthCheck() const { return m_enableHealthCheck; }

uint64_t maxMsgSize() const { return m_maxMsgSize; }
void setMaxMsgSize(uint64_t maxMsgSize)
{
if (maxMsgSize > c_maxMsgSize)
{
BOOST_THROW_EXCEPTION(WeDPRException() << bcos::errinfo_comment(
"The maxMsgSize limit is " + std::to_string(c_maxMsgSize)));
}
m_maxMsgSize = maxMsgSize;
}
GrpcConfig::Ptr const& grpcConfig() const { return m_grpcConfig; }

protected:
ppc::protocol::EndPoint m_endPoint;
bool m_enableHealthCheck = true;
uint64_t m_maxMsgSize = c_maxMsgSize;
// the grpc config
GrpcConfig::Ptr m_grpcConfig;
};


Expand Down
30 changes: 22 additions & 8 deletions cpp/ppc-framework/protocol/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,6 @@ class MessageHeader
class Message
{
public:
const static size_t LARGER_MSG_THRESHOLD = 30 * 1024 * 1024;

using Ptr = std::shared_ptr<Message>;
Message() = default;
virtual ~Message() {}
Expand All @@ -207,7 +205,14 @@ class Message

bool isRespPacket() const { return m_header->isRespPacket(); }
void setRespPacket() { m_header->setRespPacket(); }
void setPayload(std::shared_ptr<bcos::bytes> _payload) { m_payload = std::move(_payload); }
void setPayload(std::shared_ptr<bcos::bytes> _payload)
{
m_payload = std::move(_payload);
if (m_payload)
{
m_payloadLen = m_payload->size();
}
}
// for swig wrapper
OutputBuffer payloadBuffer() const
{
Expand All @@ -218,9 +223,18 @@ class Message
return OutputBuffer{(unsigned char*)m_payload->data(), m_payload->size()};
}

void setFrontMessage(MessagePayload::Ptr frontMessage)
void setFrontMessage(MessagePayload::Ptr frontMessage, bool releasePayload = false)
{
m_frontMessage = std::move(frontMessage);
if (!releasePayload)
{
return;
}
if (m_payload)
{
m_payload->clear();
bcos::bytes().swap(*m_payload);
}
}

MessagePayload::Ptr const& frontMessage() const { return m_frontMessage; }
Expand All @@ -229,10 +243,7 @@ class Message
virtual bool encode(bcos::bytes& _buffer) = 0;
virtual int64_t decode(bcos::bytesConstRef _buffer) = 0;

virtual uint32_t length() const
{
return m_header->length() + (m_payload ? m_payload->size() : 0);
}
virtual uint32_t length() const { return m_header->length() + m_payloadLen; }

virtual std::shared_ptr<bcos::bytes> payload() const { return m_payload; }

Expand All @@ -253,6 +264,9 @@ class Message
MessageHeader::Ptr m_header;
// Note: allocate here in case of wsService nullptr access caused coredump
std::shared_ptr<bcos::bytes> m_payload = std::make_shared<bcos::bytes>();
uint64_t m_payloadLen = 0;
;

MessagePayload::Ptr m_frontMessage = nullptr;
};

Expand Down
10 changes: 10 additions & 0 deletions cpp/wedpr-computing/ppc-psi/src/PSIConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
#include "ppc-framework/Helper.h"
#include "ppc-framework/front/FrontInterface.h"
#include "ppc-framework/io/DataResourceLoader.h"
#include "ppc-framework/protocol/Constant.h"
#include "ppc-framework/protocol/Protocol.h"
#include "psi-framework/interfaces/PSIMessageInterface.h"
#include <gperftools/malloc_extension.h>
#include <future>
#include <utility>

Expand Down Expand Up @@ -95,6 +97,14 @@ class PSIConfig
}
},
_responseCallback);
// release the large buffer if no-need to use
if (ppcMsg->data() && ppcMsg->data()->size() > ppc::protocol::LARGE_MSG_THRESHOLD)
{
PSI_LOG(INFO) << LOG_DESC("sendMsg: Release large buffer since the message")
<< LOG_KV("size", ppcMsg->data()->size());
ppcMsg->releasePayload();
MallocExtension::instance()->ReleaseFreeMemory();
}
}

void asyncSendResponse(bcos::bytes const& fromNode, std::string const& _taskID,
Expand Down
63 changes: 35 additions & 28 deletions cpp/wedpr-computing/ppc-psi/src/ecdh-multi-psi/EcdhMultiCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
using namespace ppc::psi;
using namespace bcos;

void MasterCache::addCalculatorCipher(std::string _peerId,
std::map<uint32_t, bcos::bytes>&& _cipherData, uint32_t seq, uint32_t dataBatchCount)
void MasterCache::addCalculatorCipher(std::string _peerId, std::vector<bcos::bytes>&& _cipherData,
std::vector<long> const& dataIndex, uint32_t seq, uint32_t dataBatchCount)
{
auto peerIndex = getPeerIndex(_peerId);
if (peerIndex == -1)
Expand All @@ -39,9 +39,11 @@ void MasterCache::addCalculatorCipher(std::string _peerId,
{
m_calculatorDataBatchCount = dataBatchCount;
}
uint64_t i = 0;
for (auto&& it : _cipherData)
{
updateMasterDataRef(peerIndex, std::move(it.second), it.first);
updateMasterDataRef(peerIndex, std::move(it), dataIndex[i]);
i++;
}
// try to merge the
if (m_calculatorDataBatchCount > 0 &&
Expand All @@ -62,7 +64,7 @@ void MasterCache::addCalculatorCipher(std::string _peerId,
<< LOG_KV("dataBatchCount", m_calculatorDataBatchCount);
// release the cipherData
_cipherData.clear();
std::map<uint32_t, bcos::bytes>().swap(_cipherData);
std::vector<bcos::bytes>().swap(_cipherData);
MallocExtension::instance()->ReleaseFreeMemory();
}

Expand Down Expand Up @@ -184,8 +186,8 @@ bool MasterCache::tryToIntersection()
}
m_cacheState = CacheState::IntersectionProgressing;

ECDH_MULTI_LOG(INFO) << LOG_DESC("tryToIntersection ") << printCacheState()
<< LOG_KV("masterData", m_masterDataRef.size());
ECDH_MULTI_LOG(INFO) << LOG_DESC("* tryToIntersection ") << printCacheState()
<< LOG_KV("* masterData", m_masterDataRef.size());
auto startT = utcSteadyTime();
// iterator the masterDataRef to obtain intersection
for (auto&& it : m_masterDataRef)
Expand All @@ -208,30 +210,32 @@ bool MasterCache::tryToIntersection()
}
releaseCache();
m_cacheState = CacheState::Intersectioned;
ECDH_MULTI_LOG(INFO) << LOG_DESC("tryToIntersection success") << printCacheState()
<< LOG_KV("intersectionSize", m_intersecCipher.size())
<< LOG_KV("timecost", (utcSteadyTime() - startT));
ECDH_MULTI_LOG(INFO) << LOG_DESC("* tryToIntersection success") << printCacheState()
<< LOG_KV("* intersectionSize", m_intersecCipher.size())
<< LOG_KV("* timecost", (utcSteadyTime() - startT));
return true;
}

std::vector<std::pair<uint64_t, bcos::bytes>> MasterCache::encryptIntersection(
bcos::bytes const& randomKey)
PSIMessageInterface::Ptr MasterCache::encryptIntersection(bcos::bytes const& randomKey)
{
std::vector<std::pair<uint64_t, bcos::bytes>> cipherData(m_intersecCipher.size());
auto message = m_config->psiMsgFactory()->createPSIMessage(
uint32_t(EcdhMultiPSIMessageType::SEND_ENCRYPTED_INTERSECTION_SET_TO_CALCULATOR));
message->setFrom(m_taskState->task()->selfParty()->id());
message->resizeData(m_intersecCipher.size());
tbb::parallel_for(
tbb::blocked_range<size_t>(0U, m_intersecCipher.size()), [&](auto const& range) {
for (auto i = range.begin(); i < range.end(); i++)
{
auto cipherValue =
m_config->eccCrypto()->ecMultiply(m_intersecCipher[i], randomKey);
cipherData[i] = std::make_pair(m_intersecCipherIndex[i], cipherValue);
message->setDataPair(i, m_intersecCipherIndex[i], cipherValue);
}
});
ECDH_MULTI_LOG(INFO) << LOG_DESC("encryptIntersection")
<< LOG_KV("cipherCount", m_intersecCipher.size()) << printCacheState();
// Note: release the m_intersecCipher, make share it not been used after released
releaseItersection();
ECDH_MULTI_LOG(INFO) << LOG_DESC("encryptIntersection")
<< LOG_KV("cipherCount", cipherData.size()) << printCacheState();
return cipherData;
return message;
}

bcos::bytes CalculatorCache::getPlainDataByIndex(uint64_t index)
Expand All @@ -257,8 +261,8 @@ bool CalculatorCache::tryToFinalize()
return false;
}
auto startT = utcSteadyTime();
ECDH_MULTI_LOG(INFO) << LOG_DESC("tryToFinalize: compute intersection")
<< LOG_KV("cipherRef", m_cipherRef.size()) << printCacheState();
ECDH_MULTI_LOG(INFO) << LOG_DESC("* tryToFinalize: compute intersection")
<< LOG_KV("* cipherRef", m_cipherRef.size()) << printCacheState();
m_cacheState = CacheState::Finalizing;
// find the intersection
for (auto const& it : m_cipherRef)
Expand All @@ -273,28 +277,28 @@ bool CalculatorCache::tryToFinalize()
}
}
m_cacheState = CacheState::Finalized;
ECDH_MULTI_LOG(INFO) << LOG_DESC("tryToFinalize: compute intersection success")
<< printCacheState() << LOG_KV("cipherRef", m_cipherRef.size())
<< LOG_KV("intersectionSize", m_intersectionResult.size())
<< LOG_KV("timecost", (utcSteadyTime() - startT));
ECDH_MULTI_LOG(INFO) << LOG_DESC("* tryToFinalize: compute intersection success")
<< printCacheState() << LOG_KV("* cipherRef", m_cipherRef.size())
<< LOG_KV("* intersectionSize", m_intersectionResult.size())
<< LOG_KV("* timecost", (utcSteadyTime() - startT));

releaseDataAfterFinalize();
ECDH_MULTI_LOG(INFO) << LOG_DESC("tryToFinalize: syncIntersections") << printCacheState();
ECDH_MULTI_LOG(INFO) << LOG_DESC("* tryToFinalize: syncIntersections") << printCacheState();
m_cacheState = CacheState::Syncing;
syncIntersections();
m_cacheState = CacheState::Synced;

m_cacheState = CacheState::StoreProgressing;
m_taskState->storePSIResult(m_config->dataResourceLoader(), m_intersectionResult);
m_cacheState = CacheState::Stored;
ECDH_MULTI_LOG(INFO) << LOG_DESC("tryToFinalize: syncIntersections and store success")
ECDH_MULTI_LOG(INFO) << LOG_DESC("* tryToFinalize: syncIntersections and store success")
<< printCacheState();
return true;
}

void CalculatorCache::syncIntersections()
{
ECDH_MULTI_LOG(INFO) << LOG_DESC("syncIntersections") << printCacheState();
ECDH_MULTI_LOG(INFO) << LOG_DESC("*** syncIntersections **") << printCacheState();
auto peers = m_taskState->task()->getAllPeerParties();
auto taskID = m_taskState->task()->id();
// notify task result
Expand Down Expand Up @@ -398,21 +402,24 @@ bool CalculatorCache::appendMasterCipher(
return m_receiveAllMasterCipher;
}

void CalculatorCache::setIntersectionCipher(std::map<uint32_t, bcos::bytes>&& _cipherData)
void CalculatorCache::setIntersectionCipher(
std::vector<bcos::bytes>&& _cipherData, std::vector<long> const& dataIndex)
{
ECDH_MULTI_LOG(INFO) << LOG_DESC("setIntersectionCipher")
<< LOG_KV("dataSize", _cipherData.size())
<< LOG_KV("cipherRefSize", m_cipherRef.size()) << printCacheState();
bcos::Guard lock(m_mutex);
uint64_t i = 0;
for (auto&& it : _cipherData)
{
updateCipherRef(std::move(it.second), it.first);
updateCipherRef(std::move(it), dataIndex[i]);
i++;
}
m_receiveIntersection = true;
ECDH_MULTI_LOG(INFO) << LOG_DESC("setIntersectionCipher finshed")
<< LOG_KV("cipherRefSize", m_cipherRef.size()) << printCacheState();
// release the cipherData
_cipherData.clear();
std::map<uint32_t, bcos::bytes>().swap(_cipherData);
std::vector<bcos::bytes>().swap(_cipherData);
MallocExtension::instance()->ReleaseFreeMemory();
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ class MasterCache
<< LOG_KV("taskID", m_taskState->task()->id());
}

void addCalculatorCipher(std::string _peerId, std::map<uint32_t, bcos::bytes>&& _cipherData,
uint32_t seq, uint32_t dataBatchCount);
void addCalculatorCipher(std::string _peerId, std::vector<bcos::bytes>&& _cipherData,
std::vector<long> const& dataIndex, uint32_t seq, uint32_t dataBatchCount);

void addPartnerCipher(std::string _peerId, std::vector<bcos::bytes>&& _cipherData, uint32_t seq,
uint32_t parternerDataCount);
Expand All @@ -83,7 +83,7 @@ class MasterCache
return stringstream.str();
}

std::vector<std::pair<uint64_t, bcos::bytes>> encryptIntersection(bcos::bytes const& randomKey);
PSIMessageInterface::Ptr encryptIntersection(bcos::bytes const& randomKey);

private:
bool shouldIntersection()
Expand Down Expand Up @@ -218,7 +218,8 @@ class CalculatorCache
bool appendMasterCipher(
std::vector<bcos::bytes>&& _cipherData, uint32_t seq, uint32_t dataBatchSize);

void setIntersectionCipher(std::map<uint32_t, bcos::bytes>&& _cipherData);
void setIntersectionCipher(
std::vector<bcos::bytes>&& _cipherData, std::vector<long> const& dataIndex);

void appendPlainData(ppc::io::DataBatch::Ptr const& data)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "EcdhMultiPSIImpl.h"
#include "Common.h"
#include "ppc-framework/protocol/Constant.h"
#include <gperftools/malloc_extension.h>

using namespace ppc::psi;
Expand Down Expand Up @@ -314,10 +315,10 @@ void EcdhMultiPSIImpl::executeWorker()
psiMsg->setUUID(pop_msg->uuid());
ECDH_MULTI_LOG(TRACE) << LOG_DESC("onReceiveMessage") << printPSIMessage(psiMsg)
<< LOG_KV("uuid", psiMsg->uuid());
// release the larger payload immediately
if (payLoad->size() >= ppc::protocol::Message::LARGER_MSG_THRESHOLD)
// release the large payload immediately
if (payLoad && payLoad->size() >= ppc::protocol::LARGE_MSG_THRESHOLD)
{
ECDH_MULTI_LOG(INFO) << LOG_DESC("Release larger message payload")
ECDH_MULTI_LOG(INFO) << LOG_DESC("Release large message payload")
<< LOG_KV("size", payLoad->size());
pop_msg->releasePayload();
MallocExtension::instance()->ReleaseFreeMemory();
Expand Down
Loading

0 comments on commit 2a3824c

Please sign in to comment.