Skip to content

Commit

Permalink
add exception handle when send message failed for ecdh-multi-psi (#92)
Browse files Browse the repository at this point in the history
* add exception handle when send message failed for ecdh-multi-psi

* python transport sdk support initLog
  • Loading branch information
cyjseagull authored Nov 18, 2024
1 parent c3be9d5 commit 427d4ef
Show file tree
Hide file tree
Showing 16 changed files with 146 additions and 41 deletions.
4 changes: 1 addition & 3 deletions cpp/wedpr-computing/ppc-pir/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ set(TEST_BINARY_NAME test-ppc-pir)

add_executable(${TEST_BINARY_NAME} ${SOURCES})
target_include_directories(${TEST_BINARY_NAME} PRIVATE .)
# target_link_libraries(${PIR_TARGET} PUBLIC ${IO_TARGET} ${FRONT_TARGET} ${BCOS_UTILITIES_TARGET} ${TARS_PROTOCOL_TARGET} TBB::tbb TCMalloc)

# target_link_libraries(${TEST_BINARY_NAME} ${PIR_TARGET} ${RPC_TARGET} ${CRYPTO_TARGET} ${BOOST_UNIT_TEST})
target_link_libraries(${TEST_BINARY_NAME} PUBLIC ${IO_TARGET} ${FRONT_TARGET} ${BCOS_UTILITIES_TARGET} ${TARS_PROTOCOL_TARGET} ${PIR_TARGET} ${RPC_TARGET} ${CRYPTO_TARGET} ${PROTOCOL_TARGET} ${BOOST_UNIT_TEST})
target_link_libraries(${TEST_BINARY_NAME} PUBLIC ${CRYPTO_TARGET} ${BCOS_UTILITIES_TARGET} ${TARS_PROTOCOL_TARGET} ${PIR_TARGET} ${PROTOCOL_TARGET} ${BOOST_UNIT_TEST})
add_test(NAME test-pir WORKING_DIRECTORY ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} COMMAND ${TEST_BINARY_NAME})
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ void CM2020PSIReceiver::prepareInputs()
m_originLocations.resize(m_rInputSize);
m_oprfOutputs.reserve(m_rInputSize);
m_oprfOutputs.resize(m_rInputSize);

CM2020_PSI_LOG(INFO) << LOG_BADGE("prepareInputs success") << LOG_KV("taskID", m_taskID)
<< LOG_KV("inputSize", m_rInputSize);
syncInputsSize();
}
catch (const std::exception& e)
Expand All @@ -187,7 +188,7 @@ void CM2020PSIReceiver::syncInputsSize()
encodeUnsignedNum(data, m_rInputSize);
CM2020_PSI_LOG(TRACE) << LOG_BADGE("syncInputsSize") << LOG_KV("taskID", m_taskID)
<< LOG_KV("data", *bcos::toHexString(*data))
<< LOG_KV("m_rInputSize", m_rInputSize);
<< LOG_KV("inputSize", m_rInputSize);
auto message = m_config->ppcMsgFactory()->buildPPCMessage(
uint8_t(TaskType::PSI), uint8_t(TaskAlgorithmType::CM_PSI_2PC), m_taskID, data);
message->setMessageType(uint8_t(CM2020PSIMessageType::RECEIVER_SIZE));
Expand Down Expand Up @@ -242,7 +243,8 @@ void CM2020PSIReceiver::onBatchPointBReceived(PPCMessageFace::Ptr _message)
{
return;
}
CM2020_PSI_LOG(INFO) << LOG_BADGE("handleBatchPointB") << LOG_KV("taskID", m_taskID);
CM2020_PSI_LOG(INFO) << LOG_BADGE("handleBatchPointB") << LOG_KV("taskID", m_taskID)
<< LOG_KV("payloadSize", _message->data()->size());

try
{
Expand All @@ -261,7 +263,8 @@ void CM2020PSIReceiver::onSenderSizeReceived(front::PPCMessageFace::Ptr _message
{
return;
}
CM2020_PSI_LOG(INFO) << LOG_BADGE("onSenderSizeReceived") << LOG_KV("taskID", m_taskID);
CM2020_PSI_LOG(INFO) << LOG_BADGE("onSenderSizeReceived") << LOG_KV("taskID", m_taskID)
<< LOG_KV("payloadSize", _message->data()->size());
try
{
decodeUnsignedNum(m_sInputSize, _message->data());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ void CM2020PSISender::onReceiverSizeReceived(front::PPCMessageFace::Ptr _message
{
decodeUnsignedNum(m_rInputSize, _message->data());
m_progress->mark<std::string>("RECEIVE_SIZE");
CM2020_PSI_LOG(INFO) << LOG_BADGE("onReceiverSizeReceived") << LOG_KV("taskID", m_taskID)
<< LOG_KV("inputSize", m_rInputSize);
}
catch (const std::exception& e)
{
Expand Down Expand Up @@ -238,6 +240,10 @@ void CM2020PSISender::onPointAReceived(front::PPCMessageFace::Ptr _message)

auto retPair = m_ot->receiverGeneratePointsB(m_otChoices, pointA);

CM2020_PSI_LOG(INFO) << LOG_BADGE("onPointAReceived and send pointB")
<< LOG_KV("taskID", m_taskID)
<< LOG_KV("dataSize", retPair.first->size())
<< LOG_KV("otNumber", m_otChoices->size());
// send batch point B to ot sender
auto message = m_config->ppcMsgFactory()->buildPPCMessage(uint8_t(TaskType::PSI),
uint8_t(TaskAlgorithmType::CM_PSI_2PC), m_taskID, retPair.first);
Expand Down
29 changes: 24 additions & 5 deletions cpp/wedpr-computing/ppc-psi/src/ecdh-multi-psi/EcdhMultiCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,15 +244,26 @@ void MasterCache::encryptAndSendIntersection(uint64_t dataBatchIdx, bcos::bytes
});
auto taskID = m_taskState->task()->id();
auto self = weak_from_this();
for (auto const& calcultor : calculators)
for (auto const& calculator : calculators)
{
m_config->generateAndSendPPCMessage(
calcultor.first, taskID, message,
[self](bcos::Error::Ptr&& _error) {
if (!_error)
calculator.first, taskID, message,
[self, calculator](bcos::Error::Ptr&& _error) {
if (!_error || _error->errorCode() == 0)
{
return;
}
auto cache = self.lock();
if (!cache)
{
return;
}
ECDH_MULTI_LOG(WARNING)
<< LOG_DESC("encryptAndSendIntersection: send message to calcultor failed")
<< LOG_KV("task", cache->m_taskState->task()->id())
<< LOG_KV("calculator", calculator.first) << LOG_KV("code", _error->errorCode())
<< LOG_KV("msg", _error->errorMessage());
cache->m_taskState->onTaskException(_error->errorMessage());
},
dataBatchIdx);
}
Expand Down Expand Up @@ -317,10 +328,16 @@ bool CalculatorCache::tryToFinalize()
{
continue;
}
if (it.second.plainDataIndex >= 0)
if (it.second.plainDataIndex > 0)
{
m_intersectionResult.emplace_back(getPlainDataByIndex(it.second.plainDataIndex));
}
// means the header field, swap with the first element
if (it.second.plainDataIndex == 0)
{
m_intersectionResult.emplace_back(m_intersectionResult[0]);
m_intersectionResult[0] = getPlainDataByIndex(it.second.plainDataIndex);
}
}
m_cacheState = CacheState::Finalized;
ECDH_MULTI_LOG(INFO) << LOG_DESC("* tryToFinalize: compute intersection success")
Expand Down Expand Up @@ -356,6 +373,7 @@ void CalculatorCache::syncIntersections()
message->setVersion(-1);
for (auto& peer : peers)
{
// Note: sync task failed will not change the task status
m_config->generateAndSendPPCMessage(
peer.first, taskID, message,
[taskID, peer](bcos::Error::Ptr&& _error) {
Expand All @@ -381,6 +399,7 @@ void CalculatorCache::syncIntersections()
message->setVersion(0);
for (auto& peer : peers)
{
// Note: sync task failed will not change the task status
m_config->generateAndSendPPCMessage(
peer.first, taskID, message,
[taskID, peer](bcos::Error::Ptr&& _error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,36 @@ bcos::bytes EcdhMultiPSICalculator::generateRandomA(std::string _taskID)
message->setData(std::vector<bcos::bytes>{randomValue});
message->setDataBatchCount(randomValue.size());
message->setFrom(m_taskState->task()->selfParty()->id());
auto self = weak_from_this();
for (auto const& partner : m_partnerParties)
{
ECDH_CAL_LOG(INFO) << LOG_KV("PART1: Calculator generateRandomA to ", partner.first)
ECDH_CAL_LOG(INFO) << LOG_DESC("send generateRandomA to partner")
<< LOG_KV("partner", partner.first)
<< LOG_KV(" Random: ", *toHexString(randomValue));
m_config->generateAndSendPPCMessage(
partner.first, _taskID, message,
[self = weak_from_this(), partner](bcos::Error::Ptr&& _error) {
if (!_error)
[self, _taskID, partner](bcos::Error::Ptr&& _error) {
if (!_error || _error->errorCode() == 0)
{
ECDH_CAL_LOG(INFO)
<< LOG_KV("PART1: Calculator generateRandomA success to ", partner.first);
<< LOG_DESC("send generated randomA to partner success")
<< LOG_KV("task", _taskID) << LOG_KV("partner", partner.first);
return;
}
auto psi = self.lock();
if (!psi)
{
return;
}
if (!psi->m_taskState)
{
return;
}
ECDH_CAL_LOG(WARNING)
<< LOG_DESC("send generated randomA to partner failed")
<< LOG_KV("task", _taskID) << LOG_KV("partner", partner.first)
<< LOG_KV("code", _error->errorCode()) << LOG_KV("msg", _error->errorMessage());
psi->m_taskState->onTaskException(_error->errorMessage());
},
0);
}
Expand Down Expand Up @@ -147,12 +159,13 @@ void EcdhMultiPSICalculator::blindData(std::string _taskID, bcos::bytes _randA)
message->setDataBatchCount(0);
}
// send cipher
auto self = weak_from_this();
for (auto const& master : m_masterParties)
{
m_config->generateAndSendPPCMessage(
master.first, _taskID, message,
[self = weak_from_this(), master](bcos::Error::Ptr&& _error) {
if (!_error)
[self, _taskID, master](bcos::Error::Ptr&& _error) {
if (!_error || _error->errorCode() == 0)
{
return;
}
Expand All @@ -161,6 +174,12 @@ void EcdhMultiPSICalculator::blindData(std::string _taskID, bcos::bytes _randA)
{
return;
}
ECDH_CAL_LOG(WARNING)
<< LOG_DESC("send blindedData to master failed")
<< LOG_KV("task", _taskID) << LOG_KV("master", master.first)
<< LOG_KV("code", _error->errorCode())
<< LOG_KV("msg", _error->errorMessage());
psi->m_taskState->onTaskException(_error->errorMessage());
},
seq);
dataOffset += dataBatch->size();
Expand Down Expand Up @@ -215,22 +234,23 @@ void EcdhMultiPSICalculator::onReceiveIntersecCipher(PSIMessageInterface::Ptr _m
message->setFrom(m_taskState->task()->selfParty()->id());
// try to finalize
auto ret = m_calculatorCache->tryToFinalize();
auto taskID = m_taskID;
for (auto const& master : m_masterParties)
{
ECDH_CAL_LOG(INFO) << LOG_DESC("onReceiveIntersecCipher: send response to the master")
<< LOG_KV("master", master.first) << printPSIMessage(_msg);
// no any bad influences when send response failed
m_config->generateAndSendPPCMessage(
master.first, m_taskID, message,
[self = weak_from_this()](bcos::Error::Ptr&& _error) {
if (!_error)
{
return;
}
auto psi = self.lock();
if (!psi)
master.first, taskID, message,
[taskID](bcos::Error::Ptr&& _error) {
if (!_error || _error->errorCode() == 0)
{
return;
}
ECDH_CAL_LOG(WARNING)
<< LOG_DESC("onReceiveIntersecCipher: send response to the master failed")
<< LOG_KV("task", taskID) << LOG_KV("code", _error->errorCode())
<< LOG_KV("msg", _error->errorMessage());
},
0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,16 +185,28 @@ void EcdhMultiPSIMaster::blindData()
message->setDataBatchCount(0);
}
message->setFrom(m_taskState->task()->selfParty()->id());
auto self = weak_from_this();
for (auto const& calcultor : m_calculatorParties)
{
// TODO: handle the send failed case
m_config->generateAndSendPPCMessage(
calcultor.first, m_taskID, message,
[](bcos::Error::Ptr&& _error) {
if (_error)
[self, seq, calcultor](bcos::Error::Ptr&& _error) {
if (!_error || _error->errorCode() == 0)
{
return;
}
auto psi = self.lock();
if (!psi)
{
return;
}
ECDH_MASTER_LOG(WARNING)
<< LOG_DESC("send blindedData to calculator failed")
<< LOG_KV("calculator", calcultor.first) << LOG_KV("seq", seq)
<< LOG_KV("task", psi->m_taskState->task()->id())
<< LOG_KV("code", _error->errorCode())
<< LOG_KV("msg", _error->errorMessage());
psi->m_taskState->onTaskException(_error->errorMessage());
},
seq);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,13 @@ void EcdhMultiPSIPartner::onReceiveRandomA(bcos::bytesPointer _randA)
message->setDataBatchCount(0);
}
// generate and send encryptedHashSet
auto self = weak_from_this();
for (auto const& master : m_masterParties)
{
m_config->generateAndSendPPCMessage(
master.first, m_taskState->task()->id(), message,
[self = weak_from_this()](bcos::Error::Ptr&& _error) {
if (!_error)
[self, master, seq](bcos::Error::Ptr&& _error) {
if (!_error || _error->errorCode() == 0)
{
return;
}
Expand All @@ -131,6 +132,13 @@ void EcdhMultiPSIPartner::onReceiveRandomA(bcos::bytesPointer _randA)
{
return;
}
ECDH_PARTNER_LOG(WARNING)
<< LOG_DESC("send blinded data to master failed") << LOG_KV("seq", seq)
<< LOG_KV("master", master.first)
<< LOG_KV("task", psi->m_taskState->task()->id())
<< LOG_KV("code", _error->errorCode())
<< LOG_KV("msg", _error->errorMessage());
psi->m_taskState->onTaskException(_error->errorMessage());
},
seq);
}
Expand Down
13 changes: 13 additions & 0 deletions cpp/wedpr-computing/ppc-psi/src/psi-framework/TaskState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,19 @@ void TaskState::onTaskException(std::string const& _errorMsg)
<< LOG_KV("msg", boost::diagnostic_information(e));
}
}
if (m_notifyPeerFinishHandler)
{
try
{
m_notifyPeerFinishHandler();
}
catch (std::exception const& e)
{
PSI_LOG(WARNING) << LOG_DESC("notifyPeerFinish exception")
<< LOG_KV("taskID", m_task->id())
<< LOG_KV("msg", boost::diagnostic_information(e));
}
}
auto taskResult = std::make_shared<TaskResult>(m_task->id());
auto msg = "Task " + m_task->id() + " exception, error : " + _errorMsg;
auto error = std::make_shared<bcos::Error>(-1, msg);
Expand Down
3 changes: 2 additions & 1 deletion cpp/wedpr-protocol/protocol/src/PPCMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class PPCMessage : public PPCMessageFace

using Ptr = std::shared_ptr<PPCMessage>;
PPCMessage() { m_data = std::make_shared<bcos::bytes>(); }
~PPCMessage() override { releasePayload(); }
// Note: the payload passed in by the upper layer cannot be released at will
~PPCMessage() override = default;

uint8_t version() const override { return m_version; }
void setVersion(uint8_t _version) override { m_version = _version; }
Expand Down
2 changes: 2 additions & 0 deletions cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ void Front::asyncSendMessage(const std::string& _agencyID, front::PPCMessageFace
m_front->asyncSendMessage((uint16_t)RouteType::ROUTE_THROUGH_TOPIC, routeInfo,
bcos::bytesConstRef((bcos::byte*)data.data(), data.size()), _message->seq(), _timeout,
_callback, msgCallback);
// release the data
bcos::bytes().swap(data);
}

// send response when receiving message from given agencyID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def __repr__(self):

class TransportLoader:
transport_builder = TransportBuilder()
transport_builder.initLog("conf/wedpr_sdk_log_config.ini")

@staticmethod
def load(transport_config: TransportConfig) -> Transport:
Expand Down
4 changes: 2 additions & 2 deletions python/ppc_model/conf/logging.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ keys=root,wsgi

[logger_root]
level=INFO
handlers=consoleHandler,fileHandler
handlers=fileHandler

[logger_wsgi]
level = INFO
Expand All @@ -12,7 +12,7 @@ qualname = wsgi
propagate = 0

[handlers]
keys=fileHandler,consoleHandler,accessHandler
keys=fileHandler,accessHandler

[handler_accessHandler]
class=handlers.TimedRotatingFileHandler
Expand Down
17 changes: 17 additions & 0 deletions python/ppc_model/conf/wedpr_sdk_log_config.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[log]
enable = true
log_path = ./log
; info debug trace
level = info
; MB
max_log_file_size = 200
; LineID, TimeStamp, ProcessID, ThreadName, ThreadID and Message
;format = %Severity%|ppcs-gateway|system-id|%TimeStamp%|%ThreadName%-%ThreadID%|%Message%
format = %Severity%|%TimeStamp%|%Message%
enable_rotate_by_hour = true
; if archive_path is empty, the archive function will be disabled
; archive_path = ./log/
;compress_archive_file = true
; ; 0 = no limit, in MB
; max_archive_size = 0
; min_free_space = 0
2 changes: 1 addition & 1 deletion python/ppc_model/ppc_model_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def register_task_handler():
TransLogger(app, setup_console_handler=False), numthreads=2)

protocol = 'http'
message = f"Starting ppc model server at {protocol}://{app.config['HOST']}:{app.config['HTTP_PORT']} successfully"
message = f"* Starting ppc model server at {protocol}://{app.config['HOST']}:{app.config['HTTP_PORT']} successfully"
print(message)
components.logger().info(message)
server.start()
Expand Down
Loading

0 comments on commit 427d4ef

Please sign in to comment.