Skip to content

Commit

Permalink
add storage option check
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Nov 14, 2024
1 parent be857b8 commit 6026a71
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 10 deletions.
50 changes: 49 additions & 1 deletion cpp/ppc-framework/protocol/Protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#endif

#include "ppc-framework/Common.h"
#include <bcos-utilities/Log.h>
#include <map>
#include <memory>
Expand Down Expand Up @@ -402,6 +402,35 @@ struct SQLConnectionOption
std::string user;
std::string password;
std::string database;
void check() const
{
if (host.size() == 0)
{
BOOST_THROW_EXCEPTION(WeDPRException() << bcos::errinfo_comment(
"Invalid SQL option: Must set the host!"));
}
if (user.size() == 0)
{
BOOST_THROW_EXCEPTION(WeDPRException() << bcos::errinfo_comment(
"Invalid SQL option: Must set the user!"));
}
if (password.size() == 0)
{
BOOST_THROW_EXCEPTION(WeDPRException() << bcos::errinfo_comment(
"Invalid SQL option: Must set the password!"));
}
if (database.size() == 0)
{
BOOST_THROW_EXCEPTION(WeDPRException() << bcos::errinfo_comment(
"Invalid SQL option: Must set the database!"));
}
if (port == 0 || port > 65535)
{
BOOST_THROW_EXCEPTION(WeDPRException() << bcos::errinfo_comment(
"Invalid SQL Option, Must set valid port!"));
}
}

inline std::string desc() const
{
std::stringstream oss;
Expand All @@ -423,6 +452,25 @@ struct FileStorageConnectionOption
bool replaceDataNodeOnFailure = false;
// the default connection-timeout for the hdfs is 1000ms
uint16_t connectionTimeout = 1000;

void check() const
{
if (nameNode.size() == 0)
{
BOOST_THROW_EXCEPTION(WeDPRException() << bcos::errinfo_comment(
"Invalid HDFS Option, Must set the nameNode!"));
}
if (userName.size() == 0)
{
BOOST_THROW_EXCEPTION(WeDPRException() << bcos::errinfo_comment(
"Invalid HDFS Option, Must set the userName!"));
}
if (nameNodePort == 0 || nameNodePort > 65535)
{
BOOST_THROW_EXCEPTION(WeDPRException() << bcos::errinfo_comment(
"Invalid HDFS Option, Must set valid namenodeport!"));
}
}
inline std::string desc() const
{
std::stringstream oss;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,5 +266,5 @@ void CalculatorCache::setIntersectionCipher(std::map<uint32_t, bcos::bytes>&& _c
m_intersectionCipher = std::move(_cipherData);
m_receiveIntersection = true;
ECDH_MULTI_LOG(INFO) << LOG_DESC("setIntersectionCipher")
<< LOG_KV("dataSize", _cipherData.size()) << printCacheState();
<< LOG_KV("dataSize", m_intersectionCipher.size()) << printCacheState();
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ void EcdhMultiPSICalculator::loadAndEncrypt(std::string _taskID, bcos::bytes _ra
seq);
dataOffset += dataBatch->size();
}
dataBatch->release();
} while (!m_taskState->sqlReader());
}
catch (std::exception& e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ void EcdhMultiPSIMaster::encAndSendIntersectionData()
for (auto& calcultor : m_calculatorParties)
{
ECDH_MASTER_LOG(INFO) << LOG_DESC("send intersection cipher to calculator")
<< LOG_KV("taskID", m_taskState->task()->id())
<< LOG_KV("intersectionSize", encryptedData.size())
<< LOG_KV("target", calcultor.first);
m_config->generateAndSendPPCMessage(calcultor.first, m_taskID, message,
[self = weak_from_this()](bcos::Error::Ptr&& _error) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/wedpr-helper/ppc-tools/src/config/PPCConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ void PPCConfig::loadHDFSConfig(boost::property_tree::ptree const& _pt)
// the name node port
option->nameNodePort = _pt.get<int>("hdfs_storage.name_node_port", 8020);
// the user
option->userName = _pt.get<std::string>("hdfs_storage.user", "root");
option->userName = _pt.get<std::string>("hdfs_storage.user", "");
// checkNonEmptyField("hdfs_storage.user", option->userName);
// the token
option->token = _pt.get<std::string>("hdfs_storage.token", "");
Expand Down
39 changes: 33 additions & 6 deletions cpp/wedpr-storage/ppc-io/src/DataResourceLoaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,40 @@ DataResourceLoaderImpl::DataResourceLoaderImpl(
m_sqlStorageFactory(_sqlStorageFactory),
m_fileStorageFactory(_fileStorageFactory),
m_remoteStorageFactory(_remoteStorageFactory)
{}

void DataResourceLoaderImpl::lazyLoadHdfsStorage()
{
if (m_fileStorageConnectionOpt)
if (!m_fileStorageConnectionOpt)
{
m_hdfsStorage = m_fileStorageFactory->createFileStorage(
DataResourceType::HDFS, m_fileStorageConnectionOpt);
return;
}
if (m_sqlConnectionOpt)
bcos::Guard l(x_hdfsStorage);
if (m_hdfsStorage)
{
m_sqlStorage =
m_sqlStorageFactory->createSQLStorage(DataResourceType::MySQL, m_sqlConnectionOpt);
return;
}
m_hdfsStorage =
m_fileStorageFactory->createFileStorage(DataResourceType::HDFS, m_fileStorageConnectionOpt);
IO_LOG(INFO) << LOG_DESC("lazyLoadHdfsStorage") << m_fileStorageConnectionOpt->desc();
return;
}

void DataResourceLoaderImpl::lazyLoadSqlStorage()
{
if (!m_sqlConnectionOpt)
{
return;
}
bcos::Guard l(x_sqlStorage);
if (m_sqlStorage)
{
return;
}
m_sqlStorage =
m_sqlStorageFactory->createSQLStorage(DataResourceType::MySQL, m_sqlConnectionOpt);
IO_LOG(INFO) << LOG_DESC("lazyLoadSqlStorage") << m_sqlConnectionOpt->desc();
}

LineReader::Ptr DataResourceLoaderImpl::loadReader(DataResourceDesc::ConstPtr _desc,
DataSchema _schema, bool _parseByColumn, FileStorage::Ptr const& _fileStorage)
Expand Down Expand Up @@ -104,6 +125,7 @@ LineReader::Ptr DataResourceLoaderImpl::loadSQLResource(
}
else if (m_sqlConnectionOpt)
{
lazyLoadSqlStorage();
storage = m_sqlStorage;
}
else
Expand Down Expand Up @@ -133,6 +155,7 @@ LineReader::Ptr DataResourceLoaderImpl::loadHDFSResource(
}
else if (m_fileStorageConnectionOpt)
{
lazyLoadHdfsStorage();
storage = m_hdfsStorage;
}
else
Expand Down Expand Up @@ -170,6 +193,7 @@ void DataResourceLoaderImpl::deleteResource(
}
else if (m_fileStorageConnectionOpt)
{
lazyLoadHdfsStorage();
storage = m_hdfsStorage;
}
else
Expand Down Expand Up @@ -219,6 +243,7 @@ void DataResourceLoaderImpl::renameResource(ppc::protocol::DataResourceDesc::Con
}
else if (m_fileStorageConnectionOpt)
{
lazyLoadHdfsStorage();
storage = m_hdfsStorage;
}
else
Expand Down Expand Up @@ -277,6 +302,7 @@ void DataResourceLoaderImpl::checkResourceExists(
}
else if (m_fileStorageConnectionOpt)
{
lazyLoadHdfsStorage();
storage = m_hdfsStorage;
}
else
Expand Down Expand Up @@ -330,6 +356,7 @@ LineWriter::Ptr DataResourceLoaderImpl::loadWriter(
}
else if (m_fileStorageConnectionOpt)
{
lazyLoadHdfsStorage();
storage = m_hdfsStorage;
}
else
Expand Down
6 changes: 6 additions & 0 deletions cpp/wedpr-storage/ppc-io/src/DataResourceLoaderImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,19 @@ class DataResourceLoaderImpl : public DataResourceLoader
virtual LineReader::Ptr loadHDFSResource(ppc::protocol::DataResourceDesc::ConstPtr _desc,
ppc::storage::FileStorage::Ptr const& _fileStorage);

void lazyLoadHdfsStorage();
void lazyLoadSqlStorage();

private:
// the sql storage
ppc::protocol::SQLConnectionOption::Ptr m_sqlConnectionOpt;
ppc::storage::SQLStorage::Ptr m_sqlStorage;
bcos::Mutex x_sqlStorage;

// the hdfs storage
ppc::protocol::FileStorageConnectionOption::Ptr m_fileStorageConnectionOpt;
ppc::storage::FileStorage::Ptr m_hdfsStorage;
bcos::Mutex x_hdfsStorage;

ppc::protocol::RemoteStorageConnectionOption::Ptr m_remoteStorageConnectionOpt;

Expand Down
1 change: 1 addition & 0 deletions cpp/wedpr-storage/ppc-storage/src/FileStorageFactoryImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class FileStorageFactoryImpl : public FileStorageFactory
{
case ppc::protocol::DataResourceType::HDFS:
{
_option->check();
return std::make_shared<HDFSStorage>(_option);
}
default:
Expand Down
1 change: 1 addition & 0 deletions cpp/wedpr-storage/ppc-storage/src/SQLStorageFactoryImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class SQLStorageFactoryImpl : public SQLStorageFactory
{
case ppc::protocol::DataResourceType::MySQL:
{
_option->check();
return std::make_shared<MySQLStorage>(_option);
}
default:
Expand Down

0 comments on commit 6026a71

Please sign in to comment.