Skip to content

Commit

Permalink
decrease copy overhead (#67)
Browse files Browse the repository at this point in the history
* upload cpp_toolkit

* decrease copy overhead

* update swig generated files
  • Loading branch information
cyjseagull authored Oct 28, 2024
1 parent c4cc6cd commit 40aab9f
Show file tree
Hide file tree
Showing 24 changed files with 136 additions and 1,027 deletions.
33 changes: 22 additions & 11 deletions .github/workflows/cpp_full_node_workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ jobs:
sudo apt install -y lcov ccache wget libgmp-dev python3-dev
export GCC='gcc-10'
export CXX='g++-10'
bash -x cpp/tools/install_depends.sh -o ubuntu
sudo bash -x cpp/tools/install_depends.sh -o ubuntu
mkdir -p cpp/build && cd cpp/build && cmake -DTESTS=ON -DCOVERAGE=ON -DCMAKE_TOOLCHAIN_FILE=${{ env.VCPKG_ROOT }}/scripts/buildsystems/vcpkg.cmake ../ || cat /Users/runner/work/WeDPR-Component/WeDPR-Component/vcpkg/buildtrees/libhdfs3/config-x64-osx-dbg-err.log
- name: Build for macOS
if: runner.os == 'macOS'
Expand All @@ -78,6 +78,21 @@ jobs:
with:
name: config-x64-linux-dbg-err.log
path: /home/runner/work/WeDPR-Component/WeDPR-Component/vcpkg/buildtrees/libhdfs3/config-x64-linux-dbg-err.log
- uses: actions/upload-artifact@v4
if: runner.os == 'macOS'
with:
name: ppc-air-node-macOS
path: ./cpp/build/bin/ppc-air-node
- uses: actions/upload-artifact@v4
if: runner.os == 'macOS'
with:
name: ppc-pro-node-macOS
path: ./cpp/build/bin/ppc-pro-node
- uses: actions/upload-artifact@v4
if: runner.os == 'macOS'
with:
name: ppc-gateway-service-macOS
path: ./cpp/build/bin/ppc-gateway-service
#- name: Test
# if: runner.os != 'Windows'
# run: |
Expand Down Expand Up @@ -126,7 +141,7 @@ jobs:
yum update -y
yum install -y epel-release centos-release-scl centos-release-scl-rh
yum install -y https://packages.endpointdev.com/rhel/7/os/x86_64/endpoint-repo.x86_64.rpm
yum install -y java-11-openjdk-devel git make gcc gcc-c++ glibc-static glibc-devel openssl cmake3 ccache devtoolset-11 llvm-toolset-7.0 rh-perl530-perl libzstd-devel zlib-devel flex bison python-devel python3-devel
yum install -y wget java-11-openjdk-devel git make gcc gcc-c++ glibc-static glibc-devel openssl cmake3 ccache devtoolset-11 llvm-toolset-7.0 rh-perl530-perl libzstd-devel zlib-devel flex bison python-devel python3-devel
yum install -y rh-perl530-perl cmake3 zlib-devel ccache lcov python-devel python3-devel
yum install -y git
- uses: actions-rs/toolchain@v1
Expand All @@ -135,7 +150,7 @@ jobs:
override: true
- name: Build
run: |
bash cpp/tools/install_depends.sh -o centos
bash -x cpp/tools/install_depends.sh -o centos
alias cmake='cmake3'
. /opt/rh/devtoolset-11/enable
mkdir -p cpp/build
Expand All @@ -154,17 +169,13 @@ jobs:
# cd build && CTEST_OUTPUT_ON_FAILURE=TRUE make test
- uses: actions/upload-artifact@v3
with:
name: ppc-air-node-centos-x64
name: ppc-air-node-x64
path: ./cpp/build/bin/ppc-air-node
- uses: actions/upload-artifact@v3
with:
name: ppc-pro-node-centos-x64
name: ppc-pro-node-x64
path: ./cpp/build/bin/ppc-pro-node
- uses: actions/upload-artifact@v3
with:
name: ppc-gateway-service-centos-x64
path: ./cpp/build/bin/ppc-gateway-service
- uses: actions/upload-artifact@v3
with:
name: libppc-crypto-sdk-jni.so
path: ./cpp/wedpr-component-sdk/bindings/java/src/main/resources/META-INF/native/libppc-crypto-sdk-jni.so
name: ppc-gateway-service-x64
path: ./cpp/build/bin/ppc-gateway-service
8 changes: 6 additions & 2 deletions .github/workflows/cpp_toolkit_workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ jobs:
run: |
brew install lcov
bash cpp/tools/install_depends.sh -o macos
mkdir -p cpp/build && cd cpp/build && cmake -DBUILD_WEDPR_TOOLKIT=ON -DBUILD_PYTHON=ON -DCMAKE_TOOLCHAIN_FILE=${{ env.VCPKG_ROOT }}/scripts/buildsystems/vcpkg.cmake ../
mkdir -p cpp/build && cd cpp/build && cmake -DCMAKE_BUILD_TYPE=Release -DBUILD_WEDPR_TOOLKIT=ON -DBUILD_PYTHON=ON -DCMAKE_TOOLCHAIN_FILE=${{ env.VCPKG_ROOT }}/scripts/buildsystems/vcpkg.cmake ../
make -j3
- uses: actions/upload-artifact@v4
if: runner.os == 'macos'
Expand Down Expand Up @@ -146,4 +146,8 @@ jobs:
mkdir -p cpp/build
cd cpp/build
cmake3 -DCMAKE_BUILD_TYPE=Release -DBUILD_WEDPR_TOOLKIT=ON -DBUILD_PYTHON=ON -DCMAKE_TOOLCHAIN_FILE=/usr/local/share/vcpkg/scripts/buildsystems/vcpkg.cmake ../
cmake3 --build . --parallel 3
cmake3 --build . --parallel 3
- uses: actions/upload-artifact@v3
with:
name: libwedpr_java_transport_jni.so
path: ./cpp/wedpr-transport/sdk-wrapper/java/bindings/src/main/resources/META-INF/native/libwedpr_java_transport_jni.so
2 changes: 1 addition & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ elseif(BUILD_UDF)
elseif(BUILD_SDK)
add_sources("${SDK_SOURCE_LIST}")
elseif(BUILD_WEDPR_TOOLKIT)
add_sources("${TRANSPORT_SDK_TOOLKIT_SOURCE_LIST}")
include(swig)
add_sources("${TRANSPORT_SDK_TOOLKIT_SOURCE_LIST}")
endif()
########### set the sources end ###########

Expand Down
2 changes: 1 addition & 1 deletion cpp/cmake/python.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ message("Python_Development_FOUND:${Python_Development_FOUND}")
message("Python_LIBRARIES:${Python_LIBRARIES}")
message("Python_INCLUDE_DIRS:${Python_INCLUDE_DIRS}")

list(APPEND CMAKE_SWIG_FLAGS "-py3" "-DPY3")
list(APPEND CMAKE_SWIG_FLAGS "-DPY3")

function(search_python_module)
set(options NO_VERSION)
Expand Down
1 change: 1 addition & 0 deletions cpp/cmake/swig.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ cmake_minimum_required(VERSION 3.18)

# Will need swig
set(CMAKE_SWIG_FLAGS)

find_package(SWIG REQUIRED)
include(UseSWIG)

Expand Down
28 changes: 12 additions & 16 deletions cpp/ppc-framework/front/IFront.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ class IFront : virtual public IFrontClient
* @param callback callback
*/
virtual void asyncSendMessage(uint16_t routeType,
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload, int seq,
long timeout, ppc::protocol::ReceiveMsgFunc errorCallback,
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytesConstRef payload,
int seq, long timeout, ppc::protocol::ReceiveMsgFunc errorCallback,
ppc::protocol::MessageCallback callback) = 0;

/////// to simplify SDK wrapper ////
Expand All @@ -181,14 +181,13 @@ class IFront : virtual public IFrontClient
uint64_t payloadSize, int seq, long timeout, ErrorCallback::Ptr errorCallback,
IMessageHandler::Ptr msgHandler)
{
// TODO: optimize here
bcos::bytes copyedPayload(payload, payload + payloadSize);
asyncSendMessage(routeType, routeInfo, std::move(copyedPayload), seq, timeout,
asyncSendMessage(routeType, routeInfo,
bcos::bytesConstRef((bcos::byte*)payload, payloadSize), seq, timeout,
populateErrorCallback(errorCallback), populateMsgCallback(msgHandler));
}

virtual void asyncSendResponse(bcos::bytes const& dstNode, std::string const& traceID,
bcos::bytes&& payload, int seq, ppc::protocol::ReceiveMsgFunc errorCallback) = 0;
virtual void asyncSendResponse(bcos::bytesConstRef dstNode, std::string const& traceID,
bcos::bytesConstRef payload, int seq, ppc::protocol::ReceiveMsgFunc errorCallback) = 0;

/////// to simplify SDK wrapper ////

Expand All @@ -198,19 +197,16 @@ class IFront : virtual public IFrontClient
std::string const& traceID, char* payload, uint64_t payloadSize, int seq,
ErrorCallback::Ptr errorCallback)
{
// TODO: optimize here
bcos::bytes copiedDstNode(dstNode, dstNode + dstNodeSize);
bcos::bytes copyedPayload(payload, payload + payloadSize);
asyncSendResponse(copiedDstNode, traceID, std::move(copyedPayload), seq,
asyncSendResponse(bcos::bytesConstRef((bcos::byte*)dstNode, dstNodeSize), traceID,
bcos::bytesConstRef((bcos::byte*)payload, payloadSize), seq,
populateErrorCallback(errorCallback));
}

// the sync interface for async_send_message
virtual bcos::Error::Ptr push(uint16_t routeType,
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytes&& payload, int seq,
long timeout) = 0;
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, bcos::bytesConstRef payload,
int seq, long timeout) = 0;

// TODO: optmize here
// Note: the python not support function overload, for different interfaces with the same
// functionality, it is best to define methods with different names the 'payload', 'payloadSize'
// should not been changed any more, since the swig has defined by the name to convert python
Expand All @@ -219,8 +215,8 @@ class IFront : virtual public IFrontClient
ppc::protocol::MessageOptionalHeader::Ptr const& routeInfo, char* payload,
uint64_t payloadSize, int seq, long timeout)
{
bcos::bytes copyedPayload(payload, payload + payloadSize);
return push(routeType, routeInfo, std::move(copyedPayload), seq, timeout);
return push(routeType, routeInfo, bcos::bytesConstRef((bcos::byte*)payload, payloadSize),
seq, timeout);
}
virtual ppc::protocol::Message::Ptr pop(std::string const& topic, long timeoutMs) = 0;
virtual ppc::protocol::Message::Ptr peek(std::string const& topic) = 0;
Expand Down
14 changes: 11 additions & 3 deletions cpp/ppc-framework/protocol/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ class MessageOptionalHeader

// !!! Note: the first paramater type should not been changed, for it's used for pass-in java
// byte[] into c bytes
virtual void setSrcNode(char* data, uint64_t length) { m_srcNode.assign(data, data + length); }
virtual void setSrcNodeBuffer(char* data, uint64_t length)
{
m_srcNode.assign(data, data + length);
}

// the target nodeID that should receive the message
virtual bcos::bytes const& dstNode() const { return m_dstNode; }
Expand All @@ -68,8 +71,12 @@ class MessageOptionalHeader
// Note: this will be copied to java through jni
return OutputBuffer{(unsigned char*)m_dstNode.data(), m_dstNode.size()};
}
virtual void setDstNode(bcos::bytes const& dstNode) { m_dstNode = dstNode; }

virtual void setDstNode(bcos::bytes const& dstNode)
{
m_dstNode = dstNode;
m_dstNodePtr = bcos::bytesConstRef((bcos::byte*)m_dstNode.data(), m_dstNode.size());
}
virtual void setDstNodePtr(bcos::bytesConstRef const& dstNodePtr) { m_dstNodePtr = dstNodePtr; }
// !!! Note: the first paramater type(char*) should not been changed, for it's used for pass-in
// java byte[] into c bytes
// Note: the python not support function override
Expand Down Expand Up @@ -100,6 +107,7 @@ class MessageOptionalHeader
std::string m_srcInst;
// the target nodeID that should receive the message
bcos::bytes m_dstNode;
bcos::bytesConstRef m_dstNodePtr; // to decrease the copy overhead
// the target agency that need receive the message
std::string m_dstInst;
};
Expand Down
18 changes: 15 additions & 3 deletions cpp/ppc-framework/protocol/MessagePayload.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,20 @@ class MessagePayload
// for swig wrapper here
virtual OutputBuffer dataBuffer() const
{
return OutputBuffer{(unsigned char*)m_data.data(), m_data.size()};
return OutputBuffer{(unsigned char*)m_dataPtr.data(), m_dataPtr.size()};
}
virtual void setData(bcos::bytes&& data) { m_data = std::move(data); }
virtual void setData(bcos::bytes const& data) { m_data = data; }
virtual void setData(bcos::bytes&& data)
{
m_data = std::move(data);
m_dataPtr = bcos::bytesConstRef((bcos::byte*)m_data.data(), m_data.size());
}
virtual void setData(bcos::bytes const& data)
{
m_data = data;
m_dataPtr = bcos::bytesConstRef((bcos::byte*)m_data.data(), m_data.size());
}
virtual void setDataPtr(bcos::bytesConstRef dataPtr) { m_dataPtr = dataPtr; }
virtual bcos::bytesConstRef const& dataPtr() const { return m_dataPtr; }
// the seq
virtual uint16_t seq() const { return m_seq; }
virtual void setSeq(uint16_t seq) { m_seq = seq; }
Expand All @@ -76,6 +86,8 @@ class MessagePayload
// the traceID
std::string m_traceID;
bcos::bytes m_data;
// used to decrease the copy-overhead
bcos::bytesConstRef m_dataPtr;
uint16_t m_ext = 0;
int64_t mutable m_length;
};
Expand Down
7 changes: 4 additions & 3 deletions cpp/wedpr-protocol/protocol/src/v1/MessageHeaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ void MessageOptionalHeaderImpl::encode(bcos::bytes& buffer) const
buffer.insert(buffer.end(), (byte*)&srcInstLen, (byte*)&srcInstLen + 2);
buffer.insert(buffer.end(), m_srcInst.begin(), m_srcInst.end());
// the target nodeID that should receive the message
uint16_t dstNodeLen = boost::asio::detail::socket_ops::host_to_network_short(m_dstNode.size());
uint16_t dstNodeLen =
boost::asio::detail::socket_ops::host_to_network_short(m_dstNodePtr.size());
buffer.insert(buffer.end(), (byte*)&dstNodeLen, (byte*)&dstNodeLen + 2);
buffer.insert(buffer.end(), m_dstNode.begin(), m_dstNode.end());
bcos::bytes m_dstNode;
buffer.insert(buffer.end(), m_dstNodePtr.begin(), m_dstNodePtr.end());
// the target agency that need receive the message
uint16_t dstInstLen = boost::asio::detail::socket_ops::host_to_network_short(m_dstInst.size());
buffer.insert(buffer.end(), (byte*)&dstInstLen, (byte*)&dstInstLen + 2);
Expand Down Expand Up @@ -74,6 +74,7 @@ int64_t MessageOptionalHeaderImpl::decode(bcos::bytesConstRef data, uint64_t con
offset = decodeNetworkBuffer(m_srcInst, data.data(), data.size(), offset, false);
// dstNode
offset = decodeNetworkBuffer(m_dstNode, data.data(), data.size(), offset, false);
m_dstNodePtr = bcos::bytesConstRef((bcos::byte*)m_dstNode.data(), m_dstNode.size());
// dstInst
offset = decodeNetworkBuffer(m_dstInst, data.data(), data.size(), offset, false);
// topic
Expand Down
9 changes: 6 additions & 3 deletions cpp/wedpr-protocol/protocol/src/v1/MessagePayloadImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ int64_t MessagePayloadImpl::encode(bcos::bytes& buffer) const
buffer.insert(buffer.end(), (byte*)&traceIDLen, (byte*)&traceIDLen + 2);
buffer.insert(buffer.end(), m_traceID.begin(), m_traceID.end());
// data
uint32_t dataLen = boost::asio::detail::socket_ops::host_to_network_long(m_data.size());
uint32_t dataLen = boost::asio::detail::socket_ops::host_to_network_long(m_dataPtr.size());
buffer.insert(buffer.end(), (byte*)&dataLen, (byte*)&dataLen + 4);
buffer.insert(buffer.end(), m_data.begin(), m_data.end());
buffer.insert(buffer.end(), m_dataPtr.begin(), m_dataPtr.end());
// update the length
m_length = buffer.size();
return m_length;
Expand Down Expand Up @@ -75,5 +75,8 @@ int64_t MessagePayloadImpl::decode(bcos::bytesConstRef buffer)
auto offset =
decodeNetworkBuffer(m_traceID, buffer.data(), buffer.size(), (pointer - buffer.data()));
// data
return decodeNetworkBuffer(m_data, buffer.data(), buffer.size(), offset, true);
auto ret = decodeNetworkBuffer(m_data, buffer.data(), buffer.size(), offset, true);
// reset the dataPtr
m_dataPtr = bcos::bytesConstRef((bcos::byte*)m_data.data(), m_data.size());
return ret;
}
8 changes: 5 additions & 3 deletions cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,9 @@ void Front::asyncSendMessage(const std::string& _agencyID, front::PPCMessageFace
};
}
// ROUTE_THROUGH_TOPIC will hold the topic
m_front->asyncSendMessage((uint16_t)RouteType::ROUTE_THROUGH_TOPIC, routeInfo, std::move(data),
_message->seq(), _timeout, _callback, msgCallback);
m_front->asyncSendMessage((uint16_t)RouteType::ROUTE_THROUGH_TOPIC, routeInfo,
bcos::bytesConstRef((bcos::byte*)data.data(), data.size()), _message->seq(), _timeout,
_callback, msgCallback);
}

// send response when receiving message from given agencyID
Expand All @@ -145,7 +146,8 @@ void Front::asyncSendResponse(bcos::bytes const& dstNode, std::string const& tra
{
bcos::bytes data;
message->encode(data);
m_front->asyncSendResponse(dstNode, traceID, std::move(data), 0, _callback);
m_front->asyncSendResponse(bcos::bytesConstRef((bcos::byte*)dstNode.data(), dstNode.size()),
traceID, bcos::bytesConstRef((bcos::byte*)data.data(), data.size()), 0, _callback);
}

/**
Expand Down
16 changes: 8 additions & 8 deletions cpp/wedpr-transport/ppc-front/ppc-front/FrontImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,18 +106,18 @@ void FrontImpl::stop()
}
}

void FrontImpl::asyncSendResponse(bcos::bytes const& dstNode, std::string const& traceID,
bcos::bytes&& payload, int seq, ppc::protocol::ReceiveMsgFunc errorCallback)
void FrontImpl::asyncSendResponse(bcos::bytesConstRef dstNode, std::string const& traceID,
bcos::bytesConstRef payload, int seq, ppc::protocol::ReceiveMsgFunc errorCallback)
{
// generate the frontMessage
auto frontMessage = m_messageFactory->build();
frontMessage->setTraceID(traceID);
frontMessage->setSeq(seq);
frontMessage->setData(std::move(payload));
frontMessage->setDataPtr(payload);

auto routeInfo = m_routerInfoBuilder->build();
routeInfo->setSrcNode(m_nodeID);
routeInfo->setDstNode(dstNode);
routeInfo->setDstNodePtr(dstNode);

asyncSendMessageToGateway(true, std::move(frontMessage), RouteType::ROUTE_THROUGH_NODEID,
traceID, routeInfo, -1, errorCallback);
Expand All @@ -139,15 +139,15 @@ void FrontImpl::asyncSendResponse(bcos::bytes const& dstNode, std::string const&
* @param callback callback
*/
void FrontImpl::asyncSendMessage(uint16_t routeType, MessageOptionalHeader::Ptr const& routeInfo,
bcos::bytes&& payload, int seq, long timeout, ReceiveMsgFunc errorCallback,
bcos::bytesConstRef payload, int seq, long timeout, ReceiveMsgFunc errorCallback,
MessageCallback callback)
{
// generate the frontMessage
MessagePayload::Ptr frontMessage = m_messageFactory->build();
auto traceID = ppc::generateUUID();
frontMessage->setTraceID(traceID);
frontMessage->setSeq(seq);
frontMessage->setData(std::move(payload));
frontMessage->setDataPtr(payload);
m_callbackManager->addCallback(traceID, timeout, callback);
auto self = weak_from_this();
// send the message to the gateway
Expand Down Expand Up @@ -268,11 +268,11 @@ void FrontImpl::onReceiveMessage(Message::Ptr const& msg, ReceiveMsgFunc callbac

// the sync interface for asyncSendMessage
bcos::Error::Ptr FrontImpl::push(uint16_t routeType, MessageOptionalHeader::Ptr const& routeInfo,
bcos::bytes&& payload, int seq, long timeout)
bcos::bytesConstRef payload, int seq, long timeout)
{
auto promise = std::make_shared<std::promise<bcos::Error::Ptr>>();
asyncSendMessage(
routeType, routeInfo, std::move(payload), seq, timeout,
routeType, routeInfo, payload, seq, timeout,
[promise](bcos::Error::Ptr error) { promise->set_value(error); }, nullptr);
return promise->get_future().get();
}
Expand Down
Loading

0 comments on commit 40aab9f

Please sign in to comment.