Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add transport initialize to support service discovery for mpc #84

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/ppc-framework/protocol/GrpcConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class GrpcConfig
// the max send message size in bytes
uint64_t m_maxSendMessageSize = 1024 * 1024 * 1024;
// the max received message size in bytes
uint16_t m_maxReceivedMessageSize = 1024 * 1024 * 1024;
uint64_t m_maxReceivedMessageSize = 1024 * 1024 * 1024;
int m_compressAlgorithm = 0;
};

Expand Down
38 changes: 37 additions & 1 deletion cpp/tools/build_wedpr_mpc.sh
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,9 @@ generate_config_ini() {
local rpc_listen_port="${3}"
local agency_info="${4}"
local agency_id="${5}"
local grpc_listen_ip="${6}"
local grpc_listen_port="${7}"
local nodeid="${8}"

cat <<EOF >"${output}"
[agency]
Expand Down Expand Up @@ -364,6 +367,19 @@ generate_config_ini() {
; the connection-timeout, in ms, default is 1000ms
connection-timeout = 2000

[transport]
; the endpoint information
listen_ip = ${grpc_listen_ip}
listen_port = ${grpc_listen_port}
host_ip =
; the threadPoolSize
thread_count = 4
; the gatewayService endpoint information
gateway_target =
; the components
components =
nodeid=${nodeid}

[cert]
; directory the certificates located in
cert_path=./conf
Expand Down Expand Up @@ -584,6 +600,22 @@ gen_rsa_node_cert() {
LOG_INFO "Generate ${ndpath} cert successful!"
}

# we use sm_param to generate the private key
generate_private_key() {
local output_path="${1}"
if [ ! -d "${output_path}" ]; then
mkdir -p ${output_path}
fi
if [ ! -f ${sm2_params} ]; then
generate_sm_sm2_param ${sm2_params}
fi
${OPENSSL_CMD} genpkey -paramfile ${sm2_params} -out ${output_path}/node.pem 2>/dev/null
$OPENSSL_CMD ec -in "$output_path/node.pem" -text -noout 2> /dev/null | sed -n '3,5p' | sed 's/://g' | tr "\n" " " | sed 's/ //g' | cat > "$output_path/node.privateKey"
${OPENSSL_CMD} ec -text -noout -in "${output_path}/node.pem" 2>/dev/null | sed -n '7,11p' | tr -d ": \n" | awk '{print substr($0,3);}' | cat >"$output_path"/node.nodeid
private_key=$(cat $output_path/node.privateKey)
echo ${private_key}
}

gen_sm_node_cert_with_ext() {
local capath="$1"
local certpath="$2"
Expand Down Expand Up @@ -686,7 +718,11 @@ deploy_nodes()
# generate the config.ini
local rpc_port=5894
local agency_id="agency${count}"
generate_config_ini "${output_dir}/config.ini" "${listen_ip}" "${rpc_port}" "${agency_info}" ${agency_id}
local grpc_port=18100
# the nodeid
private_key=$(generate_private_key "${output_dir}/conf")
node_id=$(cat "${output_dir}/conf/node.nodeid")
generate_config_ini "${output_dir}/config.ini" "${listen_ip}" "${rpc_port}" "${agency_info}" ${agency_id} "${listen_ip}" "${grpc_port}" "${node_id}"
print_result
}

Expand Down
1 change: 1 addition & 0 deletions cpp/wedpr-computing/ppc-mpc/src/MPCService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ void MPCService::execCommand(const std::string cmd, int& outExitStatus, std::str
}
catch (const std::exception& e)
{
MPC_LOG(WARNING) << LOG_DESC("[MPCService] execCommand failed") << LOG_KV("cmd", cmd);
BOOST_THROW_EXCEPTION(
RunMpcFailException() << errinfo_comment(
"invalid params:" + std::string(boost::diagnostic_information(e))));
Expand Down
7 changes: 4 additions & 3 deletions cpp/wedpr-helper/ppc-tools/src/config/PPCConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,14 @@ class PPCConfig
return m_frontConfig->selfEndPoint().host() + ":" + std::to_string(m_rpcConfig.listenPort);
}

private:
virtual void loadEndpointConfig(ppc::protocol::EndPoint& endPoint, bool requireHostIp,
std::string const& sectionName, boost::property_tree::ptree const& pt);
// load the front config
virtual void loadFrontConfig(bool requireTransport,
ppc::front::FrontConfigBuilder::Ptr const& frontConfigBuilder,
boost::property_tree::ptree const& pt);

private:
virtual void loadEndpointConfig(ppc::protocol::EndPoint& endPoint, bool requireHostIp,
std::string const& sectionName, boost::property_tree::ptree const& pt);
// load the grpc config
ppc::protocol::GrpcConfig::Ptr loadGrpcConfig(
std::string const& sectionName, boost::property_tree::ptree const& pt);
Expand Down
2 changes: 1 addition & 1 deletion cpp/wedpr-main/mpc-node/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
aux_source_directory(. SRC_LIST)

add_executable(${MPC_BINARY_NAME} ${SRC_LIST})
target_link_libraries(${MPC_BINARY_NAME} PUBLIC ${MPC_TARGET} ${RPC_TARGET} ${HELPER_TARGET} TBB::tbb)
target_link_libraries(${MPC_BINARY_NAME} PUBLIC ${MPC_TARGET} ${RPC_TARGET} ${HELPER_TARGET} ${WEDPR_TRANSPORT_SDK_TARGET} TBB::tbb)
56 changes: 47 additions & 9 deletions cpp/wedpr-main/mpc-node/MPCInitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,23 @@
* @date 2023-03-24
*/
#include "MPCInitializer.h"
#include "ppc-framework/front/FrontConfig.h"
#include "ppc-framework/protocol/ServiceType.h"
#include "ppc-mpc/src/MPCService.h"
#include "ppc-tools/src/config/PPCConfig.h"
#include "wedpr-protocol/protocol/src/ServiceConfig.h"
#include "wedpr-transport/sdk/src/TransportBuilder.h"

using namespace ppc::rpc;
using namespace bcos;
using namespace ppc::sdk;
using namespace ppc::mpc;
using namespace ppc::tools;
using namespace ppc::front;
using namespace ppc::protocol;

MPCInitializer::MPCInitializer() : m_transportBuilder(std::make_shared<TransportBuilder>()) {}

/// TODO: mpc support the gateway
void MPCInitializer::init(std::string const& _configPath)
{
// init the log
Expand All @@ -40,28 +48,54 @@ void MPCInitializer::init(std::string const& _configPath)
// init the rpc
INIT_LOG(INFO) << LOG_DESC("init the rpc");
// load the rpc config
auto ppcConfig = std::make_shared<PPCConfig>();
ppcConfig->loadRpcConfig(pt);
ppcConfig->loadMPCConfig(pt);
m_config = std::make_shared<PPCConfig>();
m_config->loadRpcConfig(pt);
m_config->loadMPCConfig(pt);
// bool useMysql = pt.get<bool>("mpc.use_mysql", false);
auto storageConfig = ppcConfig->storageConfig();
auto mpcConfig = ppcConfig->mpcConfig();
auto rpcFactory = std::make_shared<RpcFactory>(ppcConfig->agencyID());
m_rpc = rpcFactory->buildRpc(ppcConfig, nullptr);
auto storageConfig = m_config->storageConfig();
auto mpcConfig = m_config->mpcConfig();
auto rpcFactory = std::make_shared<RpcFactory>(m_config->agencyID());
m_rpc = rpcFactory->buildRpc(m_config, nullptr);
auto mpcService = std::make_shared<MPCService>();
mpcService->setMPCConfig(mpcConfig);
mpcService->setStorageConfig(storageConfig);
m_rpc->registerHandler("run", std::bind(&MPCService::runMpcRpc, mpcService,
std::placeholders::_1, std::placeholders::_2));
m_rpc->registerHandler("kill", std::bind(&MPCService::killMpcRpc, mpcService,
std::placeholders::_1, std::placeholders::_2));
INIT_LOG(INFO) << LOG_DESC("init the mpc rpc success");
// init the transport
initTransport(pt);
}

void MPCInitializer::initTransport(boost::property_tree::ptree const& property)
{
INIT_LOG(INFO) << LOG_DESC("initTransport: load front config");
m_config->loadFrontConfig(true, m_transportBuilder->frontConfigBuilder(), property);
INIT_LOG(INFO) << LOG_DESC("initTransport: load front config success");

INIT_LOG(INFO) << LOG_DESC("init the mpc rpc success");
// add the service meta
ServiceConfigBuilder serviceConfigBuilder;
auto entryPoint =
serviceConfigBuilder.buildEntryPoint(MPC_SERVICE_TYPE, m_config->accessEntrypoint());
auto serviceConfig = serviceConfigBuilder.buildServiceConfig();
serviceConfig.addEntryPoint(entryPoint);
auto serviceMeta = serviceConfig.encode();
m_config->frontConfig()->setMeta(serviceMeta);
INIT_LOG(INFO) << LOG_DESC("initTransport: register serviceMeta")
<< LOG_KV("serviceMeta", serviceMeta);
INIT_LOG(INFO) << LOG_DESC("initTransport: buildProTransport");
m_transport = m_transportBuilder->buildProTransport(m_config->frontConfig());
INIT_LOG(INFO) << LOG_DESC("initTransport: buildProTransport success");
}

void MPCInitializer::start()
{
// start the transport
if (m_transport)
{
m_transport->start();
}
// start the ppc mpc
if (m_rpc)
{
Expand All @@ -75,4 +109,8 @@ void MPCInitializer::stop()
{
m_rpc->stop();
}
if (m_transport)
{
m_transport->stop();
}
}
18 changes: 17 additions & 1 deletion cpp/wedpr-main/mpc-node/MPCInitializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,36 @@ namespace ppc::rpc
{
class Rpc;
}
namespace ppc::tools
{
class PPCConfig;
}
namespace ppc::sdk
{
class TransportBuilder;
class Transport;
}; // namespace ppc::sdk
namespace ppc::mpc
{
class MPCInitializer
{
public:
using Ptr = std::shared_ptr<MPCInitializer>;
MPCInitializer() {}
MPCInitializer();
virtual ~MPCInitializer() { stop(); }

virtual void init(std::string const& _configPath);
virtual void start();
virtual void stop();

protected:
virtual void initTransport(boost::property_tree::ptree const& property);

private:
std::shared_ptr<ppc::tools::PPCConfig> m_config;
std::shared_ptr<ppc::sdk::TransportBuilder> m_transportBuilder;
std::shared_ptr<ppc::sdk::Transport> m_transport;

bcos::BoostLogInitializer::Ptr m_logInitializer;
std::shared_ptr<ppc::rpc::Rpc> m_rpc;
};
Expand Down
2 changes: 2 additions & 0 deletions cpp/wedpr-protocol/grpc/server/GrpcServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ void GrpcServer::start()
}
grpc::reflection::InitProtoReflectionServerBuilderPlugin();
grpc::ServerBuilder builder;
// disable port reuse
builder.AddChannelArgument(GRPC_ARG_ALLOW_REUSEPORT, 0);
// without authentication
builder.AddListeningPort(m_config->listenEndPoint(), grpc::InsecureServerCredentials());
// register the service
Expand Down
18 changes: 10 additions & 8 deletions cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ void FrontImpl::start()
return;
}
m_running = true;
if (m_nodeDiscovery)
{
m_nodeDiscovery->start();
}
m_thread = std::make_shared<std::thread>([&] {
bcos::pthread_setThreadName("front_io_service");
while (m_running)
Expand All @@ -77,10 +81,7 @@ void FrontImpl::start()
}
FRONT_LOG(INFO) << "Front exit";
});
if (m_nodeDiscovery)
{
m_nodeDiscovery->start();
}
FRONT_LOG(INFO) << LOG_DESC("start front success");
}


Expand All @@ -97,6 +98,10 @@ void FrontImpl::stop()
return;
}
m_running = false;
if (m_nodeDiscovery)
{
m_nodeDiscovery->stop();
}
if (m_ioService)
{
m_ioService->stop();
Expand All @@ -113,10 +118,7 @@ void FrontImpl::stop()
m_thread->detach();
}
}
if (m_nodeDiscovery)
{
m_nodeDiscovery->stop();
}
FRONT_LOG(INFO) << LOG_DESC("stop front success");
}

void FrontImpl::asyncSendResponse(bcos::bytesConstRef dstNode, std::string const& traceID,
Expand Down
10 changes: 9 additions & 1 deletion cpp/wedpr-transport/ppc-front/ppc-front/NodeDiscovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ NodeDiscovery::NodeDiscovery(ppc::gateway::IGateway::Ptr gatewayClient)
{
// fetch the meta information every 10s
m_metaFetcher = std::make_shared<bcos::Timer>(10 * 1000, "metaFetcher");
m_metaFetcher->registerTimeoutHandler([this]() { fetchMetaInfoFromGateway(); });
}

std::vector<ppc::protocol::INodeInfo::Ptr> NodeDiscovery::getAliveNodeList() const
Expand All @@ -38,6 +37,15 @@ std::vector<ppc::protocol::INodeInfo::Ptr> NodeDiscovery::getAliveNodeList() con

void NodeDiscovery::start()
{
auto self = weak_from_this();
m_metaFetcher->registerTimeoutHandler([self]() {
auto fetcher = self.lock();
if (!fetcher)
{
return;
}
fetcher->fetchMetaInfoFromGateway();
});
if (m_metaFetcher)
{
m_metaFetcher->start();
Expand Down
2 changes: 1 addition & 1 deletion cpp/wedpr-transport/ppc-front/ppc-front/NodeDiscovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

namespace ppc::front
{
class NodeDiscovery : public INodeDiscovery
class NodeDiscovery : public INodeDiscovery, public std::enable_shared_from_this<NodeDiscovery>
{
public:
using Ptr = std::shared_ptr<NodeDiscovery>;
Expand Down
2 changes: 1 addition & 1 deletion cpp/wedpr-transport/sdk/src/Transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class Transport
public:
using Ptr = std::shared_ptr<Transport>;
Transport(ppc::front::FrontConfig::Ptr config);
virtual ~Transport() = default;
virtual ~Transport() { stop(); }

virtual void start() { m_front->start(); }
virtual void stop() { m_front->stop(); }
Expand Down
Loading