Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add exception handle when send message failed for ecdh-multi-psi #92

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions cpp/wedpr-computing/ppc-pir/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ set(TEST_BINARY_NAME test-ppc-pir)

add_executable(${TEST_BINARY_NAME} ${SOURCES})
target_include_directories(${TEST_BINARY_NAME} PRIVATE .)
# target_link_libraries(${PIR_TARGET} PUBLIC ${IO_TARGET} ${FRONT_TARGET} ${BCOS_UTILITIES_TARGET} ${TARS_PROTOCOL_TARGET} TBB::tbb TCMalloc)

# target_link_libraries(${TEST_BINARY_NAME} ${PIR_TARGET} ${RPC_TARGET} ${CRYPTO_TARGET} ${BOOST_UNIT_TEST})
target_link_libraries(${TEST_BINARY_NAME} PUBLIC ${IO_TARGET} ${FRONT_TARGET} ${BCOS_UTILITIES_TARGET} ${TARS_PROTOCOL_TARGET} ${PIR_TARGET} ${RPC_TARGET} ${CRYPTO_TARGET} ${PROTOCOL_TARGET} ${BOOST_UNIT_TEST})
target_link_libraries(${TEST_BINARY_NAME} PUBLIC ${CRYPTO_TARGET} ${BCOS_UTILITIES_TARGET} ${TARS_PROTOCOL_TARGET} ${PIR_TARGET} ${PROTOCOL_TARGET} ${BOOST_UNIT_TEST})
add_test(NAME test-pir WORKING_DIRECTORY ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} COMMAND ${TEST_BINARY_NAME})
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ void CM2020PSIReceiver::prepareInputs()
m_originLocations.resize(m_rInputSize);
m_oprfOutputs.reserve(m_rInputSize);
m_oprfOutputs.resize(m_rInputSize);

CM2020_PSI_LOG(INFO) << LOG_BADGE("prepareInputs success") << LOG_KV("taskID", m_taskID)
<< LOG_KV("inputSize", m_rInputSize);
syncInputsSize();
}
catch (const std::exception& e)
Expand All @@ -187,7 +188,7 @@ void CM2020PSIReceiver::syncInputsSize()
encodeUnsignedNum(data, m_rInputSize);
CM2020_PSI_LOG(TRACE) << LOG_BADGE("syncInputsSize") << LOG_KV("taskID", m_taskID)
<< LOG_KV("data", *bcos::toHexString(*data))
<< LOG_KV("m_rInputSize", m_rInputSize);
<< LOG_KV("inputSize", m_rInputSize);
auto message = m_config->ppcMsgFactory()->buildPPCMessage(
uint8_t(TaskType::PSI), uint8_t(TaskAlgorithmType::CM_PSI_2PC), m_taskID, data);
message->setMessageType(uint8_t(CM2020PSIMessageType::RECEIVER_SIZE));
Expand Down Expand Up @@ -242,7 +243,8 @@ void CM2020PSIReceiver::onBatchPointBReceived(PPCMessageFace::Ptr _message)
{
return;
}
CM2020_PSI_LOG(INFO) << LOG_BADGE("handleBatchPointB") << LOG_KV("taskID", m_taskID);
CM2020_PSI_LOG(INFO) << LOG_BADGE("handleBatchPointB") << LOG_KV("taskID", m_taskID)
<< LOG_KV("payloadSize", _message->data()->size());

try
{
Expand All @@ -261,7 +263,8 @@ void CM2020PSIReceiver::onSenderSizeReceived(front::PPCMessageFace::Ptr _message
{
return;
}
CM2020_PSI_LOG(INFO) << LOG_BADGE("onSenderSizeReceived") << LOG_KV("taskID", m_taskID);
CM2020_PSI_LOG(INFO) << LOG_BADGE("onSenderSizeReceived") << LOG_KV("taskID", m_taskID)
<< LOG_KV("payloadSize", _message->data()->size());
try
{
decodeUnsignedNum(m_sInputSize, _message->data());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ void CM2020PSISender::onReceiverSizeReceived(front::PPCMessageFace::Ptr _message
{
decodeUnsignedNum(m_rInputSize, _message->data());
m_progress->mark<std::string>("RECEIVE_SIZE");
CM2020_PSI_LOG(INFO) << LOG_BADGE("onReceiverSizeReceived") << LOG_KV("taskID", m_taskID)
<< LOG_KV("inputSize", m_rInputSize);
}
catch (const std::exception& e)
{
Expand Down Expand Up @@ -238,6 +240,10 @@ void CM2020PSISender::onPointAReceived(front::PPCMessageFace::Ptr _message)

auto retPair = m_ot->receiverGeneratePointsB(m_otChoices, pointA);

CM2020_PSI_LOG(INFO) << LOG_BADGE("onPointAReceived and send pointB")
<< LOG_KV("taskID", m_taskID)
<< LOG_KV("dataSize", retPair.first->size())
<< LOG_KV("otNumber", m_otChoices->size());
// send batch point B to ot sender
auto message = m_config->ppcMsgFactory()->buildPPCMessage(uint8_t(TaskType::PSI),
uint8_t(TaskAlgorithmType::CM_PSI_2PC), m_taskID, retPair.first);
Expand Down
29 changes: 24 additions & 5 deletions cpp/wedpr-computing/ppc-psi/src/ecdh-multi-psi/EcdhMultiCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,15 +244,26 @@ void MasterCache::encryptAndSendIntersection(uint64_t dataBatchIdx, bcos::bytes
});
auto taskID = m_taskState->task()->id();
auto self = weak_from_this();
for (auto const& calcultor : calculators)
for (auto const& calculator : calculators)
{
m_config->generateAndSendPPCMessage(
calcultor.first, taskID, message,
[self](bcos::Error::Ptr&& _error) {
if (!_error)
calculator.first, taskID, message,
[self, calculator](bcos::Error::Ptr&& _error) {
if (!_error || _error->errorCode() == 0)
{
return;
}
auto cache = self.lock();
if (!cache)
{
return;
}
ECDH_MULTI_LOG(WARNING)
<< LOG_DESC("encryptAndSendIntersection: send message to calcultor failed")
<< LOG_KV("task", cache->m_taskState->task()->id())
<< LOG_KV("calculator", calculator.first) << LOG_KV("code", _error->errorCode())
<< LOG_KV("msg", _error->errorMessage());
cache->m_taskState->onTaskException(_error->errorMessage());
},
dataBatchIdx);
}
Expand Down Expand Up @@ -317,10 +328,16 @@ bool CalculatorCache::tryToFinalize()
{
continue;
}
if (it.second.plainDataIndex >= 0)
if (it.second.plainDataIndex > 0)
{
m_intersectionResult.emplace_back(getPlainDataByIndex(it.second.plainDataIndex));
}
// means the header field, swap with the first element
if (it.second.plainDataIndex == 0)
{
m_intersectionResult.emplace_back(m_intersectionResult[0]);
m_intersectionResult[0] = getPlainDataByIndex(it.second.plainDataIndex);
}
}
m_cacheState = CacheState::Finalized;
ECDH_MULTI_LOG(INFO) << LOG_DESC("* tryToFinalize: compute intersection success")
Expand Down Expand Up @@ -356,6 +373,7 @@ void CalculatorCache::syncIntersections()
message->setVersion(-1);
for (auto& peer : peers)
{
// Note: sync task failed will not change the task status
m_config->generateAndSendPPCMessage(
peer.first, taskID, message,
[taskID, peer](bcos::Error::Ptr&& _error) {
Expand All @@ -381,6 +399,7 @@ void CalculatorCache::syncIntersections()
message->setVersion(0);
for (auto& peer : peers)
{
// Note: sync task failed will not change the task status
m_config->generateAndSendPPCMessage(
peer.first, taskID, message,
[taskID, peer](bcos::Error::Ptr&& _error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,36 @@ bcos::bytes EcdhMultiPSICalculator::generateRandomA(std::string _taskID)
message->setData(std::vector<bcos::bytes>{randomValue});
message->setDataBatchCount(randomValue.size());
message->setFrom(m_taskState->task()->selfParty()->id());
auto self = weak_from_this();
for (auto const& partner : m_partnerParties)
{
ECDH_CAL_LOG(INFO) << LOG_KV("PART1: Calculator generateRandomA to ", partner.first)
ECDH_CAL_LOG(INFO) << LOG_DESC("send generateRandomA to partner")
<< LOG_KV("partner", partner.first)
<< LOG_KV(" Random: ", *toHexString(randomValue));
m_config->generateAndSendPPCMessage(
partner.first, _taskID, message,
[self = weak_from_this(), partner](bcos::Error::Ptr&& _error) {
if (!_error)
[self, _taskID, partner](bcos::Error::Ptr&& _error) {
if (!_error || _error->errorCode() == 0)
{
ECDH_CAL_LOG(INFO)
<< LOG_KV("PART1: Calculator generateRandomA success to ", partner.first);
<< LOG_DESC("send generated randomA to partner success")
<< LOG_KV("task", _taskID) << LOG_KV("partner", partner.first);
return;
}
auto psi = self.lock();
if (!psi)
{
return;
}
if (!psi->m_taskState)
{
return;
}
ECDH_CAL_LOG(WARNING)
<< LOG_DESC("send generated randomA to partner failed")
<< LOG_KV("task", _taskID) << LOG_KV("partner", partner.first)
<< LOG_KV("code", _error->errorCode()) << LOG_KV("msg", _error->errorMessage());
psi->m_taskState->onTaskException(_error->errorMessage());
},
0);
}
Expand Down Expand Up @@ -147,12 +159,13 @@ void EcdhMultiPSICalculator::blindData(std::string _taskID, bcos::bytes _randA)
message->setDataBatchCount(0);
}
// send cipher
auto self = weak_from_this();
for (auto const& master : m_masterParties)
{
m_config->generateAndSendPPCMessage(
master.first, _taskID, message,
[self = weak_from_this(), master](bcos::Error::Ptr&& _error) {
if (!_error)
[self, _taskID, master](bcos::Error::Ptr&& _error) {
if (!_error || _error->errorCode() == 0)
{
return;
}
Expand All @@ -161,6 +174,12 @@ void EcdhMultiPSICalculator::blindData(std::string _taskID, bcos::bytes _randA)
{
return;
}
ECDH_CAL_LOG(WARNING)
<< LOG_DESC("send blindedData to master failed")
<< LOG_KV("task", _taskID) << LOG_KV("master", master.first)
<< LOG_KV("code", _error->errorCode())
<< LOG_KV("msg", _error->errorMessage());
psi->m_taskState->onTaskException(_error->errorMessage());
},
seq);
dataOffset += dataBatch->size();
Expand Down Expand Up @@ -215,22 +234,23 @@ void EcdhMultiPSICalculator::onReceiveIntersecCipher(PSIMessageInterface::Ptr _m
message->setFrom(m_taskState->task()->selfParty()->id());
// try to finalize
auto ret = m_calculatorCache->tryToFinalize();
auto taskID = m_taskID;
for (auto const& master : m_masterParties)
{
ECDH_CAL_LOG(INFO) << LOG_DESC("onReceiveIntersecCipher: send response to the master")
<< LOG_KV("master", master.first) << printPSIMessage(_msg);
// no any bad influences when send response failed
m_config->generateAndSendPPCMessage(
master.first, m_taskID, message,
[self = weak_from_this()](bcos::Error::Ptr&& _error) {
if (!_error)
{
return;
}
auto psi = self.lock();
if (!psi)
master.first, taskID, message,
[taskID](bcos::Error::Ptr&& _error) {
if (!_error || _error->errorCode() == 0)
{
return;
}
ECDH_CAL_LOG(WARNING)
<< LOG_DESC("onReceiveIntersecCipher: send response to the master failed")
<< LOG_KV("task", taskID) << LOG_KV("code", _error->errorCode())
<< LOG_KV("msg", _error->errorMessage());
},
0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,16 +185,28 @@ void EcdhMultiPSIMaster::blindData()
message->setDataBatchCount(0);
}
message->setFrom(m_taskState->task()->selfParty()->id());
auto self = weak_from_this();
for (auto const& calcultor : m_calculatorParties)
{
// TODO: handle the send failed case
m_config->generateAndSendPPCMessage(
calcultor.first, m_taskID, message,
[](bcos::Error::Ptr&& _error) {
if (_error)
[self, seq, calcultor](bcos::Error::Ptr&& _error) {
if (!_error || _error->errorCode() == 0)
{
return;
}
auto psi = self.lock();
if (!psi)
{
return;
}
ECDH_MASTER_LOG(WARNING)
<< LOG_DESC("send blindedData to calculator failed")
<< LOG_KV("calculator", calcultor.first) << LOG_KV("seq", seq)
<< LOG_KV("task", psi->m_taskState->task()->id())
<< LOG_KV("code", _error->errorCode())
<< LOG_KV("msg", _error->errorMessage());
psi->m_taskState->onTaskException(_error->errorMessage());
},
seq);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,13 @@ void EcdhMultiPSIPartner::onReceiveRandomA(bcos::bytesPointer _randA)
message->setDataBatchCount(0);
}
// generate and send encryptedHashSet
auto self = weak_from_this();
for (auto const& master : m_masterParties)
{
m_config->generateAndSendPPCMessage(
master.first, m_taskState->task()->id(), message,
[self = weak_from_this()](bcos::Error::Ptr&& _error) {
if (!_error)
[self, master, seq](bcos::Error::Ptr&& _error) {
if (!_error || _error->errorCode() == 0)
{
return;
}
Expand All @@ -131,6 +132,13 @@ void EcdhMultiPSIPartner::onReceiveRandomA(bcos::bytesPointer _randA)
{
return;
}
ECDH_PARTNER_LOG(WARNING)
<< LOG_DESC("send blinded data to master failed") << LOG_KV("seq", seq)
<< LOG_KV("master", master.first)
<< LOG_KV("task", psi->m_taskState->task()->id())
<< LOG_KV("code", _error->errorCode())
<< LOG_KV("msg", _error->errorMessage());
psi->m_taskState->onTaskException(_error->errorMessage());
},
seq);
}
Expand Down
13 changes: 13 additions & 0 deletions cpp/wedpr-computing/ppc-psi/src/psi-framework/TaskState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,19 @@ void TaskState::onTaskException(std::string const& _errorMsg)
<< LOG_KV("msg", boost::diagnostic_information(e));
}
}
if (m_notifyPeerFinishHandler)
{
try
{
m_notifyPeerFinishHandler();
}
catch (std::exception const& e)
{
PSI_LOG(WARNING) << LOG_DESC("notifyPeerFinish exception")
<< LOG_KV("taskID", m_task->id())
<< LOG_KV("msg", boost::diagnostic_information(e));
}
}
auto taskResult = std::make_shared<TaskResult>(m_task->id());
auto msg = "Task " + m_task->id() + " exception, error : " + _errorMsg;
auto error = std::make_shared<bcos::Error>(-1, msg);
Expand Down
3 changes: 2 additions & 1 deletion cpp/wedpr-protocol/protocol/src/PPCMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class PPCMessage : public PPCMessageFace

using Ptr = std::shared_ptr<PPCMessage>;
PPCMessage() { m_data = std::make_shared<bcos::bytes>(); }
~PPCMessage() override { releasePayload(); }
// Note: the payload passed in by the upper layer cannot be released at will
~PPCMessage() override = default;

uint8_t version() const override { return m_version; }
void setVersion(uint8_t _version) override { m_version = _version; }
Expand Down
2 changes: 2 additions & 0 deletions cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ void Front::asyncSendMessage(const std::string& _agencyID, front::PPCMessageFace
m_front->asyncSendMessage((uint16_t)RouteType::ROUTE_THROUGH_TOPIC, routeInfo,
bcos::bytesConstRef((bcos::byte*)data.data(), data.size()), _message->seq(), _timeout,
_callback, msgCallback);
// release the data
bcos::bytes().swap(data);
}

// send response when receiving message from given agencyID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def __repr__(self):

class TransportLoader:
transport_builder = TransportBuilder()
transport_builder.initLog("conf/wedpr_sdk_log_config.ini")

@staticmethod
def load(transport_config: TransportConfig) -> Transport:
Expand Down
4 changes: 2 additions & 2 deletions python/ppc_model/conf/logging.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ keys=root,wsgi

[logger_root]
level=INFO
handlers=consoleHandler,fileHandler
handlers=fileHandler

[logger_wsgi]
level = INFO
Expand All @@ -12,7 +12,7 @@ qualname = wsgi
propagate = 0

[handlers]
keys=fileHandler,consoleHandler,accessHandler
keys=fileHandler,accessHandler

[handler_accessHandler]
class=handlers.TimedRotatingFileHandler
Expand Down
17 changes: 17 additions & 0 deletions python/ppc_model/conf/wedpr_sdk_log_config.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[log]
enable = true
log_path = ./log
; info debug trace
level = info
; MB
max_log_file_size = 200
; LineID, TimeStamp, ProcessID, ThreadName, ThreadID and Message
;format = %Severity%|ppcs-gateway|system-id|%TimeStamp%|%ThreadName%-%ThreadID%|%Message%
format = %Severity%|%TimeStamp%|%Message%
enable_rotate_by_hour = true
; if archive_path is empty, the archive function will be disabled
; archive_path = ./log/
;compress_archive_file = true
; ; 0 = no limit, in MB
; max_archive_size = 0
; min_free_space = 0
2 changes: 1 addition & 1 deletion python/ppc_model/ppc_model_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def register_task_handler():
TransLogger(app, setup_console_handler=False), numthreads=2)

protocol = 'http'
message = f"Starting ppc model server at {protocol}://{app.config['HOST']}:{app.config['HTTP_PORT']} successfully"
message = f"* Starting ppc model server at {protocol}://{app.config['HOST']}:{app.config['HTTP_PORT']} successfully"
print(message)
components.logger().info(message)
server.start()
Expand Down
Loading
Loading