From be641730d49eca8e3674a09d5b2d213cd8ce4e7e Mon Sep 17 00:00:00 2001 From: cyjseagull Date: Mon, 11 Nov 2024 14:34:11 +0800 Subject: [PATCH 1/3] add transport initialize to support service discovery for mpc --- cpp/tools/build_wedpr_mpc.sh | 38 ++++++++++++- .../ppc-tools/src/config/PPCConfig.h | 7 ++- cpp/wedpr-main/mpc-node/CMakeLists.txt | 2 +- cpp/wedpr-main/mpc-node/MPCInitializer.cpp | 56 ++++++++++++++++--- cpp/wedpr-main/mpc-node/MPCInitializer.h | 18 +++++- 5 files changed, 106 insertions(+), 15 deletions(-) diff --git a/cpp/tools/build_wedpr_mpc.sh b/cpp/tools/build_wedpr_mpc.sh index 2195fe7c..938b9184 100644 --- a/cpp/tools/build_wedpr_mpc.sh +++ b/cpp/tools/build_wedpr_mpc.sh @@ -326,6 +326,9 @@ generate_config_ini() { local rpc_listen_port="${3}" local agency_info="${4}" local agency_id="${5}" + local grpc_listen_ip="${6}" + local grpc_listen_port="${7}" + local nodeid="${8}" cat <"${output}" [agency] @@ -364,6 +367,19 @@ generate_config_ini() { ; the connection-timeout, in ms, default is 1000ms connection-timeout = 2000 +[transport] + ; the endpoint information + listen_ip = ${grpc_listen_ip} + listen_port = ${grpc_listen_port} + host_ip = + ; the threadPoolSize + thread_count = 4 + ; the gatewayService endpoint information + gateway_target = + ; the components + components = + nodeid=${nodeid} + [cert] ; directory the certificates located in cert_path=./conf @@ -584,6 +600,22 @@ gen_rsa_node_cert() { LOG_INFO "Generate ${ndpath} cert successful!" } +# we use sm_param to generate the private key +generate_private_key() { + local output_path="${1}" + if [ ! -d "${output_path}" ]; then + mkdir -p ${output_path} + fi + if [ ! -f ${sm2_params} ]; then + generate_sm_sm2_param ${sm2_params} + fi + ${OPENSSL_CMD} genpkey -paramfile ${sm2_params} -out ${output_path}/node.pem 2>/dev/null + $OPENSSL_CMD ec -in "$output_path/node.pem" -text -noout 2> /dev/null | sed -n '3,5p' | sed 's/://g' | tr "\n" " " | sed 's/ //g' | cat > "$output_path/node.privateKey" + ${OPENSSL_CMD} ec -text -noout -in "${output_path}/node.pem" 2>/dev/null | sed -n '7,11p' | tr -d ": \n" | awk '{print substr($0,3);}' | cat >"$output_path"/node.nodeid + private_key=$(cat $output_path/node.privateKey) + echo ${private_key} +} + gen_sm_node_cert_with_ext() { local capath="$1" local certpath="$2" @@ -686,7 +718,11 @@ deploy_nodes() # generate the config.ini local rpc_port=5894 local agency_id="agency${count}" - generate_config_ini "${output_dir}/config.ini" "${listen_ip}" "${rpc_port}" "${agency_info}" ${agency_id} + local grpc_port=18100 + # the nodeid + private_key=$(generate_private_key "${output_dir}/conf") + node_id=$(cat "${output_dir}/conf/node.nodeid") + generate_config_ini "${output_dir}/config.ini" "${listen_ip}" "${rpc_port}" "${agency_info}" ${agency_id} "${listen_ip}" "${grpc_port}" "${node_id}" print_result } diff --git a/cpp/wedpr-helper/ppc-tools/src/config/PPCConfig.h b/cpp/wedpr-helper/ppc-tools/src/config/PPCConfig.h index 85b993bd..27fa7b69 100644 --- a/cpp/wedpr-helper/ppc-tools/src/config/PPCConfig.h +++ b/cpp/wedpr-helper/ppc-tools/src/config/PPCConfig.h @@ -248,13 +248,14 @@ class PPCConfig return m_frontConfig->selfEndPoint().host() + ":" + std::to_string(m_rpcConfig.listenPort); } -private: - virtual void loadEndpointConfig(ppc::protocol::EndPoint& endPoint, bool requireHostIp, - std::string const& sectionName, boost::property_tree::ptree const& pt); // load the front config virtual void loadFrontConfig(bool requireTransport, ppc::front::FrontConfigBuilder::Ptr const& frontConfigBuilder, boost::property_tree::ptree const& pt); + +private: + virtual void loadEndpointConfig(ppc::protocol::EndPoint& endPoint, bool requireHostIp, + std::string const& sectionName, boost::property_tree::ptree const& pt); // load the grpc config ppc::protocol::GrpcConfig::Ptr loadGrpcConfig( std::string const& sectionName, boost::property_tree::ptree const& pt); diff --git a/cpp/wedpr-main/mpc-node/CMakeLists.txt b/cpp/wedpr-main/mpc-node/CMakeLists.txt index 9b752de2..fd3942db 100644 --- a/cpp/wedpr-main/mpc-node/CMakeLists.txt +++ b/cpp/wedpr-main/mpc-node/CMakeLists.txt @@ -1,4 +1,4 @@ aux_source_directory(. SRC_LIST) add_executable(${MPC_BINARY_NAME} ${SRC_LIST}) -target_link_libraries(${MPC_BINARY_NAME} PUBLIC ${MPC_TARGET} ${RPC_TARGET} ${HELPER_TARGET} TBB::tbb) +target_link_libraries(${MPC_BINARY_NAME} PUBLIC ${MPC_TARGET} ${RPC_TARGET} ${HELPER_TARGET} ${WEDPR_TRANSPORT_SDK_TARGET} TBB::tbb) diff --git a/cpp/wedpr-main/mpc-node/MPCInitializer.cpp b/cpp/wedpr-main/mpc-node/MPCInitializer.cpp index 31a720bd..e1fda6f0 100644 --- a/cpp/wedpr-main/mpc-node/MPCInitializer.cpp +++ b/cpp/wedpr-main/mpc-node/MPCInitializer.cpp @@ -18,15 +18,23 @@ * @date 2023-03-24 */ #include "MPCInitializer.h" +#include "ppc-framework/front/FrontConfig.h" +#include "ppc-framework/protocol/ServiceType.h" #include "ppc-mpc/src/MPCService.h" #include "ppc-tools/src/config/PPCConfig.h" +#include "wedpr-protocol/protocol/src/ServiceConfig.h" +#include "wedpr-transport/sdk/src/TransportBuilder.h" using namespace ppc::rpc; using namespace bcos; +using namespace ppc::sdk; using namespace ppc::mpc; using namespace ppc::tools; +using namespace ppc::front; +using namespace ppc::protocol; + +MPCInitializer::MPCInitializer() : m_transportBuilder(std::make_shared()) {} -/// TODO: mpc support the gateway void MPCInitializer::init(std::string const& _configPath) { // init the log @@ -40,14 +48,14 @@ void MPCInitializer::init(std::string const& _configPath) // init the rpc INIT_LOG(INFO) << LOG_DESC("init the rpc"); // load the rpc config - auto ppcConfig = std::make_shared(); - ppcConfig->loadRpcConfig(pt); - ppcConfig->loadMPCConfig(pt); + m_config = std::make_shared(); + m_config->loadRpcConfig(pt); + m_config->loadMPCConfig(pt); // bool useMysql = pt.get("mpc.use_mysql", false); - auto storageConfig = ppcConfig->storageConfig(); - auto mpcConfig = ppcConfig->mpcConfig(); - auto rpcFactory = std::make_shared(ppcConfig->agencyID()); - m_rpc = rpcFactory->buildRpc(ppcConfig, nullptr); + auto storageConfig = m_config->storageConfig(); + auto mpcConfig = m_config->mpcConfig(); + auto rpcFactory = std::make_shared(m_config->agencyID()); + m_rpc = rpcFactory->buildRpc(m_config, nullptr); auto mpcService = std::make_shared(); mpcService->setMPCConfig(mpcConfig); mpcService->setStorageConfig(storageConfig); @@ -55,13 +63,39 @@ void MPCInitializer::init(std::string const& _configPath) std::placeholders::_1, std::placeholders::_2)); m_rpc->registerHandler("kill", std::bind(&MPCService::killMpcRpc, mpcService, std::placeholders::_1, std::placeholders::_2)); + INIT_LOG(INFO) << LOG_DESC("init the mpc rpc success"); + // init the transport + initTransport(pt); +} +void MPCInitializer::initTransport(boost::property_tree::ptree const& property) +{ + INIT_LOG(INFO) << LOG_DESC("initTransport: load front config"); + m_config->loadFrontConfig(true, m_transportBuilder->frontConfigBuilder(), property); + INIT_LOG(INFO) << LOG_DESC("initTransport: load front config success"); - INIT_LOG(INFO) << LOG_DESC("init the mpc rpc success"); + // add the service meta + ServiceConfigBuilder serviceConfigBuilder; + auto entryPoint = + serviceConfigBuilder.buildEntryPoint(MPC_SERVICE_TYPE, m_config->accessEntrypoint()); + auto serviceConfig = serviceConfigBuilder.buildServiceConfig(); + serviceConfig.addEntryPoint(entryPoint); + auto serviceMeta = serviceConfig.encode(); + m_config->frontConfig()->setMeta(serviceMeta); + INIT_LOG(INFO) << LOG_DESC("initTransport: register serviceMeta") + << LOG_KV("serviceMeta", serviceMeta); + INIT_LOG(INFO) << LOG_DESC("initTransport: buildProTransport"); + m_transport = m_transportBuilder->buildProTransport(m_config->frontConfig()); + INIT_LOG(INFO) << LOG_DESC("initTransport: buildProTransport success"); } void MPCInitializer::start() { + // start the transport + if (m_transport) + { + m_transport->start(); + } // start the ppc mpc if (m_rpc) { @@ -75,4 +109,8 @@ void MPCInitializer::stop() { m_rpc->stop(); } + if (m_transport) + { + m_transport->stop(); + } } diff --git a/cpp/wedpr-main/mpc-node/MPCInitializer.h b/cpp/wedpr-main/mpc-node/MPCInitializer.h index 4e0da526..7606dbce 100644 --- a/cpp/wedpr-main/mpc-node/MPCInitializer.h +++ b/cpp/wedpr-main/mpc-node/MPCInitializer.h @@ -26,20 +26,36 @@ namespace ppc::rpc { class Rpc; } +namespace ppc::tools +{ +class PPCConfig; +} +namespace ppc::sdk +{ +class TransportBuilder; +class Transport; +}; // namespace ppc::sdk namespace ppc::mpc { class MPCInitializer { public: using Ptr = std::shared_ptr; - MPCInitializer() {} + MPCInitializer(); virtual ~MPCInitializer() { stop(); } virtual void init(std::string const& _configPath); virtual void start(); virtual void stop(); +protected: + virtual void initTransport(boost::property_tree::ptree const& property); + private: + std::shared_ptr m_config; + std::shared_ptr m_transportBuilder; + std::shared_ptr m_transport; + bcos::BoostLogInitializer::Ptr m_logInitializer; std::shared_ptr m_rpc; }; From 07c557e4912c85ad0212b3ba8c1bed8896e1f50a Mon Sep 17 00:00:00 2001 From: cyjseagull Date: Mon, 11 Nov 2024 15:46:51 +0800 Subject: [PATCH 2/3] optimize log --- cpp/ppc-framework/protocol/GrpcConfig.h | 2 +- cpp/wedpr-computing/ppc-mpc/src/MPCService.cpp | 1 + .../ppc-front/ppc-front/FrontImpl.cpp | 18 ++++++++++-------- .../ppc-front/ppc-front/NodeDiscovery.cpp | 10 +++++++++- .../ppc-front/ppc-front/NodeDiscovery.h | 2 +- cpp/wedpr-transport/sdk/src/Transport.h | 2 +- 6 files changed, 23 insertions(+), 12 deletions(-) diff --git a/cpp/ppc-framework/protocol/GrpcConfig.h b/cpp/ppc-framework/protocol/GrpcConfig.h index 4c570877..6b317fc8 100644 --- a/cpp/ppc-framework/protocol/GrpcConfig.h +++ b/cpp/ppc-framework/protocol/GrpcConfig.h @@ -106,7 +106,7 @@ class GrpcConfig // the max send message size in bytes uint64_t m_maxSendMessageSize = 1024 * 1024 * 1024; // the max received message size in bytes - uint16_t m_maxReceivedMessageSize = 1024 * 1024 * 1024; + uint64_t m_maxReceivedMessageSize = 1024 * 1024 * 1024; int m_compressAlgorithm = 0; }; diff --git a/cpp/wedpr-computing/ppc-mpc/src/MPCService.cpp b/cpp/wedpr-computing/ppc-mpc/src/MPCService.cpp index d1631677..e31115f8 100644 --- a/cpp/wedpr-computing/ppc-mpc/src/MPCService.cpp +++ b/cpp/wedpr-computing/ppc-mpc/src/MPCService.cpp @@ -420,6 +420,7 @@ void MPCService::execCommand(const std::string cmd, int& outExitStatus, std::str } catch (const std::exception& e) { + MPC_LOG(WARNING) << LOG_DESC("[MPCService] execCommand failed") << LOG_KV("cmd", cmd); BOOST_THROW_EXCEPTION( RunMpcFailException() << errinfo_comment( "invalid params:" + std::string(boost::diagnostic_information(e)))); diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.cpp b/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.cpp index cb976b08..4ff6ed09 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.cpp +++ b/cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.cpp @@ -56,6 +56,10 @@ void FrontImpl::start() return; } m_running = true; + if (m_nodeDiscovery) + { + m_nodeDiscovery->start(); + } m_thread = std::make_shared([&] { bcos::pthread_setThreadName("front_io_service"); while (m_running) @@ -77,10 +81,7 @@ void FrontImpl::start() } FRONT_LOG(INFO) << "Front exit"; }); - if (m_nodeDiscovery) - { - m_nodeDiscovery->start(); - } + FRONT_LOG(INFO) << LOG_DESC("start front success"); } @@ -97,6 +98,10 @@ void FrontImpl::stop() return; } m_running = false; + if (m_nodeDiscovery) + { + m_nodeDiscovery->stop(); + } if (m_ioService) { m_ioService->stop(); @@ -113,10 +118,7 @@ void FrontImpl::stop() m_thread->detach(); } } - if (m_nodeDiscovery) - { - m_nodeDiscovery->stop(); - } + FRONT_LOG(INFO) << LOG_DESC("stop front success"); } void FrontImpl::asyncSendResponse(bcos::bytesConstRef dstNode, std::string const& traceID, diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/NodeDiscovery.cpp b/cpp/wedpr-transport/ppc-front/ppc-front/NodeDiscovery.cpp index b1627d79..c0d8e922 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/NodeDiscovery.cpp +++ b/cpp/wedpr-transport/ppc-front/ppc-front/NodeDiscovery.cpp @@ -27,7 +27,6 @@ NodeDiscovery::NodeDiscovery(ppc::gateway::IGateway::Ptr gatewayClient) { // fetch the meta information every 10s m_metaFetcher = std::make_shared(10 * 1000, "metaFetcher"); - m_metaFetcher->registerTimeoutHandler([this]() { fetchMetaInfoFromGateway(); }); } std::vector NodeDiscovery::getAliveNodeList() const @@ -38,6 +37,15 @@ std::vector NodeDiscovery::getAliveNodeList() con void NodeDiscovery::start() { + auto self = weak_from_this(); + m_metaFetcher->registerTimeoutHandler([self]() { + auto fetcher = self.lock(); + if (!fetcher) + { + return; + } + fetcher->fetchMetaInfoFromGateway(); + }); if (m_metaFetcher) { m_metaFetcher->start(); diff --git a/cpp/wedpr-transport/ppc-front/ppc-front/NodeDiscovery.h b/cpp/wedpr-transport/ppc-front/ppc-front/NodeDiscovery.h index 067460d4..712a109e 100644 --- a/cpp/wedpr-transport/ppc-front/ppc-front/NodeDiscovery.h +++ b/cpp/wedpr-transport/ppc-front/ppc-front/NodeDiscovery.h @@ -25,7 +25,7 @@ namespace ppc::front { -class NodeDiscovery : public INodeDiscovery +class NodeDiscovery : public INodeDiscovery, public std::enable_shared_from_this { public: using Ptr = std::shared_ptr; diff --git a/cpp/wedpr-transport/sdk/src/Transport.h b/cpp/wedpr-transport/sdk/src/Transport.h index c6175c91..a3fa1680 100644 --- a/cpp/wedpr-transport/sdk/src/Transport.h +++ b/cpp/wedpr-transport/sdk/src/Transport.h @@ -31,7 +31,7 @@ class Transport public: using Ptr = std::shared_ptr; Transport(ppc::front::FrontConfig::Ptr config); - virtual ~Transport() = default; + virtual ~Transport() { stop(); } virtual void start() { m_front->start(); } virtual void stop() { m_front->stop(); } From 7d278321de0b619ce8dae9fc0b54e75859706e8f Mon Sep 17 00:00:00 2001 From: cyjseagull Date: Mon, 11 Nov 2024 17:37:18 +0800 Subject: [PATCH 3/3] grpc disable port reuse --- cpp/wedpr-protocol/grpc/server/GrpcServer.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cpp/wedpr-protocol/grpc/server/GrpcServer.cpp b/cpp/wedpr-protocol/grpc/server/GrpcServer.cpp index ee289ee1..fef0e383 100644 --- a/cpp/wedpr-protocol/grpc/server/GrpcServer.cpp +++ b/cpp/wedpr-protocol/grpc/server/GrpcServer.cpp @@ -39,6 +39,8 @@ void GrpcServer::start() } grpc::reflection::InitProtoReflectionServerBuilderPlugin(); grpc::ServerBuilder builder; + // disable port reuse + builder.AddChannelArgument(GRPC_ARG_ALLOW_REUSEPORT, 0); // without authentication builder.AddListeningPort(m_config->listenEndPoint(), grpc::InsecureServerCredentials()); // register the service