Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix grpc send large package failed for over package size limit #82

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions cpp/ppc-framework/protocol/GrpcConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
* @date 2024-09-02
*/
#pragma once
#include "ppc-framework/Common.h"
#include "ppc-framework/protocol/EndPoint.h"
#include <memory>
#include <sstream>
#include <string>

namespace ppc::protocol
Expand Down Expand Up @@ -64,9 +66,63 @@ class GrpcConfig

bool enableDnslookup() const { return m_enableDnslookup; }

uint64_t maxSendMessageSize() const { return m_maxSendMessageSize; }
uint64_t maxReceivedMessageSize() const { return m_maxReceivedMessageSize; }

void setMaxSendMessageSize(uint64_t maxSendMessageSize)
{
m_maxSendMessageSize = maxSendMessageSize;
}
void setMaxReceivedMessageSize(uint64_t maxReceivedMessageSize)
{
m_maxReceivedMessageSize = maxReceivedMessageSize;
}

/*
typedef enum {
GRPC_COMPRESS_NONE = 0,
GRPC_COMPRESS_DEFLATE,
GRPC_COMPRESS_GZIP,
GRPC_COMPRESS_ALGORITHMS_COUNT
} grpc_compression_algorithm;
*/
int compressAlgorithm() const { return m_compressAlgorithm; }

void setCompressAlgorithm(int compressAlgorithm)
{
if (compressAlgorithm < 0 || compressAlgorithm > 2)
{
BOOST_THROW_EXCEPTION(WeDPRException() << bcos::errinfo_comment(
"Invalid compress algorithm, must between 0-3"));
}
m_compressAlgorithm = compressAlgorithm;
}

protected:
bool m_enableHealthCheck = true;
std::string m_loadBalancePolicy = "round_robin";
bool m_enableDnslookup = false;

// the max send message size in bytes
uint64_t m_maxSendMessageSize = 1024 * 1024 * 1024;
// the max received message size in bytes
uint16_t m_maxReceivedMessageSize = 1024 * 1024 * 1024;
int m_compressAlgorithm = 0;
};

inline std::string printGrpcConfig(ppc::protocol::GrpcConfig::Ptr const& grpcConfig)
{
if (!grpcConfig)
{
return "nullptr";
}
std::ostringstream stringstream;
stringstream << LOG_KV("loadBalancePolicy", grpcConfig->loadBalancePolicy())
<< LOG_KV("enableHealthCheck", grpcConfig->enableHealthCheck())
<< LOG_KV("enableDnslookup", grpcConfig->enableDnslookup())
<< LOG_KV("maxSendMessageSize", grpcConfig->maxSendMessageSize())
<< LOG_KV("maxReceivedMessageSize", grpcConfig->maxReceivedMessageSize())
<< LOG_KV("compressAlgorithm", grpcConfig->compressAlgorithm());
return stringstream.str();
}
} // namespace ppc::protocol
8 changes: 8 additions & 0 deletions cpp/wedpr-protocol/grpc/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
#pragma once
#include "ppc-framework/Common.h"
#include "ppc-framework/protocol/GrpcConfig.h"
#include <grpc/compression.h>
#include <grpcpp/grpcpp.h>

namespace ppc::protocol
{
#define GRPC_LOG(LEVEL) BCOS_LOG(LEVEL) << "[GRPC]"

inline grpc::ChannelArguments toChannelConfig(ppc::protocol::GrpcConfig::Ptr const& grpcConfig)
{
grpc::ChannelArguments args;
Expand All @@ -47,6 +50,11 @@ inline grpc::ChannelArguments toChannelConfig(ppc::protocol::GrpcConfig::Ptr con
{
args.SetInt("grpc.enable_dns_srv_lookup", 1);
}
args.SetMaxReceiveMessageSize(grpcConfig->maxReceivedMessageSize());
args.SetMaxSendMessageSize(grpcConfig->maxSendMessageSize());
// the compress algorithm
args.SetCompressionAlgorithm((grpc_compression_algorithm)(grpcConfig->compressAlgorithm()));
GRPC_LOG(INFO) << LOG_DESC("toChannelConfig") << printGrpcConfig(grpcConfig);
return args;
}
} // namespace ppc::protocol
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,30 @@ public void setEnableDnslookup(boolean enableDnslookup) {
public boolean enableDnslookup() {
return wedpr_java_transportJNI.GrpcConfig_enableDnslookup(swigCPtr, this);
}

public java.math.BigInteger maxSendMessageSize() {
return wedpr_java_transportJNI.GrpcConfig_maxSendMessageSize(swigCPtr, this);
}

public java.math.BigInteger maxReceivedMessageSize() {
return wedpr_java_transportJNI.GrpcConfig_maxReceivedMessageSize(swigCPtr, this);
}

public void setMaxSendMessageSize(java.math.BigInteger maxSendMessageSize) {
wedpr_java_transportJNI.GrpcConfig_setMaxSendMessageSize(
swigCPtr, this, maxSendMessageSize);
}

public void setMaxReceivedMessageSize(java.math.BigInteger maxReceivedMessageSize) {
wedpr_java_transportJNI.GrpcConfig_setMaxReceivedMessageSize(
swigCPtr, this, maxReceivedMessageSize);
}

public int compressAlgorithm() {
return wedpr_java_transportJNI.GrpcConfig_compressAlgorithm(swigCPtr, this);
}

public void setCompressAlgorithm(int compressAlgorithm) {
wedpr_java_transportJNI.GrpcConfig_setCompressAlgorithm(swigCPtr, this, compressAlgorithm);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ public static String printFrontDesc(FrontConfig config) {
return wedpr_java_transportJNI.printFrontDesc(FrontConfig.getCPtr(config), config);
}

public static String printGrpcConfig(GrpcConfig grpcConfig) {
return wedpr_java_transportJNI.printGrpcConfig(GrpcConfig.getCPtr(grpcConfig), grpcConfig);
}

public static String printOptionalField(MessageOptionalHeader optionalHeader) {
return wedpr_java_transportJNI.printOptionalField(
MessageOptionalHeader.getCPtr(optionalHeader), optionalHeader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,25 @@ public static final native void GrpcConfig_setEnableDnslookup(

public static final native boolean GrpcConfig_enableDnslookup(long jarg1, GrpcConfig jarg1_);

public static final native java.math.BigInteger GrpcConfig_maxSendMessageSize(
long jarg1, GrpcConfig jarg1_);

public static final native java.math.BigInteger GrpcConfig_maxReceivedMessageSize(
long jarg1, GrpcConfig jarg1_);

public static final native void GrpcConfig_setMaxSendMessageSize(
long jarg1, GrpcConfig jarg1_, java.math.BigInteger jarg2);

public static final native void GrpcConfig_setMaxReceivedMessageSize(
long jarg1, GrpcConfig jarg1_, java.math.BigInteger jarg2);

public static final native int GrpcConfig_compressAlgorithm(long jarg1, GrpcConfig jarg1_);

public static final native void GrpcConfig_setCompressAlgorithm(
long jarg1, GrpcConfig jarg1_, int jarg2);

public static final native String printGrpcConfig(long jarg1, GrpcConfig jarg1_);

public static final native void delete_MessageOptionalHeader(long jarg1);

public static final native void MessageOptionalHeader_encode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4453,6 +4453,208 @@ SWIGEXPORT jboolean JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_
}


SWIGEXPORT jobject JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_GrpcConfig_1maxSendMessageSize(JNIEnv *jenv, jclass jcls, jlong jarg1, jobject jarg1_) {
jobject jresult = 0 ;
ppc::protocol::GrpcConfig *arg1 = (ppc::protocol::GrpcConfig *) 0 ;
std::shared_ptr< ppc::protocol::GrpcConfig const > *smartarg1 = 0 ;
uint64_t result;

(void)jenv;
(void)jcls;
(void)jarg1_;

smartarg1 = *(std::shared_ptr< const ppc::protocol::GrpcConfig > **)&jarg1;
arg1 = (ppc::protocol::GrpcConfig *)(smartarg1 ? smartarg1->get() : 0);
result = (uint64_t)((ppc::protocol::GrpcConfig const *)arg1)->maxSendMessageSize();
{
jbyteArray ba = jenv->NewByteArray(9);
jbyte* bae = jenv->GetByteArrayElements(ba, 0);
jclass clazz = jenv->FindClass("java/math/BigInteger");
jmethodID mid = jenv->GetMethodID(clazz, "<init>", "([B)V");
jobject bigint;
int i;

bae[0] = 0;
for(i=1; i<9; i++ ) {
bae[i] = (jbyte)(result>>8*(8-i));
}

jenv->ReleaseByteArrayElements(ba, bae, 0);
bigint = jenv->NewObject(clazz, mid, ba);
jenv->DeleteLocalRef(ba);
jresult = bigint;
}
return jresult;
}


SWIGEXPORT jobject JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_GrpcConfig_1maxReceivedMessageSize(JNIEnv *jenv, jclass jcls, jlong jarg1, jobject jarg1_) {
jobject jresult = 0 ;
ppc::protocol::GrpcConfig *arg1 = (ppc::protocol::GrpcConfig *) 0 ;
std::shared_ptr< ppc::protocol::GrpcConfig const > *smartarg1 = 0 ;
uint64_t result;

(void)jenv;
(void)jcls;
(void)jarg1_;

smartarg1 = *(std::shared_ptr< const ppc::protocol::GrpcConfig > **)&jarg1;
arg1 = (ppc::protocol::GrpcConfig *)(smartarg1 ? smartarg1->get() : 0);
result = (uint64_t)((ppc::protocol::GrpcConfig const *)arg1)->maxReceivedMessageSize();
{
jbyteArray ba = jenv->NewByteArray(9);
jbyte* bae = jenv->GetByteArrayElements(ba, 0);
jclass clazz = jenv->FindClass("java/math/BigInteger");
jmethodID mid = jenv->GetMethodID(clazz, "<init>", "([B)V");
jobject bigint;
int i;

bae[0] = 0;
for(i=1; i<9; i++ ) {
bae[i] = (jbyte)(result>>8*(8-i));
}

jenv->ReleaseByteArrayElements(ba, bae, 0);
bigint = jenv->NewObject(clazz, mid, ba);
jenv->DeleteLocalRef(ba);
jresult = bigint;
}
return jresult;
}


SWIGEXPORT void JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_GrpcConfig_1setMaxSendMessageSize(JNIEnv *jenv, jclass jcls, jlong jarg1, jobject jarg1_, jobject jarg2) {
ppc::protocol::GrpcConfig *arg1 = (ppc::protocol::GrpcConfig *) 0 ;
uint64_t arg2 ;
std::shared_ptr< ppc::protocol::GrpcConfig > *smartarg1 = 0 ;

(void)jenv;
(void)jcls;
(void)jarg1_;

smartarg1 = *(std::shared_ptr< ppc::protocol::GrpcConfig > **)&jarg1;
arg1 = (ppc::protocol::GrpcConfig *)(smartarg1 ? smartarg1->get() : 0);
{
jclass clazz;
jmethodID mid;
jbyteArray ba;
jbyte* bae;
jsize sz;
int i;

if (!jarg2) {
SWIG_JavaThrowException(jenv, SWIG_JavaNullPointerException, "BigInteger null");
return ;
}
clazz = jenv->GetObjectClass(jarg2);
mid = jenv->GetMethodID(clazz, "toByteArray", "()[B");
ba = (jbyteArray)jenv->CallObjectMethod(jarg2, mid);
bae = jenv->GetByteArrayElements(ba, 0);
sz = jenv->GetArrayLength(ba);
arg2 = 0;
if (sz > 0) {
arg2 = (uint64_t)(signed char)bae[0];
for(i=1; i<sz; i++) {
arg2 = (arg2 << 8) | (uint64_t)(unsigned char)bae[i];
}
}
jenv->ReleaseByteArrayElements(ba, bae, 0);
}
(arg1)->setMaxSendMessageSize(arg2);
}


SWIGEXPORT void JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_GrpcConfig_1setMaxReceivedMessageSize(JNIEnv *jenv, jclass jcls, jlong jarg1, jobject jarg1_, jobject jarg2) {
ppc::protocol::GrpcConfig *arg1 = (ppc::protocol::GrpcConfig *) 0 ;
uint64_t arg2 ;
std::shared_ptr< ppc::protocol::GrpcConfig > *smartarg1 = 0 ;

(void)jenv;
(void)jcls;
(void)jarg1_;

smartarg1 = *(std::shared_ptr< ppc::protocol::GrpcConfig > **)&jarg1;
arg1 = (ppc::protocol::GrpcConfig *)(smartarg1 ? smartarg1->get() : 0);
{
jclass clazz;
jmethodID mid;
jbyteArray ba;
jbyte* bae;
jsize sz;
int i;

if (!jarg2) {
SWIG_JavaThrowException(jenv, SWIG_JavaNullPointerException, "BigInteger null");
return ;
}
clazz = jenv->GetObjectClass(jarg2);
mid = jenv->GetMethodID(clazz, "toByteArray", "()[B");
ba = (jbyteArray)jenv->CallObjectMethod(jarg2, mid);
bae = jenv->GetByteArrayElements(ba, 0);
sz = jenv->GetArrayLength(ba);
arg2 = 0;
if (sz > 0) {
arg2 = (uint64_t)(signed char)bae[0];
for(i=1; i<sz; i++) {
arg2 = (arg2 << 8) | (uint64_t)(unsigned char)bae[i];
}
}
jenv->ReleaseByteArrayElements(ba, bae, 0);
}
(arg1)->setMaxReceivedMessageSize(arg2);
}


SWIGEXPORT jint JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_GrpcConfig_1compressAlgorithm(JNIEnv *jenv, jclass jcls, jlong jarg1, jobject jarg1_) {
jint jresult = 0 ;
ppc::protocol::GrpcConfig *arg1 = (ppc::protocol::GrpcConfig *) 0 ;
std::shared_ptr< ppc::protocol::GrpcConfig const > *smartarg1 = 0 ;
int result;

(void)jenv;
(void)jcls;
(void)jarg1_;

smartarg1 = *(std::shared_ptr< const ppc::protocol::GrpcConfig > **)&jarg1;
arg1 = (ppc::protocol::GrpcConfig *)(smartarg1 ? smartarg1->get() : 0);
result = (int)((ppc::protocol::GrpcConfig const *)arg1)->compressAlgorithm();
jresult = (jint)result;
return jresult;
}


SWIGEXPORT void JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_GrpcConfig_1setCompressAlgorithm(JNIEnv *jenv, jclass jcls, jlong jarg1, jobject jarg1_, jint jarg2) {
ppc::protocol::GrpcConfig *arg1 = (ppc::protocol::GrpcConfig *) 0 ;
int arg2 ;
std::shared_ptr< ppc::protocol::GrpcConfig > *smartarg1 = 0 ;

(void)jenv;
(void)jcls;
(void)jarg1_;

smartarg1 = *(std::shared_ptr< ppc::protocol::GrpcConfig > **)&jarg1;
arg1 = (ppc::protocol::GrpcConfig *)(smartarg1 ? smartarg1->get() : 0);
arg2 = (int)jarg2;
(arg1)->setCompressAlgorithm(arg2);
}


SWIGEXPORT jstring JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_printGrpcConfig(JNIEnv *jenv, jclass jcls, jlong jarg1, jobject jarg1_) {
jstring jresult = 0 ;
ppc::protocol::GrpcConfig::Ptr *arg1 = 0 ;
ppc::protocol::GrpcConfig::Ptr tempnull1 ;
std::string result;

(void)jenv;
(void)jcls;
(void)jarg1_;
arg1 = jarg1 ? *(ppc::protocol::GrpcConfig::Ptr **)&jarg1 : &tempnull1;
result = ppc::protocol::printGrpcConfig((std::shared_ptr< ppc::protocol::GrpcConfig > const &)*arg1);
jresult = jenv->NewStringUTF((&result)->c_str());
return jresult;
}


SWIGEXPORT void JNICALL Java_com_webank_wedpr_sdk_jni_generated_wedpr_1java_1transportJNI_delete_1MessageOptionalHeader(JNIEnv *jenv, jclass jcls, jlong jarg1) {
ppc::protocol::MessageOptionalHeader *arg1 = (ppc::protocol::MessageOptionalHeader *) 0 ;
std::shared_ptr< ppc::protocol::MessageOptionalHeader > *smartarg1 = 0 ;
Expand Down
Loading
Loading